# --------------------------------------------------------------------------------- # ░█▀▄░▄▀▀▄░█▀▄░█▀▀▄░█▀▀▄░█▀▀▀░▄▀▀▄░░░█▀▄▀█ # ░█░░░█░░█░█░█░█▄▄▀░█▄▄█░█░▀▄░█░░█░░░█░▀░█ # ░▀▀▀░░▀▀░░▀▀░░▀░▀▀░▀░░▀░▀▀▀▀░░▀▀░░░░▀░░▒▀ # # _____ _ _ # | ____|_ _| |_| |_ __ _ ___ _ _ # | _| \ \/ / __| __/ _` / __| | | | # | |___ > <| |_| || (_| \__ \ |_| | # |_____/_/\_\\__|\__\__,_|___/\__, | # |___/ # Name: PMBan # Description: Ban in pm for time # Author: @codrago_m, @exttasy1 # --------------------------------------------------------------------------------- # 🔒 Licensed under the GNU AGPLv3 # 🌐 https://www.gnu.org/licenses/agpl-3.0.html # --------------------------------------------------------------------------------- # Author: @codrago, @exttasy1 # Commands: pmban, pmunban # scope: hikka_only # meta developer: @codrago_m, @exttasy1 # meta banner: https://raw.githubusercontent.com/coddrago/modules/refs/heads/main/banner.png # meta pic: https://envs.sh/Hoh.webp # --------------------------------------------------------------------------------- __version__ = (1, 0, 0) from .. import loader, utils import telethon, asyncio, re, pymorphy2, math, time, logging from apscheduler.schedulers.asyncio import AsyncIOScheduler logging.getLogger('apscheduler').setLevel(logging.WARNING) logging.getLogger('pymorphy2').setLevel(logging.WARNING) @loader.tds class PMBan(loader.Module): """Ban in pm for time""" strings = { "name": "PMBan", } @staticmethod def _pluralize(morph, count, word): word_morph = morph.parse(word)[0] return word_morph.make_agree_with_number(count).word def format_time(self, seconds): morph = pymorphy2.MorphAnalyzer() if seconds < 0 or seconds == 0: return "forever" days = math.floor(seconds / (60 * 60 * 24)) seconds %= (60 * 60 * 24) hours = math.floor(seconds / (60 * 60)) seconds %= (60 * 60) minutes = math.floor(seconds / 60) seconds = math.floor(seconds % 60) time_parts = [] if days > 0: time_parts.append(f"for {days} {self._pluralize(morph, days, 'day')}") if hours > 0: time_parts.append(f"for {hours} {self._pluralize(morph, hours, 'hour')}") if minutes > 0: time_parts.append(f"for {minutes} {self._pluralize(morph, minutes, 'minute')}") if seconds > 0 or not time_parts: time_parts.append(f"for {seconds} {self._pluralize(morph, seconds, 'second')}") return " ".join(time_parts) @staticmethod def convert_time(text: str) -> int: try: if not str(text)[:-1].isdigit(): return 0 if "d" in str(text): text = int(text[:-1]) * 60 * 60 * 24 if "h" in str(text): text = int(text[:-1]) * 60 * 60 if "m" in str(text): text = int(text[:-1]) * 60 if "s" in str(text): text = int(text[:-1]) text = int(re.sub(r"[^0-9]", "", str(text))) except ValueError: return 0 return text async def loop_unban(self): ban_list = self.get('ban_list') for user in ban_list: user_id, time_ban = next(iter(user.items())) if time_ban[0] <= 0: continue end_ban = time_ban[0] + time_ban[1] if end_ban <= time.time(): await self.client(telethon.tl.functions.contacts.UnblockRequest(id=int(user_id))) await self.client.send_message(int(user_id), f" You have been successfully unbanned.") ban_list = [d for d in ban_list if int(user_id) not in d] self.set('ban_list', ban_list) async def client_ready(self): if not (ban_list := self.get('ban_list')) or not isinstance(ban_list, list): self.set('ban_list', []) self.scheduler = AsyncIOScheduler() self.scheduler.add_job(self.loop_unban, 'interval', seconds=3) self.scheduler.start() async def on_unload(self): self.scheduler.shutdown() @loader.command() async def pmban(self, message): """| ban in PM for time""" if not (args := utils.get_args_raw(message)): stime = 0 ban_list = self.get('ban_list') if any(message.chat_id in d for d in ban_list): await utils.answer(message, f" User already has been banned") return if args: stime = self.convert_time(args) str_time = self.format_time(seconds=stime).replace("minutes", "minute").replace("second", "second") await self.client(telethon.tl.functions.contacts.BlockRequest(id=message.chat_id)) await utils.answer(message, f" User succesfully banned {str_time}.") ban_list.append({message.chat_id: [stime, int(time.time())]}) self.set('ban_list', ban_list) @loader.command() async def pmunban(self, message): """| unban in PM""" ban_list = self.get('ban_list') print(ban_list) reply = await message.get_reply_message() if not reply: await utils.answer(message, f" User not found, please write this command in response to user messages.") return if not any(reply.from_id in d for d in ban_list): await utils.answer(message, f" User not banned") return await self.client(telethon.tl.functions.contacts.UnblockRequest(id=reply.from_id)) await utils.answer(message, f" User succesfully unbanned.") ban_list = [d for d in ban_list if reply.from_id not in d] self.set('ban_list', ban_list)