From 504d1f32e96845590ae30af09924e1783634917f Mon Sep 17 00:00:00 2001 From: Macsim Date: Fri, 30 Jan 2026 14:24:43 +0300 Subject: [PATCH] Limoka 1.4.0 --- .github/workflows/ci.yml | 82 ++-- Limoka.py | 858 +++++++++++++++++++++------------------ backup.py | 74 ++++ parse.py | 478 +++++++++++++++------- repositories.json | 141 ++++--- update_diffs.py | 193 +++++++++ 6 files changed, 1199 insertions(+), 627 deletions(-) create mode 100644 backup.py create mode 100644 update_diffs.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b8f6d93..0405e92 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -206,61 +206,53 @@ jobs: echo "Branch ${{ env.BRANCH_NAME }} does not exist in remote repository, skipping PR creation." fi + notify_diffs: + runs-on: ubuntu-latest + if: | + (github.event_name == 'push' && github.ref == 'refs/heads/main') || + (github.event_name == 'pull_request' && github.event.action == 'closed' && github.event.pull_request.merged == true) + needs: parse + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: ${{ env.GIT_DEPTH }} + - name: Configure Git for github-actions[bot] + run: | + git config --global user.email "github-actions[bot]@users.noreply.github.com" + git config --global user.name "github-actions[bot]" + - name: Set up Python 3.9 + uses: actions/setup-python@v5 + with: + python-version: '3.9' + - name: Install Python dependencies + run: pip install aiohttp + - name: Send module diffs to channel + env: + TELEGRAM_BOT_TOKEN: ${{ secrets.TELEGRAM_BOT_TOKEN }} + TELEGRAM_CHAT_ID: ${{ secrets.TELEGRAM_CHAT_ID_UPDATE }} + run: | + git fetch origin main + python3 update_diffs.py --token ${{ secrets.TELEGRAM_BOT_TOKEN }} --chat_id ${{ secrets.TELEGRAM_CHAT_ID_UPDATE }} --base_commit HEAD~1 + backup: runs-on: ubuntu-latest if: github.ref == 'refs/heads/main' || github.event_name == 'workflow_dispatch' needs: parse steps: - - name: Configure Git for github-actions[bot] - run: | - git config --global user.email "github-actions[bot]@users.noreply.github.com" - git config --global user.name "github-actions[bot]" - git config --global --list - name: Checkout repository uses: actions/checkout@v4 with: fetch-depth: ${{ env.GIT_DEPTH }} - - name: Create and send backup to Telegram + - name: Set up Python 3.9 + uses: actions/setup-python@v5 + with: + python-version: '3.9' + - name: Install Python dependencies + run: pip install aiohttp + - name: Run backup script env: TELEGRAM_BOT_TOKEN: ${{ secrets.TELEGRAM_BOT_TOKEN }} TELEGRAM_CHAT_ID: ${{ secrets.TELEGRAM_CHAT_ID }} run: | - echo "Creating .zip file of the repository with maximum compression..." - git archive --format=zip --output=repository-original.zip HEAD - zip -9 repository.zip repository-original.zip - rm repository-original.zip - echo "File size of the created .zip file:" - du -sh repository.zip - echo "Splitting the .zip file into 8 parts..." - split -b 49M repository.zip repository-part- - echo "Files created after split:" - ls repository-part-* - COMMIT_MESSAGE="$(git log -1 --pretty=%B)" - COMMIT_DATE="$(date --date="$(git log -1 --pretty=%ci)" +'%Y-%m-%d %H:%M:%S')" - COMMIT_HASH="$(git rev-parse --short=6 HEAD)" - COMMIT_URL="https://${REPO_URL}/commit/$(git rev-parse HEAD)" - MESSAGE="Commit Date: $COMMIT_DATE, Commit Message: $COMMIT_MESSAGE, Commit Hash: [\`$COMMIT_HASH\`]($COMMIT_URL)" - - echo "Sending .zip file parts to Telegram..." - - FIRST_PART=true - for part in $(ls repository-part-* | sort); do - if $FIRST_PART; then - TELEGRAM_API_URL="https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/sendDocument" - echo "Sending first file to Telegram: $TELEGRAM_API_URL" - curl -X POST "$TELEGRAM_API_URL" \ - -F chat_id=$TELEGRAM_CHAT_ID \ - -F document=@$part \ - -F caption="$MESSAGE" \ - -F parse_mode="Markdown" - FIRST_PART=false - else - TELEGRAM_API_URL="https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/sendDocument" - echo "Sending file to Telegram: $TELEGRAM_API_URL" - curl -X POST "$TELEGRAM_API_URL" \ - -F chat_id=$TELEGRAM_CHAT_ID \ - -F document=@$part - fi - done - - echo "Files sent to Telegram successfully!" + python backup.py --token ${{ secrets.TELEGRAM_BOT_TOKEN }} --chat_id ${{ secrets.TELEGRAM_CHAT_ID }} diff --git a/Limoka.py b/Limoka.py index abd819f..586f4cb 100644 --- a/Limoka.py +++ b/Limoka.py @@ -1,65 +1,38 @@ # meta developer: @limokanews # requires: whoosh cryptography -# Limoka search module. - -# This module loads a remote `modules.json`, builds a Whoosh index and -# exposes inline and chat commands to search and display module -# information. It handles remote banner validation and falls back to an -# external PNG hosted in the repository when a module banner is missing. -# The fallback is provided as a URL (`self._fallback_banner_url`). Depending -# on the client library the `photo` parameter may accept a URL, a file -# path or a file-like object; this implementation prefers using the -# external URL for the fallback. - -# Note: Expected `modules.json` record format: - -# { -# "path/to/module.py": { -# "name": "ModuleName", -# "description": "Short description", -# "meta": {"banner": "https://.../image.png", "developer": "@dev"}, -# "commands": [{"cmd1": "desc1"}, {"cmd2": "desc2"}], -# "category": ["fun", "tools"] -# } -# } -# Whoosh index in `userbotFolder/limoka_search/index`. - from whoosh.index import create_in, open_dir from whoosh.fields import Schema, TEXT, ID from whoosh.qparser import QueryParser, OrGroup from whoosh.query import FuzzyTerm, Wildcard - import aiohttp import random import logging import os import html import json - import re - import asyncio - from typing import Union, List, Dict, Any, Optional import hashlib - from telethon.types import Message from telethon.errors.rpcerrorlist import WebpageMediaEmptyError - +from telethon import TelegramClient +from telethon.errors.rpcerrorlist import YouBlockedUserError try: from aiogram.utils.exceptions import BadRequest except ImportError: from aiogram.exceptions import TelegramBadRequest as BadRequest - from .. import utils, loader from ..types import InlineCall - logger = logging.getLogger("Limoka") +__version__ = (1, 3, 1) -__version__ = (1, 3, 0) - +def _get_lang_value(data: Dict[str, Any], lang: str) -> str: + if not isinstance(data, dict): + return str(data) if data else "" + return data.get(lang, data.get("default", data.get("en", ""))) class Search: def __init__(self, query, ix): @@ -75,47 +48,51 @@ class Search: query = parser.parse(self.query) wildcard_query = Wildcard("content", f"*{self.query}*") fuzzy_query = FuzzyTerm("content", self.query, maxdist=2, prefixlength=1) - for search_query in [query, wildcard_query, fuzzy_query]: results = searcher.search(search_query) if results: return list(set(result["path"] for result in results)) - return [] - + return [] class LimokaAPI: - async def get_all_modules(self, url): + async def fetch_json(self, base_url, path): + url = f"{base_url}{path}" async with aiohttp.ClientSession() as session: async with session.get(url) as response: return json.loads(await response.text()) - @loader.tds class Limoka(loader.Module): """Modules are now in one place with easy searching!""" - strings = { "name": "Limoka", "wait": ( "Just wait\n" "🔍 A search is underway among {count} modules " - "for the query: {query}\n\n{fact}" + "for the query: {query}\n" + "{fact}" ), - "found": ( + "found_header": ( "🔍 Found module {name} " "by query: {query}\n\n" "ℹ️ Description: {description}\n" "🧑‍💻 Developer: {username}\n\n" - "{commands}\n" - "🪄 {prefix}dlm {url}{module_path}" + "🏷 Tags: {tags}\n\n" + ), + "found_body": ( + "{commands}" + ), + "found_footer": ( + "\n🪄 {prefix}dlm {url}{module_path}" ), "caption_short": ( "🔍 {safe_name}\n" "ℹ️ Description: {safe_desc}\n" - "🧑‍💻 Dev: {dev_username}\n\n" + "🧑‍💻 Dev: {dev_username}\n" "🪄 {prefix}dlm {module_path}" ), "command_template": "{emoji} {prefix}{command} — {description}\n", + "inline_handler_template": "{inline_bot} {command} — {description}\n", "emojis": { 1: "1️⃣", 2: "2️⃣", @@ -168,13 +145,13 @@ class Limoka(loader.Module): "inline_short_query": "❌ Query too short (min 2 chars)", "inline_switch_pm": "💬 Open in chat", "inline_switch_pm_text": "🔍 Results for: {query}", - "inline_start_message": "🔍 Limoka Search\n\nType module name or keyword", + "inline_start_message": "🔍 Limoka Search\nType module name or keyword", "first_page": "This is the first page!", "last_page": "This is the last page!", "display_error": "Error displaying module. Please try again.", "error_occurred": "An error occurred. Please try again.", - "start_search_form": "🔍 Limoka Search\n\nEnter your query to search for modules:", - "global_search_form": "🔍 Global Search\n\nEnter your query to search ALL modules without filters:", + "start_search_form": "🔍 Limoka Search\nEnter your query to search for modules:", + "global_search_form": "🔍 Global Search\nEnter your query to search ALL modules without filters:", "history_cleared": "🧹 Search history cleared!", "invalid_history_arg": " Invalid argument for history command. Use:\n.lshistory - show history\n.lshistory clear - clear history", "close": "❌ Close", @@ -184,30 +161,42 @@ class Limoka(loader.Module): "watcher_loader_missing": "❌ Loader module not found.", "watcher_module_not_found": "❌ Module not found in Limoka database: {path}", "watcher_critical": "❌ Critical error: {error}", + "tags": { + "herokutrusted": "Heroku Trusted", + "hikkatrusted": "Hikka Trusted", + "nonactive": "Non-Active Repository", + "nonlongermaintained": "No Longer Maintained Repository", + "newbie": "Newbie" + } } - strings_ru = { "name": "Limoka", "wait": ( "Подождите\n" - "🔍 Идёт поиск среди {count} модулей по запросу: {query}\n\n" + "🔍 Идёт поиск среди {count} модулей по запросу: {query}\n" "{fact}" ), - "found": ( + "found_header": ( "🔍 Найден модуль {name} " "по запросу: {query}\n\n" "ℹ️ Описание: {description}\n" "🧑‍💻 Разработчик: {username}\n\n" - "{commands}\n" - "🪄 {prefix}dlm {url}{module_path}" + "🏷 Теги: {tags}\n\n" + ), + "found_body": ( + "{commands}" + ), + "found_footer": ( + "\n🪄 {prefix}dlm {url}{module_path}" ), "caption_short": ( "🔍 {safe_name}\n" "ℹ️ Описание: {safe_desc}\n" - "🧑‍💻 Разработчик: {dev_username}\n\n" + "🧑‍💻 Разработчик: {dev_username}\n" "🪄 {prefix}dlm {module_path}" ), "command_template": "{emoji} {prefix}{command} — {description}\n", + "inline_handler_template": "{inline_bot} {command} — {description}\n", "emojis": { 1: "1️⃣", 2: "2️⃣", @@ -227,8 +216,8 @@ class Limoka(loader.Module): "🛡 Каталог Limoka тщательно модерируется!", "🚀 Limoka позволяет искать модули с невероятной скоростью!", ( - "🔎 Limoka имеет лучший поиск*!" - "\n * В сравнении с предыдущей версией Limoka" + "🔎 Limoka имеет лучший поиск*!\n" + "* В сравнении с предыдущей версией Limoka" ) ], "inline404": "Не найдено", @@ -264,13 +253,13 @@ class Limoka(loader.Module): "inline_short_query": "❌ Запрос слишком короткий (мин. 2 символа)", "inline_switch_pm": "💬 Открыть в чате", "inline_switch_pm_text": "🔍 Результаты для: {query}", - "inline_start_message": "🔍 Limoka Поиск\n\nВведите название модуля или ключевое слово", + "inline_start_message": "🔍 Limoka Поиск\nВведите название модуля или ключевое слово", "first_page": "Это первая страница!", "last_page": "Это последняя страница!", "display_error": "Ошибка отображения модуля. Пожалуйста, попробуйте еще раз.", "error_occurred": "Произошла ошибка. Пожалуйста, попробуйте еще раз.", - "start_search_form": "🔍 Limoka Поиск\n\nВведите ваш запрос для поиска модулей:", - "global_search_form": "🔍 Глобальный Поиск\n\nВведите запрос для поиска ВСЕХ модулей без фильтров:", + "start_search_form": "🔍 Limoka Поиск\nВведите ваш запрос для поиска модулей:", + "global_search_form": "🔍 Глобальный Поиск\nВведите запрос для поиска ВСЕХ модулей без фильтров:", "history_cleared": "🧹 История поиска очищена!", "invalid_history_arg": " Неверный аргумент для команды истории. Используйте:\n.lshistory - показать историю\n.lshistory clear - очистить историю", "close": "❌ Закрыть", @@ -280,6 +269,13 @@ class Limoka(loader.Module): "watcher_loader_missing": "❌ Модуль загрузчика не найден.", "watcher_module_not_found": "❌ Модуль не найден в базе Limoka: {path}", "watcher_critical": "❌ Критическая ошибка: {error}", + "tags": { + "herokutrusted": "Heroku Trusted", + "hikkatrusted": "Hikka Trusted", + "nonactive": "Неактивный репозиторий", + "nonlongermaintained": "Неподдерживаемый репозиторий", + "newbie": "Новичок" + }, "_cls_doc": "Модули теперь в одном месте с простым и удобным поиском!", } @@ -297,44 +293,153 @@ class Limoka(loader.Module): True, lambda: "If enabled, module installation can be handled via external Limoka bot (@limoka_bbot) for better reliability.", validator=loader.validators.Boolean(), - ) + ), + loader.ConfigValue( + "filter_newbies_modules", + False, + lambda: "If enabled, modules from developers with newbies tag will be not shown.", + validator=loader.validators.Boolean(), + ), ) self.name = self.strings["name"] self._invalid_banners = set() - # Also keep a convenient external fallback URL for plain search display - # (used when no valid banner is available and no filters are applied). - self._fallback_banner_url = "https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/assets/limoka404.png" + self._service_bot_id = 8581621390 + self._base_url = self.config["limokaurl"] + + # Search session states + self.SEARCH_STATES = { + "no_banner": "no_banner", # 404 - Нет баннера + "global_search": "global_search", # Глобальный поиск + "not_found": "not_found", # Не найдено (модуль) + "filter_select": "filter_select", # Выбор категорий (фильтров) + } + + # State banners - placeholders for now + self.state_banners = { + "no_banner": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/refs/heads/main/Limoka%20-%20No%20banner.png", + "global_search": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Global%20Search.png", + "not_found": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Not%20Found.png", + "filter_select": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Categories.png", + } + + def _filter_newbies(self, modules: Dict[str, Any]) -> Dict[str, Any]: + """Filter out modules which belong to repositories tagged as 'newbie'. + Returns the original dict when the feature is disabled or repositories + metadata is not available. + """ + try: + if not self.config.get("filter_newbies_modules"): + return modules + except Exception: + return modules + + if not getattr(self, "repositories", None): + return modules + + filtered: Dict[str, Any] = {} + for path, info in modules.items(): + repo_key = "/".join(path.split("/")[:2]) if "/" in path else path + repo = self.repositories.get(repo_key) + tags = repo.get("tags", []) if repo else [] + if "newbie" in tags: + continue + filtered[path] = info + return filtered + + def _create_search_session( + self, + state: str, + query: str = "", + filters: Optional[Dict[str, List[str]]] = None, + results: Optional[List[str]] = None, + current_index: int = 0, + ) -> Dict[str, Any]: + """Create a search session dictionary to track state across callbacks. + + Args: + state: Current search state (one of SEARCH_STATES values) + query: Current search query + filters: Active category filters + results: Search results list + current_index: Index of current result being displayed + banner_url: Banner image URL for current state + + Returns: + Dictionary containing the complete session state + """ + return { + "state": state, + "query": query, + "filters": filters or {}, + "results": results or [], + "current_index": current_index, + } + + def _get_banner_for_state(self, state: str) -> str: + return self.state_banners.get(state) async def client_ready(self, client, db): - self.client = client + self.client: TelegramClient = client self.db = db self.api = LimokaAPI() self.schema = Schema( title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored=True) ) - os.makedirs("limoka_search", exist_ok=True) if not os.path.exists("limoka_search/index"): self.ix = create_in("limoka_search", self.schema) else: self.ix = open_dir("limoka_search") - self._history = self.pointer("history", []) - - self.modules = await self.api.get_all_modules( - f"{self.config['limokaurl']}modules.json" + self.modules = await self.api.fetch_json( + self._base_url, "modules.json" ) + raw = (await self.api.fetch_json( + self._base_url, "repositories.json" + )).get("repositories", []) + self.repositories = { + repo["path"]: repo + for repo in raw + } + # Apply newbie filter if enabled + try: + self.modules = self._filter_newbies(self.modules) + except Exception: + pass + self._bot_username = (await self.client.get_entity(self._service_bot_id)).username + await self._update_index() + if self.config["external_install_allowed"]: + try: + message = await self.client.get_messages(self._bot_username, limit=1) + if not message: + message = await self.client.send_message(self._service_bot_id, "/start") + await message.delete() + except YouBlockedUserError: + logger.warning(f"Please unblock {self._bot_username} to enable external installation feature. Or disable external_install_allowed in Limoka settings to get rid of this message.") + self._userbot_bot_username = (await self.inline.bot.get_me()).username + + @loader.loop(interval=3600) + async def _update_modules_loop(self): + self.modules = await self.api.fetch_json( + self._base_url, "modules.json" + ) + # Re-apply newbie filter after modules refresh + try: + self.modules = self._filter_newbies(self.modules) + except Exception: + pass await self._update_index() async def _update_index(self): writer = self.ix.writer() - for module_path, module_data in self.modules.items(): + modules_to_index = self._filter_newbies(self.modules) + for module_path, module_data in modules_to_index.items(): writer.add_document( title=module_data["name"], path=module_path, - content=module_data["name"] + " " + (module_data["description"] or ""), + content=module_data["name"] + " " + (module_data.get("description") or "" + " " + ((module_data.get("meta").get("developer") or "") if module_data.get("meta") else "")), ) - for func in module_data["commands"]: + for func in module_data.get("commands", []): for command, description in func.items(): writer.add_document( title=module_data["name"], @@ -344,51 +449,33 @@ class Limoka(loader.Module): writer.commit() async def _validate_url(self, url: str) -> Optional[str]: - """Validate a remote URL points to an image. - - Args: - url: Remote URL to validate. - - Returns: - The same URL if it points to an image and is reachable, otherwise - ``None``. - - Side effects: - Adds invalid URLs to ``self._invalid_banners`` to avoid repeated - checks. - """ - # Return the url if valid, otherwise None. Do not return or use - # a global fallback here; fallback handling is done by the caller - # based on display context. if not url or url in self._invalid_banners: return None try: async with aiohttp.ClientSession() as session: - async with session.head(url, timeout=5) as response: + async with session.head(url, timeout=5, allow_redirects=True) as response: if response.status != 200: self._invalid_banners.add(url) return None - content_type = response.headers.get("Content-Type", "").lower() - if not content_type.startswith("image/"): + ct = response.headers.get("Content-Type", "").lower() + if not ct.startswith("image/"): self._invalid_banners.add(url) return None return url - except (aiohttp.ClientError, asyncio.TimeoutError): + except Exception as e: if url: self._invalid_banners.add(url) return None + + def user_lang(self) -> str: + self.db.get("heroku.translations", "lang") - def generate_commands(self, module_info): + def generate_commands(self, module_info, lang: str = "en"): commands = [] - for i, func in enumerate(module_info["commands"], 1): - if i > 9: - commands.append("…\n") - break + for i, func in enumerate(module_info.get("commands", []), 1): for command, description in func.items(): emoji = self.strings["emojis"].get(i, "") desc = description or self.strings["no_info"] - if len(desc) > 150: - desc = desc[:147] + "…" commands.append( self.strings["command_template"].format( prefix=self.get_prefix(), @@ -397,7 +484,18 @@ class Limoka(loader.Module): description=html.escape(desc), ) ) - return commands[:5] + for i, handler in enumerate(module_info.get("inline_handlers", []), 1): + name = handler.get("name", "") + desc_map = handler.get("description", {}) + desc = _get_lang_value(desc_map, lang) or self.strings["no_info"] + commands.append( + self.strings["inline_handler_template"].format( + inline_bot=self._userbot_bot_username, + command=html.escape(name), + description=html.escape(desc), + ) + ) + return commands def _format_module_content( self, @@ -406,98 +504,96 @@ class Limoka(loader.Module): filters: Dict[str, List[str]], include_categories: bool = True, module_path: Optional[str] = None, + lang: str = "en", ) -> tuple: - """Formats the module content for display.""" name = html.escape(module_info.get("name") or self.strings["no_info"]) + cls_doc = module_info.get("cls_doc", {}) description = html.escape( - module_info.get("description") or self.strings["no_info"] + _get_lang_value(cls_doc, lang) or + _get_lang_value(module_info.get("description", ""), lang) or + self.strings["no_info"] ) dev_username = html.escape(module_info["meta"].get("developer", "Unknown")) - - # Prefer explicit module_path argument (caller provides the key), - # otherwise fall back to module_info['path'] if present. - raw_path = ( - module_path if module_path is not None else module_info.get("path", "") - ) + raw_path = module_path if module_path is not None else module_info.get("path", "") clean_module_path = (raw_path or "").replace("\\", "/") - commands = self.generate_commands(module_info) - + commands = self.generate_commands(module_info, lang) categories_text = "" if include_categories: categories = filters.get("category", []) if categories: - categories_text = "\n\n" + self.strings["selected_categories"].format( + categories_text = "\n" + self.strings["selected_categories"].format( categories=", ".join(html.escape(c) for c in categories) ) - if len(description) > 300: description = description[:297] + "…" - - core_message = self.strings["found"].format( + repo_key = "/".join(module_path.split("/")[:2]) if "/" in module_path else module_path + tags_list = [] + for x in self.repositories: + if x == repo_key: + tags_list = self.repositories.get(x, {}).get("tags", []) + break + tags_text = ", ".join(self.strings["tags"].get(tag, tag) for tag in tags_list) + header = self.strings["found_header"].format( query=html.escape(query), name=name, description=description, - url=html.escape(self.config["limokaurl"]), username=dev_username, - commands="".join(commands), - prefix=html.escape(self.get_prefix()), - module_path=html.escape(clean_module_path), + tags=tags_text, ) - - full_message = core_message[:4096] + categories_text[:100] - - caption_message = full_message - if len(caption_message) > 1024: - safe_name = name[:40] + ("..." if len(name) > 40 else "") - safe_desc = description[:100] + ("…" if len(description) > 100 else "") - - caption_message = self.strings["caption_short"].format( - safe_name=safe_name, - safe_desc=safe_desc, - dev_username=dev_username, - prefix=self.get_prefix(), - module_path=html.escape(self.config["limokaurl"] + clean_module_path), - )[:1024] - - if categories_text: - remaining_space = 1024 - len(caption_message) - if remaining_space > 0: - caption_message += categories_text[:remaining_space] - - return caption_message, full_message + commands_text = "".join(commands) + if len(commands_text) <= 500: + body_pages = [commands_text] if commands_text else [""] + else: + body_pages = [] + current_page = [] + current_length = 0 + for cmd in commands: + if current_length + len(cmd) > 500: + if current_page: + body_pages.append("".join(current_page)) + current_page = [] + current_length = 0 + current_page.append(cmd) + current_length += len(cmd) + if current_page: + body_pages.append("".join(current_page)) + if not body_pages: + body_pages = [""] + footer = self.strings["found_footer"].format( + url=html.escape(self.config["limokaurl"]), + module_path=html.escape(clean_module_path), + prefix=html.escape(self.get_prefix()), + ) + return header, body_pages, footer, categories_text def _build_navigation_markup( - self, result: List[str], index: int, query: str, filters: Dict[str, List[str]] + self, session: Dict[str, Any] ) -> list: - """Create navigation markup for inline results.""" + result = session["results"] + index = session["current_index"] + query = session["query"] + filters = session["filters"] + page = index + 1 markup = [ [ { "text": "⏪" if index > 0 else "🚫", "callback": self._previous_page if index > 0 else self._inline_void, - "args": (result, index, query, filters) if index > 0 else (), + "args": (session,) if index > 0 else (), }, {"text": f"{page}/{len(result)}", "callback": self._inline_void}, { "text": "⏩" if index + 1 < len(result) else "🚫", - "callback": ( - self._next_page - if index + 1 < len(result) - else self._inline_void - ), - "args": ( - (result, index, query, filters) - if index + 1 < len(result) - else () - ), + "callback": self._next_page if index + 1 < len(result) else self._inline_void, + "args": (session,) if index + 1 < len(result) else (), }, ], [ { "text": "🔍 " + self.strings["filter_menu"].split(":")[0], "callback": self._display_filter_menu, - "args": (query, filters), + "args": (session,), }, { "text": "🔄 " + self.strings["change_query"], @@ -508,11 +604,70 @@ class Limoka(loader.Module): { "text": self.strings["global_button"], "callback": self._show_global_results, - "args": (query,), + "args": (session,), }, ], ] - # Add a universal close button to the navigation markup + markup.append( + [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] + ) + return markup + + def _build_module_markup( + self, session: Dict[str, Any], body_pages: List[str], page_body: int, module_path: str + ) -> list: + result = session["results"] + index = session["current_index"] + query = session["query"] + filters = session["filters"] + + markup = [] + if len(body_pages) > 1: + markup.append([ + { + "text": "◀️" if page_body > 0 else "🚫", + "callback": self._previous_body_page if page_body > 0 else self._inline_void, + "args": (session, module_path, page_body) if page_body > 0 else (), + }, + {"text": f"Body {page_body + 1}/{len(body_pages)}", "callback": self._inline_void}, + { + "text": "▶️" if page_body + 1 < len(body_pages) else "🚫", + "callback": self._next_body_page if page_body + 1 < len(body_pages) else self._inline_void, + "args": (session, module_path, page_body) if page_body + 1 < len(body_pages) else (), + }, + ]) + page = index + 1 + markup.append([ + { + "text": "⏪" if index > 0 else "🚫", + "callback": self._previous_page if index > 0 else self._inline_void, + "args": (session,) if index > 0 else (), + }, + {"text": f"{page}/{len(result)}", "callback": self._inline_void}, + { + "text": "⏩" if index + 1 < len(result) else "🚫", + "callback": self._next_page if index + 1 < len(result) else self._inline_void, + "args": (session,) if index + 1 < len(result) else (), + }, + ]) + markup.append([ + { + "text": "🔍 " + self.strings["filter_menu"].split(":")[0], + "callback": self._display_filter_menu, + "args": (session,), + }, + { + "text": "🔄 " + self.strings["change_query"], + "callback": self._enter_query, + }, + ]) + markup.append([ + { + "text": self.strings["global_button"], + "callback": self._show_global_results, + "args": (session,), + }, + ]) markup.append( [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] ) @@ -525,15 +680,12 @@ class Limoka(loader.Module): markup: list, photo: Optional[Any] = None, ): - """Safely display module information, handling potential errors.""" try: if message_or_call is None: logger.error("message_or_call is None in _safe_display") return - if isinstance(message_or_call, Message): if photo is not None: - # photo can be a URL/str, file path or a file-like object await self.inline.form( text=text, message=message_or_call, @@ -546,9 +698,7 @@ class Limoka(loader.Module): ) else: if photo is not None: - await message_or_call.edit( - text=text, reply_markup=markup, photo=photo - ) + await message_or_call.edit(text=text, reply_markup=markup, photo=photo) else: await message_or_call.edit(text=text, reply_markup=markup) except (BadRequest, WebpageMediaEmptyError) as e: @@ -563,63 +713,37 @@ class Limoka(loader.Module): message_or_call: Union[Message, InlineCall], module_info: Dict[str, Any], module_path: str, - query: str, - result: List[str], - index: int, - filters: Dict[str, List[str]], + session: Dict[str, Any], + page_body: int = 0, ): - """Display module information with banner and formatted content. - - Args: - message_or_call: Message or InlineCall object where the module - will be displayed. - module_info: Dictionary with module metadata (name, description, - meta.banner, commands, category). - module_path: Path key of the module in `self.modules`. - query: Original search query string. - result: Full list of matched module paths. - index: Index of the current module in `result`. - filters: Active filters (e.g., categories). If ``filters`` is - empty and no valid remote banner exists, the external fallback - URL (`self._fallback_banner_url`) will be used. - - Notes: - The method attempts to validate a remote banner URL via - :meth:`_validate_url`. If validation succeeds the remote URL is - passed to the messaging client. If validation fails and ``filters`` - is empty, the external fallback URL (`self._fallback_banner_url`) - will be used. Behavior may vary depending on the messaging client - used (Telethon/aiogram/etc.). - """ try: - banner_url = await self._validate_url(module_info["meta"].get("banner")) + query = session["query"] + filters = session["filters"] + + lang = self.user_lang() + module_banner_raw = module_info.get("meta", {}).get("banner") + photo = await self._validate_url(module_banner_raw) - caption_message, full_message = self._format_module_content( + if not photo: + state_banner_raw = self._get_banner_for_state("no_banner") + photo = await self._validate_url(state_banner_raw) + + header, body_pages, footer, categories_text = self._format_module_content( module_info, query, filters, include_categories=True, module_path=module_path, + lang=lang, ) + current_body = body_pages[min(page_body, len(body_pages) - 1)] + full_message = header + current_body + footer + categories_text - markup = self._build_navigation_markup(result, index, query, filters) + markup = self._build_module_markup(session, body_pages, page_body, module_path) - # Determine which banner to use. If banner_url is valid, use it. - # If no valid banner and no filters are applied (normal search display), - # create an in-memory BytesIO from the embedded base64 and use it. - banner_to_use = None - if banner_url: - banner_to_use = banner_url - else: - if not filters: - # Use external fallback URL for plain search display. - banner_to_use = getattr(self, "_fallback_banner_url", None) - - display_text = caption_message if banner_to_use else full_message await self._safe_display( - message_or_call, display_text, markup, banner_to_use + message_or_call, full_message, markup, photo ) - except Exception as e: logger.exception(f"Error in _display_module: {e}") if isinstance(message_or_call, Message): @@ -627,57 +751,79 @@ class Limoka(loader.Module): elif hasattr(message_or_call, "edit"): await message_or_call.edit(self.strings["error_occurred"]) - async def _display_filter_menu( - self, call: InlineCall, query: str, current_filters: dict + async def _previous_body_page( + self, call: InlineCall, session: Dict[str, Any], module_path: str, page_body: int ): + module_info = self.modules[module_path] + new_page_body = max(page_body - 1, 0) + await self._display_module(call, module_info, module_path, session, page_body=new_page_body) + + async def _next_body_page( + self, call: InlineCall, session: Dict[str, Any], module_path: str, page_body: int + ): + module_info = self.modules[module_path] + query = session["query"] + filters = session["filters"] + header, body_pages, footer, categories_text = self._format_module_content( + module_info, query, filters, include_categories=True, module_path=module_path, lang=self.user_lang() + ) + new_page_body = min(page_body + 1, len(body_pages) - 1) + await self._display_module(call, module_info, module_path, session, page_body=new_page_body) + + async def _display_filter_menu( + self, call: InlineCall, session: Dict[str, Any] + ): + query = session["query"] + current_filters = session["filters"] + categories = current_filters.get("category", []) filters_text = self.strings["selected_categories"].format( categories=( ", ".join(categories) if categories else self.strings["no_category"] ) ) - markup = [ [ { "text": self.strings["filter_cat"], "callback": self._select_category, - "args": (query, current_filters), + "args": (session,), }, ], [ { "text": self.strings["apply_filters"], "callback": self._apply_filters, - "args": (query, current_filters), + "args": (session,), }, { "text": self.strings["clear_filters"], "callback": self._clear_filters, - "args": (query,), + "args": (session,), }, ], [ { "text": self.strings["back_to_results"], "callback": self._show_results, - "args": (query, {}, True), + "args": (session, True), }, ], [{"text": self.strings.get("close", "❌ Close"), "action": "close"}], ] - - text = self.strings["filter_menu"].format(query=query) + f"\n\n{filters_text}" - await call.edit(text, reply_markup=markup) + text = self.strings["filter_menu"].format(query=query) + f"\n{filters_text}" + await call.edit(text, reply_markup=markup, photo=self._get_banner_for_state("filter_select")) async def _select_category( - self, call: InlineCall, query: str, current_filters: dict + self, call: InlineCall, session: Dict[str, Any] ): + query = session["query"] + current_filters = session["filters"] + all_categories = set() for module_data in self.modules.values(): all_categories.update(module_data.get("category", ["No category"])) categories = sorted(all_categories) - if not categories: await call.edit( self.strings["no_categories"], @@ -686,17 +832,15 @@ class Limoka(loader.Module): { "text": self.strings["back"], "callback": self._display_filter_menu, - "args": (query, current_filters), + "args": (session,), } ] ], ) return - selected_categories = current_filters.get("category", []) buttons = [] row = [] - for i, cat in enumerate(categories): button_text = ( self.strings["category"].format(category=cat) @@ -705,71 +849,74 @@ class Limoka(loader.Module): ) if cat in selected_categories: button_text = "✅ " + button_text - + + # Create new session with updated filters + new_session = session.copy() row.append( { "text": button_text, "callback": self._toggle_category, - "args": (query, current_filters, cat), + "args": (session, cat), } ) - if (i + 1) % 3 == 0 or i == len(categories) - 1: buttons.append(row) row = [] - buttons.append( [ { "text": self.strings["back"], "callback": self._display_filter_menu, - "args": (query, current_filters), + "args": (session,), } ] ) - - # Add close button to category selector buttons.append( [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] ) - text = self.strings["select_category"].format(query=query) await call.edit(text, reply_markup=buttons) async def _toggle_category( - self, call: InlineCall, query: str, current_filters: dict, category: str + self, call: InlineCall, session: Dict[str, Any], category: str ): + query = session["query"] + current_filters = session["filters"] + new_filters = current_filters.copy() selected_categories = new_filters.get("category", []) - if category in selected_categories: selected_categories.remove(category) else: selected_categories.append(category) - if selected_categories: new_filters["category"] = selected_categories else: new_filters.pop("category", None) + new_session = session.copy() + new_session["filters"] = new_filters + await self._select_category(call, new_session) - await self._select_category(call, query, new_filters) + async def _apply_filters(self, call: InlineCall, session: Dict[str, Any]): + await self._show_results(call, session, from_filters=True) - async def _apply_filters(self, call: InlineCall, query: str, filters: dict): - await self._show_results(call, query, filters, from_filters=True) - - async def _clear_filters(self, call: InlineCall, query: str): - await self._show_results(call, query, {}, from_filters=True) + async def _clear_filters(self, call: InlineCall, session: Dict[str, Any]): + new_session = session.copy() + new_session["filters"] = {} + await self._show_results(call, new_session, from_filters=True) async def _show_results( - self, call: InlineCall, query: str, filters: dict, from_filters: bool = False + self, call: InlineCall, session: Dict[str, Any], from_filters: bool = False ): + query = session["query"] + filters = session["filters"] + searcher = Search(query.lower(), self.ix) try: result = searcher.search_module() except Exception: await call.edit(self.strings["?"], reply_markup=[]) return - if not result: markup = ( [ @@ -777,14 +924,13 @@ class Limoka(loader.Module): { "text": self.strings["back"], "callback": self._display_filter_menu, - "args": (query, filters), + "args": (session,), } ] ] if from_filters else [] ) - # Always provide a close button on empty-result screens markup.append( [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] ) @@ -792,7 +938,6 @@ class Limoka(loader.Module): self.strings["404"].format(query=query), reply_markup=markup ) return - if filters.get("category"): filtered_result = [ path @@ -804,7 +949,6 @@ class Limoka(loader.Module): ] else: filtered_result = result - if not filtered_result: markup = ( [ @@ -812,14 +956,13 @@ class Limoka(loader.Module): { "text": self.strings["back"], "callback": self._display_filter_menu, - "args": (query, filters), + "args": (session,), } ] ] if from_filters else [] ) - # Add close button when filtered results are empty markup.append( [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] ) @@ -827,40 +970,36 @@ class Limoka(loader.Module): self.strings["404"].format(query=query), reply_markup=markup ) return - module_path = filtered_result[0] module_info = self.modules[module_path] + + # Create session for displaying module + display_session = self._create_search_session( + state=self.SEARCH_STATES["global_search"], + query=query, + filters=filters, + results=filtered_result, + current_index=0, + ) await self._display_module( - call, module_info, module_path, query, filtered_result, 0, filters + call, module_info, module_path, display_session, 0 ) async def _enter_query_handler( self, call_or_query, query: Optional[str] = None, *args, **kwargs ): - """Handler for inline query input. - - This handler is tolerant to different calling conventions used by the - framework: some callers provide `(call, query)`, others may provide - `(query,)` or `(query, call)` depending on context. Normalize the - inputs so the handler works from menus and forms alike. - """ - # Normalize parameters: try to find `call` (message or InlineCall) call = None if query is None and isinstance(call_or_query, str): - # Called as (query, ...) — search text is first argument query = call_or_query for a in args: if hasattr(a, "edit") or isinstance(a, Message): call = a break else: - # Expected calling convention: (call, query, ...) call = call_or_query - if call is None: logger.error("_enter_query_handler: missing call/context") return - if not query: await call.edit( self.strings["?"], @@ -874,7 +1013,6 @@ class Limoka(loader.Module): ], ) return - if len(query) <= 1: await call.edit( self.strings["?"], @@ -888,7 +1026,6 @@ class Limoka(loader.Module): ], ) return - searcher = Search(query.lower(), self.ix) try: result = searcher.search_module() @@ -905,7 +1042,6 @@ class Limoka(loader.Module): ], ) return - if not result: await call.edit( self.strings["404"].format(query=query), @@ -925,17 +1061,20 @@ class Limoka(loader.Module): ], ) return - module_path = result[0] module_info = self.modules[module_path] - await self._display_module(call, module_info, module_path, query, result, 0, {}) + + # Create session for displaying module + display_session = self._create_search_session( + state=self.SEARCH_STATES["global_search"], + query=query, + filters={}, + results=result, + current_index=0, + ) + await self._display_module(call, module_info, module_path, display_session, 0) async def _enter_query(self, call: InlineCall, query: Optional[str] = None): - """Show input form for new query. - - Accepts an optional `query` when called from other menus so the - "back to results" button can restore the previous search context. - """ markup = [ [ { @@ -948,7 +1087,11 @@ class Limoka(loader.Module): { "text": self.strings["back_to_results"], "callback": self._show_results, - "args": (query or "", {}), + "args": (self._create_search_session( + state=self.SEARCH_STATES["global_search"], + query=query or "", + filters={}, + ),), } ], [ @@ -958,17 +1101,17 @@ class Limoka(loader.Module): } ], ] - await call.edit(self.strings["enter_query"], reply_markup=markup) - async def _show_global_results(self, call: InlineCall, query: str): + async def _show_global_results(self, call: InlineCall, session: Dict[str, Any]): + query = session["query"] + searcher = Search(query.lower(), self.ix) try: result = searcher.search_module() except Exception: await call.edit(self.strings["?"], reply_markup=[]) return - if not result: await call.edit( self.strings["404"].format(query=query), @@ -982,7 +1125,6 @@ class Limoka(loader.Module): ], ) return - text = self.strings["global_search"].format( query=html.escape(query), count=len(result) ) @@ -992,55 +1134,72 @@ class Limoka(loader.Module): if not info: continue name = info.get("name", "Unknown") + + global_session = self._create_search_session( + state=self.SEARCH_STATES["global_search"], + query=query, + filters={}, + results=result, + current_index=i, + ) buttons.append( [ { "text": f"{i+1}. {name}", "callback": self._display_module_from_global, - "args": (path, query, result), + "args": (path, global_session), } ] ) buttons.append( [{"text": self.strings["change_query"], "callback": self._enter_query}] ) - await call.edit(text=text[:4096], reply_markup=buttons) async def _display_module_from_global( - self, call: InlineCall, module_path: str, query: str, result: list + self, call: InlineCall, module_path: str, session: Dict[str, Any] ): module_info = self.modules[module_path] await self._display_module( - call, module_info, module_path, query, result, result.index(module_path), {} + call, module_info, module_path, session, 0 ) async def _next_page( - self, call: InlineCall, result: list, index: int, query: str, filters: dict + self, call: InlineCall, session: Dict[str, Any] ): + result = session["results"] + index = session["current_index"] + if index + 1 >= len(result): await call.answer(self.strings["last_page"]) return - index += 1 module_path = result[index] module_info = self.modules[module_path] + + new_session = session.copy() + new_session["current_index"] = index await self._display_module( - call, module_info, module_path, query, result, index, filters + call, module_info, module_path, new_session, 0 ) async def _previous_page( - self, call: InlineCall, result: list, index: int, query: str, filters: dict + self, call: InlineCall, session: Dict[str, Any] ): + result = session["results"] + index = session["current_index"] + if index - 1 < 0: await call.answer(self.strings["first_page"]) return - index -= 1 module_path = result[index] module_info = self.modules[module_path] + + new_session = session.copy() + new_session["current_index"] = index await self._display_module( - call, module_info, module_path, query, result, index, filters + call, module_info, module_path, new_session, 0 ) async def _inline_void(self, call: InlineCall): @@ -1050,7 +1209,6 @@ class Limoka(loader.Module): async def limokacmd(self, message: Message): """[query / nothing] - Search modules""" args = utils.get_args_raw(message) - if not args: markup = [ [ @@ -1068,24 +1226,21 @@ class Limoka(loader.Module): } ], ] - # Close button on the main no-args form markup.append( [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] ) - await self.inline.form( text=self.strings["start_search_form"], message=message, reply_markup=markup, + photo=self._get_banner_for_state("global_search") ) return - history = self.get("history", []) if len(history) >= 10: history = history[-9:] history.append(args) self.set("history", history) - await utils.answer( message, self.strings["wait"].format( @@ -1094,21 +1249,25 @@ class Limoka(loader.Module): query=args, ), ) - searcher = Search(args.lower(), self.ix) try: result = searcher.search_module() except Exception: return await utils.answer(message, self.strings["?"]) - if not result: return await utils.answer(message, self.strings["404"].format(query=args)) - module_path = result[0] module_info = self.modules[module_path] - await self._display_module( - message, module_info, module_path, args, result, 0, {} + + # Create session for displaying module + display_session = self._create_search_session( + state=self.SEARCH_STATES["global_search"], + query=args, + filters={}, + results=result, + current_index=0, ) + await self._display_module(message, module_info, module_path, display_session, 0) async def _show_global_form(self, call: InlineCall, message: Message): markup = [ @@ -1133,12 +1292,18 @@ class Limoka(loader.Module): } ], ] - await call.edit(self.strings["global_search_form"], reply_markup=markup) async def _global_search_handler( self, call: InlineCall, query: str, message: Message, *args, **kwargs ): + global_session = self._create_search_session( + state=self.SEARCH_STATES["global_search"], + query=query, + filters={}, + results=[], + current_index=0, + ) # idk what is that crap but it works lol if len(query) <= 1: await call.edit( self.strings["?"], @@ -1158,7 +1323,6 @@ class Limoka(loader.Module): ], ) return - searcher = Search(query.lower(), self.ix) try: result = searcher.search_module() @@ -1181,7 +1345,6 @@ class Limoka(loader.Module): ], ) return - if not result: await call.edit( self.strings["404"].format(query=query), @@ -1201,7 +1364,6 @@ class Limoka(loader.Module): ], ) return - text = self.strings["global_search"].format( query=html.escape(query), count=len(result) ) @@ -1216,7 +1378,7 @@ class Limoka(loader.Module): { "text": f"{i+1}. {name}", "callback": self._display_module_from_global, - "args": (path, query, result), + "args": (path, global_session), } ] ) @@ -1231,29 +1393,23 @@ class Limoka(loader.Module): buttons.append( [{"text": self.strings.get("close", "❌ Close"), "action": "close"}] ) - await call.edit(text=text[:4096], reply_markup=buttons) @loader.command(ru_doc="[clear] — Показать или очистить историю поиска") async def lshistorycmd(self, message: Message): """[clear] - Show or clear search history""" args = utils.get_args_raw(message).strip().lower() - if args == "clear": self.set("history", []) await utils.answer(message, self.strings["history_cleared"]) return - if args: await utils.answer(message, self.strings["invalid_history_arg"]) return - history = self.get("history", []) - if not history: await utils.answer(message, self.strings["empty_history"]) return - formatted_history = [ f"{i+1}. {utils.escape_html(h)}" for i, h in enumerate(history[-10:]) @@ -1265,80 +1421,44 @@ class Limoka(loader.Module): @loader.watcher(from_dl=False) async def secure_install_watcher(self, message: Message): - """Secure install watcher for official Limoka bot. - - This watcher cleans HTML from incoming messages, extracts a - signed #limoka:: tag, verifies the signature and - triggers the loader to download and install the module if valid. - """ if not message.text: return - - # Verify sender id is present and comes from the official Limoka bot if not hasattr(message, "from_id") or not message.from_id: return - sender_id = None if hasattr(message.from_id, "user_id"): sender_id = message.from_id.user_id elif hasattr(message.from_id, "channel_id"): sender_id = message.from_id.channel_id - - if sender_id != 8581621390: + if sender_id != self._service_bot_id: logger.debug("Message not from official bot, ignoring") return - - # Only act when external installs are enabled if not self.config["external_install_allowed"]: return - try: - # Prefer raw_text/message when available to preserve original - # formatting (some clients provide parsed .text that loses - # tags/links). Fall back to .text if needed. clean_text = getattr(message, "raw_text", None) or getattr( message, "message", None ) or message.text or "" - if message.entities: from html import unescape - clean_text = unescape(clean_text) - # Remove HTML tags but keep their inner text so we don't - # accidentally remove the tag content when it's wrapped - # in an or similar. - clean_text = re.sub(r"<[^>]+>", "", clean_text) - - # Extract the first #limoka: occurrence. Allow for - # characters until whitespace or HTML/quote delimiters. + clean_text = re.sub(r"<[^>]+>", "", clean_text) match = re.search(r"#limoka:([^\s\"'<>]+)", clean_text) if not match: logger.debug( "No #limoka tag found in cleaned text; leaving original message intact" ) - # Do not send a user-visible reply for missing tag; simply exit. return - tag_content = match.group(1) - - # Expect format: : parts = tag_content.split(":", 1) if len(parts) != 2: logger.error("Invalid tag format after cleaning") await utils.answer(message, self.strings["watcher_invalid_format"]) - # Do not delete the original message on parse errors. return - module_path, signature_hex = parts - - # Strip leftover quote characters and whitespace module_path = re.sub(r"[<>\"']", "", module_path).strip() - - # Handle possible href= artifacts if module_path.startswith("href="): module_path = module_path[5:].strip('"').strip("'") - - # Try to resolve the module key in database if module_path not in self.modules: found = False for db_path in self.modules.keys(): @@ -1346,30 +1466,18 @@ class Limoka(loader.Module): module_path = db_path found = True break - if not found: logger.warning(f"Module not found after cleanup: {module_path}") await utils.answer( message, self.strings["watcher_module_not_found"].format(path=html.escape(module_path)) ) - # Keep original message in chat for inspection. return - - # logger.info(f"Module found in database: {module_path}") - - # Verify signature using embedded public key — signature covers - # the module path AND the SHA256 of the module content (format: - # "{module_path}|{sha256}"). Download module, compute hash and - # verify signature against that combined payload. try: import base64 from cryptography.hazmat.primitives.asymmetric import ed25519 - PUB_KEY_B64 = "MCowBQYDK2VwAyEA1ltSnqtf3pGBuctuAYqHivCXsaRtKOVxavai7yin7ZE=" der_bytes = base64.b64decode(PUB_KEY_B64) raw_pubkey = der_bytes[-32:] - - # Download module content to compute SHA256 module_url = self.config["limokaurl"] + module_path async with aiohttp.ClientSession() as session: async with session.get(module_url, timeout=10) as resp: @@ -1378,65 +1486,41 @@ class Limoka(loader.Module): await utils.answer(message, self.strings["watcher_loader_missing"]) return module_bytes = await resp.read() - - sha256 = hashlib.sha256(module_bytes).hexdigest() - - public_key = ed25519.Ed25519PublicKey.from_public_bytes(raw_pubkey) - signature = bytes.fromhex(signature_hex) - signed_payload = f"{module_path}|{sha256}".encode() - public_key.verify(signature, signed_payload) - # logger.info(f"Signature verified for {module_path} (sha256={sha256})") + sha256 = hashlib.sha256(module_bytes).hexdigest() + public_key = ed25519.Ed25519PublicKey.from_public_bytes(raw_pubkey) + signature = bytes.fromhex(signature_hex) + signed_payload = f"{module_path}|{sha256}".encode() + public_key.verify(signature, signed_payload) except Exception as e: logger.error(f"Signature verification failed for {module_path}: {e}") await utils.answer(message, self.strings["watcher_signature_invalid"]) - # Keep original message so admins can inspect the signed payload. return - - # Perform install via loader loader_mod = self.lookup("loader") if not loader_mod: logger.error("Loader module not found") await utils.answer(message, self.strings["watcher_loader_missing"]) - # Do not delete the original message on loader problems. return - module_url = self.config["limokaurl"] + module_path - # logger.info(f"Installing from URL: {module_url}") - status = await loader_mod.download_and_install(module_url, None) - if getattr(loader_mod, "fully_loaded", False): loader_mod.update_modules_in_db() - - # Attempt to remove the original message try: await message.delete() - # logger.info("Original message deleted") except Exception as e: logger.error(f"Failed to delete message: {e}") - - #logger.info(status) - if status: - # module_name = module_path.split("/")[-1].replace(".py", "") - # Notify official bot about success try: - bot_peer = await self.client.get_entity(8581621390) + bot_peer = await self.client.get_entity(self._service_bot_id) await self.client.send_message(bot_peer, f"#limoka:sucsess:{message.id}") - # logger.info(f"Sent success confirmation to bot for message {message.id}") except Exception as e: logger.error(f"Failed to send success confirmation: {e}") - - # logger.info(f"Module {module_name} installed successfully") else: logger.error(f"Installation failed with status: {status}") try: - bot_peer = await self.client.get_entity(8581621390) + bot_peer = await self.client.get_entity(self._service_bot_id) await self.client.send_message(bot_peer, f"#limoka:failed:{message.id}") - # logger.info(f"Sent failure notification to bot for message {message.id}") except Exception as e: logger.error(f"Failed to send failure notification: {e}") - except Exception as e: logger.exception(f"CRITICAL ERROR in secure_install_watcher: {e}") try: diff --git a/backup.py b/backup.py new file mode 100644 index 0000000..1dc16f1 --- /dev/null +++ b/backup.py @@ -0,0 +1,74 @@ +import asyncio +import aiohttp +import argparse +import subprocess +import os +import glob + +parser = argparse.ArgumentParser(description="Backup Script") +parser.add_argument( + "--token", + type=str, + required=True, + help="Token of Telegram bot", +) +parser.add_argument( + "--api_url", + type=str, + default="https://api.telegram.org", + help="API URL of Telegram API", +) +parser.add_argument( + "--chat_id", + type=str, + required=True, + help="Chat ID to send backup message to", +) + +arguments = parser.parse_args() + +async def send_file(session, file_path, caption=None): + url = f"{arguments.api_url}/bot{arguments.token}/sendDocument" + with open(file_path, 'rb') as f: + data = aiohttp.FormData() + data.add_field('chat_id', arguments.chat_id) + data.add_field('document', f, filename=os.path.basename(file_path)) + if caption: + data.add_field('caption', caption) + data.add_field('parse_mode', 'Markdown') + async with session.post(url, data=data) as response: + return await response.json() + +async def main(): + # Get commit info + commit_message = subprocess.check_output(['git', 'log', '-1', '--pretty=%B']).decode().strip() + commit_date = subprocess.check_output(['git', 'log', '-1', '--pretty=%ci']).decode().strip() + commit_hash = subprocess.check_output(['git', 'rev-parse', '--short=6', 'HEAD']).decode().strip() + commit_url = f"https://github.com/MuRuLOSE/limoka/commit/{subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip()}" + message = f"Commit Date: {commit_date}, Commit Message: {commit_message}, Commit Hash: [`{commit_hash}`]({commit_url})" + + # Create zip + subprocess.run(['git', 'archive', '--format=zip', '--output=repository-original.zip', 'HEAD']) + subprocess.run(['zip', '-9', 'repository.zip', 'repository-original.zip']) + os.remove('repository-original.zip') + + # Split zip + subprocess.run(['split', '-b', '49M', 'repository.zip', 'repository-part-']) + + # Send parts + async with aiohttp.ClientSession() as session: + parts = sorted(glob.glob('repository-part-*')) + first = True + for part in parts: + caption = message if first else None + result = await send_file(session, part, caption) + print(f"Sent {part}: {result}") + first = False + + # Cleanup + os.remove('repository.zip') + for part in parts: + os.remove(part) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/parse.py b/parse.py index 4d10721..4b1b657 100644 --- a/parse.py +++ b/parse.py @@ -1,171 +1,353 @@ -import os import ast import json +import os +import logging +from typing import Dict, Any, Optional, List -from clone_repos import repos -from typing import Dict +logging.basicConfig(level=logging.WARNING, format="%(message)s") +logger = logging.getLogger(__name__) -# TODO: ADD VENV IGNORE +def safe_unparse(node: ast.AST) -> str: + try: + return ast.unparse(node) + except AttributeError: + return getattr(node, 'id', str(type(node).__name__)) +def load_blacklist(file_path): + with open(file_path, "r", encoding="utf-8") as f: + data = json.load(f) + repositories = data.get("repositories", []) + blacklisted_modules = {} -def get_module_info(module_path): - """Парсит Python-модуль и извлекает информацию о нем.""" - with open(module_path, "r", encoding="utf-8") as f: - module_content = f.read() + for i in repositories: + path = i.get("path", "") + blacklist = i.get("blacklist", []) + if path and blacklist: + blacklisted_modules[path] = blacklist - meta_info = {"pic": None, "banner": None} - for line in module_content.split("\n"): - if line.startswith("# meta"): - key, value = line.replace("# meta ", "").split(": ") - meta_info[key] = value + return blacklisted_modules - tree = ast.parse(module_content) +def extract_string_value(node: ast.AST) -> Optional[str]: + try: + if isinstance(node, ast.Constant) and isinstance(node.value, str): + return node.value + if isinstance(node, ast.Str): + return node.s + if isinstance(node, ast.Name): + return node.id + if isinstance(node, ast.Attribute): + return f"{safe_unparse(node.value)}.{node.attr}" + return str(node) + except Exception: + return None - def get_decorator_names(decorator_list): - return [ast.unparse(decorator) for decorator in decorator_list] - - def extract_loader_command_args(decorator): - """Извлекает аргументы `ru_doc` и `en_doc` из `@loader.command`.""" - if ( - isinstance(decorator, ast.Call) - and hasattr(decorator.func, "attr") - and decorator.func.attr == "command" - ): - ru_doc = None - en_doc = None - for keyword in decorator.keywords: - if keyword.arg == "ru_doc": - ru_doc = ast.literal_eval(keyword.value) - elif keyword.arg == "en_doc": - en_doc = ast.literal_eval(keyword.value) - return ru_doc, en_doc - return None, None - - result = {} - for node in ast.walk(tree): - if isinstance(node, ast.ClassDef): - decorators = get_decorator_names(node.decorator_list) - is_tds_mod = [d for d in decorators if "loader.tds" in d] - if "Mod" not in node.name and not is_tds_mod: +def extract_loader_command_args(decorator: ast.Call) -> Dict[str, Any]: + args = {"lang_docs": {}, "aliases": [], "usage": None} + try: + for kw in decorator.keywords: + arg_name = kw.arg + if not arg_name: continue + if arg_name.endswith("_doc"): + lang = arg_name[:-4] + args["lang_docs"][lang] = extract_string_value(kw.value) + elif arg_name == "aliases": + try: + val = ast.literal_eval(kw.value) + if isinstance(val, (list, tuple)): + args["aliases"] = list(val) + except (ValueError, SyntaxError): + pass + elif arg_name == "usage": + args["usage"] = extract_string_value(kw.value) + except Exception: + pass + return args - class_docstring = ast.get_docstring(node) - class_info = { - "name": node.name, - "description": class_docstring, - "meta": meta_info, - "commands": [], - "new_commands": [], - } +def get_module_info(module_path: str) -> Optional[Dict[str, Any]]: + try: + with open(module_path, "r", encoding="utf-8") as f: + source = f.read() + except Exception as e: + logger.warning(f"Skipping {module_path}: read failed — {e}") + return None - for class_body_node in node.body: - if isinstance(class_body_node, (ast.FunctionDef, ast.AsyncFunctionDef)): - decorators = get_decorator_names(class_body_node.decorator_list) - is_loader_command = [d for d in decorators if "command" in d] - if not is_loader_command and "cmd" not in class_body_node.name: - continue + source = source.lstrip('\ufeff') + source = ''.join(c for c in source if ord(c) >= 32 or c in '\n\r\t') if source else source - method_docstring = ast.get_docstring(class_body_node) - command_name = class_body_node.name - ru_doc, en_doc = None, None + meta = {"pic": None, "banner": None, "developer": None} + for line in source.splitlines(): + line = line.strip() + if line.startswith("# meta "): + try: + key, val = line[len("# meta "):].split(":", 1) + meta[key.strip()] = val.strip() + except ValueError: + pass - for decorator in class_body_node.decorator_list: - ru_doc_tmp, en_doc_tmp = extract_loader_command_args(decorator) - if ru_doc_tmp: - ru_doc = ru_doc_tmp - if en_doc_tmp: - en_doc = en_doc_tmp + try: + tree = ast.parse(source, filename=module_path) + except SyntaxError as e: + logger.warning(f"Skipping {module_path}: syntax error — {e}") + return { + "name": module_path.split(os.sep)[-1].replace(".py", ""), + "description": "", + "cls_doc": {}, + "meta": meta, + "commands": [], + "new_commands": [], + "inline_handlers": [], + "strings": {}, + "has_on_load": False, + "has_on_unload": False, + "class_cmd_names": {}, + } - descriptions = [] - if method_docstring: - descriptions.append(method_docstring) - if ru_doc: - descriptions.append(ru_doc) - if en_doc: - descriptions.append(en_doc) + module_data = None - class_info["commands"].append( - {command_name: ' '.join(descriptions)} - ) - - command_name = command_name.replace('cmd', '') - - class_info["new_commands"].append( - { - command_name: { - "ru_doc": ru_doc, - "en_doc": en_doc, - "doc": method_docstring, - } - } - ) - - result = class_info - - return result - -def parse_developers(base_dir: str) -> Dict[str, list]: - developers = { - "repo": set(), # используем set внутри функции - "channel": set() - } - - for repo_url in repos: - repo_path = repo_url.replace("https://github.com/", "") - try: - owner, repo_name = repo_path.split("/") - developers["repo"].add(owner) - except ValueError: - print(f"Incorrect URL of repository: {repo_url}") + for node in ast.walk(tree): + if not isinstance(node, ast.ClassDef): continue - for root, _, files in os.walk(base_dir): - for file in files: - if file.endswith(".py"): - file_path = os.path.join(root, file) - try: - module_info = get_module_info(file_path) - if module_info and "meta" in module_info: - developer = module_info["meta"].get('developer') - if developer: # Проверяем, что developer не None - # Разделяем строки с запятыми, &, | и пробелами - for dev in developer.replace(',', ' ').replace('&', ' ').replace('|', ' ').split(): - # Добавляем только элементы, начинающиеся с @ - if dev.startswith('@'): - developers["channel"].add(dev.strip()) - except Exception as e: - print(f"Ошибка при парсинге файла {file_path}: {e}") + is_module_class = ( + "Mod" in node.name or + any(isinstance(d, ast.Attribute) and safe_unparse(d).startswith("loader.tds") for d in node.decorator_list) or + any(isinstance(d, ast.Name) and d.id == "loader" for d in node.decorator_list) + ) - # Преобразуем set в list перед возвратом - return { - "repo": list(developers["repo"]), - "channel": list(developers["channel"]) + if not is_module_class: + continue + + info = { + "name": node.name, + "description": ast.get_docstring(node) or "", + "cls_doc": {}, + "meta": meta, + "commands": [], + "new_commands": [], + "inline_handlers": [], + "strings": {}, + "has_on_load": False, + "has_on_load": False, + "has_on_unload": False, + "class_cmd_names": {}, + } + + for item in node.body: + if isinstance(item, ast.Assign): + for target in item.targets: + if isinstance(target, ast.Name) and (target.id == "strings" or target.id.startswith("strings_")): + try: + lit = ast.literal_eval(item.value) + if isinstance(lit, dict): + if target.id == "strings": + info["strings"].update(lit) + if "_cls_doc" in lit: + info["cls_doc"]["default"] = lit["_cls_doc"] + else: + lang = target.id.split("_", 1)[1] if "_" in target.id else None + if lang: + for k, v in lit.items(): + if isinstance(k, str) and isinstance(v, str): + if k == "_cls_doc": + info["cls_doc"][lang] = v + elif k.startswith("_cmd_doc_"): + rest = k[len("_cmd_doc_"):] + info["strings"][f"_cmd_doc_{lang}_{rest}"] = v + info["strings"][f"_cmd_doc_{rest}_{lang}"] = v + elif k.startswith("_ihandle_doc_"): + rest = k[len("_ihandle_doc_"):] + info["strings"][f"_ihandle_doc_{lang}_{rest}"] = v + info["strings"][f"_ihandle_doc_{rest}_{lang}"] = v + elif k.startswith("_cls_cmd_"): + info["class_cmd_names"][lang] = v + else: + info["strings"][f"{k}_{lang}"] = v + except Exception: + pass + + if "_cls_doc" in info["strings"]: + info["cls_doc"]["default"] = info["strings"]["_cls_doc"] + + for func in node.body: + if not isinstance(func, (ast.FunctionDef, ast.AsyncFunctionDef)): + continue + + name = func.name + if name == "on_load": + info["has_on_load"] = True + continue + if name == "on_unload": + info["has_on_unload"] = True + continue + + is_decorated = any( + isinstance(d, ast.Call) and hasattr(d.func, 'attr') and + d.func.attr in ("command", "inline_handler", "unrestricted", "owner") + for d in func.decorator_list + ) + + if name.startswith("_") and not is_decorated: + continue + + cmd = { + "name": name, + "doc": ast.get_docstring(func) or "", + "lang_docs": {}, + "aliases": [], + "usage": None, + "inline": False, + "is_inline_handler": False, + "decorators": [], + "cmd_names": {}, + } + + for dec in func.decorator_list: + if isinstance(dec, ast.Call) and hasattr(dec.func, 'attr'): + attr = dec.func.attr + if attr == "command": + cmd.update(extract_loader_command_args(dec)) + elif attr == "inline_handler": + cmd["inline"] = True + cmd["is_inline_handler"] = True + elif attr in ("unrestricted", "owner", "support"): + cmd["decorators"].append(attr) + + for stmt in func.body: + if isinstance(stmt, ast.Assign): + for target in stmt.targets: + if isinstance(target, ast.Attribute): + attr = target.attr + val = extract_string_value(stmt.value) + if not val: + continue + if attr == "_cmd": + cmd["name"] = val + elif attr == "_doc": + cmd["doc"] = val + elif attr == "_cls_doc": + info["cls_doc"]["default"] = val + elif attr.startswith("_cls_doc_"): + lang = attr[len("_cls_doc_"):] + info["cls_doc"][lang] = val + elif attr.startswith("_cmd_"): + lang = attr[len("_cmd_"):] + cmd["cmd_names"][lang] = val + + is_command_name = "cmd" in name and not name.startswith("__") + if not (is_decorated or is_command_name): + continue + + clean_name = cmd["name"].replace("cmd", "").replace("_", "") + + descs = [] + legacy_key = f"_cmd_doc_{clean_name}" + legacy_doc = info["strings"].get(legacy_key) + base_doc = legacy_doc if legacy_doc else cmd["doc"] + if base_doc: + descs.append(base_doc) + + for lang, text in cmd["lang_docs"].items(): + if text: + descs.append(f"({lang.upper()}) {text}") + + for k, v in info["strings"].items(): + if k.startswith("_cmd_doc_") and clean_name in k and v: + if k.endswith(f"_{clean_name}"): + lang_part = k[len("_cmd_doc_"):-len(f"_{clean_name}")-1] + if lang_part: + descs.append(f"({lang_part.upper()}) {v}") + elif k.startswith(f"_cmd_doc_{clean_name}_"): + lang_part = k[len(f"_cmd_doc_{clean_name}_"):] + if lang_part: + descs.append(f"({lang_part.upper()}) {v}") + + full_desc = " | ".join(filter(None, descs)) + info["commands"].append({clean_name: full_desc}) + + desc_map = {"default": legacy_doc or cmd["doc"]} + desc_map.update(cmd["lang_docs"]) + + for k, v in info["strings"].items(): + if k.startswith("_cmd_doc_") and clean_name in k and v: + if k.endswith(f"_{clean_name}"): + lang_part = k[len("_cmd_doc_"):-len(f"_{clean_name}")-1] + if lang_part: + desc_map[lang_part] = v + elif k.startswith(f"_cmd_doc_{clean_name}_"): + lang_part = k[len(f"_cmd_doc_{clean_name}_"):] + if lang_part: + desc_map[lang_part] = v + + info["new_commands"].append({ + "name": clean_name, + "original_name": cmd["name"], + "description": desc_map, + "cmd_names": cmd["cmd_names"], + "aliases": cmd["aliases"], + "usage": cmd["usage"], + "inline": cmd["inline"], + "is_inline_handler": cmd["is_inline_handler"], + "decorators": cmd["decorators"], + }) + + if cmd["is_inline_handler"]: + inline_desc_map = {"default": cmd["doc"]} + inline_desc_map.update(cmd["lang_docs"]) + + for k, v in info["strings"].items(): + if k.startswith("_ihandle_doc_") and clean_name in k and v: + if k.endswith(f"_{clean_name}"): + lang_part = k[len("_ihandle_doc_"):-len(f"_{clean_name}")-1] + if lang_part: + inline_desc_map[lang_part] = v + elif k.startswith(f"_ihandle_doc_{clean_name}_"): + lang_part = k[len(f"_ihandle_doc_{clean_name}_"):] + if lang_part: + inline_desc_map[lang_part] = v + + info["inline_handlers"].append({ + "name": clean_name, + "description": inline_desc_map, + "decorators": cmd["decorators"], + }) + + module_data = info + break + + return module_data + +def main(): + base_dir = os.getcwd() + modules = {} + blacklisted_modules = load_blacklist("repositories.json") + + for root, dirs, files in os.walk(base_dir): + dirs[:] = [d for d in dirs if d not in ("venv", ".venv", "env", ".env", ".git")] + + for file in files: + if file.endswith(".py") and not file.startswith("_") and file not in blacklisted_modules.get(os.path.relpath(root, base_dir), []): + path = os.path.join(root, file) + try: + data = get_module_info(path) + if data: + rel = os.path.relpath(path, base_dir).replace("\\", "/") + modules[rel] = data + except Exception as e: + logger.error(f"Error processing {path}: {e}") + + output = { + "modules": modules, + "meta": { + "total_modules": len(modules), + "generated_at": __import__("datetime").datetime.now().isoformat(), + } } + with open("modules.json", "w", encoding="utf-8") as f: + json.dump(output, f, ensure_ascii=False, indent=2) -modules_data = {} -base_dir = os.getcwd() + print(f"modules.json written ({len(modules)} modules)") -for root, _, files in os.walk(base_dir): - for file in files: - if file.endswith(".py"): - file_path = os.path.join(root, file) - try: - module_info = get_module_info(file_path) - if module_info: - relative_path = os.path.relpath(file_path, base_dir) - modules_data[relative_path] = module_info - except Exception as e: - print(f"Ошибка при парсинге файла {file_path}: {e}") - -developers = parse_developers(base_dir) - -with open("modules.json", "w", encoding="utf-8") as json_file: - json.dump(modules_data, json_file, ensure_ascii=False, indent=2) - -print("Файл modules.json создан!") - -with open("developers.json", "w", encoding="utf-8") as json_file: - json.dump(developers, json_file, ensure_ascii=False, indent=2) - -print("Файл developers.json создан!") \ No newline at end of file +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/repositories.json b/repositories.json index aabbb9a..43cfa2f 100644 --- a/repositories.json +++ b/repositories.json @@ -2,191 +2,238 @@ "repositories": [ { "path": "DziruModules/hikkamods", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "kamolgks/Hikkamods", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "thomasmod/hikkamods", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "SkillsAngels/Modules", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "Sad0ff/modules-ftg", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "Yahikoro/Modules-for-FTG", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "KeyZenD/modules", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "AlpacaGang/ftg-modules", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "trololo65/Modules", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "Ijidishurka/modules", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "Fl1yd/FTG-Modules", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "D4n13l3k00/FTG-Modules", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "iamnalinor/FTG-modules", - "tags": ["hikkatrusted", "nonactive"] + "tags": ["hikkatrusted", "nonactive"], + "blacklist": [] }, { "path": "SekaiYoneya/modules", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "GeekTG/FTG-Modules", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "Den4ikSuperOstryyPer4ik/Astro-modules", - "tags": ["hikkatrusted", "herokutrusted"] + "tags": ["hikkatrusted", "herokutrusted"], + "blacklist": [] }, { "path": "vsecoder/hikka_modules", - "tags": ["hikkatrusted", "herokutrusted"] + "tags": ["hikkatrusted", "herokutrusted"], + "blacklist": [] }, { "path": "sqlmerr/hikka_mods", - "tags": ["hikkatrusted", "herokutrusted"] + "tags": ["hikkatrusted", "herokutrusted"], + "blacklist": [] }, { "path": "N3rcy/modules", - "tags": ["hikkatrusted", "herokutrusted"] + "tags": ["hikkatrusted", "herokutrusted"], + "blacklist": [] }, { "path": "KorenbZla/HikkaModules", - "tags": ["hikkatrusted", "herokutrusted"] + "tags": ["hikkatrusted", "herokutrusted"], + "blacklist": [] }, { "path": "MuRuLOSE/HikkaModulesRepo", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "coddrago/modules", - "tags": ["herokutrusted", "hikkatrusted"] + "tags": ["herokutrusted", "hikkatrusted"], + "blacklist": [] }, { "path": "1jpshiro/hikka-modules", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "MoriSummerz/ftg-mods", - "tags": ["hikkatrusted", "nonactive"] + "tags": ["hikkatrusted", "nonactive"], + "blacklist": [] }, { "path": "anon97945/hikka-mods", - "tags": ["hikkatrusted", "nonactive"] + "tags": ["hikkatrusted", "nonactive"], + "blacklist": [] }, { "path": "dorotorothequickend/DorotoroModules", - "tags": ["hikkatrusted", "nonlongermaintained"] + "tags": ["hikkatrusted", "nonlongermaintained"], + "blacklist": [] }, { "path": "AmoreForever/amoremods", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "idiotcoders/idiotmodules", - "tags": ["hikkatrusted", "herokutrusted", "nonactive"] + "tags": ["hikkatrusted", "herokutrusted", "nonactive"], + "blacklist": [] }, { "path": "CakesTwix/Hikka-Modules", - "tags": ["hikkatrusted", "nonactive"] + "tags": ["hikkatrusted", "nonactive"], + "blacklist": [] }, { "path": "archquise/H.Modules", - "tags": ["hikkatrusted", "nonactive"] + "tags": ["hikkatrusted", "nonactive"], + "blacklist": [] }, { "path": "GD-alt/mm-hikka-mods", - "tags": ["hikkatrusted", "herokutrusted", "nonactive"] + "tags": ["hikkatrusted", "herokutrusted", "nonactive"], + "blacklist": [] }, { "path": "HitaloSama/FTG-modules-repo", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "SekaiYoneya/Friendly-telegram", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "blazedzn/ftg-modules", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "hikariatama/ftg", - "tags": ["hikkatrusted", "nonactive"] + "tags": ["hikkatrusted", "nonactive"], + "blacklist": [] }, { "path": "m4xx1m/FTG", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "skillzmeow/skillzmods_hikka", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "fajox1/famods", - "tags": ["hikkatrusted", "herokutrusted"] + "tags": ["hikkatrusted", "herokutrusted"], + "blacklist": [] }, { "path": "TheKsenon/MyHikkaModules", - "tags": ["hikkatrusted", "herokutrusted"] + "tags": ["hikkatrusted", "herokutrusted"], + "blacklist": [] }, { "path": "cryptexctl/modules-mirror", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "Ruslan-Isaev/modules", - "tags": ["herokutrusted"] + "tags": ["herokutrusted"], + "blacklist": [] }, { "path": "shadowhikka/sh.modules", - "tags": [] + "tags": [], + "blacklist": [] }, { "path": "fiksofficial/python-modules", - "tags": ["herokutrusted"] + "tags": ["herokutrusted"], + "blacklist": [] }, { "path": "mead0wsss/mead0wsMods", - "tags": ["herokutrusted"] + "tags": ["herokutrusted"], + "blacklist": [] }, { "path": "SenkoGuardian/SenModules", - "tags": ["herokutrusted"] + "tags": ["herokutrusted"], + "blacklist": [] }, { "path": "ZetGoHack/nullmod", - "tags": ["herokutrusted"] + "tags": ["herokutrusted"], + "blacklist": [] }, { "path": "yummy1gay/limoka", - "tags": [] + "tags": [], + "blacklist": [] } ] } \ No newline at end of file diff --git a/update_diffs.py b/update_diffs.py new file mode 100644 index 0000000..d039d53 --- /dev/null +++ b/update_diffs.py @@ -0,0 +1,193 @@ +import asyncio +import aiohttp +import argparse +import subprocess +import os +import tempfile +from pathlib import Path + + +parser = argparse.ArgumentParser(description="Update Diffs Script") +parser.add_argument( + "--token", + type=str, + required=True, + help="Token of Telegram bot", +) +parser.add_argument( + "--api_url", + type=str, + default="https://api.telegram.org", + help="API URL of Telegram API", +) +parser.add_argument( + "--chat_id", + type=str, + required=True, + help="Chat ID to send updates to", +) +parser.add_argument( + "--base_commit", + type=str, + default="HEAD~1", + help="Base commit to compare against", +) + +arguments = parser.parse_args() + +async def send_message(session, text): + """Send a text message to the channel""" + url = f"{arguments.api_url}/bot{arguments.token}/sendMessage" + data = { + 'chat_id': arguments.chat_id, + 'text': text, + 'parse_mode': 'Markdown', + } + async with session.post(url, data=data) as response: + return await response.json() + +async def send_document(session, file_path, caption=None): + """Send a document to the channel""" + url = f"{arguments.api_url}/bot{arguments.token}/sendDocument" + with open(file_path, 'rb') as f: + data = aiohttp.FormData() + data.add_field('chat_id', arguments.chat_id) + data.add_field('document', f, filename=os.path.basename(file_path)) + if caption: + data.add_field('caption', caption) + data.add_field('parse_mode', 'Markdown') + async with session.post(url, data=data) as response: + return await response.json() + +def get_changed_files(base_commit): + """Get list of changed files between commits""" + try: + result = subprocess.check_output( + ['git', 'diff', '--name-only', base_commit, 'HEAD'], + cwd=os.getcwd() + ).decode().strip().split('\n') + return [f for f in result if f] + except subprocess.CalledProcessError: + return [] + +def get_file_diff(file_path, base_commit): + """Get diff for a specific file""" + try: + diff = subprocess.check_output( + ['git', 'diff', base_commit, 'HEAD', '--', file_path], + cwd=os.getcwd() + ).decode() + return diff + except subprocess.CalledProcessError: + return "" + +def is_module_file(file_path): + """Check if file is a Python module in a modules directory""" + # Check if it's a .py file and in a modules-like directory + return file_path.endswith('.py') and any( + part in file_path.lower() for part in [ + 'modules', 'mods', 'ftg', 'hikka' + ] + ) + +def extract_module_name(file_path): + """Extract module name from file path""" + return Path(file_path).stem + +async def main(): + changed_files = get_changed_files(arguments.base_commit) + + if not changed_files: + print("No changes detected") + return + + # Filter for module files only + module_files = [f for f in changed_files if is_module_file(f)] + + if not module_files: + print("No module changes detected") + return + + async with aiohttp.ClientSession() as session: + for file_path in module_files: + try: + module_name = extract_module_name(file_path) + + # Create message with raw GitHub URL + github_url = f"https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/{file_path}" + try: + new_hash = subprocess.check_output( + ['git', 'rev-list', '-n', '1', 'HEAD', '--', file_path], + cwd=os.getcwd() + ).decode().strip() + except Exception: + new_hash = 'HEAD' + + try: + old_hash = subprocess.check_output( + ['git', 'rev-list', '-n', '1', arguments.base_commit, '--', file_path], + cwd=os.getcwd() + ).decode().strip() + except Exception: + old_hash = arguments.base_commit + + diff_url = f"https://github.com/MuRuLOSE/limoka/compare/{old_hash}...{new_hash}.diff" + message = ( + f"🪼 Module {module_name} changes approved\n\n" + f"[File URL]({github_url}) | [Diff URL]({diff_url})\n\n" + ) + + # Get diff + diff = get_file_diff(file_path, arguments.base_commit) + + if not diff: + print(f"Skipping {file_path} - no diff content") + continue + + # Create temporary file with diff using only module name + diff_filename = f"{module_name}.diff" + with tempfile.NamedTemporaryFile( + mode='w', + suffix='', + prefix='', + delete=False, + encoding='utf-8', + dir=tempfile.gettempdir() + ) as tmp_file: + tmp_file.write(diff) + tmp_file_path = tmp_file.name + + try: + # Rename temp file to have proper name + final_path = os.path.join(tempfile.gettempdir(), diff_filename) + os.rename(tmp_file_path, final_path) + + # Send diff as document with full message as caption + doc_result = await send_document( + session, + final_path, + caption=message + ) + print(f"Sent diff for {module_name}: {doc_result}") + + except Exception as e: + print(f"Error sending {module_name}: {e}") + finally: + # Cleanup temp files + if os.path.exists(tmp_file_path): + try: + os.remove(tmp_file_path) + except: + pass + final_path = os.path.join(tempfile.gettempdir(), diff_filename) + if os.path.exists(final_path): + try: + os.remove(final_path) + except: + pass + + except Exception as e: + print(f"Error processing {file_path}: {e}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file