__version__ = (2, 0, 0) # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ # █▀█ █ █ █ █▀█ █▀▄ █ # © Copyright 2022 # https://t.me/hikariatama # # 🔒 Licensed under the GNU AGPLv3 # 🌐 https://www.gnu.org/licenses/agpl-3.0.html # meta pic: https://img.icons8.com/fluency/344/cancel-2.png # meta developer: @hikarimods # meta banner: https://mods.hikariatama.ru/badges/banstickers.jpg # scope: hikka_only # scope: hikka_min 1.3.3 # requires: aiofile Pillow import asyncio import io import logging import math import operator import os import time from functools import reduce from aiofile import async_open from PIL import Image, ImageChops from telethon.tl.functions.messages import GetStickerSetRequest from telethon.tl.types import Message from .. import loader, utils logger = logging.getLogger(__name__) @loader.tds class BanStickers(loader.Module): """Bans stickerpacks, stickers and gifs in chat""" strings = { "name": "BanStickers", "args": ( "😵 Reply to sticker is" " required" ), "sticker_banned": ( "🛡 This sticker is now" " banned in current chat" ), "pack_banned": ( "🛡 {} sticker(-s) from" " pack {} are now banned in current chat" ), "wait": ( " Banning stickers" " from this pack in current chat..." ), "sticker_not_banned": ( "😵 This sticker is not" " banned in current chat" ), "sticker_unbanned": ( "🔐 This sticker is no" " longer banned in current chat" ), "pack_unbanned": ( "🔐 {} / {} sticker(-s)" " from pack {} are no longer banned in current chat" ), "pack_not_banned": ( "😵 This pack is not" " banned in current chat" ), "no_restrictions": ( "🎉 This chat has" " no restrictions" ), "all_unbanned": ( "🔐 All stickers are" " unbanned in current chat" ), "already_restricted": ( "🎉 Animated and video" " stickers are already restricted" ), "not_restricted": ( "🎉 Animated stickers" " are not restricted" ), "animations_restricted": ( "🔐 Animated and video" " stickers are now restricted in current chat" ), "animations_unrestricted": ( "🔐 Animated stickers" " are no longer restricted" ), } strings_ru = { "args": ( "😵 Нужен ответ на" " стикер" ), "sticker_banned": ( "🛡 Этот стикер теперь" " запрещен в текущем чате" ), "pack_banned": ( "🛡 {} стикер(-ов) из" " пака {} теперь запрещены в текущем чате" ), "wait": ( " Запрещаю" " стикеры из этого пака в текущем чате..." ), "sticker_not_banned": ( "😵 Этот стикер не" " запрещен в текущем чате" ), "sticker_unbanned": ( "🔐 Этот стикер больше" " не запрещен в текущем чате" ), "pack_unbanned": ( "🔐 {} / {} стикер(-ов)" " из пака {} больше не запрещены в текущем чате" ), "pack_not_banned": ( "😵 Этот пак не запре" "щен в текущем чате" ), "no_restrictions": ( "🎉 В текущем чате нет" " ограничений" ), "all_unbanned": ( "🔐 Все стикеры" " разблокированы в текущем чате" ), "already_restricted": ( "🎉 Анимированные и" " видео стикеры уже запрещены" ), "not_restricted": ( "🎉 Анимированные" " стикеры не запрещены" ), "animations_restricted": ( "🔐 Анимированные и" " видео стикеры запрещены в текущем чате" ), "animations_unrestricted": ( "🔐 Анимированные" " стикеры больше не запрещены" ), } strings_de = { "args": ( "😵 Antwort auf einen" " Sticker erforderlich" ), "sticker_banned": ( "🛡 Dieser Sticker ist" " nun im aktuellen Chat gesperrt" ), "pack_banned": ( "🛡 {} Sticker aus dem" " Pack {} sind nun im aktuellen Chat gesperrt" ), "wait": ( " Sticker aus diesem" " Pack werden im aktuellen Chat gesperrt..." ), "sticker_not_banned": ( "😵 Dieser Sticker ist" " nicht im aktuellen Chat gesperrt" ), "sticker_unbanned": ( "🔐 Dieser Sticker ist" " nun wieder im aktuellen Chat erlaubt" ), "pack_unbanned": ( "🔐 {} / {} Sticker aus" " dem Pack {} sind nun wieder im aktuellen Chat erlaubt" ), "pack_not_banned": ( "😵 Dieses Pack ist im" " aktuellen Chat nicht gesperrt" ), "no_restrictions": ( "🎉 Im aktuellen Chat" " gibt es keine Einschränkungen" ), "all_unbanned": ( "🔐 Alle Sticker" " sind im" " aktuellen Chat wieder erlaubt" ), "already_restricted": ( "🎉 Animierte Sticker" " sind bereits gesperrt" ), "not_restricted": ( "🎉 Animierte Sticker" " sind nicht gesperrt" ), "animations_restricted": ( "🔐 Animierte Sticker" " sind nun im aktuellen Chat gesperrt" ), "animations_unrestricted": ( "🔐 Animierte Sticker" " sind nun wieder im aktuellen Chat erlaubt" ), } strings_hi = { "args": ( "😵 एक स्टिकर पर उत्तर" " आवश्यक है" ), "sticker_banned": ( "🛡 इस स्टिकर को वर्तमान" " चैट में प्रतिबंधित किया गया है" ), "pack_banned": ( "🛡 {1} पैक से {0}" " स्टिकर वर्तमान चैट में प्रतिबंधित किए गए हैं" ), "wait": ( " वर्तमान चैट में {1}" " पैक से स्टिकर प्रतिबंधित किए जा रहे हैं..." ), "sticker_not_banned": ( "😵 वर्तमान चैट में इस" " स्टिकर को प्रतिबंधित नहीं किया गया है" ), "sticker_unbanned": ( "🔐 इस स्टिकर को वर्तमान" " चैट में प्रतिबंधित नहीं किया गया है" ), "pack_unbanned": ( "🔐 {1} पैक से {0}" " स्टिकर वर्तमान चैट में प्रतिबंधित नहीं किए गए हैं" ), "pack_not_banned": ( "😵 वर्तमान चैट में यह" " पैक प्रतिबंधित नहीं किया गया है" ), "no_restrictions": ( "🎉 वर्तमान चैट में कोई" " प्रतिबंध नहीं है" ), "all_unbanned": ( "🔐 वर्तमान चैट में सभी" " स्टिकर प्रतिबंधित नहीं किए गए हैं" ), "already_restricted": ( "🎉 आगे बढ़ने के लिए" " पहले से ही प्रतिबंधित किए गए हैं" ), "not_restricted": ( "🎉 इस स्टिकर को पहले" " से ही प्रतिबंधित नहीं किया गया है" ), "already_unrestricted": ( "🎉 इस स्टिकर को पहले" " से ही प्रतिबंधित नहीं किया गया है" ), "animations_restricted": ( "🛡 वर्तमान चैट में" " एनीमेटेड स्टिकर अब प्रतिबंधित हैं" ), "animations_unrestricted": ( "🔐 वर्तमान चैट में" " एनीमेटेड स्टिकर अब प्रतिबंधित नहीं हैं" ), } strings_uz = { "pack_banned": ( "🔐 {1} pakidan {0}" " stikerlar cheklangan" ), "sticker_banned": ( "🔐 Stiker" " cheklangan" ), "not_a_pack": ( "😵 Bu paket emas" ), "pack_not_banned": ( "😵 Ushbu paket" " cheklangan emas" ), "sticker_not_banned": ( "😵 Ushbu stiker" " cheklangan emas" ), "sticker_unbanned": ( "🔐 Stiker cheklangan" " emas" ), "pack_unbanned": ( "🔐 {1} pakidan {0}" " stikerlar cheklangan emas" ), "no_restrictions": ( "🎉 Ushbu chatda" " cheklangan stikerlar yo'q" ), "all_unbanned": ( "🔐 Ushbu chatda barcha" " stikerlar cheklangan emas" ), "already_restricted": ( "🎉 Ushbu stiker oldin" " cheklangan" ), "not_restricted": ( "🎉 Ushbu stiker" " cheklangan emas" ), "already_unrestricted": ( "🎉 Ushbu stiker oldin" " cheklangan emas" ), "animations_restricted": ( "🛡 Ushbu chatda" " animatsiya stikerlari cheklangan" ), "animations_unrestricted": ( "🔐 Ushbu chatda" " animatsiya stikerlari cheklangan emas" ), } strings_tr = { "pack_banned": ( "🔐 {1} paketinden {0}" " çıkartma yasaklandı" ), "sticker_banned": ( "🔐 Çıkartma" " yasaklandı" ), "not_a_pack": ( "😵 Bu bir paket" " değil" ), "pack_not_banned": ( "😵 Bu paket" " yasaklanmamış" ), "sticker_not_banned": ( "😵 Bu çıkartma" " yasaklanmamış" ), "sticker_unbanned": ( "🔐 Çıkartma" " yasaklanmamış" ), "pack_unbanned": ( "🔐 {1} paketinden {0}" " çıkartma yasaklanmamış" ), "no_restrictions": ( "🎉 Bu sohbette" " yasaklanmış çıkartma yok" ), "all_unbanned": ( "🔐 Bu sohbette tüm" " çıkartmalar yasaklanmamış" ), "already_restricted": ( "🎉 Bu çıkartma zaten" " yasaklanmış" ), "not_restricted": ( "🎉 Bu çıkartma" " yasaklanmamış" ), "already_unrestricted": ( "🎉 Bu çıkartma zaten" " yasaklanmamış" ), "animations_restricted": ( "🛡 Bu sohbette" " animasyonlu çıkartmalar yasaklanmış" ), "animations_unrestricted": ( "🔐 Bu sohbette" " animasyonlu çıkartmalar yasaklanmamış" ), } def __init__(self): self.config = loader.ModuleConfig( loader.ConfigValue( "rms_threshold", 4.0, ( "The lower this value is - the more light the detection will be." " 0.0 - Full match, 4.0 - approximate match" ), validator=loader.validators.Float(maximum=10.0), ), loader.ConfigValue( "bantime", 180, ( "Once the user sent forbidden sticker, he will be restricted from" " sending more for this amount of seconds" ), validator=loader.validators.Integer(minimum=60), ), ) async def client_ready(self): self._banlist = self.pointer("banlist", {}) self._bananim = self.pointer("bananim", []) dir_path = os.path.abspath( os.path.join(utils.get_base_dir(), "..", "loaded_modules") ) if not os.path.isdir(dir_path): os.mkdir(dir_path) dir_path = os.path.join(dir_path, "banmedia") if not os.path.isdir(dir_path): os.mkdir(dir_path) self._db_path = dir_path self._cache = {} self._filecache = {} for file in os.listdir(self._db_path): async with async_open(os.path.join(self._db_path, file), "rb") as f: self._cache[file] = await f.read() @staticmethod def rmsdiff(image_1: Image, image_2: Image) -> float: "Calculate the root-mean-square difference between two images" # https://stackoverflow.com/a/11818358/19170642 try: h = ImageChops.difference(image_1, image_2).histogram() except ValueError: return 100.0 return math.sqrt( reduce(operator.add, map(lambda h, i: h * (i**2), h, range(256))) / (float(image_1.size[0]) * image_1.size[1]) ) async def _add_cache(self, sticker_id: int, bytes_: bytes): if not os.path.isfile(os.path.join(self._db_path, str(sticker_id))): async with async_open( os.path.join(self._db_path, str(sticker_id)), "wb" ) as f: await f.write(bytes_) self._cache[str(sticker_id)] = bytes_ async def _remove_cache(self, sticker_id: int): if os.path.isfile(os.path.join(self._db_path, str(sticker_id))): os.remove(os.path.join(self._db_path, str(sticker_id))) if str(sticker_id) in self._cache: self._cache.pop(str(sticker_id)) @loader.command( ru_doc="<ответ на стикер> - Запретить стикер в текущем чате", de_doc=" - Verbotene Sticker in diesem Chat", hi_doc="<उत्तर दिए गए स्टिकर पर> - इस चैट में अनुमति नहीं देने वाले स्टिकर", uz_doc=" - Joriy suhbatda stikerni taqiqlash", tr_doc=" - Bu sohbette yasaklanmış çıkartma", ) async def banstick(self, message: Message): """ - Ban sticker in current chat""" reply = await message.get_reply_message() if not reply or not reply.sticker: await utils.answer(message, self.strings("args")) return chat_id = str(utils.get_chat_id(message)) self._banlist.setdefault(chat_id, []).append(reply.sticker.id) self._banlist[chat_id] = list(set(self._banlist[chat_id])) if reply.sticker.mime_type.startswith("image"): await self._add_cache(reply.sticker.id, await reply.download_media(bytes)) await utils.answer(message, self.strings("sticker_banned")) @loader.command( ru_doc="<ответ на стикер> - Запретить весь стикерпак в текущем чате", de_doc=" - Verbotene Stickerpack in diesem Chat", hi_doc="<उत्तर दिए गए स्टिकर पर> - इस चैट में अनुमति नहीं देने वाले स्टिकर पैक", uz_doc=" - Joriy suhbatda stikerni taqiqlash", tr_doc=" - Bu sohbette yasaklanmış çıkartma paketi", ) async def banpack(self, message: Message): """ - Ban the whole stickerpack in current chat""" reply = await message.get_reply_message() if not reply or not reply.sticker: await utils.answer(message, self.strings("args")) return message = await utils.answer(message, self.strings("wait")) stickerset = await self._client( GetStickerSetRequest( next( attr.stickerset for attr in reply.sticker.attributes if hasattr(attr, "stickerset") ), hash=0, ) ) stickers = stickerset.documents chat_id = str(utils.get_chat_id(message)) for sticker in stickers: self._banlist.setdefault(chat_id, []).append(sticker.id) if sticker.mime_type.startswith("image"): await self._add_cache( sticker.id, await self._client.download_file(sticker, bytes), ) await asyncio.sleep(1) # Light FW protection self._banlist[chat_id] = list(set(self._banlist[chat_id])) await utils.answer( message, self.strings("pack_banned").format( len(stickers), utils.escape_html(stickerset.set.title), ), ) @loader.command( ru_doc="<ответ на стикер> - Разбанить стикер в текущем чате", de_doc=" - Entbanne Sticker in diesem Chat", hi_doc="<उत्तर दिए गए स्टिकर पर> - इस चैट में अनुमति देने वाले स्टिकर", uz_doc=" - Joriy suhbatda stikerni taqiqlash", tr_doc=" - Bu sohbette yasaklanmış çıkartma", ) async def unbanstick(self, message: Message): """ - Unban sticker in current chat""" reply = await message.get_reply_message() if not reply or not reply.sticker: await utils.answer(message, self.strings("args")) return chat_id = str(utils.get_chat_id(message)) if reply.sticker.id not in self._banlist.get(chat_id, []): await utils.answer(message, self.strings("sticker_not_banned")) return self._banlist[chat_id].remove(reply.sticker.id) if reply.sticker.mime_type.startswith("image"): await self._remove_cache(reply.sticker.id) await utils.answer(message, self.strings("sticker_unbanned")) @loader.command( ru_doc="<ответ на стикер> - Разбанить весь стикерпак в текущем чате" ) async def unbanpack(self, message: Message): """ - Unban the whole stickerpack in current chat""" reply = await message.get_reply_message() if not reply or not reply.sticker: await utils.answer(message, self.strings("args")) return message = await utils.answer(message, self.strings("wait")) stickerset = await self._client( GetStickerSetRequest( next( attr.stickerset for attr in reply.sticker.attributes if hasattr(attr, "stickerset") ), hash=0, ) ) stickers = stickerset.documents chat_id = str(utils.get_chat_id(message)) unbanned = 0 for sticker in stickers: if sticker.id in self._banlist.get(chat_id, []): self._banlist[chat_id].remove(sticker.id) if sticker.mime_type.startswith("image"): await self._remove_cache(sticker.id) unbanned += 1 if not unbanned: await utils.answer(message, self.strings("pack_not_banned")) return await utils.answer( message, self.strings("pack_unbanned").format( unbanned, len(stickers), utils.escape_html(stickerset.set.title), ), ) @loader.command( ru_doc="Убрать все ограничения в текущем чате", de_doc="Entferne alle Einschränkungen in diesem Chat", hi_doc="इस चैट में सभी सीमाएं निकालें", uz_doc="Joriy suhbatda barcha cheklarni olib tashlang", tr_doc="Bu sohbetteki tüm yasaklamaları kaldırın", ) async def unbanall(self, message: Message): """Remove all restrictions in current chat""" chat_id = str(utils.get_chat_id(message)) if not self._banlist.get(chat_id, []): await utils.answer(message, self.strings("no_restrictions")) return for sticker in self._banlist.pop(chat_id): await self._remove_cache(sticker) await utils.answer(message, self.strings("all_unbanned")) @loader.command( ru_doc="Запретить анимированные и видео стикеры в этом чате", de_doc="Verbiete animierte und Video-Sticker in diesem Chat", hi_doc="इस चैट में एनीमेटेड और वीडियो स्टिकर्स को अस्वीकार करें", uz_doc="Bu suhbatda animatsiya va video stikerni taqiqlang", tr_doc="Bu sohbette animasyonlu ve video çıkartmaları yasaklayın", ) async def bananim(self, message: Message): """Restrict animated stickers in current chat""" chat_id = str(utils.get_chat_id(message)) if chat_id in self._bananim: await utils.answer(message, self.strings("already_restricted")) return self._bananim.append(chat_id) await utils.answer(message, self.strings("animations_restricted")) @loader.command( ru_doc="Разблокировать анимированные и видео стикеры в этом чате", de_doc=( "Entferne die Einschränkung für animierte und Video-Sticker in diesem Chat" ), hi_doc="इस चैट में एनीमेटेड और वीडियो स्टिकर्स की प्रतिबंध निकालें", uz_doc="Bu suhbatda animatsiya va video stikerni taqiqlashni olib tashlang", tr_doc="Bu sohbette animasyonlu ve video çıkartmaları yasaklamasını kaldırın", ) async def unbananim(self, message: Message): """Unrestrict animated stickers in current chat""" chat_id = str(utils.get_chat_id(message)) if chat_id not in self._bananim: await utils.answer(message, self.strings("not_restricted")) return self._bananim.remove(chat_id) await utils.answer(message, self.strings("animations_unrestricted")) @loader.watcher("in", only_stickers=True, only_groups=True) async def watcher(self, message: Message): chat_id = str(utils.get_chat_id(message)) if not self._banlist.get(chat_id): return async def _restrict(): nonlocal message await message.delete() await self._client.edit_permissions( message.peer_id, message.sender_id, until_date=time.time() + self.config["bantime"], send_stickers=False, ) if not message.sticker.mime_type.startswith("image"): if chat_id in self._bananim or message.sticker.id in self._banlist[chat_id]: await _restrict() return if message.sticker.id in self._filecache: file = self._filecache[message.sticker.id] else: file = await message.download_media(bytes) self._filecache[message.sticker.id] = file image = Image.open(io.BytesIO(file)) for sticker_id, bytes_ in self._cache.items(): res = await utils.run_sync( self.rmsdiff, image, Image.open(io.BytesIO(bytes_)), ) if res < self.config["rms_threshold"]: await self._add_cache(sticker_id, file) return await _restrict()