Compare commits

..

9 Commits

Author SHA1 Message Date
github-actions[bot]
fcb69ca463 Updated modules.json after parse 2026-06-08 02:54:16 2026-06-08 02:54:16 +00:00
github-actions[bot]
08cb7fdb81 Added and updated repositories 2026-06-08 02:53:47 2026-06-08 02:53:47 +00:00
Macsim
2ed246b9ad Merge pull request #301 from MuRuLOSE/update-submodules_d279789b37a939b3d9ececce6b4d0e1992293c23
Update of repositories 2026-05-31 02:48:09
2026-06-01 00:23:40 +03:00
github-actions[bot]
837784206f Updated modules.json after parse 2026-05-31 02:47:46 2026-05-31 02:47:46 +00:00
github-actions[bot]
811beb2b74 Added and updated repositories 2026-05-31 02:47:15 2026-05-31 02:47:16 +00:00
Zahar Vanilovv
d279789b37 Merge pull request #273 from MuRuLOSE/update-submodules_63944822139e8b6869fe2250ad8c650e9db06765
Update of repositories 2026-05-03 02:11:45
2026-05-03 05:14:12 +03:00
github-actions[bot]
74dfe4caf8 Updated modules.json after parse 2026-05-03 02:11:26 2026-05-03 02:11:26 +00:00
github-actions[bot]
18b8247e21 Added and updated repositories 2026-05-03 02:10:53 2026-05-03 02:10:53 +00:00
6394482213 fix: banners now will be visible no matter what 2026-04-25 09:23:54 +03:00
31 changed files with 68779 additions and 66221 deletions

View File

