mirror of
https://github.com/MuRuLOSE/limoka.git
synced 2026-06-16 14:34:17 +02:00
804 lines
25 KiB
Python
804 lines
25 KiB
Python
# @Sekai_Yoneya
|
|
|
|
import hashlib
|
|
import io
|
|
import json
|
|
import logging
|
|
import os
|
|
import PIL
|
|
import requests
|
|
import telethon
|
|
from telethon.tl.types import (MessageEntityBold, MessageEntityItalic,
|
|
MessageEntityMention, MessageEntityTextUrl,
|
|
MessageEntityCode, MessageEntityMentionName,
|
|
MessageEntityHashtag, MessageEntityCashtag,
|
|
MessageEntityBotCommand, MessageEntityUrl,
|
|
MessageEntityStrike, MessageEntityUnderline,
|
|
MessageEntityPhone, ChannelParticipantsAdmins,
|
|
ChannelParticipantCreator,
|
|
PeerChannel,
|
|
User, PeerUser,
|
|
MessageMediaUnsupported)
|
|
|
|
from .. import loader, utils
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Build identifier posted to the quote API as the "moduleBuild" form field.
BUILD_ID = "b890541d-be52-4b9d-8ced-f5359d9d7109"  # None to disable autoupdates
# Default URL that update() passes to the loader's download_and_install().
MODULE_PATH = "https://quotes.mishase.dev/f/module.py"
|
|
|
|
|
|
@loader.tds
class QuotesMod(loader.Module):
    """Quotes a message"""
    # Builds a JSON description of one or more Telegram messages (text,
    # entities, author, avatar, media, replied-to message), posts it together
    # with the downloaded media files to https://quotes.mishase.dev/create and
    # answers with the returned image re-encoded as WEBP.

    strings = {
        "name": "Quotes",
        "silent_processing_cfg_doc": ("Process quote "
                                      "silently(mostly"
                                      " w/o editing)"),
        "module_endpoint_cfg_doc": "Module endpoint URL",
        "quote_limit_cfg_doc": "Limit for messages per quote",
        "max_width_cfg_doc": "Maximum quote width in pixels",
        "scale_factor_cfg_doc": "Quote quality (up to 5.5)",
        "square_avatar_cfg_doc": "Square avatar in quote",
        "text_color_cfg_doc": "Color of text in quote",
        "reply_line_color_cfg_doc": "Reply line color",
        "reply_thumb_radius_cfg_doc": ("Reply media thumbnail "
                                       "radius in pixels"),
        "admintitle_color_cfg_doc": "Admin title color",
        "message_radius_cfg_doc": "Message radius in px",
        "picture_radius_cfg_doc": "Media picture radius in px",
        "background_color_cfg_doc": "Quote background color",
        "quote_limit_reached": ("The maximum number "
                                "of messages in "
                                "multiquote - {}."),
        "fq_incorrect_args": ("<b>Args incorrect.</b> \"@$username (ID)"
                              "$text\" or \"$reply $text\""),
        "updating": "<b>Updating...</b>",
        "update_error": "<b>Update error</b>",
        "processing": "<b>Processing...</b>",
        "unreachable_error": "<b>API Host is unreachable now. Please try again later.</b>",
        "server_error": "<b>API Error occured :)</b>",
        "no_reply": "<b>You didn't reply to a message.</b>",
        "creator": "Owner",
        "admin": "Admin",
        "channel": "Channel",
        "media_type_photo": "Photo",
        "media_type_video": "📹Video",
        "media_type_videomessage": "📹Video message",
        "media_type_voice": "🎵Voice message",
        "media_type_audio": "🎧Music: {} - {}",
        "media_type_contact": "👤Contact: {}",
        "media_type_poll": "📊Poll: ",
        "media_type_quiz": "📊Quiz: ",
        "media_type_location": "📍Location",
        "media_type_gif": "🖼GIF",
        "media_type_sticker": "Sticker",
        "media_type_file": "💾File",
        "dice_type_dice": "Dice",
        "dice_type_dart": "Dart",
        "ball_thrown": "Ball thrown",
        "ball_kicked": "Ball kicked",
        "dart_thrown": "Dart thrown",
        "dart_almostthere": "almost there!",
        "dart_missed": "missed!",
        "dart_bullseye": "bullseye!"
    }

    def __init__(self):
        # Options are passed flat: key, default value, then a callable that
        # returns the option's description from `strings`.
        self.config = loader.ModuleConfig(
            "SILENT_PROCESSING", False,
            lambda: self.strings["silent_processing_cfg_doc"],
            "QUOTE_MESSAGES_LIMIT", 15,
            lambda: self.strings["quote_limit_cfg_doc"],
            "MAX_WIDTH", 384,
            lambda: self.strings["max_width_cfg_doc"],
            "SCALE_FACTOR", 5,
            lambda: self.strings["scale_factor_cfg_doc"],
            "SQUARE_AVATAR", False,
            lambda: self.strings["square_avatar_cfg_doc"],
            "TEXT_COLOR", "white",
            lambda: self.strings["text_color_cfg_doc"],
            "REPLY_LINE_COLOR", "white",
            lambda: self.strings["reply_line_color_cfg_doc"],
            "REPLY_THUMB_BORDER_RADIUS", 2,
            lambda: self.strings["reply_thumb_radius_cfg_doc"],
            "ADMINTITLE_COLOR", "#969ba0",
            lambda: self.strings["admintitle_color_cfg_doc"],
            "MESSAGE_BORDER_RADIUS", 10,
            lambda: self.strings["message_radius_cfg_doc"],
            "PICTURE_BORDER_RADIUS", 8,
            lambda: self.strings["picture_radius_cfg_doc"],
            "BACKGROUND_COLOR", "#162330",
            lambda: self.strings["background_color_cfg_doc"],
        )

    async def client_ready(self, client, db):
        # Keep the client; every command below talks to Telegram through it.
        self.client = client

    # ------------------------------------------------------------------
    # Private helpers shared by .quote and .fquote
    # ------------------------------------------------------------------

    async def _status_message(self, message):
        # Return the message that later answers should target. Unless silent
        # processing is enabled this first shows the "processing" notice; in
        # silent mode the command message itself is used, so the name is
        # always bound (it used to be undefined in silent mode).
        if self.config["SILENT_PROCESSING"]:
            return message
        return await utils.answer(message, self.strings("processing", message))

    def _render_settings(self):
        # Rendering options forwarded to the API, taken from the config.
        return {
            "maxWidth": self.config["MAX_WIDTH"],
            "scaleFactor": self.config["SCALE_FACTOR"],
            "squareAvatar": self.config["SQUARE_AVATAR"],
            "textColor": self.config["TEXT_COLOR"],
            "replyLineColor": self.config["REPLY_LINE_COLOR"],
            "adminTitleColor": self.config["ADMINTITLE_COLOR"],
            "messageBorderRadius": self.config["MESSAGE_BORDER_RADIUS"],
            "replyThumbnailBorderRadius": self.config["REPLY_THUMB_BORDER_RADIUS"],
            "pictureBorderRadius": self.config["PICTURE_BORDER_RADIUS"],
            "backgroundColor": self.config["BACKGROUND_COLOR"],
        }

    @staticmethod
    def _open_attachments(media_files):
        # Turn {api_name: local_path} into the multipart `files` list for
        # requests, plus the list of opened handles the caller must close.
        files = []
        handles = []
        for api_name, path in media_files.items():
            if not path:
                # A download that produced nothing; there is no file to send.
                continue
            try:
                handle = open(path, "rb")
            except OSError as e:
                logger.error(e, exc_info=True)
                continue
            handles.append(handle)
            files.append(("files", (api_name, handle, "image/jpg")))
        if not files:
            # The request is always sent with at least one (empty) file part.
            files.append(("files", ("file", bytearray(), "text/text")))
        return files, handles

    async def _send_and_reply(self, message, mmm, messages, media_files, image_name, command):
        # Post the quote description to the API and answer `mmm` with either
        # the rendered image or an error string. `command` is the command
        # name re-run after a successful self-update (HTTP 418).
        payload = {"messages": messages}
        payload.update(self._render_settings())
        files, handles = self._open_attachments(media_files)
        error_key = None
        req = None
        try:
            req = await utils.run_sync(
                requests.post,
                "https://quotes.mishase.dev/create",
                data={"data": json.dumps(payload), "moduleBuild": BUILD_ID},
                files=files,
                timeout=100,
            )
        except (requests.ConnectionError, requests.exceptions.Timeout):
            error_key = "unreachable_error"
        except Exception as e:
            # Any other failure used to be swallowed, leaving `req` unbound.
            logger.error(e, exc_info=True)
            error_key = "server_error"
        finally:
            # Close the uploads before the cache directory is wiped.
            for handle in handles:
                handle.close()
            await clean_files()
        if error_key is not None:
            return await utils.answer(mmm, self.strings(error_key, message))
        if req.status_code >= 520:
            return await utils.answer(mmm, self.strings("unreachable_error", message))
        if req.status_code >= 500:
            return await utils.answer(mmm, self.strings("server_error", message))
        if req.status_code == 418:
            # 418 triggers a self-update followed by a re-run of the command.
            if not self.config["SILENT_PROCESSING"]:
                await utils.answer(mmm, self.strings("updating", message))
            if await update(self.allmodules.modules, message):
                return await self.allmodules.commands[command](message)
            return await utils.answer(mmm, self.strings("update_error", message))
        image = io.BytesIO()
        image.name = image_name
        try:
            # `import PIL` alone does not load the Image submodule.
            from PIL import Image as pil_image
            pil_image.open(io.BytesIO(req.content)).save(image, "WEBP")
            image.seek(0)
            return await utils.answer(mmm, image)
        except Exception as e:
            logger.error(e, exc_info=True)
            return await utils.answer(mmm, self.strings("server_error", message))

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    @loader.unrestricted
    @loader.ratelimit
    async def mquotecmd(self, message):
        """.mquote <reply> [count] - same as .quote"""
        await self.quotecmd(message)

    @loader.unrestricted
    @loader.ratelimit
    async def quotecmd(self, message):
        """.quote <reply> - quote a message"""
        # An optional numeric argument quotes that many consecutive message
        # ids starting at the replied-to message, up to QUOTE_MESSAGES_LIMIT.
        mmm = await self._status_message(message)
        args = (utils.get_args_raw(message) or "").strip()
        reply = await message.get_reply_message()
        if not reply:
            return await utils.answer(mmm, self.strings("no_reply", message))
        # isdecimal() only accepts characters int() can parse.
        count = int(args) if args.isdecimal() else 1
        if count <= 0:
            count = 1
        limit = self.config["QUOTE_MESSAGES_LIMIT"]
        if count > limit:
            return await utils.answer(
                mmm,
                self.strings("quote_limit_reached", message).format(limit)
            )

        # One request for the whole id range; ids that do not exist come back
        # as None and are dropped.
        fetched = await self.client.get_messages(
            message.to_id,
            ids=list(range(reply.id, reply.id + count))
        )
        messages_client = [found for found in fetched if found]

        messages = []
        media_files = {}
        admins = None  # admin list, fetched at most once and only if needed
        for msg in messages_client:
            text = msg.raw_text
            markdown = await get_markdown(msg)
            admintitle = str()
            if isinstance(msg.to_id, PeerChannel) and msg.fwd_from:
                user = msg.forward.chat
            else:
                user = await msg.get_sender()

            if msg.fwd_from:
                fwd = msg.fwd_from
                # Resolve the id of the original author of the forward.
                if fwd.saved_from_peer:
                    if isinstance(fwd.saved_from_peer, PeerUser):
                        user_id = fwd.saved_from_peer.user_id
                    elif isinstance(fwd.saved_from_peer, PeerChannel):
                        user_id = fwd.saved_from_peer.channel_id
                    else:
                        user_id = None
                elif isinstance(fwd.from_id, PeerChannel):
                    user_id = fwd.from_id.channel_id
                elif isinstance(fwd.from_id, PeerUser):
                    user_id = fwd.from_id.user_id
                else:
                    user_id = None

                if not user_id:
                    # No resolvable peer: derive a stable numeric id from the
                    # displayed name so the API still gets an author id.
                    name = fwd.from_name or ""
                    user_id = int(hashlib.shake_256(name.encode("utf-8")).hexdigest(6), 16)
                    avatar_key = "@av{}".format(str(user_id).lstrip("-"))
                    pfp = None
                else:
                    sender = await self.client.get_entity(user_id)
                    name = telethon.utils.get_display_name(sender)
                    avatar_key = "@av{}".format(str(user_id).lstrip("-"))
                    pfp = media_files.get(avatar_key, None)

                # Pick whose profile photo to download; None means "nobody".
                profile_photo_url = None
                if fwd.saved_from_peer:
                    profile_photo_url = msg.forward.chat
                elif fwd.from_name:
                    name = fwd.from_name
                elif msg.forward.sender:
                    name = telethon.utils.get_display_name(msg.forward.sender)
                    profile_photo_url = msg.forward.sender.id
                elif msg.forward.chat:
                    profile_photo_url = user
                if fwd.post_author is not None:
                    name += f" ({fwd.post_author})"
                if not pfp and profile_photo_url is not None:
                    pfp = await self.client.download_profile_photo(
                        profile_photo_url, "mishase_cache/"
                    )
                if pfp:
                    media_files[avatar_key] = pfp
            else:
                # NOTE(review): 1087968824 is presumably the anonymous group
                # admin account; such messages are attributed to the chat.
                if msg.sender_id != 1087968824:
                    user_id = msg.sender_id
                else:
                    user_id = message.chat_id
                avatar_key = "@av{}".format(str(user_id).lstrip("-"))
                if not message.chat or (user_id != message.chat_id or isinstance(message.chat, User)):
                    sender = await self.client.get_entity(user_id)
                    name = telethon.utils.get_display_name(sender)
                    pfp = media_files.get(avatar_key, None)
                    if not pfp:
                        pfp = await self.client.download_profile_photo(
                            msg.sender_id, "mishase_cache/"
                        )
                    if pfp:
                        media_files[avatar_key] = pfp
                else:
                    # The message is authored by the chat itself.
                    name = message.chat.title
                    try:
                        dl_chat_pfp = await self.client.download_profile_photo(
                            user_id, "mishase_cache/"
                        )
                    except Exception as e:
                        logger.error(e, exc_info=True)
                        dl_chat_pfp = None
                    if dl_chat_pfp:
                        # Only register an avatar that was actually saved.
                        media_files[avatar_key] = dl_chat_pfp

            media_files = await check_media(self.client, msg, media_files, False)

            message_reply = await msg.get_reply_message()
            if message_reply:
                reply_text = message_reply.raw_text
                media_caption = await get_media_caption(message_reply)
                if media_caption != "":
                    if not reply_text:
                        reply_text = media_caption
                    else:
                        reply_text = "{} \"{}\"".format(media_caption, reply_text)

                # Work out the author name shown on the reply line.
                if message_reply.fwd_from:
                    if message_reply.forward.chat:
                        reply_name = telethon.utils.get_display_name(
                            message_reply.forward.chat
                        )
                    elif getattr(message_reply.fwd_from, "sender_id", None):
                        reply_user = await self.client.get_entity(
                            message_reply.fwd_from.sender_id
                        )
                        reply_name = telethon.utils.get_display_name(reply_user)
                    else:
                        # Prefer the header's name; fall back to the resolved
                        # forward sender (empty string when there is none).
                        reply_name = message_reply.fwd_from.from_name \
                            or telethon.utils.get_display_name(message_reply.forward.sender)
                elif message_reply.sender_id:
                    reply_name = telethon.utils.get_display_name(message_reply.sender)
                else:
                    # The payload is JSON, so send the chat's name rather than
                    # the chat entity object.
                    reply_name = telethon.utils.get_display_name(message.chat)

                media_files = await check_media(
                    self.client, message_reply, media_files, True
                )
                reply_message = {
                    "author": reply_name,
                    "text": reply_text,
                }
                # check_media() stores reply thumbnails under this id-suffixed key.
                thumb_key = "@mediareply{}".format(message_reply.id)
                if thumb_key in media_files:
                    reply_message["thumbnail"] = {"file": thumb_key}
            else:
                reply_message = None

            if message.chat:
                if msg.sender_id and not msg.fwd_from:
                    if admins is None:
                        admins = await self.client.get_participants(
                            message.to_id, filter=ChannelParticipantsAdmins
                        )
                    if msg.sender in admins:
                        admin = admins[admins.index(msg.sender)].participant
                        if not admin:
                            admintitle = " "
                        else:
                            admintitle = admin.rank
                            if not admintitle:
                                # No custom rank: fall back to a generic title.
                                if isinstance(admin, ChannelParticipantCreator):
                                    admintitle = self.strings("creator", message)
                                else:
                                    admintitle = self.strings("admin", message)
                elif msg.fwd_from and msg.fwd_from.saved_from_peer:
                    admintitle = self.strings("channel", message)
                elif msg.fwd_from and msg.fwd_from.from_id:
                    admintitle = self.strings("channel", message)
                else:
                    admintitle = reply.post_author if reply.post_author else " "

            message_to_append = {
                "text": text,
                "reply": reply_message,
                "entities": markdown,
                "author": {
                    "id": str(user_id).lstrip("-"),
                    "name": name,
                    "adminTitle": admintitle
                }
            }
            if avatar_key in media_files:
                message_to_append["author"]["picture"] = {"file": avatar_key}
            media_key = "@media{}".format(msg.id)
            if media_key in media_files:
                message_to_append["picture"] = {"file": media_key}
            messages.append(message_to_append)

        return await self._send_and_reply(
            message, mmm, messages, media_files, "quote.webp", "quote"
        )

    @loader.unrestricted
    @loader.ratelimit
    async def fquotecmd(self, message):
        """.fquote @<username> <text> or <reply> <text> - fake quote"""
        mmm = await self._status_message(message)
        args = utils.get_args_raw(message) or ""
        reply = await message.get_reply_message()
        spl_args = args.split(maxsplit=1)
        # Either "@username text" / "id text", or a reply plus free text.
        if len(spl_args) == 2 and (spl_args[0].startswith('@') or spl_args[0].isdecimal()):
            if spl_args[0].startswith('@'):
                user = spl_args[0][1:]
            else:
                user = int(spl_args[0])
            text = spl_args[1]
        elif reply:
            user = reply.sender_id
            text = " ".join(spl_args)
        else:
            await utils.answer(mmm, self.strings("fq_incorrect_args", message))
            return

        try:
            user = await self.client.get_entity(user)
            name = telethon.utils.get_display_name(user)
            avatar = await self.client.download_profile_photo(user.id, "mishase_cache/")
        except Exception as e:
            # An unresolvable user is reported as bad arguments.
            logger.error(e, exc_info=True)
            await utils.answer(mmm, self.strings("fq_incorrect_args", message))
            return

        msg = {
            "text": text,
            "reply": None,
            "entities": [],
            "author": {
                "id": str(user.id),
                "name": name,
                "adminTitle": ' ',
            }
        }
        media_files = {}
        if avatar:
            avatar_key = f'@av{str(user.id).lstrip("-")}'
            msg['author']['picture'] = {'file': avatar_key}
            media_files[avatar_key] = avatar

        # After a self-update, re-run this command rather than .quote.
        return await self._send_and_reply(
            message, mmm, [msg], media_files, "fquote.webp", "fquote"
        )
