# Copyright (C) 2019 The Raphielscape Company LLC. # # Licensed under the Raphielscape Public License, Version 1.c (the "License"); # you may not use this file except in compliance with the License. """ Userbot module to help you manage a group """ from asyncio import sleep from os import remove from telethon.errors import (BadRequestError, ChatAdminRequiredError, ImageProcessFailedError, PhotoCropSizeSmallError, UserAdminInvalidError) from telethon.errors.rpcerrorlist import (UserIdInvalidError, MessageTooLongError) from telethon.tl.functions.channels import (EditAdminRequest, EditBannedRequest, EditPhotoRequest) from telethon.tl.functions.messages import UpdatePinnedMessageRequest from telethon.tl.types import (PeerChat, PeerChannel, ChannelParticipantsAdmins, ChatAdminRights, ChatBannedRights, MessageEntityMentionName, MessageMediaPhoto, ChannelParticipantsBots) from userbot import BOTLOG, BOTLOG_CHATID, CMD_HELP, bot from userbot.events import register # =================== CONSTANT =================== PP_TOO_SMOL = "`Картинка слишком маленькая`" PP_ERROR = "`Сбой при обработке изображения`" NO_ADMIN = "`Я не админ здесь!`" NO_PERM = "`У меня нет достаточных разрешений!`" NO_SQL = "`Запуск в режиме Non-SQL!`" CHAT_PP_CHANGED = "`Картинка чата изменена`" CHAT_PP_ERROR = "`Некоторые проблемы с обновлением рисунка,`" \ "`может быть потому что я не админ здесь,`" \ "`или не умею достаточных прав.`" INVALID_MEDIA = "`Недействительное резрешение`" BANNED_RIGHTS = ChatBannedRights( until_date=None, view_messages=True, send_messages=True, send_media=True, send_stickers=True, send_gifs=True, send_games=True, send_inline=True, embed_links=True, ) UNBAN_RIGHTS = ChatBannedRights( until_date=None, send_messages=None, send_media=None, send_stickers=None, send_gifs=None, send_games=None, send_inline=None, embed_links=None, ) MUTE_RIGHTS = ChatBannedRights(until_date=None, send_messages=True) UNMUTE_RIGHTS = ChatBannedRights(until_date=None, send_messages=False) # ================================================ @register(outgoing=True, pattern="^\.setgpic$", groups_only=True) async def set_group_photo(gpic): """ Команда .setgpic изменяет картинку группы""" if not gpic.is_group: await gpic.edit("`Я не думаю, что это группа.`") return replymsg = await gpic.get_reply_message() chat = await gpic.get_chat() admin = chat.admin_rights creator = chat.creator photo = None if not admin and not creator: await gpic.edit(NO_ADMIN) return if replymsg and replymsg.media: if isinstance(replymsg.media, MessageMediaPhoto): photo = await gpic.client.download_media(message=replymsg.photo) elif "image" in replymsg.media.document.mime_type.split('/'): photo = await gpic.client.download_file(replymsg.media.document) else: await gpic.edit(INVALID_MEDIA) if photo: try: await gpic.client( EditPhotoRequest(gpic.chat_id, await gpic.client.upload_file(photo))) await gpic.edit(CHAT_PP_CHANGED) except PhotoCropSizeSmallError: await gpic.edit(PP_TOO_SMOL) except ImageProcessFailedError: await gpic.edit(PP_ERROR) @register(outgoing=True, pattern="^\.promote(?: |$)(.*)", groups_only=True) async def promote(promt): """Команда .promote повышает реплайнутого/тэгнутого пользователя""" # Get targeted chat chat = await promt.get_chat() # Grab admin status or creator in a chat admin = chat.admin_rights creator = chat.creator # If not admin and not creator, also return if not admin and not creator: await promt.edit(NO_ADMIN) return new_rights = ChatAdminRights(add_admins=False, invite_users=True, change_info=False, ban_users=True, delete_messages=True, pin_messages=True) await promt.edit("`Повышение...`") user, rank = await get_user_from_event(promt) if not rank: # Just in case. rank = "admeme" if user: pass else: return # Try to promote if current user is admin or creator try: await promt.client( EditAdminRequest(promt.chat_id, user.id, new_rights, rank)) await promt.edit("`Повышение успешно!`") # If Telethon spit BadRequestError, assume # we don't have Promote permission except BadRequestError: await promt.edit(NO_PERM) return # Announce to the logging group if we have promoted successfully if BOTLOG: await promt.client.send_message( BOTLOG_CHATID, "#ПОВЫШЕНИЕ\n" f"ПОЛЬЗОВАТЕЛЬ: [{user.first_name}](tg://user?id={user.id})\n" f"ЧАТ: {promt.chat.title}(`{promt.chat_id}`)") @register(outgoing=True, pattern="^\.demote(?: |$)(.*)", groups_only=True) async def demote(dmod): """Команда .demote понижает реплайнутого/тэгнутого пользователя""" # Admin right check chat = await dmod.get_chat() admin = chat.admin_rights creator = chat.creator if not admin and not creator: await dmod.edit(NO_ADMIN) return # If passing, declare that we're going to demote await dmod.edit("`Понижение...`") rank = "admeme" # dummy rank, lol. user = await get_user_from_event(dmod) user = user[0] if user: pass else: return # New rights after demotion newrights = ChatAdminRights(add_admins=None, invite_users=None, change_info=None, ban_users=None, delete_messages=None, pin_messages=None) # Edit Admin Permission try: await dmod.client( EditAdminRequest(dmod.chat_id, user.id, newrights, rank)) # If we catch BadRequestError from Telethon # Assume we don't have permission to demote except BadRequestError: await dmod.edit(NO_PERM) return await dmod.edit("`Понижение успешно!`") # Announce to the logging group if we have demoted successfully if BOTLOG: await dmod.client.send_message( BOTLOG_CHATID, "#ПОНИЖЕНИЕ\n" f"ПОЛЬЗОВАТЕЛЬ: [{user.first_name}](tg://user?id={user.id})\n" f"ЧАТ: {dmod.chat.title}(`{dmod.chat_id}`)") @register(outgoing=True, pattern="^\.ban(?: |$)(.*)", groups_only=True) async def ban(bon): """Команда .ban банит реплайнутого/тэгнутого пользователя""" # Here laying the sanity check chat = await bon.get_chat() admin = chat.admin_rights creator = chat.creator # Well if not admin and not creator: await bon.edit(NO_ADMIN) return user, reason = await get_user_from_event(bon) if user: pass else: return # Announce that we're going to whack the pest await bon.edit("`Ударить вредителя!`") try: await bon.client(EditBannedRequest(bon.chat_id, user.id, BANNED_RIGHTS)) except BadRequestError: await bon.edit(NO_PERM) return # Helps ban group join spammers more easily try: reply = await bon.get_reply_message() if reply: await reply.delete() except BadRequestError: await bon.edit( "`У меня нет прав на удаление сообщений! Но он все равно забанен!`") return # Delete message and then tell that the command # is done gracefully # Shout out the ID, so that fedadmins can fban later if reason: await bon.edit(f"{user.first_name} забанен(-а)!!\ \nID: `{str(user.id)}`\ \nПричина: {reason}") else: await bon.edit(f"{user.first_name} забанен(-а)!!\ \nID: `{str(user.id)}`") # Announce to the logging group if we have banned the person # successfully! if BOTLOG: await bon.client.send_message( BOTLOG_CHATID, "#БАН\n" f"ПОЛЬЗОВАТЕЛЬ: [{user.first_name}](tg://user?id={user.id})\n" f"ЧАТ: {bon.chat.title}(`{bon.chat_id}`)") @register(outgoing=True, pattern="^\.unban(?: |$)(.*)", groups_only=True) async def nothanos(unbon): """Команда .unban разбанит реплайнутого/тэгнутого пользователя""" # Here laying the sanity check chat = await unbon.get_chat() admin = chat.admin_rights creator = chat.creator # Well if not admin and not creator: await unbon.edit(NO_ADMIN) return # If everything goes well... await unbon.edit("`Разбан...`") user = await get_user_from_event(unbon) user = user[0] if user: pass else: return try: await unbon.client( EditBannedRequest(unbon.chat_id, user.id, UNBAN_RIGHTS)) await unbon.edit("```Разбан успешен!```") if BOTLOG: await unbon.client.send_message( BOTLOG_CHATID, "#РАЗБАН\n" f"ПОЛЬЗОВАТЕЛЬ: [{user.first_name}](tg://user?id={user.id})\n" f"ЧАТ: {unbon.chat.title}(`{unbon.chat_id}`)") except UserIdInvalidError: await unbon.edit("`Ох, моя логика разбана сломалась!`") @register(outgoing=True, pattern="^\.mute(?: |$)(.*)", groups_only=True) async def spider(spdr): """ Это функция в основном приглушает писки :) """ # Check if the function running under SQL mode try: from userbot.modules.sql_helper.spam_mute_sql import mute except AttributeError: await spdr.edit(NO_SQL) return # Admin or creator check chat = await spdr.get_chat() admin = chat.admin_rights creator = chat.creator # If not admin and not creator, return if not admin and not creator: await spdr.edit(NO_ADMIN) return user, reason = await get_user_from_event(spdr) if user: pass else: return self_user = await spdr.client.get_me() if user.id == self_user.id: await spdr.edit( "`Руки слишком коротки, я не могу заклеить себя скотчем...\n(ヘ・_・)ヘ┳━┳`") return # If everything goes well, do announcing and mute await spdr.edit("`Достаю скотч!`") if mute(spdr.chat_id, user.id) is False: return await spdr.edit('`Ошибка! Пользователь возможно уже в муте.`') else: try: await spdr.client( EditBannedRequest(spdr.chat_id, user.id, MUTE_RIGHTS)) # Announce that the function is done if reason: await spdr.edit(f"`Безопасно заткнут скотчем!!!`\nПричина: {reason}") else: await spdr.edit("`Безопасно заткнут скотчем!!!`") # Announce to logging group if BOTLOG: await spdr.client.send_message( BOTLOG_CHATID, "#МУТ\n" f"ПОЛЬЗОВАТЕЛЬ: [{user.first_name}](tg://user?id={user.id})\n" f"ЧАТ: {spdr.chat.title}(`{spdr.chat_id}`)") except UserIdInvalidError: return await spdr.edit("`Ох, моя логика мута сломалась!`") @register(outgoing=True, pattern="^\.unmute(?: |$)(.*)", groups_only=True) async def unmoot(unmot): """Команда .unmute размутить реплайнутого/тэгнутого пользователя""" # Admin or creator check chat = await unmot.get_chat() admin = chat.admin_rights creator = chat.creator # If not admin and not creator, return if not admin and not creator: await unmot.edit(NO_ADMIN) return # Check if the function running under SQL mode try: from userbot.modules.sql_helper.spam_mute_sql import unmute except AttributeError: await unmot.edit(NO_SQL) return # If admin or creator, inform the user and start unmuting await unmot.edit('```Размут...```') user = await get_user_from_event(unmot) user = user[0] if user: pass else: return if unmute(unmot.chat_id, user.id) is False: return await unmot.edit("`Ошибка! Пользователь возможно уже размучен.`") else: try: await unmot.client( EditBannedRequest(unmot.chat_id, user.id, UNBAN_RIGHTS)) await unmot.edit("```Размут успешен```") except UserIdInvalidError: await unmot.edit("`Ох, моя логика размута сломалась!`") return if BOTLOG: await unmot.client.send_message( BOTLOG_CHATID, "#РАЗМУТ\n" f"ПОЛЬЗОВАТЕЛЬ: [{user.first_name}](tg://user?id={user.id})\n" f"ЧАТ: {unmot.chat.title}(`{unmot.chat_id}`)") @register(incoming=True, disable_errors=True) async def muter(moot): """Используется для удаления сообщений от людей в муте""" try: from userbot.modules.sql_helper.spam_mute_sql import is_muted from userbot.modules.sql_helper.gmute_sql import is_gmuted except AttributeError: return muted = is_muted(moot.chat_id) gmuted = is_gmuted(moot.sender_id) rights = ChatBannedRights( until_date=None, send_messages=True, send_media=True, send_stickers=True, send_gifs=True, send_games=True, send_inline=True, embed_links=True, ) if muted: for i in muted: if str(i.sender) == str(moot.sender_id): try: await moot.delete() await moot.client( EditBannedRequest(moot.chat_id, moot.sender_id, rights)) except (BadRequestError, UserAdminInvalidError, ChatAdminRequiredError, UserIdInvalidError): await moot.client.send_read_acknowledge( moot.chat_id, moot.id) if gmuted: for i in gmuted: if i.sender == str(moot.sender_id): try: await moot.delete() except BadRequestError: await moot.client.send_read_acknowledge( moot.chat_id, moot.id) @register(outgoing=True, pattern="^\.ungmute(?: |$)(.*)", groups_only=True) async def ungmoot(un_gmute): """Команда .ungmute ungmutes the target in the userbot """ # Admin or creator check chat = await un_gmute.get_chat() admin = chat.admin_rights creator = chat.creator # If not admin and not creator, return if not admin and not creator: await un_gmute.edit(NO_ADMIN) return # Check if the function running under SQL mode try: from userbot.modules.sql_helper.gmute_sql import ungmute except AttributeError: await un_gmute.edit(NO_SQL) return user = await get_user_from_event(un_gmute) user = user[0] if user: pass else: return # If pass, inform and start ungmuting await un_gmute.edit('```Ungmuting...```') if ungmute(user.id) is False: await un_gmute.edit("`Error! User probably not gmuted.`") else: # Inform about success await un_gmute.edit("```Ungmuted Successfully```") if BOTLOG: await un_gmute.client.send_message( BOTLOG_CHATID, "#UNGMUTE\n" f"USER: [{user.first_name}](tg://user?id={user.id})\n" f"CHAT: {un_gmute.chat.title}(`{un_gmute.chat_id}`)") @register(outgoing=True, pattern="^\.gmute(?: |$)(.*)", groups_only=True) async def gspider(gspdr): """ For .gmute command, globally mutes the replied/tagged person """ # Admin or creator check chat = await gspdr.get_chat() admin = chat.admin_rights creator = chat.creator # If not admin and not creator, return if not admin and not creator: await gspdr.edit(NO_ADMIN) return # Check if the function running under SQL mode try: from userbot.modules.sql_helper.gmute_sql import gmute except AttributeError: await gspdr.edit(NO_SQL) return user, reason = await get_user_from_event(gspdr) if user: pass else: return # If pass, inform and start gmuting await gspdr.edit("`Grabs a huge, sticky duct tape!`") if gmute(user.id) is False: await gspdr.edit( '`Error! User probably already gmuted.\nRe-rolls the tape.`') else: if reason: await gspdr.edit(f"`Globally taped!`Reason: {reason}") else: await gspdr.edit("`Globally taped!`") if BOTLOG: await gspdr.client.send_message( BOTLOG_CHATID, "#GMUTE\n" f"USER: [{user.first_name}](tg://user?id={user.id})\n" f"CHAT: {gspdr.chat.title}(`{gspdr.chat_id}`)") @register(outgoing=True, pattern="^\.zombies(?: |$)(.*)", groups_only=True) async def rm_deletedacc(show): """ For .delusers command, list all the ghost/deleted accounts in a chat. """ if not show.is_group: await show.edit("`I don't think this is a group.`") return con = show.pattern_match.group(1).lower() del_u = 0 del_status = "`No deleted accounts found, Group is cleaned as Hell`" if con != "clean": await show.edit("`Searching for zombie accounts...`") async for user in show.client.iter_participants(show.chat_id): if user.deleted: del_u += 1 await sleep(1) if del_u > 0: del_status = f"Found **{del_u}** deleted account(s) in this group,\ \nclean them by using `.zombies clean`" await show.edit(del_status) return # Here laying the sanity check chat = await show.get_chat() admin = chat.admin_rights creator = chat.creator # Well if not admin and not creator: await show.edit("`I am not an admin here!`") return await show.edit("`Deleting deleted accounts...\nOh I can do that?!?!`") del_u = 0 del_a = 0 async for user in show.client.iter_participants(show.chat_id): if user.deleted: try: await show.client( EditBannedRequest(show.chat_id, user.id, BANNED_RIGHTS)) except ChatAdminRequiredError: await show.edit("`I don't have ban rights in this group`") return except UserAdminInvalidError: del_u -= 1 del_a += 1 await show.client( EditBannedRequest(show.chat_id, user.id, UNBAN_RIGHTS)) del_u += 1 if del_u > 0: del_status = f"Cleaned **{del_u}** deleted account(s)" if del_a > 0: del_status = f"Cleaned **{del_u}** deleted account(s) \ \n**{del_a}** deleted admin accounts are not removed" await show.edit(del_status) await sleep(2) await show.delete() if BOTLOG: await show.client.send_message( BOTLOG_CHATID, "#CLEANUP\n" f"Cleaned **{del_u}** deleted account(s) !!\ \nCHAT: {show.chat.title}(`{show.chat_id}`)") @register(outgoing=True, pattern="^\.admins$", groups_only=True) async def get_admin(show): """ For .admins command, list all of the admins of the chat. """ info = await show.client.get_entity(show.chat_id) title = info.title if info.title else "this chat" mentions = f'Admins in {title}: \n' try: async for user in show.client.iter_participants( show.chat_id, filter=ChannelParticipantsAdmins): if not user.deleted: link = f"{user.first_name}" userid = f"{user.id}" mentions += f"\n{link} {userid}" else: mentions += f"\nDeleted Account {user.id}" except ChatAdminRequiredError as err: mentions += " " + str(err) + "\n" try: await show.edit(mentions, parse_mode="html") except MessageTooLongError: await show.edit( "Damn, too many admins here. Uploading admin list as file.") file = open("adminlist.txt", "w+") file.write(mentions) file.close() await show.client.send_file( show.chat_id, "adminlist.txt", caption='Admins in {}'.format(title), reply_to=show.id, ) remove("adminlist.txt") @register(outgoing=True, pattern="^\.bots$", groups_only=True) async def get_bots(show): """ For .bots command, list all of the bots of the chat. """ info = await show.client.get_entity(show.chat_id) title = info.title if info.title else "this chat" mentions = f'Bots in {title}:\n' try: if isinstance(show.to_id, PeerChat): await show.edit("`I heard that only Supergroups can have bots.`") return else: async for user in show.client.iter_participants( show.chat_id, filter=ChannelParticipantsBots): if not user.deleted: link = f"{user.first_name}" userid = f"{user.id}" mentions += f"\n{link} {userid}" else: mentions += f"\nDeleted Bot {user.id}" except ChatAdminRequiredError as err: mentions += " " + str(err) + "\n" try: await show.edit(mentions, parse_mode="html") except MessageTooLongError: await show.edit( "Damn, too many bots here. Uploading bots list as file.") file = open("botlist.txt", "w+") file.write(mentions) file.close() await show.client.send_file( show.chat_id, "botlist.txt", caption='Bots in {}'.format(title), reply_to=show.id, ) remove("botlist.txt") @register(outgoing=True, pattern="^\.pin(?: |$)(.*)", groups_only=True) async def pin(msg): """ For .pin command, pins the replied/tagged message on the top the chat. """ # Admin or creator check chat = await msg.get_chat() admin = chat.admin_rights creator = chat.creator # If not admin and not creator, return if not admin and not creator: await msg.edit(NO_ADMIN) return to_pin = msg.reply_to_msg_id if not to_pin: await msg.edit("`Reply to a message to pin it.`") return options = msg.pattern_match.group(1) is_silent = True if options.lower() == "loud": is_silent = False try: await msg.client( UpdatePinnedMessageRequest(msg.to_id, to_pin, is_silent)) except BadRequestError: await msg.edit(NO_PERM) return await msg.edit("`Pinned Successfully!`") user = await get_user_from_id(msg.from_id, msg) if BOTLOG: await msg.client.send_message( BOTLOG_CHATID, "#PIN\n" f"ADMIN: [{user.first_name}](tg://user?id={user.id})\n" f"CHAT: {msg.chat.title}(`{msg.chat_id}`)\n" f"LOUD: {not is_silent}") @register(outgoing=True, pattern="^\.kick(?: |$)(.*)", groups_only=True) async def kick(usr): """ For .kick command, kicks the replied/tagged person from the group. """ # Admin or creator check chat = await usr.get_chat() admin = chat.admin_rights creator = chat.creator # If not admin and not creator, return if not admin and not creator: await usr.edit(NO_ADMIN) return user, reason = await get_user_from_event(usr) if not user: await usr.edit("`Couldn't fetch user.`") return await usr.edit("`Kicking...`") try: await usr.client.kick_participant(usr.chat_id, user.id) await sleep(.5) except Exception as e: await usr.edit(NO_PERM) return if reason: await usr.edit( f"`Kicked` [{user.first_name}](tg://user?id={user.id})`!`\nReason: {reason}" ) else: await usr.edit( f"`Kicked` [{user.first_name}](tg://user?id={user.id})`!`") if BOTLOG: await usr.client.send_message( BOTLOG_CHATID, "#KICK\n" f"USER: [{user.first_name}](tg://user?id={user.id})\n" f"CHAT: {usr.chat.title}(`{usr.chat_id}`)\n") @register(outgoing=True, pattern="^\.users ?(.*)", groups_only=True) async def get_users(show): """ For .users command, list all of the users in a chat. """ info = await show.client.get_entity(show.chat_id) title = info.title if info.title else "this chat" mentions = 'Users in {}: \n'.format(title) try: if not show.pattern_match.group(1): async for user in show.client.iter_participants(show.chat_id): if not user.deleted: mentions += f"\n[{user.first_name}](tg://user?id={user.id}) `{user.id}`" else: mentions += f"\nDeleted Account `{user.id}`" else: searchq = show.pattern_match.group(1) async for user in show.client.iter_participants( show.chat_id, search=f'{searchq}'): if not user.deleted: mentions += f"\n[{user.first_name}](tg://user?id={user.id}) `{user.id}`" else: mentions += f"\nDeleted Account `{user.id}`" except ChatAdminRequiredError as err: mentions += " " + str(err) + "\n" try: await show.edit(mentions) except MessageTooLongError: await show.edit( "Damn, this is a huge group. Uploading users lists as file.") file = open("userslist.txt", "w+") file.write(mentions) file.close() await show.client.send_file( show.chat_id, "userslist.txt", caption='Users in {}'.format(title), reply_to=show.id, ) remove("userslist.txt") async def get_user_from_event(event): """ Get the user from argument or replied message. """ args = event.pattern_match.group(1).split(':', 1) extra = None if event.reply_to_msg_id and not len(args) == 2: previous_message = await event.get_reply_message() user_obj = await event.client.get_entity(previous_message.from_id) extra = event.pattern_match.group(1) elif len(args[0]) > 0: user = args[0] if len(args) == 2: extra = args[1] if user.isnumeric(): user = int(user) if not user: await event.edit("`Pass the user's username, id or reply!`") return if event.message.entities is not None: probable_user_mention_entity = event.message.entities[0] if isinstance(probable_user_mention_entity, MessageEntityMentionName): user_id = probable_user_mention_entity.user_id user_obj = await event.client.get_entity(user_id) return user_obj try: user_obj = await event.client.get_entity(user) except (TypeError, ValueError) as err: await event.edit(str(err)) return None return user_obj, extra async def get_user_from_id(user, event): if isinstance(user, str): user = int(user) try: user_obj = await event.client.get_entity(user) except (TypeError, ValueError) as err: await event.edit(str(err)) return None return user_obj CMD_HELP.update({ "admin": ".promote : (or) reply to a message with .promote \ \nUsage: Provides admin rights to the person in the chat.\ \n\n.demote (or) reply to a message with .demote\ \nUsage: Revokes the person's admin permissions in the chat.\ \n\n.ban : (or) reply to a message with .ban \ \nUsage: Bans the person off your chat.\ \n\n.unban (or) reply to a message with .unban\ \nUsage: Removes the ban from the person in the chat.\ \n\n.mute : reply to a message with .mute \ \nUsage: Mutes the person in the chat, works on admins too.\ \n\n.unmute (or) reply to a message with .unmute\ \nUsage: Removes the person from the muted list.\ \n\n.gmute : (or) reply to a message with .gmute \ \nUsage: Mutes the person in all groups you have in common with them.\ \n\n.ungmute (or) reply to a message with .ungmute\ \nUsage: Removes the person from the global mute list.\ \n\n.delusers\ \nUsage: Searches for deleted accounts in a group. Use .delusers clean to remove deleted accounts from the group.\ \n\n.admins\ \nUsage: Retrieves a list of admins in the chat.\ \n\n.bots\ \nUsage: Retrieves a list of bots in the chat.\ \n\n.users or .users \ \nUsage: Retrieves all (or queried) users in the chat.\ \n\n.setgpic \ \nUsage: Changes the group's display picture." })