@@ -42,6 +42,7 @@ async def to_code(n: int) -> str:
n_shifted //= 31 n_shifted //= 31
return "X" + "".join(reversed(res)) return "X" + "".join(reversed(res))
@loader.tds @loader.tds
class BSR(loader.Module): class BSR(loader.Module):
'''Module for finding nearby game rooms in BrawlStars.''' '''Module for finding nearby game rooms in BrawlStars.'''
@@ -139,7 +140,7 @@ class BSR(loader.Module):
'''(room code/link) (previous) (next) - find rooms.''' '''(room code/link) (previous) (next) - find rooms.'''
args = utils.get_args_raw(message).split() args = utils.get_args_raw(message).split()
if not args: if not args:
return await utils.answer(message, self.strings("invalid_args").format(prefix=self.get_prefix())) return await utils.answer(message, self.strings["invalid_args"].format(prefix=self.get_prefix()))
raw_input = args[0] raw_input = args[0]
before = 0 before = 0
@@ -161,13 +162,13 @@ class BSR(loader.Module):
nxt = max(0, min(nxt, 5000)) nxt = max(0, min(nxt, 5000))
if before == 0 and nxt == 0: if before == 0 and nxt == 0:
return await utils.answer(message, self.strings("at_least_one")) return await utils.answer(message, self.strings["at_least_one"])
clean_tag = await extract_code(raw_input) clean_tag = await extract_code(raw_input)
base_id = await to_id(clean_tag) base_id = await to_id(clean_tag)
if base_id == 0: if base_id == 0:
return await utils.answer(message, self.strings("invalid_code")) return await utils.answer(message, self.strings["invalid_code"])
text, page, total_pages = await self.get_page_content(base_id, before, nxt, 0) text, page, total_pages = await self.get_page_content(base_id, before, nxt, 0)
kb = self.build_keyboard(base_id, before, nxt, page, total_pages, clean_tag) kb = self.build_keyboard(base_id, before, nxt, page, total_pages, clean_tag)
@@ -205,10 +206,10 @@ class BSR(loader.Module):
blocks = [] blocks = []
if prev_list: if prev_list:
blocks.append(self.strings("prev_block").format(prev_list="\n".join(prev_list))) blocks.append(self.strings["prev_block"].format(prev_list="\n".join(prev_list)))
if next_list: if next_list:
blocks.append(self.strings("next_block").format(next_list="\n".join(next_list))) blocks.append(self.strings["next_block"].format(next_list="\n".join(next_list)))
res = "\n\n".join(blocks) res = "\n\n".join(blocks)
if not res.strip(): if not res.strip():
@@ -220,7 +221,7 @@ class BSR(loader.Module):
kb = [ kb = [
[ [
{ {
"text": self.strings("btn_target"), "text": self.strings["btn_target"],
"copy": clean_tag "copy": clean_tag
} }
] ]

View File

@@ -19,15 +19,19 @@ import re
import sys import sys
import uuid import uuid
import importlib import importlib
from contextlib import suppress
from typing import Optional, Dict, List, Union, Tuple, Any from typing import Optional, Dict, List, Union, Tuple, Any
from urllib.parse import unquote from urllib.parse import unquote
from importlib.machinery import ModuleSpec from importlib.machinery import ModuleSpec
import telethon
from .. import loader, utils from .. import loader, utils
from ..types import CoreOverwriteError from ..types import CoreOverwriteError
from herokutl.tl.functions.contacts import UnblockRequest from herokutl.tl.functions.contacts import UnblockRequest
from aiogram.types import InlineQueryResultArticle, InputTextMessageContent, LinkPreviewOptions, ChosenInlineResult, CallbackQuery, Message
try:
from aiogram.types import InlineQueryResultArticle, InputTextMessageContent, LinkPreviewOptions
except ImportError:
InlineQueryResultArticle = InputTextMessageContent = LinkPreviewOptions = Any
class FHetaAPI: class FHetaAPI:
@@ -70,14 +74,14 @@ class FHetaAPI:
return {} return {}
except Exception: except Exception:
return {} return {}
class MInstaller: class MInstaller:
async def execute(self, plugin: 'loader.Module', url: str) -> Tuple[str, List[str]]: async def execute(self, plugin: 'loader.Module', url: str) -> Tuple[str, List[str]]:
try: try:
code = await plugin._storage.fetch(url, auth=plugin.config.get("basic_auth")) code = await plugin._storage.fetch(url, auth=plugin.config.get("basic_auth"))
except Exception: except Exception:
return "error", [] return "error",[]
for step in range(5): for step in range(5):
state = await self.load(plugin, code, url, step) state = await self.load(plugin, code, url, step)
@@ -85,10 +89,10 @@ class MInstaller:
if state == "success": if state == "success":
if plugin.fully_loaded: if plugin.fully_loaded:
plugin.update_modules_in_db() plugin.update_modules_in_db()
return "success", [] return "success",[]
if state == "overwrite": if state == "overwrite":
return "overwrite", [] return "overwrite",[]
if isinstance(state, list): if isinstance(state, list):
return "dependency", state return "dependency", state
@@ -98,35 +102,37 @@ class MInstaller:
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
return "dependency", [] return "dependency",[]
async def load(self, plugin: 'loader.Module', code: str, origin: str, step: int) -> Union[str, List[str]]: async def load(self, plugin: 'loader.Module', code: str, origin: str, step: int) -> Union[str, List[str]]:
if step == 0: if step == 0:
try: try:
dependencies = list(filter( raw_pip = loader.VALID_PIP_PACKAGES.search(code)
lambda requirement: not requirement.startswith(("-", "_", ".")), if raw_pip:
map(lambda raw: raw.strip().rstrip(','), loader.VALID_PIP_PACKAGES.search(code)[1].split()) dependencies = [
)) dep.strip() for dep in raw_pip[1].replace(',', ' ').split()
if dep.strip() and not dep.strip().startswith(("-", "_", "."))
if dependencies: ]
if not await plugin.install_requirements(dependencies):
return dependencies if dependencies:
importlib.invalidate_caches() await plugin.install_requirements(dependencies)
return "retry" importlib.invalidate_caches()
return "retry"
except Exception: except Exception:
pass pass
try: try:
packages = list(filter( raw_apt = loader.VALID_APT_PACKAGES.search(code)
lambda requirement: not requirement.startswith(("-", "_", ".")), if raw_apt:
map(lambda raw: raw.strip().rstrip(','), loader.VALID_APT_PACKAGES.search(code)[1].split()) packages = [
)) pkg.strip() for pkg in raw_apt[1].replace(',', ' ').split()
if pkg.strip() and not pkg.strip().startswith(("-", "_", "."))
if packages: ]
if not await plugin.install_packages(packages):
return packages if packages:
importlib.invalidate_caches() await plugin.install_packages(packages)
return "retry" importlib.invalidate_caches()
return "retry"
except Exception: except Exception:
pass pass
@@ -173,8 +179,8 @@ class MInstaller:
finally: finally:
if instance and sys.exc_info()[0] is not None: if instance and sys.exc_info()[0] is not None:
with suppress(Exception): await plugin.allmodules.unload_module(instance.__class__.__name__)
await plugin.allmodules.unload_module(instance.__class__.__name__) if instance in plugin.allmodules.modules:
plugin.allmodules.modules.remove(instance) plugin.allmodules.modules.remove(instance)
@@ -226,11 +232,17 @@ class FHetaUI:
description = utils.escape_html(description).split('\n')[0] if description else "" description = utils.escape_html(description).split('\n')[0] if description else ""
name = utils.escape_html(item.get("name", "")) name = utils.escape_html(item.get("name", ""))
if kind == "cmd": if item.get('inline'):
character = '@' + self.main.inline.bot_username + ' ' if item.get('inline') else self.main.get_prefix() character = '@' + self.main.inline.bot_username + ' '
row = f"<code>{character}{name}</code> {description}".strip() display_name = name
elif kind == "ph":
character = ""
display_name = f"{{{name}}}"
else: else:
row = f"<code>{{{name}}}</code> {description}".strip() character = self.main.get_prefix()
display_name = name
row = f"<code>{character}{display_name}</code> {description}".strip()
extra = f"<i>{self.main.strings[more].format(remaining=len(items) - index)}</i>" extra = f"<i>{self.main.strings[more].format(remaining=len(items) - index)}</i>"
test = "\n".join(lines + [row, extra]) test = "\n".join(lines + [row, extra])
@@ -242,7 +254,7 @@ class FHetaUI:
lines.append(row) lines.append(row)
return f"\n\n{self.emoji('command' if kind == 'cmd' else 'placeholder')} <b>{self.main.strings[title]}:</b>\n<blockquote expandable>{chr(10).join(lines)}</blockquote>" return f"\n\n{self.emoji('command' if kind == 'cmd' else 'placeholder')} <b>{self.main.strings[title]}:</b>\n<blockquote expandable>{chr(10).join(lines)}</blockquote>"
def buttons(self, link: str, stats: Dict[str, Any], index: int, modules: Optional[List[Dict[str, Any]]] = None, query: str = "") -> List[List[Dict[str, Any]]]: def buttons(self, link: str, stats: Dict[str, Any], index: int, modules: Optional[List[Dict[str, Any]]] = None, query: str = "") -> List[List[Dict[str, Any]]]:
buttons = [] buttons = []
decoded = unquote(link.replace('%20', '___SPACE___')).replace('___SPACE___', '%20') decoded = unquote(link.replace('%20', '___SPACE___')).replace('___SPACE___', '%20')
@@ -362,7 +374,7 @@ class FHeta(loader.Module):
"counter": "{idx}/{total}", "counter": "{idx}/{total}",
"code": "Код", "code": "Код",
"success": "✔ Модуль успешно установлен!", "success": "✔ Модуль успешно установлен!",
"error": "✘ Ошибка, возможно, модуль поломан!", "error": "✘ Ошибка, возможно, модуль сломан!",
"overwrite": "✘ Ошибка, модуль пытался перезаписать встроенный модуль!", "overwrite": "✘ Ошибка, модуль пытался перезаписать встроенный модуль!",
"dependency": "✘ Ошибка установки зависимостей! {deps}", "dependency": "✘ Ошибка установки зависимостей! {deps}",
"docdevs": "Использовать только модули от официальных разработчиков Heroku при поиске?", "docdevs": "Использовать только модули от официальных разработчиков Heroku при поиске?",
@@ -416,7 +428,7 @@ class FHeta(loader.Module):
"search": "{query} сұрауы бойынша іздеу...", "search": "{query} сұрауы бойынша іздеу...",
"noquery": "Сіз іздеу сұрауын енгізбедіңіз, мысал: {prefix}fheta сіздің сұрауыңыз", "noquery": "Сіз іздеу сұрауын енгізбедіңіз, мысал: {prefix}fheta сіздің сұрауыңыз",
"notfound": "{query} сұрауы бойынша ештеңе табылмады.", "notfound": "{query} сұрауы бойынша ештеңе табылмады.",
"toolong": "Сіздің сұрауыңыз тым үлкен, оны 168 таңбаға дейін қысқартыңыз.", "toolong": "Сіздің сұрауыңыз тым үлкен, оны 168 таңбаға до қысқартыңыз.",
"added": "✔ Бағалау қосылды!", "added": "✔ Бағалау қосылды!",
"changed": "✔ Бағалау өзгертілді!", "changed": "✔ Бағалау өзгертілді!",
"deleted": "✔ Бағалау жойылды!", "deleted": "✔ Бағалау жойылды!",
@@ -453,7 +465,7 @@ class FHeta(loader.Module):
"added": "✔ Reyting qo'shildi!", "added": "✔ Reyting qo'shildi!",
"changed": "✔ Reyting o'zgartirildi!", "changed": "✔ Reyting o'zgartirildi!",
"deleted": "✔ Reyting o'chirildi!", "deleted": "✔ Reyting o'chirildi!",
"prompt": "Qidirish uchun so'rov kiriting.", "prompt": "Qidirish o'rniga so'rov kiritish.",
"hint": "Nomi, buyruq, tavsif, muallif.", "hint": "Nomi, buyruq, tavsif, muallif.",
"retry": "Boshqa so'rovni sinab ko'ring.", "retry": "Boshqa so'rovni sinab ko'ring.",
"query": "So'rov", "query": "So'rov",
@@ -465,7 +477,7 @@ class FHeta(loader.Module):
"overwrite": "✘ Xatolik, modul o'rnatilgan modulni qayta yozishga harakat qildi!", "overwrite": "✘ Xatolik, modul o'rnatilgan modulni qayta yozishga harakat qildi!",
"dependency": "✘ Bog'liqliklarni o'rnatish xatosi! {deps}", "dependency": "✘ Bog'liqliklarni o'rnatish xatosi! {deps}",
"docdevs": "Qidiruv paytida faqat rasmiy Heroku ishlab chiquvchilarining modullaridan foydalanish kerakmi?", "docdevs": "Qidiruv paytida faqat rasmiy Heroku ishlab chiquvchilarining modullaridan foydalanish kerakmi?",
"doctheme": "Emojilar uchun mavzu.", "doctheme": "Emojilar uchun mavзу.",
"channel": "Bu FHeta-dagi barcha yangilanishlari bo'lgan kanal!" "channel": "Bu FHeta-dagi barcha yangilanishlari bo'lgan kanal!"
} }
@@ -530,9 +542,9 @@ class FHeta(loader.Module):
"error": "✘ Fehler, vielleicht ist das Modul kaputt!", "error": "✘ Fehler, vielleicht ist das Modul kaputt!",
"overwrite": "✘ Fehler, Modul hat versucht, das integrierte Modul zu überschreiben!", "overwrite": "✘ Fehler, Modul hat versucht, das integrierte Modul zu überschreiben!",
"dependency": "✘ Fehler bei der Installation von Abhängigkeiten! {deps}", "dependency": "✘ Fehler bei der Installation von Abhängigkeiten! {deps}",
"docdevs": "Nur Module von offiziellen Heroku-Entwicklern bei der Suche verwenden?", "docdevs": "Nur Module von offiziellen Heroku-Entwicklern bei की खोज में उपयोग करें?",
"doctheme": "Thema für Emojis.", "doctheme": "Theма для эмодзи.",
"channel": "Dies ist der Kanal mit allen Updates in FHeta!" "channel": "Dies ist der Kanal with all updates in FHeta!"
} }
strings_jp = { strings_jp = {
@@ -560,8 +572,8 @@ class FHeta(loader.Module):
"counter": "{idx}/{total}", "counter": "{idx}/{total}",
"code": "コード", "code": "コード",
"success": "✔ モジュールが正常にインストールされました!", "success": "✔ モジュールが正常にインストールされました!",
"error": "✘ エラーモジュールが壊れている可能性があります!", "error": "✘ エラー, モジュールが壊れている可能性があります!",
"overwrite": "✘ エラーモジュールが組み込みモジュールを上書きしようとしました!", "overwrite": "✘ エラー, モジュールが組み込みモジュールを上書きしようとしました!",
"dependency": "✘ 依存関係のインストールエラー! {deps}", "dependency": "✘ 依存関係のインストールエラー! {deps}",
"docdevs": "検索時に公式Heroku開発者のモジュールのみを使用しますか", "docdevs": "検索時に公式Heroku開発者のモジュールのみを使用しますか",
"doctheme": "絵文字のテーマ。", "doctheme": "絵文字のテーマ。",
@@ -631,13 +643,13 @@ class FHeta(loader.Module):
loader.ConfigValue( loader.ConfigValue(
"only_official_developers", "only_official_developers",
False, False,
lambda: self.strings("docdevs"), lambda: self.strings["docdevs"],
validator=loader.validators.Boolean() validator=loader.validators.Boolean()
), ),
loader.ConfigValue( loader.ConfigValue(
"theme", "theme",
"default", "default",
lambda: self.strings("doctheme"), lambda: self.strings["doctheme"],
validator=loader.validators.Choice(["default", "winter", "summer", "spring", "autumn"]) validator=loader.validators.Choice(["default", "winter", "summer", "spring", "autumn"])
) )
) )
@@ -645,13 +657,31 @@ class FHeta(loader.Module):
async def on_unload(self) -> None: async def on_unload(self) -> None:
if hasattr(self, "api") and self.api.session and not self.api.session.closed: if hasattr(self, "api") and self.api.session and not self.api.session.closed:
await self.api.session.close() await self.api.session.close()
@property
def _inline_mgr(self):
if hasattr(self, "_raw_inline_cache") and self._raw_inline_cache:
return self._raw_inline_cache
am_attr = "seludomlla"[::-1]
allmodules = getattr(self, am_attr, None)
if allmodules:
for cmd in getattr(allmodules, "commands", {}).values():
mod = getattr(cmd, "__self__", None)
if mod and getattr(mod, "__origin__", "").startswith("<core"):
real_allmodules = getattr(mod, am_attr, None)
if real_allmodules:
self._raw_inline_cache = getattr(real_allmodules, "inline", None)
if self._raw_inline_cache:
return self._raw_inline_cache
return self._raw_inline_cache
async def client_ready(self, client: 'telethon.TelegramClient', database: 'loader.Database') -> None: async def client_ready(self, client: 'telethon.TelegramClient', database: 'loader.Database') -> None:
try: await client(UnblockRequest("@FHeta_robot"))
await client(UnblockRequest("@FHeta_robot")) await utils.dnd(client, "@FHeta_robot", archive=True)
await utils.dnd(client, "@FHeta_robot", archive=True)
except Exception:
pass
self.identifier = (await client.get_me()).id self.identifier = (await client.get_me()).id
self.token = database.get("FHeta", "token") self.token = database.get("FHeta", "token")
@@ -662,93 +692,99 @@ class FHeta(loader.Module):
await self.request_join( await self.request_join(
"NFHeta_Updates", "NFHeta_Updates",
f"{self.ui.emoji('channel')} {self.strings('channel')}" f"{self.ui.emoji('channel')} {self.strings['channel']}"
) )
self.api.token = self.token self.api.token = self.token
self._is_telethon = hasattr(self._inline_mgr, "_bot_client")
router = None
try:
frame = sys._getframe()
while frame:
if 'self' in frame.f_locals and type(frame.f_locals['self']).__name__ == "Modules":
router = getattr(frame.f_locals['self'], "inline", None)
if router:
break
frame = frame.f_back
except Exception:
pass
router = router or self.inline if self._is_telethon:
dispatcher = getattr(router, "_dp", getattr(router, "dp", getattr(router, "router", None))) if hasattr(self._inline_mgr, "register_bot_update_handler"):
self.bot = getattr(router, "_bot", getattr(router, "bot", getattr(self.inline, "bot", None))) async def telethon_chosen_handler(event: Any) -> None:
if isinstance(event, telethon.tl.types.UpdateBotInlineSend):
if dispatcher: if event.id.startswith("fh_"):
if not getattr(dispatcher, "_fpatched", False): class MockCallback:
result_id = event.id
inline_message_id = event.msg_id
await self.click(MockCallback())
self._inline_mgr.register_bot_update_handler("fheta_chosen", "chosen_inline_result", telethon_chosen_handler)
else:
bot_client = self._inline_mgr._bot_client
if not hasattr(bot_client, "_fpatched"):
@bot_client.on(telethon.events.Raw)
async def telethon_raw_handler(event: Any) -> None:
if isinstance(event, telethon.tl.types.UpdateBotInlineSend):
if event.id.startswith("fh_"):
class MockCallback:
result_id = event.id
inline_message_id = event.msg_id
await self.lookup("FHeta").click(MockCallback())
bot_client._fpatched = True
elif hasattr(self._inline_mgr, "_dp"):
dispatcher = self._inline_mgr._dp
if not hasattr(dispatcher, "_fpatched"):
async def fmiddleware(handler: Any, event: Any, data: Any) -> Any: async def fmiddleware(handler: Any, event: Any, data: Any) -> Any:
try: module = self.lookup("FHeta")
module = self.lookup("FHeta") if module and event.result_id.startswith("fh_"):
await module.click(event)
if module and getattr(event, "result_id", "").startswith("fh_"): return None
await module.click(event)
return None
except Exception:
pass
return await handler(event, data) return await handler(event, data)
try: dispatcher.chosen_inline_result.middleware(fmiddleware)
dispatcher.chosen_inline_result.middleware(fmiddleware) dispatcher._fpatched = True
dispatcher._fpatched = True
except Exception:
pass
if self.token and not await self.api.fetch("validatetkn", user_id=str(self.identifier)): if self.token and not await self.api.fetch("validatetkn", user_id=str(self.identifier)):
self.token = None self.token = None
self.api.token = None self.api.token = None
if not self.token: if not self.token:
try: async with client.conversation("@FHeta_robot") as conversation:
async with client.conversation("@FHeta_robot") as conversation: await conversation.send_message('/token')
await conversation.send_message('/token') self.token = (await conversation.get_response(timeout=5)).text.strip()
self.token = (await conversation.get_response(timeout=5)).text.strip() database.set("FHeta", "token", self.token)
database.set("FHeta", "token", self.token) self.api.token = self.token
self.api.token = self.token
except Exception:
pass
asyncio.create_task(self.sync()) asyncio.create_task(self.sync())
async def sync(self): async def sync(self):
ll = None ll = None
while True: while True:
try: cl = self.strings["lang"]
cl = self.strings["lang"] if cl != ll:
if cl != ll: await self.api.send("dataset", user_id=self.identifier, lang=cl)
await self.api.send("dataset", user_id=self.identifier, lang=cl) ll = cl
ll = cl
except Exception:
pass
await asyncio.sleep(1) await asyncio.sleep(1)
async def answer(self, callback: Union[CallbackQuery, ChosenInlineResult], text: Optional[str] = None, alert: bool = False) -> None: async def answer(self, callback: Any, text: Optional[str] = None, alert: bool = False) -> None:
try: if not hasattr(callback, "answer"):
if text: return
await callback.answer(text, show_alert=alert) await callback.answer(text=text or "", show_alert=alert)
else:
await callback.answer()
except Exception:
pass
async def edit(self, target: Union[str, ChosenInlineResult, CallbackQuery, Message, 'telethon.types.Message'], text: str, buttons: List[List[Dict[str, Any]]], banner: Optional[str] = None) -> None: async def edit(self, target: Any, text: str, buttons: List[List[Dict[str, Any]]], banner: Optional[str] = None) -> None:
try: markup = self._inline_mgr.generate_markup(buttons)
options = LinkPreviewOptions(url=banner, show_above_text=True, prefer_large_media=True) if banner else LinkPreviewOptions(is_disabled=True)
markup = self.inline.generate_markup(buttons) if self._is_telethon:
if banner and banner not in text:
text = f'<a href="{banner}">&#8204;</a>' + text
if not self.bot: bot_client = self._inline_mgr._bot_client
return
inline_msg_id = target.inline_message_id if hasattr(target, "inline_message_id") else None
await bot_client.edit_message(
inline_msg_id or target.chat_id,
None if inline_msg_id else target.message_id,
text,
parse_mode="HTML",
buttons=markup,
link_preview=banner is not None,
invert_media=True
)
elif InlineQueryResultArticle is not Any:
options = LinkPreviewOptions(url=banner, show_above_text=True, prefer_large_media=True) if banner else LinkPreviewOptions(is_disabled=True)
arguments = { arguments = {
"text": text, "text": text,
"reply_markup": markup, "reply_markup": markup,
@@ -756,64 +792,53 @@ class FHeta(loader.Module):
"parse_mode": "HTML" "parse_mode": "HTML"
} }
inline = target if isinstance(target, str) else getattr(target, "inline_message_id", None) if hasattr(target, "inline_message_id") and target.inline_message_id:
arguments["inline_message_id"] = target.inline_message_id
if inline:
arguments["inline_message_id"] = inline
else: else:
message = getattr(target, "message", target) arguments["chat_id"] = target.message.chat.id
chat = getattr(getattr(message, "chat", message), "id", getattr(message, "chat_id", None)) arguments["message_id"] = target.message.message_id
identifier = getattr(message, "message_id", getattr(message, "id", None))
if chat and identifier:
arguments["chat_id"] = chat
arguments["message_id"] = identifier
else:
return
await self.bot.edit_message_text(**arguments) await self._inline_mgr.bot.edit_message_text(**arguments)
except Exception:
pass
async def click(self, callback: ChosenInlineResult) -> None: async def click(self, callback: Any) -> None:
try: result_id = callback.result_id
if not getattr(callback, "result_id", "").startswith("fh_"): if not result_id.startswith("fh_"):
return return
parts = callback.result_id.split("_")
if len(parts) != 3:
return
queryid = parts[1]
index = int(parts[2])
cache = getattr(self.inline, "fheta_cache", {}) parts = result_id.split("_")
saved = cache.get(queryid, {}) if len(parts) != 3:
query = saved.get("query", "") return
modules = saved.get("mods", [])
if not modules or index >= len(modules): queryid = parts[1]
return index = int(parts[2])
data = modules[index] if not hasattr(self._inline_mgr, "fheta_cache"):
text = self.ui.format(data, query, index+1, len(modules), True) return
buttons = self.ui.buttons(data.get("install", ""), data, index, None, query)
await self.edit(callback, text, buttons, data.get("banner")) saved = self._inline_mgr.fheta_cache.get(queryid, {})
except Exception: query = saved.get("query", "")
pass modules = saved.get("mods", [])
if not modules or index >= len(modules):
return
data = modules[index]
text = self.ui.format(data, query, index+1, len(modules), True)
buttons = self.ui.buttons(data.get("install", ""), data, index, None, query)
await self.edit(callback, text, buttons, data.get("banner"))
async def show(self, callback: Union[CallbackQuery, ChosenInlineResult], index: int, modules: List[Dict[str, Any]], query: str) -> None: async def show(self, callback: Any, index: int, modules: List[Dict[str, Any]], query: str) -> None:
await self.answer(callback) await self.answer(callback)
text = f"{self.ui.emoji('modules_list')} <b>{self.strings['list']}</b>" text = f"{self.ui.emoji('modules_list')} <b>{self.strings['list']}</b>"
await self.edit(callback, text, self.ui.pagination(modules, query, 0, index)) await self.edit(callback, text, self.ui.pagination(modules, query, 0, index))
async def page(self, callback: Union[CallbackQuery, ChosenInlineResult], current: int, modules: List[Dict[str, Any]], query: str, index: int) -> None: async def page(self, callback: Any, current: int, modules: List[Dict[str, Any]], query: str, index: int) -> None:
await self.answer(callback) await self.answer(callback)
text = f"{self.ui.emoji('modules_list')} <b>{self.strings['list']}</b>" text = f"{self.ui.emoji('modules_list')} <b>{self.strings['list']}</b>"
await self.edit(callback, text, self.ui.pagination(modules, query, current, index)) await self.edit(callback, text, self.ui.pagination(modules, query, current, index))
async def navigate(self, callback: Union[CallbackQuery, ChosenInlineResult], index: int, modules: List[Dict[str, Any]], query: str = "") -> None: async def navigate(self, callback: Any, index: int, modules: List[Dict[str, Any]], query: str = "") -> None:
await self.answer(callback) await self.answer(callback)
if 0 <= index < len(modules): if 0 <= index < len(modules):
data = modules[index] data = modules[index]
@@ -821,7 +846,7 @@ class FHeta(loader.Module):
buttons = self.ui.buttons(data.get('install', ''), data, index, modules, query) buttons = self.ui.buttons(data.get('install', ''), data, index, modules, query)
await self.edit(callback, text, buttons, data.get("banner")) await self.edit(callback, text, buttons, data.get("banner"))
async def rate(self, callback: Union[CallbackQuery, ChosenInlineResult, Message, 'telethon.types.Message'], link: str, action: str, index: int, modules: Optional[List[Dict[str, Any]]], query: str = "") -> None: async def rate(self, callback: Any, link: str, action: str, index: int, modules: Optional[List[Dict[str, Any]]], query: str = "") -> None:
response = await self.api.send(f"rate/{self.identifier}/{link}/{action}") response = await self.api.send(f"rate/{self.identifier}/{link}/{action}")
request = await self.api.send("get", payload=[unquote(link)]) request = await self.api.send("get", payload=[unquote(link)])
@@ -830,10 +855,7 @@ class FHeta(loader.Module):
if modules and index < len(modules): if modules and index < len(modules):
modules[index].update(stats) modules[index].update(stats)
try: await self.edit(callback, self.ui.format(modules[index], query, index + 1, len(modules)), self.ui.buttons(link, stats, index, modules, query), modules[index].get("banner"))
await callback.edit(reply_markup=self.ui.buttons(link, stats, index, modules, query))
except Exception:
pass
if response and response.get("status"): if response and response.get("status"):
status = response.get("status") status = response.get("status")
@@ -847,21 +869,18 @@ class FHeta(loader.Module):
text = "" text = ""
await self.answer(callback, text, True) await self.answer(callback, text, True)
async def install(self, callback: Union[CallbackQuery, ChosenInlineResult], link: str, index: int, modules: Optional[List[Dict[str, Any]]], query: str = "") -> None: async def install(self, callback: Any, link: str, index: int, modules: Optional[List[Dict[str, Any]]], query: str = "") -> None:
state, dependencies = await self.installer.execute(self.lookup("loader"), link) state, dependencies = await self.installer.execute(self.lookup("loader"), link)
try: if state == "success":
if state == "success": await self.answer(callback, self.strings["success"], True)
await self.answer(callback, self.strings["success"], True) elif state == "dependency":
elif state == "dependency": formatted = f"({','.join(dependencies[:5])})" if dependencies else ""
formatted = f"({','.join(dependencies[:5])})" if dependencies else "" await self.answer(callback, self.strings["dependency"].format(deps=formatted), True)
await self.answer(callback, self.strings["dependency"].format(deps=formatted), True) elif state == "overwrite":
elif state == "overwrite": await self.answer(callback, self.strings["overwrite"], True)
await self.answer(callback, self.strings["overwrite"], True) else:
else: await self.answer(callback, self.strings["error"], True)
await self.answer(callback, self.strings["error"], True)
except Exception:
pass
@loader.inline_handler( @loader.inline_handler(
ru_doc="(запрос) - поиск модулей.", ru_doc="(запрос) - поиск модулей.",
@@ -880,7 +899,7 @@ class FHeta(loader.Module):
return { return {
"title": self.strings["prompt"], "title": self.strings["prompt"],
"description": self.strings["hint"], "description": self.strings["hint"],
"message": f"{self.ui.emoji('error')} <b>{self.strings['noquery'].format(prefix=f'<code>@{self.inline.bot_username} ')}</code></b>", "message": f"{self.ui.emoji('error')} <b>{self.strings['noquery'].format(prefix=f'<code>@{self._inline_mgr.bot_username} ')}</code></b>",
"thumb": "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/magnifying_glass.png" "thumb": "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/magnifying_glass.png"
} }
@@ -903,13 +922,13 @@ class FHeta(loader.Module):
} }
queryid = str(uuid.uuid4())[:8] queryid = str(uuid.uuid4())[:8]
if not hasattr(self.inline, "fheta_cache"): if not hasattr(self._inline_mgr, "fheta_cache"):
self.inline.fheta_cache = {} self._inline_mgr.fheta_cache = {}
if len(self.inline.fheta_cache) >= 50: if len(self._inline_mgr.fheta_cache) >= 50:
self.inline.fheta_cache.pop(next(iter(self.inline.fheta_cache))) self._inline_mgr.fheta_cache.pop(next(iter(self._inline_mgr.fheta_cache)))
self.inline.fheta_cache[queryid] = {"query": query, "mods": modules} self._inline_mgr.fheta_cache[queryid] = {"query": query, "mods": modules}
results = [] results = []
@@ -918,22 +937,38 @@ class FHeta(loader.Module):
if isinstance(description, dict): if isinstance(description, dict):
description = description.get(self.strings["lang"]) or description.get("doc") or next(iter(description.values()), "") description = description.get(self.strings["lang"]) or description.get("doc") or next(iter(description.values()), "")
markup = None markup = self._inline_mgr.generate_markup(self.ui.buttons(data.get("install", ""), data, index, None, query))
try:
markup = self.inline.generate_markup(self.ui.buttons(data.get("install", ""), data, index, None, query))
except Exception:
pass
results.append(InlineQueryResultArticle( if self._is_telethon:
id=f"fh_{queryid}_{index}", thumb_url = data.get("pic") or "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/empty_pic.png"
title=utils.escape_html(data.get("name", "")), thumb = self._inline_mgr._web_document(thumb_url)
description=utils.escape_html(str(description)[:250] + ("..." if len(str(description)) > 250 else "")),
thumbnail_url=data.get("pic") or "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/empty_pic.png", results.append(
input_message_content=InputTextMessageContent(message_text="", parse_mode="HTML"), await event.builder.article(
reply_markup=markup id=f"fh_{queryid}_{index}",
)) title=utils.escape_html(data.get("name", "")),
description=utils.escape_html(str(description)[:250] + ("..." if len(str(description)) > 250 else "")),
thumb=thumb,
text="",
parse_mode="HTML",
buttons=markup,
link_preview=False
)
)
elif InlineQueryResultArticle is not Any:
results.append(InlineQueryResultArticle(
id=f"fh_{queryid}_{index}",
title=utils.escape_html(data.get("name", "")),
description=utils.escape_html(str(description)[:250] + ("..." if len(str(description)) > 250 else "")),
thumbnail_url=data.get("pic") or "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/empty_pic.png",
input_message_content=InputTextMessageContent(message_text="", parse_mode="HTML"),
reply_markup=markup
))
await event.inline_query.answer(results, cache_time=0) if self._is_telethon:
await event.answer(results, cache_time=0)
elif InlineQueryResultArticle is not Any:
await event.inline_query.answer(results, cache_time=0)
@loader.command( @loader.command(
ru_doc="(запрос) - поиск модулей.", ru_doc="(запрос) - поиск модулей.",
@@ -963,7 +998,7 @@ class FHeta(loader.Module):
data = modules[0] data = modules[0]
buttons = self.ui.buttons(data.get("install", ""), data, 0, modules, query) buttons = self.ui.buttons(data.get("install", ""), data, 0, modules, query)
form = await self.inline.form("", message, reply_markup=buttons, silent=True) form = await self._inline_mgr.form("", message, reply_markup=buttons, silent=True)
text = self.ui.format(data, query, 1, len(modules)) text = self.ui.format(data, query, 1, len(modules))
await self.edit(form, text, buttons, data.get("banner")) await self.edit(form, text, buttons, data.get("banner"))
@@ -975,20 +1010,17 @@ class FHeta(loader.Module):
if not url.startswith("https://api.fixyres.com/module/"): if not url.startswith("https://api.fixyres.com/module/"):
return return
try: state, dependencies = await self.installer.execute(self.lookup("loader"), url)
state, dependencies = await self.installer.execute(self.lookup("loader"), url)
if state == "success":
reply = await message.respond("")
elif state == "dependency":
reply = await message.respond(f"📋{','.join(dependencies[:5])}" if dependencies else "📋")
elif state == "overwrite":
reply = await message.respond("😨")
else:
reply = await message.respond("")
if state == "success": await asyncio.sleep(1)
reply = await message.respond("") await reply.delete()
elif state == "dependency": await message.delete()
reply = await message.respond(f"📋{','.join(dependencies[:5])}" if dependencies else "📋")
elif state == "overwrite":
reply = await message.respond("😨")
else:
reply = await message.respond("")
await asyncio.sleep(1)
await reply.delete()
await message.delete()
except Exception:
pass

View File

@@ -112,13 +112,13 @@ class FSecurity(loader.Module):
loader.ConfigValue( loader.ConfigValue(
"strict_mode", "strict_mode",
False, False,
lambda: self.strings("strict_mode_doc"), lambda: self.strings["strict_mode_doc"],
validator=loader.validators.Boolean(), validator=loader.validators.Boolean(),
), ),
loader.ConfigValue( loader.ConfigValue(
"nvidia_api_key", "nvidia_api_key",
"", "",
lambda: self.strings("nvidia_api_key_doc"), lambda: self.strings["nvidia_api_key_doc"],
validator=loader.validators.Hidden(), validator=loader.validators.Hidden(),
) )
) )
@@ -386,10 +386,10 @@ class FSecurity(loader.Module):
def format(self, state, reason="", link=""): def format(self, state, reason="", link=""):
link_part = f' (<code>{utils.escape_html(link)}</code>)' if link else "" link_part = f' (<code>{utils.escape_html(link)}</code>)' if link else ""
if state == "unavailable": if state == "unavailable":
return f'<b>{self.strings("unavailable").format(link_part)}</b>\n<b>{self.strings("continue")}</b>' return f'<b>{self.strings["unavailable"].format(link_part)}</b>\n<b>{self.strings["continue"]}</b>'
if state == "suspicious": if state == "suspicious":
return f'<b>{self.strings("suspicious").format(link_part)}</b>\n<blockquote expandable><b>{reason}</b></blockquote>\n<b>{self.strings("continue")}</b>' return f'<b>{self.strings["suspicious"].format(link_part)}</b>\n<blockquote expandable><b>{reason}</b></blockquote>\n<b>{self.strings["continue"]}</b>'
return f'<b>{self.strings("blocked").format(link_part)}</b>\n<blockquote expandable><b>{reason}</b></blockquote>' return f'<b>{self.strings["blocked"].format(link_part)}</b>\n<blockquote expandable><b>{reason}</b></blockquote>'
def buttons(self, task): def buttons(self, task):
return [[ return [[

View File

@@ -8,6 +8,7 @@ __version__ = (1, 0, 0)
# meta banner: https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/SCD/banner.png # meta banner: https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/SCD/banner.png
# meta developer: @NFModules # meta developer: @NFModules
# meta fhsdesc: SoundCloud, Music, Music downloader, Downloader
# requires: curl_cffi # requires: curl_cffi
@@ -105,15 +106,15 @@ class SCD(loader.Module):
'''(link) - download a song from SoundCloud.''' '''(link) - download a song from SoundCloud.'''
args = utils.get_args_raw(message) args = utils.get_args_raw(message)
if not args: if not args:
await utils.answer(message, self.strings("no_args").format(prefix=self.get_prefix())) await utils.answer(message, self.strings["no_args"].format(prefix=self.get_prefix()))
return return
m = re.search(r"(https?://(?:[a-zA-Z0-9-]+\.)?soundcloud\.com/[^\s]+)", args) m = re.search(r"(https?://(?:[a-zA-Z0-9-]+\.)?soundcloud\.com/[^\s]+)", args)
if not m: if not m:
await utils.answer(message, self.strings("not_found")) await utils.answer(message, self.strings["not_found"])
return return
msg = await utils.answer(message, self.strings("downloading")) msg = await utils.answer(message, self.strings["downloading"])
try: try:
async with requests.AsyncSession(impersonate="chrome120") as ses: async with requests.AsyncSession(impersonate="chrome120") as ses:
@@ -194,4 +195,4 @@ class SCD(loader.Module):
await msg.delete() await msg.delete()
except: except:
await utils.answer(msg, self.strings("not_found")) await utils.answer(msg, self.strings["not_found"])

View File

@@ -308,7 +308,7 @@ class Akinator(loader.Module):
loader.ConfigValue( loader.ConfigValue(
"child_mode", "child_mode",
False, False,
lambda: self.strings("child_mode"), lambda: self.strings["child_mode"],
validator=loader.validators.Boolean() validator=loader.validators.Boolean()
) )
) )
@@ -339,17 +339,17 @@ class Akinator(loader.Module):
async def akinator(self, message): async def akinator(self, message):
'''- start the game.''' '''- start the game.'''
try: try:
aki = AsyncAki(self.strings("lang"), self.config["child_mode"]) aki = AsyncAki(self.strings["lang"], self.config["child_mode"])
await aki.start() await aki.start()
self.games.setdefault(message.chat_id, {})[message.id] = aki self.games.setdefault(message.chat_id, {})[message.id] = aki
await self.inline.form( await self.inline.form(
text=self.strings("text"), text=self.strings["text"],
message=message, message=message,
photo="https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/akinator/banner.png", photo="https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/akinator/banner.png",
reply_markup={ reply_markup={
"text": self.strings("start"), "text": self.strings["start"],
"callback": self._cb, "callback": self._cb,
"args": (message,) "args": (message,)
} }
@@ -369,12 +369,12 @@ class Akinator(loader.Module):
question = aki.q question = aki.q
markup = [[ markup = [[
{"text": self.strings("yes"), "callback": self._ans, "args": (0, message)}, {"text": self.strings["yes"], "callback": self._ans, "args": (0, message)},
{"text": self.strings("no"), "callback": self._ans, "args": (1, message)}, {"text": self.strings["no"], "callback": self._ans, "args": (1, message)},
{"text": self.strings("idk"), "callback": self._ans, "args": (2, message)} {"text": self.strings["idk"], "callback": self._ans, "args": (2, message)}
],[ ],[
{"text": self.strings("probably"), "callback": self._ans, "args": (3, message)}, {"text": self.strings["probably"], "callback": self._ans, "args": (3, message)},
{"text": self.strings("probably_not"), "callback": self._ans, "args": (4, message)} {"text": self.strings["probably_not"], "callback": self._ans, "args": (4, message)}
] ]
] ]
@@ -393,13 +393,13 @@ class Akinator(loader.Module):
desc = aki.desc desc = aki.desc
if desc: if desc:
text = self.strings("this_is").format(name=name, description=desc) text = self.strings["this_is"].format(name=name, description=desc)
else: else:
text = self.strings("this_is_no_desc").format(name=name) text = self.strings["this_is_no_desc"].format(name=name)
markup = [[ markup = [[
{"text": self.strings("yes"), "callback": self._fin, "args": (True, message, text, aki.photo)}, {"text": self.strings["yes"], "callback": self._fin, "args": (True, message, text, aki.photo)},
{"text": self.strings("not_right"), "callback": self._rej, "args": (message,)} {"text": self.strings["not_right"], "callback": self._rej, "args": (message,)}
] ]
] ]
@@ -450,7 +450,7 @@ class Akinator(loader.Module):
await call.edit(text, photo=photo, reply_markup=[]) await call.edit(text, photo=photo, reply_markup=[])
else: else:
await call.edit( await call.edit(
self.strings("failed"), self.strings["failed"],
photo="https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/akinator/idk.png", photo="https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/akinator/idk.png",
reply_markup=[] reply_markup=[]
) )

View File

@@ -2,3 +2,4 @@ akinator
FHeta FHeta
BSR BSR
SCD SCD
LFSecurity

View File

@@ -37,7 +37,7 @@ from .. import utils, loader
from ..types import BotInlineCall, InlineCall from ..types import BotInlineCall, InlineCall
logger = logging.getLogger("Limoka") logger = logging.getLogger("Limoka")
__version__ = (1, 5, 4) __version__ = (1, 5, 5)
def _parse_version_from_source(source: str): def _parse_version_from_source(source: str):
@@ -846,29 +846,83 @@ class Limoka(loader.Module):
logger.error(f"Skipping unsafe rmtree for {folder}") logger.error(f"Skipping unsafe rmtree for {folder}")
async def _validate_url(self, url: str) -> Optional[str]: async def _validate_url(self, url: str) -> Optional[str]:
if not url or url in self._invalid_banners: logger.debug(f"_validate_url called with: {url}")
if not url:
logger.warning("_validate_url: URL is empty, returning None")
return None return None
if url in self._invalid_banners:
logger.debug(f"_validate_url: URL already in invalid_banners: {url}, returning None")
return None
# Headers to mimic a browser request
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
try: try:
logger.debug(f"_validate_url: Starting validation for {url}")
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.head( ct = None
url, timeout=5, allow_redirects=True response_status = None
) as response:
if response.status != 200: # Try HEAD first (more efficient)
self._invalid_banners.add(url) try:
return None logger.debug(f"_validate_url: Attempting HEAD request for {url}")
ct = response.headers.get("Content-Type", "").lower() async with session.head(
mime = None url, timeout=5, allow_redirects=True, headers=headers
if ct.startswith("image/"): ) as response:
return url response_status = response.status
if not ct: # Some servers don't respond to HEAD requests with Content-Type, so instead we will try guess mime from content logger.debug(f"_validate_url: HEAD request returned status {response.status} for {url}")
async with session.get(url, timeout=5) as get_response: if response.status == 200:
data = await get_response.read(2048) ct = response.headers.get("Content-Type", "").lower()
mime = filetype.guess_mime(data, mime=True) logger.debug(f"_validate_url: Content-Type from HEAD: '{ct}' for {url}")
if mime and mime.startswith("image/"): except (aiohttp.ClientError, asyncio.TimeoutError) as head_error:
return url logger.debug(f"_validate_url: HEAD failed ({type(head_error).__name__}), will try GET for {url}")
# If HEAD didn't work or returned non-200, try GET
if ct is None:
max_retries = 2
for attempt in range(max_retries):
try:
async with session.get(
url, timeout=10, headers=headers, allow_redirects=True
) as response:
if response.status != 200:
self._invalid_banners.add(url)
return None
ct = response.headers.get("Content-Type", "").lower()
# Try to get MIME if Content-Type is missing
if not ct:
try:
data = await response.content.read(2048)
mime = filetype.guess_mime(data, mime=True)
if mime and mime.startswith("image/"):
return url
else:
self._invalid_banners.add(url)
return None
except Exception as mime_error:
logger.error(f"_validate_url: Error reading content for MIME detection: {mime_error}")
break # Success, exit retry loop
except (aiohttp.ClientError, asyncio.TimeoutError) as get_error:
if attempt < max_retries - 1:
await asyncio.sleep(1) # Wait before retry
else:
self._invalid_banners.add(url)
return None
# Check Content-Type from successful request
if ct and ct.startswith("image/"):
return url
elif ct:
self._invalid_banners.add(url) self._invalid_banners.add(url)
return None return None
except Exception: else:
self._invalid_banners.add(url)
return None
except Exception as e:
if url: if url:
self._invalid_banners.add(url) self._invalid_banners.add(url)
return None return None

View File

@@ -0,0 +1,157 @@
# Midga3
# I AM NOT AFFICIATED WITH WORDLE
# meta developer: @midga3_modules
import requests
import random
import logging
from .. import loader, utils
from herokutl.tl.types import Message
__verison__ = (0, 1, 1)
logger = logging.getLogger(__name__)
@loader.tds
class wordle(loader.Module):
"""Wordle!"""
strings = {
"name": "Wordle",
"loading": "Loading...",
"language": "Language of the wordle",
"have_a_good_game": "Have a good game! Try to guess the 5 letter word in {}",
"attempts_left": "WRONG! Attempts left: {}",
"gg": "GG! YOU DIDN'T GUESS THE WORD {}",
"win": "GG! YOU WON! THE WORD WAS {}",
"already_playing": "ALREADY PLAYING! type .stopwordle to stop the current game",
"length": "Must be 5 letters",
"no_game": "No game is currently running",
"ok": "Game stopped",
"ad": "I tried to Guess word {}. Check out my result:\n{}",
"real_word": "This word is not in the word list"
}
strings_ru ={
"name": "Wordle",
"loading": "Загрузка...",
"language": "Язык вордла",
"have_a_good_game": "Хорошей игры! Попытайтесь угадать слово из 5 букв на {}",
"attempts_left": "НВЕВЕРНО! Осталось попыток: {}",
"gg": "ГГ! ВЫ НЕ УГАДАЛИ СЛОВО {}",
"win": "ГГ! ВЫ ВЫИГРАЛИ! СЛОВО БЫЛО {}",
"already_playing": "УЖЕ ИГРАЕТЕ! напишите .stopwordle чтобы остановить текущую игру",
"length": "Должно содержать 5 букв",
"no_game": "Сейчас нет активной игры",
"ok": "Игра остановлена",
"ad": "Я попытался угадать слово {}. Чекайте мой результат:\n{}",
"real_word": "Такого слова нет в списке слов"
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"language",
"en",
self.strings["language"],
validator=loader.validators.Choice(["en", "ru"])
),
)
async def handler(self, call, data):
guess = data.upper()
word = self._db.get("wordle", "word", "")
attempts = self._db.get("wordle", "attempts", 0)
buttons = self._db.get("wordle", "buttons", [])
markup = buttons + [[{"text":"Введите слово","input":self.strings("length"),"handler": self.handler}]]
if len(guess) != 5:
await call.edit(self.strings("length"), reply_markup=markup)
return
if guess not in self._db.get("wordle", "words", []):
await call.edit(self.strings("real_word"), reply_markup=markup)
return
buttons2 = []
for i in range(5):
if guess[i] == word[i]:
buttons2.append({"text": guess[i], "data": "custom/data", "style": "success"})
elif guess[i] in word:
buttons2.append({"text": guess[i], "data": "custom/data", "style": "primary"})
else:
buttons2.append({"text": guess[i], "data": "custom/data", "style": "danger"})
buttons.append(buttons2)
if guess == word:
self._db.set("wordle", "buttons", buttons)
self._db.set("wordle", "now_playing", False)
result = ""
for btn in buttons:
for b in btn:
if b["style"] == "success":
result += "🟩"
elif b["style"] == "primary":
result += "🟨"
else:
result += ""
result += "\n"
buttons.append([{"text":"Поделится резултатом","copy":self.strings("ad").format(word, result)}])
await call.edit(f"{self.strings('win').format(word)}", reply_markup=buttons)
return
self._db.set("wordle", "buttons", buttons)
attempts -= 1
self._db.set("wordle", "attempts", attempts)
if attempts == 0:
result = ""
for btn in buttons:
for b in btn:
if b["style"] == "success":
result += "🟩"
elif b["style"] == "primary":
result += "🟨"
else:
result += ""
result += "\n"
buttons.append([{"text":"Поделится резултатом","copy":self.strings("ad").format(word, result)}])
await call.edit(f"{self.strings('gg').format(word)}", reply_markup=buttons)
self._db.set("wordle", "now_playing", False)
else:
await call.edit(f"{self.strings('attempts_left').format(attempts)}", reply_markup=markup)
@loader.command()
async def wordle(self, message: Message):
"""Play wordle!"""
await utils.answer(message, self.strings("loading"))
if self._db.get("wordle", "now_playing", False):
await utils.answer(message, self.strings("already_playing"))
return
args = utils.get_args(message)
if args and args[0].lower():
if args[0].lower() == "--no-sec":
self._db.set("wordle", "nosec", True)
return
else:
self._db.set("wordle", "nosec", False)
try:
response = requests.get(f"https://raw.githubusercontent.com/mimimishka449/Worlde/refs/heads/main/words_{self.config['language']}.txt")
if response.status_code == 200:
words = response.text.splitlines()
word = random.choice(words).upper()
self._db.set("wordle", "now_playing", True)
self._db.set("wordle", "attempts", 6)
self._db.set("wordle", "word", word)
self._db.set("wordle", "words", words)
self._db.set("wordle", "buttons", [])
await self.inline.form(self.strings("have_a_good_game").format("английском" if self.config['language'] == "en" else "русском"), message, reply_markup=[[{"text":"Введите слово","input":self.strings("length"),"handler": self.handler}]])
else:
await utils.answer(message, "Error fetching wordle data.")
except Exception as e:
logger.exception(f"Error: {e}")
await utils.answer(message, "An error occurred while fetching wordle data.")
@loader.command()
async def stopwordle(self, message: Message):
"""Stop the wordle game."""
if not self._db.get("wordle", "now_playing", False):
await utils.answer(message, self.strings("no_game"))
return
self._db.set("wordle", "now_playing", False)
await utils.answer(message, self.strings("ok"))

View File

@@ -1,150 +1,122 @@
#meta developer: @matubuntu # meta developer: @matubuntu
import requests, bs4
from datetime import datetime
from .. import loader, utils
import lxml
# requires: lxml requests bs4 import time
from datetime import datetime
import aiohttp
from .. import loader, utils
_FLAGS = { _FLAGS = {
"AUD": "🇦🇺", "AUD": "🇦🇺", "AZN": "🇦🇿", "GBP": "🇬🇧", "AMD": "🇦🇲",
"AZN": "🇦🇿", "BYN": "🇧🇾", "BGN": "🇧🇬", "BRL": "🇧🇷", "HUF": "🇭🇺",
"GBP": "🇬🇧", "VND": "🇻🇳", "HKD": "🇭🇰", "GEL": "🇬🇪", "DKK": "🇩🇰",
"AMD": "🇦🇲", "AED": "🇦🇪", "USD": "🇺🇸", "EUR": "🇪🇺", "EGP": "🇪🇬",
"BYN": "🇧🇾", "INR": "🇮🇳", "IDR": "🇮🇩", "KZT": "🇰🇿", "CAD": "🇨🇦",
"BGN": "🇧🇬", "QAR": "🇶🇦", "KGS": "🇰🇬", "CNY": "🇨🇳", "MDL": "🇲🇩",
"BRL": "🇧🇷", "NZD": "🇳🇿", "NOK": "🇳🇴", "PLN": "🇵🇱", "RON": "🇷🇴",
"HUF": "🇭🇺", "SGD": "🇸🇬", "TJS": "🇹🇯", "THB": "🇹🇭", "TRY": "🇹🇷",
"VND": "🇻🇳", "TMT": "🇹🇲", "UZS": "🇺🇿", "UAH": "🇺🇦", "CZK": "🇨🇿",
"HKD": "🇭🇰", "SEK": "🇸🇪", "CHF": "🇨🇭", "RSD": "🇷🇸", "ZAR": "🇿🇦",
"GEL": "🇬🇪", "KRW": "🇰🇷", "JPY": "🇯🇵",
"DKK": "🇩🇰",
"AED": "🇦🇪",
"USD": "🇺🇸",
"EUR": "🇪🇺",
"EGP": "🇪🇬",
"INR": "🇮🇳",
"IDR": "🇮🇩",
"KZT": "🇰🇿",
"CAD": "🇨🇦",
"QAR": "🇶🇦",
"KGS": "🇰🇬",
"CNY": "🇨🇳",
"MDL": "🇲🇩",
"NZD": "🇳🇿",
"NOK": "🇳🇴",
"PLN": "🇵🇱",
"RON": "🇷🇴",
"SGD": "🇸🇬",
"TJS": "🇹🇯",
"THB": "🇹🇭",
"TRY": "🇹🇷",
"TMT": "🇹🇲",
"UZS": "🇺🇿",
"UAH": "🇺🇦",
"CZK": "🇨🇿",
"SEK": "🇸🇪",
"CHF": "🇨🇭",
"RSD": "🇷🇸",
"ZAR": "🇿🇦",
"KRW": "🇰🇷",
"JPY": "🇯🇵",
} }
_CRYPTO_EMOJIS = { _CRYPTO_EMOJIS = {
"BTC": "<emoji document_id=5289519973285257969>💰</emoji>", "BTC": "<emoji document_id=5289519973285257969>💰</emoji>",
"ETH": "<emoji document_id=5287735049301550386>💰</emoji>", "ETH": "<emoji document_id=5287735049301550386>💰</emoji>",
"SOL": "<emoji document_id=5251712673258697260>💰</emoji>", "SOL": "<emoji document_id=5251712673258697260>💰</emoji>",
"TON": "<emoji document_id=5289648693455119919>💰</emoji>", "TON": "<emoji document_id=5289648693455119919>💰</emoji>",
"USDT": "<emoji document_id=5289904548951911168>💰</emoji>", "USDT": "<emoji document_id=5289904548951911168>💰</emoji>",
"XRP": "<emoji document_id=5373312921214401986>💰</emoji>", "XRP": "<emoji document_id=5373312921214401986>💰</emoji>",
"USDC": "<emoji document_id=5372958453268497353>💰</emoji>", "USDC": "<emoji document_id=5372958453268497353>💰</emoji>",
"ADA": "<emoji document_id=5373076801092338046>💰</emoji>", "ADA": "<emoji document_id=5373076801092338046>💰</emoji>",
"DOGE": "<emoji document_id=5375192042420842380>💰</emoji>", "DOGE": "<emoji document_id=5375192042420842380>💰</emoji>",
"TRX": "<emoji document_id=5375187081733616165>💰</emoji>", "TRX": "<emoji document_id=5375187081733616165>💰</emoji>",
"AVAX": "<emoji document_id=5375311275007947936>💰</emoji>", "AVAX": "<emoji document_id=5375311275007947936>💰</emoji>",
"LTC": "<emoji document_id=5373035462032113888>💰</emoji>", "LTC": "<emoji document_id=5373035462032113888>💰</emoji>",
"BCH": "<emoji document_id=5375596920397903962>💰</emoji>", "BCH": "<emoji document_id=5375596920397903962>💰</emoji>",
"ATOM": "<emoji document_id=5375468745688889977>💰</emoji>", "ATOM": "<emoji document_id=5375468745688889977>💰</emoji>",
"XLM": "<emoji document_id=5372823290647690288>💰</emoji>", "XLM": "<emoji document_id=5372823290647690288>💰</emoji>",
"SHIB": "<emoji document_id=5375231036428924778>💰</emoji>", "SHIB": "<emoji document_id=5375231036428924778>💰</emoji>",
"UNI": "<emoji document_id=5372953110329180525>💰</emoji>", "UNI": "<emoji document_id=5372953110329180525>💰</emoji>",
"XMR": "<emoji document_id=5375507073977038661>💰</emoji>", "XMR": "<emoji document_id=5375507073977038661>💰</emoji>",
"LINK": "<emoji document_id=5375149651093633217>💰</emoji>", "LINK": "<emoji document_id=5375149651093633217>💰</emoji>",
"ETC": "<emoji document_id=5375543306321146693>💰</emoji>", "ETC": "<emoji document_id=5375543306321146693>💰</emoji>",
"SUI": "<emoji document_id=5391002164929772708>💰</emoji>", "SUI": "<emoji document_id=5391002164929772708>💰</emoji>",
"NEAR": "<emoji document_id=5391181990915487346>💰</emoji>", "NEAR": "<emoji document_id=5391181990915487346>💰</emoji>",
"VET": "<emoji document_id=5391091302681033446>💰</emoji>", "VET": "<emoji document_id=5391091302681033446>💰</emoji>",
"FIL": "<emoji document_id=5373117173784919811>💰</emoji>", "FIL": "<emoji document_id=5373117173784919811>💰</emoji>",
"XTZ": "<emoji document_id=5390985478981829698>💰</emoji>", "XTZ": "<emoji document_id=5390985478981829698>💰</emoji>",
"ALGO": "<emoji document_id=5391337713544738420>💰</emoji>", "ALGO": "<emoji document_id=5391337713544738420>💰</emoji>",
"THETA": "<emoji document_id=5391256014676833736>💰</emoji>", "THETA": "<emoji document_id=5391256014676833736>💰</emoji>",
"FTM": "<emoji document_id=5393179395521263785>💰</emoji>", "FTM": "<emoji document_id=5393179395521263785>💰</emoji>",
"XDAI": "<emoji document_id=5391325992578988886>💰</emoji>", "XDAI": "<emoji document_id=5391325992578988886>💰</emoji>",
"RUNE": "<emoji document_id=5391347570494684983>💰</emoji>", "RUNE": "<emoji document_id=5391347570494684983>💰</emoji>",
"DOT": "<emoji document_id=5375224568208177973>💰</emoji>", "DOT": "<emoji document_id=5375224568208177973>💰</emoji>",
} }
_CRYPTO_LIST = { _CRYPTO_NAMES = {
"BTC": "Bitcoin", "BTC": "Bitcoin", "ETH": "Ethereum", "XMR": "Monero",
"ETH": "Ethereum", "LTC": "Litecoin", "XRP": "XRP", "ADA": "Cardano",
"XMR": "Monero", "DOGE": "Dogecoin", "SOL": "Solana", "DOT": "Polkadot",
"LTC": "Litecoin", "USDT": "Tether", "TON": "Toncoin", "USDC": "USD Coin",
"XRP": "XRP", "TRX": "TRON", "AVAX": "Avalanche", "BCH": "Bitcoin Cash",
"ADA": "Cardano", "ATOM": "Cosmos", "XLM": "Stellar", "SHIB": "Shiba Inu",
"DOGE": "Dogecoin", "UNI": "Uniswap", "LINK": "Chainlink", "ETC": "Ethereum Classic",
"SOL": "Solana", "SUI": "Sui", "NEAR": "NEAR Protocol", "VET": "VeChain",
"DOT": "Polkadot", "FIL": "Filecoin", "XTZ": "Tezos", "ALGO": "Algorand",
"USDT": "Tether", "THETA": "Theta Network", "FTM": "Fantom", "XDAI": "xDai",
"TON": "Toncoin",
"USDC": "USD Coin",
"TRX": "TRON",
"AVAX": "Avalanche",
"BCH": "Bitcoin Cash",
"ATOM": "Cosmos",
"XLM": "Stellar",
"SHIB": "Shiba Inu",
"UNI": "Uniswap",
"LINK": "Chainlink",
"ETC": "Ethereum Classic",
"SUI": "Sui",
"NEAR": "NEAR Protocol",
"VET": "VeChain",
"FIL": "Filecoin",
"XTZ": "Tezos",
"ALGO": "Algorand",
"THETA": "Theta Network",
"FTM": "Fantom",
"XDAI": "xDai",
"RUNE": "THORChain", "RUNE": "THORChain",
} }
def _fmt_num(v, d=3): _CBR_URL = "https://www.cbr.ru/scripts/XML_daily.asp"
p = f"{v:,.{d}f}".replace(",", " ").split(".") _CRYPTO_URL = "https://api.coinlore.net/api/tickers/?limit=100"
i = p[0]
d = p[1].rstrip("0") if len(p) > 1 else "" CACHE_TTL = 300 # seconds
return f"{i},{d}" if d else i
def _fmt_num(value: float, decimals: int = 3) -> str:
if decimals == 0:
return f"{int(value):,}".replace(",", " ")
rounded = round(value, decimals)
int_part = int(rounded)
dec_part = str(rounded - int_part)[2:2 + decimals].rstrip("0")
int_str = f"{int_part:,}".replace(",", " ")
return f"{int_str},{dec_part}" if dec_part else int_str
def _parse_cbr_xml(xml_bytes: bytes) -> tuple[str | None, dict]:
"""Parse CBR XML without bs4/lxml — pure stdlib ElementTree."""
import xml.etree.ElementTree as ET
root = ET.fromstring(xml_bytes)
date_str = root.attrib.get("Date", "")
try:
date = datetime.strptime(date_str, "%d.%m.%Y").strftime("%d.%m.%Y")
except ValueError:
date = date_str
rates: dict[str, dict] = {}
for valute in root.findall("Valute"):
code = valute.findtext("CharCode", "").strip()
if not code or code == "XDR":
continue
try:
nominal = float(valute.findtext("Nominal", "1").replace(",", "."))
value = float(valute.findtext("Value", "0").replace(",", "."))
except ValueError:
continue
rates[code] = {
"name": valute.findtext("Name", code).strip(),
"nominal": nominal,
"rub": value / nominal,
}
return date, rates
@loader.tds @loader.tds
class FinanceMod(loader.Module): class FinanceMod(loader.Module):
strings = { """Курсы валют (ЦБ РФ) и криптовалют (CoinLore)"""
"name": "FinanceMod",
"valute_description": "<кол-во> <код> - курс валюты\n<кол-во> - список", strings = {"name": "FinanceMod"}
"valute_no_args": (
"💵 <b>Курс валюты с сайта </b><a href='https://www.cbr.ru/'>ЦБ(РФ)</a>\n"
"<b>Актуально на</b> <i>{}</i>\n\n<blockquote expandable>{}</blockquote>"
),
"valute_specific": (
"💵 <b>Курс валюты с сайта </b><a href='https://www.cbr.ru/'>ЦБ(РФ)</a>\n"
"<b>Актуально на</b> <i>{}</i>\n\n{}"
),
"valute_not_found": "🚫 Валюта {} не найдена",
"crypto_description": "<кол-во> <код> - курс крипты\n<кол-во> - список",
"crypto_no_args": "💎 <b>Курсы криптовалют</b>\n\n<blockquote expandable>{}</blockquote>",
"crypto_specific": "💎 <b>Курс криптовалюты</b>\n\n{}",
"crypto_not_found": "🚫 Криптовалюта {} не найдена",
"error": "🚫 Ошибка получения данных",
}
def __init__(self): def __init__(self):
self.config = loader.ModuleConfig( self.config = loader.ModuleConfig(
@@ -152,149 +124,194 @@ class FinanceMod(loader.Module):
"crypto_currency", "crypto_currency",
"USD", "USD",
lambda: "Валюта для отображения крипты (USD, RUB, EUR)", lambda: "Валюта для отображения крипты (USD, RUB, EUR)",
validator=loader.validators.Choice(["USD", "RUB", "EUR"]) validator=loader.validators.Choice(["USD", "RUB", "EUR"]),
) )
) )
# Simple in-process cache
self._cbr_cache: tuple[float, str, dict] | None = None # (ts, date, rates)
self._crypto_cache: tuple[float, list] | None = None # (ts, data)
async def _get_curr_data(self): # ──────────────────────────── HTTP helpers ────────────────────────────
async def _fetch(self, url: str, *, as_json: bool = False):
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
resp.raise_for_status()
return await resp.json() if as_json else await resp.read()
# ──────────────────────────── CBR data ────────────────────────────────
async def _cbr_data(self) -> tuple[str | None, dict]:
now = time.monotonic()
if self._cbr_cache and now - self._cbr_cache[0] < CACHE_TTL:
return self._cbr_cache[1], self._cbr_cache[2]
try: try:
r = requests.get("https://www.cbr.ru/scripts/XML_daily.asp") raw = await self._fetch(_CBR_URL)
s = bs4.BeautifulSoup(r.content, 'xml') date, rates = _parse_cbr_xml(raw)
d = datetime.strptime(s.ValCurs['Date'], "%d.%m.%Y").strftime("%d.%m.%Y") self._cbr_cache = (now, date, rates)
return d, s.find_all('Valute') return date, rates
except: except Exception:
return None, None if self._cbr_cache:
return self._cbr_cache[1], self._cbr_cache[2]
return None, {}
async def _get_rates(self): # ──────────────────────────── Crypto data ─────────────────────────────
async def _crypto_data(self) -> list:
now = time.monotonic()
if self._crypto_cache and now - self._crypto_cache[0] < CACHE_TTL:
return self._crypto_cache[1]
try: try:
r = requests.get("https://www.cbr.ru/scripts/XML_daily.asp") js = await self._fetch(_CRYPTO_URL, as_json=True)
s = bs4.BeautifulSoup(r.content, 'xml') data = js.get("data", [])
rt = {'USD': None, 'EUR': None} self._crypto_cache = (now, data)
for v in s.find_all('Valute'): return data
if v.CharCode.text in ['USD', 'EUR']: except Exception:
n = float(v.Nominal.text.replace(',', '.')) return self._crypto_cache[1] if self._crypto_cache else []
vl = float(v.Value.text.replace(',', '.'))
rt[v.CharCode.text] = vl / n
if rt['USD'] and rt['EUR']:
rt['EUR_USD'] = rt['USD'] / rt['EUR']
else:
rt['EUR_USD'] = None
return rt
except:
return None
async def _fmt_curr(self, v, a=1): # ──────────────────────────── Formatters ──────────────────────────────
if v.CharCode.text == "XDR":
return None
c = v.CharCode.text
n = v.Name.text
v = float(v.Value.text.replace(',', '.')) / float(v.Nominal.text.replace(',', '.'))
t = v * a
ts = _fmt_num(t, 3)
return f"{_FLAGS.get(c, '🏳')} [{a}] {n} ({c}) - {ts} руб."
async def _get_crypto(self): def _fmt_valute(self, code: str, info: dict, amount: float = 1.0) -> str:
total = info["rub"] * amount
flag = _FLAGS.get(code, "🏳")
return f"{flag} [{_fmt_num(amount, 0)}] {info['name']} ({code}) — {_fmt_num(total, 3)}"
def _fmt_crypto(self, coin: dict, rates: dict, amount: float = 1.0) -> str:
symbol = coin["symbol"].upper()
try: try:
return requests.get("https://api.coinlore.net/api/tickers/").json().get('data', []) price_usd = float(coin["price_usd"])
except: except (KeyError, ValueError, TypeError):
return None return ""
async def _fmt_crypto(self, c, a=1): currency = self.config["crypto_currency"]
r = await self._get_rates() if currency == "RUB":
if not r: usd_rate = rates.get("USD", {}).get("rub")
return "🚫 Ошибка получения курсов валют" if not usd_rate:
cr = self.config["crypto_currency"] return ""
try: price = price_usd * usd_rate
p = float(c['price_usd']) sign = ""
except: elif currency == "EUR":
return "🚫 Ошибка данных криптовалюты" usd_rate = rates.get("USD", {}).get("rub")
if cr == "RUB": eur_rate = rates.get("EUR", {}).get("rub")
if not r['USD']: if not usd_rate or not eur_rate:
return "🚫 Курс USD не найден" return ""
p *= r['USD'] price = price_usd * (usd_rate / eur_rate)
elif cr == "EUR": sign = ""
if not r['EUR_USD']: else:
return "🚫 Курс EUR/USD не рассчитан" price = price_usd
p *= r['EUR_USD'] sign = "$"
t = p * a
ts = _fmt_num(t)
s = c['symbol'].upper()
e = _CRYPTO_EMOJIS.get(s, "💠")
n = _CRYPTO_LIST.get(s, c['name'])
cs = {"USD": "$", "RUB": "", "EUR": ""}.get(cr, "$")
return f"{e} [{a}] {n} ({s}) - {ts}{cs}"
@loader.command() total = price * amount
async def valutecmd(self, m): emoji = _CRYPTO_EMOJIS.get(symbol, "💠")
"""[count] [usd, eur, ...]""" name = _CRYPTO_NAMES.get(symbol, coin.get("name", symbol))
a = utils.get_args(m) return f"{emoji} [{_fmt_num(amount, 0)}] {name} ({symbol}) — {_fmt_num(total, 3)}{sign}"
d, v = await self._get_curr_data()
if not d or not v: # ──────────────────────────── Commands ────────────────────────────────
return await utils.answer(m, self.strings["error"])
if len(a) == 0: @loader.command(ru_doc="[кол-во] [код] — курс валюты по ЦБ РФ")
l = [] async def valutecmd(self, message):
for x in v: """[amount] [code] — exchange rates from CBR"""
if (n := await self._fmt_curr(x)): args = utils.get_args(message)
l.append(n) date, rates = await self._cbr_data()
await utils.answer(m, self.strings["valute_no_args"].format(d, "\n".join(l)))
elif len(a) == 1: if not rates:
return await utils.answer(message, "🚫 Не удалось получить данные ЦБ РФ")
header = (
f"💵 <b>Курс валюты</b> · <a href='https://www.cbr.ru/'>ЦБ РФ</a>\n"
f"<b>Актуально на</b> <i>{date}</i>\n\n"
)
# .valute — список всех, кол-во = 1
if not args:
lines = [self._fmt_valute(c, i) for c, i in rates.items()]
return await utils.answer(
message,
header + f"<blockquote expandable>{chr(10).join(lines)}</blockquote>",
)
# Первый аргумент: число или код валюты?
amount = 1.0
code = None
arg0 = args[0].upper()
if len(args) >= 2:
# .valute 100 USD
try: try:
am = float(a[0]) amount = float(args[0].replace(",", "."))
l = [] except ValueError:
for x in v: return await utils.answer(message, "🚫 Некорректное число")
if (n := await self._fmt_curr(x, am)): code = args[1].upper()
l.append(n) else:
await utils.answer(m, self.strings["valute_no_args"].format(d, "\n".join(l))) # .valute USD или .valute 100
except:
await utils.answer(m, "🚫 Некорректное число")
elif len(a) == 2:
try: try:
am = float(a[0]) amount = float(arg0.replace(",", "."))
c = a[1].upper() # число без кода — список с умножением
for x in v: except ValueError:
if x.CharCode.text == c: code = arg0
if (n := await self._fmt_curr(x, am)):
return await utils.answer(m, self.strings["valute_specific"].format(d, n))
await utils.answer(m, self.strings["valute_not_found"].format(c))
except:
await utils.answer(m, "🚫 Некорректное число")
@loader.command() if code:
async def cryptocmd(self, m): if code not in rates:
"""[count] [ton, btc, ...]""" return await utils.answer(message, f"🚫 Валюта <b>{code}</b> не найдена")
a = utils.get_args(m) line = self._fmt_valute(code, rates[code], amount)
c = await self._get_crypto() return await utils.answer(message, header + line)
if not c:
return await utils.answer(m, self.strings["error"]) # список с кол-вом
try: lines = [self._fmt_valute(c, i, amount) for c, i in rates.items()]
if len(a) == 0: await utils.answer(
f = [x for x in c if x['symbol'].upper() in _CRYPTO_LIST] message,
l = [] header + f"<blockquote expandable>{chr(10).join(lines)}</blockquote>",
for x in f: )
if (n := await self._fmt_crypto(x)):
l.append(n) @loader.command(ru_doc="[кол-во] [код] — курс крипты")
await utils.answer(m, self.strings["crypto_no_args"].format("\n".join(l))) async def cryptocmd(self, message):
elif len(a) == 1: """[amount] [symbol] — crypto rates from CoinLore"""
am = float(a[0]) args = utils.get_args(message)
f = [x for x in c if x['symbol'].upper() in _CRYPTO_LIST] coins = await self._crypto_data()
l = [] _, rates = await self._cbr_data()
for x in f:
if (n := await self._fmt_crypto(x, am)): if not coins:
l.append(n) return await utils.answer(message, "🚫 Не удалось получить данные крипты")
await utils.answer(m, self.strings["crypto_no_args"].format("\n".join(l)))
elif len(a) == 2: header = f"💎 <b>Курсы криптовалют</b> · <i>{self.config['crypto_currency']}</i>\n\n"
am = float(a[0])
t = a[1].upper() amount = 1.0
f = False symbol = None
for x in c:
if x['symbol'].upper() == t: if not args:
if (n := await self._fmt_crypto(x, am)): pass # список, amount=1
f = True elif len(args) == 1:
await utils.answer(m, self.strings["crypto_specific"].format(n)) try:
break amount = float(args[0].replace(",", "."))
if not f: except ValueError:
await utils.answer(m, self.strings["crypto_not_found"].format(t)) symbol = args[0].upper()
except ValueError: else:
await utils.answer(m, "🚫 Некорректное число") try:
except Exception as e: amount = float(args[0].replace(",", "."))
await utils.answer(m, f"🚫 Ошибка: {str(e)}") except ValueError:
return await utils.answer(message, "🚫 Некорректное число")
symbol = args[1].upper()
if symbol:
coin = next((c for c in coins if c["symbol"].upper() == symbol), None)
if not coin:
return await utils.answer(message, f"🚫 Крипта <b>{symbol}</b> не найдена")
line = self._fmt_crypto(coin, rates, amount)
if not line:
return await utils.answer(message, "🚫 Ошибка форматирования")
return await utils.answer(message, header + line)
# список только известных монет
known = {c["symbol"].upper(): c for c in coins if c["symbol"].upper() in _CRYPTO_NAMES}
# сортируем по порядку _CRYPTO_NAMES
lines = []
for sym in _CRYPTO_NAMES:
if sym in known:
line = self._fmt_crypto(known[sym], rates, amount)
if line:
lines.append(line)
await utils.answer(
message,
header + f"<blockquote expandable>{chr(10).join(lines)}</blockquote>",
)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,5 @@
ChatCopy.py
Gemini.py
GiftFinder.py
MaillingChatGT99.py
NekoEditorMod.py

View File

@@ -4,4 +4,6 @@
.ruff_cache .ruff_cache
ruff.log ruff.log
ruff.log.2 ruff.log.2
ruff.toml ruff.toml
# Heroku files
heroku/

438
archquise/q.mods/QNotes.py Normal file
View File

@@ -0,0 +1,438 @@
__version__ = (1, 1, 6)
# █▀▀▄ █▀▄▀█ █▀█ █▀▄ █▀
# ▀▀▀█ ▄ █ ▀ █ █▄█ █▄▀ ▄█
# #### Copyright (c) 2026 Archquise #####
# 💬 Contact: https://t.me/archquise
# 🔒 Licensed under the GNU AGPLv3.
# 📄 LICENSE: https://raw.githubusercontent.com/archquise/Q.Mods/main/LICENSE
# ---------------------------------------------------------------------------------
# Name: QNotes
# Description: A notes module that just works
# Author: @quise_m
# ---------------------------------------------------------------------------------
# meta developer: @quise_m
# meta banner: https://raw.githubusercontent.com/archquise/qmods_meta/main/qnotes.png
# ---------------------------------------------------------------------------------
import asyncio
import logging
import re
from datetime import date
from typing import cast
from herokutl.tl.functions.users import GetUsersRequest
from herokutl.tl.types import InputUserSelf
from .. import loader, utils
logger = logging.getLogger(__name__)
@loader.tds
class QNotes(loader.Module):
"""A notes module that just works\nUsage: #notetag in any chat"""
strings = {
"name": "QNotes",
"topic_desc": "Stores your notes content\nUsage: #notetag in any chat",
"wrongargs": "<emoji document_id=5980953710157632545>❌</emoji> <b>Wrong arguments. Check command usage.</b>",
"not_exist": "There is no such note!",
"no_reply": "No reply! Reply to the message, which text will become a note.",
"already_exists": "Seems like note with the same tag already exists. Overwrite?",
"show_note_inline": "<blockquote>#{}</blockquote>\n\n<blockquote>{}</blockquote>",
"notelist": "Note list:",
"msg_not_found_inline": "Message with this note wasn't found. Probably, it was been removed. Note has been removed from the database.",
"remnote_inline": "🗑 Remove",
"close_inline": "❌ Close",
"yes": "✔️ Yes",
"no": "❌ No",
"true": "yes",
"false": "no",
"saved": "Note saved!",
"removed": "Note removed!",
"nonotes": "You don't have any notes!",
"privacy_switch": "Determines whose data will be used by the my_* placeholders\n\nTrue - the account that is issuing the note\nFalse - the account on which the userbot is running",
"note_prefix": "The prefix used to call up notes",
"placeholders": """
<b>Available placeholders</b>:
about the account on which userbot is installed:
{my_id} - ID
@{my_username} - username, tag
{my_phone} - phone number
{my_premium} - premium status (yes/no)
about reply author:
{reply_id} - ID
{reply_name} - name
{reply_surname} - surname
{reply_fullname} - full name (name + surname (if specified))
@{reply_username} - username, tag
{reply_phone} - phone number (if not hidden)
{reply_premium} - premium status (yes/no)
general:
{today} - current date
""",
}
strings_ru = {
"_cls_doc": "Модуль для заметок, который просто работает\nИспользование: #тегзаметки в любом чате",
"topic_desc": "Хранит содержимое ваших заметок\nИспользование: #тегзаметки в любом чате",
"wrongargs": "<emoji document_id=5980953710157632545>❌</emoji> <b>Неверные аргументы. Проверьте использование команды.</b>",
"no_reply": "Нет реплая! Ответьте на сообщение, текст которого станет заметкой.",
"not_exist": "Такой заметки не найдено!",
"already_exists": "Кажется, заметка с таким тегом уже существует. Перезаписать?",
"show_note_inline": "<blockquote>#{}</blockquote>\n\n<blockquote>{}</blockquote>",
"notelist": "Список заметок:",
"msg_not_found_inline": "Сообщение с этой заметкой не было найдено. Вероятно, оно было удалено. Заметка очищена из базы данных.",
"remnote_inline": "🗑 Удалить",
"close_inline": "❌ Закрыть",
"yes": "✔️ Да",
"no": "❌ Нет",
"saved": "Заметка сохранена!",
"removed": "Заметка удалена!",
"true": "да",
"false": "нет",
"nonotes": "Нет заметок!",
"privacy_switch": "Влияет на то, чьи данные будут использовать my_* плейсхолдеры\n\nTrue - аккаунта, который вызывает заметку\nFalse - аккаунта на котором стоит юзербот",
"note_prefix": "Префикс, с которым вызываются заметки",
"placeholders": """
<b>Доступные плейсхолдеры</b>:
об аккаунте, на котором стоит юзербот:
{my_id} - айди
@{my_username} - юзернейм, тег
{my_phone} - номер телефона
{my_premium} - статус премиум (да/нет)
об авторе реплая:
{reply_id} - айди
{reply_name} - имя
{reply_surname} - фамилия
{reply_fullname} - полное имя (имя + фамилия (если указана))
@{reply_username} - юзернейм, тег
{reply_phone} - номер телефона (если не скрыт)
{reply_premium} - статус премиум (да/нет)
общее:
{today} - текущая дата
""",
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"privacy_switch",
True,
lambda: self.strings["privacy_switch"],
validator=loader.validators.Boolean(), # type: ignore
),
loader.ConfigValue(
"note_prefix",
"#",
lambda: self.strings["note_prefix"],
validator=loader.validators.RegExp(r"^\S+$"), # type: ignore
),
)
async def client_ready(self, client, db): # type: ignore
self._content_channel_id = await utils.wait_for_content_channel(self._db)
self._notes_topic = await utils.asset_forum_topic(
client=self._client,
db=self._db,
peer=self._content_channel_id, # type: ignore
title="QNotes | Storage",
description=self.strings["topic_desc"],
icon_emoji_id=5272001961326049733,
)
self.my_phone = (await self._client(GetUsersRequest(id=[InputUserSelf()])))[
0
].phone
self.placeholders = {
"my_phone": self.my_phone,
"my_username": self._client.heroku_me.username,
"my_id": self.tg_id,
"my_premium": self.strings["true"]
if self._client.heroku_me.premium
else self.strings["false"],
}
self._notemap = cast(dict, self.pointer("notemap", default={}))
async def _ask_overwrite(self, message):
loop = asyncio.get_running_loop()
future = loop.create_future()
form = await self.inline.form(
self.strings["already_exists"],
message=message,
reply_markup=[
[
{
"text": self.strings["yes"],
"callback": (
lambda call, flag: (
future.set_result(flag) if not future.done() else None
)
),
"args": (True,),
},
{
"text": self.strings["no"],
"callback": (
lambda call, flag: (
future.set_result(flag) if not future.done() else None
)
),
"args": (False,),
},
]
],
)
try:
async with asyncio.timeout(15):
overwrite_answer = await future
except TimeoutError:
await form.delete() # type: ignore
return False, message
if not overwrite_answer:
await form.delete() # type: ignore
return False, form
return True, form
async def _show_note_inline(self, call, note, page=0):
async def _remnote(call, notetag, note_msg):
await note_msg.delete()
self._notemap.pop(notetag, None)
await call.edit(self.strings["removed"])
note_msg = await self._client.get_messages(
self._content_channel_id, ids=note[1]
)
if not note_msg:
self._notemap.pop(note[0], None)
await call.edit(
self.strings["msg_not_found_inline"],
reply_markup=[
{"text": "⬅️ Назад", "callback": self._list_page, "args": (page,)},
{"text": self.strings["close_inline"], "action": "close"},
],
)
return
await call.edit(
self.strings["show_note_inline"].format(note[0], note_msg.text), # type: ignore
reply_markup=[
[
{"text": "⬅️ Назад", "callback": self._list_page, "args": (page,)},
{
"text": self.strings["remnote_inline"],
"callback": _remnote,
"args": (note[0], note_msg),
},
],
[{"text": self.strings["close_inline"], "action": "close"}],
],
)
def _build_list_markup(self, page: int):
items = list(self._notemap.items())
total = -(-len(items) // 3)
page = max(0, min(page, total - 1))
rows = [
[
{
"text": notetag,
"callback": self._show_note_inline,
"args": ([notetag, msg_id], page),
}
]
for notetag, msg_id in items[page * 3 : (page + 1) * 3]
]
return (
rows
+ self.inline.build_pagination(
callback=self._list_page, # type: ignore
total_pages=total,
current_page=page + 1,
)
+ [[{"text": self.strings["close_inline"], "action": "close"}]]
)
async def _list_page(self, call, page):
await call.edit(
text=self.strings["notelist"], reply_markup=self._build_list_markup(page)
)
@loader.command(
ru_doc="Сохраняет заметку под тегом | Пример: .qnsave заметка",
en_doc="Saves note by tag | Example: .qnsave note",
)
async def qnsave(self, message) -> None:
args = utils.get_args(message)
if not args:
await utils.answer(message, self.strings["wrongargs"])
return
current_message = message
if not (reply := await message.get_reply_message()):
await utils.answer(message, self.strings["no_reply"])
return
try:
if args[0].strip() in self._notemap:
need_overwrite, msg = await self._ask_overwrite(message)
if not need_overwrite:
return
old_note_message = await self._client.get_messages(
self._content_channel_id,
ids=self._notemap[args[0].strip()],
)
old_note_message and await old_note_message.delete() # type: ignore
current_message = msg
note_message = await self._client.send_message(
self._content_channel_id,
reply.text,
reply_to=self._notes_topic.id,
file=reply.media,
)
self._notemap[args[0].strip()] = note_message.id
except Exception as e:
await utils.answer(current_message, f"Произошла ошибка: {e}")
logger.exception("Произошла ошибка при сохранении заметки!")
return
await utils.answer(current_message, self.strings["saved"])
@loader.command(
ru_doc="Удаляет заметку по тегу | Пример: .qnrem заметка",
en_doc="Removes note by tag | Example: .qnrem note",
)
async def qnrem(self, message) -> None:
args = utils.get_args(message)
if not args:
await utils.answer(message, self.strings["wrongargs"])
return
if args[0] not in self._notemap or not (
note_message := await self._client.get_messages(
self._content_channel_id,
ids=self._notemap[args[0]],
)
):
await utils.answer(message, self.strings["not_exist"])
return
await note_message.delete() # type: ignore
self._notemap.pop(args[0], None)
await utils.answer(message, self.strings["removed"])
@loader.command(
ru_doc="Выводит список всех заметок и позволяет управлять ими",
en_doc="Shows note list and allows managing them",
)
async def qnlist(self, message) -> None:
if self._notemap:
await self.inline.form(
text=self.strings["notelist"],
reply_markup=self._build_list_markup(0),
message=message,
)
return
await utils.answer(message, self.strings["nonotes"])
@loader.command(
ru_doc="Выводит список доступных плейсхолдеров",
en_doc="Displays a list of available placeholders",
)
async def qnp(self, message) -> None:
await utils.answer(message, self.strings["placeholders"])
@loader.watcher()
async def _note_watcher(self, message):
if not message.text.startswith(prefix := self.config["note_prefix"]) or not (
await self._client.dispatcher.security.check(message, self._note_watcher)
):
return
notetag = message.text.split(prefix, maxsplit=1)[1]
if notetag in self._notemap:
if not (
note_message := await self._client.get_messages(
self._content_channel_id,
ids=self._notemap[notetag],
)
):
self._notemap.pop(notetag, None)
return
notetext = note_message.text or "" # type: ignore
if re.search(r"\{\w+\}", notetext):
if (
not self.config["privacy_switch"]
or message.sender_id == self._client.heroku_me.id
):
placeholders = {**self.placeholders}
else:
message_author_entity = await self._client.get_entity(
message.sender_id
)
placeholders = {
"my_phone": (
await self._client(GetUsersRequest(id=[message.sender_id]))
)[0].phone,
"my_username": message_author_entity.username,
"my_id": message.sender_id,
"my_premium": self.strings["true"]
if message_author_entity.premium
else self.strings["false"],
}
if reply_msg := await message.get_reply_message():
reply_user = await self._client.get_entity(reply_msg.sender_id)
placeholders = {
**placeholders,
"reply_id": reply_user.id,
"reply_fullname": " ".join(
filter(None, [reply_user.first_name, reply_user.last_name])
),
"reply_name": reply_user.first_name,
"reply_surname": reply_user.last_name,
"reply_phone": (
await self._client(GetUsersRequest(id=[reply_user.id]))
)[0].phone,
"reply_username": reply_user.username,
"reply_premium": self.strings["true"]
if reply_user.premium
else self.strings["false"],
}
placeholders = placeholders | {"today": date.today()}
def replacer(match):
key = match.group(1)
if key not in placeholders or not placeholders[key]:
return match.group(0)
return utils.escape_html(str(placeholders[key]))
notetext = re.sub(r"\{(\w+)\}", replacer, notetext)
if media := note_message.media: # type: ignore
await utils.answer_file(message, media, notetext) # type: ignore
else:
await utils.answer(message, notetext)
return

View File

@@ -59,7 +59,7 @@ class FaceMod(loader.Module):
en_doc="Random kaomoji", en_doc="Random kaomoji",
) )
async def rfacecmd(self, message) -> None: # noqa: D102, ANN001 async def rfacecmd(self, message) -> None: # noqa: D102, ANN001
await utils.answer(message, self.strings("loading")) await utils.answer(message, self.strings["loading"])
url = "https://files.archquise.ru/kaomoji.txt" url = "https://files.archquise.ru/kaomoji.txt"
@@ -72,7 +72,7 @@ class FaceMod(loader.Module):
kaomoji = random.choice(kaomoji_list) # noqa: S311 kaomoji = random.choice(kaomoji_list) # noqa: S311
await utils.answer( await utils.answer(
message, message,
self.strings("random_face").format(kaomoji), self.strings["random_face"].format(kaomoji),
) )
else: else:
await utils.answer(message, self.strings("error")) await utils.answer(message, self.strings["error"])