|
|
|
|
|
|
async def clean_files():
    """Delete everything inside the ``mishase_cache`` download directory.

    Replaces the former ``rm -rf mishase_cache/*`` shell call with standard
    library calls, so no shell is spawned and it also works where ``rm`` is
    unavailable. Like the shell glob, entries whose name starts with a dot
    are left alone and the directory itself is kept.

    Returns:
        0 when nothing failed (including when the directory does not exist),
        1 when at least one entry could not be removed. This mirrors the
        zero/non-zero exit status the shell call used to return.
    """
    import shutil

    try:
        entries = list(os.scandir("mishase_cache"))
    except OSError:
        # Missing (or unreadable) cache directory: nothing to clean.
        return 0
    status = 0
    for entry in entries:
        if entry.name.startswith("."):
            continue
        try:
            if entry.is_dir(follow_symlinks=False):
                shutil.rmtree(entry.path)
            else:
                os.remove(entry.path)
        except OSError as e:
            logger.warning("Could not remove %s: %s", entry.path, e)
            status = 1
    return status
|
|
|
|
|
|
async def update(modules, message, url=MODULE_PATH):
    """Reinstall this module from ``url`` through the loaded ``LoaderMod``.

    Args:
        modules: iterable of loaded module instances; the one whose class is
            named ``LoaderMod`` performs the installation.
        message: the message that triggered the update, passed through to the
            loader's ``download_and_install``.
        url: location of the module source to install.

    Returns:
        True when the loader reports a successful install, False otherwise
        (no loader module present, install refused, or any exception, which
        is logged).
    """
    # A default for next() avoids StopIteration escaping from a coroutine
    # (which would surface as RuntimeError) when no LoaderMod is loaded.
    loader_mod = next(
        (module for module in modules if module.__class__.__name__ == "LoaderMod"),
        None,
    )
    if loader_mod is None:
        logger.error("LoaderMod is not loaded, cannot update from %s", url)
        return False
    try:
        if not await loader_mod.download_and_install(url, message):
            return False
        # Record the URL in the stored list of loaded modules, without
        # duplicates. NOTE(review): the database owner key is this module's
        # __name__, exactly as before - confirm that is the intended owner.
        known = set(loader_mod._db.get(__name__, "loaded_modules", []))
        known.add(url)
        loader_mod._db.set(__name__, "loaded_modules", list(known))
        return True
    except Exception as e:
        logger.error(e, exc_info=True)
        return False
