__version__ = (3, 0, 4)
# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# meta title: PM->BL
# meta pic: https://img.icons8.com/external-dreamcreateicons-flat-dreamcreateicons/512/000000/external-death-halloween-dreamcreateicons-flat-dreamcreateicons.png
# meta banner: https://mods.hikariatama.ru/badges/pmbl.jpg
# meta developer: @hikarimods
# scope: hikka_only
# scope: hikka_min 1.5.0
import contextlib
import logging
import time
from typing import Optional
from telethon.tl.functions.contacts import BlockRequest
from telethon.tl.functions.messages import DeleteHistoryRequest, ReportSpamRequest
from telethon.tl.types import Message, PeerUser, User
from telethon.utils import get_display_name, get_peer_id
from .. import loader, utils
logger = logging.getLogger(__name__)
def format_(state: Optional[bool]) -> str:
if state is None:
return "❔"
return "🫡" if state else "🙅♂️ Not"
@loader.tds
class PMBLMod(loader.Module):
"""Bans and reports incoming messages from unknown users"""
strings = {
"name": "PMBL",
"state": (
"🛡 PM->BL is now"
" {}\nReport spam? - {}\nDelete dialog? - {}"
),
"args": (
"🚫 Usage example:"
" .pmblsett 0 0"
),
"args_pmban": (
"🚫 Usage example:"
" .pmbanlast 5"
),
"banned": (
"😃 Hey there"
" •ᴗ•\nUnit «SIGMA», the guardian of this account. You are"
" not approved! You can contact my owner in chat, if you need"
" help.\nI need to ban you in terms of security"
),
"removing": (
"🚮 Removing {} last"
" dialogs..."
),
"removed": (
"🚮 Removed {} last"
" dialogs!"
),
"user_not_specified": (
"🚫 You haven't specified"
" user"
),
"approved": (
"✋ {} approved in pm'
),
"banned_log": (
'👮 I banned {}.\n\n{} Reported'
" spam\n{} Deleted dialog\n\n"
" 📝 Message\n{}"
),
"hello": (
"🔏 Unit «SIGMA» protects your personal messages from intrusions. It"
" will block everyone, who's trying to invade you.\n\nUse"
" .pmbl to enable protection, .pmblsett to"
" configure it and .pmbanlast if you've already been"
" pm-raided."
),
}
strings_ru = {
"state": (
"🛡 Текущее состояние"
" PM->BL: {}\nСообщать о спаме? - {}\nУдалять диалог? - {}"
),
"args": (
"🚫 Пример:"
" .pmblsett 0 0"
),
"args_pmban": (
"🚫 Пример:"
" .pmbanlast 5"
),
"banned": (
"😃 Добрый день"
" •ᴗ•\nЮнит «SIGMA», защитник этого аккаунта. Вы не"
" потверждены! Вы можете связаться с моим владельцем в чате,"
" если нужна помощь.\nЯ вынужден заблокировать вас из соображений"
" безопасности"
),
"hello": (
"🔏 Юнит «SIGMA» защищает твои личные сообщенния от неизвестных"
" пользователей. Он будет блокировать всех, кто не соответствует"
" настройкам.\n\nВведи .pmbl для активации защиты,"
" .pmblsett для ее настройки и .pmbanlast если"
" нужно очистить уже прошедший рейд на личные сообщения."
),
"removing": (
"🚮 Удаляю {} последних"
" диалогов..."
),
"removed": (
"🚮 Удалил {} последних"
" диалогов!"
),
"user_not_specified": (
"🚫 Укажи"
" пользователя"
),
"_cmd_doc_pmbl": "Включить или выключить защиту",
"_cmd_doc_pmbanlast": (
"<количество> - Забанить и удалить n последних диалогов с пользователями"
),
"_cmd_doc_allowpm": "<пользователь> - Разрешить пользователю писать тебе в ЛС",
"_cls_doc": "Блокирует и репортит входящие сообщения от незнакомцев",
"approved": (
"✋ {} одобрен в лс'
),
"banned_log": (
'👮 Я заблокировал {}.\n\n{}'
" Сообщил"
" о спаме\n{} Удалил диалог\n\n📝"
" Сообщение\n{}"
),
}
strings_de = {
"state": (
"🛡 Aktueller PM->BL"
" Status: {}\nSpam melden? - {}\nDialoge löschen? - {}"
),
"args": (
"🚫 Beispiel:"
" .pmblsett 0 0"
),
"args_pmban": (
"🚫 Beispiel:"
" .pmbanlast 5"
),
"banned": (
"😃 Hallo"
" •ᴗ•\nEinheit «SIGMA», der Schutz dieses Accounts. Sie"
" sind nicht autorisiert! Sie können sich an den Besitzer meines"
" Accounts wenden, wenn Sie Hilfe benötigen.\nIch bin gezwungen, Sie aus"
" Sicherheitsgründen zu sperren"
),
"hello": (
"🔏 Einheit «SIGMA» schützt Ihre persönlichen Nachrichten vor"
" unbekannten Benutzern. Es wird alle blockieren, die nicht den"
" Einstellungen entsprechen.\n\nGeben Sie .pmbl ein, um die"
" Schutzfunktion zu aktivieren, .pmblsett zum Konfigurieren"
" und .pmbanlast, wenn Sie bereits einen Raid auf Ihre"
" persönlichen Nachrichten durchgeführt haben."
),
"removing": (
"🚮 Entferne {} letzte"
" Dialoge..."
),
"removed": (
"🚮 Entfernt {} letzte"
" Dialoge!"
),
"user_not_specified": (
"🚫 Du hast keinen"
" Benutzer angegeben"
),
"_cmd_doc_pmbl": "Aktiviert oder deaktiviert den Schutz",
"_cmd_doc_pmbanlast": (
" - Bannt und löscht n letzte Dialoge mit Benutzern"
),
"_cmd_doc_allowpm": (
" - Erlaubt dem Benutzer, dir eine private Nachricht zu senden"
),
"_cls_doc": "Blockiert und meldet eingehende Nachrichten von Unbekannten",
"approved": (
"✋ {} wurde in den Ls genehmigt'
),
"banned_log": (
'👮 Ich habe {} geblockt.\n\n{} Hat'
" über Spam berichtet\n{} Hat den Dialog gelöscht\n\n📝"
" Nachricht\n{}"
),
}
strings_tr = {
"state": (
"🛡 Şu anki PM->BL durumu:"
" {}\nSpam rapor edilsin mi? - {}\nSohbetler silinsin mi? - {}"
),
"args": (
"🚫 Örnek:"
" .pmblsett 0 0"
),
"args_pmban": (
"🚫 Örnek:"
" .pmbanlast 5"
),
"banned": (
"😃 Merhaba"
" •ᴗ•\n«SIGMA» birimi, hesabınızın koruması. Yetkili"
" değilsiniz! Yardım için hesabımın sahibi ile iletişime"
" geçebilirsiniz.\nGüvenlik nedeniyle sizi zorunlu olarak"
" engelliyorum"
),
"hello": (
"🔏 «SIGMA» birimi, tanımadığınız kullanıcılarla kişisel"
" mesajlarınızı korur. Ayarlara uygun olmayanları tümünü engeller.\n\n"
".pmbl yazarak koruma özelliğini etkinleştirebilir, "
".pmblsett yazarak yapılandırabilir ve zaten kişisel"
" mesajlarınıza bir raid gerçekleştirdiyseniz .pmbanlast"
" yazarak bunu gerçekleştirebilirsiniz."
),
"removing": (
"🚮 Son {} sohbet"
" siliniyor..."
),
"removed": (
"🚮 Son {} sohbet"
" silindi!"
),
"user_not_specified": (
"🚫 Bir kullanıcı"
" belirtmediniz"
),
"_cmd_doc_pmbl": "Korumayı etkinleştirir veya devre dışı bırakır",
"_cmd_doc_pmbanlast": " - Kullanıcılarla son n sohbeti yasaklar ve siler",
"_cmd_doc_allowpm": (
" - Kullanıcıya kişisel mesaj göndermeye izin verir"
),
"_cls_doc": (
"Tanımadığınız kullanıcıların gelen mesajlarını engeller ve rapor eder"
),
"approved": (
"✋ {} Ls listesine eklendi'
),
"banned_log": (
'👮 {} engellendi.\n\n{} Spam rapor'
" etti\n{} Sohbeti sildi\n\n📝 Mesaj\n{}"
),
}
strings_uz = {
"state": (
"🛡 Joriy PM->BL holati:"
" {}\nSpam haqida xabar berilsinmi? - {}\nSuhbatlar o'chirilsinmi? -"
" {}"
),
"args": (
"🚫 Misol:"
" .pmblsett 0 0"
),
"args_pmban": (
"🚫 Misol:"
" .pmbanlast 5"
),
"banned": (
"😃 Salom"
" •ᴗ•\n«SIGMA» birimi, hisobingizni himoya. Ruxsat"
" berilmaganingiz! Yordam kerak bo'lsa hisobimning egasi bilan"
" bog'lanishingiz mumkin.\nXavfsizlik sababli sizni majbur qilishim"
" kerak"
),
"hello": (
"🔏 «SIGMA» birimi, tanimaydigan foydalanuvchilar bilan"
" shaxsiy xabarlarini himoya qiladi. Sozlamalarga mos bo'lmasa"
" barchasini bloklashadi.\n\n.pmbl yozib himoya"
" imkoniyatini yoqish, .pmblsett yozib konfiguratsiyani"
" o'zgartirish va agar sizda shaxsiy xabarlariga raid bormi bo'lsa"
" .pmbanlast yozib uni bajarishingiz mumkin."
),
"removing": (
"🚮 Son {} suhbat"
" o'chirilmoqda..."
),
"removed": (
"🚮 Son {} suhbat"
" o'chirildi!"
),
"user_not_specified": (
"🚫 Siz foydalanuvchi"
" belgilamadingiz"
),
"_cmd_doc_pmbl": "Himoyani yoqadi yoki o'chiradi",
"_cmd_doc_pmbanlast": (
" - Foydalanuvchilar bilan son n suhbatni yasaklaydi"
),
"_cmd_doc_allowpm": (
" - Foydalanuvchiga shaxsiy xabar yuborishga ruxsat beradi"
),
"_cls_doc": "Tanimaydigan foydalanuvchilar gelen xabarlarini bloklashadi",
"approved": (
"✋ {} Ls ro'yxatiga qo'shildi"
),
"banned_log": (
'👮 {} bloklandi.\n\n{} Spam xabar'
" berdi\n{} Suhbat o'chirildi\n\n📝"
" Xabar\n{}"
),
}
strings_hi = {
"state": (
"🛡 वर्तमान PM->BL स्थिति:"
" {}\nस्पैम रिपोर्ट करें? - {}\nडायलॉगहटाएं? - {}"
),
"args": (
"🚫 उदाहरण:"
" .pmblsett 0 0"
),
"args_pmban": (
"🚫 उदाहरण:"
" .pmbanlast 5"
),
"banned": (
"😃 नमस्ते"
" •ᴗ•\nयूनिट «SIGMA», इस खाते की सुरक्षा. आप"
" अनधिकृत हैं! आप मेरे खाते के मालिक को अपनी मदद के लिए या आपको"
" सहायता की आवश्यकता है तो उसे संपर्क कर सकते हैं।\nमैं आपको सुरक्षा के"
" कारण बंद करने के लिए बाधित कर दूंगा"
),
"hello": (
"🔏 यूनिट «SIGMA» अपने निजी संदेशों को अज्ञात उपयोगकर्ताओं से"
" सुरक्षित करता है। इसे सेटिंग्स के अनुसार सभी ब्लॉक करेगा।\n\n"
".pmbl दर्ज करें, ताकि सुरक्षा कार्यक्षमता सक्रिय हो, "
".pmblsett कॉन्फ़िगर करने के लिए और .pmbanlast, जब आपने"
" अपने निजी संदेशों पर एक रैड किया है।"
),
"removing": (
"🚮 {} अंतिम डायलॉग हटा"
" रहा है..."
),
"removed": (
"🚮 {} अंतिम डायलॉग हटा"
" दिया!"
),
"user_not_specified": (
"🚫 आपने किसी उपयोगकर्ता"
" को नहीं निर्दिष्ट किया"
),
"_cmd_doc_pmbl": "सुरक्षा को सक्षम या अक्षम करता है",
"_cmd_doc_pmbanlast": "<अंक> - उपयोगकर्ताओं के साथ निजी संदेशों को ब्लॉक और हटाता है",
"_cmd_doc_allowpm": (
"<उपयोगकर्ता> - उपयोगकर्ता को आपको एक निजी संदेश भेजने की अनुमति देता है"
),
"_cmd_doc_pmblsett": (
"<ब्लॉक> <अनुमति> - ब्लॉक और अनुमति को सेट करता है, जब आपके पास एक निजी संदेश आता है"
),
"_cls_doc": "एक निजी संदेश भेजने की अनुमति देता है",
}
def __init__(self):
self._queue = []
self._ban_queue = []
self.config = loader.ModuleConfig(
loader.ConfigValue(
"ignore_contacts",
True,
lambda: "Ignore contacts?",
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"ignore_active",
True,
lambda: "Ignore peers, where you participated?",
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"active_threshold",
5,
lambda: "What number of your messages is required to trust peer",
validator=loader.validators.Integer(minimum=1),
),
loader.ConfigValue(
"custom_message",
doc=lambda: "Custom message to notify untrusted peers. Leave empty for default one",
),
loader.ConfigValue(
"photo",
"https://github.com/hikariatama/assets/raw/master/unit_sigma.png",
lambda: "Photo, which is sent along with banned notification",
validator=loader.validators.Link(),
),
loader.ConfigValue(
"report_spam",
False,
lambda: "Report spam?",
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"delete_dialog",
False,
lambda: "Delete dialog?",
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"silent",
False,
lambda: "Do not send anything to banned user",
validator=loader.validators.Boolean(),
),
)
async def client_ready(self):
self._whitelist = self.get("whitelist", [])
self._ratelimit = []
self._ratelimit_timeout = 5 * 60
self._ratelimit_threshold = 10
if not self.get("ignore_hello", False):
await self.inline.bot.send_photo(
self._tg_id,
photo=(
r"https://github.com/hikariatama/assets/raw/master/unit_sigma.png"
),
caption=self.strings("hello"),
parse_mode="HTML",
)
self.set("ignore_hello", True)
async def pmblcmd(self, message: Message):
"""Toggle PMBL"""
current = self.get("state", False)
new = not current
self.set("state", new)
await utils.answer(
message,
self.strings("state").format(
"on" if new else "off",
"yes" if self.config["report_spam"] else "no",
"yes" if self.config["delete_dialog"] else "no",
),
)
async def pmbanlastcmd(self, message: Message):
""" - Ban and delete dialogs with n most new users"""
n = utils.get_args_raw(message)
if not n or not n.isdigit():
await utils.answer(message, self.strings("args_pmban"))
return
n = int(n)
await utils.answer(message, self.strings("removing").format(n))
dialogs = []
async for dialog in self._client.iter_dialogs(ignore_pinned=True):
try:
if not isinstance(dialog.message.peer_id, PeerUser):
continue
except AttributeError:
continue
m = (
await self._client.get_messages(
dialog.message.peer_id,
limit=1,
reverse=True,
)
)[0]
dialogs += [
(
get_peer_id(dialog.message.peer_id),
int(time.mktime(m.date.timetuple())),
)
]
dialogs.sort(key=lambda x: x[1])
to_ban = [d for d, _ in dialogs[::-1][:n]]
for d in to_ban:
await self._client(BlockRequest(id=d))
await self._client(DeleteHistoryRequest(peer=d, just_clear=True, max_id=0))
await utils.answer(message, self.strings("removed").format(n))
def _approve(self, user: int, reason: str = "unknown"):
self._whitelist += [user]
self._whitelist = list(set(self._whitelist))
self.set("whitelist", self._whitelist)
logger.debug(f"User approved in pm {user}, filter: {reason}")
return
async def allowpmcmd(self, message: Message):
""" - Allow user to pm you"""
args = utils.get_args_raw(message)
reply = await message.get_reply_message()
user = None
try:
user = await self._client.get_entity(args)
except Exception:
with contextlib.suppress(Exception):
user = await self._client.get_entity(reply.sender_id) if reply else None
if not user:
chat = await message.get_chat()
if not isinstance(chat, User):
await utils.answer(message, self.strings("user_not_specified"))
return
user = chat
self._approve(user.id, "manual_approve")
await utils.answer(
message, self.strings("approved").format(user.id, get_display_name(user))
)
async def watcher(self, message: Message):
if (
getattr(message, "out", False)
or not isinstance(message, Message)
or not isinstance(message.peer_id, PeerUser)
or not self.get("state", False)
or utils.get_chat_id(message)
in {
1271266957, # @replies
777000, # Telegram Notifications
self._tg_id, # Self
}
):
return
self._queue += [message]
@loader.loop(interval=0.05, autostart=True)
async def ban_loop(self):
if not self._ban_queue:
return
message = self._ban_queue.pop(0)
self._ratelimit = list(
filter(
lambda x: x + self._ratelimit_timeout < time.time(),
self._ratelimit,
)
)
dialog = None
if len(self._ratelimit) < self._ratelimit_threshold:
if not self.config["silent"]:
try:
await self._client.send_file(
message.peer_id,
self.config["photo"],
caption=self.config["custom_message"] or self.strings("banned"),
)
except Exception:
await utils.answer(
message,
self.config["custom_message"] or self.strings("banned"),
)
self._ratelimit += [round(time.time())]
try:
dialog = await self._client.get_entity(message.peer_id)
except ValueError:
pass
await self.inline.bot.send_message(
self._client.tg_id,
self.strings("banned_log").format(
dialog.id if dialog is not None else message.sender_id,
(
utils.escape_html(dialog.first_name)
if dialog is not None
else (
getattr(getattr(message, "sender", None), "username", None)
or message.sender_id
)
),
format_(self.config["report_spam"]),
format_(self.config["delete_dialog"]),
utils.escape_html(
""
if message.photo
else (
"