# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# meta pic: https://static.dan.tatar/secret_chat_icon.png
# meta banner: https://mods.hikariatama.ru/badges/secret_chat.jpg
# meta developer: @hikarimods
# requires: telethon_secret_chat
# scope: hikka_only
# scope: hikka_min 1.2.10
import io
import logging
from telethon.events import NewMessage
from telethon.tl.functions.channels import CreateChannelRequest
from telethon.tl.types import Message
from telethon.utils import get_display_name
from telethon_secret_chat import SecretChatManager
from .. import loader, utils
logger = logging.getLogger(__name__)
@loader.tds
class SecretChatMod(loader.Module):
"""De-secrets secret chats"""
strings = {"name": "SecretChat", "state": "👀 SecretChat is now {}"}
def _get_chat_id(self, chat) -> int:
cid = [chat.admin_id] + [chat.participant_id]
cid.remove(self._tg_id)
cid = cid[0]
return cid
async def _create_chat(self, chat):
cid = self._get_chat_id(chat)
decrypted_chat = None
async for d in self._client.iter_dialogs():
if d.title == f"secret-chat-with-{cid}":
decrypted_chat = d.entity
if not decrypted_chat:
decrypted_chat = (
await self._client(
CreateChannelRequest(
f"secret-chat-with-{cid}",
"SecretChat conversation with {}",
megagroup=True,
)
)
).chats[0]
@self._client.on(NewMessage(chats=[decrypted_chat.id]))
async def secret_chat_processor(event):
"""secret_chat_processor"""
await self._manager.send_secret_message(chat.id, event.text)
await event.edit(f"<< {event.text}")
self._chats[cid] = decrypted_chat
self._manager = SecretChatManager(
self._client,
auto_accept=True,
new_chat_created=self._new_chat,
)
self._manager.add_secret_event_handler(func=self._replier)
self._chats = {}
self._secret_chats = {}
async def _replier(self, event):
if not self.get("state", False):
return
e = event.decrypted_event
user = self._secret_chats[event.message.chat_id]
if e.message:
await self._client.send_message(self._chats[user], f">> {e.message}")
if e.file:
try:
m = await self._manager.download_secret_media(e)
if m:
attrs = {}
f = io.BytesIO(m)
if "/" in (getattr(e.media, "mime_type", "") or ""):
f.name = "secret_media." + e.media.mime_type.split("/")[-1]
if getattr(e.media, "mime_type", None) == "audio/ogg":
attrs["voice_note"] = True
if getattr(e.media, "caption", False):
attrs["caption"] = e.media.caption
if "caption" not in attrs:
attrs["caption"] = ""
attrs["caption"] = ">> " + attrs["caption"]
await self._client.send_file(self._chats[user], f, **attrs)
except Exception:
await self._client.send_message(self._chats[user], ">>> [File]")
async def _new_chat(self, chat, _: bool):
if not self.get("state", False):
return
await self._create_chat(chat)
user = self._get_chat_id(chat)
self._secret_chats[chat.id] = user
u = await self._client.get_entity(user)
await self._client.send_message(
self._chats[user],
(
"㊙️ New secret chat with {get_display_name(u)} started'
),
)
async def on_unload(self):
self._client.remove_event_handler(self._manager._secret_chat_event_loop)
del self._manager
for handler in self._client.list_event_handlers():
if handler[0].__doc__ == "secret_chat_processor":
self._client.remove_event_handler(handler)
async def desecretcmd(self, message: Message):
"""Toggle secret chat handler"""
current = self.get("state", False)
new = not current
self.set("state", new)
await utils.answer(
message, self.strings("state").format("on" if new else "off")
)