fix: some broken strings

feat: install button and new limoka version update notification
drop: watcher that installs module  from bot (temporary)
This commit is contained in:
2026-04-18 13:19:24 +03:00
parent eb71e39fcf
commit be47e59d97

453
Limoka.py
View File

@@ -25,15 +25,37 @@ from telethon import TelegramClient
from telethon.errors.rpcerrorlist import YouBlockedUserError from telethon.errors.rpcerrorlist import YouBlockedUserError
from telethon import functions from telethon import functions
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
import ast
try: try:
from aiogram.utils.exceptions import BadRequest from aiogram.utils.exceptions import BadRequest
except ImportError: except ImportError:
from aiogram.exceptions import TelegramBadRequest as BadRequest from aiogram.exceptions import TelegramBadRequest as BadRequest
from .. import utils, loader from .. import utils, loader
from ..types import InlineCall from ..types import BotInlineCall, InlineCall
logger = logging.getLogger("Limoka") logger = logging.getLogger("Limoka")
__version__ = (1, 4, 5) __version__ = (1, 5, 0)
def _parse_version_from_source(source: str):
try:
tree = ast.parse(source)
except SyntaxError:
return None
for node in tree.body:
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == "__version__":
try:
return ast.literal_eval(node.value)
except (ValueError, SyntaxError):
return None
return None
WEIGHTS = { WEIGHTS = {
"inline.token_obtainment": 15, "inline.token_obtainment": 15,
@@ -236,9 +258,11 @@ class ModuleContentBuilder:
module_info: Dict[str, Any], module_info: Dict[str, Any],
query: str, query: str,
filters: Dict[str, List[str]], filters: Dict[str, List[str]],
url: str,
include_categories: bool = True, include_categories: bool = True,
module_path: Optional[str] = None, module_path: Optional[str] = None,
lang: str = "en", lang: str = "en",
) -> tuple: ) -> tuple:
""" """
Build complete formatted module content. Build complete formatted module content.
@@ -260,39 +284,48 @@ class ModuleContentBuilder:
categories_text = self._build_categories_text(filters) categories_text = self._build_categories_text(filters)
commands = self.formatter.format_commands(module_info, lang) commands = self.formatter.format_commands(module_info, lang)
header = self._build_header(query, name, description, dev_username, module_path) header = self._build_header(query, name, description, dev_username, module_path, url)
footer = self._build_footer(module_path) footer = self._build_footer(module_path)
body_pages = self._paginate_commands(commands) body_pages = self._paginate_commands(commands)
return header, body_pages, footer, categories_text return header, body_pages, footer, categories_text
def _build_header(self, query: str, name: str, description: str, dev_username: str, module_path: Optional[str]) -> str: def _build_header(self, query: str, name: str, description: str, dev_username: str, module_path: Optional[str], url: str) -> str:
"""Build message header with module info and tags.""" """Build message header with module info and tags."""
tags_list = self.repository.get_tags_for_module(module_path) if module_path else [] tags_list = self.repository.get_tags_for_module(module_path) if module_path else []
tags_text = ", ".join(self.strings["tags"].get(tag, tag) for tag in tags_list) tags_text = ", ".join(self.strings["tags"].get(tag, tag) for tag in tags_list)
header_template = self.strings["found_header"] header_template = self.strings["found_header"]
if not tags_text: if not tags_text:
# Replace the tags line but keep blockquote closure at the end
header_template = header_template.replace( header_template = header_template.replace(
"<blockquote><b><tg-emoji emoji-id=5418376169055602355>🏷</tg-emoji> Tags:</b> {tags}</blockquote>\n\n", "\n\n<b><tg-emoji emoji-id=5418376169055602355>🏷</tg-emoji> Tags:</b> {tags}</blockquote>\n\n",
"" "</blockquote>\n\n"
) )
header_template = header_template.replace( header_template = header_template.replace(
"<blockquote><b><tg-emoji emoji-id=5418376169055602355>🏷</tg-emoji> Теги:</b> {tags}</blockquote>\n\n", "\n\n<b><tg-emoji emoji-id=5418376169055602355>🏷</tg-emoji> Теги:</b> {tags}</blockquote>\n\n",
"" "</blockquote>\n\n"
) )
return header_template.format( header = header_template.format(
query=html.escape(query), query=html.escape(query),
name=name, name=name,
description=description, description=description,
username=dev_username, username=dev_username,
tags=tags_text, tags=tags_text,
module_path=module_path,
url=url
) )
# Remove extra newlines
header = re.sub(r'\n+', '\n', header)
return header
def _build_footer(self, module_path: Optional[str]) -> str: def _build_footer(self, module_path: Optional[str]) -> str:
"""Build message footer with download command.""" """Build message footer with download command."""
clean_path = (module_path or "").replace("\\", "/") clean_path = (module_path or "").replace("\\", "/")
return "" # unused for now, but may be used in the future for additional info
return self.strings["found_footer"].format( return self.strings["found_footer"].format(
url=html.escape(self.formatter.strings.get("limokaurl", "")), url=html.escape(self.formatter.strings.get("limokaurl", "")),
module_path=html.escape(clean_path), module_path=html.escape(clean_path),
@@ -351,22 +384,20 @@ class Limoka(loader.Module):
"found_header": ( "found_header": (
"<blockquote><tg-emoji emoji-id=5413334818047940135>🔍</tg-emoji> Found module <b>{name}</b> " "<blockquote><tg-emoji emoji-id=5413334818047940135>🔍</tg-emoji> Found module <b>{name}</b> "
"by query: <b>{query}</b>\n\n" "by query: <b>{query}</b>\n\n"
"<b><tg-emoji emoji-id=5413350219800661019>🌐</tg-emoji> <a href='{url}{module_path}'>Source</a></b>\n"
"<b><tg-emoji emoji-id=5418376169055602355></tg-emoji> Description:</b> {description}\n" "<b><tg-emoji emoji-id=5418376169055602355></tg-emoji> Description:</b> {description}\n"
"<b><tg-emoji emoji-id=5418299289141004396>🧑‍💻</tg-emoji> Developer:</b> {username}\n\n" "<b><tg-emoji emoji-id=5418299289141004396>🧑‍💻</tg-emoji> Developer:</b> {username}\n\n"
"<blockquote><b><tg-emoji emoji-id=5418376169055602355>🏷</tg-emoji> Tags:</b> {tags}</blockquote>\n\n" "<b><tg-emoji emoji-id=5418376169055602355>🏷</tg-emoji> Tags:</b> {tags}</blockquote>\n\n"
), ),
"found_body": ("{commands}"), "found_body": ("{commands}"),
"found_footer": (
"<blockquote>\n<tg-emoji emoji-id=5411143117711624172>🪄</tg-emoji> <code>{prefix}dlm {url}{module_path}</code></blockquote>"
),
"caption_short": ( "caption_short": (
"<blockquote><tg-emoji emoji-id=5413334818047940135>🔍</tg-emoji> <b>{safe_name}</b>\n" "<blockquote><tg-emoji emoji-id=5413334818047940135>🔍</tg-emoji> <b>{safe_name}</b>\n"
"<b><tg-emoji emoji-id=5413350219800661019>🌐</tg-emoji> <a href='{url}{module_path}'>Source</a></b>\n"
"<b><tg-emoji emoji-id=5418376169055602355></tg-emoji> Description:</b> {safe_desc}\n" "<b><tg-emoji emoji-id=5418376169055602355></tg-emoji> Description:</b> {safe_desc}\n"
"<b><tg-emoji emoji-id=5418299289141004396>🧑‍💻</tg-emoji> Dev:</b> {dev_username}\n" "<b><tg-emoji emoji-id=5418299289141004396>🧑‍💻</tg-emoji> Dev:</b> {dev_username}</blockquote>\n"
"<tg-emoji emoji-id=5411143117711624172>🪄</tg-emoji> <code>{prefix}dlm {module_path}</code></blockquote>"
), ),
"command_template": "{emoji} <code>{prefix}{command}</code> — {description}\n", "command_template": "<blockquote>{emoji} <code>{prefix}{command}</code> — {description}</blockquote>\n",
"inline_handler_template": "{inline_bot} {command}{description}\n", "inline_handler_template": "<blockquote>{inline_bot} {command}{description}</blockquote>\n",
"emojis": { "emojis": {
1: "<tg-emoji emoji-id=5416037945909987712>1⃣</tg-emoji>", 1: "<tg-emoji emoji-id=5416037945909987712>1⃣</tg-emoji>",
2: "<tg-emoji emoji-id=5413855071731470617>2⃣</tg-emoji>", 2: "<tg-emoji emoji-id=5413855071731470617>2⃣</tg-emoji>",
@@ -414,6 +445,7 @@ class Limoka(loader.Module):
"global_button": "🌍 Results", "global_button": "🌍 Results",
"filtered_button": "🏷️ Filtered search", "filtered_button": "🏷️ Filtered search",
"inline_search": "🔍 Search in Limoka", "inline_search": "🔍 Search in Limoka",
"install_button": "🪄 Install",
"inline_no_results": "<blockquote>❌ No modules found</blockquote>", "inline_no_results": "<blockquote>❌ No modules found</blockquote>",
"inline_error": "<blockquote>❌ Search error occurred</blockquote>", "inline_error": "<blockquote>❌ Search error occurred</blockquote>",
"inline_short_query": "<blockquote>❌ Query too short (min 2 chars)</blockquote>", "inline_short_query": "<blockquote>❌ Query too short (min 2 chars)</blockquote>",
@@ -450,7 +482,13 @@ class Limoka(loader.Module):
"If error persists again, report to developers</blockquote>" "If error persists again, report to developers</blockquote>"
), ),
"body_page": "Commands", "body_page": "Commands",
"limokaurl": "https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/" "install_failed": "Installation failed. Check logs for details.",
"install_succeeded": "Module installed successfully!",
"update_available": (
"🔔 <b>New update available!</b>\n\n"
"New Limoka Version {version} already available. Please update for better performance, bug fixes, and new features.\n"
"Press the button below to update the module."
),
} }
strings_ru = { strings_ru = {
"name": "Limoka", "name": "Limoka",
@@ -461,23 +499,21 @@ class Limoka(loader.Module):
), ),
"found_header": ( "found_header": (
"<blockquote><tg-emoji emoji-id=5413334818047940135>🔍</tg-emoji> Найден модуль <b>{name}</b> " "<blockquote><tg-emoji emoji-id=5413334818047940135>🔍</tg-emoji> Найден модуль <b>{name}</b> "
"по запросу: <b>{query}</b></blockquote>\n\n" "по запросу: <b>{query}</b>\n\n"
"<blockquote><b><tg-emoji emoji-id=5418376169055602355></tg-emoji> Описание:</b> {description}</blockquote>\n" "<b><tg-emoji emoji-id=5413350219800661019>🌐</tg-emoji> <a href='{url}{module_path}'>Исходный код</a></b>\n"
"<blockquote><b><tg-emoji emoji-id=5418299289141004396>🧑‍💻</tg-emoji> Разработчик:</b> {username}</blockquote>\n\n" "<b><tg-emoji emoji-id=5418376169055602355></tg-emoji> Описание:</b> {description}\n"
"<blockquote><b><tg-emoji emoji-id=5418376169055602355>🏷</tg-emoji> Теги:</b> {tags}</blockquote>\n\n" "<b><tg-emoji emoji-id=5418299289141004396>🧑‍💻</tg-emoji> Разработчик:</b> {username}\n\n"
"<b><tg-emoji emoji-id=5418376169055602355>🏷</tg-emoji> Теги:</b> {tags}</blockquote>\n\n"
), ),
"found_body": ("{commands}"), "found_body": ("{commands}"),
"found_footer": (
"\n<blockquote><tg-emoji emoji-id=5411143117711624172>🪄</tg-emoji> <code>{prefix}dlm {url}{module_path}</code></blockquote>"
),
"caption_short": ( "caption_short": (
"<blockquote><tg-emoji emoji-id=5413334818047940135>🔍</tg-emoji> <b>{safe_name}</b>\n" "<blockquote><tg-emoji emoji-id=5413334818047940135>🔍</tg-emoji> <b>{safe_name}</b>\n"
"<b><tg-emoji emoji-id=5413350219800661019>🌐</tg-emoji> <a href='{url}{module_path}'>Исходный код</a></b>\n"
"<b><tg-emoji emoji-id=5418376169055602355></tg-emoji> Описание:</b> {safe_desc}\n" "<b><tg-emoji emoji-id=5418376169055602355></tg-emoji> Описание:</b> {safe_desc}\n"
"<b><tg-emoji emoji-id=5418299289141004396>🧑‍💻</tg-emoji> Разработчик:</b> {dev_username}\n" "<b><tg-emoji emoji-id=5418299289141004396>🧑‍💻</tg-emoji> Разработчик:</b> {dev_username}</blockquote>\n"
"<tg-emoji emoji-id=5411143117711624172>🪄</tg-emoji> <code>{prefix}dlm {module_path}</code></blockquote>"
), ),
"command_template": "<blockquote>{emoji} <code>{prefix}{command}</code> — {description}</blockquote>\n", "command_template": "<blockquote>{emoji} <code>{prefix}{command}</code> — {description}</blockquote>\n",
"inline_handler_template": "{inline_bot} {command}{description}\n", "inline_handler_template": "<blockquote>{inline_bot} {command}{description}</blockquote>\n",
"emojis": { "emojis": {
1: "<tg-emoji emoji-id=5416037945909987712>1⃣</tg-emoji>", 1: "<tg-emoji emoji-id=5416037945909987712>1⃣</tg-emoji>",
2: "<tg-emoji emoji-id=5413855071731470617>2⃣</tg-emoji>", 2: "<tg-emoji emoji-id=5413855071731470617>2⃣</tg-emoji>",
@@ -530,6 +566,7 @@ class Limoka(loader.Module):
"global_button": "🌍 Результаты", "global_button": "🌍 Результаты",
"filtered_button": "🏷️ Поиск с фильтрами", "filtered_button": "🏷️ Поиск с фильтрами",
"inline_search": "🔍 Поиск в Limoka", "inline_search": "🔍 Поиск в Limoka",
"install_button": "🪄 Установить",
"inline_no_results": "<blockquote>❌ Модули не найдены</blockquote>", "inline_no_results": "<blockquote>❌ Модули не найдены</blockquote>",
"inline_error": "<blockquote>❌ Ошибка поиска</blockquote>", "inline_error": "<blockquote>❌ Ошибка поиска</blockquote>",
"inline_short_query": "<blockquote>❌ Запрос слишком короткий (мин. 2 символа)</blockquote>", "inline_short_query": "<blockquote>❌ Запрос слишком короткий (мин. 2 символа)</blockquote>",
@@ -566,6 +603,15 @@ class Limoka(loader.Module):
"Если ошибка сохраняется снова, сообщите разработчикам</blockquote>" "Если ошибка сохраняется снова, сообщите разработчикам</blockquote>"
), ),
"body_page": "Команды", "body_page": "Команды",
"install_failed": "Установка не удалась. Проверьте логи для деталей.",
"install_succeeded": "Модуль успешно установлен!",
"update_available": (
"🔔 <b>Доступно новое обновление!</b>\n\n"
"Новая версия Limoka {version} уже доступна. Пожалуйста, обновитесь для лучшей производительности, исправления багов и новых функций.\n"
"Нажмите кнопку ниже, чтобы обновить модуль."
),
"no_updates_available": "Нет доступных обновлений. У вас установлена последняя версия Limoka.",
"module_update_available": "Уведомление об обновлении модуля было отправлено, проверьте {bot}.",
"_cls_doc": "Модули теперь в одном месте с простым и удобным поиском!", "_cls_doc": "Модули теперь в одном месте с простым и удобным поиском!",
} }
@@ -590,11 +636,18 @@ class Limoka(loader.Module):
lambda: "If enabled, modules from developers with newbies tag will be not shown.", lambda: "If enabled, modules from developers with newbies tag will be not shown.",
validator=loader.validators.Boolean(), validator=loader.validators.Boolean(),
), ),
loader.ConfigValue(
"auto_update_check",
True,
lambda: "If enabled, Limoka will periodically check for updates and notify you when a new version is available.",
validator=loader.validators.Boolean(),
)
) )
self.name = self.strings["name"] self.name = self.strings["name"]
self._invalid_banners = set() self._invalid_banners = set()
self._bot_username = "limoka_bbot" self._bot_username = "limoka_bbot"
self._base_url = self.config["limokaurl"] self._base_url = self.config["limokaurl"]
self._self_bot_username = None
self.SEARCH_STATES = { self.SEARCH_STATES = {
"no_banner": "no_banner", "no_banner": "no_banner",
@@ -613,6 +666,35 @@ class Limoka(loader.Module):
def _filter_newbies(self, modules: Dict[str, Any]) -> Dict[str, Any]: def _filter_newbies(self, modules: Dict[str, Any]) -> Dict[str, Any]:
"""[DEPRECATED] Use ModuleRepository.apply_newbie_filter instead.""" """[DEPRECATED] Use ModuleRepository.apply_newbie_filter instead."""
return self.repository.apply_newbie_filter(self.config.get("filter_newbies_modules", False)) return self.repository.apply_newbie_filter(self.config.get("filter_newbies_modules", False))
@loader.loop(interval=3600*24)
async def periodic_update_check(self):
"""Periodically check for module updates if auto_update_check is enabled."""
if self.config["auto_update_check"]:
await self.check_for_module_update()
async def check_for_module_update(self):
async with aiohttp.ClientSession() as session:
try:
async with session.get(self._base_url + "Limoka.py", timeout=10) as response:
if response.status == 200:
version = _parse_version_from_source(await response.text())
if version is not None and version > __version__:
# TODO: TODO
await self.inline.bot.send_message(
self._tg_id,
self.strings["update_available"].format(version=version),
reply_markup=InlineKeyboardMarkup().add(InlineKeyboardButton("Update Now", callback_data="limoka:update_module"))
)
return True
return False
except Exception as e:
logger.error(f"Error checking for module update: {e}")
@loader.callback_handler()
async def callback_handler(self, call: BotInlineCall):
if call.data == "limoka:update_module":
await self._install_module_limoka(call)
def _create_search_session( def _create_search_session(
self, self,
@@ -668,8 +750,8 @@ class Limoka(loader.Module):
self.repository = ModuleRepository(raw_modules, repositories) self.repository = ModuleRepository(raw_modules, repositories)
self.modules = self.repository.apply_newbie_filter(self.config["filter_newbies_modules"]) self.modules = self.repository.apply_newbie_filter(self.config["filter_newbies_modules"])
self._userbot_bot_username = (await self.inline.bot.get_me()).username self._self_bot_username = (await self.inline.bot.get_me()).username
self.formatter = CommandFormatter(self.strings, self._userbot_bot_username, self.get_prefix()) self.formatter = CommandFormatter(self.strings, self._self_bot_username, self.get_prefix())
self.content_builder = ModuleContentBuilder(self.strings, self.formatter, self.repository) self.content_builder = ModuleContentBuilder(self.strings, self.formatter, self.repository)
self._service_bot_id = (await self.client.get_entity(self._bot_username)).id self._service_bot_id = (await self.client.get_entity(self._bot_username)).id
@@ -805,13 +887,15 @@ class Limoka(loader.Module):
module_info: Dict[str, Any], module_info: Dict[str, Any],
query: str, query: str,
filters: Dict[str, List[str]], filters: Dict[str, List[str]],
url: str,
include_categories: bool = True, include_categories: bool = True,
module_path: Optional[str] = None, module_path: Optional[str] = None,
lang: str = "en", lang: str = "en"
) -> tuple: ) -> tuple:
"""[DEPRECATED] Use ModuleContentBuilder.build_content instead.""" """[DEPRECATED] Use ModuleContentBuilder.build_content instead."""
return self.content_builder.build_content( return self.content_builder.build_content(
module_info, query, filters, include_categories, module_path, lang module_info, query, filters, url, include_categories, module_path, lang
) )
def _build_navigation_markup(self, session: Dict[str, Any]) -> list: def _build_navigation_markup(self, session: Dict[str, Any]) -> list:
@@ -951,6 +1035,15 @@ class Limoka(loader.Module):
}, },
] ]
) )
markup.append(
[
{
"text": self.strings["install_button"],
"callback": self._install_module,
"args": (session,),
},
]
)
markup.append( markup.append(
[{"text": self.strings.get("close", "❌ Close"), "action": "close", "style": "danger"}] [{"text": self.strings.get("close", "❌ Close"), "action": "close", "style": "danger"}]
) )
@@ -1024,6 +1117,7 @@ class Limoka(loader.Module):
include_categories=True, include_categories=True,
module_path=module_path, module_path=module_path,
lang=lang, lang=lang,
url=self._base_url
) )
current_body = body_pages[min(page_body, len(body_pages) - 1)] current_body = body_pages[min(page_body, len(body_pages) - 1)]
full_message = header + current_body + footer + categories_text full_message = header + current_body + footer + categories_text
@@ -1070,12 +1164,38 @@ class Limoka(loader.Module):
include_categories=True, include_categories=True,
module_path=module_path, module_path=module_path,
lang=self.user_lang, lang=self.user_lang,
url=self.config["limokaurl"]
) )
new_page_body = min(page_body + 1, len(body_pages) - 1) new_page_body = min(page_body + 1, len(body_pages) - 1)
await self._display_module( await self._display_module(
call, module_info, module_path, session, page_body=new_page_body call, module_info, module_path, session, page_body=new_page_body
) )
async def _install_module(self, call: InlineCall, session: Dict[str, Any]):
try:
loader = self.lookup("Loader")
await loader.download_and_install(f"{self.config['limokaurl']}{session['results'][session['current_index']]}")
if getattr(loader, "fully_loaded", False):
loader.update_modules_in_db()
except Exception:
await call.answer(f"{self.strings['install_failed']}", alert=True)
else:
await call.answer(f"{self.strings['install_succeeded']}", alert=True)
async def _install_module_limoka(self, call: InlineCall):
try:
loader = self.lookup("Loader")
await loader.download_and_install(f"{self.config['limokaurl']}Limoka.py")
logger.info(f"Downloading and installing: {self.config['limokaurl']}Limoka.py")
if getattr(loader, "fully_loaded", False):
loader.update_modules_in_db()
except Exception:
await call.answer(f"{self.strings['install_failed']}", alert=True)
else:
await call.answer(f"{self.strings['install_succeeded']}", alert=True)
async def _display_filter_menu(self, call: InlineCall, session: Dict[str, Any]): async def _display_filter_menu(self, call: InlineCall, session: Dict[str, Any]):
query = session["query"] query = session["query"]
current_filters = session["filters"] current_filters = session["filters"]
@@ -1589,6 +1709,15 @@ class Limoka(loader.Module):
message, module_info, module_path, display_session, 0 message, module_info, module_path, display_session, 0
) )
@loader.command(ru_doc="Проверить наличие обновлений модуля")
async def limokaupdatecmd(self, message: Message):
"""Check for module updates"""
is_update_available = await self.check_for_module_update()
if is_update_available:
await utils.answer(message, self.strings["module_update_available"].format(bot=self._self_bot_username))
else:
await utils.answer(message, self.strings["no_updates_available"])
async def _show_global_form(self, call: InlineCall, message: Message): async def _show_global_form(self, call: InlineCall, message: Message):
markup = [ markup = [
[ [
@@ -1742,135 +1871,135 @@ class Limoka(loader.Module):
self.strings["history"].format(history="\n".join(formatted_history)), self.strings["history"].format(history="\n".join(formatted_history)),
) )
@loader.watcher(from_dl=False) # @loader.watcher(from_dl=False)
async def secure_install_watcher(self, message: Message): # async def secure_install_watcher(self, message: Message):
if not message.text: # if not message.text:
return # return
if not hasattr(message, "from_id") or not message.from_id: # if not hasattr(message, "from_id") or not message.from_id:
return # return
sender_id = None # sender_id = None
if hasattr(message.from_id, "user_id"): # if hasattr(message.from_id, "user_id"):
sender_id = message.from_id.user_id # sender_id = message.from_id.user_id
elif hasattr(message.from_id, "channel_id"): # elif hasattr(message.from_id, "channel_id"):
sender_id = message.from_id.channel_id # sender_id = message.from_id.channel_id
if sender_id != self._service_bot_id: # if sender_id != self._service_bot_id:
# logger.debug("Message not from official bot, ignoring") # # logger.debug("Message not from official bot, ignoring")
return # return
if not self.config["external_install_allowed"]: # if not self.config["external_install_allowed"]:
return # return
try: # try:
clean_text = ( # clean_text = (
getattr(message, "raw_text", None) # getattr(message, "raw_text", None)
or getattr(message, "message", None) # or getattr(message, "message", None)
or message.text # or message.text
or "" # or ""
) # )
if message.entities: # if message.entities:
from html import unescape # from html import unescape
clean_text = unescape(clean_text) # clean_text = unescape(clean_text)
clean_text = re.sub(r"<[^>]+>", "", clean_text) # clean_text = re.sub(r"<[^>]+>", "", clean_text)
match = re.search(r"#limoka:([^\s\"'<>]+)", clean_text) # match = re.search(r"#limoka:([^\s\"'<>]+)", clean_text)
if not match: # if not match:
logger.debug( # logger.debug(
"No #limoka tag found in cleaned text; leaving original message intact" # "No #limoka tag found in cleaned text; leaving original message intact"
) # )
return # return
tag_content = match.group(1) # tag_content = match.group(1)
parts = tag_content.split(":", 1) # parts = tag_content.split(":", 1)
if len(parts) != 2: # if len(parts) != 2:
logger.error("Invalid tag format after cleaning") # logger.error("Invalid tag format after cleaning")
await utils.answer(message, self.strings["watcher_invalid_format"]) # await utils.answer(message, self.strings["watcher_invalid_format"])
return # return
module_path, signature_hex = parts # module_path, signature_hex = parts
module_path = re.sub(r"[<>\"']", "", module_path).strip() # module_path = re.sub(r"[<>\"']", "", module_path).strip()
if module_path.startswith("href="): # if module_path.startswith("href="):
module_path = module_path[5:].strip('"').strip("'") # module_path = module_path[5:].strip('"').strip("'")
if module_path not in self.modules: # if module_path not in self.modules:
found = False # found = False
for db_path in self.modules.keys(): # for db_path in self.modules.keys():
if module_path in db_path or db_path in module_path: # if module_path in db_path or db_path in module_path:
module_path = db_path # module_path = db_path
found = True # found = True
break # break
if not found: # if not found:
logger.warning(f"Module not found after cleanup: {module_path}") # logger.warning(f"Module not found after cleanup: {module_path}")
await utils.answer( # await utils.answer(
message, # message,
self.strings["watcher_module_not_found"].format( # self.strings["watcher_module_not_found"].format(
path=html.escape(module_path) # path=html.escape(module_path)
), # ),
) # )
return # return
try: # try:
import base64 # import base64
from cryptography.hazmat.primitives.asymmetric import ed25519 # from cryptography.hazmat.primitives.asymmetric import ed25519
PUB_KEY_B64 = ( # PUB_KEY_B64 = (
"MCowBQYDK2VwAyEA1ltSnqtf3pGBuctuAYqHivCXsaRtKOVxavai7yin7ZE=" # "MCowBQYDK2VwAyEA1ltSnqtf3pGBuctuAYqHivCXsaRtKOVxavai7yin7ZE="
) # )
der_bytes = base64.b64decode(PUB_KEY_B64) # der_bytes = base64.b64decode(PUB_KEY_B64)
raw_pubkey = der_bytes[-32:] # raw_pubkey = der_bytes[-32:]
module_url = self.config["limokaurl"] + module_path # module_url = self.config["limokaurl"] + module_path
async with aiohttp.ClientSession() as session: # async with aiohttp.ClientSession() as session:
async with session.get(module_url, timeout=10) as resp: # async with session.get(module_url, timeout=10) as resp:
if resp.status != 200: # if resp.status != 200:
logger.error( # logger.error(
f"Failed to fetch module for verification: {module_url} (HTTP {resp.status})" # f"Failed to fetch module for verification: {module_url} (HTTP {resp.status})"
) # )
await utils.answer( # await utils.answer(
message, self.strings["watcher_loader_missing"] # message, self.strings["watcher_loader_missing"]
) # )
return # return
module_bytes = await resp.read() # module_bytes = await resp.read()
sha256 = hashlib.sha256(module_bytes).hexdigest() # sha256 = hashlib.sha256(module_bytes).hexdigest()
public_key = ed25519.Ed25519PublicKey.from_public_bytes( # public_key = ed25519.Ed25519PublicKey.from_public_bytes(
raw_pubkey # raw_pubkey
) # )
signature = bytes.fromhex(signature_hex) # signature = bytes.fromhex(signature_hex)
signed_payload = f"{module_path}|{sha256}".encode() # signed_payload = f"{module_path}|{sha256}".encode()
public_key.verify(signature, signed_payload) # public_key.verify(signature, signed_payload)
except Exception as e: # except Exception as e:
logger.error(f"Signature verification failed for {module_path}: {e}") # logger.error(f"Signature verification failed for {module_path}: {e}")
await utils.answer(message, self.strings["watcher_signature_invalid"]) # await utils.answer(message, self.strings["watcher_signature_invalid"])
return # return
loader_mod = self.lookup("loader") # loader_mod = self.lookup("loader")
if not loader_mod: # if not loader_mod:
logger.error("Loader module not found") # logger.error("Loader module not found")
await utils.answer(message, self.strings["watcher_loader_missing"]) # await utils.answer(message, self.strings["watcher_loader_missing"])
return # return
module_url = self.config["limokaurl"] + module_path # module_url = self.config["limokaurl"] + module_path
status = await loader_mod.download_and_install(module_url, None) # status = await loader_mod.download_and_install(module_url, None)
if getattr(loader_mod, "fully_loaded", False): # if getattr(loader_mod, "fully_loaded", False):
loader_mod.update_modules_in_db() # loader_mod.update_modules_in_db()
try: # try:
await message.delete() # await message.delete()
except Exception as e: # except Exception as e:
logger.error(f"Failed to delete message: {e}") # logger.error(f"Failed to delete message: {e}")
if status: # if status:
try: # try:
bot_peer = await self.client.get_entity(self._service_bot_id) # bot_peer = await self.client.get_entity(self._service_bot_id)
await self.client.send_message( # await self.client.send_message(
bot_peer, f"#limoka:sucsess:{message.id}" # bot_peer, f"#limoka:sucsess:{message.id}"
) # )
except Exception as e: # except Exception as e:
logger.error(f"Failed to send success confirmation: {e}") # logger.error(f"Failed to send success confirmation: {e}")
else: # else:
logger.error(f"Installation failed with status: {status}") # logger.error(f"Installation failed with status: {status}")
try: # try:
bot_peer = await self.client.get_entity(self._service_bot_id) # bot_peer = await self.client.get_entity(self._service_bot_id)
await self.client.send_message( # await self.client.send_message(
bot_peer, f"#limoka:failed:{message.id}" # bot_peer, f"#limoka:failed:{message.id}"
) # )
except Exception as e: # except Exception as e:
logger.error(f"Failed to send failure notification: {e}") # logger.error(f"Failed to send failure notification: {e}")
except Exception as e: # except Exception as e:
logger.exception(f"CRITICAL ERROR in secure_install_watcher: {e}") # logger.exception(f"CRITICAL ERROR in secure_install_watcher: {e}")
try: # try:
await utils.answer( # await utils.answer(
message, self.strings["watcher_critical"].format(error=str(e)[:100]) # message, self.strings["watcher_critical"].format(error=str(e)[:100])
) # )
await asyncio.sleep(5) # await asyncio.sleep(5)
await message.delete() # await message.delete()
except Exception: # except Exception:
pass # pass