From 61ebb7fa472caa2abd4d6c7523bcf8830258167c Mon Sep 17 00:00:00 2001 From: Macsim Date: Sun, 8 Feb 2026 20:17:50 +0300 Subject: [PATCH] Legacy version is here! --- LimokaLegacy.py | 1420 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1420 insertions(+) create mode 100644 LimokaLegacy.py diff --git a/LimokaLegacy.py b/LimokaLegacy.py new file mode 100644 index 0000000..4e322ea --- /dev/null +++ b/LimokaLegacy.py @@ -0,0 +1,1420 @@ +# meta developer: @limokanews +# requires: whoosh cryptography + +from collections import Counter, defaultdict +import shutil +from whoosh.index import create_in, open_dir +from whoosh.fields import Schema, TEXT, ID +from whoosh.qparser import QueryParser, OrGroup +from whoosh.query import FuzzyTerm, Wildcard +from whoosh.writing import LockError +import aiohttp +import random +import logging +import os +import html +import json +import re +import asyncio +from typing import Iterable, Union, List, Dict, Any, Optional +import hashlib +from telethon.types import Message +from telethon.errors.rpcerrorlist import WebpageMediaEmptyError +from telethon import TelegramClient +from telethon.errors.rpcerrorlist import YouBlockedUserError +from telethon import functions + +try: + from aiogram.utils.exceptions import BadRequest +except ImportError: + from aiogram.exceptions import TelegramBadRequest as BadRequest +from .. import utils, loader +from ..types import InlineCall + +logger = logging.getLogger("Limoka") +__version__ = (1, 0, 0) + +WEIGHTS = { + "inline.token_obtainment": 15, + "main": 10, + "inline": 7, + "translations": 5, + "security": 3, +} + +DEFAULT_WEIGHT = 1 + +BASE_DIR = os.getcwd() + + +def _get_lang_value(data: Dict[str, Any], lang: str) -> str: + if not isinstance(data, dict): + return str(data) if data else "" + return data.get(lang, data.get("default", data.get("en", ""))) + + +class Search: + def __init__(self, query, ix): + self.schema = Schema( + title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored=True) + ) + self.query = query + self.ix = ix + + def search_module(self): + with self.ix.searcher() as searcher: + parser = QueryParser("content", self.ix.schema, group=OrGroup.factory(0.8)) + query = parser.parse(self.query) + wildcard_query = Wildcard("content", f"*{self.query}*") + fuzzy_query = FuzzyTerm("content", self.query, maxdist=2, prefixlength=1) + for search_query in [query, wildcard_query, fuzzy_query]: + results = searcher.search(search_query) + if results: + return list(set(result["path"] for result in results)) + return [] + + +class LimokaAPI: + async def fetch_json(self, base_url, path): + url = f"{base_url}{path}" + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + return json.loads(await response.text()) + + +@loader.tds +class LimokaLegacy(loader.Module): + """ + Modules are now in one place with easy searching! + For Hikka and FTG Userbots. This module has outdated functionality and is kept for legacy reasons only. + Read https://t.me/limokanews/133 for more information. + """ + + strings = { + "name": "Limoka Legacy", + "wait": ( + "Just wait\n" + "🔍 A search is underway among {count} modules " + "for the query: {query}\n" + "{fact}" + ), + "found_header": ( + "🔍 Found module {name} " + "by query: {query}\n\n" + "ℹ️ Description: {description}\n" + "🧑‍💻 Developer: {username}\n\n" + "🏷 Tags: {tags}\n\n" + ), + "found_body": ("{commands}"), + "found_footer": "", + "caption_short": ( + "🔍 {safe_name}\n" + "ℹ️ Description: {safe_desc}\n" + "🧑‍💻 Dev: {dev_username}" + ), + "command_template": "{emoji} {prefix}{command} — {description}\n", + "inline_handler_template": "{inline_bot} {command} — {description}\n", + "emojis": { + 1: "1️⃣", + 2: "2️⃣", + 3: "3️⃣", + 4: "4️⃣", + 5: "5️⃣", + 6: "6️⃣", + 7: "7️⃣", + 8: "8️⃣", + 9: "9️⃣", + }, + "404": " Not found by query: {query}", + "noargs": " No args", + "?": "🔎 Request too short / not found", + "no_info": "No information", + "facts": [ + "🛡 The limoka catalog is carefully moderated!", + "🚀 Limoka performance allows you to search for modules quickly!", + ], + "history": ( + "🔎 Your search history:\n" + "{history}" + ), + "empty_history": "🔎 Your search history is empty!", + "enter_query": "🔍 Enter new search query:", + "global_search": "🔍 Global search for {query} — found {count} modules", + "change_query": "🔍 Change query", + "back": "🔙 Back", + "global_button": "🌍 Results", + "first_page": "This is the first page!", + "last_page": "This is the last page!", + "display_error": "Error displaying module. Please try again.", + "error_occurred": "An error occurred. Please try again.", + "start_search_form": "🔍 Limoka Search\nEnter your query to search for modules:", + "history_cleared": "🧹 Search history cleared!", + "invalid_history_arg": " Invalid argument for history command. Use:\n.lshistory - show history\n.lshistory clear - clear history", + "close": "❌ Close", + "indexing_in_progress": ( + "⚠️ Database is busy, " + "try again later. " + "If issue persists, try " + "removing limoka_index in the userbot's root folder. " + "If error persists again, report to developers" + ), + "install_btn": "🛠 Install", + "source_btn": "📦 Source", + "installed": "✅ Installed successfully!", + "install_failed": "❌ Installation failed!", + "tags": { + "newbie": "Newbie", + "herokutrusted": "Heroku Trusted", + "hikkatrusted": "Hikka Trusted", + "nonactive": "Non-active repository", + "nonlongermaintained": "Abandoned repository", + } + } + strings_ru = { + "name": "Limoka", + "wait": ( + "Подождите\n" + "🔍 Идёт поиск среди {count} модулей по запросу: {query}\n" + "{fact}" + ), + "found_header": ( + "🔍 Найден модуль {name} " + "по запросу: {query}\n\n" + "ℹ️ Описание: {description}\n" + "🧑‍💻 Разработчик: {username}\n\n" + "🏷 Теги: {tags}\n\n" + ), + "found_body": ("{commands}"), + "found_footer": "", + "caption_short": ( + "🔍 {safe_name}\n" + "ℹ️ Описание: {safe_desc}\n" + "🧑‍💻 Разработчик: {dev_username}" + ), + "command_template": "{emoji} {prefix}{command} — {description}\n", + "inline_handler_template": "{inline_bot} {command} — {description}\n", + "emojis": { + 1: "1️⃣", + 2: "2️⃣", + 3: "3️⃣", + 4: "4️⃣", + 5: "5️⃣", + 6: "6️⃣", + 7: "7️⃣", + 8: "8️⃣", + 9: "9️⃣", + }, + "404": " Не найдено по запросу: {query}", + "noargs": " Нет аргументов", + "?": "🔎 Запрос слишком короткий / не найден", + "no_info": "Нет информации", + "facts": [ + "🛡 Каталог Limoka тщательно модерируется!", + "🚀 Limoka позволяет искать модули с невероятной скоростью!", + ], + "history": ( + "🔎 История поиска:\n" + "{history}" + ), + "empty_history": "🔎 История поиска пуста!", + "enter_query": "🔍 Введите новый поисковый запрос:", + "global_search": "🔍 Глобальный поиск по {query} — найдено {count} модулей", + "change_query": "🔍 Изменить запрос", + "back": "🔙 Назад", + "global_button": "🌍 Результаты", + "first_page": "Это первая страница!", + "last_page": "Это последняя страница!", + "display_error": "Ошибка отображения модуля. Пожалуйста, попробуйте еще раз.", + "error_occurred": "Произошла ошибка. Пожалуйста, попробуйте еще раз.", + "start_search_form": "🔍 Limoka Поиск\nВведите ваш запрос для поиска модулей:", + "history_cleared": "🧹 История поиска очищена!", + "invalid_history_arg": " Неверный аргумент для команды истории. Используйте:\n.lshistory - показать историю\n.lshistory clear - очистить историю", + "close": "❌ Закрыть", + "indexing_in_progress": ( + "⚠️ База данных занята, " + "попробуйте снова через несколько секунд. " + "Если ошибка сохраняется, попробуйте " + "удалить limoka_index в корневой папке юзербота. " + "Если ошибка сохраняется снова, сообщите разработчикам" + ), + "install_btn": "🛠 Установить", + "source_btn": "📦 Исходный код", + "installed": "✅ Установлено успешно!", + "install_failed": "❌ Установка не удалась!", + "tags": { + "newbie": "Новичок", + "herokutrusted": "Heroku Trusted", + "hikkatrusted": "Hikka Trusted", + "nonactive": "Неактивный репозиторий", + "nonlongermaintained": "Заброшенный репозиторий", + } + } + + def __init__(self): + self.api = LimokaAPI() + self.config = loader.ModuleConfig( + loader.ConfigValue( + "limokaurl", + "https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/", + lambda: "Mirror (doesn't work): https://raw.githubusercontent.com/MuRuLOSE/limoka-mirror/refs/heads/main/", + validator=loader.validators.String(), + ), + loader.ConfigValue( + "external_install_allowed", + True, + lambda: "If enabled, module installation can be handled via external Limoka bot (@limoka_bbot) for better reliability.", + validator=loader.validators.Boolean(), + ), + loader.ConfigValue( + "filter_newbies_modules", + False, + lambda: "If enabled, modules from developers with newbies tag will be not shown.", + validator=loader.validators.Boolean(), + ), + ) + self.name = self.strings["name"] + self._invalid_banners = set() + self._bot_username = "limoka_bbot" + self._base_url = self.config["limokaurl"] + + BANNERS = { + "global_search": "https://github.com/MuRuLOSE/hikka-assets/blob/main/Limoka%20-%20Global%20Search.png?raw=true", + "not_found": "https://github.com/MuRuLOSE/hikka-assets/blob/main/Limoka%20-%20Not%20Found.png?raw=true", + "no_banner": "https://github.com/MuRuLOSE/hikka-assets/blob/main/Limoka%20-%20No%20banner.png?raw=true", + } + + def _filter_newbies(self, modules: Dict[str, Any]) -> Dict[str, Any]: + try: + if not self.config.get("filter_newbies_modules"): + return modules + except Exception: + return modules + + if not getattr(self, "repositories", None): + return modules + + filtered: Dict[str, Any] = {} + for path, info in modules.items(): + repo_key = "/".join(path.split("/")[:2]) if "/" in path else path + repo = self.repositories.get(repo_key) + tags = repo.get("tags", []) if repo else [] + if "newbie" in tags: + continue + filtered[path] = info + return filtered + + def _create_search_session( + self, + query: str = "", + results: Optional[List[str]] = None, + current_index: int = 0, + ) -> Dict[str, Any]: + return { + "query": query, + "results": results or [], + "current_index": current_index, + } + + async def client_ready(self, client, db): + self.client: TelegramClient = client + self.db = db + self.api = LimokaAPI() + self.schema = Schema( + title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored=True) + ) + os.makedirs("limoka_search", exist_ok=True) + if not os.path.exists("limoka_search/index"): + self.ix = create_in("limoka_search", self.schema) + else: + self.ix = open_dir("limoka_search") + self._history = self.pointer("history", []) + self.modules = (await self.api.fetch_json(self._base_url, "modules.json")).get( + "modules", {} + ) + raw = (await self.api.fetch_json(self._base_url, "repositories.json")).get( + "repositories", [] + ) + self.repositories = {repo["url"]: repo for repo in raw} + try: + self.modules = self._filter_newbies(self.modules) + except Exception: + pass + self._service_bot_id = (await self.client.get_entity(self._bot_username)).id + + loop = asyncio.get_running_loop() + self.ix_task = loop.run_in_executor( + None, lambda: asyncio.run(self._update_index()) + ) + + if self.config["external_install_allowed"]: + try: + message = await self.client.get_messages(self._bot_username, limit=1) + if not message: + message = await self.client.send_message( + self._bot_username, "/start" + ) + await message.delete() + await self.client( + functions.messages.DeleteHistoryRequest( + peer=self._bot_username, + max_id=0, + just_clear=True, + revoke=True, + ) + ) + + except YouBlockedUserError: + logger.warning( + f"Please unblock {self._bot_username} to enable external installation feature. Or disable external_install_allowed in Limoka settings to get rid of this message." + ) + self._userbot_bot_username = (await self.inline.bot.get_me()).username + + @loader.loop(interval=3600) + async def _update_modules_loop(self): + self.modules = await self.api.fetch_json(self._base_url, "modules.json") + try: + self.modules = self._filter_newbies(self.modules) + except Exception: + pass + await self._update_index() + + async def _update_index(self): + try: + writer = self.ix.writer() + modules_to_index = self._filter_newbies(self.modules) + for module_path, module_data in modules_to_index.items(): + writer.add_document( + title=module_data["name"], + path=module_path, + content=module_data["name"] + + " " + + ( + module_data.get("description") + or "" + + " " + + ( + (module_data.get("meta").get("developer") or "") + if module_data.get("meta") + else "" + ) + ), + ) + for func in module_data.get("commands", []): + for command, description in func.items(): + writer.add_document( + title=module_data["name"], + path=module_path, + content=f"{command} {description}", + ) + writer.commit() + except LockError: + folder = os.path.join(BASE_DIR, "limoka_search") + + if os.path.commonpath([folder, BASE_DIR]) == BASE_DIR and os.path.exists( + folder + ): + shutil.rmtree(folder) + await self._update_index() + else: + logger.error( + ( + f"Skipping unsafe rmtree for {folder}. Please, report this to developer. ", + f"Debug info: folder={folder}, base_dir={BASE_DIR}, common_path={os.path.commonpath([folder, BASE_DIR])}, exists={os.path.exists(folder)}", + ) + ) + + async def _validate_url(self, url: str) -> Optional[str]: + if not url or url in self._invalid_banners: + return None + try: + async with aiohttp.ClientSession() as session: + async with session.head( + url, timeout=5, allow_redirects=True + ) as response: + if response.status != 200: + self._invalid_banners.add(url) + return None + ct = response.headers.get("Content-Type", "").lower() + if not ct.startswith("image/"): + self._invalid_banners.add(url) + return None + return url + except Exception as e: + if url: + self._invalid_banners.add(url) + return None + + def find_userbot(self, keys: Iterable[str]) -> str | None: + scores = defaultdict(int) + + for key in keys: + parts = key.split(".") + + for i in range(1, len(parts)): + prefix = ".".join(parts[:i]) + suffix = ".".join(parts[i:]) + + weight = WEIGHTS.get(suffix, DEFAULT_WEIGHT) + + scores[prefix] += weight + + if not scores: + return None + + return max(scores, key=scores.get) + + @property + def user_lang(self) -> str: + def warn(msg: str): + logger.warning( + f"{msg} Defaulting language to English. " + "If this is unexpected, please report to the module developer." + ) + + lang = self.db.get(f"{__package__.split('.')[0]}.translations", "lang") + if lang: + return lang + + logger.warning( + "Cannot determine language from module translations. " + "Trying fallback method." + ) + + userbot = self.find_userbot(self.db.keys()) + if not userbot: + warn("Cannot determine userbot type.") + return "en" + + lang = self.db.get(f"{userbot}.translations", "lang") + if not lang: + warn("Cannot determine language from userbot translations.") + return "en" + + return lang + + def _get_description(self, desc_map: dict, lang: str) -> str: + desc = _get_lang_value(desc_map, lang) or self.strings["no_info"] + return html.escape(desc) + + def _get_emoji(self, index: int) -> str: + emojis = self.strings["emojis"] + + if index < 10: + return emojis.get(index, "") + + return "".join(emojis.get(int(d), "") for d in str(index)) + + def generate_commands(self, module_info, lang: str = "en"): + commands = [] + + for i, cmd in enumerate(module_info.get("new_commands", []), 1): + commands.append( + self.strings["command_template"].format( + prefix=self.get_prefix(), + command=html.escape(cmd.get("name", "")), + emoji=self._get_emoji(i), + description=self._get_description(cmd.get("description", {}), lang), + ) + ) + + for handler in module_info.get("inline_handlers", []): + commands.append( + self.strings["inline_handler_template"].format( + inline_bot=self._userbot_bot_username, + command=html.escape(handler.get("name", "")), + description=self._get_description( + handler.get("description", {}), lang + ), + ) + ) + + return commands + + def _format_module_content( + self, + module_info: Dict[str, Any], + query: str, + module_path: Optional[str] = None, + lang: str = "en", + ) -> tuple: + name = html.escape(module_info.get("name") or self.strings["no_info"]) + cls_doc = module_info.get("cls_doc", {}) + description = html.escape( + _get_lang_value(cls_doc, lang) + or _get_lang_value(module_info.get("description", ""), lang) + or self.strings["no_info"] + ) + dev_username = html.escape(module_info["meta"].get("developer") or "Unknown") + raw_path = ( + module_path if module_path is not None else module_info.get("path", "") + ) + clean_module_path = (raw_path or "").replace("\\", "/") + commands = self.generate_commands(module_info, lang) + if len(description) > 300: + description = description[:297] + "…" + repo_key = ( + "/".join(module_path.split("/")[:2]) if "/" in module_path else module_path + ) + tags_list = [] + for x in self.repositories: + if x.replace("https://github.com/", "") == repo_key: + tags_list = self.repositories.get(x, {}).get("tags", []) + break + logger.info(tags_list) + tags_text = ", ".join(self.strings["tags"].get(tag, tag) for tag in tags_list) + header = self.strings["found_header"].format( + query=html.escape(query), + name=name, + description=description, + username=dev_username, + tags=tags_text, + ) + commands_text = "".join(commands) + if len(commands_text) <= 500: + body_pages = [commands_text] if commands_text else [""] + else: + body_pages = [] + current_page = [] + current_length = 0 + for cmd in commands: + if current_length + len(cmd) > 500: + if current_page: + body_pages.append("".join(current_page)) + current_page = [] + current_length = 0 + current_page.append(cmd) + current_length += len(cmd) + if current_page: + body_pages.append("".join(current_page)) + if not body_pages: + body_pages = [""] + footer = self.strings["found_footer"] + return header, body_pages, footer + + def _build_module_markup( + self, + session: Dict[str, Any], + body_pages: List[str], + page_body: int, + module_path: str, + ) -> list: + result = session["results"] + index = session["current_index"] + + source_url = self._get_source_url(module_path) + + markup = [ + [ + { + "text": self.strings["install_btn"], + "callback": self._install_module, + "args": (module_path, session), + }, + { + "text": self.strings["source_btn"], + "url": source_url, + }, + ] + ] + if len(body_pages) > 1: + markup.append( + [ + { + "text": "◀️" if page_body > 0 else "🚫", + "callback": ( + self._previous_body_page + if page_body > 0 + else self._inline_void + ), + "args": ( + (session, module_path, page_body) if page_body > 0 else () + ), + }, + { + "text": f"Body {page_body + 1}/{len(body_pages)}", + "callback": self._inline_void, + }, + { + "text": "▶️" if page_body + 1 < len(body_pages) else "🚫", + "callback": ( + self._next_body_page + if page_body + 1 < len(body_pages) + else self._inline_void + ), + "args": ( + (session, module_path, page_body) + if page_body + 1 < len(body_pages) + else () + ), + }, + ] + ) + page = index + 1 + markup.append( + [ + { + "text": "⏪" if index > 0 else "🚫", + "callback": self._previous_page if index > 0 else self._inline_void, + "args": (session,) if index > 0 else (), + }, + {"text": f"{page}/{len(result)}", "callback": self._inline_void}, + { + "text": "⏩" if index + 1 < len(result) else "🚫", + "callback": ( + self._next_page + if index + 1 < len(result) + else self._inline_void + ), + "args": (session,) if index + 1 < len(result) else (), + }, + ] + ) + markup.append( + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + }, + ] + ) + markup.append( + [ + { + "text": self.strings["global_button"], + "callback": self._show_global_results, + "args": (session,), + }, + ] + ) + markup.append( + [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] + ) + return markup + + async def _safe_display( + self, + message_or_call: Union[Message, InlineCall], + text: str, + markup: list, + photo: Optional[Any] = None, + ): + try: + if message_or_call is None: + logger.error("message_or_call is None in _safe_display") + return + if isinstance(message_or_call, Message): + if photo is not None: + await self.inline.form( + text=text, + message=message_or_call, + reply_markup=markup, + photo=photo, + ) + else: + await self.inline.form( + text=text, message=message_or_call, reply_markup=markup + ) + else: + if photo is not None: + await message_or_call.edit( + text=text, reply_markup=markup, photo=photo + ) + else: + await message_or_call.edit(text=text, reply_markup=markup) + except (BadRequest, WebpageMediaEmptyError) as e: + logger.exception(f"Error in _safe_display: {e}") + if isinstance(message_or_call, Message): + await utils.answer(message_or_call, self.strings["display_error"]) + elif hasattr(message_or_call, "edit"): + await message_or_call.edit(self.strings["display_error"]) + + async def _display_module( + self, + message_or_call: Union[Message, InlineCall], + module_info: Dict[str, Any], + module_path: str, + session: Dict[str, Any], + page_body: int = 0, + ): + try: + query = session["query"] + + lang = self.user_lang + module_banner_raw = module_info.get("meta", {}).get("banner") + photo = await self._validate_url( + module_banner_raw + ) or await self._validate_url( + "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/refs/heads/main/Limoka%20-%20No%20banner.png" + ) + + header, body_pages, footer = self._format_module_content( + module_info, + query, + module_path=module_path, + lang=lang, + ) + current_body = body_pages[min(page_body, len(body_pages) - 1)] + full_message = header + current_body + footer + + markup = self._build_module_markup( + session, body_pages, page_body, module_path + ) + + await self._safe_display(message_or_call, full_message, markup, photo) + except Exception as e: + logger.exception(f"Error in _display_module: {e}") + if isinstance(message_or_call, Message): + await utils.answer(message_or_call, self.strings["error_occurred"]) + elif hasattr(message_or_call, "edit"): + await message_or_call.edit(self.strings["error_occurred"]) + + async def _previous_body_page( + self, + call: InlineCall, + session: Dict[str, Any], + module_path: str, + page_body: int, + ): + module_info = self.modules[module_path] + new_page_body = max(page_body - 1, 0) + await self._display_module( + call, module_info, module_path, session, page_body=new_page_body + ) + + async def _next_body_page( + self, + call: InlineCall, + session: Dict[str, Any], + module_path: str, + page_body: int, + ): + module_info = self.modules[module_path] + query = session["query"] + header, body_pages, footer = self._format_module_content( + module_info, + query, + module_path=module_path, + lang=self.user_lang, + ) + new_page_body = min(page_body + 1, len(body_pages) - 1) + await self._display_module( + call, module_info, module_path, session, page_body=new_page_body + ) + + async def _install_module( + self, call: InlineCall, module_path: str, session: Dict[str, Any] + ): + module_url = self.config["limokaurl"] + module_path + loader_mod = self.lookup("loader") + if not loader_mod: + await call.answer( + self.strings.get("watcher_loader_missing", "Loader not found") + ) + return + status = await loader_mod.download_and_install(module_url, None) + if getattr(loader_mod, "fully_loaded", False): + loader_mod.update_modules_in_db() + if status: + await call.answer(self.strings["installed"]) + else: + await call.answer(self.strings["install_failed"]) + + def _get_source_url(self, module_path: str) -> str: + repo_key = ( + "/".join(module_path.split("/")[:2]) if "/" in module_path else module_path + ) + repo_url = "https://github.com/" + repo_key + repo = self.repositories.get(repo_url, {}) + branch = repo.get("branch", "main") + path_in_repo = ( + "/".join(module_path.split("/")[2:]) + if len(module_path.split("/")) > 2 + else module_path + ) + return f"{repo_url}/blob/{branch}/{path_in_repo}" + + async def _show_results(self, call: InlineCall, session: Dict[str, Any]): + query = session["query"] + + searcher = Search(query.lower(), self.ix) + try: + result = searcher.search_module() + except Exception: + await call.edit(self.strings["?"], reply_markup=[]) + return + if not result: + markup = [ + [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] + ] + photo = await self._validate_url( + "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Not%20Found.png" + ) + await self._safe_display( + call, self.strings["404"].format(query=query), markup, photo + ) + return + module_path = result[0] + module_info = self.modules[module_path] + + display_session = self._create_search_session( + query=query, + results=result, + current_index=0, + ) + await self._display_module(call, module_info, module_path, display_session, 0) + + async def _enter_query_handler( + self, call_or_query, query: Optional[str] = None, *args, **kwargs + ): + call = None + if query is None and isinstance(call_or_query, str): + query = call_or_query + for a in args: + if hasattr(a, "edit") or isinstance(a, Message): + call = a + break + else: + call = call_or_query + if call is None: + logger.error("_enter_query_handler: missing call/context") + return + if not query: + await call.edit( + self.strings["?"], + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + } + ] + ], + ) + return + if len(query) <= 1: + await call.edit( + self.strings["?"], + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + } + ] + ], + ) + return + searcher = Search(query.lower(), self.ix) + try: + result = searcher.search_module() + except Exception: + await call.edit( + self.strings["?"], + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + } + ] + ], + ) + return + if not result: + await call.edit( + self.strings["404"].format(query=query), + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + } + ], + [ + { + "text": self.strings.get("close", "❌ Close"), + "action": "close", + } + ], + ], + ) + return + module_path = result[0] + module_info = self.modules[module_path] + + # Create session for displaying module + display_session = self._create_search_session( + query=query, + results=result, + current_index=0, + ) + await self._display_module(call, module_info, module_path, display_session, 0) + + async def _enter_query(self, call: InlineCall, query: Optional[str] = None): + markup = [ + [ + { + "text": "✍️ " + self.strings["enter_query"], + "input": self.strings["enter_query"], + "handler": self._enter_query_handler, + } + ], + [ + { + "text": self.strings["back_to_results"], + "callback": self._show_results, + "args": ( + self._create_search_session( + query=query or "", + ), + ), + } + ], + [ + { + "text": self.strings.get("close", "❌ Close"), + "action": "close", + } + ], + ] + await call.edit(self.strings["enter_query"], reply_markup=markup) + + async def _show_global_results(self, call: InlineCall, session: Dict[str, Any]): + query = session["query"] + + searcher = Search(query.lower(), self.ix) + try: + result = searcher.search_module() + except Exception: + await call.edit(self.strings["?"], reply_markup=[]) + return + if not result: + await call.edit( + self.strings["404"].format(query=query), + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + } + ] + ], + ) + return + text = self.strings["global_search"].format( + query=html.escape(query), count=len(result) + ) + buttons = [] + for i, path in enumerate(result[:15]): + info = self.modules.get(path) + if not info: + continue + name = info.get("name", "Unknown") + + global_session = self._create_search_session( + query=query, + results=result, + current_index=i, + ) + buttons.append( + [ + { + "text": f"{i+1}. {name}", + "callback": self._display_module_from_global, + "args": (path, global_session), + } + ] + ) + buttons.append( + [{"text": self.strings["change_query"], "callback": self._enter_query}] + ) + await call.edit(text=text[:4096], reply_markup=buttons) + + async def _display_module_from_global( + self, call: InlineCall, module_path: str, session: Dict[str, Any] + ): + module_info = self.modules[module_path] + await self._display_module(call, module_info, module_path, session, 0) + + async def _next_page(self, call: InlineCall, session: Dict[str, Any]): + result = session["results"] + index = session["current_index"] + + if index + 1 >= len(result): + await call.answer(self.strings["last_page"]) + return + index += 1 + module_path = result[index] + module_info = self.modules[module_path] + + new_session = session.copy() + new_session["current_index"] = index + await self._display_module(call, module_info, module_path, new_session, 0) + + async def _previous_page(self, call: InlineCall, session: Dict[str, Any]): + result = session["results"] + index = session["current_index"] + + if index - 1 < 0: + await call.answer(self.strings["first_page"]) + return + index -= 1 + module_path = result[index] + module_info = self.modules[module_path] + + new_session = session.copy() + new_session["current_index"] = index + await self._display_module(call, module_info, module_path, new_session, 0) + + async def _inline_void(self, call: InlineCall): + await call.answer() + + @loader.command(ru_doc="[запрос / ничего] — Поиск модулей") + async def limokacmd(self, message: Message): + """[query / nothing] - Search modules""" + args = utils.get_args_raw(message) + lock_path = os.path.join(BASE_DIR, "limoka_search", "index.lock") + + if os.path.exists(lock_path): + await utils.answer(message, self.strings["indexing_in_progress"]) + return + if not args: + markup = [ + [ + { + "text": "✍️ " + self.strings["enter_query"], + "input": self.strings["enter_query"], + "handler": self._enter_query_handler, + } + ], + [ + { + "text": self.strings["global_button"], + "callback": self._show_global_form, + "args": (message,), + } + ], + ] + markup.append( + [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] + ) + await self.inline.form( + text=self.strings["start_search_form"], + message=message, + reply_markup=markup, + # photo=self._get_banner_for_state("global_search"), + ) + return + history = self.get("history", []) + if len(history) >= 10: + history = history[-9:] + history.append(args) + self.set("history", history) + await utils.answer( + message, + self.strings["wait"].format( + count=len(self.modules), + fact=random.choice(self.strings["facts"]), + query=args, + ), + ) + searcher = Search(args.lower(), self.ix) + try: + result = searcher.search_module() + except Exception: + return await utils.answer(message, self.strings["?"]) + if not result: + return await utils.answer(message, self.strings["404"].format(query=args)) + module_path = result[0] + module_info = self.modules[module_path] + + # Create session for displaying module + display_session = self._create_search_session( + query=args, + results=result, + current_index=0, + ) + await self._display_module( + message, module_info, module_path, display_session, 0 + ) + + async def _show_global_form(self, call: InlineCall, message: Message): + markup = [ + [ + { + "text": "✍️ " + self.strings["enter_query"], + "input": self.strings["enter_query"], + "handler": self._global_search_handler, + "args": (message,), + } + ], + [ + { + "text": "🔙 " + self.strings["back"], + "callback": self._inline_void, + } + ], + [ + { + "text": self.strings.get("close", "❌ Close"), + "action": "close", + } + ], + ] + await call.edit(self.strings["global_search_form"], reply_markup=markup) + + async def _global_search_handler( + self, call: InlineCall, query: str, message: Message, *args, **kwargs + ): + global_session = self._create_search_session( + query=query, + results=[], + current_index=0, + ) # idk what is that crap but it works lol + if len(query) <= 1: + await call.edit( + self.strings["?"], + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": lambda c: self._show_global_form(c, message), + } + ], + [ + { + "text": self.strings.get("close", "❌ Close"), + "action": "close", + } + ], + ], + ) + return + searcher = Search(query.lower(), self.ix) + try: + result = searcher.search_module() + except Exception: + await call.edit( + self.strings["?"], + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": lambda c: self._show_global_form(c, message), + } + ], + [ + { + "text": self.strings.get("close", "❌ Close"), + "action": "close", + } + ], + ], + ) + return + if not result: + await call.edit( + self.strings["404"].format(query=query), + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": lambda c: self._show_global_form(c, message), + } + ], + [ + { + "text": self.strings.get("close", "❌ Close"), + "action": "close", + } + ], + ], + ) + return + text = self.strings["global_search"].format( + query=html.escape(query), count=len(result) + ) + buttons = [] + for i, path in enumerate(result[:15]): + info = self.modules.get(path) + if not info: + continue + name = info.get("name", "Unknown") + buttons.append( + [ + { + "text": f"{i+1}. {name}", + "callback": self._display_module_from_global, + "args": (path, global_session), + } + ] + ) + buttons.append( + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": lambda c: self._show_global_form(c, message), + } + ] + ) + buttons.append( + [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] + ) + await call.edit(text=text[:4096], reply_markup=buttons) + + @loader.command(ru_doc="[clear] — Показать или очистить историю поиска") + async def lshistorycmd(self, message: Message): + """[clear] - Show or clear search history""" + args = utils.get_args_raw(message).strip().lower() + if args == "clear": + self.set("history", []) + await utils.answer(message, self.strings["history_cleared"]) + return + if args: + await utils.answer(message, self.strings["invalid_history_arg"]) + return + history = self.get("history", []) + if not history: + await utils.answer(message, self.strings["empty_history"]) + return + formatted_history = [ + f"{i+1}. {utils.escape_html(h)}" + for i, h in enumerate(history[-10:]) + ] + await utils.answer( + message, + self.strings["history"].format(history="\n".join(formatted_history)), + ) + + @loader.watcher(from_dl=False) + async def secure_install_watcher(self, message: Message): + if not message.text: + return + if not hasattr(message, "from_id") or not message.from_id: + return + sender_id = None + if hasattr(message.from_id, "user_id"): + sender_id = message.from_id.user_id + elif hasattr(message.from_id, "channel_id"): + sender_id = message.from_id.channel_id + if sender_id != self._service_bot_id: + # logger.debug("Message not from official bot, ignoring") + return + if not self.config["external_install_allowed"]: + return + try: + clean_text = ( + getattr(message, "raw_text", None) + or getattr(message, "message", None) + or message.text + or "" + ) + if message.entities: + from html import unescape + + clean_text = unescape(clean_text) + clean_text = re.sub(r"<[^>]+>", "", clean_text) + match = re.search(r"#limoka:([^\s\"'<>]+)", clean_text) + if not match: + logger.debug( + "No #limoka tag found in cleaned text; leaving original message intact" + ) + return + tag_content = match.group(1) + parts = tag_content.split(":", 1) + if len(parts) != 2: + logger.error("Invalid tag format after cleaning") + await utils.answer(message, self.strings["watcher_invalid_format"]) + return + module_path, signature_hex = parts + module_path = re.sub(r"[<>\"']", "", module_path).strip() + if module_path.startswith("href="): + module_path = module_path[5:].strip('"').strip("'") + if module_path not in self.modules: + found = False + for db_path in self.modules.keys(): + if module_path in db_path or db_path in module_path: + module_path = db_path + found = True + break + if not found: + logger.warning(f"Module not found after cleanup: {module_path}") + await utils.answer( + message, + self.strings["watcher_module_not_found"].format( + path=html.escape(module_path) + ), + ) + return + try: + import base64 + from cryptography.hazmat.primitives.asymmetric import ed25519 + + PUB_KEY_B64 = ( + "MCowBQYDK2VwAyEA1ltSnqtf3pGBuctuAYqHivCXsaRtKOVxavai7yin7ZE=" + ) + der_bytes = base64.b64decode(PUB_KEY_B64) + raw_pubkey = der_bytes[-32:] + module_url = self.config["limokaurl"] + module_path + async with aiohttp.ClientSession() as session: + async with session.get(module_url, timeout=10) as resp: + if resp.status != 200: + logger.error( + f"Failed to fetch module for verification: {module_url} (HTTP {resp.status})" + ) + await utils.answer( + message, self.strings["watcher_loader_missing"] + ) + return + module_bytes = await resp.read() + sha256 = hashlib.sha256(module_bytes).hexdigest() + public_key = ed25519.Ed25519PublicKey.from_public_bytes( + raw_pubkey + ) + signature = bytes.fromhex(signature_hex) + signed_payload = f"{module_path}|{sha256}".encode() + public_key.verify(signature, signed_payload) + except Exception as e: + logger.error(f"Signature verification failed for {module_path}: {e}") + await utils.answer(message, self.strings["watcher_signature_invalid"]) + return + loader_mod = self.lookup("loader") + if not loader_mod: + logger.error("Loader module not found") + await utils.answer(message, self.strings["watcher_loader_missing"]) + return + module_url = self.config["limokaurl"] + module_path + status = await loader_mod.download_and_install(module_url, None) + if getattr(loader_mod, "fully_loaded", False): + loader_mod.update_modules_in_db() + try: + await message.delete() + except Exception as e: + logger.error(f"Failed to delete message: {e}") + if status: + try: + bot_peer = await self.client.get_entity(self._service_bot_id) + await self.client.send_message( + bot_peer, f"#limoka:sucsess:{message.id}" + ) + except Exception as e: + logger.error(f"Failed to send success confirmation: {e}") + else: + logger.error(f"Installation failed with status: {status}") + try: + bot_peer = await self.client.get_entity(self._service_bot_id) + await self.client.send_message( + bot_peer, f"#limoka:failed:{message.id}" + ) + except Exception as e: + logger.error(f"Failed to send failure notification: {e}") + except Exception as e: + logger.exception(f"CRITICAL ERROR in secure_install_watcher: {e}") + try: + await utils.answer( + message, self.strings["watcher_critical"].format(error=str(e)[:100]) + ) + await asyncio.sleep(5) + await message.delete() + except Exception: + pass