# -*- coding: utf-8 -*- # API/Module authors: @Fl1yd, @spypm # Thank for.... - no1!!, no1 helped us, we did everything ourselves # But we took one method of get messages from the Mishase's and Droox's modules import base64 import io import json from time import gmtime from typing import List, Union import requests import telethon from telethon.tl import types from telethon.tl.patched import Message from .. import loader, utils def get_message_media(message: Message): data = None if message and message.media: data = ( message.photo or message.sticker or message.video or message.video_note or message.gif or message.web_preview ) return data def get_entities(entities: types.TypeMessageEntity): # coded by @droox r = [] if entities: for entity in entities: entity = entity.to_dict() entity["type"] = entity.pop("_").replace("MessageEntity", "").lower() r.append(entity) return r def get_message_text(message: Message, reply: bool = False): return ( "πŸ“· Π€ΠΎΡ‚ΠΎ" if message.photo and reply else message.file.emoji + " Π‘Ρ‚ΠΈΠΊΠ΅Ρ€" if message.sticker and reply else "πŸ“Ή ВидСосообщСниС" if message.video_note and reply else "πŸ“Ή Π’ΠΈΠ΄Π΅ΠΎ" if message.video and reply else "πŸ–Ό GIF" if message.gif and reply else "πŸ“Š ΠžΠΏΡ€ΠΎΡ" if message.poll else "πŸ“ ΠœΠ΅ΡΡ‚ΠΎΠΏΠΎΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅" if message.geo else "πŸ‘€ ΠšΠΎΠ½Ρ‚Π°ΠΊΡ‚" if message.contact else f"🎡 ГолосовоС сообщСниС: {strftime(message.voice.attributes[0].duration)}" if message.voice else f"🎧 ΠœΡƒΠ·Ρ‹ΠΊΠ°: {strftime(message.audio.attributes[0].duration)} | {message.audio.attributes[0].performer} - {message.audio.attributes[0].title}" if message.audio else f"πŸ’Ύ Π€Π°ΠΉΠ»: {message.file.name}" if type(message.media) == types.MessageMediaDocument and not get_message_media(message) else f"{message.media.emoticon} Дайс: {message.media.value}" if type(message.media) == types.MessageMediaDice else f"Service message: {message.action.to_dict()['_']}" if type(message) == types.MessageService else "" ) def strftime(time: Union[int, float]): t = gmtime(time) return ( f"{t.tm_hour:02d}:" if t.tm_hour > 0 else "" ) + f"{t.tm_min:02d}:{t.tm_sec:02d}" @loader.tds class ShitQuotesMod(loader.Module): """ Quotes by @sh1tchannel """ strings = { "name": "SQuotes", "no_reply": "[SQuotes] НСт рСплая", "processing": "[SQuotes] ΠžΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ°...", "api_processing": "[SQuotes] ОТиданиС API...", "api_error": "[SQuotes] Ошибка API", "loading_media": "[SQuotes] ΠžΡ‚ΠΏΡ€Π°Π²ΠΊΠ°...", "no_args_or_reply": "[SQuotes] НСт Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ² ΠΈΠ»ΠΈ рСплая", "args_error": "[SQuotes] ΠŸΡ€ΠΈ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ΅ Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ² ΠΏΡ€ΠΎΠΈΠ·ΠΎΡˆΠ»Π° ошибка. Запрос Π±Ρ‹Π»: {}", "too_many_messages": "[SQuotes] Блишком ΠΌΠ½ΠΎΠ³ΠΎ сообщСний. ΠœΠ°ΠΊΡΠΈΠΌΡƒΠΌ: {}", } async def client_ready(self, client: telethon.TelegramClient, db: dict): self.client = client self.db = db self.api_endpoint = "https://quotes.fl1yd.su/generate" self.settings = self.get_settings() async def qcmd(self, message: types.Message): """ Π‘ΠΎΠΊΡ€Π°Ρ‰Π΅Π½ΠΈΠ΅ ΠΊΠΎΠΌΠ°Π½Π΄Ρ‹ .sq """ return await self.sqcmd(message) async def sqcmd(self, message: Message): """ ИспользованиС: β€’ .sq <ΠΊΠΎΠ»-Π²ΠΎ сообщСний> + <Ρ€Π΅ΠΏΠ»Π°ΠΉ> + + <Ρ†Π²Π΅Ρ‚ (ΠΏΠΎ ТСланию)> >>> .sq >>> .sq 2 #2d2d2d >>> .sq red >>> .sq !file """ args: List[str] = utils.get_args(message) if not await message.get_reply_message(): return await utils.answer(message, self.strings["no_reply"]) m = await utils.answer(message, self.strings["processing"]) isFile = "!file" in args [count] = [int(arg) for arg in args if arg.isdigit() and int(arg) > 0] or [1] [bg_color] = [arg for arg in args if arg != "!file" and not arg.isdigit()] or [ self.settings["bg_color"] ] if count > self.settings["max_messages"]: return await utils.answer( m, self.strings["too_many_messages"].format(self.settings["max_messages"]), ) payload = { "messages": await self.quote_parse_messages(message, count), "quote_color": bg_color, "text_color": self.settings["text_color"], } if self.settings["debug"]: file = open("SQuotesDebug.json", "w") json.dump( payload, file, indent=4, ensure_ascii=False, ) await message.respond(file=file.name) await utils.answer(m, self.strings["api_processing"]) r = await self._api_request(payload) if r.status_code != 200: return await utils.answer(m, self.strings["api_error"]) quote = io.BytesIO(r.content) quote.name = "SQuote" + (".png" if isFile else ".webp") await utils.answer(m, quote, force_document=isFile) return await m[-1].delete() async def quote_parse_messages(self, message: Message, count: int): payloads = [] messages = [ msg async for msg in self.client.iter_messages( message.chat_id, count, reverse=True, add_offset=1, offset_id=(await message.get_reply_message()).id, ) ] for message in messages: avatar = rank = reply_id = reply_name = reply_text = None entities = get_entities(message.entities) if message.fwd_from: if message.fwd_from.from_id: if type(message.fwd_from.from_id) == types.PeerChannel: user_id = message.fwd_from.from_id.channel_id else: user_id = message.fwd_from.from_id.user_id try: user = await self.client.get_entity(user_id) except Exception: name, avatar = await self.get_profile_data(message.sender) return ( "Π’ΠΎΡ‚ Π±Π»ΠΈΠ½, ΠΏΡ€ΠΎΠΈΠ·ΠΎΡˆΠ»Π° ошибка. Π’ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ Π½Π° этом ΠΊΠ°Π½Π°Π»Π΅ тСбя Π·Π°Π±Π°Π½ΠΈΠ»ΠΈ, ΠΈ Π½Π΅Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΡ‚ΡŒ ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΡŽ.", None, message.sender.id, name, avatar, "ошибка :(", None, None, None, None, ) name, avatar = await self.get_profile_data(user) user_id = user.id elif name := message.fwd_from.from_name: user_id = message.chat_id else: if reply := await message.get_reply_message(): reply_id = reply.sender.id reply_name = telethon.utils.get_display_name(reply.sender) reply_text = get_message_text(reply, True) + ( ". " + reply.raw_text if reply.raw_text and get_message_text(reply, True) else reply.raw_text or "" ) user = await self.client.get_entity(message.sender) name, avatar = await self.get_profile_data(user) user_id = user.id if message.is_group and message.is_channel: admins = await self.client.get_participants( message.chat_id, filter=types.ChannelParticipantsAdmins ) if user in admins: admin = admins[admins.index(user)].participant rank = admin.rank or ( "creator" if type(admin) == types.ChannelParticipantCreator else "admin" ) media = await self.client.download_media( get_message_media(message), bytes, thumb=-1 ) media = base64.b64encode(media).decode() if media else None via_bot = message.via_bot.username if message.via_bot else None text = (message.raw_text or "") + ( ( "\n\n" + get_message_text(message) if message.raw_text else get_message_text(message) ) if get_message_text(message) else "" ) payloads.append( { "text": text, "media": media, "entities": entities, "author": { "id": user_id, "name": name, "avatar": avatar, "rank": rank or "", "via_bot": via_bot, }, "reply": {"id": reply_id, "name": reply_name, "text": reply_text}, } ) return payloads async def fsqcmd(self, message: Message): """ ИспользованиС: β€’ .fsq <@ ΠΈΠ»ΠΈ ID> + <тСкст> - ΠΊΠ²ΠΎΡ‚Π° ΠΎΡ‚ ΡŽΠ·Π΅Ρ€Π° с @ ΠΈΠ»ΠΈ ID + ΡƒΠΊΠ°Π·Π°Π½Π½Ρ‹ΠΉ тСкст >>> .fsq @onetimeusername Π’Π°ΠΌ ΠΏΠΈΠ·Π΄Π° β€’ .fsq <Ρ€Π΅ΠΏΠ»Π°ΠΉ> + <тСкст> - ΠΊΠ²ΠΎΡ‚Π° ΠΎΡ‚ ΡŽΠ·Π΅Ρ€Π° с рСплая + ΡƒΠΊΠ°Π·Π°Π½Π½Ρ‹ΠΉ тСкст >>> .fsq Π― Π»ΠΎΡ… β€’ .fsq <@ ΠΈΠ»ΠΈ ID> + <тСкст> + -r + <@ ΠΈΠ»ΠΈ ID> + <тСкст> - ΠΊΠ²ΠΎΡ‚Π° с Ρ„Π΅ΠΉΠΊΠΎΠ²Ρ‹ΠΌ Ρ€Π΅ΠΏΠ»Π°Π΅ΠΌ >>> .fsq @Fl1yd спасибо -r @onetimeusername Π’Ρ‹ ΠΊΡ€ΡƒΡ‚ΠΎΠΉ β€’ .fsq <@ ΠΈΠ»ΠΈ ID> + <тСкст> + -r + <@ ΠΈΠ»ΠΈ ID> + <тСкст>; <Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚Ρ‹> - ΠΊΠ²ΠΎΡ‚Π° с Ρ„Π΅ΠΉΠΊΠΎΠ²Ρ‹ΠΌΠΈ ΠΌΡƒΠ»ΡŒΡ‚ΠΈ сообщСниями >>> .fsq @onetimeusername ΠŸΠ°Ρ†Π°Π½Ρ‹ ΠΈΠ· @sh1tchannel, ΠΆΠ΄ΠΈΡ‚Π΅ Π½Π°Π³Ρ€Π°Π΄Ρƒ Π·Π° Π°Ρ…ΡƒΠ΅Π½Π½Ρ‹ΠΉ Π±ΠΎΡ‚Π½Π΅Ρ‚; @guslslakkaakdkab Ρ‡Π΅Π²ΠΎ; @Fl1yd НАШ Π‘ΠžΠ’ΠΠ•Π’ Π›Π£Π§Π¨Π˜Π™ -r @guslslakkaakdkab Ρ‡Π΅Π²ΠΎ """ args: str = utils.get_args_raw(message) reply = await message.get_reply_message() if not (args or reply): return await utils.answer(message, self.strings["no_args_or_reply"]) m = await utils.answer(message, self.strings["processing"]) try: payload = await self.fakequote_parse_messages(args, reply) except (IndexError, ValueError): return await utils.answer( m, self.strings["args_error"].format(message.text) ) if len(payload) > self.settings["max_messages"]: return await utils.answer( m, self.strings["too_many_messages"].format(self.settings["max_messages"]), ) payload = { "messages": payload, "quote_color": self.settings["bg_color"], "text_color": self.settings["text_color"], } if self.settings["debug"]: file = open("SQuotesDebug.json", "w") json.dump( payload, file, indent=4, ensure_ascii=False, ) await message.respond(file=file.name) await utils.answer(m, self.strings["api_processing"]) r = await self._api_request(payload) if r.status_code != 200: return await utils.answer(m, self.strings["api_error"]) quote = io.BytesIO(r.content) quote.name = "SQuote.webp" await utils.answer(m, quote) return await m[-1].delete() async def fakequote_parse_messages(self, args: str, reply: Message): async def get_user(args: str): args_, text = args.split(), "" user = await self.client.get_entity( int(args_[0]) if args_[0].isdigit() else args_[0] ) if len(args_) < 2: user = await self.client.get_entity( int(args) if args.isdigit() else args ) else: text = args.split(maxsplit=1)[1] return user, text if reply or reply and args: user = reply.sender name, avatar = await self.get_profile_data(user) text = args or "" else: messages = [] for part in args.split("; "): user, text = await get_user(part) name, avatar = await self.get_profile_data(user) reply_id = reply_name = reply_text = None if " -r " in part: user, text = await get_user("".join(part.split(" -r ")[0])) user2, text2 = await get_user("".join(part.split(" -r ")[1])) name, avatar = await self.get_profile_data(user) name2, _ = await self.get_profile_data(user2) reply_id = user2.id reply_name = name2 reply_text = text2 messages.append( { "text": text, "media": None, "entities": None, "author": { "id": user.id, "name": name, "avatar": avatar, "rank": "", }, "reply": { "id": reply_id, "name": reply_name, "text": reply_text, }, } ) return messages return [ { "text": text, "media": None, "entities": None, "author": {"id": user.id, "name": name, "avatar": avatar, "rank": ""}, "reply": {"id": None, "name": None, "text": None}, } ] async def get_profile_data(self, user: types.User): avatar = await self.client.download_profile_photo(user.id, bytes) return ( telethon.utils.get_display_name(user), base64.b64encode(avatar).decode() if avatar else None, ) async def sqsetcmd(self, message: Message): """ ИспользованиС: β€’ .sqset (<Ρ†Π²Π΅Ρ‚ для bg_color/text_color> ) >>> .sqset bg_color #2d2d2d >>> .sqset debug true """ args: List[str] = utils.get_args_raw(message).split(maxsplit=1) if not args: return await utils.answer( message, f"[SQuotes] Настройки:\n\n" f"ΠœΠ°ΠΊΡΠΈΠΌΡƒΠΌ сообщСний (max_messages): {self.settings['max_messages']}\n" f"Π¦Π²Π΅Ρ‚ ΠΊΠ²ΠΎΡ‚Ρ‹ (bg_color): {self.settings['bg_color']}\n" f"Π¦Π²Π΅Ρ‚ тСкста (text_color): {self.settings['text_color']}\n" f"Π”Π΅Π±Π°Π³ (debug): {self.settings['debug']}\n\n" f"ΠΠ°ΡΡ‚Ρ€ΠΎΠΈΡ‚ΡŒ ΠΌΠΎΠΆΠ½ΠΎ с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ .sqset <ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€> <Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅> ΠΈΠ»ΠΈ reset", ) if args[0] == "reset": self.get_settings(True) return await utils.answer( message, "[SQuotes - Settings] Настойки ΠΊΠ²ΠΎΡ‚ Π±Ρ‹Π»ΠΈ ΡΠ±Ρ€ΠΎΡˆΠ΅Π½Ρ‹" ) if len(args) < 2: return await utils.answer( message, "[SQuotes - Settings] НСдостаточно Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ²" ) mods = ["max_messages", "bg_color", "text_color", "debug"] if args[0] not in mods: return await utils.answer( message, f"[SQuotes - Settings] Π’Π°ΠΊΠΎΠ³ΠΎ ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ€Ρ‚Π° Π½Π΅Ρ‚, Π΅ΡΡ‚ΡŒ {', '.join(mods)}", ) elif args[0] == "debug": if args[1].lower() not in ["true", "false"]: return await utils.answer( message, "[SQuotes - Settings] Π’Π°ΠΊΠΎΠ³ΠΎ значСния ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€Π° Π½Π΅Ρ‚, Π΅ΡΡ‚ΡŒ true/false", ) self.settings[args[0]] = args[1].lower() == "true" elif args[0] == "max_messages": if not args[1].isdigit(): return await utils.answer( message, "[SQuotes - Settings] Π­Ρ‚ΠΎ Π½Π΅ число" ) self.settings[args[0]] = int(args[1]) else: self.settings[args[0]] = args[1] self.db.set("SQuotes", "settings", self.settings) return await utils.answer( message, f"[SQuotes - Settings] Π—Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€Π° {args[0]} Π±Ρ‹Π»ΠΎ выставлСно Π½Π° {args[1]}", ) def get_settings(self, force: bool = False): settings: dict = self.db.get("SQuotes", "settings", {}) if not settings or force: settings.update( { "max_messages": 15, "bg_color": "#162330", "text_color": "#fff", "debug": False, } ) self.db.set("SQuotes", "settings", settings) return settings async def _api_request(self, data: dict): return await utils.run_sync(requests.post, self.api_endpoint, json=data)