|
|
|
|
|
|
async def check_media(client, reply, media_files: dict, is_reply: bool):
    """Download the picture-like media of ``reply`` and register it.

    Photos and documents with an allowed extension are downloaded. Audio and
    voice documents, other extensions and media-less messages are ignored.
    Animated ``.tgs`` stickers of a quoted message are converted to PNG with
    ``lottie_convert.py``. For ``.mp4`` files, and for ``.tgs`` files of a
    replied-to message, only a thumbnail is downloaded.

    Args:
        client: object providing the ``download_media`` coroutine.
        reply: the message to inspect (may be None).
        media_files: mapping of API file name to local path; updated in place.
        is_reply: True when ``reply`` is the replied-to message, which selects
            the ``@mediareply<id>`` key instead of ``@media<id>``.

    Returns:
        ``media_files``, with at most one new entry. Nothing is added when
        the download or the conversion yields no file.
    """
    allowed_exts = (".png", ".webp", ".jpg", ".jpeg", ".tgs", ".mp4")
    if not reply or not reply.media:
        return media_files
    if reply.photo:
        source = reply.photo
    elif reply.document:
        if reply.audio or reply.voice:
            return media_files
        if reply.file.ext not in allowed_exts:
            return media_files
        source = reply.media.document
    else:
        return media_files
    if not source:
        return media_files

    ext = reply.file.ext
    if ext == ".tgs" and not is_reply:
        path = await client.download_media(source, "mishase_cache/")
        if path:
            import asyncio

            converted = path + ".png"
            try:
                # Argument list instead of a shell string: the file name comes
                # from the downloaded document and must not reach a shell.
                process = await asyncio.create_subprocess_exec(
                    "lottie_convert.py", path, converted, "--frame", "5"
                )
                await process.wait()
            except OSError as e:
                logger.error(e, exc_info=True)
            # Only hand on a file the converter really produced.
            path = converted if os.path.exists(converted) else None
    elif ext in (".tgs", ".mp4"):
        # Thumbnail only; saved into the cache directory so clean_files()
        # removes it together with the other downloads.
        path = await client.download_media(source, "mishase_cache/", thumb=-1)
    else:
        path = await client.download_media(source, "mishase_cache/")

    if not path:
        return media_files
    if not is_reply:
        media_files["@media{}".format(reply.id)] = path
    else:
        media_files["@mediareply{}".format(reply.id)] = path
    return media_files
