diff --git a/Limoka.py b/Limoka.py
index 08ac337..7b0fb2b 100644
--- a/Limoka.py
+++ b/Limoka.py
@@ -2,7 +2,7 @@
# requires: whoosh cryptography
-import datetime
+from collections import Counter, defaultdict
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID
from whoosh.qparser import QueryParser, OrGroup
@@ -15,27 +15,41 @@ import html
import json
import re
import asyncio
-from typing import Union, List, Dict, Any, Optional
+from typing import Iterable, Union, List, Dict, Any, Optional
import hashlib
from telethon.types import Message
from telethon.errors.rpcerrorlist import WebpageMediaEmptyError
from telethon import TelegramClient
from telethon.errors.rpcerrorlist import YouBlockedUserError
-from telethon import functions, types
+from telethon import functions
+
try:
from aiogram.utils.exceptions import BadRequest
except ImportError:
from aiogram.exceptions import TelegramBadRequest as BadRequest
from .. import utils, loader
from ..types import InlineCall
+
logger = logging.getLogger("Limoka")
-__version__ = (1, 4, 0)
+__version__ = (1, 4, 1)
+
+WEIGHTS = {
+ "inline.token_obtainment": 15,
+ "main": 10,
+ "inline": 7,
+ "translations": 5,
+ "security": 3,
+}
+
+DEFAULT_WEIGHT = 1
+
def _get_lang_value(data: Dict[str, Any], lang: str) -> str:
if not isinstance(data, dict):
return str(data) if data else ""
return data.get(lang, data.get("default", data.get("en", "")))
+
class Search:
def __init__(self, query, ix):
self.schema = Schema(
@@ -56,6 +70,7 @@ class Search:
return list(set(result["path"] for result in results))
return []
+
class LimokaAPI:
async def fetch_json(self, base_url, path):
url = f"{base_url}{path}"
@@ -63,9 +78,11 @@ class LimokaAPI:
async with session.get(url) as response:
return json.loads(await response.text())
+
@loader.tds
class Limoka(loader.Module):
"""Modules are now in one place with easy searching!"""
+
strings = {
"name": "Limoka",
"wait": (
@@ -81,9 +98,7 @@ class Limoka(loader.Module):
"🧑💻 Developer: {username}\n\n"
"🏷 Tags: {tags}\n\n"
),
- "found_body": (
- "{commands}"
- ),
+ "found_body": ("{commands}"),
"found_footer": (
"\n🪄 {prefix}dlm {url}{module_path}"
),
@@ -168,8 +183,8 @@ class Limoka(loader.Module):
"hikkatrusted": "Hikka Trusted",
"nonactive": "Non-Active Repository",
"nonlongermaintained": "No Longer Maintained Repository",
- "newbie": "Newbie"
- }
+ "newbie": "Newbie",
+ },
}
strings_ru = {
"name": "Limoka",
@@ -185,9 +200,7 @@ class Limoka(loader.Module):
"🧑💻 Разработчик: {username}\n\n"
"🏷 Теги: {tags}\n\n"
),
- "found_body": (
- "{commands}"
- ),
+ "found_body": ("{commands}"),
"found_footer": (
"\n🪄 {prefix}dlm {url}{module_path}"
),
@@ -220,7 +233,7 @@ class Limoka(loader.Module):
(
"🔎 Limoka имеет лучший поиск*!\n"
"* В сравнении с предыдущей версией Limoka"
- )
+ ),
],
"inline404": "Не найдено",
"inline?": "Запрос слишком короткий / не найден",
@@ -276,7 +289,7 @@ class Limoka(loader.Module):
"hikkatrusted": "Hikka Trusted",
"nonactive": "Неактивный репозиторий",
"nonlongermaintained": "Неподдерживаемый репозиторий",
- "newbie": "Новичок"
+ "newbie": "Новичок",
},
"_cls_doc": "Модули теперь в одном месте с простым и удобным поиском!",
}
@@ -307,22 +320,22 @@ class Limoka(loader.Module):
self._invalid_banners = set()
self._bot_username = "limoka_bbot"
self._base_url = self.config["limokaurl"]
-
+
# Search session states
self.SEARCH_STATES = {
- "no_banner": "no_banner", # 404 - Нет баннера
- "global_search": "global_search", # Глобальный поиск
- "not_found": "not_found", # Не найдено (модуль)
- "filter_select": "filter_select", # Выбор категорий (фильтров)
+ "no_banner": "no_banner", # 404 - Нет баннера
+ "global_search": "global_search", # Глобальный поиск
+ "not_found": "not_found", # Не найдено (модуль)
+ "filter_select": "filter_select", # Выбор категорий (фильтров)
}
-
+
# State banners - placeholders for now
self.state_banners = {
"no_banner": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/refs/heads/main/Limoka%20-%20No%20banner.png",
"global_search": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Global%20Search.png",
"not_found": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Not%20Found.png",
"filter_select": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Categories.png",
- }
+ }
def _filter_newbies(self, modules: Dict[str, Any]) -> Dict[str, Any]:
"""Filter out modules which belong to repositories tagged as 'newbie'.
@@ -357,7 +370,7 @@ class Limoka(loader.Module):
current_index: int = 0,
) -> Dict[str, Any]:
"""Create a search session dictionary to track state across callbacks.
-
+
Args:
state: Current search state (one of SEARCH_STATES values)
query: Current search query
@@ -365,7 +378,7 @@ class Limoka(loader.Module):
results: Search results list
current_index: Index of current result being displayed
banner_url: Banner image URL for current state
-
+
Returns:
Dictionary containing the complete session state
"""
@@ -393,16 +406,13 @@ class Limoka(loader.Module):
else:
self.ix = open_dir("limoka_search")
self._history = self.pointer("history", [])
- self.modules = (await self.api.fetch_json(
- self._base_url, "modules.json"
- )).get("modules", {})
- raw = (await self.api.fetch_json(
- self._base_url, "repositories.json"
- )).get("repositories", [])
- self.repositories = {
- repo["url"]: repo
- for repo in raw
- }
+ self.modules = (await self.api.fetch_json(self._base_url, "modules.json")).get(
+ "modules", {}
+ )
+ raw = (await self.api.fetch_json(self._base_url, "repositories.json")).get(
+ "repositories", []
+ )
+ self.repositories = {repo["url"]: repo for repo in raw}
# Apply newbie filter if enabled
try:
self.modules = self._filter_newbies(self.modules)
@@ -414,24 +424,28 @@ class Limoka(loader.Module):
try:
message = await self.client.get_messages(self._bot_username, limit=1)
if not message:
- message = await self.client.send_message(self._bot_username, "/start")
+ message = await self.client.send_message(
+ self._bot_username, "/start"
+ )
await message.delete()
- await self.client(functions.messages.DeleteHistoryRequest(
- peer=self._bot_username,
- max_id=0,
- just_clear=True,
- revoke=True,
- ))
+ await self.client(
+ functions.messages.DeleteHistoryRequest(
+ peer=self._bot_username,
+ max_id=0,
+ just_clear=True,
+ revoke=True,
+ )
+ )
except YouBlockedUserError:
- logger.warning(f"Please unblock {self._bot_username} to enable external installation feature. Or disable external_install_allowed in Limoka settings to get rid of this message.")
+ logger.warning(
+ f"Please unblock {self._bot_username} to enable external installation feature. Or disable external_install_allowed in Limoka settings to get rid of this message."
+ )
self._userbot_bot_username = (await self.inline.bot.get_me()).username
@loader.loop(interval=3600)
async def _update_modules_loop(self):
- self.modules = await self.api.fetch_json(
- self._base_url, "modules.json"
- )
+ self.modules = await self.api.fetch_json(self._base_url, "modules.json")
# Re-apply newbie filter after modules refresh
try:
self.modules = self._filter_newbies(self.modules)
@@ -446,7 +460,18 @@ class Limoka(loader.Module):
writer.add_document(
title=module_data["name"],
path=module_path,
- content=module_data["name"] + " " + (module_data.get("description") or "" + " " + ((module_data.get("meta").get("developer") or "") if module_data.get("meta") else "")),
+ content=module_data["name"]
+ + " "
+ + (
+ module_data.get("description")
+ or ""
+ + " "
+ + (
+ (module_data.get("meta").get("developer") or "")
+ if module_data.get("meta")
+ else ""
+ )
+ ),
)
for func in module_data.get("commands", []):
for command, description in func.items():
@@ -462,7 +487,9 @@ class Limoka(loader.Module):
return None
try:
async with aiohttp.ClientSession() as session:
- async with session.head(url, timeout=5, allow_redirects=True) as response:
+ async with session.head(
+ url, timeout=5, allow_redirects=True
+ ) as response:
if response.status != 200:
self._invalid_banners.add(url)
return None
@@ -475,9 +502,43 @@ class Limoka(loader.Module):
if url:
self._invalid_banners.add(url)
return None
-
+
+ def find_userbot(self, keys: Iterable[str]) -> str | None:
+ scores = defaultdict(int)
+
+ for key in keys:
+ parts = key.split(".")
+
+ # проходим все префиксы
+ for i in range(1, len(parts)):
+ prefix = ".".join(parts[:i])
+ suffix = ".".join(parts[i:])
+
+ weight = WEIGHTS.get(suffix, DEFAULT_WEIGHT)
+
+ scores[prefix] += weight
+
+ if not scores:
+ return None
+
+ return max(scores, key=scores.get)
+
+
+ @property
def user_lang(self) -> str:
- self.db.get("heroku.translations", "lang")
+
+ userbot = self.find_userbot(self.db.keys())
+
+ if not userbot:
+ logger.warning(
+ "Cannot determine userbot type. "
+ "Probably not FTG-like Userbot? "
+ "Defaulting language to English. "
+ "If this is unexpected, please report to the module developer."
+ )
+ return "en"
+
+ return self.db.get(f"{userbot}.translations", "lang")
def generate_commands(self, module_info, lang: str = "en"):
commands = []
@@ -507,7 +568,6 @@ class Limoka(loader.Module):
)
return commands
-
def _format_module_content(
self,
module_info: Dict[str, Any],
@@ -520,12 +580,14 @@ class Limoka(loader.Module):
name = html.escape(module_info.get("name") or self.strings["no_info"])
cls_doc = module_info.get("cls_doc", {})
description = html.escape(
- _get_lang_value(cls_doc, lang) or
- _get_lang_value(module_info.get("description", ""), lang) or
- self.strings["no_info"]
+ _get_lang_value(cls_doc, lang)
+ or _get_lang_value(module_info.get("description", ""), lang)
+ or self.strings["no_info"]
+ )
+ dev_username = html.escape(module_info["meta"].get("developer") or "Unknown")
+ raw_path = (
+ module_path if module_path is not None else module_info.get("path", "")
)
- dev_username = html.escape(module_info["meta"].get("developer", "Unknown"))
- raw_path = module_path if module_path is not None else module_info.get("path", "")
clean_module_path = (raw_path or "").replace("\\", "/")
commands = self.generate_commands(module_info, lang)
categories_text = ""
@@ -537,7 +599,9 @@ class Limoka(loader.Module):
)
if len(description) > 300:
description = description[:297] + "…"
- repo_key = "/".join(module_path.split("/")[:2]) if "/" in module_path else module_path
+ repo_key = (
+ "/".join(module_path.split("/")[:2]) if "/" in module_path else module_path
+ )
tags_list = []
for x in self.repositories:
if x.replace("https://github.com/", "") == repo_key:
@@ -577,14 +641,12 @@ class Limoka(loader.Module):
)
return header, body_pages, footer, categories_text
- def _build_navigation_markup(
- self, session: Dict[str, Any]
- ) -> list:
+ def _build_navigation_markup(self, session: Dict[str, Any]) -> list:
result = session["results"]
index = session["current_index"]
query = session["query"]
filters = session["filters"]
-
+
page = index + 1
markup = [
[
@@ -596,7 +658,11 @@ class Limoka(loader.Module):
{"text": f"{page}/{len(result)}", "callback": self._inline_void},
{
"text": "⏩" if index + 1 < len(result) else "🚫",
- "callback": self._next_page if index + 1 < len(result) else self._inline_void,
+ "callback": (
+ self._next_page
+ if index + 1 < len(result)
+ else self._inline_void
+ ),
"args": (session,) if index + 1 < len(result) else (),
},
],
@@ -625,60 +691,93 @@ class Limoka(loader.Module):
return markup
def _build_module_markup(
- self, session: Dict[str, Any], body_pages: List[str], page_body: int, module_path: str
+ self,
+ session: Dict[str, Any],
+ body_pages: List[str],
+ page_body: int,
+ module_path: str,
) -> list:
result = session["results"]
index = session["current_index"]
query = session["query"]
filters = session["filters"]
-
+
markup = []
if len(body_pages) > 1:
- markup.append([
- {
- "text": "◀️" if page_body > 0 else "🚫",
- "callback": self._previous_body_page if page_body > 0 else self._inline_void,
- "args": (session, module_path, page_body) if page_body > 0 else (),
- },
- {"text": f"Body {page_body + 1}/{len(body_pages)}", "callback": self._inline_void},
- {
- "text": "▶️" if page_body + 1 < len(body_pages) else "🚫",
- "callback": self._next_body_page if page_body + 1 < len(body_pages) else self._inline_void,
- "args": (session, module_path, page_body) if page_body + 1 < len(body_pages) else (),
- },
- ])
+ markup.append(
+ [
+ {
+ "text": "◀️" if page_body > 0 else "🚫",
+ "callback": (
+ self._previous_body_page
+ if page_body > 0
+ else self._inline_void
+ ),
+ "args": (
+ (session, module_path, page_body) if page_body > 0 else ()
+ ),
+ },
+ {
+ "text": f"Body {page_body + 1}/{len(body_pages)}",
+ "callback": self._inline_void,
+ },
+ {
+ "text": "▶️" if page_body + 1 < len(body_pages) else "🚫",
+ "callback": (
+ self._next_body_page
+ if page_body + 1 < len(body_pages)
+ else self._inline_void
+ ),
+ "args": (
+ (session, module_path, page_body)
+ if page_body + 1 < len(body_pages)
+ else ()
+ ),
+ },
+ ]
+ )
page = index + 1
- markup.append([
- {
- "text": "⏪" if index > 0 else "🚫",
- "callback": self._previous_page if index > 0 else self._inline_void,
- "args": (session,) if index > 0 else (),
- },
- {"text": f"{page}/{len(result)}", "callback": self._inline_void},
- {
- "text": "⏩" if index + 1 < len(result) else "🚫",
- "callback": self._next_page if index + 1 < len(result) else self._inline_void,
- "args": (session,) if index + 1 < len(result) else (),
- },
- ])
- markup.append([
- {
- "text": "🔍 " + self.strings["filter_menu"].split(":")[0],
- "callback": self._display_filter_menu,
- "args": (session,),
- },
- {
- "text": "🔄 " + self.strings["change_query"],
- "callback": self._enter_query,
- },
- ])
- markup.append([
- {
- "text": self.strings["global_button"],
- "callback": self._show_global_results,
- "args": (session,),
- },
- ])
+ markup.append(
+ [
+ {
+ "text": "⏪" if index > 0 else "🚫",
+ "callback": self._previous_page if index > 0 else self._inline_void,
+ "args": (session,) if index > 0 else (),
+ },
+ {"text": f"{page}/{len(result)}", "callback": self._inline_void},
+ {
+ "text": "⏩" if index + 1 < len(result) else "🚫",
+ "callback": (
+ self._next_page
+ if index + 1 < len(result)
+ else self._inline_void
+ ),
+ "args": (session,) if index + 1 < len(result) else (),
+ },
+ ]
+ )
+ markup.append(
+ [
+ {
+ "text": "🔍 " + self.strings["filter_menu"].split(":")[0],
+ "callback": self._display_filter_menu,
+ "args": (session,),
+ },
+ {
+ "text": "🔄 " + self.strings["change_query"],
+ "callback": self._enter_query,
+ },
+ ]
+ )
+ markup.append(
+ [
+ {
+ "text": self.strings["global_button"],
+ "callback": self._show_global_results,
+ "args": (session,),
+ },
+ ]
+ )
markup.append(
[{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
)
@@ -709,7 +808,9 @@ class Limoka(loader.Module):
)
else:
if photo is not None:
- await message_or_call.edit(text=text, reply_markup=markup, photo=photo)
+ await message_or_call.edit(
+ text=text, reply_markup=markup, photo=photo
+ )
else:
await message_or_call.edit(text=text, reply_markup=markup)
except (BadRequest, WebpageMediaEmptyError) as e:
@@ -730,8 +831,11 @@ class Limoka(loader.Module):
try:
query = session["query"]
filters = session["filters"]
-
- lang = self.user_lang()
+
+ lang = self.user_lang
+ logger.info(
+ f"Displaying module: {module_path} for query: {query} with filters: {filters} in language: {lang}"
+ )
module_banner_raw = module_info.get("meta", {}).get("banner")
photo = await self._validate_url(module_banner_raw)
@@ -750,11 +854,11 @@ class Limoka(loader.Module):
current_body = body_pages[min(page_body, len(body_pages) - 1)]
full_message = header + current_body + footer + categories_text
- markup = self._build_module_markup(session, body_pages, page_body, module_path)
-
- await self._safe_display(
- message_or_call, full_message, markup, photo
+ markup = self._build_module_markup(
+ session, body_pages, page_body, module_path
)
+
+ await self._safe_display(message_or_call, full_message, markup, photo)
except Exception as e:
logger.exception(f"Error in _display_module: {e}")
if isinstance(message_or_call, Message):
@@ -763,30 +867,45 @@ class Limoka(loader.Module):
await message_or_call.edit(self.strings["error_occurred"])
async def _previous_body_page(
- self, call: InlineCall, session: Dict[str, Any], module_path: str, page_body: int
+ self,
+ call: InlineCall,
+ session: Dict[str, Any],
+ module_path: str,
+ page_body: int,
):
module_info = self.modules[module_path]
new_page_body = max(page_body - 1, 0)
- await self._display_module(call, module_info, module_path, session, page_body=new_page_body)
+ await self._display_module(
+ call, module_info, module_path, session, page_body=new_page_body
+ )
async def _next_body_page(
- self, call: InlineCall, session: Dict[str, Any], module_path: str, page_body: int
+ self,
+ call: InlineCall,
+ session: Dict[str, Any],
+ module_path: str,
+ page_body: int,
):
module_info = self.modules[module_path]
query = session["query"]
filters = session["filters"]
header, body_pages, footer, categories_text = self._format_module_content(
- module_info, query, filters, include_categories=True, module_path=module_path, lang=self.user_lang()
+ module_info,
+ query,
+ filters,
+ include_categories=True,
+ module_path=module_path,
+ lang=self.user_lang,
)
new_page_body = min(page_body + 1, len(body_pages) - 1)
- await self._display_module(call, module_info, module_path, session, page_body=new_page_body)
+ await self._display_module(
+ call, module_info, module_path, session, page_body=new_page_body
+ )
- async def _display_filter_menu(
- self, call: InlineCall, session: Dict[str, Any]
- ):
+ async def _display_filter_menu(self, call: InlineCall, session: Dict[str, Any]):
query = session["query"]
current_filters = session["filters"]
-
+
categories = current_filters.get("category", [])
filters_text = self.strings["selected_categories"].format(
categories=(
@@ -823,14 +942,14 @@ class Limoka(loader.Module):
[{"text": self.strings.get("close", "❌ Close"), "action": "close"}],
]
text = self.strings["filter_menu"].format(query=query) + f"\n{filters_text}"
- await call.edit(text, reply_markup=markup, photo=self._get_banner_for_state("filter_select"))
+ await call.edit(
+ text, reply_markup=markup, photo=self._get_banner_for_state("filter_select")
+ )
- async def _select_category(
- self, call: InlineCall, session: Dict[str, Any]
- ):
+ async def _select_category(self, call: InlineCall, session: Dict[str, Any]):
query = session["query"]
current_filters = session["filters"]
-
+
all_categories = set()
for module_data in self.modules.values():
all_categories.update(module_data.get("category", ["No category"]))
@@ -860,7 +979,7 @@ class Limoka(loader.Module):
)
if cat in selected_categories:
button_text = "✅ " + button_text
-
+
# Create new session with updated filters
new_session = session.copy()
row.append(
@@ -893,7 +1012,7 @@ class Limoka(loader.Module):
):
query = session["query"]
current_filters = session["filters"]
-
+
new_filters = current_filters.copy()
selected_categories = new_filters.get("category", [])
if category in selected_categories:
@@ -921,7 +1040,7 @@ class Limoka(loader.Module):
):
query = session["query"]
filters = session["filters"]
-
+
searcher = Search(query.lower(), self.ix)
try:
result = searcher.search_module()
@@ -983,7 +1102,7 @@ class Limoka(loader.Module):
return
module_path = filtered_result[0]
module_info = self.modules[module_path]
-
+
# Create session for displaying module
display_session = self._create_search_session(
state=self.SEARCH_STATES["global_search"],
@@ -992,9 +1111,7 @@ class Limoka(loader.Module):
results=filtered_result,
current_index=0,
)
- await self._display_module(
- call, module_info, module_path, display_session, 0
- )
+ await self._display_module(call, module_info, module_path, display_session, 0)
async def _enter_query_handler(
self, call_or_query, query: Optional[str] = None, *args, **kwargs
@@ -1074,7 +1191,7 @@ class Limoka(loader.Module):
return
module_path = result[0]
module_info = self.modules[module_path]
-
+
# Create session for displaying module
display_session = self._create_search_session(
state=self.SEARCH_STATES["global_search"],
@@ -1098,11 +1215,13 @@ class Limoka(loader.Module):
{
"text": self.strings["back_to_results"],
"callback": self._show_results,
- "args": (self._create_search_session(
- state=self.SEARCH_STATES["global_search"],
- query=query or "",
- filters={},
- ),),
+ "args": (
+ self._create_search_session(
+ state=self.SEARCH_STATES["global_search"],
+ query=query or "",
+ filters={},
+ ),
+ ),
}
],
[
@@ -1116,7 +1235,7 @@ class Limoka(loader.Module):
async def _show_global_results(self, call: InlineCall, session: Dict[str, Any]):
query = session["query"]
-
+
searcher = Search(query.lower(), self.ix)
try:
result = searcher.search_module()
@@ -1145,7 +1264,7 @@ class Limoka(loader.Module):
if not info:
continue
name = info.get("name", "Unknown")
-
+
global_session = self._create_search_session(
state=self.SEARCH_STATES["global_search"],
query=query,
@@ -1171,47 +1290,37 @@ class Limoka(loader.Module):
self, call: InlineCall, module_path: str, session: Dict[str, Any]
):
module_info = self.modules[module_path]
- await self._display_module(
- call, module_info, module_path, session, 0
- )
+ await self._display_module(call, module_info, module_path, session, 0)
- async def _next_page(
- self, call: InlineCall, session: Dict[str, Any]
- ):
+ async def _next_page(self, call: InlineCall, session: Dict[str, Any]):
result = session["results"]
index = session["current_index"]
-
+
if index + 1 >= len(result):
await call.answer(self.strings["last_page"])
return
index += 1
module_path = result[index]
module_info = self.modules[module_path]
-
+
new_session = session.copy()
new_session["current_index"] = index
- await self._display_module(
- call, module_info, module_path, new_session, 0
- )
+ await self._display_module(call, module_info, module_path, new_session, 0)
- async def _previous_page(
- self, call: InlineCall, session: Dict[str, Any]
- ):
+ async def _previous_page(self, call: InlineCall, session: Dict[str, Any]):
result = session["results"]
index = session["current_index"]
-
+
if index - 1 < 0:
await call.answer(self.strings["first_page"])
return
index -= 1
module_path = result[index]
module_info = self.modules[module_path]
-
+
new_session = session.copy()
new_session["current_index"] = index
- await self._display_module(
- call, module_info, module_path, new_session, 0
- )
+ await self._display_module(call, module_info, module_path, new_session, 0)
async def _inline_void(self, call: InlineCall):
await call.answer()
@@ -1244,7 +1353,7 @@ class Limoka(loader.Module):
text=self.strings["start_search_form"],
message=message,
reply_markup=markup,
- photo=self._get_banner_for_state("global_search")
+ photo=self._get_banner_for_state("global_search"),
)
return
history = self.get("history", [])
@@ -1269,7 +1378,7 @@ class Limoka(loader.Module):
return await utils.answer(message, self.strings["404"].format(query=args))
module_path = result[0]
module_info = self.modules[module_path]
-
+
# Create session for displaying module
display_session = self._create_search_session(
state=self.SEARCH_STATES["global_search"],
@@ -1278,7 +1387,9 @@ class Limoka(loader.Module):
results=result,
current_index=0,
)
- await self._display_module(message, module_info, module_path, display_session, 0)
+ await self._display_module(
+ message, module_info, module_path, display_session, 0
+ )
async def _show_global_form(self, call: InlineCall, message: Message):
markup = [
@@ -1309,12 +1420,12 @@ class Limoka(loader.Module):
self, call: InlineCall, query: str, message: Message, *args, **kwargs
):
global_session = self._create_search_session(
- state=self.SEARCH_STATES["global_search"],
- query=query,
- filters={},
- results=[],
- current_index=0,
- ) # idk what is that crap but it works lol
+ state=self.SEARCH_STATES["global_search"],
+ query=query,
+ filters={},
+ results=[],
+ current_index=0,
+ ) # idk what is that crap but it works lol
if len(query) <= 1:
await call.edit(
self.strings["?"],
@@ -1442,16 +1553,20 @@ class Limoka(loader.Module):
elif hasattr(message.from_id, "channel_id"):
sender_id = message.from_id.channel_id
if sender_id != self._service_bot_id:
- logger.debug("Message not from official bot, ignoring")
+ # logger.debug("Message not from official bot, ignoring")
return
if not self.config["external_install_allowed"]:
return
try:
- clean_text = getattr(message, "raw_text", None) or getattr(
- message, "message", None
- ) or message.text or ""
+ clean_text = (
+ getattr(message, "raw_text", None)
+ or getattr(message, "message", None)
+ or message.text
+ or ""
+ )
if message.entities:
from html import unescape
+
clean_text = unescape(clean_text)
clean_text = re.sub(r"<[^>]+>", "", clean_text)
match = re.search(r"#limoka:([^\s\"'<>]+)", clean_text)
@@ -1480,25 +1595,37 @@ class Limoka(loader.Module):
if not found:
logger.warning(f"Module not found after cleanup: {module_path}")
await utils.answer(
- message, self.strings["watcher_module_not_found"].format(path=html.escape(module_path))
+ message,
+ self.strings["watcher_module_not_found"].format(
+ path=html.escape(module_path)
+ ),
)
return
try:
import base64
from cryptography.hazmat.primitives.asymmetric import ed25519
- PUB_KEY_B64 = "MCowBQYDK2VwAyEA1ltSnqtf3pGBuctuAYqHivCXsaRtKOVxavai7yin7ZE="
+
+ PUB_KEY_B64 = (
+ "MCowBQYDK2VwAyEA1ltSnqtf3pGBuctuAYqHivCXsaRtKOVxavai7yin7ZE="
+ )
der_bytes = base64.b64decode(PUB_KEY_B64)
raw_pubkey = der_bytes[-32:]
module_url = self.config["limokaurl"] + module_path
async with aiohttp.ClientSession() as session:
async with session.get(module_url, timeout=10) as resp:
if resp.status != 200:
- logger.error(f"Failed to fetch module for verification: {module_url} (HTTP {resp.status})")
- await utils.answer(message, self.strings["watcher_loader_missing"])
+ logger.error(
+ f"Failed to fetch module for verification: {module_url} (HTTP {resp.status})"
+ )
+ await utils.answer(
+ message, self.strings["watcher_loader_missing"]
+ )
return
module_bytes = await resp.read()
sha256 = hashlib.sha256(module_bytes).hexdigest()
- public_key = ed25519.Ed25519PublicKey.from_public_bytes(raw_pubkey)
+ public_key = ed25519.Ed25519PublicKey.from_public_bytes(
+ raw_pubkey
+ )
signature = bytes.fromhex(signature_hex)
signed_payload = f"{module_path}|{sha256}".encode()
public_key.verify(signature, signed_payload)
@@ -1522,21 +1649,27 @@ class Limoka(loader.Module):
if status:
try:
bot_peer = await self.client.get_entity(self._service_bot_id)
- await self.client.send_message(bot_peer, f"#limoka:sucsess:{message.id}")
+ await self.client.send_message(
+ bot_peer, f"#limoka:sucsess:{message.id}"
+ )
except Exception as e:
logger.error(f"Failed to send success confirmation: {e}")
else:
logger.error(f"Installation failed with status: {status}")
try:
bot_peer = await self.client.get_entity(self._service_bot_id)
- await self.client.send_message(bot_peer, f"#limoka:failed:{message.id}")
+ await self.client.send_message(
+ bot_peer, f"#limoka:failed:{message.id}"
+ )
except Exception as e:
logger.error(f"Failed to send failure notification: {e}")
except Exception as e:
logger.exception(f"CRITICAL ERROR in secure_install_watcher: {e}")
try:
- await utils.answer(message, self.strings["watcher_critical"].format(error=str(e)[:100]))
+ await utils.answer(
+ message, self.strings["watcher_critical"].format(error=str(e)[:100])
+ )
await asyncio.sleep(5)
await message.delete()
except Exception:
- pass
\ No newline at end of file
+ pass