__version__ = (0, 1, 23)
# ▄▀█ █▄ █ █▀█ █▄ █ █▀█ ▀▀█ █▀█ █ █ █▀
# █▀█ █ ▀█ █▄█ █ ▀█ ▀▀█ █ ▀▀█ ▀▀█ ▄█
#
# © Copyright 2024
#
# developed by @anon97945
#
# https://t.me/apodiktum_modules
# https://github.com/anon97945
#
# 🔒 Licensed under the GNU GPLv3
# 🌐 https://www.gnu.org/licenses/gpl-3.0.html
# meta developer: @apodiktum_modules
# meta banner: https://t.me/apodiktum_dumpster/11
# meta pic: https://t.me/apodiktum_dumpster/13
# scope: hikka_only
# scope: hikka_min 1.6.1
import contextlib
import logging
from datetime import datetime, timezone
from telethon.errors import MessageIdInvalidError
from telethon.tl.types import Message
from .. import loader, utils
logger = logging.getLogger(__name__)
@loader.tds
class ApodiktumMsgMergerMod(loader.Module):
"""
This module will merge own messages, if there is no message in between.
"""
strings = {
"name": "Apo-MsgMerger",
"developer": "@anon97945",
"_cfg_active": "Whether the module is turned on (or not).",
"_cfg_blacklist_chats": "The list of chats that the module will watch(or not).",
"_cfg_cst_auto_migrate": "Wheather to auto migrate defined changes on startup.",
"_cfg_new_line_prefix": "The prefix which will be added to the new line.",
"_cfg_edit_timeout": (
"The maximum time in minuted to edit the message. 0 for no limit."
),
"_cfg_ignore_prefix": "The prefix to ignore the merging fully.",
"_cfg_link_preview": (
"Whether to send webpage previews.\nLeave empty to use"
" automatically decide based on the messages to merge."
),
"_cfg_merge_own_reply": "Whether to merge any message from own reply.",
"_cfg_merge_own_reply_msg": (
"The message which will stay if the message is merged from own reply."
),
"_cfg_merge_urls": "Whether to merge messages with URLs.",
"_cfg_new_lines": "The number of new lines to add to the message.",
"_cfg_reverse_merge": (
"Whether to merge into the new(True) or old(False) message."
),
"_cfg_skip_emoji": "Whether to skip the merging of messages with single emoji.",
"_cfg_skip_reactions": (
"Whether to skip the merging of messages with reactions."
),
"_cfg_skip_length": "The length of the message to skip the merging.",
"_cfg_skip_prefix": "The prefix to skip the merging.",
"_cfg_skip_reply": "Whether to skip the merging of messages with reply.",
"_cfg_whitelist": (
"Whether the chatlist includes(True) or excludes(False) the chat."
),
"undo_merge_fail": "Failed to unmerge the messages.",
"nothing_to_merge": "Nothing to merge.",
}
strings_en = {}
strings_de = {}
strings_ru = {}
all_strings = {
"strings": strings,
"strings_en": strings,
"strings_de": strings_de,
"strings_ru": strings_ru,
}
changes = {
"migration1": {
"name": {
"old": "Apo MsgMerger",
"new": "Apo-MsgMerger",
},
},
}
def __init__(self):
self._ratelimit = []
self.merged_msgs = {}
self.config = loader.ModuleConfig(
loader.ConfigValue(
"active",
True,
doc=lambda: self.strings("_cfg_active"),
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"chatlist",
doc=lambda: self.strings("_cfg_blacklist_chats"),
validator=loader.validators.Series(loader.validators.TelegramID()),
),
loader.ConfigValue(
"edit_timeout",
2,
doc=lambda: self.strings("_cfg_edit_timeout"),
validator=loader.validators.Union(
loader.validators.Integer(minimum=1),
loader.validators.NoneType(),
),
),
loader.ConfigValue(
"ignore_prefix",
["+"],
doc=lambda: self.strings("_cfg_ignore_prefix"),
validator=loader.validators.Series(
loader.validators.Union(
loader.validators.String(length=1),
loader.validators.NoneType(),
),
),
),
loader.ConfigValue(
"link_preview",
doc=lambda: self.strings("_cfg_link_preview"),
validator=loader.validators.Union(
loader.validators.Boolean(),
loader.validators.NoneType(),
),
),
loader.ConfigValue(
"merge_own_reply",
False,
doc=lambda: self.strings("_cfg_merge_own_reply"),
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"merge_urls",
True,
doc=lambda: self.strings("_cfg_merge_urls"),
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"new_line_pref",
">",
doc=lambda: self.strings("_cfg_new_line_prefix"),
validator=loader.validators.Union(
loader.validators.String(length=1),
loader.validators.NoneType(),
),
),
loader.ConfigValue(
"new_lines",
1,
doc=lambda: self.strings("_cfg_new_lines"),
validator=loader.validators.Integer(minimum=1, maximum=2),
),
loader.ConfigValue(
"own_reply_msg",
"☝️",
doc=lambda: self.strings("_cfg_merge_own_reply_msg"),
validator=loader.validators.Union(
loader.validators.String(),
loader.validators.NoneType(),
),
),
loader.ConfigValue(
"reverse_merge",
True,
doc=lambda: self.strings("_cfg_reverse_merge"),
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"skip_emoji",
True,
doc=lambda: self.strings("_cfg_skip_emoji"),
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"skip_length",
doc=lambda: self.strings("_cfg_skip_length"),
validator=loader.validators.Union(
loader.validators.Integer(minimum=0),
loader.validators.NoneType(),
),
),
loader.ConfigValue(
"skip_prefix",
[">"],
doc=lambda: self.strings("_cfg_skip_prefix"),
validator=loader.validators.Series(
loader.validators.Union(
loader.validators.String(length=1),
loader.validators.NoneType(),
),
),
),
loader.ConfigValue(
"skip_reactions",
True,
doc=lambda: self.strings("_cfg_skip_reactions"),
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"skip_reply",
False,
doc=lambda: self.strings("_cfg_skip_reply"),
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"whitelist",
False,
doc=lambda: self.strings("_cfg_whitelist"),
validator=loader.validators.Boolean(),
),
)
async def client_ready(self):
self.apo_lib = await self.import_lib(
"https://raw.githubusercontent.com/anon97945/hikka-libs/master/apodiktum_library.py",
suspend_on_error=True,
)
await self.apo_lib.migrator.auto_migrate_handler(
self.__class__.__name__,
self.strings("name"),
self.changes,
self.config["auto_migrate"],
)
self.apo_lib.watcher_q.register(self.__class__.__name__)
async def on_unload(self):
self.apo_lib.watcher_q.unregister(self.__class__.__name__)
async def cmsgmergercmd(self, message: Message):
"""
open the config of the module.
"""
name = self.strings("name")
await self.allmodules.commands["config"](
await utils.answer(message, f"{self.get_prefix()}config {name}")
)
async def mergecmd(self, message: Message):
"""
merge all messages of own until the last message of another user.
"""
chat_id = utils.get_chat_id(message)
text = ""
del_msgs = []
merge_msgs = []
try:
msgs = await self._client.get_messages(chat_id, limit=100)
for i in range(-len(msgs), -1):
if (
not isinstance(msgs[i], Message)
or not msgs[i].out
or msgs[i].forward
or msgs[i].via_bot
or msgs[i].sender_id != self.tg_id
or (self.config["skip_reactions"] and msgs[i].reactions)
or (msgs[i].media and not getattr(msgs[i].media, "webpage", False))
or (not self.config["merge_own_reply"] and msgs[i].is_reply)
):
break
if i != -len(msgs):
del_msgs += [msgs[i].id]
merge_msgs += [msgs[i]]
if merge_msgs:
for msg in reversed(merge_msgs):
if text:
text += "\n" * self.config["new_lines"]
if self.config["new_line_pref"]:
text += self.config["new_line_pref"]
if (
not text
and self.apo_lib.utils.raw_text(msg).startswith(
self.get_prefix()
)
and not self.config["new_line_pref"]
):
text += ">"
text += self.apo_lib.utils.raw_text(msg, True)
await self._client.delete_messages(chat_id, del_msgs)
if not text:
text = self.apo_lib.utils.get_str(
"nothing_to_merge", self.all_strings, message
)
return await utils.answer(message, text)
except IndexError:
return
async def unmergecmd(self, message: Message):
"""
unmerge the messages.
"""
chat_id = utils.get_chat_id(message)
if utils.get_chat_id(message) in self.merged_msgs:
try:
for key, msg_value in self.merged_msgs[chat_id]["message"].items():
if key == "text":
msg_text = msg_value
elif key == "link_preview":
msg_link_preview = msg_value
for key, msg_value in self.merged_msgs[chat_id]["last_msg"].items():
if key == "id":
last_msg_id = msg_value
elif key == "text":
last_msg_text = msg_value
elif key == "link_preview":
last_msg_link_preview = msg_value
await self.client.edit_message(
chat_id,
last_msg_id,
last_msg_text,
link_preview=last_msg_link_preview,
)
await self.client.edit_message(
message, msg_text, link_preview=msg_link_preview
)
except MessageIdInvalidError:
await utils.answer(message, self.strings("undo_merge_fail"))
else:
await utils.answer(message, self.strings("undo_merge_fail"))
self.merged_msgs.clear()
async def q_watcher(self, message: Message):
await self._queue_handler(message)
async def _queue_handler(self, message: Message):
if (
not self.config["active"]
or not isinstance(message, Message)
or not message.out
or message.forward
or message.via_bot
or self.apo_lib.utils.raw_text(message).startswith(self.get_prefix())
):
return
chat_id = utils.get_chat_id(message)
if (self.config["whitelist"] and chat_id not in self.config["chatlist"]) or (
not self.config["whitelist"] and chat_id in self.config["chatlist"]
):
return
if self.config["ignore_prefix"] and any(
self.apo_lib.utils.raw_text(message).startswith(prefix)
for prefix in self.config["ignore_prefix"]
):
return
if self.config["skip_prefix"] and (
found_prefix := next(
(
prefix
for prefix in self.config["skip_prefix"]
if self.apo_lib.utils.raw_text(message).startswith(prefix)
),
None,
)
):
text = message.text.replace(utils.escape_html(found_prefix), "", 1)
if len(text) > 0:
with contextlib.suppress(Exception):
await message.edit(text)
return
if (
(
self.config["skip_length"]
and len(self.apo_lib.utils.remove_html(message.text))
>= self.config["skip_length"]
)
or (self.config["skip_reactions"] and message.reactions)
or (
message.media
and not getattr(message.media, "webpage", False)
or (
not self.config["merge_urls"]
and self.apo_lib.utils.get_entityurls(message)
)
)
):
return
last_msg = None
try:
if utils.get_topic(message):
last_msg_iter = await self._client.get_messages(
chat_id, limit=5, reply_to=utils.get_topic(message)
)
else:
last_msg_iter = await self._client.get_messages(chat_id, limit=5)
for i in range(-4, -1):
if last_msg_iter[i].id != message.id:
last_msg = last_msg_iter[i]
break
except IndexError:
return
if (
self.config["merge_own_reply"]
and message.is_reply
and (
not message.reply_to_msg_id
or message.reply_to_msg_id != utils.get_topic(message)
)
):
last_msg_reply = await message.get_reply_message()
last_msg = last_msg_reply
else:
last_msg_reply = None
if (
(
self.config["skip_emoji"]
and (
self.apo_lib.utils.is_emoji(self.apo_lib.utils.raw_text(message))
or self.apo_lib.utils.is_emoji(
self.apo_lib.utils.raw_text(last_msg)
)
)
)
or (
self.config["skip_reply"]
and not self.config["merge_own_reply"]
and (message.is_reply or last_msg.is_reply)
and (
not message.reply_to_msg_id
or message.reply_to_msg_id != utils.get_topic(message)
)
)
or (
last_msg.is_reply
and message.is_reply
and (
not message.reply_to_msg_id
or message.reply_to_msg_id != utils.get_topic(message)
)
and not self.config["merge_own_reply"]
)
):
return
if (
last_msg.sender_id != self.tg_id
or not isinstance(last_msg, Message)
or last_msg.via_bot
or last_msg.fwd_from
or (self.config["skip_reactions"] and last_msg.reactions)
or (
last_msg.media
and not getattr(last_msg.media, "webpage", False)
or (
not self.config["merge_urls"]
and self.apo_lib.utils.get_entityurls(last_msg)
)
)
or self.apo_lib.utils.remove_html(last_msg.text)[0] == self.get_prefix()
):
return
if self.config["ignore_prefix"] and any(
self.apo_lib.utils.raw_text(last_msg).startswith(prefix)
for prefix in self.config["ignore_prefix"]
):
return
if (
self.config["edit_timeout"]
and (
datetime.now(timezone.utc) - (last_msg.edit_date or last_msg.date)
).total_seconds()
> self.config["edit_timeout"] * 60
) and (
self.config["merge_own_reply"]
and (
not message.is_reply
or message.reply_to_msg_id
and message.reply_to_msg_id == utils.get_topic(message)
)
or not self.config["merge_own_reply"]
):
return
text = last_msg.text
text += "\n" * self.config["new_lines"]
if self.config["new_line_pref"]:
text += self.config["new_line_pref"]
text += message.text
if (
(message.is_reply and message.reply_to_msg_id != utils.get_topic(message))
or self.config["reverse_merge"]
) and (
not self.config["merge_own_reply"]
or not message.is_reply
or message.reply_to_msg_id == utils.get_topic(message)
):
message, last_msg = last_msg, message
message_text = last_msg.text
last_msg_text = message.text
else:
message_text = message.text
last_msg_text = last_msg.text
if self.config["link_preview"] is None:
link_preview = getattr(message.media, "webpage", False) or getattr(
last_msg.media, "webpage", False
)
else:
link_preview = bool(self.config["link_preview"])
try:
if self.config["reverse_merge"] and (
self.config["merge_own_reply"]
and (
(
last_msg.is_reply
and last_msg.reply_to_msg_id != utils.get_topic(last_msg)
)
or (
message.is_reply
and (
not message.reply_to_msg_id
or message.reply_to_msg_id != utils.get_topic(message)
)
)
)
):
if last_msg.is_reply and last_msg.reply_to_msg_id != utils.get_topic(
last_msg
):
reply = await last_msg.get_reply_message()
else:
reply = await message.get_reply_message()
await last_msg.delete()
msg = await last_msg.client.send_message(
chat_id, text, reply_to=reply, link_preview=link_preview
)
else:
msg = await last_msg.edit(text, link_preview=link_preview)
if msg.out:
if (
self.config["merge_own_reply"]
and self.config["own_reply_msg"]
and not self.config["reverse_merge"]
and (
message.is_reply
and (
not message.reply_to_msg_id
or message.reply_to_msg_id != utils.get_topic(message)
)
)
):
await message.edit(
self.config["own_reply_msg"], link_preview=link_preview
)
else:
await message.delete()
self.merged_msgs.clear()
self.merged_msgs[chat_id] = {
"message": {
"text": message_text,
"link_preview": link_preview,
},
"last_msg": {
"id": msg.id or last_msg.id,
"text": last_msg_text,
"link_preview": link_preview,
},
}
return
except Exception as exc:
self.apo_lib.utils.log(
logging.DEBUG,
__name__,
f"Edit failed on last_msg:\n{str(exc)}",
debug_msg=True,
)
return