__version__ = (1, 2, 14) # ▄▀█ █▄ █ █▀█ █▄ █ █▀█ ▀▀█ █▀█ █ █ █▀ # █▀█ █ ▀█ █▄█ █ ▀█ ▀▀█ █ ▀▀█ ▀▀█ ▄█ # # © Copyright 2024 # # developed by @anon97945 # # https://t.me/apodiktum_modules # https://github.com/anon97945 # # 🔒 Licensed under the GNU GPLv3 # 🌐 https://www.gnu.org/licenses/gpl-3.0.html # meta developer: @apodiktum_modules # meta banner: https://t.me/apodiktum_dumpster/11 # meta pic: https://t.me/apodiktum_dumpster/13 # scope: hikka_only # scope: hikka_min 1.3.3 import asyncio import contextlib import logging import time from telethon.tl.types import Channel, Chat, Message, User, MessageEntityCustomEmoji from .. import loader, utils logger = logging.getLogger(__name__) @loader.tds class ApodiktumAdminToolsMod(loader.Module): """ Toolpack for Channel and Group Admins. """ strings = { "name": "Apo-AdminTools", "developer": "@anon97945", "_cfg_cst_auto_migrate": "Wheather to auto migrate defined changes on startup.", "_cfg_doc_admin_tag_chats": "React to @admin in given chats.", "_cfg_doc_ignore_admins": "Wheather to ignore tags from admins.", "_cfg_doc_whitelist": ( "Whether the `admin_tag_chats`-list is for included(True) or" " excluded(False) chats." ), "admin_tag": "The User {} asked for help.\n{}", "admin_tag_reply": "\n\nThe corresponding message from {} is:", "admin_tag_reply_msg": "Thanks, the owner of this Bot got informed.", "bce": "BlockCustomEmojis", "bce_triggered": "{}, you can't use custom emojis in this chat.", "bcu": "BlockChannelUser", "bcu_triggered": "{}, you can't write as a channel in this chat.", "bdl": "BlockDoubleLinks", "bdl_triggered": "{}, you need to wait before you can send this link again.", "bf": "BlockFlood", "bf_triggered": "{}, floodlimit exceeded.", "bgs": "BlockGifSpam", "bgs_triggered": "{}, you need to wait before you can send more gifs.", "bnc": "BlockNonComment", "bnc_triggered": "{}, you can only write comments in this chat.", "bnd": "BlockNonDiscussion", "bnd_triggered": ( "{}, the comments are limited to discussiongroup members, " "please join our discussiongroup first." "\n\n👉🏻 {}\n\nRespectfully, the admins." ), "bss": "BlockStickerSpam", "bss_triggered": "{}, you need to wait before you can send more stickers.", "error": "Your command was wrong.", "gl": "GroupLogger", "no_id": "Your input was no TG ID.", "no_int": "Your input was no Integer.", "not_dc": "This is no Groupchat.", "permerror": "You have no delete permissions in this chat.", "prot_db_string": ( "[{}] Current Database:\n\nWatcher:\n{}" "\n\nChatsettings:\n{}" ), "prot_settings": ( "[{}] Current settings in this chat are:\n{}" ), "prot_start": "[{}] Activated in this chat.", "prot_stopped": "[{}] Deactivated in this chat.", "prot_turned_off": "[{}] The module is now turned off in all chats.", "refresh_chat": "[AdminTools] Chat cache refreshed.", } strings_en = {} strings_de = { "_cfg_cst_auto_migrate": ( "Ob definierte Änderungen beim Start automatisch migriert werden sollen." ), "_cfg_doc_admin_tag_chats": "Reagieren Sie in bestimmten Chats auf @admin.", "_cls_doc": "Toolpack für Kanal- und Gruppenadministratoren.", "admin_tag": "Der Benutzer {} hat um Hilfe gebeten.\n{}", "admin_tag_reply": "\n\nDie entsprechende Nachricht von {} ist:", "admin_tag_reply_msg": "Danke, der Besitzer dieses Bots wurde informiert.", "bce_triggered": "{}, du kannst in diesem Chat keine custom emojis senden.", "bcu_triggered": "{}, du kannst in diesem Chat nicht als Kanal schreiben.", "bnc_triggered": "{}, du kannst in diesem Chat nur Kommentare schreiben.", "bdl_triggered": ( "{}, der Link wurde bereits gesendet. Du musst warten bis er erneut" " gesendet werden kann." ), "bf_triggered": "{}, floodlimit überschritten.", "bgs_triggered": "{}, du musst warten bis du weitere Gifs senden kannst.", "bnd_triggered": ( "{}, die Kommentarfunktion wurde auf die Chatmitglieder begrenzt, " "tritt bitte zuerst unserem Chat bei." "\n\n👉🏻 {}\n\nHochachtungsvoll, die Obrigkeit." ), "bss_triggered": "{}, du musst warten bis du weitere Sticker senden kannst.", "error": "Dein Befehl war falsch.", "no_id": "Ihre Eingabe war keine TG ID.", "no_int": "Ihre Eingabe war keine Integer.", "not_dc": "Dies ist kein Gruppenchat.", "permerror": "Sie haben in diesem Chat keine Löschberechtigung.", "prot_db_string": ( "[{} - Settings] Aktuelle" " Datenbank:\n\nWatcher:\n{}\n\nChateinstellungen:\n{}" ), "prot_settings": ( "[{} - Settings] Aktuelle Einstellungen in" " diesem Chat:\n{}" ), "prot_start": "[{}] In diesem Chat aktiviert.", "prot_stopped": "[{}] Der Chat wurde aus der Liste entfernt.", "prot_turned_off": "[{}] In allen Chats ausgeschaltet.", "refresh_chat": "[AdminTools] Der Chat Cache wurde aktualisiert.", } strings_ru = { "_cls_doc": "Пакет инструментов для администраторов каналов и групп.", "_cmd_doc_bcu": ( " \n  - Переключает" " BlockChannelUser для текущего чата.\n.bcu notify \n" "  - Переключает уведомление.\n.bcu ban" " \n  - Банит канал.\n.bcu deltimer" " <секунды/или 0>\n  - Удаляет уведомление в" " считанные секунды. 0, чтобы отключить.\n.bcu settings\n" "  - Показывает текущую конфигурацию чата.\n.bcu" " db\n  - Показывает текущую базу данных.\n.bcu" " clearall\n  - Очищает базу данных от" " BlockChannelUser.\n" ), "_cmd_doc_bnd": ( " \n  - Переключает" " BlockNonDiscussion для текущего чата.\n.bnd notify \n" "  - Переключает уведомление.\n.bnd mute" " <минут/или 0>\n  - Заглушает пользователя на Х" " минут. 0 чтобы отключить.\n.bnd deltimer <секунды/или 0>\n" "  - Удаляет уведомление в считанные секунды. 0" " чтобы отключить.\n.bnd settings\n  -" " Показывает текущую конфигурацию чата.\n.bnd db\n" "  - Показывает текущую базу данных.\n.bnd" " clearall\n  - Очищает базу данных от" " BlockNonDiscussion.\n" ), "_cmd_doc_gl": ( " \n  -" " Регистрирует чат логирования для выбранного канала.\n.gl rem \n" "  - Удаляет данный чат из наблюдателя.\n.gl" " db\n  - Показывает текущую базу данных.\n.gl" " settings\n  - Показывает текущую конфигурацию" " чата.\n.gl clearall\n  - Очищает базу данных" " от Group/Channel Logger.\n" ), "_cmd_doc_resfresh_chat": "Обновляет кэш чата в текущем чате.", "admin_tag": "Пользователь {} просит помощи.\n{}", "admin_tag_reply": "\n\nПересылаемое сообщение от\n{}:", "admin_tag_reply_msg": "Спасибо, владелец этого бота был проинформирован.", "bcu_triggered": "{}, ты не можешь писать тут от имени канала.", "bce_triggered": "{}, ты не можешь использовать кастомные эмодзи в этом чате.", "bnc_triggered": "{}, ты можешь писать только комментарии в этом чате.", "bnd_triggered": ( "{}, комментарии ограничены для участников группы обсуждения, " "Пожалуйста, для начала присоединитесь к нашей группе обсуждения." "\n\n👉🏻 {}\n\nС уважением, администраторы." ), "error": "Неверная команда", "no_id": "Ты ввёл не телеграм айди.", "no_int": "Введенное значение не является целым числом (int)", "not_dc": "Это не групповой чат", "permerror": "У вас недосточно прав для удаление сообщений в этом чате", "prot_db_string": ( "[{}] Текущая база" " данных:\n\nНаблюдающий:\n{}\n\nНастройки" " чата:\n{}" ), "prot_settings": "[{}] Текущие настройки в этом чате:\n{}", "prot_start": "[{}] Активировано в этом чате", "prot_stopped": "[{}] Деактивировано в этом чате", "prot_turned_off": "[{}] Теперь этот модуль выключен во всех чатах", "refresh_chat": "[AdminTools] Кэш чата обновлен.", } all_strings = { "strings": strings, "strings_en": strings, "strings_de": strings_de, "strings_ru": strings_ru, } changes = { "migration1": { "name": { "old": "Apo AdminTools", "new": "Apo-AdminTools", }, }, } def __init__(self): self._ratelimit = [] self.config = loader.ModuleConfig( loader.ConfigValue( "admin_tag", ["@admin"], doc=lambda: self.strings("_cfg_doc_admin_cst_tag"), validator=loader.validators.Series( loader.validators.String(), ), ), loader.ConfigValue( "admin_tag_chats", doc=lambda: self.strings("_cfg_doc_admin_tag_chats"), validator=loader.validators.Series( loader.validators.TelegramID(), ), ), loader.ConfigValue( "ignore_admins", True, doc=lambda: self.strings("_cfg_doc_ignore_admins"), validator=loader.validators.Boolean(), ), loader.ConfigValue( "tag_whitelist", False, doc=lambda: self.strings("_cfg_doc_whitelist"), validator=loader.validators.Boolean(), ), loader.ConfigValue( "auto_migrate", True, doc=lambda: self.strings("_cfg_cst_auto_migrate"), validator=loader.validators.Boolean(), ), # for MigratorClass ) async def client_ready(self): self._classname = self.__class__.__name__ self.apo_lib = await self.import_lib( "https://raw.githubusercontent.com/anon97945/hikka-libs/master/apodiktum_library.py", suspend_on_error=True, ) await self.apo_lib.migrator.auto_migrate_handler( self.__class__.__name__, self.strings("name"), self.changes, self.config["auto_migrate"], ) self.apo_lib.watcher_q.register(self.__class__.__name__, "q_watcher_logger") self.apo_lib.watcher_q.register(self.__class__.__name__, "q_watcher_protection") self._db_migrator() self._ratelimit_p_count = {"bdl": {}, "bf": {}, "bgs": {}, "bss": {}} self._ratelimit_notify = { "bce": {}, "bcu": {}, "bdl": {}, "bf": {}, "bgs": {}, "bnc": {}, "bnd": {}, "bss": {}, } self._msg_handler = {} async def on_unload(self): self.apo_lib.watcher_q.unregister(self.__class__.__name__, "q_watcher_logger") self.apo_lib.watcher_q.unregister( self.__class__.__name__, "q_watcher_protection" ) async def cadmintoolscmd(self, message: Message): """ Open the config for the module. """ name = self.strings("name") await self.allmodules.commands["config"]( await utils.answer(message, f"{self.get_prefix()}config {name}") ) async def refresh_chatcmd(self, message: Message): """ Refresh the chat cache in the current chat. """ chat_id = utils.get_chat_id(message) await self._client.get_fullchannel(chat_id, force=True) return await utils.answer( message, self.apo_lib.utils.get_str("refresh_chat", self.all_strings, message), ) async def bndcmd(self, message: Message): """   - Toggles BlockNonDiscussion for the current chat. .bnd notify  - Toggles the notification message. .bnd mute  - Mutes the user for x minutes. 0 to disable. .bnd deltimer  - Deletes the notification message in seconds. 0 to disable. .bnd settings  - Shows the current configuration of the chat. .bnd db  - Shows the current database. .bnd clearall  - Clears the db of BlockNonDiscussion. """ bnd = self._db.get(self._classname, "bnd", []) sets = self._db.get(self._classname, "bnd_sets", {}) args = utils.get_args_raw(message).lower() args = str(args).split() chat = await message.get_chat() chat_id_str = str(chat.id) if args and args[0] == "clearall": self._db.set(self._classname, "bnd", []) self._db.set(self._classname, "bnd_sets", {}) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_turned_off", self.all_strings, message ).format(self.apo_lib.utils.get_str("bnd", self.all_strings, message)), ) if args and args[0] == "db": return await utils.answer( message, self.apo_lib.utils.get_str( "prot_db_string", self.all_strings, message ).format( self.apo_lib.utils.get_str("bnd", self.all_strings, message), bnd, sets, ), ) if message.is_private: await utils.answer( message, self.apo_lib.utils.get_str("not_dc", self.all_strings, message), ) return if ( (chat.admin_rights or chat.creator) and not chat.admin_rights.delete_messages or not chat.admin_rights and not chat.creator ) and (args or chat_id_str not in bnd): return await utils.answer( message, self.apo_lib.utils.get_str("permerror", self.all_strings, message), ) if not args: if chat_id_str not in bnd: bnd.append(chat_id_str) sets.setdefault(chat_id_str, {}) sets[chat_id_str].setdefault("notify", True) sets[chat_id_str].setdefault("mute", 1) sets[chat_id_str].setdefault("deltimer", 60) self._db.set(self._classname, "bnd", bnd) self._db.set(self._classname, "bnd_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_start", self.all_strings, message ).format( self.apo_lib.utils.get_str("bnd", self.all_strings, message) ), ) bnd.remove(chat_id_str) self._db.set(self._classname, "bnd", bnd) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_stopped", self.all_strings, message ).format(self.apo_lib.utils.get_str("bnd", self.all_strings, message)), ) if chat_id_str in bnd: if args[0] == "notify" and args[1] is not None: if not isinstance(self.apo_lib.utils.validate_boolean(args[1]), bool): return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) sets[chat_id_str].update( {"notify": self.apo_lib.utils.validate_boolean(args[1])} ) elif args[0] == "mute" and args[1] is not None and chat_id_str in bnd: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"mute": int(args[1])}) elif args[0] == "deltimer" and args[1] is not None and chat_id_str in bnd: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"deltimer": int(args[1])}) elif args[0] != "settings" and chat_id_str in bnd: return self._db.set(self._classname, "bnd", bnd) self._db.set(self._classname, "bnd_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_settings", self.all_strings, message ).format( self.apo_lib.utils.get_str("bnd", self.all_strings, message), sets[chat_id_str], ), ) async def bfcmd(self, message: Message): """   - Toggles BlockFlood for the current chat. .bf notify  - Toggles the notification message. .bf mute  - Mutes the user for x minutes. 0 to disable. .bf deltimer  - Deletes the notification message in seconds. 0 to disable. .bf settings  - Shows the current configuration of the chat. .bf db  - Shows the current database. .bf clearall  - Clears the db of BlockNonDiscussion. """ bf = self._db.get(self._classname, "bf", []) sets = self._db.get(self._classname, "bf_sets", {}) args = utils.get_args_raw(message).lower() args = str(args).split() chat = await message.get_chat() chat_id_str = str(chat.id) if args and args[0] == "clearall": self._db.set(self._classname, "bf", []) self._db.set(self._classname, "bf_sets", {}) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_turned_off", self.all_strings, message ).format(self.apo_lib.utils.get_str("bf", self.all_strings, message)), ) if args and args[0] == "db": return await utils.answer( message, self.apo_lib.utils.get_str( "prot_db_string", self.all_strings, message ).format( self.apo_lib.utils.get_str("bf", self.all_strings, message), bf, sets, ), ) if message.is_private: await utils.answer( message, self.apo_lib.utils.get_str("not_dc", self.all_strings, message), ) return if ( (chat.admin_rights or chat.creator) and not chat.admin_rights.delete_messages or not chat.admin_rights and not chat.creator ) and (args or chat_id_str not in bf): return await utils.answer( message, self.apo_lib.utils.get_str("permerror", self.all_strings, message), ) if not args: if chat_id_str not in bf: bf.append(chat_id_str) sets.setdefault(chat_id_str, {}) sets[chat_id_str].setdefault("notify", True) sets[chat_id_str].setdefault("mute", 5) sets[chat_id_str].setdefault("deltimer", 60) sets[chat_id_str].setdefault("limit", 8) self._db.set(self._classname, "bf", bf) self._db.set(self._classname, "bf_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_start", self.all_strings, message ).format( self.apo_lib.utils.get_str("bf", self.all_strings, message) ), ) bf.remove(chat_id_str) self._db.set(self._classname, "bf", bf) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_stopped", self.all_strings, message ).format(self.apo_lib.utils.get_str("bf", self.all_strings, message)), ) if chat_id_str in bf: if args[0] == "notify" and args[1] is not None: if not isinstance(self.apo_lib.utils.validate_boolean(args[1]), bool): return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) sets[chat_id_str].update( {"notify": self.apo_lib.utils.validate_boolean(args[1])} ) elif args[0] == "mute" and args[1] is not None and chat_id_str in bf: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"mute": int(args[1])}) elif args[0] == "limit" and args[1] is not None and chat_id_str in bf: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"limit": int(args[1])}) elif args[0] == "deltimer" and args[1] is not None and chat_id_str in bf: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"deltimer": int(args[1])}) elif args[0] != "settings" and chat_id_str in bf: return self._db.set(self._classname, "bf", bf) self._db.set(self._classname, "bf_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_settings", self.all_strings, message ).format( self.apo_lib.utils.get_str("bf", self.all_strings, message), sets[chat_id_str], ), ) async def bcucmd(self, message: Message): """   - Toggles BlockChannelUser for the current chat. .bcu notify  - Toggles the notification message. .bcu ban  - Bans the channel. .bcu deltimer  - Deletes the notification message in seconds. 0 to disable. .bcu settings  - Shows the current configuration of the chat. .bcu db  - Shows the current database. .bcu clearall  - Clears the db of BlockChannelUser. """ bcu = self._db.get(self._classname, "bcu", []) sets = self._db.get(self._classname, "bcu_sets", {}) args = utils.get_args_raw(message).lower().split() chat = await message.get_chat() chat_id_str = str(chat.id) if args and args[0] == "clearall": self._db.set(self._classname, "bcu", []) self._db.set(self._classname, "bcu_sets", {}) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_turned_off", self.all_strings, message ).format(self.apo_lib.utils.get_str("bcu", self.all_strings, message)), ) if args and args[0] == "db": return await utils.answer( message, self.apo_lib.utils.get_str( "prot_db_string", self.all_strings, message ).format( self.apo_lib.utils.get_str("bcu", self.all_strings, message), bcu, sets, ), ) if message.is_private: await utils.answer( message, self.apo_lib.utils.get_str("not_dc", self.all_strings, message), ) return if ( (chat.admin_rights or chat.creator) and not chat.admin_rights.delete_messages or not chat.admin_rights and not chat.creator ) and (args or chat_id_str not in bcu): return await utils.answer( message, self.apo_lib.utils.get_str("permerror", self.all_strings, message), ) if not args: if chat_id_str not in bcu: bcu.append(chat_id_str) sets.setdefault(chat_id_str, {}) sets[chat_id_str].setdefault("notify", True) sets[chat_id_str].setdefault("ban", True) sets[chat_id_str].setdefault("deltimer", 60) self._db.set(self._classname, "bcu", bcu) self._db.set(self._classname, "bcu_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_start", self.all_strings, message ).format( self.apo_lib.utils.get_str("bcu", self.all_strings, message) ), ) bcu.remove(chat_id_str) self._db.set(self._classname, "bcu", bcu) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_stopped", self.all_strings, message ).format(self.apo_lib.utils.get_str("bcu", self.all_strings, message)), ) if chat_id_str in bcu: if args[0] == "notify" and args[1] is not None: if not isinstance(self.apo_lib.utils.validate_boolean(args[1]), bool): return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) sets[chat_id_str].update( {"notify": self.apo_lib.utils.validate_boolean(args[1])} ) elif args[0] == "ban" and args[1] is not None and chat_id_str in bcu: if not isinstance(self.apo_lib.utils.validate_boolean(args[1]), bool): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update( {"ban": self.apo_lib.utils.validate_boolean(args[1])} ) elif args[0] == "deltimer" and args[1] is not None and chat_id_str in bcu: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"deltimer": int(args[1])}) elif args[0] != "settings" and chat_id_str in bcu: return self._db.set(self._classname, "bcu", bcu) self._db.set(self._classname, "bcu_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_settings", self.all_strings, message ).format( self.apo_lib.utils.get_str("bcu", self.all_strings, message), sets[chat_id_str], ), ) async def bnccmd(self, message: Message): """   - Toggles BlockNonComment for the current chat. .bnc notify  - Toggles the notification message. .bnc mute  - Mutes the user for x minutes. 0 to disable. .bnc deltimer  - Deletes the notification message in seconds. 0 to disable. .bnc settings  - Shows the current configuration of the chat. .bnc db  - Shows the current database. .bnc clearall  - Clears the db of BlockNonComment. """ bnc = self._db.get(self._classname, "bnc", []) sets = self._db.get(self._classname, "bnc_sets", {}) args = utils.get_args_raw(message).lower() args = str(args).split() chat = await message.get_chat() chat_id_str = str(chat.id) if args and args[0] == "clearall": self._db.set(self._classname, "bnc", []) self._db.set(self._classname, "bnc_sets", {}) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_turned_off", self.all_strings, message ).format(self.apo_lib.utils.get_str("bnc", self.all_strings, message)), ) if args and args[0] == "db": return await utils.answer( message, self.apo_lib.utils.get_str( "prot_db_string", self.all_strings, message ).format( self.apo_lib.utils.get_str("bnc", self.all_strings, message), bnc, sets, ), ) if message.is_private: await utils.answer( message, self.apo_lib.utils.get_str("not_dc", self.all_strings, message), ) return if ( (chat.admin_rights or chat.creator) and not chat.admin_rights.delete_messages or not chat.admin_rights and not chat.creator ) and (args or chat_id_str not in bnc): return await utils.answer( message, self.apo_lib.utils.get_str("permerror", self.all_strings, message), ) if not args: if chat_id_str not in bnc: bnc.append(chat_id_str) sets.setdefault(chat_id_str, {}) sets[chat_id_str].setdefault("notify", True) sets[chat_id_str].setdefault("mute", 1) sets[chat_id_str].setdefault("deltimer", 60) self._db.set(self._classname, "bnc", bnc) self._db.set(self._classname, "bnc_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_start", self.all_strings, message ).format( self.apo_lib.utils.get_str("bnc", self.all_strings, message) ), ) bnc.remove(chat_id_str) self._db.set(self._classname, "bnc", bnc) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_stopped", self.all_strings, message ).format(self.apo_lib.utils.get_str("bnc", self.all_strings, message)), ) if chat_id_str in bnc: if args[0] == "notify" and args[1] is not None: if not isinstance(self.apo_lib.utils.validate_boolean(args[1]), bool): return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) sets[chat_id_str].update( {"notify": self.apo_lib.utils.validate_boolean(args[1])} ) elif args[0] == "mute" and args[1] is not None and chat_id_str in bnc: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"mute": int(args[1])}) elif args[0] == "deltimer" and args[1] is not None and chat_id_str in bnc: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"deltimer": int(args[1])}) elif args[0] != "settings" and chat_id_str in bnc: return self._db.set(self._classname, "bnc", bnc) self._db.set(self._classname, "bnc_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_settings", self.all_strings, message ).format( self.apo_lib.utils.get_str("bnc", self.all_strings, message), sets[chat_id_str], ), ) async def bdlcmd(self, message: Message): """   - Toggles BlockDoubleLinks for the current chat. .bdl timeout - Sets the timeout for the double links. .bdl notify  - Toggles the notification message. .bdl deltimer  - Deletes the notification message in seconds. 0 to disable. .bdl settings  - Shows the current configuration of the chat. .bdl db  - Shows the current database. .bdl clearall  - Clears the db of BlockChannelUser. """ bdl = self._db.get(self._classname, "bdl", []) sets = self._db.get(self._classname, "bdl_sets", {}) args = utils.get_args_raw(message).lower().split() chat = await message.get_chat() chat_id_str = str(chat.id) if args and args[0] == "clearall": self._db.set(self._classname, "bdl", []) self._db.set(self._classname, "bdl_sets", {}) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_turned_off", self.all_strings, message ).format(self.apo_lib.utils.get_str("bdl", self.all_strings, message)), ) if args and args[0] == "db": return await utils.answer( message, self.apo_lib.utils.get_str( "prot_db_string", self.all_strings, message ).format( self.apo_lib.utils.get_str("bdl", self.all_strings, message), bdl, sets, ), ) if message.is_private: await utils.answer( message, self.apo_lib.utils.get_str("not_dc", self.all_strings, message), ) return if ( (chat.admin_rights or chat.creator) and not chat.admin_rights.delete_messages or not chat.admin_rights and not chat.creator ) and (args or chat_id_str not in bdl): return await utils.answer( message, self.apo_lib.utils.get_str("permerror", self.all_strings, message), ) if not args: if chat_id_str not in bdl: bdl.append(chat_id_str) sets.setdefault(chat_id_str, {}) sets[chat_id_str].setdefault("notify", True) sets[chat_id_str].setdefault("timeout", 3600) sets[chat_id_str].setdefault("deltimer", 60) self._db.set(self._classname, "bdl", bdl) self._db.set(self._classname, "bdl_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_start", self.all_strings, message ).format( self.apo_lib.utils.get_str("bdl", self.all_strings, message) ), ) bdl.remove(chat_id_str) self._db.set(self._classname, "bdl", bdl) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_stopped", self.all_strings, message ).format(self.apo_lib.utils.get_str("bdl", self.all_strings, message)), ) if chat_id_str in bdl: if args[0] == "notify" and args[1] is not None: if not isinstance(self.apo_lib.utils.validate_boolean(args[1]), bool): return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) sets[chat_id_str].update( {"notify": self.apo_lib.utils.validate_boolean(args[1])} ) elif args[0] == "timeout" and args[1] is not None and chat_id_str in bdl: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"timeout": int(args[1])}) elif args[0] == "deltimer" and args[1] is not None and chat_id_str in bdl: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"deltimer": int(args[1])}) elif args[0] != "settings" and chat_id_str in bdl: return self._db.set(self._classname, "bdl", bdl) self._db.set(self._classname, "bdl_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_settings", self.all_strings, message ).format( self.apo_lib.utils.get_str("bdl", self.all_strings, message), sets[chat_id_str], ), ) async def bsscmd(self, message: Message): """   - Toggles BlockStickerSpam for the current chat. .bss timeout - Sets the timeout for the sticker spam. .bss notify  - Toggles the notification message. .bss deltimer  - Deletes the notification message in seconds. 0 to disable. .bss settings  - Shows the current configuration of the chat. .bss db  - Shows the current database. .bss clearall  - Clears the db of BlockChannelUser. """ bss = self._db.get(self._classname, "bss", []) sets = self._db.get(self._classname, "bss_sets", {}) args = utils.get_args_raw(message).lower().split() chat = await message.get_chat() chat_id_str = str(chat.id) if args and args[0] == "clearall": self._db.set(self._classname, "bss", []) self._db.set(self._classname, "bss_sets", {}) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_turned_off", self.all_strings, message ).format(self.apo_lib.utils.get_str("bss", self.all_strings, message)), ) if args and args[0] == "db": return await utils.answer( message, self.apo_lib.utils.get_str( "prot_db_string", self.all_strings, message ).format( self.apo_lib.utils.get_str("bss", self.all_strings, message), bss, sets, ), ) if message.is_private: await utils.answer( message, self.apo_lib.utils.get_str("not_dc", self.all_strings, message), ) return if ( (chat.admin_rights or chat.creator) and not chat.admin_rights.delete_messages or not chat.admin_rights and not chat.creator ) and (args or chat_id_str not in bss): return await utils.answer( message, self.apo_lib.utils.get_str("permerror", self.all_strings, message), ) if not args: if chat_id_str not in bss: bss.append(chat_id_str) sets.setdefault(chat_id_str, {}) sets[chat_id_str].setdefault("notify", True) sets[chat_id_str].setdefault("timeout", 300) sets[chat_id_str].setdefault("deltimer", 60) self._db.set(self._classname, "bss", bss) self._db.set(self._classname, "bss_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_start", self.all_strings, message ).format( self.apo_lib.utils.get_str("bss", self.all_strings, message) ), ) bss.remove(chat_id_str) self._db.set(self._classname, "bss", bss) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_stopped", self.all_strings, message ).format(self.apo_lib.utils.get_str("bss", self.all_strings, message)), ) if chat_id_str in bss: if args[0] == "notify" and args[1] is not None: if not isinstance(self.apo_lib.utils.validate_boolean(args[1]), bool): return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) sets[chat_id_str].update( {"notify": self.apo_lib.utils.validate_boolean(args[1])} ) elif args[0] == "timeout" and args[1] is not None and chat_id_str in bss: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"timeout": int(args[1])}) elif args[0] == "deltimer" and args[1] is not None and chat_id_str in bss: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"deltimer": int(args[1])}) elif args[0] != "settings" and chat_id_str in bss: return self._db.set(self._classname, "bss", bss) self._db.set(self._classname, "bss_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_settings", self.all_strings, message ).format( self.apo_lib.utils.get_str("bss", self.all_strings, message), sets[chat_id_str], ), ) async def bcecmd(self, message: Message): """   - Toggles BlockCustomEmojis for the current chat. .bce timeout - Sets the timeout for the sticker spam. .bce notify  - Toggles the notification message. .bce deltimer  - Deletes the notification message in seconds. 0 to disable. .bce settings  - Shows the current configuration of the chat. .bce db  - Shows the current database. .bce clearall  - Clears the db of BlockChannelUser. """ bce = self._db.get(self._classname, "bce", []) sets = self._db.get(self._classname, "bce_sets", {}) args = utils.get_args_raw(message).lower().split() chat = await message.get_chat() chat_id_str = str(chat.id) if args and args[0] == "clearall": self._db.set(self._classname, "bce", []) self._db.set(self._classname, "bces_sets", {}) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_turned_off", self.all_strings, message ).format(self.apo_lib.utils.get_str("bce", self.all_strings, message)), ) if args and args[0] == "db": return await utils.answer( message, self.apo_lib.utils.get_str( "prot_db_string", self.all_strings, message ).format( self.apo_lib.utils.get_str("bce", self.all_strings, message), bce, sets, ), ) if message.is_private: await utils.answer( message, self.apo_lib.utils.get_str("not_dc", self.all_strings, message), ) return if ( (chat.admin_rights or chat.creator) and not chat.admin_rights.delete_messages or not chat.admin_rights and not chat.creator ) and (args or chat_id_str not in bce): return await utils.answer( message, self.apo_lib.utils.get_str("permerror", self.all_strings, message), ) if not args: if chat_id_str not in bce: bce.append(chat_id_str) sets.setdefault(chat_id_str, {}) sets[chat_id_str].setdefault("notify", True) sets[chat_id_str].setdefault("mute", 1) sets[chat_id_str].setdefault("deltimer", 60) self._db.set(self._classname, "bce", bce) self._db.set(self._classname, "bce_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_start", self.all_strings, message ).format( self.apo_lib.utils.get_str("bce", self.all_strings, message) ), ) bce.remove(chat_id_str) self._db.set(self._classname, "bce", bce) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_stopped", self.all_strings, message ).format(self.apo_lib.utils.get_str("bce", self.all_strings, message)), ) if chat_id_str in bce: if args[0] == "notify" and args[1] is not None: if not isinstance(self.apo_lib.utils.validate_boolean(args[1]), bool): return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) sets[chat_id_str].update( {"notify": self.apo_lib.utils.validate_boolean(args[1])} ) elif args[0] == "mute" and args[1] is not None and chat_id_str in bce: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"mute": int(args[1])}) elif args[0] == "deltimer" and args[1] is not None and chat_id_str in bce: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"deltimer": int(args[1])}) elif args[0] != "settings" and chat_id_str in bce: return self._db.set(self._classname, "bce", bce) self._db.set(self._classname, "bce_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_settings", self.all_strings, message ).format( self.apo_lib.utils.get_str("bce", self.all_strings, message), sets[chat_id_str], ), ) async def bgscmd(self, message: Message): """   - Toggles BlockStickerSpam for the current chat. .bgs timeout - Sets the timeout for the sticker spam. .bgs notify  - Toggles the notification message. .bgs deltimer  - Deletes the notification message in seconds. 0 to disable. .bgs settings  - Shows the current configuration of the chat. .bgs db  - Shows the current database. .bgs clearall  - Clears the db of BlockChannelUser. """ bgs = self._db.get(self._classname, "bgs", []) sets = self._db.get(self._classname, "bgs_sets", {}) args = utils.get_args_raw(message).lower().split() chat = await message.get_chat() chat_id_str = str(chat.id) if args and args[0] == "clearall": self._db.set(self._classname, "bgs", []) self._db.set(self._classname, "bgs_sets", {}) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_turned_off", self.all_strings, message ).format(self.apo_lib.utils.get_str("bgs", self.all_strings, message)), ) if args and args[0] == "db": return await utils.answer( message, self.apo_lib.utils.get_str( "prot_db_string", self.all_strings, message ).format( self.apo_lib.utils.get_str("bgs", self.all_strings, message), bgs, sets, ), ) if message.is_private: await utils.answer( message, self.apo_lib.utils.get_str("not_dc", self.all_strings, message), ) return if ( (chat.admin_rights or chat.creator) and not chat.admin_rights.delete_messages or not chat.admin_rights and not chat.creator ) and (args or chat_id_str not in bgs): return await utils.answer( message, self.apo_lib.utils.get_str("permerror", self.all_strings, message), ) if not args: if chat_id_str not in bgs: bgs.append(chat_id_str) sets.setdefault(chat_id_str, {}) sets[chat_id_str].setdefault("notify", True) sets[chat_id_str].setdefault("timeout", 300) sets[chat_id_str].setdefault("deltimer", 60) self._db.set(self._classname, "bgs", bgs) self._db.set(self._classname, "bgs_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_start", self.all_strings, message ).format( self.apo_lib.utils.get_str("bgs", self.all_strings, message) ), ) bgs.remove(chat_id_str) self._db.set(self._classname, "bgs", bgs) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_stopped", self.all_strings, message ).format(self.apo_lib.utils.get_str("bgs", self.all_strings, message)), ) if chat_id_str in bgs: if args[0] == "notify" and args[1] is not None: if not isinstance(self.apo_lib.utils.validate_boolean(args[1]), bool): return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) sets[chat_id_str].update( {"notify": self.apo_lib.utils.validate_boolean(args[1])} ) elif args[0] == "timeout" and args[1] is not None and chat_id_str in bgs: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"timeout": int(args[1])}) elif args[0] == "deltimer" and args[1] is not None and chat_id_str in bgs: if not self.apo_lib.utils.validate_integer(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_int", self.all_strings, message), ) sets[chat_id_str].update({"deltimer": int(args[1])}) elif args[0] != "settings" and chat_id_str in bgs: return self._db.set(self._classname, "bgs", bgs) self._db.set(self._classname, "bgs_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_settings", self.all_strings, message ).format( self.apo_lib.utils.get_str("bgs", self.all_strings, message), sets[chat_id_str], ), ) async def glcmd(self, message: Message): """  - Logs given groupchat in given channel. .gl rem  - Removes given chat from watcher. .gl db  - Shows the current database. .gl settings  - Shows the current configuration of the chat. .gl clearall  - Clears the db of Group/Channel Logger. """ gl = self._db.get(self._classname, "gl", []) sets = self._db.get(self._classname, "gl_sets", {}) args = utils.get_args_raw(message).lower().split() chat_id = utils.get_chat_id(message) if not args: return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) if args[0] == "clearall": self._db.set(self._classname, "gl", []) self._db.set(self._classname, "gl_sets", {}) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_turned_off", self.all_strings, message ).format(self.apo_lib.utils.get_str("gl", self.all_strings, message)), ) if args[0] == "db": return await utils.answer( message, self.apo_lib.utils.get_str( "prot_db_string", self.all_strings, message ).format( self.apo_lib.utils.get_str("gl", self.all_strings, message), gl, sets, ), ) if args[0] is not None and self.apo_lib.utils.validate_tgid(args[0]): chat_id = args[0] elif args[0] == "rem": chat_id = args[1] elif args[0] == "db": return await utils.answer( message, self.apo_lib.utils.get_str( "prot_db_string", self.all_strings, message ).format( self.apo_lib.utils.get_str("gl", self.all_strings, message), sets ), ) elif args[0] not in ["clearall", "settings"]: return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) elif not args: return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) if ( args[0] == "rem" and self.apo_lib.utils.validate_tgid(args[1]) and chat_id in gl ): gl.remove(chat_id) self._db.set(self._classname, "gl", gl) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_stopped", self.all_strings, message ).format(self.apo_lib.utils.get_str("gl", self.all_strings, message)), ) if args[0] == "rem" and ( self.apo_lib.utils.validate_tgid(args[1]) or chat_id not in gl ): return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) if not self.apo_lib.utils.validate_tgid(chat_id): return await utils.answer( message, self.apo_lib.utils.get_str("error", self.all_strings, message), ) if chat_id not in gl: if not self.apo_lib.utils.validate_tgid( args[0] ) or not self.apo_lib.utils.validate_tgid(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_id", self.all_strings, message), ) gl.append(chat_id) sets.setdefault(chat_id, {}) sets[chat_id].setdefault("logchannel", args[1]) self._db.set(self._classname, "gl", gl) self._db.set(self._classname, "gl_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_start", self.all_strings, message ).format(self.apo_lib.utils.get_str("gl", self.all_strings, message)), ) if len(args) == 2: if not self.apo_lib.utils.validate_tgid( args[0] ) or not self.apo_lib.utils.validate_tgid(args[1]): return await utils.answer( message, self.apo_lib.utils.get_str("no_id", self.all_strings, message), ) sets[chat_id].update({"logchannel": args[1]}) elif args[0] != "settings" and chat_id in gl: return self._db.set(self._classname, "gl", gl) self._db.set(self._classname, "gl_sets", sets) return await utils.answer( message, self.apo_lib.utils.get_str( "prot_settings", self.all_strings, message ).format( self.apo_lib.utils.get_str("gl", self.all_strings, message), sets[chat_id], ), ) async def p__bnd_handler( self, chat: Chat, user: User, message: Message, bnd: list, bnd_sets: dict, ): # sourcery skip: low-code-quality """ Block users which are not members of the group. :param chat: Chat object. :param user: User object. :param message: Message object. :param bnd: List of watched groups. :param bnd_sets: Dictionary of group IDs and their settings. """ if str(chat.id) not in bnd or message.id in self._msg_handler: return self._msg_handler = {message.id: "p__bnd"} asyncio.ensure_future(self.punish_handler(chat, user, message, "bnd", bnd_sets)) return async def p__bcu_handler( self, chat: Chat, user: User, message: Message, bcu: list, bcu_sets: dict, ): """ Block Channel Users. :param chat: Chat object. :param user: User object. :param message: Message object. :param bcu: List of watched groups. :param bcu_sets: Dictionary of group IDs and their settings. """ if str(chat.id) not in bcu or message.id in self._msg_handler: return self._msg_handler = {message.id: "p__bcu"} asyncio.ensure_future(self.punish_handler(chat, user, message, "bcu", bcu_sets)) return async def p__bnc_handler( self, chat: Chat, user: User, message: Message, bnc: list, bnc_sets: dict, ): # sourcery skip: low-code-quality """ Block Non Comments. :param chat: Chat object. :param user: User object. :param message: Message object. :param bnc: List of watched groups. :param bnc_sets: Dictionary of group IDs and their settings. """ if ( str(chat.id) not in bnc or message.id in self._msg_handler or ( message.is_reply and await self.apo_lib.utils.is_linkedchannel( message.chat_id, (await self.apo_lib.utils.get_first_msg(message)).sender_id, ) ) ): return self._msg_handler = {message.id: "p__bnc"} asyncio.ensure_future(self.punish_handler(chat, user, message, "bnc", bnc_sets)) return async def p__bf_handler( self, chat: Chat, user: User, message: Message, bf: list, bf_sets: dict, ): # sourcery skip: low-code-quality """ Block users who sends x messages in a row. :param chat: Chat object. :param user: User object. :param message: Message object. :param bf: List of watched groups. :param bf_sets: Dictionary of group IDs and their settings. """ if str(chat.id) not in bf: return if ( self._ratelimit_p_count["bf"].get(chat.id) and self._ratelimit_p_count["bf"][chat.id][0] == user.id and self._ratelimit_p_count["bf"][chat.id][1] >= bf_sets[str(chat.id)].get("limit") ): asyncio.ensure_future( self.punish_handler(chat, user, message, "bf", bf_sets) ) else: self._ratelimit_p_count["bf"].update( { chat.id: [ user.id, ( self._ratelimit_p_count["bf"][chat.id][1] + 1 if self._ratelimit_p_count["bf"].get(chat.id) else 1 ), ] } ) async def p__bce_handler( self, chat: Chat, user: User, message: Message, bce: list, bce_sets: dict, ): # sourcery skip: low-code-quality """ Block Custom Emojis in the chat. :param chat: Chat object. :param user: User object. :param message: Message object. :param bce: List of watched groups. :param bce_sets: Dictionary of group IDs and their settings. """ if ( str(chat.id) not in bce or not any( bool(ent) for ent, _ in message.get_entities_text() if isinstance(ent, MessageEntityCustomEmoji) ) or message.id in self._msg_handler ): return self._msg_handler = {message.id: "p__bce"} asyncio.ensure_future(self.punish_handler(chat, user, message, "bce", bce_sets)) return async def p__bdl_handler( self, chat: Chat, user: User, message: Message, bdl: list, bdl_sets: dict, ): # sourcery skip: low-code-quality """ Block double links in a group. :param chat: Chat object. :param message: Message object. :param bdl: List of watched id's. :param bdl_sets: Dictionary of group IDs and their settings. """ if str(chat.id) not in bdl or message.id in self._msg_handler: return url = self.apo_lib.utils.get_all_urls(message.text, rem_duplicates=True) url = url[0].lower() if len(url) > 0 else None if not url: return await self.p_ratelimit_handler(chat, user, message, "bdl", bdl_sets) async def p__bss_handler( self, chat: Chat, user: User, message: Message, bss: list, bss_sets: dict, ): # sourcery skip: low-code-quality """ Block Sticker Spam in a group. :param chat: Chat object. :param user: User object. :param message: Message object. :param bss: List of watched id's. :param bss_sets: Dictionary of group IDs and their settings. """ if ( str(chat.id) not in bss or not message.sticker or message.id in self._msg_handler ): return await self.p_ratelimit_handler(chat, user, message, "bss", bss_sets) async def p__bgs_handler( self, chat: Chat, user: User, message: Message, bgs: list, bgs_sets: dict, ): # sourcery skip: low-code-quality """ Block Gif Spam in a group. :param chat: Chat object. :param user: User object. :param message: Message object. :param bgs: List of watched id's. :param bgs_sets: Dictionary of group IDs and their settings. """ if ( str(chat.id) not in bgs or not message.gif or message.id in self._msg_handler ): return await self.p_ratelimit_handler(chat, user, message, "bgs", bgs_sets) async def p__gl( self, chat: Chat, user: User, message: Message, gl: list, gl_sets: dict, ): # sourcery skip: low-code-quality """ Log messages of a group. :param chat: Chat object. :param user: User object. :param message: Message object. :param gl: List of group IDs to log. :param gl_sets: Dictionary of group IDs and their settings. """ if message.is_private or str(chat.id) not in gl: return logchan_id = int(gl_sets[str(chat.id)].get("logchannel")) chat_tag = await self.apo_lib.utils.get_tag(chat, True) user_tag = await self.apo_lib.utils.get_tag(user, True) link = ( f"Chat: {chat_tag} | #ID_{chat.id}" + f"\nUser: {user_tag} | #ID_{user.id}" ) try: await message.forward_to(logchan_id) await self._client.send_message(logchan_id, link) except Exception as exc: # skipcq: PYL-W0703 if "FORWARDS_RESTRICTED" in str(exc): msgs = await self._client.get_messages(chat.id, ids=message.id) await self._client.send_message(logchan_id, message=msgs) await self._client.send_message(logchan_id, link) async def p__admin_handler( self, chat: Chat, user: User, message: Message, ): # sourcery skip: low-code-quality """ Watch for admintag messages :param chat: Chat object. :param user: User object. :param message: Message object :return: True if message is admintag """ if ( message.id in self._msg_handler or all( cst_tag.lower() not in [x.lower() for x in self.apo_lib.utils.raw_text(message).split()] for cst_tag in self.config["admin_tag"] ) or ( isinstance(user, User) and (perms := await self.apo_lib.utils.is_member(chat.id, user.id)) and perms.is_admin ) ): return self._msg_handler = {message.id: "p__admin"} asyncio.ensure_future(self.p__admin(chat, user, message)) async def p__admin( self, chat: Chat, user: User, message: Message ): # sourcery skip: low-code-quality admin_tag_string = self.apo_lib.utils.get_str( "admin_tag", self.all_strings, message ).format( await self.apo_lib.utils.get_tag(user.id, True), await utils.get_message_link(message), ) if message.is_reply: reply = await message.get_reply_message() reply_user = await reply.get_sender() admin_tag_string += self.apo_lib.utils.get_str( "admin_tag_reply", self.all_strings, message ).format( await self.apo_lib.utils.get_tag(reply_user, True), ) else: reply = None if await self.apo_lib.utils.check_inlinebot(chat.id): msg = await self.inline.bot.send_message( chat.id if str(chat.id).startswith("-100") else int(f"-100{chat.id}"), self.apo_lib.utils.get_str( "admin_tag_reply_msg", self.all_strings, message ), parse_mode="HTML", disable_web_page_preview=True, reply_to_message_id=message.id, allow_sending_without_reply=True, ) else: msg = await utils.answer( message, self.apo_lib.utils.get_str( "admin_tag_reply_msg", self.all_strings, message ), reply_to=message, ) await self.inline.bot.send_message( self.tg_id, admin_tag_string, parse_mode="HTML", disable_web_page_preview=True, ) if reply: try: await self.inline.bot.forward_message( self.tg_id, chat.id if str(chat.id).startswith("-100") else int(f"-100{chat.id}"), message_id=reply.id, ) except Exception as exc: # skipcq: PYL-W0703 if "Message has protected content" in str(exc): msgs = await self._client.get_messages(chat.id, ids=reply.id) await self.inline.bot.send_message( self.tg_id, msgs.message, parse_mode="HTML", disable_web_page_preview=True, ) await asyncio.sleep(30) await self.apo_lib.utils.delete_message(msg) async def q_watcher_logger(self, message: Message): await self._logger_queue_handler(message) async def q_watcher_protection(self, message: Message): await self._protection_queue_handler(message) async def _protection_queue_handler( self, message: Message ): # sourcery skip: low-code-quality if ( not isinstance(message, Message) or message.out or not message.is_channel or not message.is_group ): return chat_id = utils.get_chat_id(message) chat_id_str = str(chat_id) user_id = await self.apo_lib.utils.get_user_id(message) bce = self._db.get(self._classname, "bce", []) bce_sets = self._db.get(self._classname, "bce_sets", {}) bcu = self._db.get(self._classname, "bcu", []) bcu_sets = self._db.get(self._classname, "bcu_sets", {}) bdl = self._db.get(self._classname, "bdl", []) bdl_sets = self._db.get(self._classname, "bdl_sets", {}) bf = self._db.get(self._classname, "bf", []) bf_sets = self._db.get(self._classname, "bf_sets", {}) bgs = self._db.get(self._classname, "bgs", []) bgs_sets = self._db.get(self._classname, "bgs_sets", {}) bnc = self._db.get(self._classname, "bnc", []) bnc_sets = self._db.get(self._classname, "bnc_sets", {}) bnd = self._db.get(self._classname, "bnd", []) bnd_sets = self._db.get(self._classname, "bnd_sets", {}) bss = self._db.get(self._classname, "bss", []) bss_sets = self._db.get(self._classname, "bss_sets", {}) if user_id not in [chat_id, self.inline.bot_id] or ( chat_id_str in bnd or chat_id_str in bce or chat_id_str in bcu or chat_id_str in bdl or chat_id_str in bf or chat_id_str in bnc or chat_id_str in bss ): chat = await message.get_chat() self.apo_lib.utils.log( logging.DEBUG, __name__, "Get_sender.", debug_msg=True, ) user = await message.get_sender() self.apo_lib.utils.log( logging.DEBUG, __name__, "got sender. -> some ifs", debug_msg=True, ) if ( ( (not chat.admin_rights and not chat.creator) or not chat.admin_rights.delete_messages ) or ( isinstance(user, User) and (perms := await self.apo_lib.utils.is_member(chat, user)) and perms.is_admin ) or ( isinstance(user, Channel) and not (perms := None) and await self.apo_lib.utils.is_linkedchannel(chat, user) ) ): return self.apo_lib.utils.log( logging.DEBUG, __name__, "survived ifs.", debug_msg=True, ) await self.p__bf_handler(chat, user, message, bf, bf_sets) if isinstance(user, User) and not perms: await self.p__bnd_handler(chat, user, message, bnd, bnd_sets) if isinstance(user, Channel): await self.p__bcu_handler(chat, user, message, bcu, bcu_sets) await self.p__bnc_handler(chat, user, message, bnc, bnc_sets) await self.p__bdl_handler(chat, user, message, bdl, bdl_sets) await self.p__bss_handler(chat, user, message, bss, bss_sets) await self.p__bgs_handler(chat, user, message, bgs, bgs_sets) await self.p__bce_handler(chat, user, message, bce, bce_sets) if ( self.config["tag_whitelist"] and chat_id in self.config["admin_tag_chats"] ) or ( not self.config["tag_whitelist"] and chat_id not in self.config["admin_tag_chats"] ): chat = await message.get_chat() user = await message.get_sender() await self.p__admin_handler(chat, user, message) with contextlib.suppress(Exception): self._msg_handler.pop(message.id) return async def _logger_queue_handler(self, message: Message): if ( not isinstance(message, Message) or not message.is_channel or not message.is_group ): return chat_id = utils.get_chat_id(message) chat_id_str = str(chat_id) user_id = await self.apo_lib.utils.get_user_id(message) gl = self._db.get(self._classname, "gl", []) gl_sets = self._db.get(self._classname, "gl_sets", {}) bf = self._db.get(self._classname, "bf", []) if chat_id_str in gl: chat = await message.get_chat() user = await message.get_sender() await self.p__gl(chat, user, message, gl, gl_sets) if ( chat_id_str in bf and self._ratelimit_p_count["bf"].get(chat_id) and self._ratelimit_p_count["bf"][chat_id][0] != user_id ): self._ratelimit_p_count["bf"].pop(chat_id) return def _db_migrator(self): if self._db.get(self._classname, "migrate"): return for key1, value1 in list(self._db[self._classname].items()): if key1 in ["bnd", "bcu", "gl", "bdl", "bss", "bgs"]: self._db.set(self._classname, key1, list(map(str, value1))) if key1 in [ "bcu_sets", "gl_sets", "bnd_sets", "bdl_sets", "bss_sets", "bgs_sets", ]: for key2, value2 in list(value1.items()): if isinstance(key2, int): self._db[self._classname][key1].pop(key2) self._db[self._classname][key1].update({str(key2): value2}) for key1, value1 in list(self._db[self._classname].items()): if key1 in ["gl_sets"]: for key2, value2 in list(value1.items()): for key3, value3 in list(value2.items()): if key3 == "logchannel" and isinstance(value3, int): self._db[self._classname][key1][key2][key3] = str(value3) with contextlib.suppress(Exception): self._db[self._classname].pop("migrated") self._db.set(self._classname, "migrate", True) async def punish_handler( self, chat: Chat, user: User, message: Message, module_short: str, module_sets: dict, ): # sourcery skip: low-code-quality self.apo_lib.utils.log( logging.DEBUG, __name__, "Try to delete.", debug_msg=True, ) await self.apo_lib.utils.delete_message(message, True) if ( chat.admin_rights.ban_users and module_sets[str(chat.id)].get("mute") is not None and module_sets[str(chat.id)].get("mute") != 0 ): duration = module_sets[str(chat.id)].get("mute") await self.apo_lib.utils.mute(chat.id, user.id, duration) if module_sets[str(chat.id)].get("ban") is True: await self.apo_lib.utils.ban(chat.id, user.id) if module_sets[str(chat.id)].get("notify") is True and ( not self._ratelimit_notify[module_short].get(user.id) or self._ratelimit_notify[module_short].get(user.id) < time.time() ): for key, value in list(self._ratelimit_notify[module_short].items()): if value < time.time(): self._ratelimit_notify[module_short].pop(key) self._ratelimit_notify[module_short].update( { user.id: time.time() + module_sets[str(chat.id)].get("deltimer") if module_sets[str(chat.id)].get("deltimer") != 0 else time.time() + 15 } ) await asyncio.sleep(5) self.apo_lib.utils.log( logging.DEBUG, __name__, "Debug try usertag link.", debug_msg=True, ) self.apo_lib.utils.log( logging.DEBUG, __name__, f"{user}", debug_msg=True, ) usertag = await self.apo_lib.utils.get_tag(user, True) self.apo_lib.utils.log( logging.DEBUG, __name__, "Debug try link.", debug_msg=True, ) link = ( await self.apo_lib.utils.get_invite_link(chat) if module_short == "bnd" else None ) self.apo_lib.utils.log( logging.DEBUG, __name__, "Done.", debug_msg=True, ) self.apo_lib.utils.log( logging.DEBUG, __name__, f"{user}", debug_msg=True, ) if message.is_reply: reply = await self.apo_lib.utils.get_first_msg(message) else: reply = None if reply and not isinstance(await reply.get_sender(), Channel): reply = None if await self.apo_lib.utils.check_inlinebot(chat.id): msg = await self.inline.bot.send_message( chat.id if str(chat.id).startswith("-100") else int(f"-100{chat.id}"), self.apo_lib.utils.get_str( f"{module_short}_triggered", self.all_strings, message ).format(usertag, link), parse_mode="HTML", disable_web_page_preview=True, reply_to_message_id=getattr(reply, "id", None), allow_sending_without_reply=True, ) else: msg = await utils.answer( message, self.apo_lib.utils.get_str( f"{module_short}_triggered", self.all_strings, message ).format(usertag, link), ) if module_sets[str(chat.id)].get("deltimer") != 0: deltimer = module_sets[str(chat.id)].get("deltimer") await self.apo_lib.utils.delete_message(msg, deltimer=deltimer) async def p_ratelimit_handler(self, chat, user, message, module_short, module_sets): if ( self._ratelimit_p_count[module_short].get(chat.id) and user.id in self._ratelimit_p_count[module_short].get(chat.id) and ( not self._ratelimit_p_count[module_short][chat.id].get(user.id)[1] or self._ratelimit_p_count[module_short][chat.id].get(user.id)[1] >= module_short[str(chat.id)].get("limit") ) and self._ratelimit_p_count[module_short][chat.id].get(user.id)[0] > time.time() ): self._msg_handler = {message.id: f"p__{module_short}"} asyncio.ensure_future( self.punish_handler(chat, user, message, module_short, module_sets) ) if ( self._ratelimit_p_count[module_short].get(chat.id) and user.id in self._ratelimit_p_count[module_short].get(chat.id) and self._ratelimit_p_count[module_short][chat.id][user.id][0] < time.time() ): self._ratelimit_p_count[module_short][chat.id].pop(user.id) else: self._ratelimit_p_count[module_short].update( { chat.id: { user.id: [ time.time() + module_sets[str(chat.id)].get("timeout"), module_sets[str(chat.id)].get("Limit"), ] } } )