View File

@@ -113,24 +113,24 @@ class ShortenerMod(loader.Module):
async def shortencmd(self, message): # noqa: ANN001, ANN201 async def shortencmd(self, message): # noqa: ANN001, ANN201
"""Shorten URL using bit.ly API.""" """Shorten URL using bit.ly API."""
if self.config["token"] is None: if self.config["token"] is None:
await utils.answer(message, self.strings("no_api")) await utils.answer(message, self.strings["no_api"])
return return
args = utils.get_args_raw(message) args = utils.get_args_raw(message)
if not args: if not args:
await utils.answer(message, self.strings("no_args")) await utils.answer(message, self.strings["no_args"])
return return
if not self._validate_url(args): if not self._validate_url(args):
await utils.answer(message, self.strings("invalid_url")) await utils.answer(message, self.strings["invalid_url"])
return return
try: try:
short_url = await self.shorten_url(url=args, token=self.config["token"]) short_url = await self.shorten_url(url=args, token=self.config["token"])
await utils.answer(message, self.strings("shortencmd").format(c=short_url)) await utils.answer(message, self.strings["shortencmd"].format(c=short_url))
except Exception as e: except Exception as e:
logger.exception("Error shortening URL!") logger.exception("Error shortening URL!")
await utils.answer(message, self.strings("api_error").format(error=str(e))) await utils.answer(message, self.strings["api_error"].format(error=str(e)))
@loader.command( @loader.command(
ru_doc="Посмотреть статистику ссылки через bit.ly (ссылка без https:// | Доступно только на платных аккаунтах)", ru_doc="Посмотреть статистику ссылки через bit.ly (ссылка без https:// | Доступно только на платных аккаунтах)",
@@ -139,22 +139,22 @@ class ShortenerMod(loader.Module):
async def statclcmd(self, message): # noqa: ANN001, ANN201 async def statclcmd(self, message): # noqa: ANN001, ANN201
"""Get click statistics for shortened URL.""" """Get click statistics for shortened URL."""
if self.config["token"] is None: if self.config["token"] is None:
await utils.answer(message, self.strings("no_api")) await utils.answer(message, self.strings["no_api"])
return return
args = utils.get_args_raw(message) args = utils.get_args_raw(message)
if not args: if not args:
await utils.answer(message, self.strings("no_args")) await utils.answer(message, self.strings["no_args"])
return return
try: try:
if not args.startswith("bit.ly/"): if not args.startswith("bit.ly/"):
await utils.answer(message, self.strings("invalid_url")) await utils.answer(message, self.strings["invalid_url"])
return return
clicks = await self.get_bitlink_stats( clicks = await self.get_bitlink_stats(
bitlink=args, token=self.config["token"] bitlink=args, token=self.config["token"]
) )
await utils.answer(message, self.strings("statclcmd").format(c=clicks)) await utils.answer(message, self.strings["statclcmd"].format(c=clicks))
except Exception as e: except Exception as e:
logger.exception("Error getting statistics!") logger.exception("Error getting statistics!")
await utils.answer(message, self.strings("api_error").format(error=str(e))) await utils.answer(message, self.strings["api_error"].format(error=str(e)))

View File

@@ -26,7 +26,6 @@ from .. import loader, utils
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Banners: class Banners:
def __init__( def __init__(
self, self,
@@ -324,6 +323,16 @@ class YaMusicMod(loader.Module):
"name": "YaMusic" "name": "YaMusic"
} }
duration_placeholder = {
"start_duration": "<tg-emoji emoji-id=5262663495538742892>☀️</tg-emoji><tg-emoji emoji-id=5260381609479153468>☀️</tg-emoji>",
"start_full_duration": "<tg-emoji emoji-id=5262663495538742892>☀️</tg-emoji><tg-emoji emoji-id=5260609582048254485>☀️</tg-emoji>",
"closed_duration": "<tg-emoji emoji-id=5260467667738859177>☀️</tg-emoji>",
"empty_mid": "<tg-emoji emoji-id=5260415715814448198>☀️</tg-emoji>",
"empty_closed_duration_duration": "<tg-emoji emoji-id=5260239235608255208>☀️</tg-emoji>",
"end_duration_full": "<tg-emoji emoji-id=5260467667738859177>☀️</tg-emoji>",
"empty_closed_duration": "<tg-emoji emoji-id=5260239235608255208>☀️</tg-emoji>",
}
def __init__(self): def __init__(self):
self.config = loader.ModuleConfig( self.config = loader.ModuleConfig(
loader.ConfigValue( loader.ConfigValue(
@@ -553,67 +562,19 @@ class YaMusicMod(loader.Module):
return "0%" return "0%"
percent = (progress / duration) * 100 percent = (progress / duration) * 100
fill_logic = int(percent // 16.66)
s_less_10 = ( bar = self.duration_placeholder["start_full_duration"] if fill_logic >= 1 else self.duration_placeholder["start_duration"]
"<emoji document_id=5454137780454067986></emoji>" for i in range(2, 6):
"<emoji document_id=6158923355173949539>⭐</emoji>" if fill_logic >= i:
"<emoji document_id=6159012102083188132>⭐</emoji>" bar += self.duration_placeholder["closed_duration"]
"<emoji document_id=6159012102083188132>⭐</emoji>" else:
"<emoji document_id=6158753257289158944>⭐</emoji>" bar += self.duration_placeholder["empty_mid"]
"<emoji document_id=6156700344526049665>⭐</emoji>" if fill_logic >= 6:
) bar += self.duration_placeholder["end_duration_full"]
s_10_to_20 = (
"<emoji document_id=5454137780454067986></emoji>"
"<emoji document_id=6159095673556840262>⭐</emoji>"
"<emoji document_id=6159012102083188132>⭐</emoji>"
"<emoji document_id=6156933677214341691>⭐</emoji>"
"<emoji document_id=6158753257289158944>⭐</emoji>"
"<emoji document_id=6156700344526049665>⭐</emoji>"
)
s_30_to_40 = (
"<emoji document_id=5454137780454067986></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=6158923355173949539>⭐</emoji>"
"<emoji document_id=6159012102083188132>⭐</emoji>"
"<emoji document_id=6156700344526049665>⭐</emoji>"
)
s_over_50 = (
"<emoji document_id=5454137780454067986></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=6156933677214341691>⭐</emoji>"
"<emoji document_id=6156700344526049665>⭐</emoji>"
)
s_over_80 = (
"<emoji document_id=5454137780454067986></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=6156700344526049665>⭐</emoji>"
)
if percent < 10:
return s_less_10
elif percent < 20:
return s_10_to_20
elif percent < 30:
return s_10_to_20
elif percent < 40:
return s_30_to_40
elif percent < 50:
return s_30_to_40
elif percent < 80:
return s_over_50
else: else:
return s_over_80 bar += self.duration_placeholder["empty_closed_duration"]
return bar
except Exception as e: except Exception as e:
return f"Error: {e}" return f"Error: {e}"

View File

@@ -22,4 +22,4 @@ chatmodule
stats stats
tagwatcher tagwatcher
hardspam hardspam
YaMusic YaMusic

View File

@@ -21,6 +21,7 @@ import random
import string import string
import asyncio import asyncio
import logging import logging
import re
from PIL import Image, UnidentifiedImageError from PIL import Image, UnidentifiedImageError
from telethon.tl.functions.stickers import CreateStickerSetRequest from telethon.tl.functions.stickers import CreateStickerSetRequest
@@ -38,11 +39,12 @@ except AttributeError:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
STATIC_STICKER_LIMIT = 120
EMOJI_LIMIT = 200
async def process_to_webp(input_path: str, output_path: str, size: int = 512) -> bool: async def process_to_webp(input_path: str, output_path: str, size: int = 512) -> bool:
try: try:
is_video = input_path.lower().endswith(('.mp4', '.webm', '.mov')) or b'ftyp' in open(input_path, 'rb').read(32) is_video = input_path.lower().endswith((".mp4", ".webm", ".mov")) or b"ftyp" in open(input_path, "rb").read(32)
if is_video: if is_video:
cap = cv2.VideoCapture(input_path) cap = cv2.VideoCapture(input_path)
success, frame = cap.read() success, frame = cap.read()
@@ -87,7 +89,7 @@ async def process_to_webp(input_path: str, output_path: str, size: int = 512) ->
async def process_to_png(input_path: str, output_path: str, size: int = 100) -> bool: async def process_to_png(input_path: str, output_path: str, size: int = 100) -> bool:
try: try:
is_video = input_path.lower().endswith(('.mp4', '.webm', '.mov')) or b'ftyp' in open(input_path, 'rb').read(32) is_video = input_path.lower().endswith((".mp4", ".webm", ".mov")) or b"ftyp" in open(input_path, "rb").read(32)
if is_video: if is_video:
cap = cv2.VideoCapture(input_path) cap = cv2.VideoCapture(input_path)
success, frame = cap.read() success, frame = cap.read()
@@ -137,8 +139,10 @@ class CreatePacks(loader.Module):
"processing": "<b>[CreatePacks]</b> Collecting avatars of participants...", "processing": "<b>[CreatePacks]</b> Collecting avatars of participants...",
"no_avatars": "<b>[CreatePacks]</b> No members with avatars", "no_avatars": "<b>[CreatePacks]</b> No members with avatars",
"no_valid": "<b>[CreatePacks]</b> Could not process any avatars", "no_valid": "<b>[CreatePacks]</b> Could not process any avatars",
"done_pack": "<b>[CreatePacks]</b> Sticker pack is ready:\n<b>[CreatePacks]</b> Open: <a href='https://t.me/addstickers/{}'>here</a>", "done_pack": "<b>[CreatePacks]</b> Sticker pack is ready:\n<b>[CreatePacks]</b> Open: <a href=\'https://t.me/addstickers/{}\\'>here</a>",
"done_emoji_pack": "<b>[CreatePacks]</b> Emoji pack is ready:\n<b>[CreatePacks]</b> Open: <a href='https://t.me/addstickers/{}'>here</a>", "done_packs": "<b>[CreatePacks]</b> Sticker packs are ready:\n{}",
"done_emoji_pack": "<b>[CreatePacks]</b> Emoji pack is ready:\n<b>[CreatePacks]</b> Open: <a href=\'https://t.me/addstickers/{}\\''>here</a>",
"done_emoji_packs": "<b>[CreatePacks]</b> Emoji packs are ready:\n{}",
"already": "<b>[CreatePacks]</b> A sticker pack with this name already exists.", "already": "<b>[CreatePacks]</b> A sticker pack with this name already exists.",
"emoji_processing": "<b>[CreatePacks]</b> Creating emoji pack from avatars...", "emoji_processing": "<b>[CreatePacks]</b> Creating emoji pack from avatars...",
"emoji_no_emoji": "<b>[CreatePacks]</b> No emoji specified — using", "emoji_no_emoji": "<b>[CreatePacks]</b> No emoji specified — using",
@@ -149,8 +153,10 @@ class CreatePacks(loader.Module):
"processing": "<b>[CreatePacks]</b> Собираю аватарки участников...", "processing": "<b>[CreatePacks]</b> Собираю аватарки участников...",
"no_avatars": "<b>[CreatePacks]</b> Нет участников с аватарками", "no_avatars": "<b>[CreatePacks]</b> Нет участников с аватарками",
"no_valid": "<b>[CreatePacks]</b> Не удалось обработать ни одну аватарку", "no_valid": "<b>[CreatePacks]</b> Не удалось обработать ни одну аватарку",
"done_pack": "<b>[CreatePacks]</b> Стикерпак готов:\n<b>[CreatePacks]</b> Открыть: <a href='https://t.me/addstickers/{}'>здесь</a>", "done_pack": "<b>[CreatePacks]</b> Стикерпак готов:\n<b>[CreatePacks]</b> Открыть: <a href=\'https://t.me/addstickers/{}\\''>здесь</a>",
"done_emoji_pack": "<b>[CreatePacks]</b> Эмодзи-пак готов:\n<b>[CreatePacks]</b> Открыть: <a href='https://t.me/addstickers/{}'>здесь</a>", "done_packs": "<b>[CreatePacks]</b> Стикерпаки готовы:\n{}",
"done_emoji_pack": "<b>[CreatePacks]</b> Эмодзи-пак готов:\n<b>[CreatePacks]</b> Открыть: <a href=\'https://t.me/addstickers/{}\\''>здесь</a>",
"done_emoji_packs": "<b>[CreatePacks]</b> Эмодзи-паки готовы:\n{}",
"already": "<b>[CreatePacks]</b> Стикерпак с таким именем уже существует", "already": "<b>[CreatePacks]</b> Стикерпак с таким именем уже существует",
"emoji_processing": "<b>[CreatePacks]</b> Создаю эмодзи-пак из аватаров...", "emoji_processing": "<b>[CreatePacks]</b> Создаю эмодзи-пак из аватаров...",
"emoji_no_emoji": "<b>[CreatePacks]</b> Эмодзи не указан — используется", "emoji_no_emoji": "<b>[CreatePacks]</b> Эмодзи не указан — используется",
@@ -169,10 +175,6 @@ class CreatePacks(loader.Module):
if len(users) >= 100: if len(users) >= 100:
break break
if not users:
shutil.rmtree(tmp_dir, ignore_errors=True)
return [], tmp_dir
processed = [] processed = []
process_func = process_to_webp if format == "webp" else process_to_png process_func = process_to_webp if format == "webp" else process_to_png
@@ -224,6 +226,43 @@ class CreatePacks(loader.Module):
return processed, tmp_dir return processed, tmp_dir
async def _create_sticker_pack(self, message, stickers_to_add, is_emoji_pack: bool, pack_number: int = 1, emoji: str = "🖼️"):
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
short_name = f"pack_{random_str}_by_fcreate"
chat = await message.get_chat()
chat_title = getattr(chat, 'title', 'Chat')
title_prefix = "Ava" if not is_emoji_pack else "Emoji"
full_title = f"{chat_title} {title_prefix} #{pack_number}"
try:
await self._client(CreateStickerSetRequest(
user_id="me",
title=full_title,
short_name=short_name,
stickers=stickers_to_add,
emojis=is_emoji_pack
))
return short_name, full_title
except PackShortNameOccupiedError:
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=12))
short_name = f"pack_{random_str}_by_fcreate"
try:
await self._client(CreateStickerSetRequest(
user_id="me",
title=full_title,
short_name=short_name,
stickers=stickers_to_add,
emojis=is_emoji_pack
))
return short_name, full_title
except:
return "already_exists", None
except Exception as e:
logger.error(f"Error creating pack: {e}")
return None, None
@loader.command( @loader.command(
ru_doc="- Создать стикерпак из аватаров в группе", ru_doc="- Создать стикерпак из аватаров в группе",
only_groups=True only_groups=True
@@ -236,11 +275,7 @@ class CreatePacks(loader.Module):
if not files: if not files:
return await message.edit(self.strings("no_avatars")) return await message.edit(self.strings("no_avatars"))
tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4)) all_stickers = []
short_name = f"f{abs(message.chat_id)}_{tag}_by_fcreateavatars"
title = f"AvaPack {tag}"
stickers = []
for path in files: for path in files:
try: try:
await asyncio.sleep(0.3) await asyncio.sleep(0.3)
@@ -248,42 +283,42 @@ class CreatePacks(loader.Module):
msg = await self._client.send_file("me", file, force_document=True) msg = await self._client.send_file("me", file, force_document=True)
doc = msg.document doc = msg.document
await self._client.delete_messages("me", msg.id) await self._client.delete_messages("me", msg.id)
stickers.append(InputStickerSetItem( all_stickers.append(InputStickerSetItem(
document=InputDocument(doc.id, doc.access_hash, doc.file_reference), document=InputDocument(doc.id, doc.access_hash, doc.file_reference),
emoji="🖼️" emoji="🖼️"
)) ))
except Exception as e: except Exception as e:
logger.error(f"Sticker loading error {path}: {e}") logger.error(f"Sticker loading error {path}: {e}")
continue continue
if not stickers: if not all_stickers:
shutil.rmtree(tmp_dir, ignore_errors=True) shutil.rmtree(tmp_dir, ignore_errors=True)
return await message.edit(self.strings("no_valid")) return await message.edit(self.strings("no_valid"))
try: created_packs_links = []
await self._client(CreateStickerSetRequest( pack_number = 1
user_id="me", for i in range(0, len(all_stickers), STATIC_STICKER_LIMIT):
title=title, current_pack_stickers = all_stickers[i : i + STATIC_STICKER_LIMIT]
short_name=short_name, short_name, full_title = await self._create_sticker_pack(message, current_pack_stickers, False, pack_number)
stickers=stickers if short_name == "already_exists":
)) await message.edit(self.strings("already"))
await message.edit(self.strings("done_pack").format(short_name)) shutil.rmtree(tmp_dir, ignore_errors=True)
except PackShortNameOccupiedError: return
await message.edit(self.strings("already")) elif short_name:
except Exception as e: created_packs_links.append(f"<a href=\'https://t.me/addstickers/{short_name}\\''>{full_title}</a>")
error_details = f"❌ Ошибка создания стикерпака:\n<code>{type(e).__name__}: {e}</code>\n" pack_number += 1
error_details += f"Пак: {short_name}\nСтикеров: {len(stickers)}\n"
if files: if created_packs_links:
error_details += f"Последний файл: {files[-1]}\n" if len(created_packs_links) == 1:
try: # Extract short name for the single link format
error_details += f"Размер: {Image.open(files[-1]).size}\n" sn = created_packs_links[0].split('/')[-1].split("'")[0]
error_details += f"Вес: {os.path.getsize(files[-1])} байт" await message.edit(self.strings("done_pack").format(sn))
except: else:
pass await message.edit(self.strings("done_packs").format("\n".join(created_packs_links)))
await message.edit(error_details) else:
logger.exception("Error creating sticker pack") await message.edit(self.strings("no_valid"))
finally:
shutil.rmtree(tmp_dir, ignore_errors=True) shutil.rmtree(tmp_dir, ignore_errors=True)
@loader.command( @loader.command(
ru_doc="[эмодзи] - Создать эмодзи-пак из всех аватаров", ru_doc="[эмодзи] - Создать эмодзи-пак из всех аватаров",
@@ -303,11 +338,7 @@ class CreatePacks(loader.Module):
if not files: if not files:
return await message.edit(self.strings("no_avatars")) return await message.edit(self.strings("no_avatars"))
tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4)) all_emojis = []
short_name = f"f{abs(message.chat_id)}_{tag}_by_fcreateemojis"
title = f"EmojiPack {tag}"
stickers = []
for path in files: for path in files:
try: try:
await asyncio.sleep(0.3) await asyncio.sleep(0.3)
@@ -315,7 +346,7 @@ class CreatePacks(loader.Module):
msg = await self._client.send_file("me", file, force_document=True) msg = await self._client.send_file("me", file, force_document=True)
doc = msg.document doc = msg.document
await self._client.delete_messages("me", msg.id) await self._client.delete_messages("me", msg.id)
stickers.append(InputStickerSetItem( all_emojis.append(InputStickerSetItem(
document=InputDocument(doc.id, doc.access_hash, doc.file_reference), document=InputDocument(doc.id, doc.access_hash, doc.file_reference),
emoji=emoji emoji=emoji
)) ))
@@ -323,32 +354,30 @@ class CreatePacks(loader.Module):
logger.error(f"Error loading emoji {path}: {e}") logger.error(f"Error loading emoji {path}: {e}")
continue continue
if not stickers: if not all_emojis:
shutil.rmtree(tmp_dir, ignore_errors=True) shutil.rmtree(tmp_dir, ignore_errors=True)
return await message.edit(self.strings("no_valid")) return await message.edit(self.strings("no_valid"))
try: created_packs_links = []
await self._client(CreateStickerSetRequest( pack_number = 1
user_id="me", for i in range(0, len(all_emojis), EMOJI_LIMIT):
title=title, current_pack_emojis = all_emojis[i : i + EMOJI_LIMIT]
short_name=short_name, short_name, full_title = await self._create_sticker_pack(message, current_pack_emojis, True, pack_number, emoji)
stickers=stickers, if short_name == "already_exists":
emojis=True await message.edit(self.strings("already"))
)) shutil.rmtree(tmp_dir, ignore_errors=True)
await message.edit(self.strings("done_emoji_pack").format(short_name)) return
except PackShortNameOccupiedError: elif short_name:
await message.edit(self.strings("already")) created_packs_links.append(f"<a href=\'https://t.me/addstickers/{short_name}\\''>{full_title}</a>")
except Exception as e: pack_number += 1
error_details = f"❌ Ошибка создания эмодзи-пака:\n<code>{type(e).__name__}: {e}</code>\n"
error_details += f"Пак: {short_name}\nСмайликов: {len(stickers)}\n" if created_packs_links:
if files: if len(created_packs_links) == 1:
error_details += f"Последний файл: {files[-1]}\n" sn = created_packs_links[0].split('/')[-1].split("'")[0]
try: await message.edit(self.strings("done_emoji_pack").format(sn))
error_details += f"Размер: {Image.open(files[-1]).size}\n" else:
error_details += f"Вес: {os.path.getsize(files[-1])} байт" await message.edit(self.strings("done_emoji_packs").format("\n".join(created_packs_links)))
except: else:
pass await message.edit(self.strings("no_valid"))
await message.edit(error_details)
logger.exception("Error creating emoji pack") shutil.rmtree(tmp_dir, ignore_errors=True)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)

View File

@@ -27,4 +27,5 @@ github
stream stream
placeholders+ placeholders+
PyInstall PyInstall
IwaAnimation IwaAnimation
lateban

View File

@@ -0,0 +1,320 @@
# ______ ___ ___ _ _
# ____ | ___ \ | \/ | | | | |
# / __ \| |_/ / _| . . | ___ __| |_ _| | ___
# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \
# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/
# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___|
# \____/ __/ |
# |___/
# На модуль распространяется лицензия "GNU General Public License v3.0"
# https://github.com/all-licenses/GNU-General-Public-License-v3.0
# meta developer: @pymodule
import asyncio
import logging
from datetime import datetime, timezone
from herokutl.tl.functions.channels import (
EditBannedRequest,
GetParticipantsRequest,
)
from herokutl.tl.types import (
ChatBannedRights,
ChannelParticipantsSearch,
MessageService,
MessageActionChatAddUser,
MessageActionChatJoinedByLink,
MessageActionChatJoinedByRequest,
)
from .. import loader, utils
logger = logging.getLogger(__name__)
_BAN = ChatBannedRights(until_date=None, view_messages=True)
@loader.tds
class LateBanMod(loader.Module):
"""Ban all members who joined the chat after a specified date/time"""
strings = {
"name": "LateBan",
"no_args": (
"❌ Specify date/time:\n"
"<code>.lateban DD.MM.YYYY</code>\n"
"<code>.lateban DD.MM.YYYY HH:MM</code>\n"
"<code>.lateban HH:MM</code> — today"
),
"bad_date": (
"❌ Invalid format. Use <code>DD.MM.YYYY</code>, "
"<code>DD.MM.YYYY HH:MM</code> or <code>HH:MM</code>"
),
"not_chat": "❌ Only works in supergroups",
"no_rights": "❌ No permission to ban members",
"scanning": "🔍 Scanning members who joined after <b>{dt}</b>...",
"confirm": (
"⚠️ Found <b>{count}</b> members who joined after <b>{dt}</b>.\n\n"
"Confirm ban:"
),
"btn_ban": "✅ Ban {count} members",
"btn_cancel": "❌ Cancel",
"banning": "⏳ Banning {count} members...",
"progress": "⏳ Banned {done}/{total}...",
"done": (
"✅ Banned: <b>{banned}</b>\n"
"Skipped (errors/bots): <b>{skipped}</b>\n"
"Service messages deleted: <b>{deleted}</b>"
),
"nobody": "✅ No members found who joined after <b>{dt}</b>.",
}
strings_ru = {
"name": "LateBan",
"_cls_doc": "Заблокируйте всех участников, присоединившихся к чату после указанной даты/времени.",
"no_args": (
"❌ Укажи дату/время:\n"
"<code>.lateban DD.MM.YYYY</code>\n"
"<code>.lateban DD.MM.YYYY HH:MM</code>\n"
"<code>.lateban HH:MM</code>"
),
"bad_date": (
"❌ Неверный формат. Используй <code>DD.MM.YYYY</code>, "
"<code>DD.MM.YYYY HH:MM</code> или <code>HH:MM</code>"
),
"not_chat": "❌ Команда работает только в супергруппах",
"no_rights": "❌ Нет прав на бан участников",
"scanning": "🔍 Сканирую участников, вступивших после <b>{dt}</b>...",
"confirm": (
"⚠️ Найдено <b>{count}</b> участников, вступивших после <b>{dt}</b>.\n\n"
"Подтверди бан:"
),
"btn_ban": "✅ Забанить {count} участников",
"btn_cancel": "❌ Отмена",
"banning": "⏳ Баню {count} участников...",
"progress": "⏳ Забанено {done}/{total}...",
"done": (
"✅ Забанено: <b>{banned}</b>\n"
"Пропущено (ошибки/боты): <b>{skipped}</b>\n"
"Удалено сервисных сообщений: <b>{deleted}</b>"
),
"nobody": "✅ Участников, вступивших после <b>{dt}</b>, не найдено.",
}
async def client_ready(self):
pass
@loader.command(ru_doc="<DD.MM.YYYY [HH:MM] | HH:MM> - Забанить всех, кто присоединился после определённой даты/времени.")
async def latebancmd(self, message):
"""<DD.MM.YYYY [HH:MM] | HH:MM> — ban all who joined after this date/time"""
args = utils.get_args_raw(message).strip()
if not args:
return await utils.answer(message, self.strings["no_args"])
cutoff = _parse_dt(args)
if cutoff is None:
return await utils.answer(message, self.strings["bad_date"])
chat = await message.get_chat()
if not getattr(chat, "megagroup", False) and not getattr(chat, "gigagroup", False):
return await utils.answer(message, self.strings["not_chat"])
me = await self._client.get_me()
perms = await self._client.get_permissions(chat, me)
if not getattr(perms, "ban_users", False):
return await utils.answer(message, self.strings["no_rights"])
dt_str = cutoff.strftime("%d.%m.%Y %H:%M")
await utils.answer(message, self.strings["scanning"].format(dt=dt_str))
targets = await self._collect_targets(chat, cutoff, me.id)
if not targets:
return await utils.answer(message, self.strings["nobody"].format(dt=dt_str))
await self.inline.form(
message=message,
text=self.strings["confirm"].format(count=len(targets), dt=dt_str),
reply_markup=[[
{
"text": self.strings["btn_ban"].format(count=len(targets)),
"callback": self._do_ban,
"args": (chat, targets, dt_str, cutoff),
},
{
"text": self.strings["btn_cancel"],
"callback": self._cancel,
},
]],
force_me=True,
)
async def _collect_targets(self, chat, cutoff: datetime, my_id: int) -> list:
targets = []
offset = 0
limit = 200
while True:
res = await self._client(GetParticipantsRequest(
channel=chat,
filter=ChannelParticipantsSearch(""),
offset=offset,
limit=limit,
hash=0,
))
if not res.users:
break
users_map = {u.id: u for u in res.users}
for p in res.participants:
joined = getattr(p, "date", None)
if joined is None:
continue
if joined.tzinfo is None:
joined = joined.replace(tzinfo=timezone.utc)
if joined <= cutoff:
continue
uid = p.user_id
user = users_map.get(uid)
if not user or user.id == my_id:
continue
if getattr(user, "bot", False):
continue
if p.__class__.__name__ in ("ChannelParticipantAdmin", "ChannelParticipantCreator"):
continue
targets.append(uid)
if len(res.participants) < limit:
break
offset += limit
await asyncio.sleep(0.3)
return targets
async def _do_ban(self, call, chat, targets: list, dt_str: str, cutoff: datetime):
await call.edit(self.strings["banning"].format(count=len(targets)))
banned = 0
skipped = 0
banned_ids = set()
for i, uid in enumerate(targets, 1):
try:
await self._client(EditBannedRequest(chat, uid, _BAN))
banned += 1
banned_ids.add(uid)
except Exception as e:
logger.warning("LateBan: skip %s%s", uid, e)
skipped += 1
if i % 10 == 0:
try:
await call.edit(
self.strings["progress"].format(done=i, total=len(targets))
)
except Exception:
pass
await asyncio.sleep(0.4)
deleted = await self._delete_join_messages(chat, banned_ids, cutoff)
await call.edit(self.strings["done"].format(
banned=banned, skipped=skipped, deleted=deleted
))
async def _delete_join_messages(
self, chat, banned_ids: set, cutoff: datetime
) -> int:
_JOIN_ACTIONS = (
MessageActionChatAddUser,
MessageActionChatJoinedByLink,
MessageActionChatJoinedByRequest,
)
to_delete = []
try:
async for msg in self._client.iter_messages(
chat,
filter=MessageService,
reverse=False,
limit=None,
offset_date=None,
):
ts = msg.date
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
if ts < cutoff:
break
action = getattr(msg, "action", None)
if not isinstance(action, _JOIN_ACTIONS):
continue
if isinstance(action, MessageActionChatAddUser):
if any(uid in banned_ids for uid in action.users):
to_delete.append(msg.id)
else:
sender_id = getattr(msg, "from_id", None)
if sender_id is not None:
uid = getattr(sender_id, "user_id", None)
if uid in banned_ids:
to_delete.append(msg.id)
except Exception as e:
logger.warning("LateBan: failed to scan service messages — %s", e)
return 0
deleted = 0
for chunk in _chunks(to_delete, 100):
try:
await self._client.delete_messages(chat, chunk)
deleted += len(chunk)
except Exception as e:
logger.warning("LateBan: delete chunk failed — %s", e)
await asyncio.sleep(0.2)
return deleted
async def _cancel(self, call):
await call.delete()
def _parse_dt(raw: str) -> datetime | None:
"""
Supported formats:
DD.MM.YYYY → 00:00 UTC
DD.MM.YYYY HH:MM → HH:MM UTC
HH:MM → today HH:MM UTC
"""
raw = raw.strip()
today = datetime.now(timezone.utc).date()
try:
return datetime.strptime(raw, "%d.%m.%Y %H:%M").replace(tzinfo=timezone.utc)
except ValueError:
pass
try:
return datetime.strptime(raw, "%d.%m.%Y").replace(tzinfo=timezone.utc)
except ValueError:
pass
try:
t = datetime.strptime(raw, "%H:%M").time()
return datetime(
today.year, today.month, today.day,
t.hour, t.minute, tzinfo=timezone.utc,
)
except ValueError:
pass
return None
def _chunks(lst: list, n: int):
for i in range(0, len(lst), n):
yield lst[i:i + n]

View File

@@ -57,6 +57,12 @@
"gifts":[ "gifts":[
{"id": 5969796561943660080, "emoji": "🧸", "name": "Пасхальный мишка", "price": 50} {"id": 5969796561943660080, "emoji": "🧸", "name": "Пасхальный мишка", "price": 50}
] ]
},
"may_1th": {
"name": "🛠 1 Мая",
"gifts":[
{"id": 6026193266406327981, "emoji": "🧸", "name": "1 Мая мишка", "price": 50}
]
} }
} }
} }

