lang and dev_username hotfix

This commit is contained in:
2026-02-06 12:02:48 +03:00
parent f68d97d09b
commit e076d4ab28

499
Limoka.py
View File

@@ -2,7 +2,7 @@
# requires: whoosh cryptography
import datetime
from collections import Counter, defaultdict
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID
from whoosh.qparser import QueryParser, OrGroup
@@ -15,27 +15,41 @@ import html
import json
import re
import asyncio
from typing import Union, List, Dict, Any, Optional
from typing import Iterable, Union, List, Dict, Any, Optional
import hashlib
from telethon.types import Message
from telethon.errors.rpcerrorlist import WebpageMediaEmptyError
from telethon import TelegramClient
from telethon.errors.rpcerrorlist import YouBlockedUserError
from telethon import functions, types
from telethon import functions
try:
from aiogram.utils.exceptions import BadRequest
except ImportError:
from aiogram.exceptions import TelegramBadRequest as BadRequest
from .. import utils, loader
from ..types import InlineCall
logger = logging.getLogger("Limoka")
__version__ = (1, 4, 0)
__version__ = (1, 4, 1)
WEIGHTS = {
"inline.token_obtainment": 15,
"main": 10,
"inline": 7,
"translations": 5,
"security": 3,
}
DEFAULT_WEIGHT = 1
def _get_lang_value(data: Dict[str, Any], lang: str) -> str:
if not isinstance(data, dict):
return str(data) if data else ""
return data.get(lang, data.get("default", data.get("en", "")))
class Search:
def __init__(self, query, ix):
self.schema = Schema(
@@ -56,6 +70,7 @@ class Search:
return list(set(result["path"] for result in results))
return []
class LimokaAPI:
async def fetch_json(self, base_url, path):
url = f"{base_url}{path}"
@@ -63,9 +78,11 @@ class LimokaAPI:
async with session.get(url) as response:
return json.loads(await response.text())
@loader.tds
class Limoka(loader.Module):
"""Modules are now in one place with easy searching!"""
strings = {
"name": "Limoka",
"wait": (
@@ -81,9 +98,7 @@ class Limoka(loader.Module):
"<b><emoji document_id=5418299289141004396>🧑‍💻</emoji> Developer:</b> {username}\n\n"
"<b><emoji document_id=5418376169055602355>🏷</emoji> Tags:</b> {tags}\n\n"
),
"found_body": (
"{commands}"
),
"found_body": ("{commands}"),
"found_footer": (
"\n<emoji document_id=5411143117711624172>🪄</emoji> <code>{prefix}dlm {url}{module_path}</code>"
),
@@ -168,8 +183,8 @@ class Limoka(loader.Module):
"hikkatrusted": "Hikka Trusted",
"nonactive": "Non-Active Repository",
"nonlongermaintained": "No Longer Maintained Repository",
"newbie": "Newbie"
}
"newbie": "Newbie",
},
}
strings_ru = {
"name": "Limoka",
@@ -185,9 +200,7 @@ class Limoka(loader.Module):
"<b><emoji document_id=5418299289141004396>🧑‍💻</emoji> Разработчик:</b> {username}\n\n"
"<b><emoji document_id=5418376169055602355>🏷</emoji> Теги:</b> {tags}\n\n"
),
"found_body": (
"{commands}"
),
"found_body": ("{commands}"),
"found_footer": (
"\n<emoji document_id=5411143117711624172>🪄</emoji> <code>{prefix}dlm {url}{module_path}</code>"
),
@@ -220,7 +233,7 @@ class Limoka(loader.Module):
(
"<emoji document_id=5188311512791393083>🔎</emoji> Limoka имеет лучший поиск*!\n"
"<i>* В сравнении с предыдущей версией Limoka</i>"
)
),
],
"inline404": "Не найдено",
"inline?": "Запрос слишком короткий / не найден",
@@ -276,7 +289,7 @@ class Limoka(loader.Module):
"hikkatrusted": "Hikka Trusted",
"nonactive": "Неактивный репозиторий",
"nonlongermaintained": "Неподдерживаемый репозиторий",
"newbie": "Новичок"
"newbie": "Новичок",
},
"_cls_doc": "Модули теперь в одном месте с простым и удобным поиском!",
}
@@ -307,22 +320,22 @@ class Limoka(loader.Module):
self._invalid_banners = set()
self._bot_username = "limoka_bbot"
self._base_url = self.config["limokaurl"]
# Search session states
self.SEARCH_STATES = {
"no_banner": "no_banner", # 404 - Нет баннера
"global_search": "global_search", # Глобальный поиск
"not_found": "not_found", # Не найдено (модуль)
"filter_select": "filter_select", # Выбор категорий (фильтров)
"no_banner": "no_banner", # 404 - Нет баннера
"global_search": "global_search", # Глобальный поиск
"not_found": "not_found", # Не найдено (модуль)
"filter_select": "filter_select", # Выбор категорий (фильтров)
}
# State banners - placeholders for now
self.state_banners = {
"no_banner": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/refs/heads/main/Limoka%20-%20No%20banner.png",
"global_search": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Global%20Search.png",
"not_found": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Not%20Found.png",
"filter_select": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Categories.png",
}
}
def _filter_newbies(self, modules: Dict[str, Any]) -> Dict[str, Any]:
"""Filter out modules which belong to repositories tagged as 'newbie'.
@@ -357,7 +370,7 @@ class Limoka(loader.Module):
current_index: int = 0,
) -> Dict[str, Any]:
"""Create a search session dictionary to track state across callbacks.
Args:
state: Current search state (one of SEARCH_STATES values)
query: Current search query
@@ -365,7 +378,7 @@ class Limoka(loader.Module):
results: Search results list
current_index: Index of current result being displayed
banner_url: Banner image URL for current state
Returns:
Dictionary containing the complete session state
"""
@@ -393,16 +406,13 @@ class Limoka(loader.Module):
else:
self.ix = open_dir("limoka_search")
self._history = self.pointer("history", [])
self.modules = (await self.api.fetch_json(
self._base_url, "modules.json"
)).get("modules", {})
raw = (await self.api.fetch_json(
self._base_url, "repositories.json"
)).get("repositories", [])
self.repositories = {
repo["url"]: repo
for repo in raw
}
self.modules = (await self.api.fetch_json(self._base_url, "modules.json")).get(
"modules", {}
)
raw = (await self.api.fetch_json(self._base_url, "repositories.json")).get(
"repositories", []
)
self.repositories = {repo["url"]: repo for repo in raw}
# Apply newbie filter if enabled
try:
self.modules = self._filter_newbies(self.modules)
@@ -414,24 +424,28 @@ class Limoka(loader.Module):
try:
message = await self.client.get_messages(self._bot_username, limit=1)
if not message:
message = await self.client.send_message(self._bot_username, "/start")
message = await self.client.send_message(
self._bot_username, "/start"
)
await message.delete()
await self.client(functions.messages.DeleteHistoryRequest(
peer=self._bot_username,
max_id=0,
just_clear=True,
revoke=True,
))
await self.client(
functions.messages.DeleteHistoryRequest(
peer=self._bot_username,
max_id=0,
just_clear=True,
revoke=True,
)
)
except YouBlockedUserError:
logger.warning(f"Please unblock {self._bot_username} to enable external installation feature. Or disable external_install_allowed in Limoka settings to get rid of this message.")
logger.warning(
f"Please unblock {self._bot_username} to enable external installation feature. Or disable external_install_allowed in Limoka settings to get rid of this message."
)
self._userbot_bot_username = (await self.inline.bot.get_me()).username
@loader.loop(interval=3600)
async def _update_modules_loop(self):
self.modules = await self.api.fetch_json(
self._base_url, "modules.json"
)
self.modules = await self.api.fetch_json(self._base_url, "modules.json")
# Re-apply newbie filter after modules refresh
try:
self.modules = self._filter_newbies(self.modules)
@@ -446,7 +460,18 @@ class Limoka(loader.Module):
writer.add_document(
title=module_data["name"],
path=module_path,
content=module_data["name"] + " " + (module_data.get("description") or "" + " " + ((module_data.get("meta").get("developer") or "") if module_data.get("meta") else "")),
content=module_data["name"]
+ " "
+ (
module_data.get("description")
or ""
+ " "
+ (
(module_data.get("meta").get("developer") or "")
if module_data.get("meta")
else ""
)
),
)
for func in module_data.get("commands", []):
for command, description in func.items():
@@ -462,7 +487,9 @@ class Limoka(loader.Module):
return None
try:
async with aiohttp.ClientSession() as session:
async with session.head(url, timeout=5, allow_redirects=True) as response:
async with session.head(
url, timeout=5, allow_redirects=True
) as response:
if response.status != 200:
self._invalid_banners.add(url)
return None
@@ -475,9 +502,43 @@ class Limoka(loader.Module):
if url:
self._invalid_banners.add(url)
return None
def find_userbot(self, keys: Iterable[str]) -> str | None:
scores = defaultdict(int)
for key in keys:
parts = key.split(".")
# проходим все префиксы
for i in range(1, len(parts)):
prefix = ".".join(parts[:i])
suffix = ".".join(parts[i:])
weight = WEIGHTS.get(suffix, DEFAULT_WEIGHT)
scores[prefix] += weight
if not scores:
return None
return max(scores, key=scores.get)
@property
def user_lang(self) -> str:
self.db.get("heroku.translations", "lang")
userbot = self.find_userbot(self.db.keys())
if not userbot:
logger.warning(
"Cannot determine userbot type. "
"Probably not FTG-like Userbot? "
"Defaulting language to English. "
"If this is unexpected, please report to the module developer."
)
return "en"
return self.db.get(f"{userbot}.translations", "lang")
def generate_commands(self, module_info, lang: str = "en"):
commands = []
@@ -507,7 +568,6 @@ class Limoka(loader.Module):
)
return commands
def _format_module_content(
self,
module_info: Dict[str, Any],
@@ -520,12 +580,14 @@ class Limoka(loader.Module):
name = html.escape(module_info.get("name") or self.strings["no_info"])
cls_doc = module_info.get("cls_doc", {})
description = html.escape(
_get_lang_value(cls_doc, lang) or
_get_lang_value(module_info.get("description", ""), lang) or
self.strings["no_info"]
_get_lang_value(cls_doc, lang)
or _get_lang_value(module_info.get("description", ""), lang)
or self.strings["no_info"]
)
dev_username = html.escape(module_info["meta"].get("developer") or "Unknown")
raw_path = (
module_path if module_path is not None else module_info.get("path", "")
)
dev_username = html.escape(module_info["meta"].get("developer", "Unknown"))
raw_path = module_path if module_path is not None else module_info.get("path", "")
clean_module_path = (raw_path or "").replace("\\", "/")
commands = self.generate_commands(module_info, lang)
categories_text = ""
@@ -537,7 +599,9 @@ class Limoka(loader.Module):
)
if len(description) > 300:
description = description[:297] + ""
repo_key = "/".join(module_path.split("/")[:2]) if "/" in module_path else module_path
repo_key = (
"/".join(module_path.split("/")[:2]) if "/" in module_path else module_path
)
tags_list = []
for x in self.repositories:
if x.replace("https://github.com/", "") == repo_key:
@@ -577,14 +641,12 @@ class Limoka(loader.Module):
)
return header, body_pages, footer, categories_text
def _build_navigation_markup(
self, session: Dict[str, Any]
) -> list:
def _build_navigation_markup(self, session: Dict[str, Any]) -> list:
result = session["results"]
index = session["current_index"]
query = session["query"]
filters = session["filters"]
page = index + 1
markup = [
[
@@ -596,7 +658,11 @@ class Limoka(loader.Module):
{"text": f"{page}/{len(result)}", "callback": self._inline_void},
{
"text": "" if index + 1 < len(result) else "🚫",
"callback": self._next_page if index + 1 < len(result) else self._inline_void,
"callback": (
self._next_page
if index + 1 < len(result)
else self._inline_void
),
"args": (session,) if index + 1 < len(result) else (),
},
],
@@ -625,60 +691,93 @@ class Limoka(loader.Module):
return markup
def _build_module_markup(
self, session: Dict[str, Any], body_pages: List[str], page_body: int, module_path: str
self,
session: Dict[str, Any],
body_pages: List[str],
page_body: int,
module_path: str,
) -> list:
result = session["results"]
index = session["current_index"]
query = session["query"]
filters = session["filters"]
markup = []
if len(body_pages) > 1:
markup.append([
{
"text": "◀️" if page_body > 0 else "🚫",
"callback": self._previous_body_page if page_body > 0 else self._inline_void,
"args": (session, module_path, page_body) if page_body > 0 else (),
},
{"text": f"Body {page_body + 1}/{len(body_pages)}", "callback": self._inline_void},
{
"text": "▶️" if page_body + 1 < len(body_pages) else "🚫",
"callback": self._next_body_page if page_body + 1 < len(body_pages) else self._inline_void,
"args": (session, module_path, page_body) if page_body + 1 < len(body_pages) else (),
},
])
markup.append(
[
{
"text": "◀️" if page_body > 0 else "🚫",
"callback": (
self._previous_body_page
if page_body > 0
else self._inline_void
),
"args": (
(session, module_path, page_body) if page_body > 0 else ()
),
},
{
"text": f"Body {page_body + 1}/{len(body_pages)}",
"callback": self._inline_void,
},
{
"text": "▶️" if page_body + 1 < len(body_pages) else "🚫",
"callback": (
self._next_body_page
if page_body + 1 < len(body_pages)
else self._inline_void
),
"args": (
(session, module_path, page_body)
if page_body + 1 < len(body_pages)
else ()
),
},
]
)
page = index + 1
markup.append([
{
"text": "" if index > 0 else "🚫",
"callback": self._previous_page if index > 0 else self._inline_void,
"args": (session,) if index > 0 else (),
},
{"text": f"{page}/{len(result)}", "callback": self._inline_void},
{
"text": "" if index + 1 < len(result) else "🚫",
"callback": self._next_page if index + 1 < len(result) else self._inline_void,
"args": (session,) if index + 1 < len(result) else (),
},
])
markup.append([
{
"text": "🔍 " + self.strings["filter_menu"].split(":")[0],
"callback": self._display_filter_menu,
"args": (session,),
},
{
"text": "🔄 " + self.strings["change_query"],
"callback": self._enter_query,
},
])
markup.append([
{
"text": self.strings["global_button"],
"callback": self._show_global_results,
"args": (session,),
},
])
markup.append(
[
{
"text": "" if index > 0 else "🚫",
"callback": self._previous_page if index > 0 else self._inline_void,
"args": (session,) if index > 0 else (),
},
{"text": f"{page}/{len(result)}", "callback": self._inline_void},
{
"text": "" if index + 1 < len(result) else "🚫",
"callback": (
self._next_page
if index + 1 < len(result)
else self._inline_void
),
"args": (session,) if index + 1 < len(result) else (),
},
]
)
markup.append(
[
{
"text": "🔍 " + self.strings["filter_menu"].split(":")[0],
"callback": self._display_filter_menu,
"args": (session,),
},
{
"text": "🔄 " + self.strings["change_query"],
"callback": self._enter_query,
},
]
)
markup.append(
[
{
"text": self.strings["global_button"],
"callback": self._show_global_results,
"args": (session,),
},
]
)
markup.append(
[{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
)
@@ -709,7 +808,9 @@ class Limoka(loader.Module):
)
else:
if photo is not None:
await message_or_call.edit(text=text, reply_markup=markup, photo=photo)
await message_or_call.edit(
text=text, reply_markup=markup, photo=photo
)
else:
await message_or_call.edit(text=text, reply_markup=markup)
except (BadRequest, WebpageMediaEmptyError) as e:
@@ -730,8 +831,11 @@ class Limoka(loader.Module):
try:
query = session["query"]
filters = session["filters"]
lang = self.user_lang()
lang = self.user_lang
logger.info(
f"Displaying module: {module_path} for query: {query} with filters: {filters} in language: {lang}"
)
module_banner_raw = module_info.get("meta", {}).get("banner")
photo = await self._validate_url(module_banner_raw)
@@ -750,11 +854,11 @@ class Limoka(loader.Module):
current_body = body_pages[min(page_body, len(body_pages) - 1)]
full_message = header + current_body + footer + categories_text
markup = self._build_module_markup(session, body_pages, page_body, module_path)
await self._safe_display(
message_or_call, full_message, markup, photo
markup = self._build_module_markup(
session, body_pages, page_body, module_path
)
await self._safe_display(message_or_call, full_message, markup, photo)
except Exception as e:
logger.exception(f"Error in _display_module: {e}")
if isinstance(message_or_call, Message):
@@ -763,30 +867,45 @@ class Limoka(loader.Module):
await message_or_call.edit(self.strings["error_occurred"])
async def _previous_body_page(
self, call: InlineCall, session: Dict[str, Any], module_path: str, page_body: int
self,
call: InlineCall,
session: Dict[str, Any],
module_path: str,
page_body: int,
):
module_info = self.modules[module_path]
new_page_body = max(page_body - 1, 0)
await self._display_module(call, module_info, module_path, session, page_body=new_page_body)
await self._display_module(
call, module_info, module_path, session, page_body=new_page_body
)
async def _next_body_page(
self, call: InlineCall, session: Dict[str, Any], module_path: str, page_body: int
self,
call: InlineCall,
session: Dict[str, Any],
module_path: str,
page_body: int,
):
module_info = self.modules[module_path]
query = session["query"]
filters = session["filters"]
header, body_pages, footer, categories_text = self._format_module_content(
module_info, query, filters, include_categories=True, module_path=module_path, lang=self.user_lang()
module_info,
query,
filters,
include_categories=True,
module_path=module_path,
lang=self.user_lang,
)
new_page_body = min(page_body + 1, len(body_pages) - 1)
await self._display_module(call, module_info, module_path, session, page_body=new_page_body)
await self._display_module(
call, module_info, module_path, session, page_body=new_page_body
)
async def _display_filter_menu(
self, call: InlineCall, session: Dict[str, Any]
):
async def _display_filter_menu(self, call: InlineCall, session: Dict[str, Any]):
query = session["query"]
current_filters = session["filters"]
categories = current_filters.get("category", [])
filters_text = self.strings["selected_categories"].format(
categories=(
@@ -823,14 +942,14 @@ class Limoka(loader.Module):
[{"text": self.strings.get("close", "❌ Close"), "action": "close"}],
]
text = self.strings["filter_menu"].format(query=query) + f"\n{filters_text}"
await call.edit(text, reply_markup=markup, photo=self._get_banner_for_state("filter_select"))
await call.edit(
text, reply_markup=markup, photo=self._get_banner_for_state("filter_select")
)
async def _select_category(
self, call: InlineCall, session: Dict[str, Any]
):
async def _select_category(self, call: InlineCall, session: Dict[str, Any]):
query = session["query"]
current_filters = session["filters"]
all_categories = set()
for module_data in self.modules.values():
all_categories.update(module_data.get("category", ["No category"]))
@@ -860,7 +979,7 @@ class Limoka(loader.Module):
)
if cat in selected_categories:
button_text = "" + button_text
# Create new session with updated filters
new_session = session.copy()
row.append(
@@ -893,7 +1012,7 @@ class Limoka(loader.Module):
):
query = session["query"]
current_filters = session["filters"]
new_filters = current_filters.copy()
selected_categories = new_filters.get("category", [])
if category in selected_categories:
@@ -921,7 +1040,7 @@ class Limoka(loader.Module):
):
query = session["query"]
filters = session["filters"]
searcher = Search(query.lower(), self.ix)
try:
result = searcher.search_module()
@@ -983,7 +1102,7 @@ class Limoka(loader.Module):
return
module_path = filtered_result[0]
module_info = self.modules[module_path]
# Create session for displaying module
display_session = self._create_search_session(
state=self.SEARCH_STATES["global_search"],
@@ -992,9 +1111,7 @@ class Limoka(loader.Module):
results=filtered_result,
current_index=0,
)
await self._display_module(
call, module_info, module_path, display_session, 0
)
await self._display_module(call, module_info, module_path, display_session, 0)
async def _enter_query_handler(
self, call_or_query, query: Optional[str] = None, *args, **kwargs
@@ -1074,7 +1191,7 @@ class Limoka(loader.Module):
return
module_path = result[0]
module_info = self.modules[module_path]
# Create session for displaying module
display_session = self._create_search_session(
state=self.SEARCH_STATES["global_search"],
@@ -1098,11 +1215,13 @@ class Limoka(loader.Module):
{
"text": self.strings["back_to_results"],
"callback": self._show_results,
"args": (self._create_search_session(
state=self.SEARCH_STATES["global_search"],
query=query or "",
filters={},
),),
"args": (
self._create_search_session(
state=self.SEARCH_STATES["global_search"],
query=query or "",
filters={},
),
),
}
],
[
@@ -1116,7 +1235,7 @@ class Limoka(loader.Module):
async def _show_global_results(self, call: InlineCall, session: Dict[str, Any]):
query = session["query"]
searcher = Search(query.lower(), self.ix)
try:
result = searcher.search_module()
@@ -1145,7 +1264,7 @@ class Limoka(loader.Module):
if not info:
continue
name = info.get("name", "Unknown")
global_session = self._create_search_session(
state=self.SEARCH_STATES["global_search"],
query=query,
@@ -1171,47 +1290,37 @@ class Limoka(loader.Module):
self, call: InlineCall, module_path: str, session: Dict[str, Any]
):
module_info = self.modules[module_path]
await self._display_module(
call, module_info, module_path, session, 0
)
await self._display_module(call, module_info, module_path, session, 0)
async def _next_page(
self, call: InlineCall, session: Dict[str, Any]
):
async def _next_page(self, call: InlineCall, session: Dict[str, Any]):
result = session["results"]
index = session["current_index"]
if index + 1 >= len(result):
await call.answer(self.strings["last_page"])
return
index += 1
module_path = result[index]
module_info = self.modules[module_path]
new_session = session.copy()
new_session["current_index"] = index
await self._display_module(
call, module_info, module_path, new_session, 0
)
await self._display_module(call, module_info, module_path, new_session, 0)
async def _previous_page(
self, call: InlineCall, session: Dict[str, Any]
):
async def _previous_page(self, call: InlineCall, session: Dict[str, Any]):
result = session["results"]
index = session["current_index"]
if index - 1 < 0:
await call.answer(self.strings["first_page"])
return
index -= 1
module_path = result[index]
module_info = self.modules[module_path]
new_session = session.copy()
new_session["current_index"] = index
await self._display_module(
call, module_info, module_path, new_session, 0
)
await self._display_module(call, module_info, module_path, new_session, 0)
async def _inline_void(self, call: InlineCall):
await call.answer()
@@ -1244,7 +1353,7 @@ class Limoka(loader.Module):
text=self.strings["start_search_form"],
message=message,
reply_markup=markup,
photo=self._get_banner_for_state("global_search")
photo=self._get_banner_for_state("global_search"),
)
return
history = self.get("history", [])
@@ -1269,7 +1378,7 @@ class Limoka(loader.Module):
return await utils.answer(message, self.strings["404"].format(query=args))
module_path = result[0]
module_info = self.modules[module_path]
# Create session for displaying module
display_session = self._create_search_session(
state=self.SEARCH_STATES["global_search"],
@@ -1278,7 +1387,9 @@ class Limoka(loader.Module):
results=result,
current_index=0,
)
await self._display_module(message, module_info, module_path, display_session, 0)
await self._display_module(
message, module_info, module_path, display_session, 0
)
async def _show_global_form(self, call: InlineCall, message: Message):
markup = [
@@ -1309,12 +1420,12 @@ class Limoka(loader.Module):
self, call: InlineCall, query: str, message: Message, *args, **kwargs
):
global_session = self._create_search_session(
state=self.SEARCH_STATES["global_search"],
query=query,
filters={},
results=[],
current_index=0,
) # idk what is that crap but it works lol
state=self.SEARCH_STATES["global_search"],
query=query,
filters={},
results=[],
current_index=0,
) # idk what is that crap but it works lol
if len(query) <= 1:
await call.edit(
self.strings["?"],
@@ -1442,16 +1553,20 @@ class Limoka(loader.Module):
elif hasattr(message.from_id, "channel_id"):
sender_id = message.from_id.channel_id
if sender_id != self._service_bot_id:
logger.debug("Message not from official bot, ignoring")
# logger.debug("Message not from official bot, ignoring")
return
if not self.config["external_install_allowed"]:
return
try:
clean_text = getattr(message, "raw_text", None) or getattr(
message, "message", None
) or message.text or ""
clean_text = (
getattr(message, "raw_text", None)
or getattr(message, "message", None)
or message.text
or ""
)
if message.entities:
from html import unescape
clean_text = unescape(clean_text)
clean_text = re.sub(r"<[^>]+>", "", clean_text)
match = re.search(r"#limoka:([^\s\"'<>]+)", clean_text)
@@ -1480,25 +1595,37 @@ class Limoka(loader.Module):
if not found:
logger.warning(f"Module not found after cleanup: {module_path}")
await utils.answer(
message, self.strings["watcher_module_not_found"].format(path=html.escape(module_path))
message,
self.strings["watcher_module_not_found"].format(
path=html.escape(module_path)
),
)
return
try:
import base64
from cryptography.hazmat.primitives.asymmetric import ed25519
PUB_KEY_B64 = "MCowBQYDK2VwAyEA1ltSnqtf3pGBuctuAYqHivCXsaRtKOVxavai7yin7ZE="
PUB_KEY_B64 = (
"MCowBQYDK2VwAyEA1ltSnqtf3pGBuctuAYqHivCXsaRtKOVxavai7yin7ZE="
)
der_bytes = base64.b64decode(PUB_KEY_B64)
raw_pubkey = der_bytes[-32:]
module_url = self.config["limokaurl"] + module_path
async with aiohttp.ClientSession() as session:
async with session.get(module_url, timeout=10) as resp:
if resp.status != 200:
logger.error(f"Failed to fetch module for verification: {module_url} (HTTP {resp.status})")
await utils.answer(message, self.strings["watcher_loader_missing"])
logger.error(
f"Failed to fetch module for verification: {module_url} (HTTP {resp.status})"
)
await utils.answer(
message, self.strings["watcher_loader_missing"]
)
return
module_bytes = await resp.read()
sha256 = hashlib.sha256(module_bytes).hexdigest()
public_key = ed25519.Ed25519PublicKey.from_public_bytes(raw_pubkey)
public_key = ed25519.Ed25519PublicKey.from_public_bytes(
raw_pubkey
)
signature = bytes.fromhex(signature_hex)
signed_payload = f"{module_path}|{sha256}".encode()
public_key.verify(signature, signed_payload)
@@ -1522,21 +1649,27 @@ class Limoka(loader.Module):
if status:
try:
bot_peer = await self.client.get_entity(self._service_bot_id)
await self.client.send_message(bot_peer, f"#limoka:sucsess:{message.id}")
await self.client.send_message(
bot_peer, f"#limoka:sucsess:{message.id}"
)
except Exception as e:
logger.error(f"Failed to send success confirmation: {e}")
else:
logger.error(f"Installation failed with status: {status}")
try:
bot_peer = await self.client.get_entity(self._service_bot_id)
await self.client.send_message(bot_peer, f"#limoka:failed:{message.id}")
await self.client.send_message(
bot_peer, f"#limoka:failed:{message.id}"
)
except Exception as e:
logger.error(f"Failed to send failure notification: {e}")
except Exception as e:
logger.exception(f"CRITICAL ERROR in secure_install_watcher: {e}")
try:
await utils.answer(message, self.strings["watcher_critical"].format(error=str(e)[:100]))
await utils.answer(
message, self.strings["watcher_critical"].format(error=str(e)[:100])
)
await asyncio.sleep(5)
await message.delete()
except Exception:
pass
pass