From 789df8333877825c929d1ebdb28400f36ef2162a Mon Sep 17 00:00:00 2001 From: Macsim Date: Sat, 22 Nov 2025 13:33:52 +0300 Subject: [PATCH] too much changes idk really, just some bugfixes and QOL features --- Limoka.py | 1086 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 757 insertions(+), 329 deletions(-) diff --git a/Limoka.py b/Limoka.py index ac7d4d0..fa5112b 100644 --- a/Limoka.py +++ b/Limoka.py @@ -1,6 +1,31 @@ # meta developer: @limokanews # requires: whoosh +# Limoka search module. + +# This module loads a remote `modules.json`, builds a Whoosh index and +# exposes inline and chat commands to search and display module +# information. It handles remote banner validation and falls back to an +# external PNG hosted in the repository when a module banner is missing. +# The fallback is provided as a URL (`self._fallback_banner_url`). Depending +# on the client library the `photo` parameter may accept a URL, a file +# path or a file-like object; this implementation prefers using the +# external URL for the fallback. + +# Note: Expected `modules.json` record format: + +# { +# "path/to/module.py": { +# "name": "ModuleName", +# "description": "Short description", +# "meta": {"banner": "https://.../image.png", "developer": "@dev"}, +# "commands": [{"cmd1": "desc1"}, {"cmd2": "desc2"}], +# "category": ["fun", "tools"] +# } +# } +# Whoosh index in `userbotFolder/limoka_search/index`. + + from whoosh.index import create_in, open_dir from whoosh.fields import Schema, TEXT, ID from whoosh.qparser import QueryParser, OrGroup @@ -12,34 +37,31 @@ import logging import os import html import json -import re -from datetime import datetime + import asyncio from typing import Union, List, Dict, Any, Optional from telethon.types import Message from telethon.errors.rpcerrorlist import WebpageMediaEmptyError -from telethon import events + try: from aiogram.utils.exceptions import BadRequest except ImportError: from aiogram.exceptions import TelegramBadRequest as BadRequest from .. import utils, loader -from ..types import InlineQuery, InlineCall +from ..types import InlineCall logger = logging.getLogger("Limoka") -__version__ = (1, 2, 1) +__version__ = (1, 2, 3) class Search: def __init__(self, query, ix): self.schema = Schema( - title=TEXT(stored=True), - path=ID(stored=True), - content=TEXT(stored=True) + title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored=True) ) self.query = query self.ix = ix @@ -67,7 +89,7 @@ class LimokaAPI: @loader.tds class Limoka(loader.Module): - """Hikka modules are now in one place with easy searching!""" + """Modules are now in one place with easy searching!""" strings = { "name": "Limoka", @@ -84,6 +106,12 @@ class Limoka(loader.Module): "{commands}\n" "🪄 {prefix}dlm {url}{module_path}" ), + "caption_short": ( + "🔍 {safe_name}\n" + "ℹ️ Description: {safe_desc}\n" + "🧑‍💻 Dev: {dev_username}\n\n" + "🪄 {prefix}dlm {module_path}" + ), "command_template": "{emoji} {prefix}{command} — {description}\n", "emojis": { 1: "1️⃣", @@ -138,6 +166,15 @@ class Limoka(loader.Module): "inline_switch_pm": "💬 Open in chat", "inline_switch_pm_text": "🔍 Results for: {query}", "inline_start_message": "🔍 Limoka Search\n\nType module name or keyword", + "first_page": "This is the first page!", + "last_page": "This is the last page!", + "display_error": "Error displaying module. Please try again.", + "error_occurred": "An error occurred. Please try again.", + "start_search_form": "🔍 Limoka Search\n\nEnter your query to search for Hikka modules:", + "global_search_form": "🔍 Global Search\n\nEnter your query to search ALL modules without filters:", + "history_cleared": "🧹 Search history cleared!", + "invalid_history_arg": " Invalid argument for history command. Use:\n.lshistory - show history\n.lshistory clear - clear history", + "close": "❌ Close", } strings_ru = { @@ -155,6 +192,12 @@ class Limoka(loader.Module): "{commands}\n" "🪄 {prefix}dlm {url}{module_path}" ), + "caption_short": ( + "🔍 {safe_name}\n" + "ℹ️ Описание: {safe_desc}\n" + "🧑‍💻 Разработчик: {dev_username}\n\n" + "🪄 {prefix}dlm {module_path}" + ), "command_template": "{emoji} {prefix}{command} — {description}\n", "emojis": { 1: "1️⃣", @@ -209,6 +252,16 @@ class Limoka(loader.Module): "inline_switch_pm": "💬 Открыть в чате", "inline_switch_pm_text": "🔍 Результаты для: {query}", "inline_start_message": "🔍 Limoka Поиск\n\nВведите название модуля или ключевое слово", + "first_page": "Это первая страница!", + "last_page": "Это последняя страница!", + "display_error": "Ошибка отображения модуля. Пожалуйста, попробуйте еще раз.", + "error_occurred": "Произошла ошибка. Пожалуйста, попробуйте еще раз.", + "start_search_form": "🔍 Limoka Поиск\n\nВведите ваш запрос для поиска модулей Hikka:", + "global_search_form": "🔍 Глобальный Поиск\n\nВведите запрос для поиска ВСЕХ модулей без фильтров:", + "history_cleared": "🧹 История поиска очищена!", + "invalid_history_arg": " Неверный аргумент для команды истории. Используйте:\n.lshistory - показать историю\n.lshistory clear - очистить историю", + "close": "❌ Закрыть", + "_cls_doc": "Модули теперь в одном месте с простым и удобным поиском!", } def __init__(self): @@ -223,16 +276,16 @@ class Limoka(loader.Module): ) self.name = self.strings["name"] self._invalid_banners = set() - self.fallback_banner = "https://github.com/MuRuLOSE/limoka/raw/main/assets/limoka404.png" + # Also keep a convenient external fallback URL for plain search display + # (used when no valid banner is available and no filters are applied). + self._fallback_banner_url = "https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/assets/limoka404.png" async def client_ready(self, client, db): self.client = client self.db = db self.api = LimokaAPI() self.schema = Schema( - title=TEXT(stored=True), - path=ID(stored=True), - content=TEXT(stored=True) + title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored=True) ) os.makedirs("limoka_search", exist_ok=True) @@ -242,6 +295,7 @@ class Limoka(loader.Module): self.ix = open_dir("limoka_search") self._history = self.pointer("history", []) + self.modules = await self.api.get_all_modules( f"{self.config['limokaurl']}modules.json" ) @@ -253,34 +307,51 @@ class Limoka(loader.Module): writer.add_document( title=module_data["name"], path=module_path, - content=module_data["name"] + " " + (module_data["description"] or "") + content=module_data["name"] + " " + (module_data["description"] or ""), ) for func in module_data["commands"]: for command, description in func.items(): writer.add_document( title=module_data["name"], path=module_path, - content=f"{command} {description}" + content=f"{command} {description}", ) writer.commit() - async def _validate_url(self, url: str) -> str: + async def _validate_url(self, url: str) -> Optional[str]: + """Validate a remote URL points to an image. + + Args: + url: Remote URL to validate. + + Returns: + The same URL if it points to an image and is reachable, otherwise + ``None``. + + Side effects: + Adds invalid URLs to ``self._invalid_banners`` to avoid repeated + checks. + """ + # Return the url if valid, otherwise None. Do not return or use + # a global fallback here; fallback handling is done by the caller + # based on display context. if not url or url in self._invalid_banners: - return self.fallback_banner + return None try: async with aiohttp.ClientSession() as session: async with session.head(url, timeout=5) as response: if response.status != 200: self._invalid_banners.add(url) - return self.fallback_banner + return None content_type = response.headers.get("Content-Type", "").lower() if not content_type.startswith("image/"): self._invalid_banners.add(url) - return self.fallback_banner + return None return url except (aiohttp.ClientError, asyncio.TimeoutError): - self._invalid_banners.add(url) - return self.fallback_banner + if url: + self._invalid_banners.add(url) + return None def generate_commands(self, module_info): commands = [] @@ -290,7 +361,7 @@ class Limoka(loader.Module): break for command, description in func.items(): emoji = self.strings["emojis"].get(i, "") - desc = (description or self.strings["no_info"]) + desc = description or self.strings["no_info"] if len(desc) > 150: desc = desc[:147] + "…" commands.append( @@ -303,187 +374,36 @@ class Limoka(loader.Module): ) return commands[:5] - async def _display_filter_menu(self, call: InlineCall, query: str, current_filters: dict): - categories = current_filters.get("category", []) - filters_text = self.strings["selected_categories"].format( - categories=', '.join(categories) if categories else self.strings["no_category"] - ) - - markup = [ - [ - {"text": self.strings["filter_cat"], "callback": self._select_category, "args": (query, current_filters)}, - ], - [ - {"text": self.strings["apply_filters"], "callback": self._apply_filters, "args": (query, current_filters)}, - {"text": self.strings["clear_filters"], "callback": self._clear_filters, "args": (query,)}, - ], - [ - {"text": self.strings["back_to_results"], "callback": self._show_results, "args": (query, {})}, - ] - ] - - text = self.strings["filter_menu"].format(query=query) + f"\n\n{filters_text}" - await call.edit(text, reply_markup=markup) - - async def _select_category(self, call: InlineCall, query: str, current_filters: dict): - all_categories = set() - for module_data in self.modules.values(): - all_categories.update(module_data.get("category", ["No category"])) - categories = sorted(all_categories) - - if not categories: - await call.edit(self.strings["no_categories"], reply_markup=[ - [{"text": self.strings["back"], "callback": self._display_filter_menu, "args": (query, current_filters)}] - ]) - return - - selected_categories = current_filters.get("category", []) - buttons = [] - row = [] - - for i, cat in enumerate(categories): - button_text = (self.strings["category"].format(category=cat) if "category" in self.strings else f"📁 {cat}") - if cat in selected_categories: - button_text = "✅ " + button_text - - row.append({ - "text": button_text, - "callback": self._toggle_category, - "args": (query, current_filters, cat) - }) - - if (i + 1) % 3 == 0 or i == len(categories) - 1: - buttons.append(row) - row = [] - - buttons.append([ - {"text": self.strings["back"], "callback": self._display_filter_menu, "args": (query, current_filters)} - ]) - - text = self.strings["select_category"].format(query=query) - await call.edit(text, reply_markup=buttons) - - async def _toggle_category(self, call: InlineCall, query: str, current_filters: dict, category: str): - new_filters = current_filters.copy() - selected_categories = new_filters.get("category", []) - - if category in selected_categories: - selected_categories.remove(category) - else: - selected_categories.append(category) - - if selected_categories: - new_filters["category"] = selected_categories - else: - new_filters.pop("category", None) - - await self._select_category(call, query, new_filters) - - async def _apply_filters(self, call: InlineCall, query: str, filters: dict): - await self._show_results(call, query, filters, from_filters=True) - - async def _clear_filters(self, call: InlineCall, query: str): - await self._show_results(call, query, {}, from_filters=True) - - async def _show_results(self, call: InlineCall, query: str, filters: dict, from_filters: bool = False): - searcher = Search(query.lower(), self.ix) - try: - result = searcher.search_module() - except Exception: - await call.edit(self.strings["?"], reply_markup=[]) - return - - if not result: - markup = [[{"text": self.strings["back"], "callback": self._display_filter_menu, "args": (query, filters)}]] if from_filters else [] - await call.edit(self.strings["404"].format(query=query), reply_markup=markup) - return - - if filters.get("category"): - filtered_result = [ - path for path in result - if any(cat in self.modules.get(path, {}).get("category", ["No category"]) for cat in filters["category"]) - ] - else: - filtered_result = result - - if not filtered_result: - markup = [[{"text": self.strings["back"], "callback": self._display_filter_menu, "args": (query, filters)}]] if from_filters else [] - await call.edit(self.strings["404"].format(query=query), reply_markup=markup) - return - - module_path = filtered_result[0] - module_info = self.modules[module_path] - await self._display_module(call, module_info, module_path, query, filtered_result, 0, filters) - - async def _enter_query_handler(self, call: InlineCall, query: str, *args, **kwargs): - """Handler for inline query input""" - if len(query) <= 1: - await call.edit(self.strings["?"], reply_markup=[[{"text": "🔄 " + self.strings["change_query"], "callback": self._enter_query}]]) - return - searcher = Search(query.lower(), self.ix) - try: - result = searcher.search_module() - except Exception: - await call.edit(self.strings["?"], reply_markup=[[{"text": "🔄 " + self.strings["change_query"], "callback": self._enter_query}]]) - return - - if not result: - await call.edit( - self.strings["404"].format(query=query), - reply_markup=[[{"text": "🔄 " + self.strings["change_query"], "callback": self._enter_query}]] - ) - return - - module_path = result[0] - module_info = self.modules[module_path] - await self._display_module(call, module_info, module_path, query, result, 0, {}) - - async def _enter_query(self, call: InlineCall): - """Show input form for new query""" - markup = [ - [ - { - "text": "✍️ " + self.strings["enter_query"], - "input": self.strings["enter_query"], - "handler": self._enter_query_handler, - } - ], - [ - { - "text": self.strings["back_to_results"], - "callback": self._inline_void, - } - ] - ] - - await call.edit( - self.strings["enter_query"], - reply_markup=markup - ) - - async def _display_module( + def _format_module_content( self, - message_or_call: Union[Message, InlineCall, None], module_info: Dict[str, Any], - module_path: str, query: str, - result: List[Any], - index: int, - filters: Dict[str, List[str]] - ): + filters: Dict[str, List[str]], + include_categories: bool = True, + module_path: Optional[str] = None, + ) -> tuple: + """Formats the module content for display.""" name = html.escape(module_info.get("name") or self.strings["no_info"]) - description = html.escape(module_info.get("description") or self.strings["no_info"]) - dev_username = html.escape(module_info["meta"].get("developer", "Unknown")) - - clean_module_path = module_path.replace("\\", "/") - banner_url = await self._validate_url(module_info["meta"].get("banner")) - commands = self.generate_commands(module_info) - page = index + 1 - - categories = filters.get("category", []) - filters_text = self.strings["selected_categories"].format( - categories=', '.join(html.escape(c) for c in categories) if categories else self.strings["no_category"] + description = html.escape( + module_info.get("description") or self.strings["no_info"] ) + dev_username = html.escape(module_info["meta"].get("developer", "Unknown")) + + # Prefer explicit module_path argument (caller provides the key), + # otherwise fall back to module_info['path'] if present. + raw_path = ( + module_path if module_path is not None else module_info.get("path", "") + ) + clean_module_path = (raw_path or "").replace("\\", "/") + commands = self.generate_commands(module_info) + + categories_text = "" + if include_categories: + categories = filters.get("category", []) + if categories: + categories_text = "\n\n" + self.strings["selected_categories"].format( + categories=", ".join(html.escape(c) for c in categories) + ) if len(description) > 300: description = description[:297] + "…" @@ -499,25 +419,33 @@ class Limoka(loader.Module): module_path=html.escape(clean_module_path), ) - full_message = core_message[:4096] + full_message = core_message[:4096] + categories_text[:100] caption_message = full_message if len(caption_message) > 1024: - safe_query = html.escape(query[:30]) + ("..." if len(query) > 30 else "") safe_name = name[:40] + ("..." if len(name) > 40 else "") safe_desc = description[:100] + ("…" if len(description) > 100 else "") - - caption_message = ( - f"🔍 {safe_name}\n" - f"ℹ️ Desc: {safe_desc}\n" - f"🧑‍💻 Dev: {dev_username}\n\n" - f"🪄 {self.get_prefix()}dlm {self.config['limokaurl']}{html.escape(clean_module_path)}" + + caption_message = self.strings["caption_short"].format( + safe_name=safe_name, + safe_desc=safe_desc, + dev_username=dev_username, + prefix=self.get_prefix(), + module_path=html.escape(self.config["limokaurl"] + clean_module_path), )[:1024] - static_suffix = f"\n{filters_text}" - if len(caption_message) + len(static_suffix) <= 1024: - caption_message += static_suffix + if categories_text: + remaining_space = 1024 - len(caption_message) + if remaining_space > 0: + caption_message += categories_text[:remaining_space] + return caption_message, full_message + + def _build_navigation_markup( + self, result: List[str], index: int, query: str, filters: Dict[str, List[str]] + ) -> list: + """Create navigation markup for inline results.""" + page = index + 1 markup = [ [ { @@ -528,78 +456,485 @@ class Limoka(loader.Module): {"text": f"{page}/{len(result)}", "callback": self._inline_void}, { "text": "⏩" if index + 1 < len(result) else "🚫", - "callback": self._next_page if index + 1 < len(result) else self._inline_void, - "args": (result, index, query, filters) if index + 1 < len(result) else (), + "callback": ( + self._next_page + if index + 1 < len(result) + else self._inline_void + ), + "args": ( + (result, index, query, filters) + if index + 1 < len(result) + else () + ), }, ], [ - {"text": "🔍 " + self.strings["filter_menu"].split(":")[0], "callback": self._display_filter_menu, "args": (query, filters)}, - {"text": "🔄 " + self.strings["change_query"], "callback": self._enter_query}, + { + "text": "🔍 " + self.strings["filter_menu"].split(":")[0], + "callback": self._display_filter_menu, + "args": (query, filters), + }, + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + }, ], [ - {"text": self.strings["global_button"], "callback": self._show_global_results, "args": (query,)}, - ] + { + "text": self.strings["global_button"], + "callback": self._show_global_results, + "args": (query,), + }, + ], ] + # Add a universal close button to the navigation markup + markup.append( + [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] + ) + return markup + async def _safe_display( + self, + message_or_call: Union[Message, InlineCall], + text: str, + markup: list, + photo: Optional[Any] = None, + ): + """Safely display module information, handling potential errors.""" try: if message_or_call is None: - logger.error("message_or_call is None in _display_module") + logger.error("message_or_call is None in _safe_display") return if isinstance(message_or_call, Message): - if banner_url and banner_url != self.fallback_banner: + if photo is not None: + # photo can be a URL/str, file path or a file-like object await self.inline.form( - text=caption_message, + text=text, message=message_or_call, reply_markup=markup, - photo=banner_url + photo=photo, ) else: await self.inline.form( - text=full_message, - message=message_or_call, - reply_markup=markup + text=text, message=message_or_call, reply_markup=markup ) else: - if banner_url and banner_url != self.fallback_banner: + if photo is not None: await message_or_call.edit( - text=caption_message, - reply_markup=markup, - photo=banner_url + text=text, reply_markup=markup, photo=photo ) else: - await message_or_call.edit( - text=full_message, - reply_markup=markup - ) + await message_or_call.edit(text=text, reply_markup=markup) except (BadRequest, WebpageMediaEmptyError) as e: + logger.exception(f"Error in _safe_display: {e}") + if isinstance(message_or_call, Message): + await utils.answer(message_or_call, self.strings["display_error"]) + elif hasattr(message_or_call, "edit"): + await message_or_call.edit(self.strings["display_error"]) + + async def _display_module( + self, + message_or_call: Union[Message, InlineCall], + module_info: Dict[str, Any], + module_path: str, + query: str, + result: List[str], + index: int, + filters: Dict[str, List[str]], + ): + """Display module information with banner and formatted content. + + Args: + message_or_call: Message or InlineCall object where the module + will be displayed. + module_info: Dictionary with module metadata (name, description, + meta.banner, commands, category). + module_path: Path key of the module in `self.modules`. + query: Original search query string. + result: Full list of matched module paths. + index: Index of the current module in `result`. + filters: Active filters (e.g., categories). If ``filters`` is + empty and no valid remote banner exists, the external fallback + URL (`self._fallback_banner_url`) will be used. + + Notes: + The method attempts to validate a remote banner URL via + :meth:`_validate_url`. If validation succeeds the remote URL is + passed to the messaging client. If validation fails and ``filters`` + is empty, the external fallback URL (`self._fallback_banner_url`) + will be used. Behavior may vary depending on the messaging client + used (Telethon/aiogram/etc.). + """ + try: + banner_url = await self._validate_url(module_info["meta"].get("banner")) + + caption_message, full_message = self._format_module_content( + module_info, + query, + filters, + include_categories=True, + module_path=module_path, + ) + + markup = self._build_navigation_markup(result, index, query, filters) + + # Determine which banner to use. If banner_url is valid, use it. + # If no valid banner and no filters are applied (normal search display), + # create an in-memory BytesIO from the embedded base64 and use it. + banner_to_use = None + if banner_url: + banner_to_use = banner_url + else: + if not filters: + # Use external fallback URL for plain search display. + banner_to_use = getattr(self, "_fallback_banner_url", None) + + display_text = caption_message if banner_to_use else full_message + await self._safe_display( + message_or_call, display_text, markup, banner_to_use + ) + + except Exception as e: logger.exception(f"Error in _display_module: {e}") - if message_or_call is None: - return - - try: - if isinstance(message_or_call, Message): - target_message = message_or_call - elif hasattr(message_or_call, 'message') and isinstance(message_or_call.message, Message): - target_message = message_or_call.message - else: - target_message = await self.client.send_message( - self._me, - "Error occurred, please try again." - ) - - await self.inline.form( - text=full_message[:4096], - message=target_message, - reply_markup=markup + if isinstance(message_or_call, Message): + await utils.answer(message_or_call, self.strings["error_occurred"]) + elif hasattr(message_or_call, "edit"): + await message_or_call.edit(self.strings["error_occurred"]) + + async def _display_filter_menu( + self, call: InlineCall, query: str, current_filters: dict + ): + categories = current_filters.get("category", []) + filters_text = self.strings["selected_categories"].format( + categories=( + ", ".join(categories) if categories else self.strings["no_category"] + ) + ) + + markup = [ + [ + { + "text": self.strings["filter_cat"], + "callback": self._select_category, + "args": (query, current_filters), + }, + ], + [ + { + "text": self.strings["apply_filters"], + "callback": self._apply_filters, + "args": (query, current_filters), + }, + { + "text": self.strings["clear_filters"], + "callback": self._clear_filters, + "args": (query,), + }, + ], + [ + { + "text": self.strings["back_to_results"], + "callback": self._show_results, + "args": (query, {}, True), + }, + ], + [{"text": self.strings.get("close", "❌ Close"), "action": "close"}], + ] + + text = self.strings["filter_menu"].format(query=query) + f"\n\n{filters_text}" + await call.edit(text, reply_markup=markup) + + async def _select_category( + self, call: InlineCall, query: str, current_filters: dict + ): + all_categories = set() + for module_data in self.modules.values(): + all_categories.update(module_data.get("category", ["No category"])) + categories = sorted(all_categories) + + if not categories: + await call.edit( + self.strings["no_categories"], + reply_markup=[ + [ + { + "text": self.strings["back"], + "callback": self._display_filter_menu, + "args": (query, current_filters), + } + ] + ], + ) + return + + selected_categories = current_filters.get("category", []) + buttons = [] + row = [] + + for i, cat in enumerate(categories): + button_text = ( + self.strings["category"].format(category=cat) + if "category" in self.strings + else f"📁 {cat}" + ) + if cat in selected_categories: + button_text = "✅ " + button_text + + row.append( + { + "text": button_text, + "callback": self._toggle_category, + "args": (query, current_filters, cat), + } + ) + + if (i + 1) % 3 == 0 or i == len(categories) - 1: + buttons.append(row) + row = [] + + buttons.append( + [ + { + "text": self.strings["back"], + "callback": self._display_filter_menu, + "args": (query, current_filters), + } + ] + ) + + # Add close button to category selector + buttons.append( + [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] + ) + + text = self.strings["select_category"].format(query=query) + await call.edit(text, reply_markup=buttons) + + async def _toggle_category( + self, call: InlineCall, query: str, current_filters: dict, category: str + ): + new_filters = current_filters.copy() + selected_categories = new_filters.get("category", []) + + if category in selected_categories: + selected_categories.remove(category) + else: + selected_categories.append(category) + + if selected_categories: + new_filters["category"] = selected_categories + else: + new_filters.pop("category", None) + + await self._select_category(call, query, new_filters) + + async def _apply_filters(self, call: InlineCall, query: str, filters: dict): + await self._show_results(call, query, filters, from_filters=True) + + async def _clear_filters(self, call: InlineCall, query: str): + await self._show_results(call, query, {}, from_filters=True) + + async def _show_results( + self, call: InlineCall, query: str, filters: dict, from_filters: bool = False + ): + searcher = Search(query.lower(), self.ix) + try: + result = searcher.search_module() + except Exception: + await call.edit(self.strings["?"], reply_markup=[]) + return + + if not result: + markup = ( + [ + [ + { + "text": self.strings["back"], + "callback": self._display_filter_menu, + "args": (query, filters), + } + ] + ] + if from_filters + else [] + ) + # Always provide a close button on empty-result screens + markup.append( + [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] + ) + await call.edit( + self.strings["404"].format(query=query), reply_markup=markup + ) + return + + if filters.get("category"): + filtered_result = [ + path + for path in result + if any( + cat in self.modules.get(path, {}).get("category", ["No category"]) + for cat in filters["category"] ) - except Exception as inner_e: - logger.exception(f"Secondary error in error handling: {inner_e}") - try: - if isinstance(message_or_call, Message): - await utils.answer(message_or_call, "Error displaying module. Please try again.") - except Exception: - pass + ] + else: + filtered_result = result + + if not filtered_result: + markup = ( + [ + [ + { + "text": self.strings["back"], + "callback": self._display_filter_menu, + "args": (query, filters), + } + ] + ] + if from_filters + else [] + ) + # Add close button when filtered results are empty + markup.append( + [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] + ) + await call.edit( + self.strings["404"].format(query=query), reply_markup=markup + ) + return + + module_path = filtered_result[0] + module_info = self.modules[module_path] + await self._display_module( + call, module_info, module_path, query, filtered_result, 0, filters + ) + + async def _enter_query_handler( + self, call_or_query, query: Optional[str] = None, *args, **kwargs + ): + """Handler for inline query input. + + This handler is tolerant to different calling conventions used by the + framework: some callers provide `(call, query)`, others may provide + `(query,)` or `(query, call)` depending on context. Normalize the + inputs so the handler works from menus and forms alike. + """ + # Normalize parameters: try to find `call` (message or InlineCall) + call = None + if query is None and isinstance(call_or_query, str): + # Called as (query, ...) — search text is first argument + query = call_or_query + for a in args: + if hasattr(a, "edit") or isinstance(a, Message): + call = a + break + else: + # Expected calling convention: (call, query, ...) + call = call_or_query + + if call is None: + logger.error("_enter_query_handler: missing call/context") + return + + if not query: + await call.edit( + self.strings["?"], + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + } + ] + ], + ) + return + + if len(query) <= 1: + await call.edit( + self.strings["?"], + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + } + ] + ], + ) + return + + searcher = Search(query.lower(), self.ix) + try: + result = searcher.search_module() + except Exception: + await call.edit( + self.strings["?"], + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + } + ] + ], + ) + return + + if not result: + await call.edit( + self.strings["404"].format(query=query), + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + } + ], + [ + { + "text": self.strings.get("close", "❌ Close"), + "action": "close", + } + ], + ], + ) + return + + module_path = result[0] + module_info = self.modules[module_path] + await self._display_module(call, module_info, module_path, query, result, 0, {}) + + async def _enter_query(self, call: InlineCall, query: Optional[str] = None): + """Show input form for new query. + + Accepts an optional `query` when called from other menus so the + "back to results" button can restore the previous search context. + """ + markup = [ + [ + { + "text": "✍️ " + self.strings["enter_query"], + "input": self.strings["enter_query"], + "handler": self._enter_query_handler, + } + ], + [ + { + "text": self.strings["back_to_results"], + "callback": self._show_results, + "args": (query or "", {}), + } + ], + [ + { + "text": self.strings.get("close", "❌ Close"), + "action": "close", + } + ], + ] + + await call.edit(self.strings["enter_query"], reply_markup=markup) async def _show_global_results(self, call: InlineCall, query: str): searcher = Search(query.lower(), self.ix) @@ -610,14 +945,21 @@ class Limoka(loader.Module): return if not result: - await call.edit(self.strings["404"].format(query=query), reply_markup=[ - [{"text": "🔄 " + self.strings["change_query"], "callback": self._enter_query}] - ]) + await call.edit( + self.strings["404"].format(query=query), + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + } + ] + ], + ) return text = self.strings["global_search"].format( - query=html.escape(query), - count=len(result) + query=html.escape(query), count=len(result) ) buttons = [] for i, path in enumerate(result[:15]): @@ -625,43 +967,56 @@ class Limoka(loader.Module): if not info: continue name = info.get("name", "Unknown") - buttons.append([ - { - "text": f"{i+1}. {name}", - "callback": self._display_module_from_global, - "args": (path, query, result) - } - ]) - buttons.append([{"text": self.strings["change_query"], "callback": self._enter_query}]) - - await call.edit( - text=text[:4096], - reply_markup=buttons + buttons.append( + [ + { + "text": f"{i+1}. {name}", + "callback": self._display_module_from_global, + "args": (path, query, result), + } + ] + ) + buttons.append( + [{"text": self.strings["change_query"], "callback": self._enter_query}] ) - async def _display_module_from_global(self, call: InlineCall, module_path: str, query: str, result: list): - module_info = self.modules[module_path] - await self._display_module(call, module_info, module_path, query, result, result.index(module_path), {}) + await call.edit(text=text[:4096], reply_markup=buttons) - async def _next_page(self, call: InlineCall, result: list, index: int, query: str, filters: dict): + async def _display_module_from_global( + self, call: InlineCall, module_path: str, query: str, result: list + ): + module_info = self.modules[module_path] + await self._display_module( + call, module_info, module_path, query, result, result.index(module_path), {} + ) + + async def _next_page( + self, call: InlineCall, result: list, index: int, query: str, filters: dict + ): if index + 1 >= len(result): - await call.answer("This is the last page!" if not hasattr(self, "strings_ru") else "Это последняя страница!") + await call.answer(self.strings["last_page"]) return index += 1 module_path = result[index] module_info = self.modules[module_path] - await self._display_module(call, module_info, module_path, query, result, index, filters) + await self._display_module( + call, module_info, module_path, query, result, index, filters + ) - async def _previous_page(self, call: InlineCall, result: list, index: int, query: str, filters: dict): + async def _previous_page( + self, call: InlineCall, result: list, index: int, query: str, filters: dict + ): if index - 1 < 0: - await call.answer("This is the first page!" if not hasattr(self, "strings_ru") else "Это первая страница!") + await call.answer(self.strings["first_page"]) return index -= 1 module_path = result[index] module_info = self.modules[module_path] - await self._display_module(call, module_info, module_path, query, result, index, filters) + await self._display_module( + call, module_info, module_path, query, result, index, filters + ) async def _inline_void(self, call: InlineCall): await call.answer() @@ -670,7 +1025,7 @@ class Limoka(loader.Module): async def limokacmd(self, message: Message): """[query / nothing] - Search modules""" args = utils.get_args_raw(message) - + if not args: markup = [ [ @@ -686,25 +1041,25 @@ class Limoka(loader.Module): "callback": self._show_global_form, "args": (message,), } - ] + ], ] - + # Close button on the main no-args form + markup.append( + [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] + ) + await self.inline.form( - text="🔍 Limoka Search\n\n" - "Enter your query to search for Hikka modules:", + text=self.strings["start_search_form"], message=message, reply_markup=markup, - photo=self.fallback_banner ) return - if len(self._history) >= 10: - self._history = self._history[-9:] - self._history.append(args) - self.pointer("history", self._history) - - if len(args) <= 1: - return await utils.answer(message, self.strings["?"]) + history = self.get("history", []) + if len(history) >= 10: + history = history[-9:] + history.append(args) + self.set("history", history) await utils.answer( message, @@ -726,7 +1081,9 @@ class Limoka(loader.Module): module_path = result[0] module_info = self.modules[module_path] - await self._display_module(message, module_info, module_path, args, result, 0, {}) + await self._display_module( + message, module_info, module_path, args, result, 0, {} + ) async def _show_global_form(self, call: InlineCall, message: Message): markup = [ @@ -740,40 +1097,88 @@ class Limoka(loader.Module): ], [ { - "text": "🔙 Back", + "text": "🔙 " + self.strings["back"], "callback": self._inline_void, } - ] + ], + [ + { + "text": self.strings.get("close", "❌ Close"), + "action": "close", + } + ], ] - - await call.edit( - "🔍 Global Search\n\n" - "Enter your query to search ALL modules without filters:", - reply_markup=markup - ) - async def _global_search_handler(self, call: InlineCall, query: str, message: Message, *args, **kwargs): + await call.edit(self.strings["global_search_form"], reply_markup=markup) + + async def _global_search_handler( + self, call: InlineCall, query: str, message: Message, *args, **kwargs + ): if len(query) <= 1: - await call.edit(self.strings["?"], reply_markup=[[{"text": "🔄 Try again", "callback": lambda c: self._show_global_form(c, message)}]]) + await call.edit( + self.strings["?"], + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": lambda c: self._show_global_form(c, message), + } + ], + [ + { + "text": self.strings.get("close", "❌ Close"), + "action": "close", + } + ], + ], + ) return - + searcher = Search(query.lower(), self.ix) try: result = searcher.search_module() except Exception: - await call.edit(self.strings["?"], reply_markup=[[{"text": "🔄 Try again", "callback": lambda c: self._show_global_form(c, message)}]]) + await call.edit( + self.strings["?"], + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": lambda c: self._show_global_form(c, message), + } + ], + [ + { + "text": self.strings.get("close", "❌ Close"), + "action": "close", + } + ], + ], + ) return if not result: await call.edit( self.strings["404"].format(query=query), - reply_markup=[[{"text": "🔄 Try again", "callback": lambda c: self._show_global_form(c, message)}]] + reply_markup=[ + [ + { + "text": "🔄 " + self.strings["change_query"], + "callback": lambda c: self._show_global_form(c, message), + } + ], + [ + { + "text": self.strings.get("close", "❌ Close"), + "action": "close", + } + ], + ], ) return text = self.strings["global_search"].format( - query=html.escape(query), - count=len(result) + query=html.escape(query), count=len(result) ) buttons = [] for i, path in enumerate(result[:15]): @@ -781,31 +1186,54 @@ class Limoka(loader.Module): if not info: continue name = info.get("name", "Unknown") - buttons.append([ + buttons.append( + [ + { + "text": f"{i+1}. {name}", + "callback": self._display_module_from_global, + "args": (path, query, result), + } + ] + ) + buttons.append( + [ { - "text": f"{i+1}. {name}", - "callback": self._display_module_from_global, - "args": (path, query, result) + "text": "🔄 " + self.strings["change_query"], + "callback": lambda c: self._show_global_form(c, message), } - ]) - buttons.append([{"text": "🔄 " + self.strings["change_query"], "callback": lambda c: self._show_global_form(c, message)}]) - - await call.edit( - text=text[:4096], - reply_markup=buttons + ] + ) + buttons.append( + [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] ) - @loader.command(ru_doc="— Показать историю поиска") + await call.edit(text=text[:4096], reply_markup=buttons) + + @loader.command(ru_doc="[clear] — Показать или очистить историю поиска") async def lshistorycmd(self, message: Message): - """ - Show search history""" - if not self._history: + """[clear] - Show or clear search history""" + args = utils.get_args_raw(message).strip().lower() + + if args == "clear": + self.set("history", []) + await utils.answer(message, self.strings["history_cleared"]) + return + + if args: + await utils.answer(message, self.strings["invalid_history_arg"]) + return + + history = self.get("history", []) + + if not history: await utils.answer(message, self.strings["empty_history"]) return - formatted_history = [f"{i+1}. {utils.escape_html(h)}" for i, h in enumerate(self._history[-10:])] + formatted_history = [ + f"{i+1}. {utils.escape_html(h)}" + for i, h in enumerate(history[-10:]) + ] await utils.answer( - message, - self.strings["history"].format( - history='\n'.join(formatted_history) - ) - ) \ No newline at end of file + message, + self.strings["history"].format(history="\n".join(formatted_history)), + )