__version__ = (2, 0, 0)
# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# meta pic: https://img.icons8.com/fluency/344/cancel-2.png
# meta developer: @hikarimods
# meta banner: https://mods.hikariatama.ru/badges/banstickers.jpg
# scope: hikka_only
# scope: hikka_min 1.3.3
# requires: aiofile Pillow
import asyncio
import io
import logging
import math
import operator
import os
import time
from functools import reduce
from aiofile import async_open
from PIL import Image, ImageChops
from telethon.tl.functions.messages import GetStickerSetRequest
from telethon.tl.types import Message
from .. import loader, utils
logger = logging.getLogger(__name__)
@loader.tds
class BanStickers(loader.Module):
"""Bans stickerpacks, stickers and gifs in chat"""
strings = {
"name": "BanStickers",
"args": (
"😵 Reply to sticker is"
" required"
),
"sticker_banned": (
"🛡 This sticker is now"
" banned in current chat"
),
"pack_banned": (
"🛡 {} sticker(-s) from"
" pack {} are now banned in current chat"
),
"wait": (
"⏳ Banning stickers"
" from this pack in current chat..."
),
"sticker_not_banned": (
"😵 This sticker is not"
" banned in current chat"
),
"sticker_unbanned": (
"🔐 This sticker is no"
" longer banned in current chat"
),
"pack_unbanned": (
"🔐 {} / {} sticker(-s)"
" from pack {} are no longer banned in current chat"
),
"pack_not_banned": (
"😵 This pack is not"
" banned in current chat"
),
"no_restrictions": (
"🎉 This chat has"
" no restrictions"
),
"all_unbanned": (
"🔐 All stickers are"
" unbanned in current chat"
),
"already_restricted": (
"🎉 Animated and video"
" stickers are already restricted"
),
"not_restricted": (
"🎉 Animated stickers"
" are not restricted"
),
"animations_restricted": (
"🔐 Animated and video"
" stickers are now restricted in current chat"
),
"animations_unrestricted": (
"🔐 Animated stickers"
" are no longer restricted"
),
}
strings_ru = {
"args": (
"😵 Нужен ответ на"
" стикер"
),
"sticker_banned": (
"🛡 Этот стикер теперь"
" запрещен в текущем чате"
),
"pack_banned": (
"🛡 {} стикер(-ов) из"
" пака {} теперь запрещены в текущем чате"
),
"wait": (
"⏳ Запрещаю"
" стикеры из этого пака в текущем чате..."
),
"sticker_not_banned": (
"😵 Этот стикер не"
" запрещен в текущем чате"
),
"sticker_unbanned": (
"🔐 Этот стикер больше"
" не запрещен в текущем чате"
),
"pack_unbanned": (
"🔐 {} / {} стикер(-ов)"
" из пака {} больше не запрещены в текущем чате"
),
"pack_not_banned": (
"😵 Этот пак не запре"
"щен в текущем чате"
),
"no_restrictions": (
"🎉 В текущем чате нет"
" ограничений"
),
"all_unbanned": (
"🔐 Все стикеры"
" разблокированы в текущем чате"
),
"already_restricted": (
"🎉 Анимированные и"
" видео стикеры уже запрещены"
),
"not_restricted": (
"🎉 Анимированные"
" стикеры не запрещены"
),
"animations_restricted": (
"🔐 Анимированные и"
" видео стикеры запрещены в текущем чате"
),
"animations_unrestricted": (
"🔐 Анимированные"
" стикеры больше не запрещены"
),
}
strings_de = {
"args": (
"😵 Antwort auf einen"
" Sticker erforderlich"
),
"sticker_banned": (
"🛡 Dieser Sticker ist"
" nun im aktuellen Chat gesperrt"
),
"pack_banned": (
"🛡 {} Sticker aus dem"
" Pack {} sind nun im aktuellen Chat gesperrt"
),
"wait": (
"⏳ Sticker aus diesem"
" Pack werden im aktuellen Chat gesperrt..."
),
"sticker_not_banned": (
"😵 Dieser Sticker ist"
" nicht im aktuellen Chat gesperrt"
),
"sticker_unbanned": (
"🔐 Dieser Sticker ist"
" nun wieder im aktuellen Chat erlaubt"
),
"pack_unbanned": (
"🔐 {} / {} Sticker aus"
" dem Pack {} sind nun wieder im aktuellen Chat erlaubt"
),
"pack_not_banned": (
"😵 Dieses Pack ist im"
" aktuellen Chat nicht gesperrt"
),
"no_restrictions": (
"🎉 Im aktuellen Chat"
" gibt es keine Einschränkungen"
),
"all_unbanned": (
"🔐 Alle Sticker"
" sind im"
" aktuellen Chat wieder erlaubt"
),
"already_restricted": (
"🎉 Animierte Sticker"
" sind bereits gesperrt"
),
"not_restricted": (
"🎉 Animierte Sticker"
" sind nicht gesperrt"
),
"animations_restricted": (
"🔐 Animierte Sticker"
" sind nun im aktuellen Chat gesperrt"
),
"animations_unrestricted": (
"🔐 Animierte Sticker"
" sind nun wieder im aktuellen Chat erlaubt"
),
}
strings_hi = {
"args": (
"😵 एक स्टिकर पर उत्तर"
" आवश्यक है"
),
"sticker_banned": (
"🛡 इस स्टिकर को वर्तमान"
" चैट में प्रतिबंधित किया गया है"
),
"pack_banned": (
"🛡 {1} पैक से {0}"
" स्टिकर वर्तमान चैट में प्रतिबंधित किए गए हैं"
),
"wait": (
"⏳ वर्तमान चैट में {1}"
" पैक से स्टिकर प्रतिबंधित किए जा रहे हैं..."
),
"sticker_not_banned": (
"😵 वर्तमान चैट में इस"
" स्टिकर को प्रतिबंधित नहीं किया गया है"
),
"sticker_unbanned": (
"🔐 इस स्टिकर को वर्तमान"
" चैट में प्रतिबंधित नहीं किया गया है"
),
"pack_unbanned": (
"🔐 {1} पैक से {0}"
" स्टिकर वर्तमान चैट में प्रतिबंधित नहीं किए गए हैं"
),
"pack_not_banned": (
"😵 वर्तमान चैट में यह"
" पैक प्रतिबंधित नहीं किया गया है"
),
"no_restrictions": (
"🎉 वर्तमान चैट में कोई"
" प्रतिबंध नहीं है"
),
"all_unbanned": (
"🔐 वर्तमान चैट में सभी"
" स्टिकर प्रतिबंधित नहीं किए गए हैं"
),
"already_restricted": (
"🎉 आगे बढ़ने के लिए"
" पहले से ही प्रतिबंधित किए गए हैं"
),
"not_restricted": (
"🎉 इस स्टिकर को पहले"
" से ही प्रतिबंधित नहीं किया गया है"
),
"already_unrestricted": (
"🎉 इस स्टिकर को पहले"
" से ही प्रतिबंधित नहीं किया गया है"
),
"animations_restricted": (
"🛡 वर्तमान चैट में"
" एनीमेटेड स्टिकर अब प्रतिबंधित हैं"
),
"animations_unrestricted": (
"🔐 वर्तमान चैट में"
" एनीमेटेड स्टिकर अब प्रतिबंधित नहीं हैं"
),
}
strings_uz = {
"pack_banned": (
"🔐 {1} pakidan {0}"
" stikerlar cheklangan"
),
"sticker_banned": (
"🔐 Stiker"
" cheklangan"
),
"not_a_pack": (
"😵 Bu paket emas"
),
"pack_not_banned": (
"😵 Ushbu paket"
" cheklangan emas"
),
"sticker_not_banned": (
"😵 Ushbu stiker"
" cheklangan emas"
),
"sticker_unbanned": (
"🔐 Stiker cheklangan"
" emas"
),
"pack_unbanned": (
"🔐 {1} pakidan {0}"
" stikerlar cheklangan emas"
),
"no_restrictions": (
"🎉 Ushbu chatda"
" cheklangan stikerlar yo'q"
),
"all_unbanned": (
"🔐 Ushbu chatda barcha"
" stikerlar cheklangan emas"
),
"already_restricted": (
"🎉 Ushbu stiker oldin"
" cheklangan"
),
"not_restricted": (
"🎉 Ushbu stiker"
" cheklangan emas"
),
"already_unrestricted": (
"🎉 Ushbu stiker oldin"
" cheklangan emas"
),
"animations_restricted": (
"🛡 Ushbu chatda"
" animatsiya stikerlari cheklangan"
),
"animations_unrestricted": (
"🔐 Ushbu chatda"
" animatsiya stikerlari cheklangan emas"
),
}
strings_tr = {
"pack_banned": (
"🔐 {1} paketinden {0}"
" çıkartma yasaklandı"
),
"sticker_banned": (
"🔐 Çıkartma"
" yasaklandı"
),
"not_a_pack": (
"😵 Bu bir paket"
" değil"
),
"pack_not_banned": (
"😵 Bu paket"
" yasaklanmamış"
),
"sticker_not_banned": (
"😵 Bu çıkartma"
" yasaklanmamış"
),
"sticker_unbanned": (
"🔐 Çıkartma"
" yasaklanmamış"
),
"pack_unbanned": (
"🔐 {1} paketinden {0}"
" çıkartma yasaklanmamış"
),
"no_restrictions": (
"🎉 Bu sohbette"
" yasaklanmış çıkartma yok"
),
"all_unbanned": (
"🔐 Bu sohbette tüm"
" çıkartmalar yasaklanmamış"
),
"already_restricted": (
"🎉 Bu çıkartma zaten"
" yasaklanmış"
),
"not_restricted": (
"🎉 Bu çıkartma"
" yasaklanmamış"
),
"already_unrestricted": (
"🎉 Bu çıkartma zaten"
" yasaklanmamış"
),
"animations_restricted": (
"🛡 Bu sohbette"
" animasyonlu çıkartmalar yasaklanmış"
),
"animations_unrestricted": (
"🔐 Bu sohbette"
" animasyonlu çıkartmalar yasaklanmamış"
),
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"rms_threshold",
4.0,
(
"The lower this value is - the more light the detection will be."
" 0.0 - Full match, 4.0 - approximate match"
),
validator=loader.validators.Float(maximum=10.0),
),
loader.ConfigValue(
"bantime",
180,
(
"Once the user sent forbidden sticker, he will be restricted from"
" sending more for this amount of seconds"
),
validator=loader.validators.Integer(minimum=60),
),
)
async def client_ready(self):
self._banlist = self.pointer("banlist", {})
self._bananim = self.pointer("bananim", [])
dir_path = os.path.abspath(
os.path.join(utils.get_base_dir(), "..", "loaded_modules")
)
if not os.path.isdir(dir_path):
os.mkdir(dir_path)
dir_path = os.path.join(dir_path, "banmedia")
if not os.path.isdir(dir_path):
os.mkdir(dir_path)
self._db_path = dir_path
self._cache = {}
self._filecache = {}
for file in os.listdir(self._db_path):
async with async_open(os.path.join(self._db_path, file), "rb") as f:
self._cache[file] = await f.read()
@staticmethod
def rmsdiff(image_1: Image, image_2: Image) -> float:
"Calculate the root-mean-square difference between two images"
# https://stackoverflow.com/a/11818358/19170642
try:
h = ImageChops.difference(image_1, image_2).histogram()
except ValueError:
return 100.0
return math.sqrt(
reduce(operator.add, map(lambda h, i: h * (i**2), h, range(256)))
/ (float(image_1.size[0]) * image_1.size[1])
)
async def _add_cache(self, sticker_id: int, bytes_: bytes):
if not os.path.isfile(os.path.join(self._db_path, str(sticker_id))):
async with async_open(
os.path.join(self._db_path, str(sticker_id)), "wb"
) as f:
await f.write(bytes_)
self._cache[str(sticker_id)] = bytes_
async def _remove_cache(self, sticker_id: int):
if os.path.isfile(os.path.join(self._db_path, str(sticker_id))):
os.remove(os.path.join(self._db_path, str(sticker_id)))
if str(sticker_id) in self._cache:
self._cache.pop(str(sticker_id))
@loader.command(
ru_doc="<ответ на стикер> - Запретить стикер в текущем чате",
de_doc=" - Verbotene Sticker in diesem Chat",
hi_doc="<उत्तर दिए गए स्टिकर पर> - इस चैट में अनुमति नहीं देने वाले स्टिकर",
uz_doc=" - Joriy suhbatda stikerni taqiqlash",
tr_doc=" - Bu sohbette yasaklanmış çıkartma",
)
async def banstick(self, message: Message):
""" - Ban sticker in current chat"""
reply = await message.get_reply_message()
if not reply or not reply.sticker:
await utils.answer(message, self.strings("args"))
return
chat_id = str(utils.get_chat_id(message))
self._banlist.setdefault(chat_id, []).append(reply.sticker.id)
self._banlist[chat_id] = list(set(self._banlist[chat_id]))
if reply.sticker.mime_type.startswith("image"):
await self._add_cache(reply.sticker.id, await reply.download_media(bytes))
await utils.answer(message, self.strings("sticker_banned"))
@loader.command(
ru_doc="<ответ на стикер> - Запретить весь стикерпак в текущем чате",
de_doc=" - Verbotene Stickerpack in diesem Chat",
hi_doc="<उत्तर दिए गए स्टिकर पर> - इस चैट में अनुमति नहीं देने वाले स्टिकर पैक",
uz_doc=" - Joriy suhbatda stikerni taqiqlash",
tr_doc=" - Bu sohbette yasaklanmış çıkartma paketi",
)
async def banpack(self, message: Message):
""" - Ban the whole stickerpack in current chat"""
reply = await message.get_reply_message()
if not reply or not reply.sticker:
await utils.answer(message, self.strings("args"))
return
message = await utils.answer(message, self.strings("wait"))
stickerset = await self._client(
GetStickerSetRequest(
next(
attr.stickerset
for attr in reply.sticker.attributes
if hasattr(attr, "stickerset")
),
hash=0,
)
)
stickers = stickerset.documents
chat_id = str(utils.get_chat_id(message))
for sticker in stickers:
self._banlist.setdefault(chat_id, []).append(sticker.id)
if sticker.mime_type.startswith("image"):
await self._add_cache(
sticker.id,
await self._client.download_file(sticker, bytes),
)
await asyncio.sleep(1) # Light FW protection
self._banlist[chat_id] = list(set(self._banlist[chat_id]))
await utils.answer(
message,
self.strings("pack_banned").format(
len(stickers),
utils.escape_html(stickerset.set.title),
),
)
@loader.command(
ru_doc="<ответ на стикер> - Разбанить стикер в текущем чате",
de_doc=" - Entbanne Sticker in diesem Chat",
hi_doc="<उत्तर दिए गए स्टिकर पर> - इस चैट में अनुमति देने वाले स्टिकर",
uz_doc=" - Joriy suhbatda stikerni taqiqlash",
tr_doc=" - Bu sohbette yasaklanmış çıkartma",
)
async def unbanstick(self, message: Message):
""" - Unban sticker in current chat"""
reply = await message.get_reply_message()
if not reply or not reply.sticker:
await utils.answer(message, self.strings("args"))
return
chat_id = str(utils.get_chat_id(message))
if reply.sticker.id not in self._banlist.get(chat_id, []):
await utils.answer(message, self.strings("sticker_not_banned"))
return
self._banlist[chat_id].remove(reply.sticker.id)
if reply.sticker.mime_type.startswith("image"):
await self._remove_cache(reply.sticker.id)
await utils.answer(message, self.strings("sticker_unbanned"))
@loader.command(
ru_doc="<ответ на стикер> - Разбанить весь стикерпак в текущем чате"
)
async def unbanpack(self, message: Message):
""" - Unban the whole stickerpack in current chat"""
reply = await message.get_reply_message()
if not reply or not reply.sticker:
await utils.answer(message, self.strings("args"))
return
message = await utils.answer(message, self.strings("wait"))
stickerset = await self._client(
GetStickerSetRequest(
next(
attr.stickerset
for attr in reply.sticker.attributes
if hasattr(attr, "stickerset")
),
hash=0,
)
)
stickers = stickerset.documents
chat_id = str(utils.get_chat_id(message))
unbanned = 0
for sticker in stickers:
if sticker.id in self._banlist.get(chat_id, []):
self._banlist[chat_id].remove(sticker.id)
if sticker.mime_type.startswith("image"):
await self._remove_cache(sticker.id)
unbanned += 1
if not unbanned:
await utils.answer(message, self.strings("pack_not_banned"))
return
await utils.answer(
message,
self.strings("pack_unbanned").format(
unbanned,
len(stickers),
utils.escape_html(stickerset.set.title),
),
)
@loader.command(
ru_doc="Убрать все ограничения в текущем чате",
de_doc="Entferne alle Einschränkungen in diesem Chat",
hi_doc="इस चैट में सभी सीमाएं निकालें",
uz_doc="Joriy suhbatda barcha cheklarni olib tashlang",
tr_doc="Bu sohbetteki tüm yasaklamaları kaldırın",
)
async def unbanall(self, message: Message):
"""Remove all restrictions in current chat"""
chat_id = str(utils.get_chat_id(message))
if not self._banlist.get(chat_id, []):
await utils.answer(message, self.strings("no_restrictions"))
return
for sticker in self._banlist.pop(chat_id):
await self._remove_cache(sticker)
await utils.answer(message, self.strings("all_unbanned"))
@loader.command(
ru_doc="Запретить анимированные и видео стикеры в этом чате",
de_doc="Verbiete animierte und Video-Sticker in diesem Chat",
hi_doc="इस चैट में एनीमेटेड और वीडियो स्टिकर्स को अस्वीकार करें",
uz_doc="Bu suhbatda animatsiya va video stikerni taqiqlang",
tr_doc="Bu sohbette animasyonlu ve video çıkartmaları yasaklayın",
)
async def bananim(self, message: Message):
"""Restrict animated stickers in current chat"""
chat_id = str(utils.get_chat_id(message))
if chat_id in self._bananim:
await utils.answer(message, self.strings("already_restricted"))
return
self._bananim.append(chat_id)
await utils.answer(message, self.strings("animations_restricted"))
@loader.command(
ru_doc="Разблокировать анимированные и видео стикеры в этом чате",
de_doc=(
"Entferne die Einschränkung für animierte und Video-Sticker in diesem Chat"
),
hi_doc="इस चैट में एनीमेटेड और वीडियो स्टिकर्स की प्रतिबंध निकालें",
uz_doc="Bu suhbatda animatsiya va video stikerni taqiqlashni olib tashlang",
tr_doc="Bu sohbette animasyonlu ve video çıkartmaları yasaklamasını kaldırın",
)
async def unbananim(self, message: Message):
"""Unrestrict animated stickers in current chat"""
chat_id = str(utils.get_chat_id(message))
if chat_id not in self._bananim:
await utils.answer(message, self.strings("not_restricted"))
return
self._bananim.remove(chat_id)
await utils.answer(message, self.strings("animations_unrestricted"))
@loader.watcher("in", only_stickers=True, only_groups=True)
async def watcher(self, message: Message):
chat_id = str(utils.get_chat_id(message))
if not self._banlist.get(chat_id):
return
async def _restrict():
nonlocal message
await message.delete()
await self._client.edit_permissions(
message.peer_id,
message.sender_id,
until_date=time.time() + self.config["bantime"],
send_stickers=False,
)
if not message.sticker.mime_type.startswith("image"):
if chat_id in self._bananim or message.sticker.id in self._banlist[chat_id]:
await _restrict()
return
if message.sticker.id in self._filecache:
file = self._filecache[message.sticker.id]
else:
file = await message.download_media(bytes)
self._filecache[message.sticker.id] = file
image = Image.open(io.BytesIO(file))
for sticker_id, bytes_ in self._cache.items():
res = await utils.run_sync(
self.rmsdiff,
image,
Image.open(io.BytesIO(bytes_)),
)
if res < self.config["rms_threshold"]:
await self._add_cache(sticker_id, file)
return await _restrict()