Files
limoka/anon97945/hikka-mods/quotes.py
2025-07-10 21:02:34 +03:00

568 lines
17 KiB
Python

__version__ = (2, 1, 1)
# ▄▀█ █▄ █ █▀█ █▄ █ █▀█ ▀▀█ █▀█ █ █ █▀
# █▀█ █ ▀█ █▄█ █ ▀█ ▀▀█ █ ▀▀█ ▀▀█ ▄█
#
# © Copyright 2024
#
# developed by @anon97945
#
# https://t.me/apodiktum_modules
# https://github.com/anon97945
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# Original author: @mishase
# API author: @mishase
# requires: requests Pillow cryptg
import logging
import contextlib
import hashlib
import json
import requests
import io
import PIL
from telethon import utils
from telethon.utils import get_display_name
from telethon.tl.types import (
Message,
MessageEntityBold,
MessageEntityItalic,
MessageEntityMention,
MessageEntityTextUrl,
MessageEntityCode,
MessageEntityMentionName,
MessageEntityHashtag,
MessageEntityCashtag,
MessageEntityBotCommand,
MessageEntityUrl,
MessageEntityStrike,
MessageEntityUnderline,
MessageEntityPhone,
ChatPhotoEmpty,
MessageMediaPhoto,
MessageMediaDocument,
MessageMediaWebPage,
User,
PeerUser,
PeerBlocked,
PeerChannel,
PeerChat,
DocumentAttributeSticker,
ChannelParticipantsAdmins,
ChannelParticipantCreator,
)
from .. import loader, utils
logger = logging.getLogger(__name__)
null = None
false = False
true = True
PIL.Image.MAX_IMAGE_PIXELS = null
class dict(dict):
def __setattr__(self, attr, value):
self[attr] = value
@loader.tds
class ApodiktumQuotesMod(loader.Module):
"""Quote a message using Mishase Quotes API"""
strings = {
"name": "Apo-Quotes",
"developer": "@anon97945",
"_cfg_msg_limit": "Messages limit",
"_cfg_max_width": "Max width (px)",
"_cfg_scale_factor": "Scale factor",
"_cfg_square_avatar": "Square avatar",
"_cfg_text_color": "Text color",
"_cfg_reply_line_color": "Reply line color",
"_cfg_admin_title_color": "Admin title color",
"_cfg_message_border_radius": "Message radius (px)",
"_cfg_reply_thumb_border_radius": "Reply thumbnail radius (px)",
"_cfg_picture_border_radius": "Picture radius (px)",
"_cfg_background_color": "Background color",
}
strings_de = {}
strings_ru = {}
all_strings = {
"strings": strings,
"strings_en": strings,
"strings_de": strings_de,
"strings_ru": strings_ru,
}
changes = {}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"QUOTE_MESSAGES_LIMIT",
50,
doc=lambda: self.strings("_cfg_msg_limit"),
validator=loader.validators.Integer(),
),
# "QUOTE_MESSAGES_LIMIT",
# 50,
# "Messages limit",
loader.ConfigValue(
"MAX_WIDTH",
384,
doc=lambda: self.strings("_cfg_max_width"),
validator=loader.validators.Integer(),
),
# "MAX_WIDTH",
# 384,
# "Max width (px)",
loader.ConfigValue(
"SCALE_FACTOR",
5,
doc=lambda: self.strings("_cfg_scale_factor"),
validator=loader.validators.Integer(),
),
# "SCALE_FACTOR",
# 5,
# "Scale factor",
loader.ConfigValue(
"SQUARE_AVATAR",
False,
doc=lambda: self.strings("_cfg_square_avatar"),
validator=loader.validators.Boolean(),
),
# "SQUARE_AVATAR",
# false,
# "Square avatar",
loader.ConfigValue(
"TEXT_COLOR",
"white",
doc=lambda: self.strings("_cfg_text_color"),
validator=loader.validators.String(),
),
# "TEXT_COLOR",
# "white",
# "Text color",
loader.ConfigValue(
"REPLY_LINE_COLOR",
"white",
doc=lambda: self.strings("_cfg_reply_line_color"),
validator=loader.validators.String(),
),
# "REPLY_LINE_COLOR",
# "white",
# "Reply line color",
loader.ConfigValue(
"REPLY_THUMB_BORDER_RADIUS",
2,
doc=lambda: self.strings("_cfg_reply_thumb_border_radius"),
validator=loader.validators.Integer(),
),
# "REPLY_THUMB_BORDER_RADIUS",
# 2,
# "Reply thumbnail radius (px)",
loader.ConfigValue(
"ADMINTITLE_COLOR",
"#969ba0",
doc=lambda: self.strings("_cfg_admin_title_color"),
validator=loader.validators.String(),
),
# "ADMINTITLE_COLOR",
# "#969ba0",
# "Admin title color",
loader.ConfigValue(
"MESSAGE_BORDER_RADIUS",
10,
doc=lambda: self.strings("_cfg_message_border_radius"),
validator=loader.validators.Integer(),
),
# "MESSAGE_BORDER_RADIUS",
# 10,
# "Message radius (px)",
loader.ConfigValue(
"PICTURE_BORDER_RADIUS",
8,
doc=lambda: self.strings("_cfg_picture_border_radius"),
validator=loader.validators.Integer(),
),
# "PICTURE_BORDER_RADIUS",
# 8,
# "Picture radius (px)",
loader.ConfigValue(
"BACKGROUND_COLOR",
"#162330",
doc=lambda: self.strings("_cfg_background_color"),
validator=loader.validators.String(),
),
# "BACKGROUND_COLOR",
# "#162330",
# "Background color",
)
async def client_ready(self):
self.apo_lib = await self.import_lib(
"https://raw.githubusercontent.com/anon97945/hikka-libs/master/apodiktum_library.py",
suspend_on_error=True,
)
await self.apo_lib.migrator.auto_migrate_handler(
self.__class__.__name__,
self.strings("name"),
self.changes,
self.config["auto_migrate"],
)
async def cquotescmd(self, message: Message):
"""
This will open the config for the module.
"""
name = self.strings("name")
await self.allmodules.commands["config"](
await utils.answer(message, f"{self.get_prefix()}config {name}")
)
async def quotecmd(self, msg):
"""Quote a message. Args: .<count> .file"""
args = utils.get_args_raw(msg)
reply = await msg.get_reply_message()
if not reply:
return await msg.edit("No reply message")
if not msg.out:
msg = await msg.reply("_")
count = 1
forceDocument = false
if args:
args = args.split()
forceDocument = "file" in args
with contextlib.suppress(StopIteration):
count = next(int(arg) for arg in args if arg.isdigit())
count = max(1, min(self.config["QUOTE_MESSAGES_LIMIT"], count))
messagePacker = MessagePacker(self._client)
if count == 1:
await msg.edit("<b>Processing...</b>")
await messagePacker.add(reply)
if count > 1:
it = self._client.iter_messages(
reply.peer_id,
offset_id=reply.id,
reverse=true,
add_offset=1,
limit=count,
)
i = 1
async for message in it:
await msg.edit(f"<b>Processing {i}/{count}</b>")
i += 1
await messagePacker.add(message)
messages = messagePacker.messages
if not messages:
return await msg.edit("No messages to quote")
files = [("files", f) for f in messagePacker.files.values()]
if not files:
files.append(("files", bytearray()))
await msg.edit("<b>API Processing...</b>")
resp = await utils.run_sync(
requests.post,
"https://quotes.mishase.dev/create",
data={
"data": json.dumps(
{
"messages": messages,
"maxWidth": self.config["MAX_WIDTH"],
"scaleFactor": self.config["SCALE_FACTOR"],
"squareAvatar": self.config["SQUARE_AVATAR"],
"textColor": self.config["TEXT_COLOR"],
"replyLineColor": self.config["REPLY_LINE_COLOR"],
"adminTitleColor": self.config["ADMINTITLE_COLOR"],
"messageBorderRadius": self.config["MESSAGE_BORDER_RADIUS"],
"replyThumbnailBorderRadius": self.config[
"REPLY_THUMB_BORDER_RADIUS"
],
"pictureBorderRadius": self.config["PICTURE_BORDER_RADIUS"],
"backgroundColor": self.config["BACKGROUND_COLOR"],
}
),
"moduleBuild": null,
},
files=files,
timeout=99,
)
if resp.status_code == 418:
logger.error("API Error: %s", resp.text)
msg.delete()
return
await msg.edit("<b>Sending...</b>")
image = io.BytesIO()
image.name = "quote.webp"
PIL.Image.open(io.BytesIO(resp.content)).save(image, "WEBP")
image.seek(0)
await self._client.send_message(
msg.peer_id,
file=image,
force_document=forceDocument,
reply_to=utils.get_topic(msg),
)
await msg.delete()
async def fquotecmd(self, msg):
"""Fake message quote. Args: @<username>/<id>/<reply> <text>"""
args = utils.get_args_raw(msg)
reply = await msg.get_reply_message()
splitArgs = args.split(maxsplit=1)
if len(splitArgs) == 2 and (
splitArgs[0].startswith("@") or splitArgs[0].isdigit()
):
user = (
splitArgs[0][1:] if splitArgs[0].startswith("@") else int(splitArgs[0])
)
text = splitArgs[1]
elif reply:
user = reply.sender_id
text = args
else:
return await msg.edit("Incorrect args")
try:
uid = (await self._client.get_entity(user)).id
except Exception:
return await msg.edit("User not found")
async def getMessage():
return Message(0, uid, message=text)
msg.message = ""
msg.get_reply_message = getMessage
await self.quotecmd(msg)
class MessagePacker:
def __init__(self, client):
self.files = {}
self.messages = []
self._client = client
async def add(self, msg):
packed = await self.packMessage(msg)
if packed:
self.messages.append(packed)
async def packMessage(self, msg):
obj = {}
if text := msg.message:
obj["text"] = text
if entities := MessagePacker.encodeEntities(msg.entities or []):
obj["entities"] = entities
if media := msg.media:
file = await self.downloadMedia(media)
if file:
obj["picture"] = {"file": file}
if "text" not in obj and "picture" not in obj:
return null
obj["author"] = await self.encodeAuthor(msg)
reply = await msg.get_reply_message()
if reply:
obj["reply"] = await self.encodeReply(reply)
return obj
@staticmethod
def encodeEntities(entities):
encEntities = []
for entity in entities:
if entityType := MessagePacker.getEntityType(entity):
encEntities.append(
{
"type": entityType,
"offset": entity.offset,
"length": entity.length,
}
)
return encEntities
@staticmethod
def getEntityType(entity):
t = type(entity)
if t is MessageEntityBold:
return "bold"
if t is MessageEntityItalic:
return "italic"
if t in [MessageEntityUrl, MessageEntityPhone]:
return "url"
if t is MessageEntityCode:
return "monospace"
if t is MessageEntityStrike:
return "strikethrough"
if t is MessageEntityUnderline:
return "underline"
if t in [
MessageEntityMention,
MessageEntityTextUrl,
MessageEntityMentionName,
MessageEntityHashtag,
MessageEntityCashtag,
MessageEntityBotCommand,
]:
return "bluetext"
return null
async def downloadMedia(self, inMedia, thumb=null):
media = MessagePacker.getMedia(inMedia)
if not media:
return null
mid = str(media.id)
if thumb:
mid += f".{str(thumb)}"
if mid not in self.files:
try:
mime = media.mime_type
except AttributeError:
mime = "image/jpg"
dl = await self._client.download_media(media, bytes, thumb=thumb)
self.files[mid] = (str(len(self.files)), dl, mime)
return self.files[mid][0]
@staticmethod
def getMedia(media):
t = type(media)
if t is MessageMediaPhoto:
return media.photo
if t is MessageMediaDocument:
for attribute in media.document.attributes:
if isinstance(attribute, DocumentAttributeSticker):
return media.document
elif t is MessageMediaWebPage:
if media.webpage.type == "photo":
return media.webpage.photo
return null
async def downloadProfilePicture(self, entity):
media = entity.photo
if not media or isinstance(media, ChatPhotoEmpty):
return null
mid = str(media.photo_id)
if mid not in self.files:
dl = await self._client.download_profile_photo(entity, bytes)
self.files[mid] = (str(len(self.files)), dl, "image/jpg")
return self.files[mid][0]
async def encodeAuthor(self, msg):
uid, name, picture, adminTitle = await self.getAuthor(msg)
obj = {"id": uid, "name": name}
if picture:
obj["picture"] = {"file": picture}
if adminTitle:
obj["adminTitle"] = adminTitle
return obj
async def getAuthor(self, msg, full=true):
uid = null
name = null
picture = null
adminTitle = null
chat = msg.peer_id
peer = msg.from_id or chat
if fwd := msg.fwd_from:
peer = fwd.from_id
name = fwd.post_author or fwd.from_name
t = type(peer)
if t is int:
uid = peer
elif t is PeerUser:
uid = peer.user_id
elif t is PeerChannel:
uid = peer.channel_id
elif t is PeerChat:
uid = peer.chat_id
elif t is PeerBlocked:
uid = peer.peer_id
elif not peer:
uid = int(hashlib.shake_256(name.encode("utf-8")).hexdigest(6), 16)
if not name:
entity = null
try:
entity = await self._client.get_entity(peer)
except Exception:
entity = await msg.get_chat()
if isinstance(entity, User) and entity.deleted:
name = "Deleted Account"
else:
name = get_display_name(entity)
if full:
picture = await self.downloadProfilePicture(entity)
if isinstance(chat, (PeerChannel, PeerChat)):
admins = await self._client.get_participants(
chat, filter=ChannelParticipantsAdmins
)
for admin in admins:
participant = admin.participant
if participant.user_id == uid:
try:
adminTitle = participant.rank
except AttributeError:
adminTitle = null
if not adminTitle:
if isinstance(participant, ChannelParticipantCreator):
adminTitle = "owner"
else:
adminTitle = "admin"
break
return uid, name, picture, adminTitle
async def encodeReply(self, reply):
obj = {}
if text := reply.message:
obj["text"] = text
elif media := reply.media:
t = type(media)
obj.text = "📷 Photo" if t is MessageMediaPhoto else "💾 File"
name = (await self.getAuthor(reply, full=false))[1]
obj["author"] = name
if media := reply.media:
file = await self.downloadMedia(media, -1)
if file:
obj["thumbnail"] = {"file": file}
return obj