130169
modules.json

File diff suppressed because it is too large Load Diff

View File

@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/ # |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods # @ke_mods
# ======================================= # =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods # meta developer: @ke_mods

View File

@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/ # |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods # @ke_mods
# ======================================= # =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods # meta developer: @ke_mods
@@ -44,5 +39,5 @@ class NeofetchMod(loader.Module):
await utils.answer(message, f"<pre>{utils.escape_html(output)}</pre>") await utils.answer(message, f"<pre>{utils.escape_html(output)}</pre>")
except FileNotFoundError: except FileNotFoundError:
await utils.answer(message, self.strings("not_installed")) await utils.answer(message, self.strings["not_installed"])

View File

@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/ # |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods # @ke_mods
# ======================================= # =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods # meta developer: @ke_mods
# requires: pillow # requires: pillow
@@ -95,17 +90,17 @@ class PicToStoriesMod(loader.Module):
args = utils.get_args_raw(message) args = utils.get_args_raw(message)
reply = await message.get_reply_message() reply = await message.get_reply_message()
if not reply or not reply.media: if not reply or not reply.media:
await utils.answer(message, self.strings("no_rep")) await utils.answer(message, self.strings["no_rep"])
return return
try: try:
image_bytes = await reply.download_media(file=bytes) image_bytes = await reply.download_media(file=bytes)
img = Image.open(io.BytesIO(image_bytes)) img = Image.open(io.BytesIO(image_bytes))
except Exception as e: except Exception as e:
await utils.answer(message, self.strings("err").format(e)) await utils.answer(message, self.strings["err"].format(e))
return return
await utils.answer(message, self.strings("work")) await utils.answer(message, self.strings["work"])
w, h = img.size w, h = img.size
curr_ratio = w / h curr_ratio = w / h
@@ -208,4 +203,4 @@ class PicToStoriesMod(loader.Module):
) )
) )
await utils.answer(message, self.strings("done")) await utils.answer(message, self.strings["done"])