|
|
|
|
|
|
async def get_media_caption(reply_message):
    """Return a short text describing the media of ``reply_message``.

    The caption is used as (part of) the text shown for a replied-to message.
    Captions come from ``QuotesMod.strings``; the class used to be referenced
    under the undefined name ``mQuotesMod``, which raised NameError for every
    media type.

    Args:
        reply_message: the message to describe (may be None).

    Returns:
        The caption, or an empty string when the message is missing, has no
        media, or has media of a kind that is not described here.
    """
    if not reply_message or not reply_message.media:
        return ""
    media = reply_message.media
    if isinstance(media, MessageMediaUnsupported):
        return "Unsupported message media"

    strings = QuotesMod.strings
    if reply_message.photo:
        return strings["media_type_photo"]

    # Messages without a `dice` attribute are checked by media type instead,
    # when the installed telethon defines that type at all.
    try:
        is_dice = bool(reply_message.dice)
    except AttributeError:
        dice_cls = getattr(telethon.tl.types, "MessageMediaDice", None)
        is_dice = dice_cls is not None and isinstance(media, dice_cls)

    if is_dice:
        emoticon = media.emoticon
        value = media.value
        if emoticon == "🎲":
            return "{} {}: {}".format(emoticon, strings["dice_type_dice"], value)
        if emoticon == "🎯":
            outcomes = {
                1: strings["dart_missed"],
                5: strings["dart_almostthere"],
                6: strings["dart_bullseye"],
            }
            if value not in outcomes:
                return "{} {}".format(emoticon, strings["dart_thrown"])
            return "{} {}: {}".format(emoticon, strings["dice_type_dart"], outcomes[value])
        if emoticon == "🏀":
            return "{} {}".format(emoticon, strings["ball_thrown"])
        if emoticon == "\u26bd":  # soccer ball
            return "{} {}".format(emoticon, strings["ball_kicked"])
        return "Unsupported dice type ({}): {}".format(emoticon, value)

    if reply_message.poll:
        poll = reply_message.poll.poll
        question = poll.question
        if not isinstance(question, str):
            # NOTE(review): assumes a non-string question carries its text in
            # a `text` attribute - confirm against the telethon version used.
            question = getattr(question, "text", str(question))
        prefix = strings["media_type_quiz"] if poll.quiz else strings["media_type_poll"]
        return prefix + question

    if reply_message.geo:
        return strings["media_type_location"]

    if reply_message.contact:
        contact = reply_message.contact
        # Join with a space and tolerate a missing first or last name.
        name = " ".join(part for part in (contact.first_name, contact.last_name) if part)
        return strings["media_type_contact"].format(name)

    if reply_message.document:
        if reply_message.gif:
            return strings["media_type_gif"]
        if reply_message.video:
            # Look through every attribute rather than assuming the first one
            # is the video attribute.
            is_round = any(
                getattr(attribute, "round_message", False)
                for attribute in reply_message.video.attributes
            )
            if is_round:
                return strings["media_type_videomessage"]
            return strings["media_type_video"]
        if reply_message.audio:
            audio_info = next(
                (attribute for attribute in reply_message.audio.attributes
                 if hasattr(attribute, "performer")),
                None,
            )
            return strings["media_type_audio"].format(
                getattr(audio_info, "performer", None),
                getattr(audio_info, "title", None)
            )
        if reply_message.voice:
            return strings["media_type_voice"]
        if reply_message.sticker:
            emoji = reply_message.file.emoji
            if emoji:
                return "{} {}".format(emoji, strings["media_type_sticker"])
            return strings["media_type_sticker"]
        if reply_message.file:
            try:
                file_name = reply_message.file.name
                file_size = await humanize(reply_message.file.size)
                return "{} {} ({})".format(
                    strings["media_type_file"],
                    file_name,
                    file_size
                )
            except Exception:
                # Name or size unavailable: fall back to the plain label.
                return strings["media_type_file"]
        return ""

    return ""
