diff --git a/Limoka.py b/Limoka.py
index ac7d4d0..fa5112b 100644
--- a/Limoka.py
+++ b/Limoka.py
@@ -1,6 +1,31 @@
# meta developer: @limokanews
# requires: whoosh
+# Limoka search module.
+
+# This module loads a remote `modules.json`, builds a Whoosh index and
+# exposes inline and chat commands to search and display module
+# information. It handles remote banner validation and falls back to an
+# external PNG hosted in the repository when a module banner is missing.
+# The fallback is provided as a URL (`self._fallback_banner_url`). Depending
+# on the client library the `photo` parameter may accept a URL, a file
+# path or a file-like object; this implementation prefers using the
+# external URL for the fallback.
+
+# Note: Expected `modules.json` record format:
+
+# {
+# "path/to/module.py": {
+# "name": "ModuleName",
+# "description": "Short description",
+# "meta": {"banner": "https://.../image.png", "developer": "@dev"},
+# "commands": [{"cmd1": "desc1"}, {"cmd2": "desc2"}],
+# "category": ["fun", "tools"]
+# }
+# }
+# Whoosh index in `userbotFolder/limoka_search/index`.
+
+
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID
from whoosh.qparser import QueryParser, OrGroup
@@ -12,34 +37,31 @@ import logging
import os
import html
import json
-import re
-from datetime import datetime
+
import asyncio
from typing import Union, List, Dict, Any, Optional
from telethon.types import Message
from telethon.errors.rpcerrorlist import WebpageMediaEmptyError
-from telethon import events
+
try:
from aiogram.utils.exceptions import BadRequest
except ImportError:
from aiogram.exceptions import TelegramBadRequest as BadRequest
from .. import utils, loader
-from ..types import InlineQuery, InlineCall
+from ..types import InlineCall
logger = logging.getLogger("Limoka")
-__version__ = (1, 2, 1)
+__version__ = (1, 2, 3)
class Search:
def __init__(self, query, ix):
self.schema = Schema(
- title=TEXT(stored=True),
- path=ID(stored=True),
- content=TEXT(stored=True)
+ title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored=True)
)
self.query = query
self.ix = ix
@@ -67,7 +89,7 @@ class LimokaAPI:
@loader.tds
class Limoka(loader.Module):
- """Hikka modules are now in one place with easy searching!"""
+ """Modules are now in one place with easy searching!"""
strings = {
"name": "Limoka",
@@ -84,6 +106,12 @@ class Limoka(loader.Module):
"{commands}\n"
"🪄 {prefix}dlm {url}{module_path}"
),
+ "caption_short": (
+ "🔍 {safe_name}\n"
+ "ℹ️ Description: {safe_desc}\n"
+ "🧑💻 Dev: {dev_username}\n\n"
+ "🪄 {prefix}dlm {module_path}"
+ ),
"command_template": "{emoji} {prefix}{command} — {description}\n",
"emojis": {
1: "1️⃣",
@@ -138,6 +166,15 @@ class Limoka(loader.Module):
"inline_switch_pm": "💬 Open in chat",
"inline_switch_pm_text": "🔍 Results for: {query}",
"inline_start_message": "🔍 Limoka Search\n\nType module name or keyword",
+ "first_page": "This is the first page!",
+ "last_page": "This is the last page!",
+ "display_error": "Error displaying module. Please try again.",
+ "error_occurred": "An error occurred. Please try again.",
+ "start_search_form": "🔍 Limoka Search\n\nEnter your query to search for Hikka modules:",
+ "global_search_form": "🔍 Global Search\n\nEnter your query to search ALL modules without filters:",
+ "history_cleared": "🧹 Search history cleared!",
+ "invalid_history_arg": "❌ Invalid argument for history command. Use:\n.lshistory - show history\n.lshistory clear - clear history",
+ "close": "❌ Close",
}
strings_ru = {
@@ -155,6 +192,12 @@ class Limoka(loader.Module):
"{commands}\n"
"🪄 {prefix}dlm {url}{module_path}"
),
+ "caption_short": (
+ "🔍 {safe_name}\n"
+ "ℹ️ Описание: {safe_desc}\n"
+ "🧑💻 Разработчик: {dev_username}\n\n"
+ "🪄 {prefix}dlm {module_path}"
+ ),
"command_template": "{emoji} {prefix}{command} — {description}\n",
"emojis": {
1: "1️⃣",
@@ -209,6 +252,16 @@ class Limoka(loader.Module):
"inline_switch_pm": "💬 Открыть в чате",
"inline_switch_pm_text": "🔍 Результаты для: {query}",
"inline_start_message": "🔍 Limoka Поиск\n\nВведите название модуля или ключевое слово",
+ "first_page": "Это первая страница!",
+ "last_page": "Это последняя страница!",
+ "display_error": "Ошибка отображения модуля. Пожалуйста, попробуйте еще раз.",
+ "error_occurred": "Произошла ошибка. Пожалуйста, попробуйте еще раз.",
+ "start_search_form": "🔍 Limoka Поиск\n\nВведите ваш запрос для поиска модулей Hikka:",
+ "global_search_form": "🔍 Глобальный Поиск\n\nВведите запрос для поиска ВСЕХ модулей без фильтров:",
+ "history_cleared": "🧹 История поиска очищена!",
+ "invalid_history_arg": "❌ Неверный аргумент для команды истории. Используйте:\n.lshistory - показать историю\n.lshistory clear - очистить историю",
+ "close": "❌ Закрыть",
+ "_cls_doc": "Модули теперь в одном месте с простым и удобным поиском!",
}
def __init__(self):
@@ -223,16 +276,16 @@ class Limoka(loader.Module):
)
self.name = self.strings["name"]
self._invalid_banners = set()
- self.fallback_banner = "https://github.com/MuRuLOSE/limoka/raw/main/assets/limoka404.png"
+ # Also keep a convenient external fallback URL for plain search display
+ # (used when no valid banner is available and no filters are applied).
+ self._fallback_banner_url = "https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/assets/limoka404.png"
async def client_ready(self, client, db):
self.client = client
self.db = db
self.api = LimokaAPI()
self.schema = Schema(
- title=TEXT(stored=True),
- path=ID(stored=True),
- content=TEXT(stored=True)
+ title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored=True)
)
os.makedirs("limoka_search", exist_ok=True)
@@ -242,6 +295,7 @@ class Limoka(loader.Module):
self.ix = open_dir("limoka_search")
self._history = self.pointer("history", [])
+
self.modules = await self.api.get_all_modules(
f"{self.config['limokaurl']}modules.json"
)
@@ -253,34 +307,51 @@ class Limoka(loader.Module):
writer.add_document(
title=module_data["name"],
path=module_path,
- content=module_data["name"] + " " + (module_data["description"] or "")
+ content=module_data["name"] + " " + (module_data["description"] or ""),
)
for func in module_data["commands"]:
for command, description in func.items():
writer.add_document(
title=module_data["name"],
path=module_path,
- content=f"{command} {description}"
+ content=f"{command} {description}",
)
writer.commit()
- async def _validate_url(self, url: str) -> str:
+ async def _validate_url(self, url: str) -> Optional[str]:
+ """Validate a remote URL points to an image.
+
+ Args:
+ url: Remote URL to validate.
+
+ Returns:
+ The same URL if it points to an image and is reachable, otherwise
+ ``None``.
+
+ Side effects:
+ Adds invalid URLs to ``self._invalid_banners`` to avoid repeated
+ checks.
+ """
+ # Return the url if valid, otherwise None. Do not return or use
+ # a global fallback here; fallback handling is done by the caller
+ # based on display context.
if not url or url in self._invalid_banners:
- return self.fallback_banner
+ return None
try:
async with aiohttp.ClientSession() as session:
async with session.head(url, timeout=5) as response:
if response.status != 200:
self._invalid_banners.add(url)
- return self.fallback_banner
+ return None
content_type = response.headers.get("Content-Type", "").lower()
if not content_type.startswith("image/"):
self._invalid_banners.add(url)
- return self.fallback_banner
+ return None
return url
except (aiohttp.ClientError, asyncio.TimeoutError):
- self._invalid_banners.add(url)
- return self.fallback_banner
+ if url:
+ self._invalid_banners.add(url)
+ return None
def generate_commands(self, module_info):
commands = []
@@ -290,7 +361,7 @@ class Limoka(loader.Module):
break
for command, description in func.items():
emoji = self.strings["emojis"].get(i, "")
- desc = (description or self.strings["no_info"])
+ desc = description or self.strings["no_info"]
if len(desc) > 150:
desc = desc[:147] + "…"
commands.append(
@@ -303,187 +374,36 @@ class Limoka(loader.Module):
)
return commands[:5]
- async def _display_filter_menu(self, call: InlineCall, query: str, current_filters: dict):
- categories = current_filters.get("category", [])
- filters_text = self.strings["selected_categories"].format(
- categories=', '.join(categories) if categories else self.strings["no_category"]
- )
-
- markup = [
- [
- {"text": self.strings["filter_cat"], "callback": self._select_category, "args": (query, current_filters)},
- ],
- [
- {"text": self.strings["apply_filters"], "callback": self._apply_filters, "args": (query, current_filters)},
- {"text": self.strings["clear_filters"], "callback": self._clear_filters, "args": (query,)},
- ],
- [
- {"text": self.strings["back_to_results"], "callback": self._show_results, "args": (query, {})},
- ]
- ]
-
- text = self.strings["filter_menu"].format(query=query) + f"\n\n{filters_text}"
- await call.edit(text, reply_markup=markup)
-
- async def _select_category(self, call: InlineCall, query: str, current_filters: dict):
- all_categories = set()
- for module_data in self.modules.values():
- all_categories.update(module_data.get("category", ["No category"]))
- categories = sorted(all_categories)
-
- if not categories:
- await call.edit(self.strings["no_categories"], reply_markup=[
- [{"text": self.strings["back"], "callback": self._display_filter_menu, "args": (query, current_filters)}]
- ])
- return
-
- selected_categories = current_filters.get("category", [])
- buttons = []
- row = []
-
- for i, cat in enumerate(categories):
- button_text = (self.strings["category"].format(category=cat) if "category" in self.strings else f"📁 {cat}")
- if cat in selected_categories:
- button_text = "✅ " + button_text
-
- row.append({
- "text": button_text,
- "callback": self._toggle_category,
- "args": (query, current_filters, cat)
- })
-
- if (i + 1) % 3 == 0 or i == len(categories) - 1:
- buttons.append(row)
- row = []
-
- buttons.append([
- {"text": self.strings["back"], "callback": self._display_filter_menu, "args": (query, current_filters)}
- ])
-
- text = self.strings["select_category"].format(query=query)
- await call.edit(text, reply_markup=buttons)
-
- async def _toggle_category(self, call: InlineCall, query: str, current_filters: dict, category: str):
- new_filters = current_filters.copy()
- selected_categories = new_filters.get("category", [])
-
- if category in selected_categories:
- selected_categories.remove(category)
- else:
- selected_categories.append(category)
-
- if selected_categories:
- new_filters["category"] = selected_categories
- else:
- new_filters.pop("category", None)
-
- await self._select_category(call, query, new_filters)
-
- async def _apply_filters(self, call: InlineCall, query: str, filters: dict):
- await self._show_results(call, query, filters, from_filters=True)
-
- async def _clear_filters(self, call: InlineCall, query: str):
- await self._show_results(call, query, {}, from_filters=True)
-
- async def _show_results(self, call: InlineCall, query: str, filters: dict, from_filters: bool = False):
- searcher = Search(query.lower(), self.ix)
- try:
- result = searcher.search_module()
- except Exception:
- await call.edit(self.strings["?"], reply_markup=[])
- return
-
- if not result:
- markup = [[{"text": self.strings["back"], "callback": self._display_filter_menu, "args": (query, filters)}]] if from_filters else []
- await call.edit(self.strings["404"].format(query=query), reply_markup=markup)
- return
-
- if filters.get("category"):
- filtered_result = [
- path for path in result
- if any(cat in self.modules.get(path, {}).get("category", ["No category"]) for cat in filters["category"])
- ]
- else:
- filtered_result = result
-
- if not filtered_result:
- markup = [[{"text": self.strings["back"], "callback": self._display_filter_menu, "args": (query, filters)}]] if from_filters else []
- await call.edit(self.strings["404"].format(query=query), reply_markup=markup)
- return
-
- module_path = filtered_result[0]
- module_info = self.modules[module_path]
- await self._display_module(call, module_info, module_path, query, filtered_result, 0, filters)
-
- async def _enter_query_handler(self, call: InlineCall, query: str, *args, **kwargs):
- """Handler for inline query input"""
- if len(query) <= 1:
- await call.edit(self.strings["?"], reply_markup=[[{"text": "🔄 " + self.strings["change_query"], "callback": self._enter_query}]])
- return
- searcher = Search(query.lower(), self.ix)
- try:
- result = searcher.search_module()
- except Exception:
- await call.edit(self.strings["?"], reply_markup=[[{"text": "🔄 " + self.strings["change_query"], "callback": self._enter_query}]])
- return
-
- if not result:
- await call.edit(
- self.strings["404"].format(query=query),
- reply_markup=[[{"text": "🔄 " + self.strings["change_query"], "callback": self._enter_query}]]
- )
- return
-
- module_path = result[0]
- module_info = self.modules[module_path]
- await self._display_module(call, module_info, module_path, query, result, 0, {})
-
- async def _enter_query(self, call: InlineCall):
- """Show input form for new query"""
- markup = [
- [
- {
- "text": "✍️ " + self.strings["enter_query"],
- "input": self.strings["enter_query"],
- "handler": self._enter_query_handler,
- }
- ],
- [
- {
- "text": self.strings["back_to_results"],
- "callback": self._inline_void,
- }
- ]
- ]
-
- await call.edit(
- self.strings["enter_query"],
- reply_markup=markup
- )
-
- async def _display_module(
+ def _format_module_content(
self,
- message_or_call: Union[Message, InlineCall, None],
module_info: Dict[str, Any],
- module_path: str,
query: str,
- result: List[Any],
- index: int,
- filters: Dict[str, List[str]]
- ):
+ filters: Dict[str, List[str]],
+ include_categories: bool = True,
+ module_path: Optional[str] = None,
+ ) -> tuple:
+ """Formats the module content for display."""
name = html.escape(module_info.get("name") or self.strings["no_info"])
- description = html.escape(module_info.get("description") or self.strings["no_info"])
- dev_username = html.escape(module_info["meta"].get("developer", "Unknown"))
-
- clean_module_path = module_path.replace("\\", "/")
- banner_url = await self._validate_url(module_info["meta"].get("banner"))
- commands = self.generate_commands(module_info)
- page = index + 1
-
- categories = filters.get("category", [])
- filters_text = self.strings["selected_categories"].format(
- categories=', '.join(html.escape(c) for c in categories) if categories else self.strings["no_category"]
+ description = html.escape(
+ module_info.get("description") or self.strings["no_info"]
)
+ dev_username = html.escape(module_info["meta"].get("developer", "Unknown"))
+
+ # Prefer explicit module_path argument (caller provides the key),
+ # otherwise fall back to module_info['path'] if present.
+ raw_path = (
+ module_path if module_path is not None else module_info.get("path", "")
+ )
+ clean_module_path = (raw_path or "").replace("\\", "/")
+ commands = self.generate_commands(module_info)
+
+ categories_text = ""
+ if include_categories:
+ categories = filters.get("category", [])
+ if categories:
+ categories_text = "\n\n" + self.strings["selected_categories"].format(
+ categories=", ".join(html.escape(c) for c in categories)
+ )
if len(description) > 300:
description = description[:297] + "…"
@@ -499,25 +419,33 @@ class Limoka(loader.Module):
module_path=html.escape(clean_module_path),
)
- full_message = core_message[:4096]
+ full_message = core_message[:4096] + categories_text[:100]
caption_message = full_message
if len(caption_message) > 1024:
- safe_query = html.escape(query[:30]) + ("..." if len(query) > 30 else "")
safe_name = name[:40] + ("..." if len(name) > 40 else "")
safe_desc = description[:100] + ("…" if len(description) > 100 else "")
-
- caption_message = (
- f"🔍 {safe_name}\n"
- f"ℹ️ Desc: {safe_desc}\n"
- f"🧑💻 Dev: {dev_username}\n\n"
- f"🪄 {self.get_prefix()}dlm {self.config['limokaurl']}{html.escape(clean_module_path)}"
+
+ caption_message = self.strings["caption_short"].format(
+ safe_name=safe_name,
+ safe_desc=safe_desc,
+ dev_username=dev_username,
+ prefix=self.get_prefix(),
+ module_path=html.escape(self.config["limokaurl"] + clean_module_path),
)[:1024]
- static_suffix = f"\n{filters_text}"
- if len(caption_message) + len(static_suffix) <= 1024:
- caption_message += static_suffix
+ if categories_text:
+ remaining_space = 1024 - len(caption_message)
+ if remaining_space > 0:
+ caption_message += categories_text[:remaining_space]
+ return caption_message, full_message
+
+ def _build_navigation_markup(
+ self, result: List[str], index: int, query: str, filters: Dict[str, List[str]]
+ ) -> list:
+ """Create navigation markup for inline results."""
+ page = index + 1
markup = [
[
{
@@ -528,78 +456,485 @@ class Limoka(loader.Module):
{"text": f"{page}/{len(result)}", "callback": self._inline_void},
{
"text": "⏩" if index + 1 < len(result) else "🚫",
- "callback": self._next_page if index + 1 < len(result) else self._inline_void,
- "args": (result, index, query, filters) if index + 1 < len(result) else (),
+ "callback": (
+ self._next_page
+ if index + 1 < len(result)
+ else self._inline_void
+ ),
+ "args": (
+ (result, index, query, filters)
+ if index + 1 < len(result)
+ else ()
+ ),
},
],
[
- {"text": "🔍 " + self.strings["filter_menu"].split(":")[0], "callback": self._display_filter_menu, "args": (query, filters)},
- {"text": "🔄 " + self.strings["change_query"], "callback": self._enter_query},
+ {
+ "text": "🔍 " + self.strings["filter_menu"].split(":")[0],
+ "callback": self._display_filter_menu,
+ "args": (query, filters),
+ },
+ {
+ "text": "🔄 " + self.strings["change_query"],
+ "callback": self._enter_query,
+ },
],
[
- {"text": self.strings["global_button"], "callback": self._show_global_results, "args": (query,)},
- ]
+ {
+ "text": self.strings["global_button"],
+ "callback": self._show_global_results,
+ "args": (query,),
+ },
+ ],
]
+ # Add a universal close button to the navigation markup
+ markup.append(
+ [{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
+ )
+ return markup
+ async def _safe_display(
+ self,
+ message_or_call: Union[Message, InlineCall],
+ text: str,
+ markup: list,
+ photo: Optional[Any] = None,
+ ):
+ """Safely display module information, handling potential errors."""
try:
if message_or_call is None:
- logger.error("message_or_call is None in _display_module")
+ logger.error("message_or_call is None in _safe_display")
return
if isinstance(message_or_call, Message):
- if banner_url and banner_url != self.fallback_banner:
+ if photo is not None:
+ # photo can be a URL/str, file path or a file-like object
await self.inline.form(
- text=caption_message,
+ text=text,
message=message_or_call,
reply_markup=markup,
- photo=banner_url
+ photo=photo,
)
else:
await self.inline.form(
- text=full_message,
- message=message_or_call,
- reply_markup=markup
+ text=text, message=message_or_call, reply_markup=markup
)
else:
- if banner_url and banner_url != self.fallback_banner:
+ if photo is not None:
await message_or_call.edit(
- text=caption_message,
- reply_markup=markup,
- photo=banner_url
+ text=text, reply_markup=markup, photo=photo
)
else:
- await message_or_call.edit(
- text=full_message,
- reply_markup=markup
- )
+ await message_or_call.edit(text=text, reply_markup=markup)
except (BadRequest, WebpageMediaEmptyError) as e:
+ logger.exception(f"Error in _safe_display: {e}")
+ if isinstance(message_or_call, Message):
+ await utils.answer(message_or_call, self.strings["display_error"])
+ elif hasattr(message_or_call, "edit"):
+ await message_or_call.edit(self.strings["display_error"])
+
+ async def _display_module(
+ self,
+ message_or_call: Union[Message, InlineCall],
+ module_info: Dict[str, Any],
+ module_path: str,
+ query: str,
+ result: List[str],
+ index: int,
+ filters: Dict[str, List[str]],
+ ):
+ """Display module information with banner and formatted content.
+
+ Args:
+ message_or_call: Message or InlineCall object where the module
+ will be displayed.
+ module_info: Dictionary with module metadata (name, description,
+ meta.banner, commands, category).
+ module_path: Path key of the module in `self.modules`.
+ query: Original search query string.
+ result: Full list of matched module paths.
+ index: Index of the current module in `result`.
+ filters: Active filters (e.g., categories). If ``filters`` is
+ empty and no valid remote banner exists, the external fallback
+ URL (`self._fallback_banner_url`) will be used.
+
+ Notes:
+ The method attempts to validate a remote banner URL via
+ :meth:`_validate_url`. If validation succeeds the remote URL is
+ passed to the messaging client. If validation fails and ``filters``
+ is empty, the external fallback URL (`self._fallback_banner_url`)
+ will be used. Behavior may vary depending on the messaging client
+ used (Telethon/aiogram/etc.).
+ """
+ try:
+ banner_url = await self._validate_url(module_info["meta"].get("banner"))
+
+ caption_message, full_message = self._format_module_content(
+ module_info,
+ query,
+ filters,
+ include_categories=True,
+ module_path=module_path,
+ )
+
+ markup = self._build_navigation_markup(result, index, query, filters)
+
+ # Determine which banner to use. If banner_url is valid, use it.
+ # If no valid banner and no filters are applied (normal search display),
+ # create an in-memory BytesIO from the embedded base64 and use it.
+ banner_to_use = None
+ if banner_url:
+ banner_to_use = banner_url
+ else:
+ if not filters:
+ # Use external fallback URL for plain search display.
+ banner_to_use = getattr(self, "_fallback_banner_url", None)
+
+ display_text = caption_message if banner_to_use else full_message
+ await self._safe_display(
+ message_or_call, display_text, markup, banner_to_use
+ )
+
+ except Exception as e:
logger.exception(f"Error in _display_module: {e}")
- if message_or_call is None:
- return
-
- try:
- if isinstance(message_or_call, Message):
- target_message = message_or_call
- elif hasattr(message_or_call, 'message') and isinstance(message_or_call.message, Message):
- target_message = message_or_call.message
- else:
- target_message = await self.client.send_message(
- self._me,
- "Error occurred, please try again."
- )
-
- await self.inline.form(
- text=full_message[:4096],
- message=target_message,
- reply_markup=markup
+ if isinstance(message_or_call, Message):
+ await utils.answer(message_or_call, self.strings["error_occurred"])
+ elif hasattr(message_or_call, "edit"):
+ await message_or_call.edit(self.strings["error_occurred"])
+
+ async def _display_filter_menu(
+ self, call: InlineCall, query: str, current_filters: dict
+ ):
+ categories = current_filters.get("category", [])
+ filters_text = self.strings["selected_categories"].format(
+ categories=(
+ ", ".join(categories) if categories else self.strings["no_category"]
+ )
+ )
+
+ markup = [
+ [
+ {
+ "text": self.strings["filter_cat"],
+ "callback": self._select_category,
+ "args": (query, current_filters),
+ },
+ ],
+ [
+ {
+ "text": self.strings["apply_filters"],
+ "callback": self._apply_filters,
+ "args": (query, current_filters),
+ },
+ {
+ "text": self.strings["clear_filters"],
+ "callback": self._clear_filters,
+ "args": (query,),
+ },
+ ],
+ [
+ {
+ "text": self.strings["back_to_results"],
+ "callback": self._show_results,
+ "args": (query, {}, True),
+ },
+ ],
+ [{"text": self.strings.get("close", "❌ Close"), "action": "close"}],
+ ]
+
+ text = self.strings["filter_menu"].format(query=query) + f"\n\n{filters_text}"
+ await call.edit(text, reply_markup=markup)
+
+ async def _select_category(
+ self, call: InlineCall, query: str, current_filters: dict
+ ):
+ all_categories = set()
+ for module_data in self.modules.values():
+ all_categories.update(module_data.get("category", ["No category"]))
+ categories = sorted(all_categories)
+
+ if not categories:
+ await call.edit(
+ self.strings["no_categories"],
+ reply_markup=[
+ [
+ {
+ "text": self.strings["back"],
+ "callback": self._display_filter_menu,
+ "args": (query, current_filters),
+ }
+ ]
+ ],
+ )
+ return
+
+ selected_categories = current_filters.get("category", [])
+ buttons = []
+ row = []
+
+ for i, cat in enumerate(categories):
+ button_text = (
+ self.strings["category"].format(category=cat)
+ if "category" in self.strings
+ else f"📁 {cat}"
+ )
+ if cat in selected_categories:
+ button_text = "✅ " + button_text
+
+ row.append(
+ {
+ "text": button_text,
+ "callback": self._toggle_category,
+ "args": (query, current_filters, cat),
+ }
+ )
+
+ if (i + 1) % 3 == 0 or i == len(categories) - 1:
+ buttons.append(row)
+ row = []
+
+ buttons.append(
+ [
+ {
+ "text": self.strings["back"],
+ "callback": self._display_filter_menu,
+ "args": (query, current_filters),
+ }
+ ]
+ )
+
+ # Add close button to category selector
+ buttons.append(
+ [{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
+ )
+
+ text = self.strings["select_category"].format(query=query)
+ await call.edit(text, reply_markup=buttons)
+
+ async def _toggle_category(
+ self, call: InlineCall, query: str, current_filters: dict, category: str
+ ):
+ new_filters = current_filters.copy()
+ selected_categories = new_filters.get("category", [])
+
+ if category in selected_categories:
+ selected_categories.remove(category)
+ else:
+ selected_categories.append(category)
+
+ if selected_categories:
+ new_filters["category"] = selected_categories
+ else:
+ new_filters.pop("category", None)
+
+ await self._select_category(call, query, new_filters)
+
+ async def _apply_filters(self, call: InlineCall, query: str, filters: dict):
+ await self._show_results(call, query, filters, from_filters=True)
+
+ async def _clear_filters(self, call: InlineCall, query: str):
+ await self._show_results(call, query, {}, from_filters=True)
+
+ async def _show_results(
+ self, call: InlineCall, query: str, filters: dict, from_filters: bool = False
+ ):
+ searcher = Search(query.lower(), self.ix)
+ try:
+ result = searcher.search_module()
+ except Exception:
+ await call.edit(self.strings["?"], reply_markup=[])
+ return
+
+ if not result:
+ markup = (
+ [
+ [
+ {
+ "text": self.strings["back"],
+ "callback": self._display_filter_menu,
+ "args": (query, filters),
+ }
+ ]
+ ]
+ if from_filters
+ else []
+ )
+ # Always provide a close button on empty-result screens
+ markup.append(
+ [{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
+ )
+ await call.edit(
+ self.strings["404"].format(query=query), reply_markup=markup
+ )
+ return
+
+ if filters.get("category"):
+ filtered_result = [
+ path
+ for path in result
+ if any(
+ cat in self.modules.get(path, {}).get("category", ["No category"])
+ for cat in filters["category"]
)
- except Exception as inner_e:
- logger.exception(f"Secondary error in error handling: {inner_e}")
- try:
- if isinstance(message_or_call, Message):
- await utils.answer(message_or_call, "Error displaying module. Please try again.")
- except Exception:
- pass
+ ]
+ else:
+ filtered_result = result
+
+ if not filtered_result:
+ markup = (
+ [
+ [
+ {
+ "text": self.strings["back"],
+ "callback": self._display_filter_menu,
+ "args": (query, filters),
+ }
+ ]
+ ]
+ if from_filters
+ else []
+ )
+ # Add close button when filtered results are empty
+ markup.append(
+ [{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
+ )
+ await call.edit(
+ self.strings["404"].format(query=query), reply_markup=markup
+ )
+ return
+
+ module_path = filtered_result[0]
+ module_info = self.modules[module_path]
+ await self._display_module(
+ call, module_info, module_path, query, filtered_result, 0, filters
+ )
+
+ async def _enter_query_handler(
+ self, call_or_query, query: Optional[str] = None, *args, **kwargs
+ ):
+ """Handler for inline query input.
+
+ This handler is tolerant to different calling conventions used by the
+ framework: some callers provide `(call, query)`, others may provide
+ `(query,)` or `(query, call)` depending on context. Normalize the
+ inputs so the handler works from menus and forms alike.
+ """
+ # Normalize parameters: try to find `call` (message or InlineCall)
+ call = None
+ if query is None and isinstance(call_or_query, str):
+ # Called as (query, ...) — search text is first argument
+ query = call_or_query
+ for a in args:
+ if hasattr(a, "edit") or isinstance(a, Message):
+ call = a
+ break
+ else:
+ # Expected calling convention: (call, query, ...)
+ call = call_or_query
+
+ if call is None:
+ logger.error("_enter_query_handler: missing call/context")
+ return
+
+ if not query:
+ await call.edit(
+ self.strings["?"],
+ reply_markup=[
+ [
+ {
+ "text": "🔄 " + self.strings["change_query"],
+ "callback": self._enter_query,
+ }
+ ]
+ ],
+ )
+ return
+
+ if len(query) <= 1:
+ await call.edit(
+ self.strings["?"],
+ reply_markup=[
+ [
+ {
+ "text": "🔄 " + self.strings["change_query"],
+ "callback": self._enter_query,
+ }
+ ]
+ ],
+ )
+ return
+
+ searcher = Search(query.lower(), self.ix)
+ try:
+ result = searcher.search_module()
+ except Exception:
+ await call.edit(
+ self.strings["?"],
+ reply_markup=[
+ [
+ {
+ "text": "🔄 " + self.strings["change_query"],
+ "callback": self._enter_query,
+ }
+ ]
+ ],
+ )
+ return
+
+ if not result:
+ await call.edit(
+ self.strings["404"].format(query=query),
+ reply_markup=[
+ [
+ {
+ "text": "🔄 " + self.strings["change_query"],
+ "callback": self._enter_query,
+ }
+ ],
+ [
+ {
+ "text": self.strings.get("close", "❌ Close"),
+ "action": "close",
+ }
+ ],
+ ],
+ )
+ return
+
+ module_path = result[0]
+ module_info = self.modules[module_path]
+ await self._display_module(call, module_info, module_path, query, result, 0, {})
+
+ async def _enter_query(self, call: InlineCall, query: Optional[str] = None):
+ """Show input form for new query.
+
+ Accepts an optional `query` when called from other menus so the
+ "back to results" button can restore the previous search context.
+ """
+ markup = [
+ [
+ {
+ "text": "✍️ " + self.strings["enter_query"],
+ "input": self.strings["enter_query"],
+ "handler": self._enter_query_handler,
+ }
+ ],
+ [
+ {
+ "text": self.strings["back_to_results"],
+ "callback": self._show_results,
+ "args": (query or "", {}),
+ }
+ ],
+ [
+ {
+ "text": self.strings.get("close", "❌ Close"),
+ "action": "close",
+ }
+ ],
+ ]
+
+ await call.edit(self.strings["enter_query"], reply_markup=markup)
async def _show_global_results(self, call: InlineCall, query: str):
searcher = Search(query.lower(), self.ix)
@@ -610,14 +945,21 @@ class Limoka(loader.Module):
return
if not result:
- await call.edit(self.strings["404"].format(query=query), reply_markup=[
- [{"text": "🔄 " + self.strings["change_query"], "callback": self._enter_query}]
- ])
+ await call.edit(
+ self.strings["404"].format(query=query),
+ reply_markup=[
+ [
+ {
+ "text": "🔄 " + self.strings["change_query"],
+ "callback": self._enter_query,
+ }
+ ]
+ ],
+ )
return
text = self.strings["global_search"].format(
- query=html.escape(query),
- count=len(result)
+ query=html.escape(query), count=len(result)
)
buttons = []
for i, path in enumerate(result[:15]):
@@ -625,43 +967,56 @@ class Limoka(loader.Module):
if not info:
continue
name = info.get("name", "Unknown")
- buttons.append([
- {
- "text": f"{i+1}. {name}",
- "callback": self._display_module_from_global,
- "args": (path, query, result)
- }
- ])
- buttons.append([{"text": self.strings["change_query"], "callback": self._enter_query}])
-
- await call.edit(
- text=text[:4096],
- reply_markup=buttons
+ buttons.append(
+ [
+ {
+ "text": f"{i+1}. {name}",
+ "callback": self._display_module_from_global,
+ "args": (path, query, result),
+ }
+ ]
+ )
+ buttons.append(
+ [{"text": self.strings["change_query"], "callback": self._enter_query}]
)
- async def _display_module_from_global(self, call: InlineCall, module_path: str, query: str, result: list):
- module_info = self.modules[module_path]
- await self._display_module(call, module_info, module_path, query, result, result.index(module_path), {})
+ await call.edit(text=text[:4096], reply_markup=buttons)
- async def _next_page(self, call: InlineCall, result: list, index: int, query: str, filters: dict):
+ async def _display_module_from_global(
+ self, call: InlineCall, module_path: str, query: str, result: list
+ ):
+ module_info = self.modules[module_path]
+ await self._display_module(
+ call, module_info, module_path, query, result, result.index(module_path), {}
+ )
+
+ async def _next_page(
+ self, call: InlineCall, result: list, index: int, query: str, filters: dict
+ ):
if index + 1 >= len(result):
- await call.answer("This is the last page!" if not hasattr(self, "strings_ru") else "Это последняя страница!")
+ await call.answer(self.strings["last_page"])
return
index += 1
module_path = result[index]
module_info = self.modules[module_path]
- await self._display_module(call, module_info, module_path, query, result, index, filters)
+ await self._display_module(
+ call, module_info, module_path, query, result, index, filters
+ )
- async def _previous_page(self, call: InlineCall, result: list, index: int, query: str, filters: dict):
+ async def _previous_page(
+ self, call: InlineCall, result: list, index: int, query: str, filters: dict
+ ):
if index - 1 < 0:
- await call.answer("This is the first page!" if not hasattr(self, "strings_ru") else "Это первая страница!")
+ await call.answer(self.strings["first_page"])
return
index -= 1
module_path = result[index]
module_info = self.modules[module_path]
- await self._display_module(call, module_info, module_path, query, result, index, filters)
+ await self._display_module(
+ call, module_info, module_path, query, result, index, filters
+ )
async def _inline_void(self, call: InlineCall):
await call.answer()
@@ -670,7 +1025,7 @@ class Limoka(loader.Module):
async def limokacmd(self, message: Message):
"""[query / nothing] - Search modules"""
args = utils.get_args_raw(message)
-
+
if not args:
markup = [
[
@@ -686,25 +1041,25 @@ class Limoka(loader.Module):
"callback": self._show_global_form,
"args": (message,),
}
- ]
+ ],
]
-
+ # Close button on the main no-args form
+ markup.append(
+ [{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
+ )
+
await self.inline.form(
- text="🔍 Limoka Search\n\n"
- "Enter your query to search for Hikka modules:",
+ text=self.strings["start_search_form"],
message=message,
reply_markup=markup,
- photo=self.fallback_banner
)
return
- if len(self._history) >= 10:
- self._history = self._history[-9:]
- self._history.append(args)
- self.pointer("history", self._history)
-
- if len(args) <= 1:
- return await utils.answer(message, self.strings["?"])
+ history = self.get("history", [])
+ if len(history) >= 10:
+ history = history[-9:]
+ history.append(args)
+ self.set("history", history)
await utils.answer(
message,
@@ -726,7 +1081,9 @@ class Limoka(loader.Module):
module_path = result[0]
module_info = self.modules[module_path]
- await self._display_module(message, module_info, module_path, args, result, 0, {})
+ await self._display_module(
+ message, module_info, module_path, args, result, 0, {}
+ )
async def _show_global_form(self, call: InlineCall, message: Message):
markup = [
@@ -740,40 +1097,88 @@ class Limoka(loader.Module):
],
[
{
- "text": "🔙 Back",
+ "text": "🔙 " + self.strings["back"],
"callback": self._inline_void,
}
- ]
+ ],
+ [
+ {
+ "text": self.strings.get("close", "❌ Close"),
+ "action": "close",
+ }
+ ],
]
-
- await call.edit(
- "🔍 Global Search\n\n"
- "Enter your query to search ALL modules without filters:",
- reply_markup=markup
- )
- async def _global_search_handler(self, call: InlineCall, query: str, message: Message, *args, **kwargs):
+ await call.edit(self.strings["global_search_form"], reply_markup=markup)
+
+ async def _global_search_handler(
+ self, call: InlineCall, query: str, message: Message, *args, **kwargs
+ ):
if len(query) <= 1:
- await call.edit(self.strings["?"], reply_markup=[[{"text": "🔄 Try again", "callback": lambda c: self._show_global_form(c, message)}]])
+ await call.edit(
+ self.strings["?"],
+ reply_markup=[
+ [
+ {
+ "text": "🔄 " + self.strings["change_query"],
+ "callback": lambda c: self._show_global_form(c, message),
+ }
+ ],
+ [
+ {
+ "text": self.strings.get("close", "❌ Close"),
+ "action": "close",
+ }
+ ],
+ ],
+ )
return
-
+
searcher = Search(query.lower(), self.ix)
try:
result = searcher.search_module()
except Exception:
- await call.edit(self.strings["?"], reply_markup=[[{"text": "🔄 Try again", "callback": lambda c: self._show_global_form(c, message)}]])
+ await call.edit(
+ self.strings["?"],
+ reply_markup=[
+ [
+ {
+ "text": "🔄 " + self.strings["change_query"],
+ "callback": lambda c: self._show_global_form(c, message),
+ }
+ ],
+ [
+ {
+ "text": self.strings.get("close", "❌ Close"),
+ "action": "close",
+ }
+ ],
+ ],
+ )
return
if not result:
await call.edit(
self.strings["404"].format(query=query),
- reply_markup=[[{"text": "🔄 Try again", "callback": lambda c: self._show_global_form(c, message)}]]
+ reply_markup=[
+ [
+ {
+ "text": "🔄 " + self.strings["change_query"],
+ "callback": lambda c: self._show_global_form(c, message),
+ }
+ ],
+ [
+ {
+ "text": self.strings.get("close", "❌ Close"),
+ "action": "close",
+ }
+ ],
+ ],
)
return
text = self.strings["global_search"].format(
- query=html.escape(query),
- count=len(result)
+ query=html.escape(query), count=len(result)
)
buttons = []
for i, path in enumerate(result[:15]):
@@ -781,31 +1186,54 @@ class Limoka(loader.Module):
if not info:
continue
name = info.get("name", "Unknown")
- buttons.append([
+ buttons.append(
+ [
+ {
+ "text": f"{i+1}. {name}",
+ "callback": self._display_module_from_global,
+ "args": (path, query, result),
+ }
+ ]
+ )
+ buttons.append(
+ [
{
- "text": f"{i+1}. {name}",
- "callback": self._display_module_from_global,
- "args": (path, query, result)
+ "text": "🔄 " + self.strings["change_query"],
+ "callback": lambda c: self._show_global_form(c, message),
}
- ])
- buttons.append([{"text": "🔄 " + self.strings["change_query"], "callback": lambda c: self._show_global_form(c, message)}])
-
- await call.edit(
- text=text[:4096],
- reply_markup=buttons
+ ]
+ )
+ buttons.append(
+ [{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
)
- @loader.command(ru_doc="— Показать историю поиска")
+ await call.edit(text=text[:4096], reply_markup=buttons)
+
+ @loader.command(ru_doc="[clear] — Показать или очистить историю поиска")
async def lshistorycmd(self, message: Message):
- """ - Show search history"""
- if not self._history:
+ """[clear] - Show or clear search history"""
+ args = utils.get_args_raw(message).strip().lower()
+
+ if args == "clear":
+ self.set("history", [])
+ await utils.answer(message, self.strings["history_cleared"])
+ return
+
+ if args:
+ await utils.answer(message, self.strings["invalid_history_arg"])
+ return
+
+ history = self.get("history", [])
+
+ if not history:
await utils.answer(message, self.strings["empty_history"])
return
- formatted_history = [f"{i+1}. {utils.escape_html(h)}" for i, h in enumerate(self._history[-10:])]
+ formatted_history = [
+ f"{i+1}. {utils.escape_html(h)}"
+ for i, h in enumerate(history[-10:])
+ ]
await utils.answer(
- message,
- self.strings["history"].format(
- history='\n'.join(formatted_history)
- )
- )
\ No newline at end of file
+ message,
+ self.strings["history"].format(history="\n".join(formatted_history)),
+ )