View File

@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/ # |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods # @ke_mods
# ======================================= # =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods # meta developer: @ke_mods
# requires: pillow # requires: pillow
@@ -55,23 +50,13 @@ class RandomAnimePicMod(loader.Module):
IMAGES_API_URL = "https://api.nekosapi.com/v4/images" IMAGES_API_URL = "https://api.nekosapi.com/v4/images"
CATEGORIES_SCAN_LIMIT = 500 CATEGORIES_SCAN_LIMIT = 500
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"category",
"",
"Category",
validator=loader.validators.String(),
),
)
@loader.command(ru_doc="- получить рандомную аниме-картинку 👀") @loader.command(ru_doc="- получить рандомную аниме-картинку 👀")
async def rapiccmd(self, message): async def rapiccmd(self, message):
"""- fetch random anime-pic 👀""" """- fetch random anime-pic 👀"""
await utils.answer(message, self.strings("loading")) await utils.answer(message, self.strings["loading"])
try: try:
category = self.config["category"].strip() category = await utils.get_args_raw().strip()
def fetch_image(): def fetch_image():
params = {"limit": 1, "rating": ["safe"]} params = {"limit": 1, "rating": ["safe"]}
@@ -111,7 +96,7 @@ class RandomAnimePicMod(loader.Module):
url, file = await asyncio.to_thread(fetch_image) url, file = await asyncio.to_thread(fetch_image)
await utils.answer( await utils.answer(
message, message,
self.strings("img").format(url), self.strings["img"].format(url),
file=file file=file
) )
@@ -120,12 +105,12 @@ class RandomAnimePicMod(loader.Module):
"Error fetching random anime pic: %s", "Error fetching random anime pic: %s",
traceback.format_exc(), traceback.format_exc(),
) )
await utils.answer(message, self.strings("error")) await utils.answer(message, self.strings["error"])
@loader.command(ru_doc="- получить список категорий из API 👀") @loader.command(ru_doc="- получить список категорий из API 👀")
async def racategoriescmd(self, message): async def racategoriescmd(self, message):
"""- fetch categories from api 👀""" """- fetch categories from api 👀"""
await utils.answer(message, self.strings("categories_loading")) await utils.answer(message, self.strings["categories_loading"])
try: try:
def fetch_categories() -> list[str]: def fetch_categories() -> list[str]:
@@ -162,15 +147,15 @@ class RandomAnimePicMod(loader.Module):
categories = await asyncio.to_thread(fetch_categories) categories = await asyncio.to_thread(fetch_categories)
if not categories: if not categories:
await utils.answer(message, self.strings("no_categories")) await utils.answer(message, self.strings["no_categories"])
return return
formatted_categories = "\n".join( formatted_categories = ", ".join(
f"<code>{category}</code>" for category in categories f"<code>{category}</code>" for category in categories
) )
await utils.answer( await utils.answer(
message, message,
self.strings("categories").format(formatted_categories), self.strings["categories"].format(formatted_categories),
) )
except Exception: except Exception:
@@ -178,4 +163,4 @@ class RandomAnimePicMod(loader.Module):
"Error fetching categories: %s", "Error fetching categories: %s",
traceback.format_exc(), traceback.format_exc(),
) )
await utils.answer(message, self.strings("error")) await utils.answer(message, self.strings["error"])

