From 6d8e2db072e213eb53f1bc62ef5236f3f11da6a2 Mon Sep 17 00:00:00 2001 From: John Doe Date: Sun, 12 Apr 2026 20:28:13 +0300 Subject: [PATCH] feat: now supports emojies in inline + little refactor --- Limoka.py | 671 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 405 insertions(+), 266 deletions(-) diff --git a/Limoka.py b/Limoka.py index b787ef3..7df6a7a 100644 --- a/Limoka.py +++ b/Limoka.py @@ -33,7 +33,7 @@ from .. import utils, loader from ..types import InlineCall logger = logging.getLogger("Limoka") -__version__ = (1, 4, 4) +__version__ = (1, 4, 5) WEIGHTS = { "inline.token_obtainment": 15, @@ -54,17 +54,25 @@ def _get_lang_value(data: Dict[str, Any], lang: str) -> str: return data.get(lang, data.get("default", data.get("en", ""))) -class Search: - def __init__(self, query, ix): +class SearchIndex: + """Handles full-text search operations.""" + + def __init__(self, query: str, index): + """ + Args: + query: Search query string + index: Whoosh index instance + """ self.schema = Schema( title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored=True) ) self.query = query - self.ix = ix + self.index = index - def search_module(self): - with self.ix.searcher() as searcher: - parser = QueryParser("content", self.ix.schema, group=OrGroup.factory(0.8)) + def search(self) -> List[str]: + """Execute search and return list of module paths.""" + with self.index.searcher() as searcher: + parser = QueryParser("content", self.index.schema, group=OrGroup.factory(0.8)) query = parser.parse(self.query) wildcard_query = Wildcard("content", f"*{self.query}*") fuzzy_query = FuzzyTerm("content", self.query, maxdist=2, prefixlength=1) @@ -75,12 +83,257 @@ class Search: return [] -class LimokaAPI: - async def fetch_json(self, base_url, path): - url = f"{base_url}{path}" +class APIClient: + """Handles HTTP requests for fetching JSON data.""" + + async def fetch_json(self, base_url: str, path: str) -> Dict[str, Any]: + """ + Fetch JSON data from a URL. + + Args: + base_url: Base URL for the API + path: Path to append to base URL + + Returns: + Parsed JSON as dictionary + """ async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - return json.loads(await response.text()) + async with session.get(f"{base_url}{path}") as resp: + if resp.status == 200: + return json.loads(await resp.text()) + return {} + + +class ModuleRepository: + """Manages module data and filtering.""" + + def __init__(self, modules: Dict[str, Any], repositories: Dict[str, Any]): + """ + Args: + modules: Dictionary of module definitions + repositories: Dictionary of repository metadata + """ + self.modules = modules + self.repositories = repositories + + def apply_newbie_filter(self, filter_enabled: bool) -> Dict[str, Any]: + """ + Filter out modules from repositories tagged as 'newbie'. + + Args: + filter_enabled: Whether filtering is enabled + + Returns: + Filtered modules dictionary + """ + if not filter_enabled: + return self.modules + + filtered = {} + for path, info in self.modules.items(): + repo_key = "/".join(path.split("/")[:2]) if "/" in path else path + repo = self.repositories.get(repo_key) + tags = repo.get("tags", []) if repo else [] + if "newbie" not in tags: + filtered[path] = info + return filtered + + def get_tags_for_module(self, module_path: str) -> List[str]: + """Get repository tags for a specific module.""" + repo_key = "/".join(module_path.split("/")[:2]) if "/" in module_path else module_path + for repo_url in self.repositories: + if repo_url.replace("https://github.com/", "") == repo_key: + return self.repositories[repo_url].get("tags", []) + return [] + + def get_all_categories(self) -> List[str]: + """Get all unique categories from modules.""" + categories = set() + for module_data in self.modules.values(): + categories.update(module_data.get("category", ["No category"])) + return sorted(categories) + + +class CommandFormatter: + """Formats module commands and metadata.""" + + def __init__(self, strings: Dict[str, Any], bot_username: str, prefix: str): + """ + Args: + strings: Localized strings dictionary + bot_username: Bot username for inline handlers + prefix: Command prefix + """ + self.strings = strings + self.bot_username = bot_username + self.prefix = prefix + + def format_commands(self, module_info: Dict[str, Any], lang: str = "en") -> List[str]: + """ + Format module commands and handlers into display strings. + + Args: + module_info: Module information dictionary + lang: Language code + + Returns: + List of formatted command strings + """ + commands = [] + for i, cmd in enumerate(module_info.get("new_commands", []), 1): + name = cmd.get("name", "") + desc_map = cmd.get("description", {}) + emoji = self._get_emoji_for_number(i) + desc = _get_lang_value(desc_map, lang) or self.strings["no_info"] + commands.append( + self.strings["command_template"].format( + prefix=self.prefix, + command=html.escape(name), + emoji=emoji, + description=html.escape(desc), + ) + ) + + for handler in module_info.get("inline_handlers", []): + name = handler.get("name", "") + desc_map = handler.get("description", {}) + desc = _get_lang_value(desc_map, lang) or self.strings["no_info"] + commands.append( + self.strings["inline_handler_template"].format( + inline_bot=self.bot_username, + command=html.escape(name), + description=html.escape(desc), + ) + ) + return commands + + def _get_emoji_for_number(self, num: int) -> str: + """Get emoji representation for a number.""" + if num <= 10: + return self.strings["emojis"].get(num, "") + emoji = "" + for digit in str(num): + emoji += self.strings["emojis"].get(int(digit), "") + return emoji + + +class ModuleContentBuilder: + """Builds formatted module content for display.""" + + def __init__(self, strings: Dict[str, Any], formatter: CommandFormatter, repository: ModuleRepository): + """ + Args: + strings: Localized strings dictionary + formatter: CommandFormatter instance + repository: ModuleRepository instance + """ + self.strings = strings + self.formatter = formatter + self.repository = repository + + def build_content( + self, + module_info: Dict[str, Any], + query: str, + filters: Dict[str, List[str]], + include_categories: bool = True, + module_path: Optional[str] = None, + lang: str = "en", + ) -> tuple: + """ + Build complete formatted module content. + + Returns: + Tuple of (header, body_pages, footer, categories_text) + """ + name = html.escape(module_info.get("name") or self.strings["no_info"]) + cls_doc = module_info.get("cls_doc", {}) + description = html.escape( + _get_lang_value(cls_doc, lang) + or _get_lang_value(module_info.get("description", ""), lang) + or self.strings["no_info"] + ) + dev_username = html.escape(module_info["meta"].get("developer") or "Unknown") + + if len(description) > 300: + description = description[:297] + "…" + + categories_text = self._build_categories_text(filters) + commands = self.formatter.format_commands(module_info, lang) + header = self._build_header(query, name, description, dev_username, module_path) + footer = self._build_footer(module_path) + body_pages = self._paginate_commands(commands) + + return header, body_pages, footer, categories_text + + def _build_header(self, query: str, name: str, description: str, dev_username: str, module_path: Optional[str]) -> str: + """Build message header with module info and tags.""" + tags_list = self.repository.get_tags_for_module(module_path) if module_path else [] + tags_text = ", ".join(self.strings["tags"].get(tag, tag) for tag in tags_list) + + header_template = self.strings["found_header"] + if not tags_text: + header_template = header_template.replace( + "
🏷 Tags: {tags}
\n\n", + "" + ) + header_template = header_template.replace( + "
🏷 Теги: {tags}
\n\n", + "" + ) + + return header_template.format( + query=html.escape(query), + name=name, + description=description, + username=dev_username, + tags=tags_text, + ) + + def _build_footer(self, module_path: Optional[str]) -> str: + """Build message footer with download command.""" + clean_path = (module_path or "").replace("\\", "/") + return self.strings["found_footer"].format( + url=html.escape(self.formatter.strings.get("limokaurl", "")), + module_path=html.escape(clean_path), + prefix=self.formatter.prefix, + ) + + def _build_categories_text(self, filters: Dict[str, List[str]]) -> str: + """Build categories text if any are selected.""" + categories = filters.get("category", []) + if categories: + return "\n" + self.strings["selected_categories"].format( + categories=", ".join(html.escape(c) for c in categories) + ) + return "" + + def _paginate_commands(self, commands: List[str], max_length: int = 500) -> List[str]: + """Split commands into pages based on length.""" + if not commands: + return [""] + + commands_text = "".join(commands) + if len(commands_text) <= max_length: + return [commands_text] + + pages = [] + current_page = [] + current_length = 0 + + for cmd in commands: + if current_length + len(cmd) > max_length: + if current_page: + pages.append("".join(current_page)) + current_page = [] + current_length = 0 + current_page.append(cmd) + current_length += len(cmd) + + if current_page: + pages.append("".join(current_page)) + + return pages or [""] @loader.tds @@ -91,53 +344,53 @@ class Limoka(loader.Module): "name": "Limoka", "wait": ( "
Just wait\n" - "🔍 A search is underway among {count} modules " + "🔍 A search is underway among {count} modules " "for the query: {query}\n" "{fact}
" ), "found_header": ( - "
🔍 Found module {name} " + "
🔍 Found module {name} " "by query: {query}\n\n" - "ℹ️ Description: {description}\n" - "🧑‍💻 Developer: {username}\n\n" - "
🏷 Tags: {tags}
\n\n" + "ℹ️ Description: {description}\n" + "🧑‍💻 Developer: {username}\n\n" + "
🏷 Tags: {tags}
\n\n" ), "found_body": ("{commands}"), "found_footer": ( - "
\n🪄 {prefix}dlm {url}{module_path}
" + "
\n🪄 {prefix}dlm {url}{module_path}
" ), "caption_short": ( - "
🔍 {safe_name}\n" - "ℹ️ Description: {safe_desc}\n" - "🧑‍💻 Dev: {dev_username}\n" - "🪄 {prefix}dlm {module_path}
" + "
🔍 {safe_name}\n" + "ℹ️ Description: {safe_desc}\n" + "🧑‍💻 Dev: {dev_username}\n" + "🪄 {prefix}dlm {module_path}
" ), "command_template": "{emoji} {prefix}{command} — {description}\n", "inline_handler_template": "{inline_bot} {command} — {description}\n", "emojis": { - 1: "1️⃣", - 2: "2️⃣", - 3: "3️⃣", - 4: "4️⃣", - 5: "5️⃣", - 6: "6️⃣", - 7: "7️⃣", - 8: "8️⃣", - 9: "9️⃣", + 1: "1️⃣", + 2: "2️⃣", + 3: "3️⃣", + 4: "4️⃣", + 5: "5️⃣", + 6: "6️⃣", + 7: "7️⃣", + 8: "8️⃣", + 9: "9️⃣", }, - "404": "
Not found by query: {query}
", - "noargs": "
No args
", - "?": "
🔎 Request too short / not found
", + "404": "
Not found by query: {query}
", + "noargs": "
No args
", + "?": "
🔎 Request too short / not found
", "no_info": "
No information
", "facts": [ - "
🛡 The limoka catalog is carefully moderated!
", - "
🚀 Limoka performance allows you to search for modules quickly!
", + "
🛡 The limoka catalog is carefully moderated!
", + "
🚀 Limoka performance allows you to search for modules quickly!
", ], "inline404": "
Not found
", "inline?": "
Request too short / not found
", "inlinenoargs": "
Please, enter query
", "history": ( - "
🔎 Your search history:\n" + "
🔎 Your search history:\n" "{history}
" ), "filter_menu": "Choose filters", @@ -145,9 +398,9 @@ class Limoka(loader.Module): "apply_filters": "✅ Apply Filters", "clear_filters": "🗑 Clear Filters", "back_to_results": "🔙 Back to Results", - "empty_history": "
🔎 Your search history is empty!
", + "empty_history": "
🔎 Your search history is empty!
", "enter_query": "🔍 Enter new search query:", - "global_search": "
🔍 Global search for {query} — found {count} modules
", + "global_search": "
🔍 Global search for {query} — found {count} modules
", "change_query": "🔍 Change query", "no_modules": "
No modules available.
", "filter_title": "🏷 Filters", @@ -166,15 +419,15 @@ class Limoka(loader.Module): "inline_short_query": "
❌ Query too short (min 2 chars)
", "inline_switch_pm": "💬 Open in chat", "inline_switch_pm_text": "🔍 Results for: {query}", - "inline_start_message": "
🔍 Limoka Search\nType module name or keyword
", + "inline_start_message": "
🔍 Limoka Search\nType module name or keyword
", "first_page": "
This is the first page!
", "last_page": "
This is the last page!
", "display_error": "
Error displaying module. Please try again.
", "error_occurred": "
An error occurred. Please try again.
", - "start_search_form": "
🔍 Limoka Search\nEnter your query to search for modules:
", - "global_search_form": "
🔍 Global Search\nEnter your query to search ALL modules without filters:
", - "history_cleared": "
🧹 Search history cleared!
", - "invalid_history_arg": "
Invalid argument for history command. Use:\n.lshistory - show history\n.lshistory clear - clear history
", + "start_search_form": "
🔍 Limoka Search\nEnter your query to search for modules:
", + "global_search_form": "
🔍 Global Search\nEnter your query to search ALL modules without filters:
", + "history_cleared": "
🧹 Search history cleared!
", + "invalid_history_arg": "
Invalid argument for history command. Use:\n.lshistory - show history\n.lshistory clear - clear history
", "close": "❌ Close", "watcher_no_tag": "
❌ Invalid message format. No #limoka tag found.
", "watcher_invalid_format": "
❌ Invalid format. Expected: #limoka:path:signature
", @@ -197,30 +450,31 @@ class Limoka(loader.Module): "If error persists again, report to developers
" ), "body_page": "Commands", + "limokaurl": "https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/" } strings_ru = { "name": "Limoka", "wait": ( "
Подождите\n" - "🔍 Идёт поиск среди {count} модулей по запросу: {query}\n" + "🔍 Идёт поиск среди {count} модулей по запросу: {query}\n" "{fact}
" ), "found_header": ( - "
🔍 Найден модуль {name} " + "
🔍 Найден модуль {name} " "по запросу: {query}
\n\n" - "
ℹ️ Описание: {description}
\n" - "
🧑‍💻 Разработчик: {username}
\n\n" - "
🏷 Теги: {tags}
\n\n" + "
ℹ️ Описание: {description}
\n" + "
🧑‍💻 Разработчик: {username}
\n\n" + "
🏷 Теги: {tags}
\n\n" ), "found_body": ("{commands}"), "found_footer": ( - "\n
🪄 {prefix}dlm {url}{module_path}
" + "\n
🪄 {prefix}dlm {url}{module_path}
" ), "caption_short": ( - "
🔍 {safe_name}\n" - "ℹ️ Описание: {safe_desc}\n" - "🧑‍💻 Разработчик: {dev_username}\n" - "🪄 {prefix}dlm {module_path}
" + "
🔍 {safe_name}\n" + "ℹ️ Описание: {safe_desc}\n" + "🧑‍💻 Разработчик: {dev_username}\n" + "🪄 {prefix}dlm {module_path}
" ), "command_template": "
{emoji} {prefix}{command} — {description}
\n", "inline_handler_template": "{inline_bot} {command} — {description}\n", @@ -236,15 +490,15 @@ class Limoka(loader.Module): 9: "9️⃣", 10: "🔟" }, - "404": "
Не найдено по запросу: {query}
", - "noargs": "
Нет аргументов
", - "?": "
🔎 Запрос слишком короткий / не найден
", + "404": "
Не найдено по запросу: {query}
", + "noargs": "
Нет аргументов
", + "?": "
🔎 Запрос слишком короткий / не найден
", "no_info": "
Нет информации
", "facts": [ - "
🛡 Каталог Limoka тщательно модерируется!
", - "
🚀 Limoka позволяет искать модули с невероятной скоростью!
", + "
🛡 Каталог Limoka тщательно модерируется!
", + "
🚀 Limoka позволяет искать модули с невероятной скоростью!
", ( - "
🔎 Limoka имеет лучший поиск*!\n" + "
🔎 Limoka имеет лучший поиск*!\n" "* В сравнении с предыдущей версией Limoka
" ), ], @@ -252,7 +506,7 @@ class Limoka(loader.Module): "inline?": "
Запрос слишком короткий / не найден
", "inlinenoargs": "
Введите запрос
", "history": ( - "
🔎 История поиска:\n" + "
🔎 История поиска:\n" "{history}
" ), "filter_menu": "Выберите фильтры", @@ -260,9 +514,9 @@ class Limoka(loader.Module): "apply_filters": "✅ Применить фильтры", "clear_filters": "🗑 Очистить фильтры", "back_to_results": "🔙 Вернуться к результатам", - "empty_history": "
🔎 История поиска пуста!
", + "empty_history": "
🔎 История поиска пуста!
", "enter_query": "🔍 Введите новый поисковый запрос:", - "global_search": "
🔍 Глобальный поиск по {query} — найдено {count} модулей
", + "global_search": "
🔍 Глобальный поиск по {query} — найдено {count} модулей
", "change_query": "🔍 Изменить запрос", "no_modules": "
Модули недоступны.
", "filter_title": "🏷 Фильтры", @@ -281,15 +535,15 @@ class Limoka(loader.Module): "inline_short_query": "
❌ Запрос слишком короткий (мин. 2 символа)
", "inline_switch_pm": "💬 Открыть в чате", "inline_switch_pm_text": "🔍 Результаты для: {query}", - "inline_start_message": "
🔍 Limoka Поиск\nВведите название модуля или ключевое слово
", + "inline_start_message": "
🔍 Limoka Поиск\nВведите название модуля или ключевое слово
", "first_page": "
Это первая страница!
", "last_page": "
Это последняя страница!
", "display_error": "
Ошибка отображения модуля. Пожалуйста, попробуйте еще раз.
", "error_occurred": "
Произошла ошибка. Пожалуйста, попробуйте еще раз.
", - "start_search_form": "
🔍 Limoka Поиск\nВведите ваш запрос для поиска модулей:
", - "global_search_form": "
🔍 Глобальный Поиск\nВведите запрос для поиска ВСЕХ модулей без фильтров:
", - "history_cleared": "
🧹 История поиска очищена!
", - "invalid_history_arg": "
Неверный аргумент для команды истории. Используйте:\n.lshistory - показать историю\n.lshistory clear - очистить историю
", + "start_search_form": "
🔍 Limoka Поиск\nВведите ваш запрос для поиска модулей:
", + "global_search_form": "
🔍 Глобальный Поиск\nВведите запрос для поиска ВСЕХ модулей без фильтров:
", + "history_cleared": "
🧹 История поиска очищена!
", + "invalid_history_arg": "
Неверный аргумент для команды истории. Используйте:\n.lshistory - показать историю\n.lshistory clear - очистить историю
", "close": "❌ Закрыть", "watcher_no_tag": "
❌ Неверный формат сообщения. Тег #limoka не найден.
", "watcher_invalid_format": "
❌ Неверный формат. Ожидается: #limoka:path:signature
", @@ -316,7 +570,7 @@ class Limoka(loader.Module): } def __init__(self): - self.api = LimokaAPI() + self.api = APIClient() self.config = loader.ModuleConfig( loader.ConfigValue( "limokaurl", @@ -342,15 +596,13 @@ class Limoka(loader.Module): self._bot_username = "limoka_bbot" self._base_url = self.config["limokaurl"] - # Search session states self.SEARCH_STATES = { - "no_banner": "no_banner", # 404 - No banner - "global_search": "global_search", # Global search - "not_found": "not_found", # Not found (module) - "filter_select": "filter_select", # Select categories (filters) + "no_banner": "no_banner", + "global_search": "global_search", + "not_found": "not_found", + "filter_select": "filter_select", } - # State banners self.state_banners = { "no_banner": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/refs/heads/main/Limoka%20-%20No%20banner.png", "global_search": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Global%20Search.png", @@ -359,28 +611,8 @@ class Limoka(loader.Module): } def _filter_newbies(self, modules: Dict[str, Any]) -> Dict[str, Any]: - """Filter out modules which belong to repositories tagged as 'newbie'. - Returns the original dict when the feature is disabled or repositories - metadata is not available. - """ - try: - if not self.config.get("filter_newbies_modules"): - return modules - except Exception: - return modules - - if not getattr(self, "repositories", None): - return modules - - filtered: Dict[str, Any] = {} - for path, info in modules.items(): - repo_key = "/".join(path.split("/")[:2]) if "/" in path else path - repo = self.repositories.get(repo_key) - tags = repo.get("tags", []) if repo else [] - if "newbie" in tags: - continue - filtered[path] = info - return filtered + """[DEPRECATED] Use ModuleRepository.apply_newbie_filter instead.""" + return self.repository.apply_newbie_filter(self.config.get("filter_newbies_modules", False)) def _create_search_session( self, @@ -415,9 +647,10 @@ class Limoka(loader.Module): return self.state_banners.get(state) async def client_ready(self, client, db): + """Initialize client and load data.""" self.client: TelegramClient = client self.db = db - self.api = LimokaAPI() + self.api = APIClient() self.schema = Schema( title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored=True) ) @@ -426,19 +659,19 @@ class Limoka(loader.Module): self.ix = create_in("limoka_search", self.schema) else: self.ix = open_dir("limoka_search") + self._history = self.pointer("history", []) - self.modules = (await self.api.fetch_json(self._base_url, "modules.json")).get( - "modules", {} - ) - raw = (await self.api.fetch_json(self._base_url, "repositories.json")).get( - "repositories", [] - ) - self.repositories = {repo["url"]: repo for repo in raw} - # Apply newbie filter if enabled - try: - self.modules = self._filter_newbies(self.modules) - except Exception: - pass + raw_modules = (await self.api.fetch_json(self._base_url, "modules.json")).get("modules", {}) + raw_repos = (await self.api.fetch_json(self._base_url, "repositories.json")).get("repositories", []) + + repositories = {repo["url"]: repo for repo in raw_repos} + self.repository = ModuleRepository(raw_modules, repositories) + self.modules = self.repository.apply_newbie_filter(self.config["filter_newbies_modules"]) + + self._userbot_bot_username = (await self.inline.bot.get_me()).username + self.formatter = CommandFormatter(self.strings, self._userbot_bot_username, self.get_prefix()) + self.content_builder = ModuleContentBuilder(self.strings, self.formatter, self.repository) + self._service_bot_id = (await self.client.get_entity(self._bot_username)).id loop = asyncio.get_running_loop() @@ -448,9 +681,7 @@ class Limoka(loader.Module): try: message = await self.client.get_messages(self._bot_username, limit=1) if not message: - message = await self.client.send_message( - self._bot_username, "/start" - ) + message = await self.client.send_message(self._bot_username, "/start") await message.delete() await self.client( functions.messages.DeleteHistoryRequest( @@ -460,41 +691,35 @@ class Limoka(loader.Module): revoke=True, ) ) - except YouBlockedUserError: logger.warning( - f"Please unblock {self._bot_username} to enable external installation feature. Or disable external_install_allowed in Limoka settings to get rid of this message." + f"Please unblock {self._bot_username} to enable external installation feature." ) - self._userbot_bot_username = (await self.inline.bot.get_me()).username @loader.loop(interval=3600) async def _update_modules_loop(self): - self.modules = await self.api.fetch_json(self._base_url, "modules.json") - # Re-apply newbie filter after modules refresh - try: - self.modules = self._filter_newbies(self.modules) - except Exception: - pass + """Periodically update modules list and rebuild index.""" + raw_modules = await self.api.fetch_json(self._base_url, "modules.json") + self.modules = self.repository.apply_newbie_filter( + self.config.get("filter_newbies_modules", False) + ) await self._update_index() async def _update_index(self): + """Rebuild full-text search index from modules.""" try: writer = self.ix.writer() - modules_to_index = self._filter_newbies(self.modules) - for module_path, module_data in modules_to_index.items(): + for module_path, module_data in self.modules.items(): writer.add_document( title=module_data["name"], path=module_path, content=module_data["name"] + " " + ( - module_data.get("description") - or "" + module_data.get("description", "") + " " + ( - (module_data.get("meta").get("developer") or "") - if module_data.get("meta") - else "" + (module_data.get("meta", {}).get("developer") or "") ) ), ) @@ -508,17 +733,11 @@ class Limoka(loader.Module): writer.commit() except LockError: folder = os.path.join(BASE_DIR, "limoka_search") - if os.path.commonpath([folder, BASE_DIR]) == BASE_DIR and os.path.exists(folder): shutil.rmtree(folder) await self._update_index() else: - logger.error( - ( - f"Skipping unsafe rmtree for {folder}. Please, report this to developer. ", - f"Debug info: folder={folder}, base_dir={BASE_DIR}, common_path={os.path.commonpath([folder, BASE_DIR])}, exists={os.path.exists(folder)}", - ) - ) + logger.error(f"Skipping unsafe rmtree for {folder}") async def _validate_url(self, url: str) -> Optional[str]: if not url or url in self._invalid_banners: @@ -578,37 +797,8 @@ class Limoka(loader.Module): return self.db.get(f"{userbot}.translations", "lang") def generate_commands(self, module_info, lang: str = "en"): - commands = [] - for i, cmd in enumerate(module_info.get("new_commands", []), 1): - name = cmd.get("name", "") - desc_map = cmd.get("description", {}) - if i <= 10: - emoji = self.strings["emojis"].get(i, "") - else: - emoji = "" - for digit in str(i): - emoji += self.strings["emojis"].get(int(digit), "") - desc = _get_lang_value(desc_map, lang) or self.strings["no_info"] - commands.append( - self.strings["command_template"].format( - prefix=self.get_prefix(), - command=html.escape(name), - emoji=emoji, - description=html.escape(desc), - ) - ) - for i, handler in enumerate(module_info.get("inline_handlers", []), 1): - name = handler.get("name", "") - desc_map = handler.get("description", {}) - desc = _get_lang_value(desc_map, lang) or self.strings["no_info"] - commands.append( - self.strings["inline_handler_template"].format( - inline_bot=self._userbot_bot_username, - command=html.escape(name), - description=html.escape(desc), - ) - ) - return commands + """[DEPRECATED] Use CommandFormatter.format_commands instead.""" + return self.formatter.format_commands(module_info, lang) def _format_module_content( self, @@ -619,81 +809,10 @@ class Limoka(loader.Module): module_path: Optional[str] = None, lang: str = "en", ) -> tuple: - name = html.escape(module_info.get("name") or self.strings["no_info"]) - cls_doc = module_info.get("cls_doc", {}) - description = html.escape( - _get_lang_value(cls_doc, lang) - or _get_lang_value(module_info.get("description", ""), lang) - or self.strings["no_info"] + """[DEPRECATED] Use ModuleContentBuilder.build_content instead.""" + return self.content_builder.build_content( + module_info, query, filters, include_categories, module_path, lang ) - dev_username = html.escape(module_info["meta"].get("developer") or "Unknown") - raw_path = ( - module_path if module_path is not None else module_info.get("path", "") - ) - clean_module_path = (raw_path or "").replace("\\", "/") - commands = self.generate_commands(module_info, lang) - categories_text = "" - if include_categories: - categories = filters.get("category", []) - if categories: - categories_text = "\n" + self.strings["selected_categories"].format( - categories=", ".join(html.escape(c) for c in categories) - ) - if len(description) > 300: - description = description[:297] + "…" - repo_key = ( - "/".join(module_path.split("/")[:2]) if "/" in module_path else module_path - ) - tags_list = [] - for x in self.repositories: - if x.replace("https://github.com/", "") == repo_key: - tags_list = self.repositories.get(x, {}).get("tags", []) - break - tags_text = ", ".join(self.strings["tags"].get(tag, tag) for tag in tags_list) - - header_template = self.strings["found_header"] - if not tags_text: - header_template = header_template.replace( - "
🏷 Tags: {tags}
\n\n", - "" - ) - header_template = header_template.replace( - "
🏷 Теги: {tags}
\n\n", - "" - ) - - header = header_template.format( - query=html.escape(query), - name=name, - description=description, - username=dev_username, - tags=tags_text, - ) - commands_text = "".join(commands) - if len(commands_text) <= 500: - body_pages = [commands_text] if commands_text else [""] - else: - body_pages = [] - current_page = [] - current_length = 0 - for cmd in commands: - if current_length + len(cmd) > 500: - if current_page: - body_pages.append("".join(current_page)) - current_page = [] - current_length = 0 - current_page.append(cmd) - current_length += len(cmd) - if current_page: - body_pages.append("".join(current_page)) - if not body_pages: - body_pages = [""] - footer = self.strings["found_footer"].format( - url=html.escape(self.config["limokaurl"]), - module_path=html.escape(clean_module_path), - prefix=html.escape(self.get_prefix()), - ) - return header, body_pages, footer, categories_text def _build_navigation_markup(self, session: Dict[str, Any]) -> list: result = session["results"] @@ -849,17 +968,21 @@ class Limoka(loader.Module): logger.error("message_or_call is None in _safe_display") return if isinstance(message_or_call, Message): + # WORKAROUND: Telegram doesn't show premium emojis in first call, + # until it's edited. Firstly sending something, than fixing. if photo is not None: - await self.inline.form( - text=text, + msg = await self.inline.form( + text="🍋", message=message_or_call, - reply_markup=markup, + reply_markup=[[{"text": "🍋", "action": "close"}]], photo=photo, ) + await msg.edit(text=text, reply_markup=markup, photo=photo) else: - await self.inline.form( - text=text, message=message_or_call, reply_markup=markup + msg = await self.inline.form( + text="🍋", message=message_or_call, reply_markup=[[{"text": "🍋", "action": "close"}]] ) + await msg.edit(text=text, reply_markup=markup) else: if photo is not None: await message_or_call.edit( @@ -998,13 +1121,11 @@ class Limoka(loader.Module): ) async def _select_category(self, call: InlineCall, session: Dict[str, Any]): + """Display category selection menu.""" query = session["query"] current_filters = session["filters"] - all_categories = set() - for module_data in self.modules.values(): - all_categories.update(module_data.get("category", ["No category"])) - categories = sorted(all_categories) + categories = self.repository.get_all_categories() if not categories: await call.edit( self.strings["no_categories"], @@ -1031,8 +1152,6 @@ class Limoka(loader.Module): if cat in selected_categories: button_text = "✅ " + button_text - # Create new session with updated filters - new_session = session.copy() row.append( { "text": button_text, @@ -1089,12 +1208,13 @@ class Limoka(loader.Module): async def _show_results( self, call: InlineCall, session: Dict[str, Any], from_filters: bool = False ): + """Display search results with optional category filtering.""" query = session["query"] filters = session["filters"] - searcher = Search(query.lower(), self.ix) + searcher = SearchIndex(query.lower(), self.ix) try: - result = searcher.search_module() + result = searcher.search() except Exception: await call.edit(self.strings["?"], reply_markup=[]) return @@ -1154,7 +1274,6 @@ class Limoka(loader.Module): module_path = filtered_result[0] module_info = self.modules[module_path] - # Create session for displaying module display_session = self._create_search_session( state=self.SEARCH_STATES["global_search"], query=query, @@ -1205,9 +1324,8 @@ class Limoka(loader.Module): ], ) return - searcher = Search(query.lower(), self.ix) try: - result = searcher.search_module() + result = SearchIndex(query.lower(), self.ix).search() except Exception: await call.edit( self.strings["?"], @@ -1289,9 +1407,8 @@ class Limoka(loader.Module): async def _show_global_results(self, call: InlineCall, session: Dict[str, Any]): query = session["query"] - searcher = Search(query.lower(), self.ix) try: - result = searcher.search_module() + result = SearchIndex(query.lower(), self.ix).search() except Exception: await call.edit(self.strings["?"], reply_markup=[]) return @@ -1407,11 +1524,35 @@ class Limoka(loader.Module): markup.append( [{"text": self.strings.get("close", "❌ Close"), "action": "close", "style": "danger"}] ) - await self.inline.form( - text=self.strings["start_search_form"], + # WORKAROUND: Telegram doesnt show emojis in inline forms as expected, + # until the form is edited. Sending with lemon, then fixing. + workaround_markup = [ + [ + { + "text": "🍋 Enter query", + "input": "Enter query", + "handler": self._enter_query_handler, + } + ], + [ + { + "text": "🍋 Results", + "callback": self._show_global_form, + "args": (message,), + } + ], + ] + workaround_markup.append( + [{"text": "🍋 Close", "action": "close", "style": "danger"}] + ) + msg = await self.inline.form( + text="🍋 Limoka\n🍋 Enter query", message=message, + reply_markup=workaround_markup, + ) + await msg.edit( + text=self.strings["start_search_form"], reply_markup=markup, - photo=self._get_banner_for_state("global_search"), ) return history = self.get("history", []) @@ -1427,9 +1568,8 @@ class Limoka(loader.Module): query=args, ), ) - searcher = Search(args.lower(), self.ix) try: - result = searcher.search_module() + result = SearchIndex(args.lower(), self.ix).search() except Exception: return await utils.answer(message, self.strings["?"]) if not result: @@ -1505,9 +1645,8 @@ class Limoka(loader.Module): ], ) return - searcher = Search(query.lower(), self.ix) try: - result = searcher.search_module() + result = SearchIndex(query.lower(), self.ix).search() except Exception: await call.edit( self.strings["?"],