diff --git a/Limoka.py b/Limoka.py index b787ef3..7df6a7a 100644 --- a/Limoka.py +++ b/Limoka.py @@ -33,7 +33,7 @@ from .. import utils, loader from ..types import InlineCall logger = logging.getLogger("Limoka") -__version__ = (1, 4, 4) +__version__ = (1, 4, 5) WEIGHTS = { "inline.token_obtainment": 15, @@ -54,17 +54,25 @@ def _get_lang_value(data: Dict[str, Any], lang: str) -> str: return data.get(lang, data.get("default", data.get("en", ""))) -class Search: - def __init__(self, query, ix): +class SearchIndex: + """Handles full-text search operations.""" + + def __init__(self, query: str, index): + """ + Args: + query: Search query string + index: Whoosh index instance + """ self.schema = Schema( title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored=True) ) self.query = query - self.ix = ix + self.index = index - def search_module(self): - with self.ix.searcher() as searcher: - parser = QueryParser("content", self.ix.schema, group=OrGroup.factory(0.8)) + def search(self) -> List[str]: + """Execute search and return list of module paths.""" + with self.index.searcher() as searcher: + parser = QueryParser("content", self.index.schema, group=OrGroup.factory(0.8)) query = parser.parse(self.query) wildcard_query = Wildcard("content", f"*{self.query}*") fuzzy_query = FuzzyTerm("content", self.query, maxdist=2, prefixlength=1) @@ -75,12 +83,257 @@ class Search: return [] -class LimokaAPI: - async def fetch_json(self, base_url, path): - url = f"{base_url}{path}" +class APIClient: + """Handles HTTP requests for fetching JSON data.""" + + async def fetch_json(self, base_url: str, path: str) -> Dict[str, Any]: + """ + Fetch JSON data from a URL. + + Args: + base_url: Base URL for the API + path: Path to append to base URL + + Returns: + Parsed JSON as dictionary + """ async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - return json.loads(await response.text()) + async with session.get(f"{base_url}{path}") as resp: + if resp.status == 200: + return json.loads(await resp.text()) + return {} + + +class ModuleRepository: + """Manages module data and filtering.""" + + def __init__(self, modules: Dict[str, Any], repositories: Dict[str, Any]): + """ + Args: + modules: Dictionary of module definitions + repositories: Dictionary of repository metadata + """ + self.modules = modules + self.repositories = repositories + + def apply_newbie_filter(self, filter_enabled: bool) -> Dict[str, Any]: + """ + Filter out modules from repositories tagged as 'newbie'. + + Args: + filter_enabled: Whether filtering is enabled + + Returns: + Filtered modules dictionary + """ + if not filter_enabled: + return self.modules + + filtered = {} + for path, info in self.modules.items(): + repo_key = "/".join(path.split("/")[:2]) if "/" in path else path + repo = self.repositories.get(repo_key) + tags = repo.get("tags", []) if repo else [] + if "newbie" not in tags: + filtered[path] = info + return filtered + + def get_tags_for_module(self, module_path: str) -> List[str]: + """Get repository tags for a specific module.""" + repo_key = "/".join(module_path.split("/")[:2]) if "/" in module_path else module_path + for repo_url in self.repositories: + if repo_url.replace("https://github.com/", "") == repo_key: + return self.repositories[repo_url].get("tags", []) + return [] + + def get_all_categories(self) -> List[str]: + """Get all unique categories from modules.""" + categories = set() + for module_data in self.modules.values(): + categories.update(module_data.get("category", ["No category"])) + return sorted(categories) + + +class CommandFormatter: + """Formats module commands and metadata.""" + + def __init__(self, strings: Dict[str, Any], bot_username: str, prefix: str): + """ + Args: + strings: Localized strings dictionary + bot_username: Bot username for inline handlers + prefix: Command prefix + """ + self.strings = strings + self.bot_username = bot_username + self.prefix = prefix + + def format_commands(self, module_info: Dict[str, Any], lang: str = "en") -> List[str]: + """ + Format module commands and handlers into display strings. + + Args: + module_info: Module information dictionary + lang: Language code + + Returns: + List of formatted command strings + """ + commands = [] + for i, cmd in enumerate(module_info.get("new_commands", []), 1): + name = cmd.get("name", "") + desc_map = cmd.get("description", {}) + emoji = self._get_emoji_for_number(i) + desc = _get_lang_value(desc_map, lang) or self.strings["no_info"] + commands.append( + self.strings["command_template"].format( + prefix=self.prefix, + command=html.escape(name), + emoji=emoji, + description=html.escape(desc), + ) + ) + + for handler in module_info.get("inline_handlers", []): + name = handler.get("name", "") + desc_map = handler.get("description", {}) + desc = _get_lang_value(desc_map, lang) or self.strings["no_info"] + commands.append( + self.strings["inline_handler_template"].format( + inline_bot=self.bot_username, + command=html.escape(name), + description=html.escape(desc), + ) + ) + return commands + + def _get_emoji_for_number(self, num: int) -> str: + """Get emoji representation for a number.""" + if num <= 10: + return self.strings["emojis"].get(num, "") + emoji = "" + for digit in str(num): + emoji += self.strings["emojis"].get(int(digit), "") + return emoji + + +class ModuleContentBuilder: + """Builds formatted module content for display.""" + + def __init__(self, strings: Dict[str, Any], formatter: CommandFormatter, repository: ModuleRepository): + """ + Args: + strings: Localized strings dictionary + formatter: CommandFormatter instance + repository: ModuleRepository instance + """ + self.strings = strings + self.formatter = formatter + self.repository = repository + + def build_content( + self, + module_info: Dict[str, Any], + query: str, + filters: Dict[str, List[str]], + include_categories: bool = True, + module_path: Optional[str] = None, + lang: str = "en", + ) -> tuple: + """ + Build complete formatted module content. + + Returns: + Tuple of (header, body_pages, footer, categories_text) + """ + name = html.escape(module_info.get("name") or self.strings["no_info"]) + cls_doc = module_info.get("cls_doc", {}) + description = html.escape( + _get_lang_value(cls_doc, lang) + or _get_lang_value(module_info.get("description", ""), lang) + or self.strings["no_info"] + ) + dev_username = html.escape(module_info["meta"].get("developer") or "Unknown") + + if len(description) > 300: + description = description[:297] + "…" + + categories_text = self._build_categories_text(filters) + commands = self.formatter.format_commands(module_info, lang) + header = self._build_header(query, name, description, dev_username, module_path) + footer = self._build_footer(module_path) + body_pages = self._paginate_commands(commands) + + return header, body_pages, footer, categories_text + + def _build_header(self, query: str, name: str, description: str, dev_username: str, module_path: Optional[str]) -> str: + """Build message header with module info and tags.""" + tags_list = self.repository.get_tags_for_module(module_path) if module_path else [] + tags_text = ", ".join(self.strings["tags"].get(tag, tag) for tag in tags_list) + + header_template = self.strings["found_header"] + if not tags_text: + header_template = header_template.replace( + "
\n\n", + "" + ) + header_template = header_template.replace( + "🏷 Tags: {tags}
\n\n", + "" + ) + + return header_template.format( + query=html.escape(query), + name=name, + description=description, + username=dev_username, + tags=tags_text, + ) + + def _build_footer(self, module_path: Optional[str]) -> str: + """Build message footer with download command.""" + clean_path = (module_path or "").replace("\\", "/") + return self.strings["found_footer"].format( + url=html.escape(self.formatter.strings.get("limokaurl", "")), + module_path=html.escape(clean_path), + prefix=self.formatter.prefix, + ) + + def _build_categories_text(self, filters: Dict[str, List[str]]) -> str: + """Build categories text if any are selected.""" + categories = filters.get("category", []) + if categories: + return "\n" + self.strings["selected_categories"].format( + categories=", ".join(html.escape(c) for c in categories) + ) + return "" + + def _paginate_commands(self, commands: List[str], max_length: int = 500) -> List[str]: + """Split commands into pages based on length.""" + if not commands: + return [""] + + commands_text = "".join(commands) + if len(commands_text) <= max_length: + return [commands_text] + + pages = [] + current_page = [] + current_length = 0 + + for cmd in commands: + if current_length + len(cmd) > max_length: + if current_page: + pages.append("".join(current_page)) + current_page = [] + current_length = 0 + current_page.append(cmd) + current_length += len(cmd) + + if current_page: + pages.append("".join(current_page)) + + return pages or [""] @loader.tds @@ -91,53 +344,53 @@ class Limoka(loader.Module): "name": "Limoka", "wait": ( "🏷 Теги: {tags}
Just wait\n" - "" ), "found_header": ( - "🔍 A search is underway among {count} modules " + "🔍 A search is underway among {count} modules " "for the query:{query}\n" "{fact}
🔍 Found module {name} " + "🔍 Found module {name} " "by query: {query}\n\n" - "ℹ️ Description: {description}\n" - "🧑💻 Developer: {username}\n\n" - "\n\n" + "🏷 Tags: {tags}ℹ️ Description: {description}\n" + "🧑💻 Developer: {username}\n\n" + "\n\n" ), "found_body": ("{commands}"), "found_footer": ( - "🏷 Tags: {tags}\n" + "🪄 {prefix}dlm {url}{module_path}\n" ), "caption_short": ( - "🪄 {prefix}dlm {url}{module_path}" + "🔍 {safe_name}\n" - "ℹ️ Description: {safe_desc}\n" - "🧑💻 Dev: {dev_username}\n" - "🪄 {prefix}dlm {module_path}" ), "command_template": "{emoji}🔍 {safe_name}\n" + "ℹ️ Description: {safe_desc}\n" + "🧑💻 Dev: {dev_username}\n" + "🪄 {prefix}dlm {module_path}{prefix}{command}— {description}\n", "inline_handler_template": "{inline_bot} {command} — {description}\n", "emojis": { - 1: "1️⃣ ", - 2: "2️⃣ ", - 3: "3️⃣ ", - 4: "4️⃣ ", - 5: "5️⃣ ", - 6: "6️⃣ ", - 7: "7️⃣ ", - 8: "8️⃣ ", - 9: "9️⃣ ", + 1: "1️⃣ ", + 2: "2️⃣ ", + 3: "3️⃣ ", + 4: "4️⃣ ", + 5: "5️⃣ ", + 6: "6️⃣ ", + 7: "7️⃣ ", + 8: "8️⃣ ", + 9: "9️⃣ ", }, - "404": "", - "noargs": "❌ Not found by query: {query}", - "?": "❌ No args", + "404": "🔎 Request too short / not found", + "noargs": "❌ Not found by query: {query}", + "?": "❌ No args", "no_info": "🔎 Request too short / not foundNo information", "facts": [ - "", - "🛡 The limoka catalog is carefully moderated!", + "🚀 Limoka performance allows you to search for modules quickly!", + "🛡 The limoka catalog is carefully moderated!", ], "inline404": "🚀 Limoka performance allows you to search for modules quickly!Not found", "inline?": "Request too short / not found", "inlinenoargs": "Please, enter query", "history": ( - "" ), "body_page": "Commands", + "limokaurl": "https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/" } strings_ru = { "name": "Limoka", "wait": ( "🔎 Your search history:\n" + "" ), "filter_menu": "Choose filters", @@ -145,9 +398,9 @@ class Limoka(loader.Module): "apply_filters": "✅ Apply Filters", "clear_filters": "🗑 Clear Filters", "back_to_results": "🔙 Back to Results", - "empty_history": "🔎 Your search history:\n" "{history}", + "empty_history": "🔎 Your search history is empty!", "enter_query": "🔍 Enter new search query:", - "global_search": "🔎 Your search history is empty!", + "global_search": "🔍 Global search for {query} — found {count} modules", "change_query": "🔍 Change query", "no_modules": "🔍 Global search for {query} — found {count} modulesNo modules available.", "filter_title": "🏷 Filters", @@ -166,15 +419,15 @@ class Limoka(loader.Module): "inline_short_query": "❌ Query too short (min 2 chars)", "inline_switch_pm": "💬 Open in chat", "inline_switch_pm_text": "🔍 Results for: {query}", - "inline_start_message": "", + "inline_start_message": "🔍 Limoka Search\nType module name or keyword", "first_page": "🔍 Limoka Search\nType module name or keywordThis is the first page!", "last_page": "This is the last page!", "display_error": "Error displaying module. Please try again.", "error_occurred": "An error occurred. Please try again.", - "start_search_form": "", - "global_search_form": "🔍 Limoka Search\nEnter your query to search for modules:", - "history_cleared": "🔍 Global Search\nEnter your query to search ALL modules without filters:", - "invalid_history_arg": "🧹 Search history cleared!", + "start_search_form": "❌ Invalid argument for history command. Use:\n.lshistory- show history\n.lshistory clear- clear history", + "global_search_form": "🔍 Limoka Search\nEnter your query to search for modules:", + "history_cleared": "🔍 Global Search\nEnter your query to search ALL modules without filters:", + "invalid_history_arg": "🧹 Search history cleared!", "close": "❌ Close", "watcher_no_tag": "❌ Invalid argument for history command. Use:\n.lshistory- show history\n.lshistory clear- clear history❌ Invalid message format. No #limoka tag found.", "watcher_invalid_format": "❌ Invalid format. Expected: #limoka:path:signature", @@ -197,30 +450,31 @@ class Limoka(loader.Module): "If error persists again, report to developersПодождите\n" - "" ), "found_header": ( - "🔍 Идёт поиск среди {count} модулей по запросу:{query}\n" + "🔍 Идёт поиск среди {count} модулей по запросу:{query}\n" "{fact}🔍 Найден модуль {name} " + "\n\n" - "🔍 Найден модуль {name} " "по запросу: {query}\n" - "ℹ️ Описание: {description}\n\n" - "🧑💻 Разработчик: {username}\n\n" + "🏷 Теги: {tags}\n" + "ℹ️ Описание: {description}\n\n" + "🧑💻 Разработчик: {username}\n\n" ), "found_body": ("{commands}"), "found_footer": ( - "\n🏷 Теги: {tags}" + "\n🪄 {prefix}dlm {url}{module_path}" ), "caption_short": ( - "🪄 {prefix}dlm {url}{module_path}" + "🔍 {safe_name}\n" - "ℹ️ Описание: {safe_desc}\n" - "🧑💻 Разработчик: {dev_username}\n" - "🪄 {prefix}dlm {module_path}" ), "command_template": "🔍 {safe_name}\n" + "ℹ️ Описание: {safe_desc}\n" + "🧑💻 Разработчик: {dev_username}\n" + "🪄 {prefix}dlm {module_path}{emoji}\n", "inline_handler_template": "{inline_bot} {command} — {description}\n", @@ -236,15 +490,15 @@ class Limoka(loader.Module): 9: "{prefix}{command}— {description}9️⃣ ", 10: "🔟 " }, - "404": "", - "noargs": "❌ Не найдено по запросу: {query}", - "?": "❌ Нет аргументов", + "404": "🔎 Запрос слишком короткий / не найден", + "noargs": "❌ Не найдено по запросу: {query}", + "?": "❌ Нет аргументов", "no_info": "🔎 Запрос слишком короткий / не найденНет информации", "facts": [ - "", - "🛡 Каталог Limoka тщательно модерируется!", + "🚀 Limoka позволяет искать модули с невероятной скоростью!", + "🛡 Каталог Limoka тщательно модерируется!", ( - "🚀 Limoka позволяет искать модули с невероятной скоростью!🔎 Limoka имеет лучший поиск*!\n" + "" ), ], @@ -252,7 +506,7 @@ class Limoka(loader.Module): "inline?": "🔎 Limoka имеет лучший поиск*!\n" "* В сравнении с предыдущей версией LimokaЗапрос слишком короткий / не найден", "inlinenoargs": "Введите запрос", "history": ( - "🔎 История поиска:\n" + "" ), "filter_menu": "Выберите фильтры", @@ -260,9 +514,9 @@ class Limoka(loader.Module): "apply_filters": "✅ Применить фильтры", "clear_filters": "🗑 Очистить фильтры", "back_to_results": "🔙 Вернуться к результатам", - "empty_history": "🔎 История поиска:\n" "{history}", + "empty_history": "🔎 История поиска пуста!", "enter_query": "🔍 Введите новый поисковый запрос:", - "global_search": "🔎 История поиска пуста!", + "global_search": "🔍 Глобальный поиск по {query} — найдено {count} модулей", "change_query": "🔍 Изменить запрос", "no_modules": "🔍 Глобальный поиск по {query} — найдено {count} модулейМодули недоступны.", "filter_title": "🏷 Фильтры", @@ -281,15 +535,15 @@ class Limoka(loader.Module): "inline_short_query": "❌ Запрос слишком короткий (мин. 2 символа)", "inline_switch_pm": "💬 Открыть в чате", "inline_switch_pm_text": "🔍 Результаты для: {query}", - "inline_start_message": "", + "inline_start_message": "🔍 Limoka Поиск\nВведите название модуля или ключевое слово", "first_page": "🔍 Limoka Поиск\nВведите название модуля или ключевое словоЭто первая страница!", "last_page": "Это последняя страница!", "display_error": "Ошибка отображения модуля. Пожалуйста, попробуйте еще раз.", "error_occurred": "Произошла ошибка. Пожалуйста, попробуйте еще раз.", - "start_search_form": "", - "global_search_form": "🔍 Limoka Поиск\nВведите ваш запрос для поиска модулей:", - "history_cleared": "🔍 Глобальный Поиск\nВведите запрос для поиска ВСЕХ модулей без фильтров:", - "invalid_history_arg": "🧹 История поиска очищена!", + "start_search_form": "❌ Неверный аргумент для команды истории. Используйте:\n.lshistory- показать историю\n.lshistory clear- очистить историю", + "global_search_form": "🔍 Limoka Поиск\nВведите ваш запрос для поиска модулей:", + "history_cleared": "🔍 Глобальный Поиск\nВведите запрос для поиска ВСЕХ модулей без фильтров:", + "invalid_history_arg": "🧹 История поиска очищена!", "close": "❌ Закрыть", "watcher_no_tag": "❌ Неверный аргумент для команды истории. Используйте:\n.lshistory- показать историю\n.lshistory clear- очистить историю❌ Неверный формат сообщения. Тег #limoka не найден.", "watcher_invalid_format": "❌ Неверный формат. Ожидается: #limoka:path:signature", @@ -316,7 +570,7 @@ class Limoka(loader.Module): } def __init__(self): - self.api = LimokaAPI() + self.api = APIClient() self.config = loader.ModuleConfig( loader.ConfigValue( "limokaurl", @@ -342,15 +596,13 @@ class Limoka(loader.Module): self._bot_username = "limoka_bbot" self._base_url = self.config["limokaurl"] - # Search session states self.SEARCH_STATES = { - "no_banner": "no_banner", # 404 - No banner - "global_search": "global_search", # Global search - "not_found": "not_found", # Not found (module) - "filter_select": "filter_select", # Select categories (filters) + "no_banner": "no_banner", + "global_search": "global_search", + "not_found": "not_found", + "filter_select": "filter_select", } - # State banners self.state_banners = { "no_banner": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/refs/heads/main/Limoka%20-%20No%20banner.png", "global_search": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Global%20Search.png", @@ -359,28 +611,8 @@ class Limoka(loader.Module): } def _filter_newbies(self, modules: Dict[str, Any]) -> Dict[str, Any]: - """Filter out modules which belong to repositories tagged as 'newbie'. - Returns the original dict when the feature is disabled or repositories - metadata is not available. - """ - try: - if not self.config.get("filter_newbies_modules"): - return modules - except Exception: - return modules - - if not getattr(self, "repositories", None): - return modules - - filtered: Dict[str, Any] = {} - for path, info in modules.items(): - repo_key = "/".join(path.split("/")[:2]) if "/" in path else path - repo = self.repositories.get(repo_key) - tags = repo.get("tags", []) if repo else [] - if "newbie" in tags: - continue - filtered[path] = info - return filtered + """[DEPRECATED] Use ModuleRepository.apply_newbie_filter instead.""" + return self.repository.apply_newbie_filter(self.config.get("filter_newbies_modules", False)) def _create_search_session( self, @@ -415,9 +647,10 @@ class Limoka(loader.Module): return self.state_banners.get(state) async def client_ready(self, client, db): + """Initialize client and load data.""" self.client: TelegramClient = client self.db = db - self.api = LimokaAPI() + self.api = APIClient() self.schema = Schema( title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored=True) ) @@ -426,19 +659,19 @@ class Limoka(loader.Module): self.ix = create_in("limoka_search", self.schema) else: self.ix = open_dir("limoka_search") + self._history = self.pointer("history", []) - self.modules = (await self.api.fetch_json(self._base_url, "modules.json")).get( - "modules", {} - ) - raw = (await self.api.fetch_json(self._base_url, "repositories.json")).get( - "repositories", [] - ) - self.repositories = {repo["url"]: repo for repo in raw} - # Apply newbie filter if enabled - try: - self.modules = self._filter_newbies(self.modules) - except Exception: - pass + raw_modules = (await self.api.fetch_json(self._base_url, "modules.json")).get("modules", {}) + raw_repos = (await self.api.fetch_json(self._base_url, "repositories.json")).get("repositories", []) + + repositories = {repo["url"]: repo for repo in raw_repos} + self.repository = ModuleRepository(raw_modules, repositories) + self.modules = self.repository.apply_newbie_filter(self.config["filter_newbies_modules"]) + + self._userbot_bot_username = (await self.inline.bot.get_me()).username + self.formatter = CommandFormatter(self.strings, self._userbot_bot_username, self.get_prefix()) + self.content_builder = ModuleContentBuilder(self.strings, self.formatter, self.repository) + self._service_bot_id = (await self.client.get_entity(self._bot_username)).id loop = asyncio.get_running_loop() @@ -448,9 +681,7 @@ class Limoka(loader.Module): try: message = await self.client.get_messages(self._bot_username, limit=1) if not message: - message = await self.client.send_message( - self._bot_username, "/start" - ) + message = await self.client.send_message(self._bot_username, "/start") await message.delete() await self.client( functions.messages.DeleteHistoryRequest( @@ -460,41 +691,35 @@ class Limoka(loader.Module): revoke=True, ) ) - except YouBlockedUserError: logger.warning( - f"Please unblock {self._bot_username} to enable external installation feature. Or disable external_install_allowed in Limoka settings to get rid of this message." + f"Please unblock {self._bot_username} to enable external installation feature." ) - self._userbot_bot_username = (await self.inline.bot.get_me()).username @loader.loop(interval=3600) async def _update_modules_loop(self): - self.modules = await self.api.fetch_json(self._base_url, "modules.json") - # Re-apply newbie filter after modules refresh - try: - self.modules = self._filter_newbies(self.modules) - except Exception: - pass + """Periodically update modules list and rebuild index.""" + raw_modules = await self.api.fetch_json(self._base_url, "modules.json") + self.modules = self.repository.apply_newbie_filter( + self.config.get("filter_newbies_modules", False) + ) await self._update_index() async def _update_index(self): + """Rebuild full-text search index from modules.""" try: writer = self.ix.writer() - modules_to_index = self._filter_newbies(self.modules) - for module_path, module_data in modules_to_index.items(): + for module_path, module_data in self.modules.items(): writer.add_document( title=module_data["name"], path=module_path, content=module_data["name"] + " " + ( - module_data.get("description") - or "" + module_data.get("description", "") + " " + ( - (module_data.get("meta").get("developer") or "") - if module_data.get("meta") - else "" + (module_data.get("meta", {}).get("developer") or "") ) ), ) @@ -508,17 +733,11 @@ class Limoka(loader.Module): writer.commit() except LockError: folder = os.path.join(BASE_DIR, "limoka_search") - if os.path.commonpath([folder, BASE_DIR]) == BASE_DIR and os.path.exists(folder): shutil.rmtree(folder) await self._update_index() else: - logger.error( - ( - f"Skipping unsafe rmtree for {folder}. Please, report this to developer. ", - f"Debug info: folder={folder}, base_dir={BASE_DIR}, common_path={os.path.commonpath([folder, BASE_DIR])}, exists={os.path.exists(folder)}", - ) - ) + logger.error(f"Skipping unsafe rmtree for {folder}") async def _validate_url(self, url: str) -> Optional[str]: if not url or url in self._invalid_banners: @@ -578,37 +797,8 @@ class Limoka(loader.Module): return self.db.get(f"{userbot}.translations", "lang") def generate_commands(self, module_info, lang: str = "en"): - commands = [] - for i, cmd in enumerate(module_info.get("new_commands", []), 1): - name = cmd.get("name", "") - desc_map = cmd.get("description", {}) - if i <= 10: - emoji = self.strings["emojis"].get(i, "") - else: - emoji = "" - for digit in str(i): - emoji += self.strings["emojis"].get(int(digit), "") - desc = _get_lang_value(desc_map, lang) or self.strings["no_info"] - commands.append( - self.strings["command_template"].format( - prefix=self.get_prefix(), - command=html.escape(name), - emoji=emoji, - description=html.escape(desc), - ) - ) - for i, handler in enumerate(module_info.get("inline_handlers", []), 1): - name = handler.get("name", "") - desc_map = handler.get("description", {}) - desc = _get_lang_value(desc_map, lang) or self.strings["no_info"] - commands.append( - self.strings["inline_handler_template"].format( - inline_bot=self._userbot_bot_username, - command=html.escape(name), - description=html.escape(desc), - ) - ) - return commands + """[DEPRECATED] Use CommandFormatter.format_commands instead.""" + return self.formatter.format_commands(module_info, lang) def _format_module_content( self, @@ -619,81 +809,10 @@ class Limoka(loader.Module): module_path: Optional[str] = None, lang: str = "en", ) -> tuple: - name = html.escape(module_info.get("name") or self.strings["no_info"]) - cls_doc = module_info.get("cls_doc", {}) - description = html.escape( - _get_lang_value(cls_doc, lang) - or _get_lang_value(module_info.get("description", ""), lang) - or self.strings["no_info"] + """[DEPRECATED] Use ModuleContentBuilder.build_content instead.""" + return self.content_builder.build_content( + module_info, query, filters, include_categories, module_path, lang ) - dev_username = html.escape(module_info["meta"].get("developer") or "Unknown") - raw_path = ( - module_path if module_path is not None else module_info.get("path", "") - ) - clean_module_path = (raw_path or "").replace("\\", "/") - commands = self.generate_commands(module_info, lang) - categories_text = "" - if include_categories: - categories = filters.get("category", []) - if categories: - categories_text = "\n" + self.strings["selected_categories"].format( - categories=", ".join(html.escape(c) for c in categories) - ) - if len(description) > 300: - description = description[:297] + "…" - repo_key = ( - "/".join(module_path.split("/")[:2]) if "/" in module_path else module_path - ) - tags_list = [] - for x in self.repositories: - if x.replace("https://github.com/", "") == repo_key: - tags_list = self.repositories.get(x, {}).get("tags", []) - break - tags_text = ", ".join(self.strings["tags"].get(tag, tag) for tag in tags_list) - - header_template = self.strings["found_header"] - if not tags_text: - header_template = header_template.replace( - "\n\n", - "" - ) - header_template = header_template.replace( - "🏷 Tags: {tags}\n\n", - "" - ) - - header = header_template.format( - query=html.escape(query), - name=name, - description=description, - username=dev_username, - tags=tags_text, - ) - commands_text = "".join(commands) - if len(commands_text) <= 500: - body_pages = [commands_text] if commands_text else [""] - else: - body_pages = [] - current_page = [] - current_length = 0 - for cmd in commands: - if current_length + len(cmd) > 500: - if current_page: - body_pages.append("".join(current_page)) - current_page = [] - current_length = 0 - current_page.append(cmd) - current_length += len(cmd) - if current_page: - body_pages.append("".join(current_page)) - if not body_pages: - body_pages = [""] - footer = self.strings["found_footer"].format( - url=html.escape(self.config["limokaurl"]), - module_path=html.escape(clean_module_path), - prefix=html.escape(self.get_prefix()), - ) - return header, body_pages, footer, categories_text def _build_navigation_markup(self, session: Dict[str, Any]) -> list: result = session["results"] @@ -849,17 +968,21 @@ class Limoka(loader.Module): logger.error("message_or_call is None in _safe_display") return if isinstance(message_or_call, Message): + # WORKAROUND: Telegram doesn't show premium emojis in first call, + # until it's edited. Firstly sending something, than fixing. if photo is not None: - await self.inline.form( - text=text, + msg = await self.inline.form( + text="🍋", message=message_or_call, - reply_markup=markup, + reply_markup=[[{"text": "🍋", "action": "close"}]], photo=photo, ) + await msg.edit(text=text, reply_markup=markup, photo=photo) else: - await self.inline.form( - text=text, message=message_or_call, reply_markup=markup + msg = await self.inline.form( + text="🍋", message=message_or_call, reply_markup=[[{"text": "🍋", "action": "close"}]] ) + await msg.edit(text=text, reply_markup=markup) else: if photo is not None: await message_or_call.edit( @@ -998,13 +1121,11 @@ class Limoka(loader.Module): ) async def _select_category(self, call: InlineCall, session: Dict[str, Any]): + """Display category selection menu.""" query = session["query"] current_filters = session["filters"] - all_categories = set() - for module_data in self.modules.values(): - all_categories.update(module_data.get("category", ["No category"])) - categories = sorted(all_categories) + categories = self.repository.get_all_categories() if not categories: await call.edit( self.strings["no_categories"], @@ -1031,8 +1152,6 @@ class Limoka(loader.Module): if cat in selected_categories: button_text = "✅ " + button_text - # Create new session with updated filters - new_session = session.copy() row.append( { "text": button_text, @@ -1089,12 +1208,13 @@ class Limoka(loader.Module): async def _show_results( self, call: InlineCall, session: Dict[str, Any], from_filters: bool = False ): + """Display search results with optional category filtering.""" query = session["query"] filters = session["filters"] - searcher = Search(query.lower(), self.ix) + searcher = SearchIndex(query.lower(), self.ix) try: - result = searcher.search_module() + result = searcher.search() except Exception: await call.edit(self.strings["?"], reply_markup=[]) return @@ -1154,7 +1274,6 @@ class Limoka(loader.Module): module_path = filtered_result[0] module_info = self.modules[module_path] - # Create session for displaying module display_session = self._create_search_session( state=self.SEARCH_STATES["global_search"], query=query, @@ -1205,9 +1324,8 @@ class Limoka(loader.Module): ], ) return - searcher = Search(query.lower(), self.ix) try: - result = searcher.search_module() + result = SearchIndex(query.lower(), self.ix).search() except Exception: await call.edit( self.strings["?"], @@ -1289,9 +1407,8 @@ class Limoka(loader.Module): async def _show_global_results(self, call: InlineCall, session: Dict[str, Any]): query = session["query"] - searcher = Search(query.lower(), self.ix) try: - result = searcher.search_module() + result = SearchIndex(query.lower(), self.ix).search() except Exception: await call.edit(self.strings["?"], reply_markup=[]) return @@ -1407,11 +1524,35 @@ class Limoka(loader.Module): markup.append( [{"text": self.strings.get("close", "❌ Close"), "action": "close", "style": "danger"}] ) - await self.inline.form( - text=self.strings["start_search_form"], + # WORKAROUND: Telegram doesnt show emojis in inline forms as expected, + # until the form is edited. Sending with lemon, then fixing. + workaround_markup = [ + [ + { + "text": "🍋 Enter query", + "input": "Enter query", + "handler": self._enter_query_handler, + } + ], + [ + { + "text": "🍋 Results", + "callback": self._show_global_form, + "args": (message,), + } + ], + ] + workaround_markup.append( + [{"text": "🍋 Close", "action": "close", "style": "danger"}] + ) + msg = await self.inline.form( + text="🍋 Limoka\n🍋 Enter query", message=message, + reply_markup=workaround_markup, + ) + await msg.edit( + text=self.strings["start_search_form"], reply_markup=markup, - photo=self._get_banner_for_state("global_search"), ) return history = self.get("history", []) @@ -1427,9 +1568,8 @@ class Limoka(loader.Module): query=args, ), ) - searcher = Search(args.lower(), self.ix) try: - result = searcher.search_module() + result = SearchIndex(args.lower(), self.ix).search() except Exception: return await utils.answer(message, self.strings["?"]) if not result: @@ -1505,9 +1645,8 @@ class Limoka(loader.Module): ], ) return - searcher = Search(query.lower(), self.ix) try: - result = searcher.search_module() + result = SearchIndex(query.lower(), self.ix).search() except Exception: await call.edit( self.strings["?"],🏷 Теги: {tags}