File diff suppressed because it is too large Load Diff

View File

@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/ # |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods # @ke_mods
# ======================================= # =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods # meta developer: @ke_mods
@@ -43,10 +38,10 @@ class UnbanAllMod(loader.Module):
chat = await message.get_chat() chat = await message.get_chat()
if not chat.admin_rights and not chat.creator: if not chat.admin_rights and not chat.creator:
await utils.answer(message, self.strings("no_rights")) await utils.answer(message, self.strings["no_rights"])
return return
await utils.answer(message, self.strings("unban_in_process")) await utils.answer(message, self.strings["unban_in_process"])
no_banned = True no_banned = True
@@ -64,11 +59,11 @@ class UnbanAllMod(loader.Module):
)) ))
except Exception as e: except Exception as e:
await utils.answer(message, self.strings("error_occured").format(user.id, e)) await utils.answer(message, self.strings["error_occured"].format(user.id, e))
pass pass
if no_banned: if no_banned:
await utils.answer(message, self.strings("no_banned")) await utils.answer(message, self.strings["no_banned"])
return return
await utils.answer(message, self.strings("success")) await utils.answer(message, self.strings["success"])

View File

@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/ # |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods # @ke_mods
# ======================================= # =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods # meta developer: @ke_mods
# scope: ffmpeg # scope: ffmpeg