|
|
|
|
|
|
async def humanize(num: float, suffix: str = "B") -> str:
    """Format ``num`` as a size with a binary (1024-based) unit prefix.

    The value is divided by 1024 until its magnitude drops below 1024 or the
    largest prefix, ``Yi``, is reached. It is printed with one decimal place,
    followed by the prefix and ``suffix``.
    """
    prefixes = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi")
    value = num
    position = 0
    while position < len(prefixes):
        if abs(value) < 1024.0:
            return "%3.1f%s%s" % (value, prefixes[position], suffix)
        value = value / 1024.0
        position += 1
    # Ran out of prefixes: whatever remains is expressed in Yi.
    return "%.1f%s%s" % (value, "Yi", suffix)
|
|
|
|
|
|
async def get_markdown(message):
    """Convert the formatting entities of ``message`` for the quote API.

    Args:
        message: object with an ``entities`` attribute (may be None or empty).

    Returns:
        A list of ``{"type", "offset", "length"}`` dicts, one per entity of a
        known type, in the original order. Entities of any other type are
        skipped. Previously they raised UnboundLocalError when first in the
        list, or silently reused the previous entity's type.
    """
    if not message.entities:
        return []
    # Exact entity class -> style name understood by the API.
    styles = {
        MessageEntityBold: "bold",
        MessageEntityItalic: "italic",
        MessageEntityUrl: "url",
        MessageEntityPhone: "url",
        MessageEntityCode: "monospace",
        MessageEntityStrike: "strikethrough",
        MessageEntityUnderline: "underline",
        MessageEntityMention: "bluetext",
        MessageEntityTextUrl: "bluetext",
        MessageEntityMentionName: "bluetext",
        MessageEntityHashtag: "bluetext",
        MessageEntityCashtag: "bluetext",
        MessageEntityBotCommand: "bluetext",
    }
    entities = []
    for entity in message.entities:
        style = styles.get(type(entity))
        if style is None:
            continue
        entities.append({
            "type": style,
            "offset": entity.offset,
            "length": entity.length,
        })
    return entities
|