mirror of
https://github.com/MuRuLOSE/limoka.git
synced 2026-06-16 14:34:17 +02:00
Added and updated repositories 2026-04-18 01:49:10
This commit is contained in:
@@ -302,7 +302,7 @@ class FHetaUI:
|
||||
|
||||
@loader.tds
|
||||
class FHeta(loader.Module):
|
||||
'''Module for searching modules! Watch all FHeta news in @FHeta_Updates!'''
|
||||
'''Module for searching modules! Watch all FHeta news in @NFHeta_Updates!'''
|
||||
|
||||
strings = {
|
||||
"name": "FHeta",
|
||||
@@ -338,7 +338,7 @@ class FHeta(loader.Module):
|
||||
}
|
||||
|
||||
strings_ru = {
|
||||
"_cls_doc": "Модуль для поиска модулей! Следите за всеми новостями FHeta в @FHeta_Updates!",
|
||||
"_cls_doc": "Модуль для поиска модулей! Следите за всеми новостями FHeta в @NFHeta_Updates!",
|
||||
"lang": "ru",
|
||||
"author": "от",
|
||||
"description": "Описание",
|
||||
@@ -371,7 +371,7 @@ class FHeta(loader.Module):
|
||||
}
|
||||
|
||||
strings_ua = {
|
||||
"_cls_doc": "Модуль для пошуку модулів! Слідкуйте за всіма новинами FHeta в @FHeta_Updates!",
|
||||
"_cls_doc": "Модуль для пошуку модулів! Слідкуйте за всіма новинами FHeta в @NFHeta_Updates!",
|
||||
"lang": "ua",
|
||||
"author": "від",
|
||||
"description": "Опис",
|
||||
@@ -404,7 +404,7 @@ class FHeta(loader.Module):
|
||||
}
|
||||
|
||||
strings_kz = {
|
||||
"_cls_doc": "Модульдерді іздеу модулі! FHeta барлық жаңалықтарын @FHeta_Updates арнасында қадағалаңыз!",
|
||||
"_cls_doc": "Модульдерді іздеу модулі! FHeta барлық жаңалықтарын @NFHeta_Updates арнасында қадағалаңыз!",
|
||||
"lang": "kz",
|
||||
"author": "авторы",
|
||||
"description": "Сипаттама",
|
||||
@@ -437,7 +437,7 @@ class FHeta(loader.Module):
|
||||
}
|
||||
|
||||
strings_uz = {
|
||||
"_cls_doc": "Modullarni qidirish moduli! FHeta barcha yangilanishlarini @FHeta_Updates kanalida kuzatib boring!",
|
||||
"_cls_doc": "Modullarni qidirish moduli! FHeta barcha yangilanishlarini @NFHeta_Updates kanalida kuzatib boring!",
|
||||
"lang": "uz",
|
||||
"author": "muallif",
|
||||
"description": "Tavsif",
|
||||
@@ -470,7 +470,7 @@ class FHeta(loader.Module):
|
||||
}
|
||||
|
||||
strings_fr = {
|
||||
"_cls_doc": "Module de recherche de modules! Suivez toutes les actualités FHeta sur @FHeta_Updates!",
|
||||
"_cls_doc": "Module de recherche de modules! Suivez toutes les actualités FHeta sur @NFHeta_Updates!",
|
||||
"lang": "fr",
|
||||
"author": "par",
|
||||
"description": "Description",
|
||||
@@ -503,7 +503,7 @@ class FHeta(loader.Module):
|
||||
}
|
||||
|
||||
strings_de = {
|
||||
"_cls_doc": "Modul zur Suche nach Modulen! Verfolgen Sie alle FHeta-Neuigkeiten auf @FHeta_Updates!",
|
||||
"_cls_doc": "Modul zur Suche nach Modulen! Verfolgen Sie alle FHeta-Neuigkeiten auf @NFHeta_Updates!",
|
||||
"lang": "de",
|
||||
"author": "von",
|
||||
"description": "Beschreibung",
|
||||
@@ -536,7 +536,7 @@ class FHeta(loader.Module):
|
||||
}
|
||||
|
||||
strings_jp = {
|
||||
"_cls_doc": "モジュール検索用モジュール!@FHeta_UpdatesでFHetaのすべてのニュースをフォローしてください!",
|
||||
"_cls_doc": "モジュール検索用モジュール!@NFHeta_UpdatesでFHetaのすべてのニュースをフォローしてください!",
|
||||
"lang": "jp",
|
||||
"author": "作成者",
|
||||
"description": "説明",
|
||||
@@ -577,7 +577,7 @@ class FHeta(loader.Module):
|
||||
"command": '<tg-emoji emoji-id="5341715473882955310">⚙️</tg-emoji>',
|
||||
"placeholder": '<tg-emoji emoji-id="5359785904535774578">🗒️</tg-emoji>',
|
||||
"module": '<tg-emoji emoji-id="5454112830989025752">📦</tg-emoji>',
|
||||
"channel": '<tg-emoji emoji-id="5278256077954105203">📢</tg-emoji>',
|
||||
"channel": '📢',
|
||||
"modules_list": '<tg-emoji emoji-id="5197269100878907942">📋</tg-emoji>'
|
||||
},
|
||||
"winter": {
|
||||
@@ -588,7 +588,7 @@ class FHeta(loader.Module):
|
||||
"command": '<tg-emoji emoji-id="5199503707938505333">🎅</tg-emoji>',
|
||||
"placeholder": '<tg-emoji emoji-id="5204046675236109418">🗒️</tg-emoji>',
|
||||
"module": '<tg-emoji emoji-id="5197708768091061888">🎁</tg-emoji>',
|
||||
"channel": '<tg-emoji emoji-id="5278256077954105203">📢</tg-emoji>',
|
||||
"channel": '📢',
|
||||
"modules_list": '<tg-emoji emoji-id="5345935030143196497">🎄</tg-emoji>'
|
||||
},
|
||||
"summer": {
|
||||
@@ -599,7 +599,7 @@ class FHeta(loader.Module):
|
||||
"command": '<tg-emoji emoji-id="5442644589703866634">🏄</tg-emoji>',
|
||||
"placeholder": '<tg-emoji emoji-id="5434121252874756456">🗒️</tg-emoji>',
|
||||
"module": '<tg-emoji emoji-id="5433645645376264953">🏖️</tg-emoji>',
|
||||
"channel": '<tg-emoji emoji-id="5278256077954105203">📢</tg-emoji>',
|
||||
"channel": '📢',
|
||||
"modules_list": '<tg-emoji emoji-id="5472178859300363509">🏖️</tg-emoji>'
|
||||
},
|
||||
"spring": {
|
||||
@@ -610,7 +610,7 @@ class FHeta(loader.Module):
|
||||
"command": '<tg-emoji emoji-id="5449850741667668411">🦋</tg-emoji>',
|
||||
"placeholder": '<tg-emoji emoji-id="5434121252874756456">🗒️</tg-emoji>',
|
||||
"module": '<tg-emoji emoji-id="5440911110838425969">🌿</tg-emoji>',
|
||||
"channel": '<tg-emoji emoji-id="5278256077954105203">📢</tg-emoji>',
|
||||
"channel": '📢',
|
||||
"modules_list": '<tg-emoji emoji-id="5440748683765227563">🌺</tg-emoji>'
|
||||
},
|
||||
"autumn": {
|
||||
@@ -621,7 +621,7 @@ class FHeta(loader.Module):
|
||||
"command": '<tg-emoji emoji-id="5212963577098417551">🍂</tg-emoji>',
|
||||
"placeholder": '<tg-emoji emoji-id="5363965354391388799">🗒️</tg-emoji>',
|
||||
"module": '<tg-emoji emoji-id="5249157915041865558">🍄</tg-emoji>',
|
||||
"channel": '<tg-emoji emoji-id="5278256077954105203">📢</tg-emoji>',
|
||||
"channel": '📢',
|
||||
"modules_list": '<tg-emoji emoji-id="5305495722618010655">🍂</tg-emoji>'
|
||||
}
|
||||
}
|
||||
|
||||
628
Fixyres/FModules/FSecurity.py
Normal file
628
Fixyres/FModules/FSecurity.py
Normal file
@@ -0,0 +1,628 @@
|
||||
__version__ = (1, 0, 0)
|
||||
|
||||
# meta developer: @NFModules
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import html
|
||||
import sys
|
||||
import uuid
|
||||
import copy
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
from contextlib import suppress
|
||||
from .. import loader, utils
|
||||
|
||||
|
||||
@loader.tds
|
||||
class FSecurity(loader.Module):
|
||||
"""Module for automatic AI-based security checks of installed modules."""
|
||||
|
||||
strings = {
|
||||
"name": "FSecurity",
|
||||
"lang": "English",
|
||||
"unavailable": "AI module{} check is unavailable.",
|
||||
"suspicious": "AI interrupted installation of a suspicious module{}, reason:",
|
||||
"blocked": "AI blocked module installation{}, reason:",
|
||||
"continue": "Continue installation?",
|
||||
"strict_mode_doc": "Block loading modules by any method (lm/dlm allowed) if the AI API is unavailable or the module is suspicious. On restart, this also applies to already installed modules.",
|
||||
"nvidia_api_key_doc": "API key from build.nvidia.com, used for AI checks. If not specified, a public key from GitHub will be used."
|
||||
}
|
||||
|
||||
strings_ru = {
|
||||
"lang": "Russian",
|
||||
"_cls_doc": "Модуль для автоматической проверки устанавливаемых модулей через ИИ.",
|
||||
"unavailable": "Проверка модуля{} через ИИ недоступна.",
|
||||
"suspicious": "ИИ прервал установку подозрительного модуля{}, причина:",
|
||||
"blocked": "ИИ заблокировал установку модуля{}, причина:",
|
||||
"continue": "Продолжить установку?",
|
||||
"strict_mode_doc": "Не позволять загружать модули любым методом (lm/dlm разрешено), если API ИИ недоступен или модуль подозрителен. При перезагрузке работает даже на уже установленные модули.",
|
||||
"nvidia_api_key_doc": "API ключ от build.nvidia.com, используется для проверки через ИИ. Если вы его не укажете, будет использоваться общий ключ с GitHub."
|
||||
}
|
||||
|
||||
strings_ua = {
|
||||
"lang": "Ukraine",
|
||||
"_cls_doc": "Модуль для автоматичної перевірки встановлюваних модулів через ШІ.",
|
||||
"unavailable": "Перевірка модуля{} через ШІ недоступна.",
|
||||
"suspicious": "ШІ перервав встановлення підозрілого модуля{}, причина:",
|
||||
"blocked": "ШІ заблокував встановлення модуля{}, причина:",
|
||||
"continue": "Продовжити встановлення?",
|
||||
"strict_mode_doc": "Не дозволяти завантажувати модулі будь-яким методом (lm/dlm дозволено), якщо API ШІ недоступний або модуль підозрілий. При перезавантаженні працює навіть на вже встановлені модулі.",
|
||||
"nvidia_api_key_doc": "API ключ від build.nvidia.com, використовується для перевірки через ШІ. Якщо ви його не вкажете, буде використовуватися загальний ключ з GitHub."
|
||||
}
|
||||
|
||||
strings_de = {
|
||||
"lang": "Germany",
|
||||
"_cls_doc": "Modul zur automatischen Prüfung installierter Module mit KI.",
|
||||
"unavailable": "Die KI-Modulprüfung{} ist nicht verfügbar.",
|
||||
"suspicious": "Die KI hat die Installation eines verdächtigen Moduls unterbrochen{}, Grund:",
|
||||
"blocked": "Die KI hat die Modulinstallation blockiert{}, Grund:",
|
||||
"continue": "Installation fortsetzen?",
|
||||
"strict_mode_doc": "Das Laden von Modulen mit jeder Methode blockieren (lm/dlm erlaubt), wenn die KI-API nicht verfügbar ist oder das Modul verdächtig ist. Beim Neustart gilt dies auch für bereits installierte Module.",
|
||||
"nvidia_api_key_doc": "API-Schlüssel von build.nvidia.com, der für KI-Prüfungen verwendet wird. Wenn nicht angegeben, wird ein öffentlicher Schlüssel von GitHub verwendet."
|
||||
}
|
||||
|
||||
strings_jp = {
|
||||
"lang": "Japanese",
|
||||
"_cls_doc": "AIでインストールされるモジュールを自動チェックするモジュール。",
|
||||
"unavailable": "AIモジュール{}のチェックが利用できません。",
|
||||
"suspicious": "AIが疑わしいモジュールのインストールを中断しました{}、理由:",
|
||||
"blocked": "AIがモジュールのインストールをブロックしました{}、理由:",
|
||||
"continue": "インストールを続行しますか?",
|
||||
"strict_mode_doc": "AI APIが利用できない場合や疑わしいモジュールの場合、すべての方法でモジュールの読み込みをブロックします(lm/dlmは許可)。再起動時にはインストール済みモジュールにも適用されます。",
|
||||
"nvidia_api_key_doc": "build.nvidia.com のAPIキー。AIチェックに使用されます。指定しない場合は、GitHubのパブリックキーが使用されます。"
|
||||
}
|
||||
|
||||
strings_tr = {
|
||||
"lang": "Turkish",
|
||||
"_cls_doc": "Kurulan modülleri yapay zeka ile otomatik kontrol eden modül.",
|
||||
"unavailable": "Yapay zeka modül{} kontrolü kullanılamıyor.",
|
||||
"suspicious": "Yapay zeka şüpheli bir modülün kurulumunu durdurdu{}, sebep:",
|
||||
"blocked": "Yapay zeka modül kurulumunu engelledi{}, sebep:",
|
||||
"continue": "Kuruluma devam edilsin mi?",
|
||||
"strict_mode_doc": "AI API kullanılamıyorsa veya modül şüpheliyse, tüm yöntemlerle modül yüklenmesini engelle (lm/dlm izinli). Yeniden başlatmada zaten kurulu modüller için de geçerlidir.",
|
||||
"nvidia_api_key_doc": "Yapay zeka kontrolleri için kullanılan build.nvidia.com API anahtarı. Belirtilmezse GitHub'daki genel anahtar kullanılacaktır."
|
||||
}
|
||||
|
||||
strings_uz = {
|
||||
"lang": "Uzbekistan",
|
||||
"_cls_doc": "O'rnatilayotgan modullarni AI orqali avtomatik tekshiruvchi modul.",
|
||||
"unavailable": "AI modul{} tekshiruvi mavjud emas.",
|
||||
"suspicious": "AI shubhali modul o'rnatilishini to'xtatdi{}, sabab:",
|
||||
"blocked": "AI modul o'rnatilishini blokladi{}, sabab:",
|
||||
"continue": "O'rnatishni davom ettirasizmi?",
|
||||
"strict_mode_doc": "AI API mavjud bo'lmasa yoki modul shubhali bo'lsa, barcha usullar bilan modul yuklashni bloklash (lm/dlm ruxsat etilgan). Qayta ishga tushirishda allaqachon o'rnatilgan modullarga ham ta'sir qiladi.",
|
||||
"nvidia_api_key_doc": "build.nvidia.com API kaliti, AI orqali tekshirish uchun ishlatiladi. Agar ko'rsatmasangiz, GitHub-dan umumiy kalit ishlatiladi."
|
||||
}
|
||||
|
||||
strings_kz = {
|
||||
"lang": "Kazakhstan",
|
||||
"_cls_doc": "Орнатылатын модульдерді ЖИ арқылы автоматты тексеретін модуль.",
|
||||
"unavailable": "AI модуль{} тексеру қолжетімсіз.",
|
||||
"suspicious": "AI күдікті модульді орнатуды тоқтатты{}, себебі:",
|
||||
"blocked": "AI модульді орнатуды бұғаттады{}, себебі:",
|
||||
"continue": "Орнатуды жалғастырасыз ба?",
|
||||
"strict_mode_doc": "AI API қолжетімсіз болса немесе модуль күдікті болса, барлық әдістермен модуль жүктеуді бұғаттау (lm/dlm рұқсат етілген). Қайта іске қосқанда орнатылған модульдерге де қолданылады.",
|
||||
"nvidia_api_key_doc": "build.nvidia.com API кілті, ЖИ арқылы тексеру үшін қолданылады. Егер оны көрсетпесеңіз, GitHub-тан ортақ кілт пайдаланылады."
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self.config = loader.ModuleConfig(
|
||||
loader.ConfigValue(
|
||||
"strict_mode",
|
||||
False,
|
||||
lambda: self.strings("strict_mode_doc"),
|
||||
validator=loader.validators.Boolean(),
|
||||
),
|
||||
loader.ConfigValue(
|
||||
"nvidia_api_key",
|
||||
"",
|
||||
lambda: self.strings("nvidia_api_key_doc"),
|
||||
validator=loader.validators.Hidden(),
|
||||
)
|
||||
)
|
||||
self.tasks = {}
|
||||
self.oreg = None
|
||||
self.oload = None
|
||||
|
||||
async def client_ready(self, client, db):
|
||||
self.__origin__ = "<fsecurity>"
|
||||
self.core = self.lookup("loader")
|
||||
self.modules = self.core.allmodules
|
||||
self.restore_hooks()
|
||||
self.patch()
|
||||
|
||||
async def on_unload(self):
|
||||
self.unpatch()
|
||||
|
||||
def _render_prompt(self, prompt, **values):
|
||||
rendered = prompt
|
||||
for key, value in values.items():
|
||||
rendered = rendered.replace("{" + key + "}", str(value))
|
||||
return rendered
|
||||
|
||||
def _split_code(self, code):
|
||||
chunk_size = 180000
|
||||
if len(code) <= chunk_size:
|
||||
return [code]
|
||||
|
||||
chunks = []
|
||||
current =[]
|
||||
current_len = 0
|
||||
|
||||
for line in code.splitlines(keepends=True):
|
||||
if current and current_len + len(line) > chunk_size:
|
||||
chunks.append("".join(current))
|
||||
current =[]
|
||||
current_len = 0
|
||||
|
||||
if len(line) > chunk_size:
|
||||
if current:
|
||||
chunks.append("".join(current))
|
||||
current =[]
|
||||
current_len = 0
|
||||
for i in range(0, len(line), chunk_size):
|
||||
chunks.append(line[i:i + chunk_size])
|
||||
continue
|
||||
|
||||
current.append(line)
|
||||
current_len += len(line)
|
||||
|
||||
if current:
|
||||
chunks.append("".join(current))
|
||||
|
||||
return chunks or [code]
|
||||
|
||||
def _parse_ai_json(self, raw_text):
|
||||
raw_text = (raw_text or "").strip()
|
||||
if not raw_text:
|
||||
return None
|
||||
|
||||
try:
|
||||
parsed = json.loads(raw_text)
|
||||
if isinstance(parsed, dict):
|
||||
return parsed
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
match = re.search(r"\{[\s\S]*\}", raw_text)
|
||||
if not match:
|
||||
return None
|
||||
|
||||
try:
|
||||
parsed = json.loads(match.group())
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
return parsed if isinstance(parsed, dict) else None
|
||||
|
||||
async def _fetch_prompt(self, session, url):
|
||||
async with session.get(url, timeout=10) as resp:
|
||||
if resp.status != 200:
|
||||
return None
|
||||
prompt = (await resp.text()).strip()
|
||||
return prompt or None
|
||||
|
||||
async def _get_prompts(self, session):
|
||||
main_prompt = await self._fetch_prompt(session, "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FSecurity/prompts/main.txt")
|
||||
chunk_prompt = await self._fetch_prompt(session, "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FSecurity/prompts/chank.txt")
|
||||
final_prompt = await self._fetch_prompt(session, "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FSecurity/prompts/final.txt")
|
||||
if not main_prompt or not chunk_prompt or not final_prompt:
|
||||
return None
|
||||
return {
|
||||
"main": main_prompt,
|
||||
"chunk": chunk_prompt,
|
||||
"final": final_prompt,
|
||||
}
|
||||
|
||||
async def _nvidia_request(self, session, api_key, system_prompt, user_prompt):
|
||||
async with session.post(
|
||||
"https://integrate.api.nvidia.com/v1/chat/completions",
|
||||
headers={"Authorization": f"Bearer {api_key}"},
|
||||
json={
|
||||
"model": "qwen/qwen3-coder-480b-a35b-instruct",
|
||||
"messages":[
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": user_prompt},
|
||||
],
|
||||
"temperature": 0.4,
|
||||
"max_tokens": 1000,
|
||||
},
|
||||
timeout=180,
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
return None
|
||||
data = await resp.json()
|
||||
choices = data.get("choices") or[]
|
||||
if not choices:
|
||||
return None
|
||||
return self._parse_ai_json(choices[0].get("message", {}).get("content", ""))
|
||||
|
||||
async def _local_ai_check(self, session, code, lang, api_key):
|
||||
prompts = await self._get_prompts(session)
|
||||
if not prompts:
|
||||
return None
|
||||
|
||||
chunks = self._split_code(code)
|
||||
if len(chunks) == 1:
|
||||
prompt = self._render_prompt(prompts["main"], lang=lang)
|
||||
return await self._nvidia_request(
|
||||
session,
|
||||
api_key,
|
||||
prompt,
|
||||
f"Analyze this module:\n\n```python\n{code}\n```",
|
||||
)
|
||||
|
||||
total = len(chunks)
|
||||
findings =[]
|
||||
|
||||
for index, chunk in enumerate(chunks, start=1):
|
||||
previous_context = "; ".join(
|
||||
f"Part {i}: {finding}"
|
||||
for i, finding in enumerate(findings, start=1)
|
||||
if finding
|
||||
) or "Previous parts: no issues found so far."
|
||||
|
||||
chunk_prompt = self._render_prompt(
|
||||
prompts["chunk"],
|
||||
total=total,
|
||||
current=index,
|
||||
previous_context=previous_context,
|
||||
lang=lang,
|
||||
)
|
||||
chunk_result = await self._nvidia_request(
|
||||
session,
|
||||
api_key,
|
||||
chunk_prompt,
|
||||
f"Part {index}/{total}:\n\n```python\n{chunk}\n```",
|
||||
)
|
||||
if not chunk_result:
|
||||
return None
|
||||
|
||||
chunk_verdict = str(chunk_result.get("chunk_verdict", "CLEAN")).lower()
|
||||
chunk_finding = str(chunk_result.get("findings", "") or "")
|
||||
|
||||
if chunk_verdict == "blocked":
|
||||
findings_text = "\n".join(
|
||||
f"- Part {i}: {finding}"
|
||||
for i, finding in enumerate(findings, start=1)
|
||||
if finding
|
||||
)
|
||||
if chunk_finding:
|
||||
findings_text = f"{findings_text}\n- Part {index}: {chunk_finding}".strip()
|
||||
|
||||
final_prompt = self._render_prompt(
|
||||
prompts["final"],
|
||||
total=total,
|
||||
findings=findings_text or "No prior findings.",
|
||||
lang=lang,
|
||||
)
|
||||
return await self._nvidia_request(
|
||||
session,
|
||||
api_key,
|
||||
final_prompt,
|
||||
"Give the final verdict based on all findings.",
|
||||
)
|
||||
|
||||
findings.append(chunk_finding if chunk_verdict != "clean" else "")
|
||||
|
||||
findings_text = "\n".join(
|
||||
f"- Part {i}: {finding}"
|
||||
for i, finding in enumerate(findings, start=1)
|
||||
if finding
|
||||
) or "All parts: no issues found."
|
||||
|
||||
final_prompt = self._render_prompt(
|
||||
prompts["final"],
|
||||
total=total,
|
||||
findings=findings_text,
|
||||
lang=lang,
|
||||
)
|
||||
return await self._nvidia_request(
|
||||
session,
|
||||
api_key,
|
||||
final_prompt,
|
||||
"Give the final verdict based on all findings.",
|
||||
)
|
||||
|
||||
async def check(self, code):
|
||||
try:
|
||||
lang = self.strings("lang") or "en"
|
||||
module_hash = hashlib.sha256(code.encode("utf-8")).hexdigest()
|
||||
|
||||
db_cache = self.get("cache", {})
|
||||
if module_hash in db_cache:
|
||||
cached = db_cache[module_hash]
|
||||
if cached.get("level") == "safe":
|
||||
return True
|
||||
return cached
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
api_keys = await self._get_api_keys(session)
|
||||
for api_key in api_keys:
|
||||
parsed = await self._local_ai_check(session, code, lang, api_key)
|
||||
if not isinstance(parsed, dict):
|
||||
continue
|
||||
|
||||
verdict = str(parsed.get("verdict", "BLOCKED")).lower()
|
||||
if verdict not in {"safe", "suspicious", "blocked"}:
|
||||
verdict = "blocked"
|
||||
summary = str(parsed.get("summary", "") or "")
|
||||
|
||||
result = {"level": verdict if verdict != "safe" else "safe"}
|
||||
if verdict != "safe":
|
||||
result["reason"] = summary
|
||||
|
||||
db_cache[module_hash] = result
|
||||
self.set("cache", db_cache)
|
||||
|
||||
if result["level"] == "safe":
|
||||
return True
|
||||
return result
|
||||
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
async def _get_api_keys(self, session):
|
||||
configured_key = self.config["nvidia_api_key"].strip()
|
||||
if configured_key:
|
||||
return [configured_key]
|
||||
|
||||
try:
|
||||
async with session.get(
|
||||
"https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FSecurity/api_keys.txt",
|
||||
timeout=10,
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
return[]
|
||||
raw_keys = (await resp.text()).strip()
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
return[key.strip() for key in raw_keys.split(",") if key.strip()]
|
||||
|
||||
def format(self, state, reason="", link=""):
|
||||
link_part = f' (<code>{utils.escape_html(link)}</code>)' if link else ""
|
||||
if state == "unavailable":
|
||||
return f'<b>{self.strings("unavailable").format(link_part)}</b>\n<b>{self.strings("continue")}</b>'
|
||||
if state == "suspicious":
|
||||
return f'<b>{self.strings("suspicious").format(link_part)}</b>\n<blockquote expandable><b>{reason}</b></blockquote>\n<b>{self.strings("continue")}</b>'
|
||||
return f'<b>{self.strings("blocked").format(link_part)}</b>\n<blockquote expandable><b>{reason}</b></blockquote>'
|
||||
|
||||
def buttons(self, task):
|
||||
return [[
|
||||
{"text": "✓", "callback": self.confirm, "args": (task, "yes")},
|
||||
{"text": "✗", "callback": self.confirm, "args": (task, "no")}
|
||||
]]
|
||||
|
||||
def closure_var(self, func, name):
|
||||
raw = getattr(func, "__func__", func)
|
||||
code = getattr(raw, "__code__", None)
|
||||
closure = getattr(raw, "__closure__", None)
|
||||
if not code or not closure or name not in code.co_freevars:
|
||||
return None
|
||||
|
||||
with suppress(Exception):
|
||||
return closure[code.co_freevars.index(name)].cell_contents
|
||||
|
||||
return None
|
||||
|
||||
def restore_hooks(self):
|
||||
with suppress(Exception):
|
||||
inst_reg = getattr(self.modules, "register_module")
|
||||
owner = getattr(inst_reg, "__self__", None)
|
||||
if (
|
||||
owner
|
||||
and owner is not self
|
||||
and owner.__class__.__name__ == self.__class__.__name__
|
||||
):
|
||||
original = getattr(owner, "oreg", None)
|
||||
if original:
|
||||
if getattr(original, "__self__", None) is None:
|
||||
self.modules.register_module = original.__get__(
|
||||
self.modules,
|
||||
self.modules.__class__,
|
||||
)
|
||||
else:
|
||||
self.modules.register_module = original
|
||||
|
||||
with suppress(Exception):
|
||||
inst_load = getattr(self.core, "load_module")
|
||||
raw = getattr(inst_load, "__func__", inst_load)
|
||||
if "FSecurity.patch.<locals>.load" in getattr(raw, "__qualname__", ""):
|
||||
original = self.closure_var(raw, "original")
|
||||
if original:
|
||||
if getattr(original, "__self__", None) is None:
|
||||
self.core.load_module = original.__get__(
|
||||
self.core,
|
||||
self.core.__class__,
|
||||
)
|
||||
else:
|
||||
self.core.load_module = original
|
||||
|
||||
def patch(self):
|
||||
if not self.oreg:
|
||||
self.oreg = getattr(self.modules, "register_module")
|
||||
if not self.oload:
|
||||
self.oload = self.core.load_module
|
||||
|
||||
original = self.oload
|
||||
|
||||
async def load(_, *args, **kwargs):
|
||||
base = utils.answer
|
||||
|
||||
async def answer(message, response, *a, **k):
|
||||
if isinstance(response, str) and "😖</tg-emoji>" in response:
|
||||
body = response.split("😖</tg-emoji>", 1)[1].strip()
|
||||
if body in {"", "<b></b>", "<b> </b>"}:
|
||||
with suppress(Exception):
|
||||
if hasattr(message, "delete"):
|
||||
await message.delete()
|
||||
return message
|
||||
|
||||
if body.startswith("<b>") and body.endswith("</b>"):
|
||||
decoded = html.unescape(body[3:-4])
|
||||
response = response.split("😖</tg-emoji>", 1)[0] + f'😖</tg-emoji> {decoded}' if decoded else response.split("😖</tg-emoji>", 1)[0] + '😖</tg-emoji>'
|
||||
|
||||
try:
|
||||
return await base(message, response, *a, **k)
|
||||
except Exception:
|
||||
with suppress(Exception):
|
||||
return await self._client.send_message(
|
||||
utils.get_chat_id(message),
|
||||
response,
|
||||
reply_to=getattr(message, "reply_to_msg_id", None),
|
||||
buttons=k.get("reply_markup"),
|
||||
)
|
||||
|
||||
return message
|
||||
|
||||
utils.answer = answer
|
||||
try:
|
||||
if getattr(original, "__self__", None) is None:
|
||||
return await original(_, *args, **kwargs)
|
||||
return await original(*args, **kwargs)
|
||||
finally:
|
||||
if utils.answer is answer:
|
||||
utils.answer = base
|
||||
|
||||
self.core.load_module = load.__get__(self.core, self.core.__class__)
|
||||
self.modules.register_module = self.register
|
||||
|
||||
def unpatch(self):
|
||||
if self.oreg:
|
||||
self.modules.register_module = self.oreg
|
||||
if getattr(self, "core", None) and self.oload:
|
||||
self.core.load_module = self.oload
|
||||
|
||||
def context(self):
|
||||
frame = sys._getframe()
|
||||
msg = None
|
||||
fmsg = None
|
||||
is_dlm_lm = False
|
||||
|
||||
while frame:
|
||||
locals = frame.f_locals
|
||||
if (
|
||||
frame.f_code.co_name == "load_module"
|
||||
and locals.get("self") is self.core
|
||||
and 'message' in locals
|
||||
and hasattr(locals['message'], 'edit')
|
||||
):
|
||||
if not msg:
|
||||
msg = locals['message']
|
||||
fmsg = locals.get('msg')
|
||||
|
||||
if frame.f_code.co_name in {"dlmod", "loadmod"}:
|
||||
is_dlm_lm = True
|
||||
if not msg and 'message' in locals and hasattr(locals['message'], 'edit'):
|
||||
msg = locals['message']
|
||||
|
||||
if frame.f_code.co_name == "download_and_install":
|
||||
if not msg and 'message' in locals and hasattr(locals['message'], 'edit'):
|
||||
msg = locals['message']
|
||||
|
||||
frame = frame.f_back
|
||||
|
||||
return msg, fmsg, is_dlm_lm
|
||||
|
||||
def target_chat(self, msg=None, fmsg=None):
|
||||
if not msg:
|
||||
return None
|
||||
|
||||
if not fmsg:
|
||||
return msg
|
||||
|
||||
with suppress(Exception):
|
||||
target = copy.copy(msg)
|
||||
target.reply_to_msg_id = fmsg.id
|
||||
return target
|
||||
|
||||
return None
|
||||
|
||||
async def call_oreg(self, spec, name, origin="<core>", save_fs=False):
|
||||
if getattr(self.oreg, "__self__", None) is None:
|
||||
return await self.oreg(self.modules, spec, name, origin, save_fs=save_fs)
|
||||
return await self.oreg(spec, name, origin, save_fs=save_fs)
|
||||
|
||||
async def register(self, spec, name, origin="<core>", save_fs=False):
|
||||
if origin != "<core>":
|
||||
code = ""
|
||||
|
||||
if hasattr(spec.loader, "data") and spec.loader.data:
|
||||
code = spec.loader.data
|
||||
if isinstance(code, bytes):
|
||||
code = code.decode("utf-8", errors="ignore")
|
||||
elif origin and origin.endswith(".py"):
|
||||
with suppress(Exception):
|
||||
with open(origin, "r", encoding="utf-8") as f:
|
||||
code = f.read()
|
||||
|
||||
if code:
|
||||
check = await self.check(code)
|
||||
|
||||
if check is not True:
|
||||
msg, fmsg, is_dlm_lm = self.context()
|
||||
target = self.target_chat(msg, fmsg)
|
||||
|
||||
if isinstance(check, dict):
|
||||
status = check.get("level", "blocked")
|
||||
reason = check.get("reason", "")
|
||||
else:
|
||||
status = "unavailable"
|
||||
reason = ""
|
||||
|
||||
link = origin if origin.startswith("http") else ""
|
||||
|
||||
if status == "blocked":
|
||||
if msg and target:
|
||||
raise loader.LoadError(self.format("blocked", reason, link))
|
||||
raise loader.LoadError("")
|
||||
|
||||
should_block = is_dlm_lm or self.config["strict_mode"]
|
||||
|
||||
if should_block and not (msg and target):
|
||||
raise loader.LoadError("")
|
||||
|
||||
if should_block and msg and target:
|
||||
task = str(uuid.uuid4())
|
||||
event = asyncio.Event()
|
||||
self.tasks[task] = {"event": event, "decision": False}
|
||||
|
||||
try:
|
||||
form = await self.inline.form(
|
||||
text=self.format(status, reason, link),
|
||||
message=target,
|
||||
reply_markup=self.buttons(task)
|
||||
)
|
||||
|
||||
if not form:
|
||||
raise loader.LoadError(reason)
|
||||
|
||||
await asyncio.wait_for(event.wait(), timeout=180.0)
|
||||
|
||||
if not self.tasks.pop(task)["decision"]:
|
||||
with suppress(Exception):
|
||||
await form.delete()
|
||||
raise loader.LoadError("")
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
self.tasks.pop(task, None)
|
||||
with suppress(Exception):
|
||||
await form.delete()
|
||||
raise loader.LoadError("")
|
||||
except loader.LoadError:
|
||||
raise
|
||||
except Exception:
|
||||
raise loader.LoadError("")
|
||||
|
||||
return await self.call_oreg(spec, name, origin, save_fs=save_fs)
|
||||
|
||||
async def confirm(self, call, task, action):
|
||||
if task in self.tasks:
|
||||
self.tasks[task]["decision"] = (action == "yes")
|
||||
self.tasks[task]["event"].set()
|
||||
with suppress(Exception):
|
||||
await call.delete()
|
||||
87
Fixyres/FModules/LFSecurity.py
Normal file
87
Fixyres/FModules/LFSecurity.py
Normal file
@@ -0,0 +1,87 @@
|
||||
__version__ = (1, 0, 0)
|
||||
|
||||
# meta developer: @NFModules
|
||||
# meta banner: https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FSecurity/banner.png
|
||||
# meta fhsdesc: security, guard, antiscam, antivirus
|
||||
# scope: hikka_min 2.0.0
|
||||
|
||||
# ©️ Fixyres, 2024-2030
|
||||
# 🌐 https://github.com/Fixyres/FModules
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# 🔑 http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
import aiohttp
|
||||
import os
|
||||
from .. import loader
|
||||
|
||||
|
||||
@loader.tds
|
||||
class FSecurity(loader.Module):
|
||||
"""Module for automatic AI-based security checks of installed modules."""
|
||||
|
||||
strings = {
|
||||
"name": "FSecurity"
|
||||
}
|
||||
|
||||
strings_ru = {
|
||||
"_cls_doc": "Модуль для автоматической проверки устанавливаемых модулей через ИИ."
|
||||
}
|
||||
|
||||
strings_ua = {
|
||||
"_cls_doc": "Модуль для автоматичної перевірки встановлюваних модулів через ШІ."
|
||||
}
|
||||
|
||||
strings_de = {
|
||||
"_cls_doc": "Modul zur automatischen Prüfung installierter Module mit KI."
|
||||
}
|
||||
|
||||
strings_jp = {
|
||||
"_cls_doc": "AIでインストールされるモジュールを自動チェックするモジュール。"
|
||||
}
|
||||
|
||||
strings_tr = {
|
||||
"_cls_doc": "Kurulan modülleri yapay zeka ile otomatik kontrol eden modül."
|
||||
}
|
||||
|
||||
strings_uz = {
|
||||
"_cls_doc": "O'rnatilayotgan modullarni AI orqali avtomatik tekshiruvchi modul."
|
||||
}
|
||||
|
||||
strings_kz = {
|
||||
"_cls_doc": "Орнатылатын модульдерді ЖИ арқылы автоматты тексеретін модуль."
|
||||
}
|
||||
|
||||
async def client_ready(self, client, db):
|
||||
core = self.lookup("loader")
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(
|
||||
"https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/FSecurity.py",
|
||||
timeout=aiohttp.ClientTimeout(total=15),
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
return
|
||||
source = await resp.text()
|
||||
except Exception:
|
||||
return
|
||||
|
||||
target = os.path.join(
|
||||
os.path.dirname(loader.__file__),
|
||||
"modules",
|
||||
"FSecurity.py",
|
||||
)
|
||||
|
||||
try:
|
||||
with open(target, "w", encoding="utf-8") as f:
|
||||
f.write(source)
|
||||
except Exception:
|
||||
return
|
||||
|
||||
await core.unload_module("FSecurity")
|
||||
try:
|
||||
await core.load_module(source, None, "FSecurity", target, save_fs=False)
|
||||
except Exception:
|
||||
pass
|
||||
1
Fixyres/FModules/assets/FSecurity/api_keys.txt
Normal file
1
Fixyres/FModules/assets/FSecurity/api_keys.txt
Normal file
@@ -0,0 +1 @@
|
||||
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
|
||||
BIN
Fixyres/FModules/assets/FSecurity/banner.png
Normal file
BIN
Fixyres/FModules/assets/FSecurity/banner.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 166 KiB |
25
Fixyres/FModules/assets/FSecurity/prompts/chank.txt
Normal file
25
Fixyres/FModules/assets/FSecurity/prompts/chank.txt
Normal file
@@ -0,0 +1,25 @@
|
||||
You must strictly follow these rules with no exceptions.
|
||||
|
||||
Analyze only part {current}/{total} of a Hikka userbot Python module. Do NOT give a final verdict for the whole module.
|
||||
Previous context: {previous_context}
|
||||
|
||||
BLOCKED: encrypted/obfuscated code (base64, marshal, zlib, rot13, encoded exec, or any technique hiding real logic), account deletion, mass scam/spam/ads to all chats on load, session/auth_key/2FA exfiltration, bulk message/dialog dump to external destination, string "FSecurity" (if found → findings must be ONLY: "Attempted interaction with FSecurity." translated to {lang}, nothing else).
|
||||
SUSPICIOUS: watcher/scheduler/client_ready auto-installing modules without owner confirmation, download + exec of remote Python code without confirmation, runtime pip install or library download, third-party OAuth redirect.
|
||||
CLEAN: no security issue in this chunk.
|
||||
|
||||
Tie-breaking: BLOCKED vs SUSPICIOUS → SUSPICIOUS. SUSPICIOUS vs CLEAN → CLEAN.
|
||||
@loader.inline_handler, @loader.command, async def NAMEcmd, async def NAME_inline_handler = owner-only by default = not a threat.
|
||||
Owner-triggered exec/eval/shell = not a threat.
|
||||
A command (any function decorated with @loader.command, named NAMEcmd, or accessible only to the owner) that executes arbitrary code, runs shell commands, evaluates expressions, or calls exec/eval on owner-provided input = always CLEAN, never SUSPICIOUS. This is a standard userbot feature.
|
||||
|
||||
Respond ONLY with valid JSON:
|
||||
{"chunk_verdict":"CLEAN|SUSPICIOUS|BLOCKED","findings":"..."}
|
||||
|
||||
Findings rules (when not CLEAN):
|
||||
- Write in {lang}. Max 1000 chars.
|
||||
- Technical analysis for reading, not a reply. No "I found", no "you should". Third person only.
|
||||
- Do NOT mention which rule was triggered. Just describe what the code does.
|
||||
- Only the key threats in this chunk. Reference approximate line numbers within the chunk.
|
||||
- Use <code>text</code> for code references: function names, variables, URLs, string literals.
|
||||
- For obfuscation chains, wrap the whole chain in one <code> block: <code>base64.b64decode → zlib.decompress → exec</code>.
|
||||
- If CLEAN → findings must be empty string "".
|
||||
29
Fixyres/FModules/assets/FSecurity/prompts/final.txt
Normal file
29
Fixyres/FModules/assets/FSecurity/prompts/final.txt
Normal file
@@ -0,0 +1,29 @@
|
||||
You must strictly follow these rules with no exceptions.
|
||||
|
||||
A Hikka userbot module was split into {total} parts. Chunk findings:
|
||||
{findings}
|
||||
|
||||
Give the final verdict for the entire module based on all findings above.
|
||||
|
||||
BLOCKED: encrypted/obfuscated code, account deletion, mass scam/spam on load, session/auth_key theft, bulk message dump, string "FSecurity" (if found → summary must be ONLY: "Attempted interaction with FSecurity." translated to {lang}, nothing else).
|
||||
SUSPICIOUS: auto-install modules without confirmation, remote code download + exec without confirmation, runtime pip/library install, third-party OAuth redirect.
|
||||
SAFE: no real security issue across all parts.
|
||||
|
||||
Auto-install = SUSPICIOUS, never BLOCKED.
|
||||
Tie-breaking: BLOCKED vs SUSPICIOUS → SUSPICIOUS. SUSPICIOUS vs SAFE → SAFE.
|
||||
@loader.inline_handler, @loader.command, async def NAMEcmd, async def NAME_inline_handler = owner-only by default = not a threat.
|
||||
Owner-triggered exec/eval/shell = not a threat.
|
||||
A command (any function decorated with @loader.command, named NAMEcmd, or accessible only to the owner) that executes arbitrary code, runs shell commands, evaluates expressions, or calls exec/eval on owner-provided input = always SAFE, never SUSPICIOUS. This is a standard userbot feature.
|
||||
|
||||
Respond ONLY with valid JSON:
|
||||
{"verdict":"SAFE|SUSPICIOUS|BLOCKED","summary":"..."}
|
||||
|
||||
Summary rules (when not SAFE):
|
||||
- Write in {lang}. Max 1000 chars.
|
||||
- Combine the most important findings into one coherent technical analysis.
|
||||
- This is a report for reading, NOT a reply to a person. No "I found", no "you should". Third person only.
|
||||
- Do NOT mention which rule was triggered or explain criteria. Just describe what the code does.
|
||||
- Only the key threats. Reference line numbers from findings where available.
|
||||
- Use <code>text</code> for all code references: function names, variables, URLs, string literals.
|
||||
- For obfuscation, show the full chain in one <code> block: <code>base64.b64decode → zlib.decompress → exec</code>.
|
||||
- If SAFE → summary must be empty string "".
|
||||
37
Fixyres/FModules/assets/FSecurity/prompts/main.txt
Normal file
37
Fixyres/FModules/assets/FSecurity/prompts/main.txt
Normal file
@@ -0,0 +1,37 @@
|
||||
You must strictly follow these classification rules with no exceptions.
|
||||
|
||||
Classify a Hikka userbot Python module as BLOCKED, SUSPICIOUS, or SAFE.
|
||||
|
||||
BLOCKED (any single match):
|
||||
- Code is encrypted or obfuscated (base64, marshal, zlib, rot13, compile+exec of encoded data, or any technique that hides real logic).
|
||||
- Attempts to delete Telegram account (DeleteAccountRequest, client.delete_account, or equivalent).
|
||||
- On load (client_ready, __init__) automatically sends scam, spam, or ads to all chats/dialogs/contacts without owner action.
|
||||
- Steals and sends session string, auth_key, or 2FA password anywhere outside the device.
|
||||
- Collects and forwards all messages or dialogs to any external destination.
|
||||
- Contains the string "FSecurity" → summary must be ONLY: "Attempted interaction with FSecurity." translated to {lang}. Nothing else, no extra text.
|
||||
|
||||
SUSPICIOUS (any single match, only if BLOCKED did not trigger):
|
||||
- Watcher, scheduler, or client_ready auto-installs modules from any URL without per-action owner confirmation.
|
||||
- Downloads and executes remote Python code (exec/eval on fetched content) without owner confirmation.
|
||||
- Installs pip packages or downloads Python libraries at runtime from the internet.
|
||||
- OAuth or auth flow redirected through a non-official third-party domain.
|
||||
|
||||
SAFE: everything that does not match any rule above.
|
||||
- Owner-triggered exec/eval/shell = always SAFE.
|
||||
- A command (any function decorated with @loader.command, named NAMEcmd, or accessible only to the owner) that executes arbitrary code, runs shell commands, evaluates expressions, or calls exec/eval on owner-provided input = always SAFE, never SUSPICIOUS. This is a standard feature of userbots and poses no threat.
|
||||
- @loader.inline_handler, @loader.command, async def NAMEcmd, async def NAME_inline_handler = owner-only by default (no public access without explicit permission) = SAFE.
|
||||
|
||||
Tie-breaking: BLOCKED vs SUSPICIOUS → SUSPICIOUS. SUSPICIOUS vs SAFE → SAFE.
|
||||
|
||||
Respond ONLY with valid JSON:
|
||||
{"verdict":"SAFE|SUSPICIOUS|BLOCKED","summary":"..."}
|
||||
|
||||
Summary rules (when not SAFE):
|
||||
- Write in {lang}. Max 1000 chars.
|
||||
- This is a technical analysis meant to be read, NOT a reply to a person. Never write "I found", "you should", "I recommend". Write in third person.
|
||||
- Do NOT mention which rule was triggered or explain the classification criteria. Just describe what the code does.
|
||||
- Point out ONLY the key threats. Do NOT describe what the module does overall or list safe parts.
|
||||
- Reference the approximate line number where dangerous code appears: "line NN —".
|
||||
- Use <code>text</code> for every code reference: function names, variables, URLs, string literals.
|
||||
- For obfuscation, show the full decoding chain inside one <code> block: <code>base64.b64decode → zlib.decompress → marshal.loads → exec</code>.
|
||||
- If SAFE → summary must be empty string "".
|
||||
204
fiksofficial/python-modules/IwaAnimation.py
Normal file
204
fiksofficial/python-modules/IwaAnimation.py
Normal file
@@ -0,0 +1,204 @@
|
||||
# ______ ___ ___ _ _
|
||||
# ____ | ___ \ | \/ | | | | |
|
||||
# / __ \| |_/ / _| . . | ___ __| |_ _| | ___
|
||||
# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \
|
||||
# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/
|
||||
# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___|
|
||||
# \____/ __/ |
|
||||
# |___/
|
||||
|
||||
# На модуль распространяется лицензия "GNU General Public License v3.0"
|
||||
# https://github.com/all-licenses/GNU-General-Public-License-v3.0
|
||||
|
||||
# meta developer: @pymodule
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import toml
|
||||
|
||||
from .. import loader, utils
|
||||
from herokutl.tl.types import Message
|
||||
|
||||
|
||||
@loader.tds
|
||||
class IwaAnimation(loader.Module):
|
||||
"""Frame-by-frame text animations loaded from .anim TOML files"""
|
||||
|
||||
strings = {
|
||||
"name": "IwaAnimation",
|
||||
"err_no_reply": "<b>{e} Reply to a .anim file.</b>",
|
||||
"err_not_anim": "<b>{e} File must have .anim extension.</b>",
|
||||
"err_bad_format": "<b>{e} Invalid file format (missing name or cmd).</b>",
|
||||
"err_no_frames": "<b>{e} No frames found in the file.</b>",
|
||||
"err_not_found": "<b>{e} Animation not found.</b>",
|
||||
"err_no_cmd": "<b>{e} Specify a command name.</b>",
|
||||
"err_generic": "<b>{e} Error:</b>\n\n{exc}",
|
||||
"ok_loaded": "<b>{s} Loaded: {name}\nCommand: <code>.anim {cmd}</code></b>",
|
||||
"ok_deleted": "<b>{s} Deleted.</b>",
|
||||
"list_header": "<blockquote><b>Animations:</b></blockquote>\n\n<blockquote expandable><b>",
|
||||
"list_row": "• <code>{cmd}</code> — {name} ({n} frames)\n",
|
||||
"list_footer": "</b></blockquote>",
|
||||
"list_empty": "<b>{e} No animations.</b>",
|
||||
}
|
||||
|
||||
strings_ru = {
|
||||
"name": "IwaAnimation",
|
||||
"err_no_reply": "<b>{e} Ответьте на .anim файл.</b>",
|
||||
"err_not_anim": "<b>{e} Файл должен быть формата .anim</b>",
|
||||
"err_bad_format": "<b>{e} Неверный формат файла (нет name или cmd).</b>",
|
||||
"err_no_frames": "<b>{e} В файле нет кадров.</b>",
|
||||
"err_not_found": "<b>{e} Анимация не найдена.</b>",
|
||||
"err_no_cmd": "<b>{e} Укажи команду.</b>",
|
||||
"err_generic": "<b>{e} Ошибка:</b>\n\n{exc}",
|
||||
"ok_loaded": "<b>{s} Загружено: {name}\nКоманда: <code>.anim {cmd}</code></b>",
|
||||
"ok_deleted": "<b>{s} Удалено.</b>",
|
||||
"list_header": "<blockquote><b>Анимации:</b></blockquote>\n\n<blockquote expandable><b>",
|
||||
"list_row": "• <code>{cmd}</code> — {name} ({n} кадров)\n",
|
||||
"list_footer": "</b></blockquote>",
|
||||
"list_empty": "<b>{e} Нет анимаций.</b>",
|
||||
}
|
||||
|
||||
_E = "<emoji document_id=5774077015388852135>❌</emoji>"
|
||||
_S = "<emoji document_id=5774022692642492953>✅</emoji>"
|
||||
|
||||
async def client_ready(self):
|
||||
if not self.db.get("IwaAnimations", "anims", False):
|
||||
self.db.set("IwaAnimations", "anims", {})
|
||||
|
||||
@loader.command(ru_doc="- Загрузить анимацию из полученного .anim файла")
|
||||
async def lanimcmd(self, message: Message):
|
||||
"""- Load animation from a replied .anim file"""
|
||||
reply = await message.get_reply_message()
|
||||
if not reply or not reply.document:
|
||||
return await utils.answer(
|
||||
message, self.strings["err_no_reply"].format(e=self._E)
|
||||
)
|
||||
|
||||
filename = reply.file.name or ""
|
||||
if not filename.endswith(".anim"):
|
||||
return await utils.answer(
|
||||
message, self.strings["err_not_anim"].format(e=self._E)
|
||||
)
|
||||
|
||||
tmp = "anim_load.anim"
|
||||
await reply.download_media(tmp)
|
||||
try:
|
||||
data = toml.load(tmp)
|
||||
name = data.get("name")
|
||||
cmd = data.get("cmd")
|
||||
delay = float(data.get("time", 0.5))
|
||||
|
||||
if not name or not cmd:
|
||||
return await utils.answer(
|
||||
message, self.strings["err_bad_format"].format(e=self._E)
|
||||
)
|
||||
|
||||
frames = []
|
||||
for key in sorted(
|
||||
(k for k in data if str(k).isdigit()), key=lambda x: int(x)
|
||||
):
|
||||
frame = data[key]
|
||||
frames.append("\n".join(frame) if isinstance(frame, list) else str(frame))
|
||||
|
||||
if not frames:
|
||||
return await utils.answer(
|
||||
message, self.strings["err_no_frames"].format(e=self._E)
|
||||
)
|
||||
|
||||
anims = self.db.get("IwaAnimations", "anims", {})
|
||||
anims[cmd] = {"name": name, "frames": frames, "delay": delay}
|
||||
self.db.set("IwaAnimations", "anims", anims)
|
||||
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings["ok_loaded"].format(s=self._S, name=name, cmd=cmd),
|
||||
)
|
||||
except Exception as exc:
|
||||
await utils.answer(
|
||||
message, self.strings["err_generic"].format(e=self._E, exc=exc)
|
||||
)
|
||||
finally:
|
||||
if os.path.exists(tmp):
|
||||
os.remove(tmp)
|
||||
|
||||
@loader.command(ru_doc="<cmd> - Воспроизвести загруженную анимацию")
|
||||
async def animcmd(self, message: Message):
|
||||
"""<cmd> - Play a loaded animation"""
|
||||
cmd = utils.get_args_raw(message)
|
||||
if not cmd:
|
||||
return await utils.answer(
|
||||
message, self.strings["err_no_cmd"].format(e=self._E)
|
||||
)
|
||||
|
||||
anims = self.db.get("IwaAnimations", "anims", {})
|
||||
if cmd not in anims:
|
||||
return await utils.answer(
|
||||
message, self.strings["err_not_found"].format(e=self._E)
|
||||
)
|
||||
|
||||
anim = anims[cmd]
|
||||
msg = await utils.answer(message, anim["frames"][0])
|
||||
try:
|
||||
for frame in anim["frames"][1:]:
|
||||
await asyncio.sleep(anim["delay"])
|
||||
await msg.edit(frame)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@loader.command(ru_doc="- Отобразить список всех загруженных анимаций")
|
||||
async def animscmd(self, message: Message):
|
||||
"""- List all loaded animations"""
|
||||
anims = self.db.get("IwaAnimations", "anims", {})
|
||||
if not anims:
|
||||
return await utils.answer(
|
||||
message, self.strings["list_empty"].format(e=self._E)
|
||||
)
|
||||
|
||||
text = self.strings["list_header"]
|
||||
for cmd, data in anims.items():
|
||||
text += self.strings["list_row"].format(
|
||||
cmd=cmd, name=data["name"], n=len(data["frames"])
|
||||
)
|
||||
text += self.strings["list_footer"]
|
||||
await utils.answer(message, text)
|
||||
|
||||
@loader.command(ru_doc="<cmd> - Удалить анимацию")
|
||||
async def delanimcmd(self, message: Message):
|
||||
"""<cmd> - Delete an animation"""
|
||||
cmd = utils.get_args_raw(message)
|
||||
anims = self.db.get("IwaAnimations", "anims", {})
|
||||
|
||||
if cmd not in anims:
|
||||
return await utils.answer(
|
||||
message, self.strings["err_not_found"].format(e=self._E)
|
||||
)
|
||||
|
||||
anims.pop(cmd)
|
||||
self.db.set("IwaAnimations", "anims", anims)
|
||||
await utils.answer(message, self.strings["ok_deleted"].format(s=self._S))
|
||||
|
||||
@loader.command(ru_doc="<cmd> - Экспорт анимации в файл .anim")
|
||||
async def dumpanimcmd(self, message: Message):
|
||||
"""<cmd> - Export an animation to a .anim file"""
|
||||
cmd = utils.get_args_raw(message)
|
||||
anims = self.db.get("IwaAnimations", "anims", {})
|
||||
|
||||
if cmd not in anims:
|
||||
return await utils.answer(
|
||||
message, self.strings["err_not_found"].format(e=self._E)
|
||||
)
|
||||
|
||||
anim = anims[cmd]
|
||||
data = {"name": anim["name"], "cmd": cmd, "time": str(anim["delay"])}
|
||||
for i, frame in enumerate(anim["frames"], start=1):
|
||||
data[str(i)] = frame.split("\n")
|
||||
|
||||
file = f"{cmd}.anim"
|
||||
try:
|
||||
with open(file, "w", encoding="utf-8") as f:
|
||||
toml.dump(data, f)
|
||||
await message.delete()
|
||||
await self._client.send_file(message.to_id, file)
|
||||
finally:
|
||||
if os.path.exists(file):
|
||||
os.remove(file)
|
||||
@@ -27,3 +27,4 @@ github
|
||||
stream
|
||||
placeholders+
|
||||
PyInstall
|
||||
IwaAnimation
|
||||
Reference in New Issue
Block a user