View File

@@ -1,4 +1,4 @@
__version__ = (1, 2, 0, 0) __version__ = (1, 3, 0, 0)
# This file is a part of Hikka Userbot! # This file is a part of Hikka Userbot!
# This product includes software developed by t.me/Fl1yd and t.me/spypm. # This product includes software developed by t.me/Fl1yd and t.me/spypm.
@@ -19,6 +19,10 @@ __version__ = (1, 2, 0, 0)
# - Added: Proxy for users from RF # - Added: Proxy for users from RF
# - Fixed: Correct reply author resolving for forwarded messages # - Fixed: Correct reply author resolving for forwarded messages
# Changelog v1.3:
# - Added: Message grouping for consecutive messages from the same user (hides avatar/name)
# - Changed: Replaced RU endpoint logic with direct proxy support via module config
# █▄█ █░█ █▀▄▀█ █▀▄▀█ █▄█   █▀▄▀█ █▀█ █▀▄ █▀ # █▄█ █░█ █▀▄▀█ █▀▄▀█ █▄█   █▀▄▀█ █▀█ █▀▄ █▀
# ░█░ █▄█ █░▀░█ █░▀░█ ░█░   █░▀░█ █▄█ █▄▀ ▄█ # ░█░ █▄█ █░▀░█ █░▀░█ ░█░   █░▀░█ █▄█ █▄▀ ▄█
@@ -155,9 +159,10 @@ class Dick:
return None return None
@staticmethod @staticmethod
async def post(url: str, data: dict): async def post(url: str, data: dict, proxy: Optional[str] = None):
try: try:
return await utils.run_sync(requests.post, url, json=data, timeout=30) px = {"http": proxy, "https": proxy} if proxy else None
return await utils.run_sync(requests.post, url, json=data, timeout=30, proxies=px)
except Exception: except Exception:
return None return None
@@ -199,12 +204,8 @@ class Quotes(loader.Module):
loader.ConfigValue("endpoint","https://kok.gay/gayotes/generate", loader.ConfigValue("endpoint","https://kok.gay/gayotes/generate",
lambda:"URL API-эндпоинта (можешь поднять локально - github.com/yummy1gay/quote-api)", lambda:"URL API-эндпоинта (можешь поднять локально - github.com/yummy1gay/quote-api)",
validator=loader.validators.Link()), validator=loader.validators.Link()),
loader.ConfigValue("use_rf_proxy", False, loader.ConfigValue("proxy", "",
lambda:'Включает прокси для РФ, если основной эндпоинт возвращает ошибку "Нетворк еррорь", и при этом сервер с юзерботом находится в России или ты сам сидишь в России с ограниченным доступом к зарубежным ресурсам (Termux / UserLAnd)', lambda:"Прокси для обхода блокировок (например: http://user:pass@ip:port). Оставь пустым, если не нужно."))
validator=loader.validators.Boolean()),
loader.ConfigValue("rf_endpoint", "https://ru.kok.gay/gayotes/generate",
lambda:"URL API-эндпоинта для РФ",
validator=loader.validators.Link()))
async def client_ready(self, client, db): async def client_ready(self, client, db):
self.client=client; self.db=db self.client=client; self.db=db
@@ -236,11 +237,11 @@ class Quotes(loader.Module):
"format": "webp" if not doc else "png", "type": self.config["type"]} "format": "webp" if not doc else "png", "type": self.config["type"]}
await utils.answer(st,self.strings["api_processing"]) await utils.answer(st,self.strings["api_processing"])
endpoint=self.config['rf_endpoint'] if self.config['use_rf_proxy'] else self.config['endpoint'] prx = self.config["proxy"] if self.config["proxy"] else None
r=await Dick.post(f"{endpoint}.webp",pay) r=await Dick.post(f"{self.config['endpoint']}.webp",pay,proxy=prx)
if not r or r.status_code!=200: if not r or r.status_code!=200:
try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй включить <code>use_rf_proxy</code> в конфиге)" try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй указать прокси в конфиге)"
except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй включить <code>use_rf_proxy</code> в конфиге)" except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй указать прокси в конфиге)"
return await utils.answer(st,self.strings["api_error"].format(err)) return await utils.answer(st,self.strings["api_error"].format(err))
buf=io.BytesIO(r.content); buf.name="YgQuote"+(".png" if doc else ".webp") buf=io.BytesIO(r.content); buf.name="YgQuote"+(".png" if doc else ".webp")
@@ -270,11 +271,11 @@ class Quotes(loader.Module):
"format": "webp","type":self.config["type"]} "format": "webp","type":self.config["type"]}
await utils.answer(st,self.strings["api_processing"]) await utils.answer(st,self.strings["api_processing"])
endpoint=self.config['rf_endpoint'] if self.config['use_rf_proxy'] else self.config['endpoint'] prx = self.config["proxy"] if self.config["proxy"] else None
r=await Dick.post(f"{endpoint}.webp",dickk) r=await Dick.post(f"{self.config['endpoint']}.webp",dickk,proxy=prx)
if not r or r.status_code!=200: if not r or r.status_code!=200:
try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй включить <code>use_rf_proxy</code> в конфиге)" try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй указать прокси в конфиге)"
except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй включить <code>use_rf_proxy</code> в конфиге)" except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй указать прокси в конфиге)"
return await utils.answer(st,self.strings["api_error"].format(err)) return await utils.answer(st,self.strings["api_error"].format(err))
buf=io.BytesIO(r.content); buf.name="YgQuote.webp" buf=io.BytesIO(r.content); buf.name="YgQuote.webp"
@@ -290,12 +291,18 @@ class Quotes(loader.Module):
return None return None
out: List[dict]=[] out: List[dict]=[]
prev_sender_id = None
for mm in lst: for mm in lst:
try: try:
u=await self.who(mm) u=await self.who(mm)
if not u: continue if not u: continue
current_sender_id = getattr(u,"id",0)
is_chained = (current_sender_id == prev_sender_id) if current_sender_id else False
name=telethon.utils.get_display_name(u); f,l=Dick.split(name) name=telethon.utils.get_display_name(u); f,l=Dick.split(name)
ava=await Dick.ava(self.client,getattr(u,"id",0)) if getattr(u,"id",None) else None
ava = await Dick.ava(self.client,current_sender_id) if (not is_chained and current_sender_id) else None
rb=None rb=None
try: try:
@@ -315,10 +322,16 @@ class Quotes(loader.Module):
txt=mm.raw_text or ""; ad=Dick.desc(mm) txt=mm.raw_text or ""; ad=Dick.desc(mm)
if ad: txt=f"{txt}\n\n{ad}" if txt else ad if ad: txt=f"{txt}\n\n{ad}" if txt else ad
item={"from":{"id":getattr(u,"id", 0),"first_name":getattr(u,"first_name","") or f,"last_name":getattr(u,"last_name","") or l, if is_chained:
"username":getattr(u,"username",None),"name":name,"photo":{"url":ava} if ava else {}}, item={"from":{"id":current_sender_id,"name":""},
"text":txt,"entities":Dick.ents(mm.entities),"avatar":True} "text":txt,"entities":Dick.ents(mm.entities),"avatar":False}
else:
item={"from":{"id":current_sender_id,"first_name":getattr(u,"first_name","") or f,"last_name":getattr(u,"last_name","") or l,
"username":getattr(u,"username",None),"name":name,"photo":{"url":ava} if ava else {}},
"text":txt,"entities":Dick.ents(mm.entities),"avatar":True}
es=getattr(u,"emoji_status",None)
if getattr(es,"document_id",None): item["from"]["emoji_status"]=str(es.document_id)
try: try:
if mm.voice: if mm.voice:
a = next((a for a in mm.voice.attributes or [] a = next((a for a in mm.voice.attributes or []
@@ -327,11 +340,10 @@ class Quotes(loader.Module):
except Exception: pass except Exception: pass
if med: item["voice" if "voice" in med else "media"] = med.get("voice", med) if med: item["voice" if "voice" in med else "media"] = med.get("voice", med)
es=getattr(u,"emoji_status",None)
if getattr(es,"document_id",None): item["from"]["emoji_status"]=str(es.document_id)
if rb: item["replyMessage"]=rb if rb: item["replyMessage"]=rb
out.append(item) out.append(item)
prev_sender_id = current_sender_id
except Exception: continue except Exception: continue
return out return out
@@ -378,6 +390,8 @@ class Quotes(loader.Module):
return await self.fake(f"{getattr(u,'id','')} {args}", None) return await self.fake(f"{getattr(u,'id','')} {args}", None)
out: List[dict]=[] out: List[dict]=[]
prev_sender_id = None
for part in args.split("; "): for part in args.split("; "):
try: try:
rb=None rb=None
@@ -388,22 +402,32 @@ class Quotes(loader.Module):
if not u1: continue if not u1: continue
txt1, ents1 = html.parse(t1) if t1 else ("", []) txt1, ents1 = html.parse(t1) if t1 else ("", [])
current_sender_id = u1.id
is_chained = (current_sender_id == prev_sender_id)
name=telethon.utils.get_display_name(u1); f,l=Dick.split(name) name=telethon.utils.get_display_name(u1); f,l=Dick.split(name)
ava=await Dick.ava(self.client,u1.id)
ava = await Dick.ava(self.client,u1.id) if not is_chained else None
if u2: if u2:
txt2, ents2 = html.parse(t2) if t2 else ("", []) txt2, ents2 = html.parse(t2) if t2 else ("", [])
name2=telethon.utils.get_display_name(u2); ava2=await Dick.ava(self.client,u2.id) name2=telethon.utils.get_display_name(u2); ava2=await Dick.ava(self.client,u2.id)
rb={"name":name2,"text":txt2,"entities":Dick.ents(ents2),"chatId":u2.id,"from":{"name":name2,"photo":{"url":ava2} if ava2 else {}}} rb={"name":name2,"text":txt2,"entities":Dick.ents(ents2),"chatId":u2.id,"from":{"name":name2,"photo":{"url":ava2} if ava2 else {}}}
msg={"from":{"id":u1.id,"first_name":getattr(u1,"first_name","") or f,"last_name":getattr(u1,"last_name","") or l, if is_chained:
"username":getattr(u1,"username",None),"name":name,"photo":{"url":ava} if ava else {}}, msg={"from":{"id":current_sender_id,"name":""},
"text":txt1,"entities":Dick.ents(ents1), "avatar":True} "text":txt1,"entities":Dick.ents(ents1), "avatar":False}
else:
es=getattr(u1,"emoji_status",None) msg={"from":{"id":current_sender_id,"first_name":getattr(u1,"first_name","") or f,"last_name":getattr(u1,"last_name","") or l,
if getattr(es,"document_id",None): msg["from"]["emoji_status"]=str(es.document_id) "username":getattr(u1,"username",None),"name":name,"photo":{"url":ava} if ava else {}},
"text":txt1,"entities":Dick.ents(ents1), "avatar":True}
es=getattr(u1,"emoji_status",None)
if getattr(es,"document_id",None): msg["from"]["emoji_status"]=str(es.document_id)
if rb: msg["replyMessage"]=rb if rb: msg["replyMessage"]=rb
out.append(msg) out.append(msg)
prev_sender_id = current_sender_id
except Exception: continue except Exception: continue
return out return out