# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ # █▀█ █ █ █ █▀█ █▀▄ █ # © Copyright 2022 # https://t.me/hikariatama # # 🔒 Licensed under the GNU AGPLv3 # 🌐 https://www.gnu.org/licenses/agpl-3.0.html # meta pic: https://img.icons8.com/external-wanicon-flat-wanicon/344/external-dead-halloween-costume-avatar-wanicon-flat-wanicon.png # meta developer: @hikarimods # meta banner: https://mods.hikariatama.ru/badges/inactive.jpg # scope: hikka_only # scope: hikka_min 1.3.0 import asyncio import contextlib import logging import time from telethon.tl.types import Message from telethon.utils import get_display_name from .. import loader, utils from ..inline.types import InlineCall logger = logging.getLogger(__name__) @loader.tds class Inactive(loader.Module): """Blocks people who are inactive for a long time. Check .config""" strings = { "name": "Inactive", "config": ( "🚫 You need to" " configure module first: \n\n⚙️ {}config {}" ), "confirm": ( "⚠️ Please, confirm that you want to start cleaning this chat from" " inactive users with these parameters:\n\n⌚️ Inactive time:" " {}\n💭 Minimal amount of messages: {}\n\n☝️ Please, note," " that this operation might take a lot of API requests and cause" " FloodWaits" ), "start": "🧹 Start", "cancel": "🔻 Cancel", "configure": "⚙️ Open config", "started": "😼 Processing started! This message will update", "processing": ( "🫶 Processed {} messages from {} users. Already found {} users to" " {} and" " {} trusted\n\nStill processing..." ), "kick": "kick", "ban": "ban", "processing_complete": ( "😻 Processing complete! Processed {} messages from {} users. Found {}" " users to {}. Apply restrictions?\n" ), "processing_already": "😼 Processing already in progress!", "restrictions_applied": "🔒 Action `{}` applied to {} inactive users!", "cancelling_processing": "🔻 Cancelling processing...", "processing_cancelled": "😼 Processing cancelled!", "hrs": "hour(-s)", "applying_restrictions": ( "🔒 Applying restrictions. Found {} users to {}" ), "restrict": "🔒 Restrict", "no_users": "😼 No inactive users found!", "messages": "messages", "waiting_lock": ( "🛃 Processing is already active in other chat, waiting for lock to" " release" ), } strings_ru = { "config": ( "🚫 Вам нужно вначале" " настроить модуль: \n\n⚙️ {}config {}" ), "confirm": ( "⚠️ Пожалуйста, подтвердите, что вы хотите начать очистку этого чата от" " неактивных пользователей с этими параметрами:\n\n⌚️ Время" " неактивности: {}\n💭 Минимальное количество сообщений: {}\n\n☝️" " Пожалуйста, обратите внимание, что эта операция может занять много API" " запросов и вызвать FloodWait'ы" ), "start": "🧹 Начать", "cancel": "🔻 Отмена", "configure": "⚙️ Открыть настройки", "started": "😼 Обработка началась! Это сообщение будет обновляться", "processing": ( "🫶 Обработано {} сообщений от {} пользователей. Уже найдено {}" " пользователей для {} и {} доверенных\n\nВсе еще обрабатываю..." ), "kick": "кика", "ban": "бана", "processing_complete": ( "😻 Обработка завершена! Обработано {} сообщений от {} пользователей." " Найдено {} пользователей для {}. Применять ограничения?\n" ), "processing_already": "😼 Обработка уже выполняется!", "restrictions_applied": ( "🔒 Действие `{}` применено к {} неактивным пользователям!" ), "cancelling_processing": "🔻 Отменяю обработку...", "processing_cancelled": "😼 Обработка отменена!", "hrs": "час(-ов)", "applying_restrictions": ( "🔒 Применяю ограничения. Найдено {} пользователей для {}" ), "restrict": "🔒 Ограничить", "no_users": "😼 Не найдено неактивных пользователей!", "messages": "сообщений", "waiting_lock": ( "🛃 Обработка уже выполняется в другом чате, жду освобождения" " блокировки" ), } _lock = {} _global_lock = asyncio.Lock() def __init__(self): self.config = loader.ModuleConfig( loader.ConfigValue( "action", "kick", "Action to perform when user is inactive", validator=loader.validators.Choice(["ban", "kick"]), ), loader.ConfigValue( "inactive_time", None, ( "If specified, any user, which sent no messages for this amount of" " hours, will be blocked." ), validator=loader.validators.Union( loader.validators.Integer(minimum=1), loader.validators.NoneType() ), ), loader.ConfigValue( "inactive_messages", None, ( "If specified, any user, which sent less than this amount of" " messages, will be blocked." ), validator=loader.validators.Union( loader.validators.Integer(minimum=1), loader.validators.NoneType() ), ), ) async def _configure(self, call: InlineCall): await self.lookup("HikkaConfig").inline__configure( call, self.__class__.__name__, obj_type=False, ) async def _cancel(self, call: InlineCall, chat_id: int): if chat_id in self._lock: self._lock[chat_id].set() await call.edit(self.strings("processing_cancelled")) async def _start(self, call: InlineCall, chat_id: int): if chat_id in self._lock: await call.edit(self.strings("processing_already")) return self._lock[chat_id] = asyncio.Event() markup = { "text": self.strings("cancel"), "callback": self._cancel, "args": (chat_id,), } chat = await self._client.get_entity(chat_id) data = {} restrict = set() processing_finished = asyncio.Event() async def _(): nonlocal call, data, restrict while True: await asyncio.sleep(20) if ( processing_finished.is_set() or chat_id not in self._lock or self._lock[chat_id].is_set() ): break await call.edit( self.strings("processing").format( sum([len(user_messages) for user_messages in data.values()]), len(data), len(restrict), self.strings(self.config["action"]), len( [ user for user, messages in data.items() if ( not self.config["inactive_messages"] or len(messages) > self.config["inactive_messages"] ) and ( not self.config["inactive_time"] or messages and time.time() - max(messages) < self.config["inactive_time"] * 3600 ) ] ), ), reply_markup=markup, ) await call.edit( ( self.strings("waiting_lock") if self._global_lock.locked() else self.strings("started") ), reply_markup=markup, ) async with self._global_lock: if self._lock[chat_id].is_set(): await call.edit(self.strings("processing_cancelled")) self._lock.pop(chat_id) return task = asyncio.ensure_future(_()) names = {} with contextlib.suppress(Exception): await self._client.end_takeout(True) async with self._client.takeout( **({"megagroups": True} if chat.megagroup else {"chats": True}) ) as takeout: async for user in takeout.iter_participants(chat): data.setdefault(user.id, []) names[user.id] = get_display_name(user) async for message in takeout.iter_messages(chat, wait_time=5): sender = message.sender_id if sender not in names: continue date = time.mktime(message.date.timetuple()) data.setdefault(sender, []).append(date) if self.config["inactive_time"]: if ( time.time() - max(data[sender]) > self.config["inactive_time"] * 3600 ): restrict.add(sender) elif sender in restrict: restrict.remove(sender) if self.config["inactive_messages"]: if len(data[sender]) < self.config["inactive_messages"]: restrict.add(sender) elif sender in restrict: restrict.remove(sender) if ( self.config["inactive_messages"] and all( len(msgs) > self.config["inactive_messages"] for msgs in data.values() ) and ( not self.config["inactive_time"] or all( msgs and time.time() - max(msgs) > self.config["inactive_time"] * 3600 for msgs in data.values() ) ) ): break if self._lock[chat_id].is_set(): await call.edit(self.strings("processing_cancelled")) self._lock.pop(chat_id) return for user, messages in data.items(): if ( self.config["inactive_messages"] and len(messages) < self.config["inactive_messages"] or self.config["inactive_time"] and time.time() - max(messages) > self.config["inactive_time"] * 3600 ): restrict.add(user) elif user in restrict: restrict.remove(user) processing_finished.set() task.cancel() if not restrict: await call.edit(self.strings("no_users")) self._lock.pop(chat_id) return m = self.strings("processing_complete").format( sum([len(user_messages) for user_messages in data.values()]), len(data), len(restrict), self.strings(self.config["action"]), ) for user in restrict: line = ( "\n▫️ {utils.escape_html(names.get(user, user))}" f" ({len(data[user])} {self.strings('messages')}," f" {round((time.time() - max(data[user])) / 3600, 1) if data[user] else 'n/a'} {self.strings('hrs')})" ) if len(m + line) >= 4096: m += "\n..." break m += line await call.edit( m, reply_markup=[ { "text": self.strings("restrict"), "callback": self._restrict, "args": (chat_id, restrict, markup), }, { "text": self.strings("cancel"), "callback": self._im_cancel, "args": (chat_id,), }, ], ) async def _im_cancel(self, call: InlineCall, chat_id: int): self._lock.pop(chat_id) await call.edit(self.strings("processing_cancelled")) async def _restrict( self, call: InlineCall, chat_id: int, restrict: set, markup: dict, ): await call.edit( self.strings("applying_restrictions").format( len(restrict), self.strings(self.config["action"]) ), reply_markup=markup, ) for user_id in restrict: if self.config["action"] == "kick": await self._client.kick_participant(chat_id, user_id) else: await self._client.edit_permissions( chat_id, user_id, until_date=0, view_messages=False, send_messages=False, send_media=False, send_stickers=False, send_gifs=False, send_games=False, send_inline=False, send_polls=False, change_info=False, invite_users=False, ) await asyncio.sleep(3) if self._lock[chat_id].is_set(): await call.edit(self.strings("processing_cancelled")) self._lock.pop(chat_id) return await call.edit( self.strings("restrictions_applied").format( self.strings(self.config["action"]), len(restrict), ) ) self._lock.pop(chat_id) @loader.command(ru_doc="Запустить чистку неактивных юзеров") async def inactive(self, message: Message): """Start inactive users cleaner""" if not self.config["inactive_time"] and not self.config["inactive_messages"]: await utils.answer( message, self.strings("config").format( self.get_prefix(), self.__class__.__name__, ), ) return if utils.get_chat_id(message) in self._lock: await utils.answer(message, self.strings("processing_already")) return await self.inline.form( message=message, text=self.strings("confirm").format( ( f'{self.config["inactive_time"]} {self.strings("hrs")}' if self.config["inactive_time"] else "-" ), self.config["inactive_messages"] or "-", ), reply_markup=[ [ { "text": self.strings("start"), "callback": self._start, "args": (utils.get_chat_id(message),), }, {"text": self.strings("cancel"), "action": "close"}, ], [{"text": self.strings("configure"), "callback": self._configure}], ], )