__version__ = (3, 0, 4) # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ # █▀█ █ █ █ █▀█ █▀▄ █ # © Copyright 2022 # https://t.me/hikariatama # # 🔒 Licensed under the GNU AGPLv3 # 🌐 https://www.gnu.org/licenses/agpl-3.0.html # meta title: PM->BL # meta pic: https://img.icons8.com/external-dreamcreateicons-flat-dreamcreateicons/512/000000/external-death-halloween-dreamcreateicons-flat-dreamcreateicons.png # meta banner: https://mods.hikariatama.ru/badges/pmbl.jpg # meta developer: @hikarimods # scope: hikka_only # scope: hikka_min 1.5.0 import contextlib import logging import time from typing import Optional from telethon.tl.functions.contacts import BlockRequest from telethon.tl.functions.messages import DeleteHistoryRequest, ReportSpamRequest from telethon.tl.types import Message, PeerUser, User from telethon.utils import get_display_name, get_peer_id from .. import loader, utils logger = logging.getLogger(__name__) def format_(state: Optional[bool]) -> str: if state is None: return "❔" return "🫡" if state else "🙅‍♂️ Not" @loader.tds class PMBLMod(loader.Module): """Bans and reports incoming messages from unknown users""" strings = { "name": "PMBL", "state": ( "🛡 PM->BL is now" " {}\nReport spam? - {}\nDelete dialog? - {}" ), "args": ( "🚫 Usage example:" " .pmblsett 0 0" ), "args_pmban": ( "🚫 Usage example:" " .pmbanlast 5" ), "banned": ( "😃 Hey there" " •ᴗ•\nUnit «SIGMA», the guardian of this account. You are" " not approved! You can contact my owner in chat, if you need" " help.\nI need to ban you in terms of security" ), "removing": ( "🚮 Removing {} last" " dialogs..." ), "removed": ( "🚮 Removed {} last" " dialogs!" ), "user_not_specified": ( "🚫 You haven't specified" " user" ), "approved": ( " {} approved in pm' ), "banned_log": ( '👮 I banned {}.\n\n{} Reported' " spam\n{} Deleted dialog\n\n" " 📝 Message\n{}" ), "hello": ( "🔏 Unit «SIGMA» protects your personal messages from intrusions. It" " will block everyone, who's trying to invade you.\n\nUse" " .pmbl to enable protection, .pmblsett to" " configure it and .pmbanlast if you've already been" " pm-raided." ), } strings_ru = { "state": ( "🛡 Текущее состояние" " PM->BL: {}\nСообщать о спаме? - {}\nУдалять диалог? - {}" ), "args": ( "🚫 Пример:" " .pmblsett 0 0" ), "args_pmban": ( "🚫 Пример:" " .pmbanlast 5" ), "banned": ( "😃 Добрый день" " •ᴗ•\nЮнит «SIGMA», защитник этого аккаунта. Вы не" " потверждены! Вы можете связаться с моим владельцем в чате," " если нужна помощь.\nЯ вынужден заблокировать вас из соображений" " безопасности" ), "hello": ( "🔏 Юнит «SIGMA» защищает твои личные сообщенния от неизвестных" " пользователей. Он будет блокировать всех, кто не соответствует" " настройкам.\n\nВведи .pmbl для активации защиты," " .pmblsett для ее настройки и .pmbanlast если" " нужно очистить уже прошедший рейд на личные сообщения." ), "removing": ( "🚮 Удаляю {} последних" " диалогов..." ), "removed": ( "🚮 Удалил {} последних" " диалогов!" ), "user_not_specified": ( "🚫 Укажи" " пользователя" ), "_cmd_doc_pmbl": "Включить или выключить защиту", "_cmd_doc_pmbanlast": ( "<количество> - Забанить и удалить n последних диалогов с пользователями" ), "_cmd_doc_allowpm": "<пользователь> - Разрешить пользователю писать тебе в ЛС", "_cls_doc": "Блокирует и репортит входящие сообщения от незнакомцев", "approved": ( " {} одобрен в лс' ), "banned_log": ( '👮 Я заблокировал {}.\n\n{}' " Сообщил" " о спаме\n{} Удалил диалог\n\n📝" " Сообщение\n{}" ), } strings_de = { "state": ( "🛡 Aktueller PM->BL" " Status: {}\nSpam melden? - {}\nDialoge löschen? - {}" ), "args": ( "🚫 Beispiel:" " .pmblsett 0 0" ), "args_pmban": ( "🚫 Beispiel:" " .pmbanlast 5" ), "banned": ( "😃 Hallo" " •ᴗ•\nEinheit «SIGMA», der Schutz dieses Accounts. Sie" " sind nicht autorisiert! Sie können sich an den Besitzer meines" " Accounts wenden, wenn Sie Hilfe benötigen.\nIch bin gezwungen, Sie aus" " Sicherheitsgründen zu sperren" ), "hello": ( "🔏 Einheit «SIGMA» schützt Ihre persönlichen Nachrichten vor" " unbekannten Benutzern. Es wird alle blockieren, die nicht den" " Einstellungen entsprechen.\n\nGeben Sie .pmbl ein, um die" " Schutzfunktion zu aktivieren, .pmblsett zum Konfigurieren" " und .pmbanlast, wenn Sie bereits einen Raid auf Ihre" " persönlichen Nachrichten durchgeführt haben." ), "removing": ( "🚮 Entferne {} letzte" " Dialoge..." ), "removed": ( "🚮 Entfernt {} letzte" " Dialoge!" ), "user_not_specified": ( "🚫 Du hast keinen" " Benutzer angegeben" ), "_cmd_doc_pmbl": "Aktiviert oder deaktiviert den Schutz", "_cmd_doc_pmbanlast": ( " - Bannt und löscht n letzte Dialoge mit Benutzern" ), "_cmd_doc_allowpm": ( " - Erlaubt dem Benutzer, dir eine private Nachricht zu senden" ), "_cls_doc": "Blockiert und meldet eingehende Nachrichten von Unbekannten", "approved": ( " {} wurde in den Ls genehmigt' ), "banned_log": ( '👮 Ich habe {} geblockt.\n\n{} Hat' " über Spam berichtet\n{} Hat den Dialog gelöscht\n\n📝" " Nachricht\n{}" ), } strings_tr = { "state": ( "🛡 Şu anki PM->BL durumu:" " {}\nSpam rapor edilsin mi? - {}\nSohbetler silinsin mi? - {}" ), "args": ( "🚫 Örnek:" " .pmblsett 0 0" ), "args_pmban": ( "🚫 Örnek:" " .pmbanlast 5" ), "banned": ( "😃 Merhaba" " •ᴗ•\n«SIGMA» birimi, hesabınızın koruması. Yetkili" " değilsiniz! Yardım için hesabımın sahibi ile iletişime" " geçebilirsiniz.\nGüvenlik nedeniyle sizi zorunlu olarak" " engelliyorum" ), "hello": ( "🔏 «SIGMA» birimi, tanımadığınız kullanıcılarla kişisel" " mesajlarınızı korur. Ayarlara uygun olmayanları tümünü engeller.\n\n" ".pmbl yazarak koruma özelliğini etkinleştirebilir, " ".pmblsett yazarak yapılandırabilir ve zaten kişisel" " mesajlarınıza bir raid gerçekleştirdiyseniz .pmbanlast" " yazarak bunu gerçekleştirebilirsiniz." ), "removing": ( "🚮 Son {} sohbet" " siliniyor..." ), "removed": ( "🚮 Son {} sohbet" " silindi!" ), "user_not_specified": ( "🚫 Bir kullanıcı" " belirtmediniz" ), "_cmd_doc_pmbl": "Korumayı etkinleştirir veya devre dışı bırakır", "_cmd_doc_pmbanlast": " - Kullanıcılarla son n sohbeti yasaklar ve siler", "_cmd_doc_allowpm": ( " - Kullanıcıya kişisel mesaj göndermeye izin verir" ), "_cls_doc": ( "Tanımadığınız kullanıcıların gelen mesajlarını engeller ve rapor eder" ), "approved": ( " {} Ls listesine eklendi' ), "banned_log": ( '👮 {} engellendi.\n\n{} Spam rapor' " etti\n{} Sohbeti sildi\n\n📝 Mesaj\n{}" ), } strings_uz = { "state": ( "🛡 Joriy PM->BL holati:" " {}\nSpam haqida xabar berilsinmi? - {}\nSuhbatlar o'chirilsinmi? -" " {}" ), "args": ( "🚫 Misol:" " .pmblsett 0 0" ), "args_pmban": ( "🚫 Misol:" " .pmbanlast 5" ), "banned": ( "😃 Salom" " •ᴗ•\n«SIGMA» birimi, hisobingizni himoya. Ruxsat" " berilmaganingiz! Yordam kerak bo'lsa hisobimning egasi bilan" " bog'lanishingiz mumkin.\nXavfsizlik sababli sizni majbur qilishim" " kerak" ), "hello": ( "🔏 «SIGMA» birimi, tanimaydigan foydalanuvchilar bilan" " shaxsiy xabarlarini himoya qiladi. Sozlamalarga mos bo'lmasa" " barchasini bloklashadi.\n\n.pmbl yozib himoya" " imkoniyatini yoqish, .pmblsett yozib konfiguratsiyani" " o'zgartirish va agar sizda shaxsiy xabarlariga raid bormi bo'lsa" " .pmbanlast yozib uni bajarishingiz mumkin." ), "removing": ( "🚮 Son {} suhbat" " o'chirilmoqda..." ), "removed": ( "🚮 Son {} suhbat" " o'chirildi!" ), "user_not_specified": ( "🚫 Siz foydalanuvchi" " belgilamadingiz" ), "_cmd_doc_pmbl": "Himoyani yoqadi yoki o'chiradi", "_cmd_doc_pmbanlast": ( " - Foydalanuvchilar bilan son n suhbatni yasaklaydi" ), "_cmd_doc_allowpm": ( " - Foydalanuvchiga shaxsiy xabar yuborishga ruxsat beradi" ), "_cls_doc": "Tanimaydigan foydalanuvchilar gelen xabarlarini bloklashadi", "approved": ( " {} Ls ro'yxatiga qo'shildi" ), "banned_log": ( '👮 {} bloklandi.\n\n{} Spam xabar' " berdi\n{} Suhbat o'chirildi\n\n📝" " Xabar\n{}" ), } strings_hi = { "state": ( "🛡 वर्तमान PM->BL स्थिति:" " {}\nस्पैम रिपोर्ट करें? - {}\nडायलॉगहटाएं? - {}" ), "args": ( "🚫 उदाहरण:" " .pmblsett 0 0" ), "args_pmban": ( "🚫 उदाहरण:" " .pmbanlast 5" ), "banned": ( "😃 नमस्ते" " •ᴗ•\nयूनिट «SIGMA», इस खाते की सुरक्षा. आप" " अनधिकृत हैं! आप मेरे खाते के मालिक को अपनी मदद के लिए या आपको" " सहायता की आवश्यकता है तो उसे संपर्क कर सकते हैं।\nमैं आपको सुरक्षा के" " कारण बंद करने के लिए बाधित कर दूंगा" ), "hello": ( "🔏 यूनिट «SIGMA» अपने निजी संदेशों को अज्ञात उपयोगकर्ताओं से" " सुरक्षित करता है। इसे सेटिंग्स के अनुसार सभी ब्लॉक करेगा।\n\n" ".pmbl दर्ज करें, ताकि सुरक्षा कार्यक्षमता सक्रिय हो, " ".pmblsett कॉन्फ़िगर करने के लिए और .pmbanlast, जब आपने" " अपने निजी संदेशों पर एक रैड किया है।" ), "removing": ( "🚮 {} अंतिम डायलॉग हटा" " रहा है..." ), "removed": ( "🚮 {} अंतिम डायलॉग हटा" " दिया!" ), "user_not_specified": ( "🚫 आपने किसी उपयोगकर्ता" " को नहीं निर्दिष्ट किया" ), "_cmd_doc_pmbl": "सुरक्षा को सक्षम या अक्षम करता है", "_cmd_doc_pmbanlast": "<अंक> - उपयोगकर्ताओं के साथ निजी संदेशों को ब्लॉक और हटाता है", "_cmd_doc_allowpm": ( "<उपयोगकर्ता> - उपयोगकर्ता को आपको एक निजी संदेश भेजने की अनुमति देता है" ), "_cmd_doc_pmblsett": ( "<ब्लॉक> <अनुमति> - ब्लॉक और अनुमति को सेट करता है, जब आपके पास एक निजी संदेश आता है" ), "_cls_doc": "एक निजी संदेश भेजने की अनुमति देता है", } def __init__(self): self._queue = [] self._ban_queue = [] self.config = loader.ModuleConfig( loader.ConfigValue( "ignore_contacts", True, lambda: "Ignore contacts?", validator=loader.validators.Boolean(), ), loader.ConfigValue( "ignore_active", True, lambda: "Ignore peers, where you participated?", validator=loader.validators.Boolean(), ), loader.ConfigValue( "active_threshold", 5, lambda: "What number of your messages is required to trust peer", validator=loader.validators.Integer(minimum=1), ), loader.ConfigValue( "custom_message", doc=lambda: "Custom message to notify untrusted peers. Leave empty for default one", ), loader.ConfigValue( "photo", "https://github.com/hikariatama/assets/raw/master/unit_sigma.png", lambda: "Photo, which is sent along with banned notification", validator=loader.validators.Link(), ), loader.ConfigValue( "report_spam", False, lambda: "Report spam?", validator=loader.validators.Boolean(), ), loader.ConfigValue( "delete_dialog", False, lambda: "Delete dialog?", validator=loader.validators.Boolean(), ), loader.ConfigValue( "silent", False, lambda: "Do not send anything to banned user", validator=loader.validators.Boolean(), ), ) async def client_ready(self): self._whitelist = self.get("whitelist", []) self._ratelimit = [] self._ratelimit_timeout = 5 * 60 self._ratelimit_threshold = 10 if not self.get("ignore_hello", False): await self.inline.bot.send_photo( self._tg_id, photo=( r"https://github.com/hikariatama/assets/raw/master/unit_sigma.png" ), caption=self.strings("hello"), parse_mode="HTML", ) self.set("ignore_hello", True) async def pmblcmd(self, message: Message): """Toggle PMBL""" current = self.get("state", False) new = not current self.set("state", new) await utils.answer( message, self.strings("state").format( "on" if new else "off", "yes" if self.config["report_spam"] else "no", "yes" if self.config["delete_dialog"] else "no", ), ) async def pmbanlastcmd(self, message: Message): """ - Ban and delete dialogs with n most new users""" n = utils.get_args_raw(message) if not n or not n.isdigit(): await utils.answer(message, self.strings("args_pmban")) return n = int(n) await utils.answer(message, self.strings("removing").format(n)) dialogs = [] async for dialog in self._client.iter_dialogs(ignore_pinned=True): try: if not isinstance(dialog.message.peer_id, PeerUser): continue except AttributeError: continue m = ( await self._client.get_messages( dialog.message.peer_id, limit=1, reverse=True, ) )[0] dialogs += [ ( get_peer_id(dialog.message.peer_id), int(time.mktime(m.date.timetuple())), ) ] dialogs.sort(key=lambda x: x[1]) to_ban = [d for d, _ in dialogs[::-1][:n]] for d in to_ban: await self._client(BlockRequest(id=d)) await self._client(DeleteHistoryRequest(peer=d, just_clear=True, max_id=0)) await utils.answer(message, self.strings("removed").format(n)) def _approve(self, user: int, reason: str = "unknown"): self._whitelist += [user] self._whitelist = list(set(self._whitelist)) self.set("whitelist", self._whitelist) logger.debug(f"User approved in pm {user}, filter: {reason}") return async def allowpmcmd(self, message: Message): """ - Allow user to pm you""" args = utils.get_args_raw(message) reply = await message.get_reply_message() user = None try: user = await self._client.get_entity(args) except Exception: with contextlib.suppress(Exception): user = await self._client.get_entity(reply.sender_id) if reply else None if not user: chat = await message.get_chat() if not isinstance(chat, User): await utils.answer(message, self.strings("user_not_specified")) return user = chat self._approve(user.id, "manual_approve") await utils.answer( message, self.strings("approved").format(user.id, get_display_name(user)) ) async def watcher(self, message: Message): if ( getattr(message, "out", False) or not isinstance(message, Message) or not isinstance(message.peer_id, PeerUser) or not self.get("state", False) or utils.get_chat_id(message) in { 1271266957, # @replies 777000, # Telegram Notifications self._tg_id, # Self } ): return self._queue += [message] @loader.loop(interval=0.05, autostart=True) async def ban_loop(self): if not self._ban_queue: return message = self._ban_queue.pop(0) self._ratelimit = list( filter( lambda x: x + self._ratelimit_timeout < time.time(), self._ratelimit, ) ) dialog = None if len(self._ratelimit) < self._ratelimit_threshold: if not self.config["silent"]: try: await self._client.send_file( message.peer_id, self.config["photo"], caption=self.config["custom_message"] or self.strings("banned"), ) except Exception: await utils.answer( message, self.config["custom_message"] or self.strings("banned"), ) self._ratelimit += [round(time.time())] try: dialog = await self._client.get_entity(message.peer_id) except ValueError: pass await self.inline.bot.send_message( self._client.tg_id, self.strings("banned_log").format( dialog.id if dialog is not None else message.sender_id, ( utils.escape_html(dialog.first_name) if dialog is not None else ( getattr(getattr(message, "sender", None), "username", None) or message.sender_id ) ), format_(self.config["report_spam"]), format_(self.config["delete_dialog"]), utils.escape_html( "" if message.photo else ( "