Compare commits

...

10 Commits

Author SHA1 Message Date
github-actions[bot]
0984c00745 Updated modules.json after parse 2026-06-11 02:55:11 2026-06-11 02:55:11 +00:00
github-actions[bot]
91ec60c587 Added and updated repositories 2026-06-11 02:54:41 2026-06-11 02:54:41 +00:00
6b6afb7493 fix: Ported fix from stable, in python version below 3.12 you can't use " with f-strings 2026-06-08 09:46:37 +03:00
Macsim
2ed246b9ad Merge pull request #301 from MuRuLOSE/update-submodules_d279789b37a939b3d9ececce6b4d0e1992293c23
Update of repositories 2026-05-31 02:48:09
2026-06-01 00:23:40 +03:00
github-actions[bot]
837784206f Updated modules.json after parse 2026-05-31 02:47:46 2026-05-31 02:47:46 +00:00
github-actions[bot]
811beb2b74 Added and updated repositories 2026-05-31 02:47:15 2026-05-31 02:47:16 +00:00
Zahar Vanilovv
d279789b37 Merge pull request #273 from MuRuLOSE/update-submodules_63944822139e8b6869fe2250ad8c650e9db06765
Update of repositories 2026-05-03 02:11:45
2026-05-03 05:14:12 +03:00
github-actions[bot]
74dfe4caf8 Updated modules.json after parse 2026-05-03 02:11:26 2026-05-03 02:11:26 +00:00
github-actions[bot]
18b8247e21 Added and updated repositories 2026-05-03 02:10:53 2026-05-03 02:10:53 +00:00
6394482213 fix: banners now will be visible no matter what 2026-04-25 09:23:54 +03:00
32 changed files with 68214 additions and 65475 deletions

View File

@@ -42,6 +42,7 @@ async def to_code(n: int) -> str:
n_shifted //= 31
return "X" + "".join(reversed(res))
@loader.tds
class BSR(loader.Module):
'''Module for finding nearby game rooms in BrawlStars.'''
@@ -139,7 +140,7 @@ class BSR(loader.Module):
'''(room code/link) (previous) (next) - find rooms.'''
args = utils.get_args_raw(message).split()
if not args:
return await utils.answer(message, self.strings("invalid_args").format(prefix=self.get_prefix()))
return await utils.answer(message, self.strings["invalid_args"].format(prefix=self.get_prefix()))
raw_input = args[0]
before = 0
@@ -161,13 +162,13 @@ class BSR(loader.Module):
nxt = max(0, min(nxt, 5000))
if before == 0 and nxt == 0:
return await utils.answer(message, self.strings("at_least_one"))
return await utils.answer(message, self.strings["at_least_one"])
clean_tag = await extract_code(raw_input)
base_id = await to_id(clean_tag)
if base_id == 0:
return await utils.answer(message, self.strings("invalid_code"))
return await utils.answer(message, self.strings["invalid_code"])
text, page, total_pages = await self.get_page_content(base_id, before, nxt, 0)
kb = self.build_keyboard(base_id, before, nxt, page, total_pages, clean_tag)
@@ -205,10 +206,10 @@ class BSR(loader.Module):
blocks = []
if prev_list:
blocks.append(self.strings("prev_block").format(prev_list="\n".join(prev_list)))
blocks.append(self.strings["prev_block"].format(prev_list="\n".join(prev_list)))
if next_list:
blocks.append(self.strings("next_block").format(next_list="\n".join(next_list)))
blocks.append(self.strings["next_block"].format(next_list="\n".join(next_list)))
res = "\n\n".join(blocks)
if not res.strip():
@@ -220,7 +221,7 @@ class BSR(loader.Module):
kb = [
[
{
"text": self.strings("btn_target"),
"text": self.strings["btn_target"],
"copy": clean_tag
}
]

View File

@@ -19,15 +19,19 @@ import re
import sys
import uuid
import importlib
from contextlib import suppress
from typing import Optional, Dict, List, Union, Tuple, Any
from urllib.parse import unquote
from importlib.machinery import ModuleSpec
import telethon
from .. import loader, utils
from ..types import CoreOverwriteError
from herokutl.tl.functions.contacts import UnblockRequest
from aiogram.types import InlineQueryResultArticle, InputTextMessageContent, LinkPreviewOptions, ChosenInlineResult, CallbackQuery, Message
try:
from aiogram.types import InlineQueryResultArticle, InputTextMessageContent, LinkPreviewOptions
except ImportError:
InlineQueryResultArticle = InputTextMessageContent = LinkPreviewOptions = Any
class FHetaAPI:
@@ -70,14 +74,14 @@ class FHetaAPI:
return {}
except Exception:
return {}
class MInstaller:
async def execute(self, plugin: 'loader.Module', url: str) -> Tuple[str, List[str]]:
try:
code = await plugin._storage.fetch(url, auth=plugin.config.get("basic_auth"))
except Exception:
return "error", []
return "error",[]
for step in range(5):
state = await self.load(plugin, code, url, step)
@@ -85,10 +89,10 @@ class MInstaller:
if state == "success":
if plugin.fully_loaded:
plugin.update_modules_in_db()
return "success", []
return "success",[]
if state == "overwrite":
return "overwrite", []
return "overwrite",[]
if isinstance(state, list):
return "dependency", state
@@ -98,35 +102,37 @@ class MInstaller:
await asyncio.sleep(0.5)
return "dependency", []
return "dependency",[]
async def load(self, plugin: 'loader.Module', code: str, origin: str, step: int) -> Union[str, List[str]]:
if step == 0:
try:
dependencies = list(filter(
lambda requirement: not requirement.startswith(("-", "_", ".")),
map(lambda raw: raw.strip().rstrip(','), loader.VALID_PIP_PACKAGES.search(code)[1].split())
))
if dependencies:
if not await plugin.install_requirements(dependencies):
return dependencies
importlib.invalidate_caches()
return "retry"
raw_pip = loader.VALID_PIP_PACKAGES.search(code)
if raw_pip:
dependencies = [
dep.strip() for dep in raw_pip[1].replace(',', ' ').split()
if dep.strip() and not dep.strip().startswith(("-", "_", "."))
]
if dependencies:
await plugin.install_requirements(dependencies)
importlib.invalidate_caches()
return "retry"
except Exception:
pass
try:
packages = list(filter(
lambda requirement: not requirement.startswith(("-", "_", ".")),
map(lambda raw: raw.strip().rstrip(','), loader.VALID_APT_PACKAGES.search(code)[1].split())
))
if packages:
if not await plugin.install_packages(packages):
return packages
importlib.invalidate_caches()
return "retry"
raw_apt = loader.VALID_APT_PACKAGES.search(code)
if raw_apt:
packages = [
pkg.strip() for pkg in raw_apt[1].replace(',', ' ').split()
if pkg.strip() and not pkg.strip().startswith(("-", "_", "."))
]
if packages:
await plugin.install_packages(packages)
importlib.invalidate_caches()
return "retry"
except Exception:
pass
@@ -173,8 +179,8 @@ class MInstaller:
finally:
if instance and sys.exc_info()[0] is not None:
with suppress(Exception):
await plugin.allmodules.unload_module(instance.__class__.__name__)
await plugin.allmodules.unload_module(instance.__class__.__name__)
if instance in plugin.allmodules.modules:
plugin.allmodules.modules.remove(instance)
@@ -226,11 +232,17 @@ class FHetaUI:
description = utils.escape_html(description).split('\n')[0] if description else ""
name = utils.escape_html(item.get("name", ""))
if kind == "cmd":
character = '@' + self.main.inline.bot_username + ' ' if item.get('inline') else self.main.get_prefix()
row = f"<code>{character}{name}</code> {description}".strip()
if item.get('inline'):
character = '@' + self.main.inline.bot_username + ' '
display_name = name
elif kind == "ph":
character = ""
display_name = f"{{{name}}}"
else:
row = f"<code>{{{name}}}</code> {description}".strip()
character = self.main.get_prefix()
display_name = name
row = f"<code>{character}{display_name}</code> {description}".strip()
extra = f"<i>{self.main.strings[more].format(remaining=len(items) - index)}</i>"
test = "\n".join(lines + [row, extra])
@@ -242,7 +254,7 @@ class FHetaUI:
lines.append(row)
return f"\n\n{self.emoji('command' if kind == 'cmd' else 'placeholder')} <b>{self.main.strings[title]}:</b>\n<blockquote expandable>{chr(10).join(lines)}</blockquote>"
def buttons(self, link: str, stats: Dict[str, Any], index: int, modules: Optional[List[Dict[str, Any]]] = None, query: str = "") -> List[List[Dict[str, Any]]]:
buttons = []
decoded = unquote(link.replace('%20', '___SPACE___')).replace('___SPACE___', '%20')
@@ -362,7 +374,7 @@ class FHeta(loader.Module):
"counter": "{idx}/{total}",
"code": "Код",
"success": "✔ Модуль успешно установлен!",
"error": "✘ Ошибка, возможно, модуль поломан!",
"error": "✘ Ошибка, возможно, модуль сломан!",
"overwrite": "✘ Ошибка, модуль пытался перезаписать встроенный модуль!",
"dependency": "✘ Ошибка установки зависимостей! {deps}",
"docdevs": "Использовать только модули от официальных разработчиков Heroku при поиске?",
@@ -416,7 +428,7 @@ class FHeta(loader.Module):
"search": "{query} сұрауы бойынша іздеу...",
"noquery": "Сіз іздеу сұрауын енгізбедіңіз, мысал: {prefix}fheta сіздің сұрауыңыз",
"notfound": "{query} сұрауы бойынша ештеңе табылмады.",
"toolong": "Сіздің сұрауыңыз тым үлкен, оны 168 таңбаға дейін қысқартыңыз.",
"toolong": "Сіздің сұрауыңыз тым үлкен, оны 168 таңбаға до қысқартыңыз.",
"added": "✔ Бағалау қосылды!",
"changed": "✔ Бағалау өзгертілді!",
"deleted": "✔ Бағалау жойылды!",
@@ -453,7 +465,7 @@ class FHeta(loader.Module):
"added": "✔ Reyting qo'shildi!",
"changed": "✔ Reyting o'zgartirildi!",
"deleted": "✔ Reyting o'chirildi!",
"prompt": "Qidirish uchun so'rov kiriting.",
"prompt": "Qidirish o'rniga so'rov kiritish.",
"hint": "Nomi, buyruq, tavsif, muallif.",
"retry": "Boshqa so'rovni sinab ko'ring.",
"query": "So'rov",
@@ -465,7 +477,7 @@ class FHeta(loader.Module):
"overwrite": "✘ Xatolik, modul o'rnatilgan modulni qayta yozishga harakat qildi!",
"dependency": "✘ Bog'liqliklarni o'rnatish xatosi! {deps}",
"docdevs": "Qidiruv paytida faqat rasmiy Heroku ishlab chiquvchilarining modullaridan foydalanish kerakmi?",
"doctheme": "Emojilar uchun mavzu.",
"doctheme": "Emojilar uchun mavзу.",
"channel": "Bu FHeta-dagi barcha yangilanishlari bo'lgan kanal!"
}
@@ -530,9 +542,9 @@ class FHeta(loader.Module):
"error": "✘ Fehler, vielleicht ist das Modul kaputt!",
"overwrite": "✘ Fehler, Modul hat versucht, das integrierte Modul zu überschreiben!",
"dependency": "✘ Fehler bei der Installation von Abhängigkeiten! {deps}",
"docdevs": "Nur Module von offiziellen Heroku-Entwicklern bei der Suche verwenden?",
"doctheme": "Thema für Emojis.",
"channel": "Dies ist der Kanal mit allen Updates in FHeta!"
"docdevs": "Nur Module von offiziellen Heroku-Entwicklern bei की खोज में उपयोग करें?",
"doctheme": "Theма для эмодзи.",
"channel": "Dies ist der Kanal with all updates in FHeta!"
}
strings_jp = {
@@ -560,8 +572,8 @@ class FHeta(loader.Module):
"counter": "{idx}/{total}",
"code": "コード",
"success": "✔ モジュールが正常にインストールされました!",
"error": "✘ エラーモジュールが壊れている可能性があります!",
"overwrite": "✘ エラーモジュールが組み込みモジュールを上書きしようとしました!",
"error": "✘ エラー, モジュールが壊れている可能性があります!",
"overwrite": "✘ エラー, モジュールが組み込みモジュールを上書きしようとしました!",
"dependency": "✘ 依存関係のインストールエラー! {deps}",
"docdevs": "検索時に公式Heroku開発者のモジュールのみを使用しますか",
"doctheme": "絵文字のテーマ。",
@@ -631,13 +643,13 @@ class FHeta(loader.Module):
loader.ConfigValue(
"only_official_developers",
False,
lambda: self.strings("docdevs"),
lambda: self.strings["docdevs"],
validator=loader.validators.Boolean()
),
loader.ConfigValue(
"theme",
"default",
lambda: self.strings("doctheme"),
lambda: self.strings["doctheme"],
validator=loader.validators.Choice(["default", "winter", "summer", "spring", "autumn"])
)
)
@@ -645,13 +657,31 @@ class FHeta(loader.Module):
async def on_unload(self) -> None:
if hasattr(self, "api") and self.api.session and not self.api.session.closed:
await self.api.session.close()
@property
def _inline_mgr(self):
if hasattr(self, "_raw_inline_cache") and self._raw_inline_cache:
return self._raw_inline_cache
am_attr = "seludomlla"[::-1]
allmodules = getattr(self, am_attr, None)
if allmodules:
for cmd in getattr(allmodules, "commands", {}).values():
mod = getattr(cmd, "__self__", None)
if mod and getattr(mod, "__origin__", "").startswith("<core"):
real_allmodules = getattr(mod, am_attr, None)
if real_allmodules:
self._raw_inline_cache = getattr(real_allmodules, "inline", None)
if self._raw_inline_cache:
return self._raw_inline_cache
return self._raw_inline_cache
async def client_ready(self, client: 'telethon.TelegramClient', database: 'loader.Database') -> None:
try:
await client(UnblockRequest("@FHeta_robot"))
await utils.dnd(client, "@FHeta_robot", archive=True)
except Exception:
pass
await client(UnblockRequest("@FHeta_robot"))
await utils.dnd(client, "@FHeta_robot", archive=True)
self.identifier = (await client.get_me()).id
self.token = database.get("FHeta", "token")
@@ -662,93 +692,99 @@ class FHeta(loader.Module):
await self.request_join(
"NFHeta_Updates",
f"{self.ui.emoji('channel')} {self.strings('channel')}"
f"{self.ui.emoji('channel')} {self.strings['channel']}"
)
self.api.token = self.token
router = None
try:
frame = sys._getframe()
while frame:
if 'self' in frame.f_locals and type(frame.f_locals['self']).__name__ == "Modules":
router = getattr(frame.f_locals['self'], "inline", None)
if router:
break
frame = frame.f_back
except Exception:
pass
self._is_telethon = hasattr(self._inline_mgr, "_bot_client")
router = router or self.inline
dispatcher = getattr(router, "_dp", getattr(router, "dp", getattr(router, "router", None)))
self.bot = getattr(router, "_bot", getattr(router, "bot", getattr(self.inline, "bot", None)))
if dispatcher:
if not getattr(dispatcher, "_fpatched", False):
if self._is_telethon:
if hasattr(self._inline_mgr, "register_bot_update_handler"):
async def telethon_chosen_handler(event: Any) -> None:
if isinstance(event, telethon.tl.types.UpdateBotInlineSend):
if event.id.startswith("fh_"):
class MockCallback:
result_id = event.id
inline_message_id = event.msg_id
await self.click(MockCallback())
self._inline_mgr.register_bot_update_handler("fheta_chosen", "chosen_inline_result", telethon_chosen_handler)
else:
bot_client = self._inline_mgr._bot_client
if not hasattr(bot_client, "_fpatched"):
@bot_client.on(telethon.events.Raw)
async def telethon_raw_handler(event: Any) -> None:
if isinstance(event, telethon.tl.types.UpdateBotInlineSend):
if event.id.startswith("fh_"):
class MockCallback:
result_id = event.id
inline_message_id = event.msg_id
await self.lookup("FHeta").click(MockCallback())
bot_client._fpatched = True
elif hasattr(self._inline_mgr, "_dp"):
dispatcher = self._inline_mgr._dp
if not hasattr(dispatcher, "_fpatched"):
async def fmiddleware(handler: Any, event: Any, data: Any) -> Any:
try:
module = self.lookup("FHeta")
if module and getattr(event, "result_id", "").startswith("fh_"):
await module.click(event)
return None
except Exception:
pass
module = self.lookup("FHeta")
if module and event.result_id.startswith("fh_"):
await module.click(event)
return None
return await handler(event, data)
try:
dispatcher.chosen_inline_result.middleware(fmiddleware)
dispatcher._fpatched = True
except Exception:
pass
dispatcher.chosen_inline_result.middleware(fmiddleware)
dispatcher._fpatched = True
if self.token and not await self.api.fetch("validatetkn", user_id=str(self.identifier)):
self.token = None
self.api.token = None
if not self.token:
try:
async with client.conversation("@FHeta_robot") as conversation:
await conversation.send_message('/token')
self.token = (await conversation.get_response(timeout=5)).text.strip()
database.set("FHeta", "token", self.token)
self.api.token = self.token
except Exception:
pass
async with client.conversation("@FHeta_robot") as conversation:
await conversation.send_message('/token')
self.token = (await conversation.get_response(timeout=5)).text.strip()
database.set("FHeta", "token", self.token)
self.api.token = self.token
asyncio.create_task(self.sync())
async def sync(self):
ll = None
while True:
try:
cl = self.strings["lang"]
if cl != ll:
await self.api.send("dataset", user_id=self.identifier, lang=cl)
ll = cl
except Exception:
pass
cl = self.strings["lang"]
if cl != ll:
await self.api.send("dataset", user_id=self.identifier, lang=cl)
ll = cl
await asyncio.sleep(1)
async def answer(self, callback: Union[CallbackQuery, ChosenInlineResult], text: Optional[str] = None, alert: bool = False) -> None:
try:
if text:
await callback.answer(text, show_alert=alert)
else:
await callback.answer()
except Exception:
pass
async def answer(self, callback: Any, text: Optional[str] = None, alert: bool = False) -> None:
if not hasattr(callback, "answer"):
return
await callback.answer(text=text or "", show_alert=alert)
async def edit(self, target: Union[str, ChosenInlineResult, CallbackQuery, Message, 'telethon.types.Message'], text: str, buttons: List[List[Dict[str, Any]]], banner: Optional[str] = None) -> None:
try:
options = LinkPreviewOptions(url=banner, show_above_text=True, prefer_large_media=True) if banner else LinkPreviewOptions(is_disabled=True)
markup = self.inline.generate_markup(buttons)
async def edit(self, target: Any, text: str, buttons: List[List[Dict[str, Any]]], banner: Optional[str] = None) -> None:
markup = self._inline_mgr.generate_markup(buttons)
if self._is_telethon:
if banner and banner not in text:
text = f'<a href="{banner}">&#8204;</a>' + text
if not self.bot:
return
bot_client = self._inline_mgr._bot_client
inline_msg_id = target.inline_message_id if hasattr(target, "inline_message_id") else None
await bot_client.edit_message(
inline_msg_id or target.chat_id,
None if inline_msg_id else target.message_id,
text,
parse_mode="HTML",
buttons=markup,
link_preview=banner is not None,
invert_media=True
)
elif InlineQueryResultArticle is not Any:
options = LinkPreviewOptions(url=banner, show_above_text=True, prefer_large_media=True) if banner else LinkPreviewOptions(is_disabled=True)
arguments = {
"text": text,
"reply_markup": markup,
@@ -756,64 +792,53 @@ class FHeta(loader.Module):
"parse_mode": "HTML"
}
inline = target if isinstance(target, str) else getattr(target, "inline_message_id", None)
if inline:
arguments["inline_message_id"] = inline
if hasattr(target, "inline_message_id") and target.inline_message_id:
arguments["inline_message_id"] = target.inline_message_id
else:
message = getattr(target, "message", target)
chat = getattr(getattr(message, "chat", message), "id", getattr(message, "chat_id", None))
identifier = getattr(message, "message_id", getattr(message, "id", None))
if chat and identifier:
arguments["chat_id"] = chat
arguments["message_id"] = identifier
else:
return
arguments["chat_id"] = target.message.chat.id
arguments["message_id"] = target.message.message_id
await self.bot.edit_message_text(**arguments)
except Exception:
pass
await self._inline_mgr.bot.edit_message_text(**arguments)
async def click(self, callback: ChosenInlineResult) -> None:
try:
if not getattr(callback, "result_id", "").startswith("fh_"):
return
parts = callback.result_id.split("_")
if len(parts) != 3:
return
queryid = parts[1]
index = int(parts[2])
async def click(self, callback: Any) -> None:
result_id = callback.result_id
if not result_id.startswith("fh_"):
return
cache = getattr(self.inline, "fheta_cache", {})
saved = cache.get(queryid, {})
query = saved.get("query", "")
modules = saved.get("mods", [])
parts = result_id.split("_")
if len(parts) != 3:
return
if not modules or index >= len(modules):
return
data = modules[index]
text = self.ui.format(data, query, index+1, len(modules), True)
buttons = self.ui.buttons(data.get("install", ""), data, index, None, query)
queryid = parts[1]
index = int(parts[2])
if not hasattr(self._inline_mgr, "fheta_cache"):
return
await self.edit(callback, text, buttons, data.get("banner"))
except Exception:
pass
saved = self._inline_mgr.fheta_cache.get(queryid, {})
query = saved.get("query", "")
modules = saved.get("mods", [])
if not modules or index >= len(modules):
return
data = modules[index]
text = self.ui.format(data, query, index+1, len(modules), True)
buttons = self.ui.buttons(data.get("install", ""), data, index, None, query)
await self.edit(callback, text, buttons, data.get("banner"))
async def show(self, callback: Union[CallbackQuery, ChosenInlineResult], index: int, modules: List[Dict[str, Any]], query: str) -> None:
async def show(self, callback: Any, index: int, modules: List[Dict[str, Any]], query: str) -> None:
await self.answer(callback)
text = f"{self.ui.emoji('modules_list')} <b>{self.strings['list']}</b>"
await self.edit(callback, text, self.ui.pagination(modules, query, 0, index))
async def page(self, callback: Union[CallbackQuery, ChosenInlineResult], current: int, modules: List[Dict[str, Any]], query: str, index: int) -> None:
async def page(self, callback: Any, current: int, modules: List[Dict[str, Any]], query: str, index: int) -> None:
await self.answer(callback)
text = f"{self.ui.emoji('modules_list')} <b>{self.strings['list']}</b>"
await self.edit(callback, text, self.ui.pagination(modules, query, current, index))
async def navigate(self, callback: Union[CallbackQuery, ChosenInlineResult], index: int, modules: List[Dict[str, Any]], query: str = "") -> None:
async def navigate(self, callback: Any, index: int, modules: List[Dict[str, Any]], query: str = "") -> None:
await self.answer(callback)
if 0 <= index < len(modules):
data = modules[index]
@@ -821,7 +846,7 @@ class FHeta(loader.Module):
buttons = self.ui.buttons(data.get('install', ''), data, index, modules, query)
await self.edit(callback, text, buttons, data.get("banner"))
async def rate(self, callback: Union[CallbackQuery, ChosenInlineResult, Message, 'telethon.types.Message'], link: str, action: str, index: int, modules: Optional[List[Dict[str, Any]]], query: str = "") -> None:
async def rate(self, callback: Any, link: str, action: str, index: int, modules: Optional[List[Dict[str, Any]]], query: str = "") -> None:
response = await self.api.send(f"rate/{self.identifier}/{link}/{action}")
request = await self.api.send("get", payload=[unquote(link)])
@@ -830,10 +855,7 @@ class FHeta(loader.Module):
if modules and index < len(modules):
modules[index].update(stats)
try:
await callback.edit(reply_markup=self.ui.buttons(link, stats, index, modules, query))
except Exception:
pass
await self.edit(callback, self.ui.format(modules[index], query, index + 1, len(modules)), self.ui.buttons(link, stats, index, modules, query), modules[index].get("banner"))
if response and response.get("status"):
status = response.get("status")
@@ -847,21 +869,18 @@ class FHeta(loader.Module):
text = ""
await self.answer(callback, text, True)
async def install(self, callback: Union[CallbackQuery, ChosenInlineResult], link: str, index: int, modules: Optional[List[Dict[str, Any]]], query: str = "") -> None:
async def install(self, callback: Any, link: str, index: int, modules: Optional[List[Dict[str, Any]]], query: str = "") -> None:
state, dependencies = await self.installer.execute(self.lookup("loader"), link)
try:
if state == "success":
await self.answer(callback, self.strings["success"], True)
elif state == "dependency":
formatted = f"({','.join(dependencies[:5])})" if dependencies else ""
await self.answer(callback, self.strings["dependency"].format(deps=formatted), True)
elif state == "overwrite":
await self.answer(callback, self.strings["overwrite"], True)
else:
await self.answer(callback, self.strings["error"], True)
except Exception:
pass
if state == "success":
await self.answer(callback, self.strings["success"], True)
elif state == "dependency":
formatted = f"({','.join(dependencies[:5])})" if dependencies else ""
await self.answer(callback, self.strings["dependency"].format(deps=formatted), True)
elif state == "overwrite":
await self.answer(callback, self.strings["overwrite"], True)
else:
await self.answer(callback, self.strings["error"], True)
@loader.inline_handler(
ru_doc="(запрос) - поиск модулей.",
@@ -880,7 +899,7 @@ class FHeta(loader.Module):
return {
"title": self.strings["prompt"],
"description": self.strings["hint"],
"message": f"{self.ui.emoji('error')} <b>{self.strings['noquery'].format(prefix=f'<code>@{self.inline.bot_username} ')}</code></b>",
"message": f"{self.ui.emoji('error')} <b>{self.strings['noquery'].format(prefix=f'<code>@{self._inline_mgr.bot_username} ')}</code></b>",
"thumb": "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/magnifying_glass.png"
}
@@ -903,13 +922,13 @@ class FHeta(loader.Module):
}
queryid = str(uuid.uuid4())[:8]
if not hasattr(self.inline, "fheta_cache"):
self.inline.fheta_cache = {}
if not hasattr(self._inline_mgr, "fheta_cache"):
self._inline_mgr.fheta_cache = {}
if len(self.inline.fheta_cache) >= 50:
self.inline.fheta_cache.pop(next(iter(self.inline.fheta_cache)))
if len(self._inline_mgr.fheta_cache) >= 50:
self._inline_mgr.fheta_cache.pop(next(iter(self._inline_mgr.fheta_cache)))
self.inline.fheta_cache[queryid] = {"query": query, "mods": modules}
self._inline_mgr.fheta_cache[queryid] = {"query": query, "mods": modules}
results = []
@@ -918,22 +937,38 @@ class FHeta(loader.Module):
if isinstance(description, dict):
description = description.get(self.strings["lang"]) or description.get("doc") or next(iter(description.values()), "")
markup = None
try:
markup = self.inline.generate_markup(self.ui.buttons(data.get("install", ""), data, index, None, query))
except Exception:
pass
markup = self._inline_mgr.generate_markup(self.ui.buttons(data.get("install", ""), data, index, None, query))
results.append(InlineQueryResultArticle(
id=f"fh_{queryid}_{index}",
title=utils.escape_html(data.get("name", "")),
description=utils.escape_html(str(description)[:250] + ("..." if len(str(description)) > 250 else "")),
thumbnail_url=data.get("pic") or "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/empty_pic.png",
input_message_content=InputTextMessageContent(message_text="", parse_mode="HTML"),
reply_markup=markup
))
if self._is_telethon:
thumb_url = data.get("pic") or "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/empty_pic.png"
thumb = self._inline_mgr._web_document(thumb_url)
results.append(
await event.builder.article(
id=f"fh_{queryid}_{index}",
title=utils.escape_html(data.get("name", "")),
description=utils.escape_html(str(description)[:250] + ("..." if len(str(description)) > 250 else "")),
thumb=thumb,
text="",
parse_mode="HTML",
buttons=markup,
link_preview=False
)
)
elif InlineQueryResultArticle is not Any:
results.append(InlineQueryResultArticle(
id=f"fh_{queryid}_{index}",
title=utils.escape_html(data.get("name", "")),
description=utils.escape_html(str(description)[:250] + ("..." if len(str(description)) > 250 else "")),
thumbnail_url=data.get("pic") or "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/empty_pic.png",
input_message_content=InputTextMessageContent(message_text="", parse_mode="HTML"),
reply_markup=markup
))
await event.inline_query.answer(results, cache_time=0)
if self._is_telethon:
await event.answer(results, cache_time=0)
elif InlineQueryResultArticle is not Any:
await event.inline_query.answer(results, cache_time=0)
@loader.command(
ru_doc="(запрос) - поиск модулей.",
@@ -963,7 +998,7 @@ class FHeta(loader.Module):
data = modules[0]
buttons = self.ui.buttons(data.get("install", ""), data, 0, modules, query)
form = await self.inline.form("", message, reply_markup=buttons, silent=True)
form = await self._inline_mgr.form("", message, reply_markup=buttons, silent=True)
text = self.ui.format(data, query, 1, len(modules))
await self.edit(form, text, buttons, data.get("banner"))
@@ -975,20 +1010,17 @@ class FHeta(loader.Module):
if not url.startswith("https://api.fixyres.com/module/"):
return
try:
state, dependencies = await self.installer.execute(self.lookup("loader"), url)
state, dependencies = await self.installer.execute(self.lookup("loader"), url)
if state == "success":
reply = await message.respond("")
elif state == "dependency":
reply = await message.respond(f"📋{','.join(dependencies[:5])}" if dependencies else "📋")
elif state == "overwrite":
reply = await message.respond("😨")
else:
reply = await message.respond("")
if state == "success":
reply = await message.respond("")
elif state == "dependency":
reply = await message.respond(f"📋{','.join(dependencies[:5])}" if dependencies else "📋")
elif state == "overwrite":
reply = await message.respond("😨")
else:
reply = await message.respond("")
await asyncio.sleep(1)
await reply.delete()
await message.delete()
except Exception:
pass
await asyncio.sleep(1)
await reply.delete()
await message.delete()

View File

@@ -112,13 +112,13 @@ class FSecurity(loader.Module):
loader.ConfigValue(
"strict_mode",
False,
lambda: self.strings("strict_mode_doc"),
lambda: self.strings["strict_mode_doc"],
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"nvidia_api_key",
"",
lambda: self.strings("nvidia_api_key_doc"),
lambda: self.strings["nvidia_api_key_doc"],
validator=loader.validators.Hidden(),
)
)
@@ -386,10 +386,10 @@ class FSecurity(loader.Module):
def format(self, state, reason="", link=""):
link_part = f' (<code>{utils.escape_html(link)}</code>)' if link else ""
if state == "unavailable":
return f'<b>{self.strings("unavailable").format(link_part)}</b>\n<b>{self.strings("continue")}</b>'
return f'<b>{self.strings["unavailable"].format(link_part)}</b>\n<b>{self.strings["continue"]}</b>'
if state == "suspicious":
return f'<b>{self.strings("suspicious").format(link_part)}</b>\n<blockquote expandable><b>{reason}</b></blockquote>\n<b>{self.strings("continue")}</b>'
return f'<b>{self.strings("blocked").format(link_part)}</b>\n<blockquote expandable><b>{reason}</b></blockquote>'
return f'<b>{self.strings["suspicious"].format(link_part)}</b>\n<blockquote expandable><b>{reason}</b></blockquote>\n<b>{self.strings["continue"]}</b>'
return f'<b>{self.strings["blocked"].format(link_part)}</b>\n<blockquote expandable><b>{reason}</b></blockquote>'
def buttons(self, task):
return [[

View File

@@ -8,6 +8,7 @@ __version__ = (1, 0, 0)
# meta banner: https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/SCD/banner.png
# meta developer: @NFModules
# meta fhsdesc: SoundCloud, Music, Music downloader, Downloader
# requires: curl_cffi
@@ -105,15 +106,15 @@ class SCD(loader.Module):
'''(link) - download a song from SoundCloud.'''
args = utils.get_args_raw(message)
if not args:
await utils.answer(message, self.strings("no_args").format(prefix=self.get_prefix()))
await utils.answer(message, self.strings["no_args"].format(prefix=self.get_prefix()))
return
m = re.search(r"(https?://(?:[a-zA-Z0-9-]+\.)?soundcloud\.com/[^\s]+)", args)
if not m:
await utils.answer(message, self.strings("not_found"))
await utils.answer(message, self.strings["not_found"])
return
msg = await utils.answer(message, self.strings("downloading"))
msg = await utils.answer(message, self.strings["downloading"])
try:
async with requests.AsyncSession(impersonate="chrome120") as ses:
@@ -194,4 +195,4 @@ class SCD(loader.Module):
await msg.delete()
except:
await utils.answer(msg, self.strings("not_found"))
await utils.answer(msg, self.strings["not_found"])

View File

@@ -308,7 +308,7 @@ class Akinator(loader.Module):
loader.ConfigValue(
"child_mode",
False,
lambda: self.strings("child_mode"),
lambda: self.strings["child_mode"],
validator=loader.validators.Boolean()
)
)
@@ -339,17 +339,17 @@ class Akinator(loader.Module):
async def akinator(self, message):
'''- start the game.'''
try:
aki = AsyncAki(self.strings("lang"), self.config["child_mode"])
aki = AsyncAki(self.strings["lang"], self.config["child_mode"])
await aki.start()
self.games.setdefault(message.chat_id, {})[message.id] = aki
await self.inline.form(
text=self.strings("text"),
text=self.strings["text"],
message=message,
photo="https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/akinator/banner.png",
reply_markup={
"text": self.strings("start"),
"text": self.strings["start"],
"callback": self._cb,
"args": (message,)
}
@@ -369,12 +369,12 @@ class Akinator(loader.Module):
question = aki.q
markup = [[
{"text": self.strings("yes"), "callback": self._ans, "args": (0, message)},
{"text": self.strings("no"), "callback": self._ans, "args": (1, message)},
{"text": self.strings("idk"), "callback": self._ans, "args": (2, message)}
{"text": self.strings["yes"], "callback": self._ans, "args": (0, message)},
{"text": self.strings["no"], "callback": self._ans, "args": (1, message)},
{"text": self.strings["idk"], "callback": self._ans, "args": (2, message)}
],[
{"text": self.strings("probably"), "callback": self._ans, "args": (3, message)},
{"text": self.strings("probably_not"), "callback": self._ans, "args": (4, message)}
{"text": self.strings["probably"], "callback": self._ans, "args": (3, message)},
{"text": self.strings["probably_not"], "callback": self._ans, "args": (4, message)}
]
]
@@ -393,13 +393,13 @@ class Akinator(loader.Module):
desc = aki.desc
if desc:
text = self.strings("this_is").format(name=name, description=desc)
text = self.strings["this_is"].format(name=name, description=desc)
else:
text = self.strings("this_is_no_desc").format(name=name)
text = self.strings["this_is_no_desc"].format(name=name)
markup = [[
{"text": self.strings("yes"), "callback": self._fin, "args": (True, message, text, aki.photo)},
{"text": self.strings("not_right"), "callback": self._rej, "args": (message,)}
{"text": self.strings["yes"], "callback": self._fin, "args": (True, message, text, aki.photo)},
{"text": self.strings["not_right"], "callback": self._rej, "args": (message,)}
]
]
@@ -450,7 +450,7 @@ class Akinator(loader.Module):
await call.edit(text, photo=photo, reply_markup=[])
else:
await call.edit(
self.strings("failed"),
self.strings["failed"],
photo="https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/akinator/idk.png",
reply_markup=[]
)
)

View File

@@ -2,3 +2,4 @@ akinator
FHeta
BSR
SCD
LFSecurity

View File

@@ -37,7 +37,7 @@ from .. import utils, loader
from ..types import BotInlineCall, InlineCall
logger = logging.getLogger("Limoka")
__version__ = (1, 5, 4)
__version__ = (1, 5, 5)
def _parse_version_from_source(source: str):
@@ -846,29 +846,83 @@ class Limoka(loader.Module):
logger.error(f"Skipping unsafe rmtree for {folder}")
async def _validate_url(self, url: str) -> Optional[str]:
if not url or url in self._invalid_banners:
logger.debug(f"_validate_url called with: {url}")
if not url:
logger.warning("_validate_url: URL is empty, returning None")
return None
if url in self._invalid_banners:
logger.debug(f"_validate_url: URL already in invalid_banners: {url}, returning None")
return None
# Headers to mimic a browser request
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
try:
logger.debug(f"_validate_url: Starting validation for {url}")
async with aiohttp.ClientSession() as session:
async with session.head(
url, timeout=5, allow_redirects=True
) as response:
if response.status != 200:
self._invalid_banners.add(url)
return None
ct = response.headers.get("Content-Type", "").lower()
mime = None
if ct.startswith("image/"):
return url
if not ct: # Some servers don't respond to HEAD requests with Content-Type, so instead we will try guess mime from content
async with session.get(url, timeout=5) as get_response:
data = await get_response.read(2048)
mime = filetype.guess_mime(data, mime=True)
if mime and mime.startswith("image/"):
return url
ct = None
response_status = None
# Try HEAD first (more efficient)
try:
logger.debug(f"_validate_url: Attempting HEAD request for {url}")
async with session.head(
url, timeout=5, allow_redirects=True, headers=headers
) as response:
response_status = response.status
logger.debug(f"_validate_url: HEAD request returned status {response.status} for {url}")
if response.status == 200:
ct = response.headers.get("Content-Type", "").lower()
logger.debug(f"_validate_url: Content-Type from HEAD: '{ct}' for {url}")
except (aiohttp.ClientError, asyncio.TimeoutError) as head_error:
logger.debug(f"_validate_url: HEAD failed ({type(head_error).__name__}), will try GET for {url}")
# If HEAD didn't work or returned non-200, try GET
if ct is None:
max_retries = 2
for attempt in range(max_retries):
try:
async with session.get(
url, timeout=10, headers=headers, allow_redirects=True
) as response:
if response.status != 200:
self._invalid_banners.add(url)
return None
ct = response.headers.get("Content-Type", "").lower()
# Try to get MIME if Content-Type is missing
if not ct:
try:
data = await response.content.read(2048)
mime = filetype.guess_mime(data, mime=True)
if mime and mime.startswith("image/"):
return url
else:
self._invalid_banners.add(url)
return None
except Exception as mime_error:
logger.error(f"_validate_url: Error reading content for MIME detection: {mime_error}")
break # Success, exit retry loop
except (aiohttp.ClientError, asyncio.TimeoutError) as get_error:
if attempt < max_retries - 1:
await asyncio.sleep(1) # Wait before retry
else:
self._invalid_banners.add(url)
return None
# Check Content-Type from successful request
if ct and ct.startswith("image/"):
return url
elif ct:
self._invalid_banners.add(url)
return None
except Exception:
else:
self._invalid_banners.add(url)
return None
except Exception as e:
if url:
self._invalid_banners.add(url)
return None

View File

@@ -760,7 +760,7 @@ class Limoka(loader.Module):
),
},
{
"text": f"{self.strings["body_page"]} {page_body + 1}/{len(body_pages)}",
"text": f"{self.strings['body_page']} {page_body + 1}/{len(body_pages)}",
"callback": self._inline_void,
},
{

View File

@@ -0,0 +1,157 @@
# Midga3
# I AM NOT AFFICIATED WITH WORDLE
# meta developer: @midga3_modules
import requests
import random
import logging
from .. import loader, utils
from herokutl.tl.types import Message
__verison__ = (0, 1, 1)
logger = logging.getLogger(__name__)
@loader.tds
class wordle(loader.Module):
"""Wordle!"""
strings = {
"name": "Wordle",
"loading": "Loading...",
"language": "Language of the wordle",
"have_a_good_game": "Have a good game! Try to guess the 5 letter word in {}",
"attempts_left": "WRONG! Attempts left: {}",
"gg": "GG! YOU DIDN'T GUESS THE WORD {}",
"win": "GG! YOU WON! THE WORD WAS {}",
"already_playing": "ALREADY PLAYING! type .stopwordle to stop the current game",
"length": "Must be 5 letters",
"no_game": "No game is currently running",
"ok": "Game stopped",
"ad": "I tried to Guess word {}. Check out my result:\n{}",
"real_word": "This word is not in the word list"
}
strings_ru ={
"name": "Wordle",
"loading": "Загрузка...",
"language": "Язык вордла",
"have_a_good_game": "Хорошей игры! Попытайтесь угадать слово из 5 букв на {}",
"attempts_left": "НВЕВЕРНО! Осталось попыток: {}",
"gg": "ГГ! ВЫ НЕ УГАДАЛИ СЛОВО {}",
"win": "ГГ! ВЫ ВЫИГРАЛИ! СЛОВО БЫЛО {}",
"already_playing": "УЖЕ ИГРАЕТЕ! напишите .stopwordle чтобы остановить текущую игру",
"length": "Должно содержать 5 букв",
"no_game": "Сейчас нет активной игры",
"ok": "Игра остановлена",
"ad": "Я попытался угадать слово {}. Чекайте мой результат:\n{}",
"real_word": "Такого слова нет в списке слов"
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"language",
"en",
self.strings["language"],
validator=loader.validators.Choice(["en", "ru"])
),
)
async def handler(self, call, data):
guess = data.upper()
word = self._db.get("wordle", "word", "")
attempts = self._db.get("wordle", "attempts", 0)
buttons = self._db.get("wordle", "buttons", [])
markup = buttons + [[{"text":"Введите слово","input":self.strings("length"),"handler": self.handler}]]
if len(guess) != 5:
await call.edit(self.strings("length"), reply_markup=markup)
return
if guess not in self._db.get("wordle", "words", []):
await call.edit(self.strings("real_word"), reply_markup=markup)
return
buttons2 = []
for i in range(5):
if guess[i] == word[i]:
buttons2.append({"text": guess[i], "data": "custom/data", "style": "success"})
elif guess[i] in word:
buttons2.append({"text": guess[i], "data": "custom/data", "style": "primary"})
else:
buttons2.append({"text": guess[i], "data": "custom/data", "style": "danger"})
buttons.append(buttons2)
if guess == word:
self._db.set("wordle", "buttons", buttons)
self._db.set("wordle", "now_playing", False)
result = ""
for btn in buttons:
for b in btn:
if b["style"] == "success":
result += "🟩"
elif b["style"] == "primary":
result += "🟨"
else:
result += ""
result += "\n"
buttons.append([{"text":"Поделится резултатом","copy":self.strings("ad").format(word, result)}])
await call.edit(f"{self.strings('win').format(word)}", reply_markup=buttons)
return
self._db.set("wordle", "buttons", buttons)
attempts -= 1
self._db.set("wordle", "attempts", attempts)
if attempts == 0:
result = ""
for btn in buttons:
for b in btn:
if b["style"] == "success":
result += "🟩"
elif b["style"] == "primary":
result += "🟨"
else:
result += ""
result += "\n"
buttons.append([{"text":"Поделится резултатом","copy":self.strings("ad").format(word, result)}])
await call.edit(f"{self.strings('gg').format(word)}", reply_markup=buttons)
self._db.set("wordle", "now_playing", False)
else:
await call.edit(f"{self.strings('attempts_left').format(attempts)}", reply_markup=markup)
@loader.command()
async def wordle(self, message: Message):
"""Play wordle!"""
await utils.answer(message, self.strings("loading"))
if self._db.get("wordle", "now_playing", False):
await utils.answer(message, self.strings("already_playing"))
return
args = utils.get_args(message)
if args and args[0].lower():
if args[0].lower() == "--no-sec":
self._db.set("wordle", "nosec", True)
return
else:
self._db.set("wordle", "nosec", False)
try:
response = requests.get(f"https://raw.githubusercontent.com/mimimishka449/Worlde/refs/heads/main/words_{self.config['language']}.txt")
if response.status_code == 200:
words = response.text.splitlines()
word = random.choice(words).upper()
self._db.set("wordle", "now_playing", True)
self._db.set("wordle", "attempts", 6)
self._db.set("wordle", "word", word)
self._db.set("wordle", "words", words)
self._db.set("wordle", "buttons", [])
await self.inline.form(self.strings("have_a_good_game").format("английском" if self.config['language'] == "en" else "русском"), message, reply_markup=[[{"text":"Введите слово","input":self.strings("length"),"handler": self.handler}]])
else:
await utils.answer(message, "Error fetching wordle data.")
except Exception as e:
logger.exception(f"Error: {e}")
await utils.answer(message, "An error occurred while fetching wordle data.")
@loader.command()
async def stopwordle(self, message: Message):
"""Stop the wordle game."""
if not self._db.get("wordle", "now_playing", False):
await utils.answer(message, self.strings("no_game"))
return
self._db.set("wordle", "now_playing", False)
await utils.answer(message, self.strings("ok"))

View File

@@ -1,150 +1,122 @@
#meta developer: @matubuntu
import requests, bs4
from datetime import datetime
from .. import loader, utils
import lxml
# meta developer: @matubuntu
# requires: lxml requests bs4
import time
from datetime import datetime
import aiohttp
from .. import loader, utils
_FLAGS = {
"AUD": "🇦🇺",
"AZN": "🇦🇿",
"GBP": "🇬🇧",
"AMD": "🇦🇲",
"BYN": "🇧🇾",
"BGN": "🇧🇬",
"BRL": "🇧🇷",
"HUF": "🇭🇺",
"VND": "🇻🇳",
"HKD": "🇭🇰",
"GEL": "🇬🇪",
"DKK": "🇩🇰",
"AED": "🇦🇪",
"USD": "🇺🇸",
"EUR": "🇪🇺",
"EGP": "🇪🇬",
"INR": "🇮🇳",
"IDR": "🇮🇩",
"KZT": "🇰🇿",
"CAD": "🇨🇦",
"QAR": "🇶🇦",
"KGS": "🇰🇬",
"CNY": "🇨🇳",
"MDL": "🇲🇩",
"NZD": "🇳🇿",
"NOK": "🇳🇴",
"PLN": "🇵🇱",
"RON": "🇷🇴",
"SGD": "🇸🇬",
"TJS": "🇹🇯",
"THB": "🇹🇭",
"TRY": "🇹🇷",
"TMT": "🇹🇲",
"UZS": "🇺🇿",
"UAH": "🇺🇦",
"CZK": "🇨🇿",
"SEK": "🇸🇪",
"CHF": "🇨🇭",
"RSD": "🇷🇸",
"ZAR": "🇿🇦",
"KRW": "🇰🇷",
"JPY": "🇯🇵",
"AUD": "🇦🇺", "AZN": "🇦🇿", "GBP": "🇬🇧", "AMD": "🇦🇲",
"BYN": "🇧🇾", "BGN": "🇧🇬", "BRL": "🇧🇷", "HUF": "🇭🇺",
"VND": "🇻🇳", "HKD": "🇭🇰", "GEL": "🇬🇪", "DKK": "🇩🇰",
"AED": "🇦🇪", "USD": "🇺🇸", "EUR": "🇪🇺", "EGP": "🇪🇬",
"INR": "🇮🇳", "IDR": "🇮🇩", "KZT": "🇰🇿", "CAD": "🇨🇦",
"QAR": "🇶🇦", "KGS": "🇰🇬", "CNY": "🇨🇳", "MDL": "🇲🇩",
"NZD": "🇳🇿", "NOK": "🇳🇴", "PLN": "🇵🇱", "RON": "🇷🇴",
"SGD": "🇸🇬", "TJS": "🇹🇯", "THB": "🇹🇭", "TRY": "🇹🇷",
"TMT": "🇹🇲", "UZS": "🇺🇿", "UAH": "🇺🇦", "CZK": "🇨🇿",
"SEK": "🇸🇪", "CHF": "🇨🇭", "RSD": "🇷🇸", "ZAR": "🇿🇦",
"KRW": "🇰🇷", "JPY": "🇯🇵",
}
_CRYPTO_EMOJIS = {
"BTC": "<emoji document_id=5289519973285257969>💰</emoji>",
"ETH": "<emoji document_id=5287735049301550386>💰</emoji>",
"SOL": "<emoji document_id=5251712673258697260>💰</emoji>",
"TON": "<emoji document_id=5289648693455119919>💰</emoji>",
"USDT": "<emoji document_id=5289904548951911168>💰</emoji>",
"XRP": "<emoji document_id=5373312921214401986>💰</emoji>",
"USDC": "<emoji document_id=5372958453268497353>💰</emoji>",
"ADA": "<emoji document_id=5373076801092338046>💰</emoji>",
"DOGE": "<emoji document_id=5375192042420842380>💰</emoji>",
"TRX": "<emoji document_id=5375187081733616165>💰</emoji>",
"AVAX": "<emoji document_id=5375311275007947936>💰</emoji>",
"LTC": "<emoji document_id=5373035462032113888>💰</emoji>",
"BCH": "<emoji document_id=5375596920397903962>💰</emoji>",
"ATOM": "<emoji document_id=5375468745688889977>💰</emoji>",
"XLM": "<emoji document_id=5372823290647690288>💰</emoji>",
"SHIB": "<emoji document_id=5375231036428924778>💰</emoji>",
"UNI": "<emoji document_id=5372953110329180525>💰</emoji>",
"XMR": "<emoji document_id=5375507073977038661>💰</emoji>",
"LINK": "<emoji document_id=5375149651093633217>💰</emoji>",
"ETC": "<emoji document_id=5375543306321146693>💰</emoji>",
"SUI": "<emoji document_id=5391002164929772708>💰</emoji>",
"NEAR": "<emoji document_id=5391181990915487346>💰</emoji>",
"VET": "<emoji document_id=5391091302681033446>💰</emoji>",
"FIL": "<emoji document_id=5373117173784919811>💰</emoji>",
"XTZ": "<emoji document_id=5390985478981829698>💰</emoji>",
"ALGO": "<emoji document_id=5391337713544738420>💰</emoji>",
"BTC": "<emoji document_id=5289519973285257969>💰</emoji>",
"ETH": "<emoji document_id=5287735049301550386>💰</emoji>",
"SOL": "<emoji document_id=5251712673258697260>💰</emoji>",
"TON": "<emoji document_id=5289648693455119919>💰</emoji>",
"USDT": "<emoji document_id=5289904548951911168>💰</emoji>",
"XRP": "<emoji document_id=5373312921214401986>💰</emoji>",
"USDC": "<emoji document_id=5372958453268497353>💰</emoji>",
"ADA": "<emoji document_id=5373076801092338046>💰</emoji>",
"DOGE": "<emoji document_id=5375192042420842380>💰</emoji>",
"TRX": "<emoji document_id=5375187081733616165>💰</emoji>",
"AVAX": "<emoji document_id=5375311275007947936>💰</emoji>",
"LTC": "<emoji document_id=5373035462032113888>💰</emoji>",
"BCH": "<emoji document_id=5375596920397903962>💰</emoji>",
"ATOM": "<emoji document_id=5375468745688889977>💰</emoji>",
"XLM": "<emoji document_id=5372823290647690288>💰</emoji>",
"SHIB": "<emoji document_id=5375231036428924778>💰</emoji>",
"UNI": "<emoji document_id=5372953110329180525>💰</emoji>",
"XMR": "<emoji document_id=5375507073977038661>💰</emoji>",
"LINK": "<emoji document_id=5375149651093633217>💰</emoji>",
"ETC": "<emoji document_id=5375543306321146693>💰</emoji>",
"SUI": "<emoji document_id=5391002164929772708>💰</emoji>",
"NEAR": "<emoji document_id=5391181990915487346>💰</emoji>",
"VET": "<emoji document_id=5391091302681033446>💰</emoji>",
"FIL": "<emoji document_id=5373117173784919811>💰</emoji>",
"XTZ": "<emoji document_id=5390985478981829698>💰</emoji>",
"ALGO": "<emoji document_id=5391337713544738420>💰</emoji>",
"THETA": "<emoji document_id=5391256014676833736>💰</emoji>",
"FTM": "<emoji document_id=5393179395521263785>💰</emoji>",
"XDAI": "<emoji document_id=5391325992578988886>💰</emoji>",
"RUNE": "<emoji document_id=5391347570494684983>💰</emoji>",
"DOT": "<emoji document_id=5375224568208177973>💰</emoji>",
"FTM": "<emoji document_id=5393179395521263785>💰</emoji>",
"XDAI": "<emoji document_id=5391325992578988886>💰</emoji>",
"RUNE": "<emoji document_id=5391347570494684983>💰</emoji>",
"DOT": "<emoji document_id=5375224568208177973>💰</emoji>",
}
_CRYPTO_LIST = {
"BTC": "Bitcoin",
"ETH": "Ethereum",
"XMR": "Monero",
"LTC": "Litecoin",
"XRP": "XRP",
"ADA": "Cardano",
"DOGE": "Dogecoin",
"SOL": "Solana",
"DOT": "Polkadot",
"USDT": "Tether",
"TON": "Toncoin",
"USDC": "USD Coin",
"TRX": "TRON",
"AVAX": "Avalanche",
"BCH": "Bitcoin Cash",
"ATOM": "Cosmos",
"XLM": "Stellar",
"SHIB": "Shiba Inu",
"UNI": "Uniswap",
"LINK": "Chainlink",
"ETC": "Ethereum Classic",
"SUI": "Sui",
"NEAR": "NEAR Protocol",
"VET": "VeChain",
"FIL": "Filecoin",
"XTZ": "Tezos",
"ALGO": "Algorand",
"THETA": "Theta Network",
"FTM": "Fantom",
"XDAI": "xDai",
_CRYPTO_NAMES = {
"BTC": "Bitcoin", "ETH": "Ethereum", "XMR": "Monero",
"LTC": "Litecoin", "XRP": "XRP", "ADA": "Cardano",
"DOGE": "Dogecoin", "SOL": "Solana", "DOT": "Polkadot",
"USDT": "Tether", "TON": "Toncoin", "USDC": "USD Coin",
"TRX": "TRON", "AVAX": "Avalanche", "BCH": "Bitcoin Cash",
"ATOM": "Cosmos", "XLM": "Stellar", "SHIB": "Shiba Inu",
"UNI": "Uniswap", "LINK": "Chainlink", "ETC": "Ethereum Classic",
"SUI": "Sui", "NEAR": "NEAR Protocol", "VET": "VeChain",
"FIL": "Filecoin", "XTZ": "Tezos", "ALGO": "Algorand",
"THETA": "Theta Network", "FTM": "Fantom", "XDAI": "xDai",
"RUNE": "THORChain",
}
def _fmt_num(v, d=3):
p = f"{v:,.{d}f}".replace(",", " ").split(".")
i = p[0]
d = p[1].rstrip("0") if len(p) > 1 else ""
return f"{i},{d}" if d else i
_CBR_URL = "https://www.cbr.ru/scripts/XML_daily.asp"
_CRYPTO_URL = "https://api.coinlore.net/api/tickers/?limit=100"
CACHE_TTL = 300 # seconds
def _fmt_num(value: float, decimals: int = 3) -> str:
if decimals == 0:
return f"{int(value):,}".replace(",", " ")
rounded = round(value, decimals)
int_part = int(rounded)
dec_part = str(rounded - int_part)[2:2 + decimals].rstrip("0")
int_str = f"{int_part:,}".replace(",", " ")
return f"{int_str},{dec_part}" if dec_part else int_str
def _parse_cbr_xml(xml_bytes: bytes) -> tuple[str | None, dict]:
"""Parse CBR XML without bs4/lxml — pure stdlib ElementTree."""
import xml.etree.ElementTree as ET
root = ET.fromstring(xml_bytes)
date_str = root.attrib.get("Date", "")
try:
date = datetime.strptime(date_str, "%d.%m.%Y").strftime("%d.%m.%Y")
except ValueError:
date = date_str
rates: dict[str, dict] = {}
for valute in root.findall("Valute"):
code = valute.findtext("CharCode", "").strip()
if not code or code == "XDR":
continue
try:
nominal = float(valute.findtext("Nominal", "1").replace(",", "."))
value = float(valute.findtext("Value", "0").replace(",", "."))
except ValueError:
continue
rates[code] = {
"name": valute.findtext("Name", code).strip(),
"nominal": nominal,
"rub": value / nominal,
}
return date, rates
@loader.tds
class FinanceMod(loader.Module):
strings = {
"name": "FinanceMod",
"valute_description": "<кол-во> <код> - курс валюты\n<кол-во> - список",
"valute_no_args": (
"💵 <b>Курс валюты с сайта </b><a href='https://www.cbr.ru/'>ЦБ(РФ)</a>\n"
"<b>Актуально на</b> <i>{}</i>\n\n<blockquote expandable>{}</blockquote>"
),
"valute_specific": (
"💵 <b>Курс валюты с сайта </b><a href='https://www.cbr.ru/'>ЦБ(РФ)</a>\n"
"<b>Актуально на</b> <i>{}</i>\n\n{}"
),
"valute_not_found": "🚫 Валюта {} не найдена",
"crypto_description": "<кол-во> <код> - курс крипты\n<кол-во> - список",
"crypto_no_args": "💎 <b>Курсы криптовалют</b>\n\n<blockquote expandable>{}</blockquote>",
"crypto_specific": "💎 <b>Курс криптовалюты</b>\n\n{}",
"crypto_not_found": "🚫 Криптовалюта {} не найдена",
"error": "🚫 Ошибка получения данных",
}
"""Курсы валют (ЦБ РФ) и криптовалют (CoinLore)"""
strings = {"name": "FinanceMod"}
def __init__(self):
self.config = loader.ModuleConfig(
@@ -152,149 +124,194 @@ class FinanceMod(loader.Module):
"crypto_currency",
"USD",
lambda: "Валюта для отображения крипты (USD, RUB, EUR)",
validator=loader.validators.Choice(["USD", "RUB", "EUR"])
validator=loader.validators.Choice(["USD", "RUB", "EUR"]),
)
)
# Simple in-process cache
self._cbr_cache: tuple[float, str, dict] | None = None # (ts, date, rates)
self._crypto_cache: tuple[float, list] | None = None # (ts, data)
async def _get_curr_data(self):
# ──────────────────────────── HTTP helpers ────────────────────────────
async def _fetch(self, url: str, *, as_json: bool = False):
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
resp.raise_for_status()
return await resp.json() if as_json else await resp.read()
# ──────────────────────────── CBR data ────────────────────────────────
async def _cbr_data(self) -> tuple[str | None, dict]:
now = time.monotonic()
if self._cbr_cache and now - self._cbr_cache[0] < CACHE_TTL:
return self._cbr_cache[1], self._cbr_cache[2]
try:
r = requests.get("https://www.cbr.ru/scripts/XML_daily.asp")
s = bs4.BeautifulSoup(r.content, 'xml')
d = datetime.strptime(s.ValCurs['Date'], "%d.%m.%Y").strftime("%d.%m.%Y")
return d, s.find_all('Valute')
except:
return None, None
raw = await self._fetch(_CBR_URL)
date, rates = _parse_cbr_xml(raw)
self._cbr_cache = (now, date, rates)
return date, rates
except Exception:
if self._cbr_cache:
return self._cbr_cache[1], self._cbr_cache[2]
return None, {}
async def _get_rates(self):
# ──────────────────────────── Crypto data ─────────────────────────────
async def _crypto_data(self) -> list:
now = time.monotonic()
if self._crypto_cache and now - self._crypto_cache[0] < CACHE_TTL:
return self._crypto_cache[1]
try:
r = requests.get("https://www.cbr.ru/scripts/XML_daily.asp")
s = bs4.BeautifulSoup(r.content, 'xml')
rt = {'USD': None, 'EUR': None}
for v in s.find_all('Valute'):
if v.CharCode.text in ['USD', 'EUR']:
n = float(v.Nominal.text.replace(',', '.'))
vl = float(v.Value.text.replace(',', '.'))
rt[v.CharCode.text] = vl / n
if rt['USD'] and rt['EUR']:
rt['EUR_USD'] = rt['USD'] / rt['EUR']
else:
rt['EUR_USD'] = None
return rt
except:
return None
js = await self._fetch(_CRYPTO_URL, as_json=True)
data = js.get("data", [])
self._crypto_cache = (now, data)
return data
except Exception:
return self._crypto_cache[1] if self._crypto_cache else []
async def _fmt_curr(self, v, a=1):
if v.CharCode.text == "XDR":
return None
c = v.CharCode.text
n = v.Name.text
v = float(v.Value.text.replace(',', '.')) / float(v.Nominal.text.replace(',', '.'))
t = v * a
ts = _fmt_num(t, 3)
return f"{_FLAGS.get(c, '🏳')} [{a}] {n} ({c}) - {ts} руб."
# ──────────────────────────── Formatters ──────────────────────────────
async def _get_crypto(self):
def _fmt_valute(self, code: str, info: dict, amount: float = 1.0) -> str:
total = info["rub"] * amount
flag = _FLAGS.get(code, "🏳")
return f"{flag} [{_fmt_num(amount, 0)}] {info['name']} ({code}) — {_fmt_num(total, 3)}"
def _fmt_crypto(self, coin: dict, rates: dict, amount: float = 1.0) -> str:
symbol = coin["symbol"].upper()
try:
return requests.get("https://api.coinlore.net/api/tickers/").json().get('data', [])
except:
return None
price_usd = float(coin["price_usd"])
except (KeyError, ValueError, TypeError):
return ""
async def _fmt_crypto(self, c, a=1):
r = await self._get_rates()
if not r:
return "🚫 Ошибка получения курсов валют"
cr = self.config["crypto_currency"]
try:
p = float(c['price_usd'])
except:
return "🚫 Ошибка данных криптовалюты"
if cr == "RUB":
if not r['USD']:
return "🚫 Курс USD не найден"
p *= r['USD']
elif cr == "EUR":
if not r['EUR_USD']:
return "🚫 Курс EUR/USD не рассчитан"
p *= r['EUR_USD']
t = p * a
ts = _fmt_num(t)
s = c['symbol'].upper()
e = _CRYPTO_EMOJIS.get(s, "💠")
n = _CRYPTO_LIST.get(s, c['name'])
cs = {"USD": "$", "RUB": "", "EUR": ""}.get(cr, "$")
return f"{e} [{a}] {n} ({s}) - {ts}{cs}"
currency = self.config["crypto_currency"]
if currency == "RUB":
usd_rate = rates.get("USD", {}).get("rub")
if not usd_rate:
return ""
price = price_usd * usd_rate
sign = ""
elif currency == "EUR":
usd_rate = rates.get("USD", {}).get("rub")
eur_rate = rates.get("EUR", {}).get("rub")
if not usd_rate or not eur_rate:
return ""
price = price_usd * (usd_rate / eur_rate)
sign = ""
else:
price = price_usd
sign = "$"
@loader.command()
async def valutecmd(self, m):
"""[count] [usd, eur, ...]"""
a = utils.get_args(m)
d, v = await self._get_curr_data()
if not d or not v:
return await utils.answer(m, self.strings["error"])
if len(a) == 0:
l = []
for x in v:
if (n := await self._fmt_curr(x)):
l.append(n)
await utils.answer(m, self.strings["valute_no_args"].format(d, "\n".join(l)))
elif len(a) == 1:
total = price * amount
emoji = _CRYPTO_EMOJIS.get(symbol, "💠")
name = _CRYPTO_NAMES.get(symbol, coin.get("name", symbol))
return f"{emoji} [{_fmt_num(amount, 0)}] {name} ({symbol}) — {_fmt_num(total, 3)}{sign}"
# ──────────────────────────── Commands ────────────────────────────────
@loader.command(ru_doc="[кол-во] [код] — курс валюты по ЦБ РФ")
async def valutecmd(self, message):
"""[amount] [code] — exchange rates from CBR"""
args = utils.get_args(message)
date, rates = await self._cbr_data()
if not rates:
return await utils.answer(message, "🚫 Не удалось получить данные ЦБ РФ")
header = (
f"💵 <b>Курс валюты</b> · <a href='https://www.cbr.ru/'>ЦБ РФ</a>\n"
f"<b>Актуально на</b> <i>{date}</i>\n\n"
)
# .valute — список всех, кол-во = 1
if not args:
lines = [self._fmt_valute(c, i) for c, i in rates.items()]
return await utils.answer(
message,
header + f"<blockquote expandable>{chr(10).join(lines)}</blockquote>",
)
# Первый аргумент: число или код валюты?
amount = 1.0
code = None
arg0 = args[0].upper()
if len(args) >= 2:
# .valute 100 USD
try:
am = float(a[0])
l = []
for x in v:
if (n := await self._fmt_curr(x, am)):
l.append(n)
await utils.answer(m, self.strings["valute_no_args"].format(d, "\n".join(l)))
except:
await utils.answer(m, "🚫 Некорректное число")
elif len(a) == 2:
amount = float(args[0].replace(",", "."))
except ValueError:
return await utils.answer(message, "🚫 Некорректное число")
code = args[1].upper()
else:
# .valute USD или .valute 100
try:
am = float(a[0])
c = a[1].upper()
for x in v:
if x.CharCode.text == c:
if (n := await self._fmt_curr(x, am)):
return await utils.answer(m, self.strings["valute_specific"].format(d, n))
await utils.answer(m, self.strings["valute_not_found"].format(c))
except:
await utils.answer(m, "🚫 Некорректное число")
amount = float(arg0.replace(",", "."))
# число без кода — список с умножением
except ValueError:
code = arg0
@loader.command()
async def cryptocmd(self, m):
"""[count] [ton, btc, ...]"""
a = utils.get_args(m)
c = await self._get_crypto()
if not c:
return await utils.answer(m, self.strings["error"])
try:
if len(a) == 0:
f = [x for x in c if x['symbol'].upper() in _CRYPTO_LIST]
l = []
for x in f:
if (n := await self._fmt_crypto(x)):
l.append(n)
await utils.answer(m, self.strings["crypto_no_args"].format("\n".join(l)))
elif len(a) == 1:
am = float(a[0])
f = [x for x in c if x['symbol'].upper() in _CRYPTO_LIST]
l = []
for x in f:
if (n := await self._fmt_crypto(x, am)):
l.append(n)
await utils.answer(m, self.strings["crypto_no_args"].format("\n".join(l)))
elif len(a) == 2:
am = float(a[0])
t = a[1].upper()
f = False
for x in c:
if x['symbol'].upper() == t:
if (n := await self._fmt_crypto(x, am)):
f = True
await utils.answer(m, self.strings["crypto_specific"].format(n))
break
if not f:
await utils.answer(m, self.strings["crypto_not_found"].format(t))
except ValueError:
await utils.answer(m, "🚫 Некорректное число")
except Exception as e:
await utils.answer(m, f"🚫 Ошибка: {str(e)}")
if code:
if code not in rates:
return await utils.answer(message, f"🚫 Валюта <b>{code}</b> не найдена")
line = self._fmt_valute(code, rates[code], amount)
return await utils.answer(message, header + line)
# список с кол-вом
lines = [self._fmt_valute(c, i, amount) for c, i in rates.items()]
await utils.answer(
message,
header + f"<blockquote expandable>{chr(10).join(lines)}</blockquote>",
)
@loader.command(ru_doc="[кол-во] [код] — курс крипты")
async def cryptocmd(self, message):
"""[amount] [symbol] — crypto rates from CoinLore"""
args = utils.get_args(message)
coins = await self._crypto_data()
_, rates = await self._cbr_data()
if not coins:
return await utils.answer(message, "🚫 Не удалось получить данные крипты")
header = f"💎 <b>Курсы криптовалют</b> · <i>{self.config['crypto_currency']}</i>\n\n"
amount = 1.0
symbol = None
if not args:
pass # список, amount=1
elif len(args) == 1:
try:
amount = float(args[0].replace(",", "."))
except ValueError:
symbol = args[0].upper()
else:
try:
amount = float(args[0].replace(",", "."))
except ValueError:
return await utils.answer(message, "🚫 Некорректное число")
symbol = args[1].upper()
if symbol:
coin = next((c for c in coins if c["symbol"].upper() == symbol), None)
if not coin:
return await utils.answer(message, f"🚫 Крипта <b>{symbol}</b> не найдена")
line = self._fmt_crypto(coin, rates, amount)
if not line:
return await utils.answer(message, "🚫 Ошибка форматирования")
return await utils.answer(message, header + line)
# список только известных монет
known = {c["symbol"].upper(): c for c in coins if c["symbol"].upper() in _CRYPTO_NAMES}
# сортируем по порядку _CRYPTO_NAMES
lines = []
for sym in _CRYPTO_NAMES:
if sym in known:
line = self._fmt_crypto(known[sym], rates, amount)
if line:
lines.append(line)
await utils.answer(
message,
header + f"<blockquote expandable>{chr(10).join(lines)}</blockquote>",
)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,5 @@
ChatCopy.py
Gemini.py
GiftFinder.py
MaillingChatGT99.py
NekoEditorMod.py

View File

@@ -4,4 +4,6 @@
.ruff_cache
ruff.log
ruff.log.2
ruff.toml
ruff.toml
# Heroku files
heroku/

438
archquise/q.mods/QNotes.py Normal file
View File

@@ -0,0 +1,438 @@
__version__ = (1, 1, 6)
# █▀▀▄ █▀▄▀█ █▀█ █▀▄ █▀
# ▀▀▀█ ▄ █ ▀ █ █▄█ █▄▀ ▄█
# #### Copyright (c) 2026 Archquise #####
# 💬 Contact: https://t.me/archquise
# 🔒 Licensed under the GNU AGPLv3.
# 📄 LICENSE: https://raw.githubusercontent.com/archquise/Q.Mods/main/LICENSE
# ---------------------------------------------------------------------------------
# Name: QNotes
# Description: A notes module that just works
# Author: @quise_m
# ---------------------------------------------------------------------------------
# meta developer: @quise_m
# meta banner: https://raw.githubusercontent.com/archquise/qmods_meta/main/qnotes.png
# ---------------------------------------------------------------------------------
import asyncio
import logging
import re
from datetime import date
from typing import cast
from herokutl.tl.functions.users import GetUsersRequest
from herokutl.tl.types import InputUserSelf
from .. import loader, utils
logger = logging.getLogger(__name__)
@loader.tds
class QNotes(loader.Module):
"""A notes module that just works\nUsage: #notetag in any chat"""
strings = {
"name": "QNotes",
"topic_desc": "Stores your notes content\nUsage: #notetag in any chat",
"wrongargs": "<emoji document_id=5980953710157632545>❌</emoji> <b>Wrong arguments. Check command usage.</b>",
"not_exist": "There is no such note!",
"no_reply": "No reply! Reply to the message, which text will become a note.",
"already_exists": "Seems like note with the same tag already exists. Overwrite?",
"show_note_inline": "<blockquote>#{}</blockquote>\n\n<blockquote>{}</blockquote>",
"notelist": "Note list:",
"msg_not_found_inline": "Message with this note wasn't found. Probably, it was been removed. Note has been removed from the database.",
"remnote_inline": "🗑 Remove",
"close_inline": "❌ Close",
"yes": "✔️ Yes",
"no": "❌ No",
"true": "yes",
"false": "no",
"saved": "Note saved!",
"removed": "Note removed!",
"nonotes": "You don't have any notes!",
"privacy_switch": "Determines whose data will be used by the my_* placeholders\n\nTrue - the account that is issuing the note\nFalse - the account on which the userbot is running",
"note_prefix": "The prefix used to call up notes",
"placeholders": """
<b>Available placeholders</b>:
about the account on which userbot is installed:
{my_id} - ID
@{my_username} - username, tag
{my_phone} - phone number
{my_premium} - premium status (yes/no)
about reply author:
{reply_id} - ID
{reply_name} - name
{reply_surname} - surname
{reply_fullname} - full name (name + surname (if specified))
@{reply_username} - username, tag
{reply_phone} - phone number (if not hidden)
{reply_premium} - premium status (yes/no)
general:
{today} - current date
""",
}
strings_ru = {
"_cls_doc": "Модуль для заметок, который просто работает\nИспользование: #тегзаметки в любом чате",
"topic_desc": "Хранит содержимое ваших заметок\nИспользование: #тегзаметки в любом чате",
"wrongargs": "<emoji document_id=5980953710157632545>❌</emoji> <b>Неверные аргументы. Проверьте использование команды.</b>",
"no_reply": "Нет реплая! Ответьте на сообщение, текст которого станет заметкой.",
"not_exist": "Такой заметки не найдено!",
"already_exists": "Кажется, заметка с таким тегом уже существует. Перезаписать?",
"show_note_inline": "<blockquote>#{}</blockquote>\n\n<blockquote>{}</blockquote>",
"notelist": "Список заметок:",
"msg_not_found_inline": "Сообщение с этой заметкой не было найдено. Вероятно, оно было удалено. Заметка очищена из базы данных.",
"remnote_inline": "🗑 Удалить",
"close_inline": "❌ Закрыть",
"yes": "✔️ Да",
"no": "❌ Нет",
"saved": "Заметка сохранена!",
"removed": "Заметка удалена!",
"true": "да",
"false": "нет",
"nonotes": "Нет заметок!",
"privacy_switch": "Влияет на то, чьи данные будут использовать my_* плейсхолдеры\n\nTrue - аккаунта, который вызывает заметку\nFalse - аккаунта на котором стоит юзербот",
"note_prefix": "Префикс, с которым вызываются заметки",
"placeholders": """
<b>Доступные плейсхолдеры</b>:
об аккаунте, на котором стоит юзербот:
{my_id} - айди
@{my_username} - юзернейм, тег
{my_phone} - номер телефона
{my_premium} - статус премиум (да/нет)
об авторе реплая:
{reply_id} - айди
{reply_name} - имя
{reply_surname} - фамилия
{reply_fullname} - полное имя (имя + фамилия (если указана))
@{reply_username} - юзернейм, тег
{reply_phone} - номер телефона (если не скрыт)
{reply_premium} - статус премиум (да/нет)
общее:
{today} - текущая дата
""",
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"privacy_switch",
True,
lambda: self.strings["privacy_switch"],
validator=loader.validators.Boolean(), # type: ignore
),
loader.ConfigValue(
"note_prefix",
"#",
lambda: self.strings["note_prefix"],
validator=loader.validators.RegExp(r"^\S+$"), # type: ignore
),
)
async def client_ready(self, client, db): # type: ignore
self._content_channel_id = await utils.wait_for_content_channel(self._db)
self._notes_topic = await utils.asset_forum_topic(
client=self._client,
db=self._db,
peer=self._content_channel_id, # type: ignore
title="QNotes | Storage",
description=self.strings["topic_desc"],
icon_emoji_id=5272001961326049733,
)
self.my_phone = (await self._client(GetUsersRequest(id=[InputUserSelf()])))[
0
].phone
self.placeholders = {
"my_phone": self.my_phone,
"my_username": self._client.heroku_me.username,
"my_id": self.tg_id,
"my_premium": self.strings["true"]
if self._client.heroku_me.premium
else self.strings["false"],
}
self._notemap = cast(dict, self.pointer("notemap", default={}))
async def _ask_overwrite(self, message):
loop = asyncio.get_running_loop()
future = loop.create_future()
form = await self.inline.form(
self.strings["already_exists"],
message=message,
reply_markup=[
[
{
"text": self.strings["yes"],
"callback": (
lambda call, flag: (
future.set_result(flag) if not future.done() else None
)
),
"args": (True,),
},
{
"text": self.strings["no"],
"callback": (
lambda call, flag: (
future.set_result(flag) if not future.done() else None
)
),
"args": (False,),
},
]
],
)
try:
async with asyncio.timeout(15):
overwrite_answer = await future
except TimeoutError:
await form.delete() # type: ignore
return False, message
if not overwrite_answer:
await form.delete() # type: ignore
return False, form
return True, form
async def _show_note_inline(self, call, note, page=0):
async def _remnote(call, notetag, note_msg):
await note_msg.delete()
self._notemap.pop(notetag, None)
await call.edit(self.strings["removed"])
note_msg = await self._client.get_messages(
self._content_channel_id, ids=note[1]
)
if not note_msg:
self._notemap.pop(note[0], None)
await call.edit(
self.strings["msg_not_found_inline"],
reply_markup=[
{"text": "⬅️ Назад", "callback": self._list_page, "args": (page,)},
{"text": self.strings["close_inline"], "action": "close"},
],
)
return
await call.edit(
self.strings["show_note_inline"].format(note[0], note_msg.text), # type: ignore
reply_markup=[
[
{"text": "⬅️ Назад", "callback": self._list_page, "args": (page,)},
{
"text": self.strings["remnote_inline"],
"callback": _remnote,
"args": (note[0], note_msg),
},
],
[{"text": self.strings["close_inline"], "action": "close"}],
],
)
def _build_list_markup(self, page: int):
items = list(self._notemap.items())
total = -(-len(items) // 3)
page = max(0, min(page, total - 1))
rows = [
[
{
"text": notetag,
"callback": self._show_note_inline,
"args": ([notetag, msg_id], page),
}
]
for notetag, msg_id in items[page * 3 : (page + 1) * 3]
]
return (
rows
+ self.inline.build_pagination(
callback=self._list_page, # type: ignore
total_pages=total,
current_page=page + 1,
)
+ [[{"text": self.strings["close_inline"], "action": "close"}]]
)
async def _list_page(self, call, page):
await call.edit(
text=self.strings["notelist"], reply_markup=self._build_list_markup(page)
)
@loader.command(
ru_doc="Сохраняет заметку под тегом | Пример: .qnsave заметка",
en_doc="Saves note by tag | Example: .qnsave note",
)
async def qnsave(self, message) -> None:
args = utils.get_args(message)
if not args:
await utils.answer(message, self.strings["wrongargs"])
return
current_message = message
if not (reply := await message.get_reply_message()):
await utils.answer(message, self.strings["no_reply"])
return
try:
if args[0].strip() in self._notemap:
need_overwrite, msg = await self._ask_overwrite(message)
if not need_overwrite:
return
old_note_message = await self._client.get_messages(
self._content_channel_id,
ids=self._notemap[args[0].strip()],
)
old_note_message and await old_note_message.delete() # type: ignore
current_message = msg
note_message = await self._client.send_message(
self._content_channel_id,
reply.text,
reply_to=self._notes_topic.id,
file=reply.media,
)
self._notemap[args[0].strip()] = note_message.id
except Exception as e:
await utils.answer(current_message, f"Произошла ошибка: {e}")
logger.exception("Произошла ошибка при сохранении заметки!")
return
await utils.answer(current_message, self.strings["saved"])
@loader.command(
ru_doc="Удаляет заметку по тегу | Пример: .qnrem заметка",
en_doc="Removes note by tag | Example: .qnrem note",
)
async def qnrem(self, message) -> None:
args = utils.get_args(message)
if not args:
await utils.answer(message, self.strings["wrongargs"])
return
if args[0] not in self._notemap or not (
note_message := await self._client.get_messages(
self._content_channel_id,
ids=self._notemap[args[0]],
)
):
await utils.answer(message, self.strings["not_exist"])
return
await note_message.delete() # type: ignore
self._notemap.pop(args[0], None)
await utils.answer(message, self.strings["removed"])
@loader.command(
ru_doc="Выводит список всех заметок и позволяет управлять ими",
en_doc="Shows note list and allows managing them",
)
async def qnlist(self, message) -> None:
if self._notemap:
await self.inline.form(
text=self.strings["notelist"],
reply_markup=self._build_list_markup(0),
message=message,
)
return
await utils.answer(message, self.strings["nonotes"])
@loader.command(
ru_doc="Выводит список доступных плейсхолдеров",
en_doc="Displays a list of available placeholders",
)
async def qnp(self, message) -> None:
await utils.answer(message, self.strings["placeholders"])
@loader.watcher()
async def _note_watcher(self, message):
if not message.text.startswith(prefix := self.config["note_prefix"]) or not (
await self._client.dispatcher.security.check(message, self._note_watcher)
):
return
notetag = message.text.split(prefix, maxsplit=1)[1]
if notetag in self._notemap:
if not (
note_message := await self._client.get_messages(
self._content_channel_id,
ids=self._notemap[notetag],
)
):
self._notemap.pop(notetag, None)
return
notetext = note_message.text or "" # type: ignore
if re.search(r"\{\w+\}", notetext):
if (
not self.config["privacy_switch"]
or message.sender_id == self._client.heroku_me.id
):
placeholders = {**self.placeholders}
else:
message_author_entity = await self._client.get_entity(
message.sender_id
)
placeholders = {
"my_phone": (
await self._client(GetUsersRequest(id=[message.sender_id]))
)[0].phone,
"my_username": message_author_entity.username,
"my_id": message.sender_id,
"my_premium": self.strings["true"]
if message_author_entity.premium
else self.strings["false"],
}
if reply_msg := await message.get_reply_message():
reply_user = await self._client.get_entity(reply_msg.sender_id)
placeholders = {
**placeholders,
"reply_id": reply_user.id,
"reply_fullname": " ".join(
filter(None, [reply_user.first_name, reply_user.last_name])
),
"reply_name": reply_user.first_name,
"reply_surname": reply_user.last_name,
"reply_phone": (
await self._client(GetUsersRequest(id=[reply_user.id]))
)[0].phone,
"reply_username": reply_user.username,
"reply_premium": self.strings["true"]
if reply_user.premium
else self.strings["false"],
}
placeholders = placeholders | {"today": date.today()}
def replacer(match):
key = match.group(1)
if key not in placeholders or not placeholders[key]:
return match.group(0)
return utils.escape_html(str(placeholders[key]))
notetext = re.sub(r"\{(\w+)\}", replacer, notetext)
if media := note_message.media: # type: ignore
await utils.answer_file(message, media, notetext) # type: ignore
else:
await utils.answer(message, notetext)
return

View File

@@ -59,7 +59,7 @@ class FaceMod(loader.Module):
en_doc="Random kaomoji",
)
async def rfacecmd(self, message) -> None: # noqa: D102, ANN001
await utils.answer(message, self.strings("loading"))
await utils.answer(message, self.strings["loading"])
url = "https://files.archquise.ru/kaomoji.txt"
@@ -72,7 +72,7 @@ class FaceMod(loader.Module):
kaomoji = random.choice(kaomoji_list) # noqa: S311
await utils.answer(
message,
self.strings("random_face").format(kaomoji),
self.strings["random_face"].format(kaomoji),
)
else:
await utils.answer(message, self.strings("error"))
await utils.answer(message, self.strings["error"])

View File

@@ -113,24 +113,24 @@ class ShortenerMod(loader.Module):
async def shortencmd(self, message): # noqa: ANN001, ANN201
"""Shorten URL using bit.ly API."""
if self.config["token"] is None:
await utils.answer(message, self.strings("no_api"))
await utils.answer(message, self.strings["no_api"])
return
args = utils.get_args_raw(message)
if not args:
await utils.answer(message, self.strings("no_args"))
await utils.answer(message, self.strings["no_args"])
return
if not self._validate_url(args):
await utils.answer(message, self.strings("invalid_url"))
await utils.answer(message, self.strings["invalid_url"])
return
try:
short_url = await self.shorten_url(url=args, token=self.config["token"])
await utils.answer(message, self.strings("shortencmd").format(c=short_url))
await utils.answer(message, self.strings["shortencmd"].format(c=short_url))
except Exception as e:
logger.exception("Error shortening URL!")
await utils.answer(message, self.strings("api_error").format(error=str(e)))
await utils.answer(message, self.strings["api_error"].format(error=str(e)))
@loader.command(
ru_doc="Посмотреть статистику ссылки через bit.ly (ссылка без https:// | Доступно только на платных аккаунтах)",
@@ -139,22 +139,22 @@ class ShortenerMod(loader.Module):
async def statclcmd(self, message): # noqa: ANN001, ANN201
"""Get click statistics for shortened URL."""
if self.config["token"] is None:
await utils.answer(message, self.strings("no_api"))
await utils.answer(message, self.strings["no_api"])
return
args = utils.get_args_raw(message)
if not args:
await utils.answer(message, self.strings("no_args"))
await utils.answer(message, self.strings["no_args"])
return
try:
if not args.startswith("bit.ly/"):
await utils.answer(message, self.strings("invalid_url"))
await utils.answer(message, self.strings["invalid_url"])
return
clicks = await self.get_bitlink_stats(
bitlink=args, token=self.config["token"]
)
await utils.answer(message, self.strings("statclcmd").format(c=clicks))
await utils.answer(message, self.strings["statclcmd"].format(c=clicks))
except Exception as e:
logger.exception("Error getting statistics!")
await utils.answer(message, self.strings("api_error").format(error=str(e)))
await utils.answer(message, self.strings["api_error"].format(error=str(e)))

View File

@@ -26,7 +26,6 @@ from .. import loader, utils
logger = logging.getLogger(__name__)
class Banners:
def __init__(
self,
@@ -324,6 +323,16 @@ class YaMusicMod(loader.Module):
"name": "YaMusic"
}
duration_placeholder = {
"start_duration": "<tg-emoji emoji-id=5262663495538742892>☀️</tg-emoji><tg-emoji emoji-id=5260381609479153468>☀️</tg-emoji>",
"start_full_duration": "<tg-emoji emoji-id=5262663495538742892>☀️</tg-emoji><tg-emoji emoji-id=5260609582048254485>☀️</tg-emoji>",
"closed_duration": "<tg-emoji emoji-id=5260467667738859177>☀️</tg-emoji>",
"empty_mid": "<tg-emoji emoji-id=5260415715814448198>☀️</tg-emoji>",
"empty_closed_duration_duration": "<tg-emoji emoji-id=5260239235608255208>☀️</tg-emoji>",
"end_duration_full": "<tg-emoji emoji-id=5260467667738859177>☀️</tg-emoji>",
"empty_closed_duration": "<tg-emoji emoji-id=5260239235608255208>☀️</tg-emoji>",
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
@@ -553,67 +562,19 @@ class YaMusicMod(loader.Module):
return "0%"
percent = (progress / duration) * 100
fill_logic = int(percent // 16.66)
s_less_10 = (
"<emoji document_id=5454137780454067986></emoji>"
"<emoji document_id=6158923355173949539>⭐</emoji>"
"<emoji document_id=6159012102083188132>⭐</emoji>"
"<emoji document_id=6159012102083188132>⭐</emoji>"
"<emoji document_id=6158753257289158944>⭐</emoji>"
"<emoji document_id=6156700344526049665>⭐</emoji>"
)
s_10_to_20 = (
"<emoji document_id=5454137780454067986></emoji>"
"<emoji document_id=6159095673556840262>⭐</emoji>"
"<emoji document_id=6159012102083188132>⭐</emoji>"
"<emoji document_id=6156933677214341691>⭐</emoji>"
"<emoji document_id=6158753257289158944>⭐</emoji>"
"<emoji document_id=6156700344526049665>⭐</emoji>"
)
s_30_to_40 = (
"<emoji document_id=5454137780454067986></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=6158923355173949539>⭐</emoji>"
"<emoji document_id=6159012102083188132>⭐</emoji>"
"<emoji document_id=6156700344526049665>⭐</emoji>"
)
s_over_50 = (
"<emoji document_id=5454137780454067986></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=6156933677214341691>⭐</emoji>"
"<emoji document_id=6156700344526049665>⭐</emoji>"
)
s_over_80 = (
"<emoji document_id=5454137780454067986></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=5454397458471750662></emoji>"
"<emoji document_id=6156700344526049665>⭐</emoji>"
)
if percent < 10:
return s_less_10
elif percent < 20:
return s_10_to_20
elif percent < 30:
return s_10_to_20
elif percent < 40:
return s_30_to_40
elif percent < 50:
return s_30_to_40
elif percent < 80:
return s_over_50
bar = self.duration_placeholder["start_full_duration"] if fill_logic >= 1 else self.duration_placeholder["start_duration"]
for i in range(2, 6):
if fill_logic >= i:
bar += self.duration_placeholder["closed_duration"]
else:
bar += self.duration_placeholder["empty_mid"]
if fill_logic >= 6:
bar += self.duration_placeholder["end_duration_full"]
else:
return s_over_80
bar += self.duration_placeholder["empty_closed_duration"]
return bar
except Exception as e:
return f"Error: {e}"

View File

@@ -22,4 +22,4 @@ chatmodule
stats
tagwatcher
hardspam
YaMusic
YaMusic

View File

@@ -21,6 +21,7 @@ import random
import string
import asyncio
import logging
import re
from PIL import Image, UnidentifiedImageError
from telethon.tl.functions.stickers import CreateStickerSetRequest
@@ -38,11 +39,12 @@ except AttributeError:
logger = logging.getLogger(__name__)
STATIC_STICKER_LIMIT = 120
EMOJI_LIMIT = 200
async def process_to_webp(input_path: str, output_path: str, size: int = 512) -> bool:
try:
is_video = input_path.lower().endswith(('.mp4', '.webm', '.mov')) or b'ftyp' in open(input_path, 'rb').read(32)
is_video = input_path.lower().endswith((".mp4", ".webm", ".mov")) or b"ftyp" in open(input_path, "rb").read(32)
if is_video:
cap = cv2.VideoCapture(input_path)
success, frame = cap.read()
@@ -87,7 +89,7 @@ async def process_to_webp(input_path: str, output_path: str, size: int = 512) ->
async def process_to_png(input_path: str, output_path: str, size: int = 100) -> bool:
try:
is_video = input_path.lower().endswith(('.mp4', '.webm', '.mov')) or b'ftyp' in open(input_path, 'rb').read(32)
is_video = input_path.lower().endswith((".mp4", ".webm", ".mov")) or b"ftyp" in open(input_path, "rb").read(32)
if is_video:
cap = cv2.VideoCapture(input_path)
success, frame = cap.read()
@@ -137,8 +139,10 @@ class CreatePacks(loader.Module):
"processing": "<b>[CreatePacks]</b> Collecting avatars of participants...",
"no_avatars": "<b>[CreatePacks]</b> No members with avatars",
"no_valid": "<b>[CreatePacks]</b> Could not process any avatars",
"done_pack": "<b>[CreatePacks]</b> Sticker pack is ready:\n<b>[CreatePacks]</b> Open: <a href='https://t.me/addstickers/{}'>here</a>",
"done_emoji_pack": "<b>[CreatePacks]</b> Emoji pack is ready:\n<b>[CreatePacks]</b> Open: <a href='https://t.me/addstickers/{}'>here</a>",
"done_pack": "<b>[CreatePacks]</b> Sticker pack is ready:\n<b>[CreatePacks]</b> Open: <a href=\'https://t.me/addstickers/{}\\'>here</a>",
"done_packs": "<b>[CreatePacks]</b> Sticker packs are ready:\n{}",
"done_emoji_pack": "<b>[CreatePacks]</b> Emoji pack is ready:\n<b>[CreatePacks]</b> Open: <a href=\'https://t.me/addstickers/{}\\''>here</a>",
"done_emoji_packs": "<b>[CreatePacks]</b> Emoji packs are ready:\n{}",
"already": "<b>[CreatePacks]</b> A sticker pack with this name already exists.",
"emoji_processing": "<b>[CreatePacks]</b> Creating emoji pack from avatars...",
"emoji_no_emoji": "<b>[CreatePacks]</b> No emoji specified — using",
@@ -149,8 +153,10 @@ class CreatePacks(loader.Module):
"processing": "<b>[CreatePacks]</b> Собираю аватарки участников...",
"no_avatars": "<b>[CreatePacks]</b> Нет участников с аватарками",
"no_valid": "<b>[CreatePacks]</b> Не удалось обработать ни одну аватарку",
"done_pack": "<b>[CreatePacks]</b> Стикерпак готов:\n<b>[CreatePacks]</b> Открыть: <a href='https://t.me/addstickers/{}'>здесь</a>",
"done_emoji_pack": "<b>[CreatePacks]</b> Эмодзи-пак готов:\n<b>[CreatePacks]</b> Открыть: <a href='https://t.me/addstickers/{}'>здесь</a>",
"done_pack": "<b>[CreatePacks]</b> Стикерпак готов:\n<b>[CreatePacks]</b> Открыть: <a href=\'https://t.me/addstickers/{}\\''>здесь</a>",
"done_packs": "<b>[CreatePacks]</b> Стикерпаки готовы:\n{}",
"done_emoji_pack": "<b>[CreatePacks]</b> Эмодзи-пак готов:\n<b>[CreatePacks]</b> Открыть: <a href=\'https://t.me/addstickers/{}\\''>здесь</a>",
"done_emoji_packs": "<b>[CreatePacks]</b> Эмодзи-паки готовы:\n{}",
"already": "<b>[CreatePacks]</b> Стикерпак с таким именем уже существует",
"emoji_processing": "<b>[CreatePacks]</b> Создаю эмодзи-пак из аватаров...",
"emoji_no_emoji": "<b>[CreatePacks]</b> Эмодзи не указан — используется",
@@ -169,10 +175,6 @@ class CreatePacks(loader.Module):
if len(users) >= 100:
break
if not users:
shutil.rmtree(tmp_dir, ignore_errors=True)
return [], tmp_dir
processed = []
process_func = process_to_webp if format == "webp" else process_to_png
@@ -224,6 +226,43 @@ class CreatePacks(loader.Module):
return processed, tmp_dir
async def _create_sticker_pack(self, message, stickers_to_add, is_emoji_pack: bool, pack_number: int = 1, emoji: str = "🖼️"):
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
short_name = f"pack_{random_str}_by_fcreate"
chat = await message.get_chat()
chat_title = getattr(chat, 'title', 'Chat')
title_prefix = "Ava" if not is_emoji_pack else "Emoji"
full_title = f"{chat_title} {title_prefix} #{pack_number}"
try:
await self._client(CreateStickerSetRequest(
user_id="me",
title=full_title,
short_name=short_name,
stickers=stickers_to_add,
emojis=is_emoji_pack
))
return short_name, full_title
except PackShortNameOccupiedError:
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=12))
short_name = f"pack_{random_str}_by_fcreate"
try:
await self._client(CreateStickerSetRequest(
user_id="me",
title=full_title,
short_name=short_name,
stickers=stickers_to_add,
emojis=is_emoji_pack
))
return short_name, full_title
except:
return "already_exists", None
except Exception as e:
logger.error(f"Error creating pack: {e}")
return None, None
@loader.command(
ru_doc="- Создать стикерпак из аватаров в группе",
only_groups=True
@@ -236,11 +275,7 @@ class CreatePacks(loader.Module):
if not files:
return await message.edit(self.strings("no_avatars"))
tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
short_name = f"f{abs(message.chat_id)}_{tag}_by_fcreateavatars"
title = f"AvaPack {tag}"
stickers = []
all_stickers = []
for path in files:
try:
await asyncio.sleep(0.3)
@@ -248,42 +283,42 @@ class CreatePacks(loader.Module):
msg = await self._client.send_file("me", file, force_document=True)
doc = msg.document
await self._client.delete_messages("me", msg.id)
stickers.append(InputStickerSetItem(
all_stickers.append(InputStickerSetItem(
document=InputDocument(doc.id, doc.access_hash, doc.file_reference),
emoji="🖼️"
))
except Exception as e:
logger.error(f"Sticker loading error {path}: {e}")
continue
if not stickers:
if not all_stickers:
shutil.rmtree(tmp_dir, ignore_errors=True)
return await message.edit(self.strings("no_valid"))
try:
await self._client(CreateStickerSetRequest(
user_id="me",
title=title,
short_name=short_name,
stickers=stickers
))
await message.edit(self.strings("done_pack").format(short_name))
except PackShortNameOccupiedError:
await message.edit(self.strings("already"))
except Exception as e:
error_details = f"❌ Ошибка создания стикерпака:\n<code>{type(e).__name__}: {e}</code>\n"
error_details += f"Пак: {short_name}\nСтикеров: {len(stickers)}\n"
if files:
error_details += f"Последний файл: {files[-1]}\n"
try:
error_details += f"Размер: {Image.open(files[-1]).size}\n"
error_details += f"Вес: {os.path.getsize(files[-1])} байт"
except:
pass
await message.edit(error_details)
logger.exception("Error creating sticker pack")
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
created_packs_links = []
pack_number = 1
for i in range(0, len(all_stickers), STATIC_STICKER_LIMIT):
current_pack_stickers = all_stickers[i : i + STATIC_STICKER_LIMIT]
short_name, full_title = await self._create_sticker_pack(message, current_pack_stickers, False, pack_number)
if short_name == "already_exists":
await message.edit(self.strings("already"))
shutil.rmtree(tmp_dir, ignore_errors=True)
return
elif short_name:
created_packs_links.append(f"<a href=\'https://t.me/addstickers/{short_name}\\''>{full_title}</a>")
pack_number += 1
if created_packs_links:
if len(created_packs_links) == 1:
# Extract short name for the single link format
sn = created_packs_links[0].split('/')[-1].split("'")[0]
await message.edit(self.strings("done_pack").format(sn))
else:
await message.edit(self.strings("done_packs").format("\n".join(created_packs_links)))
else:
await message.edit(self.strings("no_valid"))
shutil.rmtree(tmp_dir, ignore_errors=True)
@loader.command(
ru_doc="[эмодзи] - Создать эмодзи-пак из всех аватаров",
@@ -303,11 +338,7 @@ class CreatePacks(loader.Module):
if not files:
return await message.edit(self.strings("no_avatars"))
tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
short_name = f"f{abs(message.chat_id)}_{tag}_by_fcreateemojis"
title = f"EmojiPack {tag}"
stickers = []
all_emojis = []
for path in files:
try:
await asyncio.sleep(0.3)
@@ -315,7 +346,7 @@ class CreatePacks(loader.Module):
msg = await self._client.send_file("me", file, force_document=True)
doc = msg.document
await self._client.delete_messages("me", msg.id)
stickers.append(InputStickerSetItem(
all_emojis.append(InputStickerSetItem(
document=InputDocument(doc.id, doc.access_hash, doc.file_reference),
emoji=emoji
))
@@ -323,32 +354,30 @@ class CreatePacks(loader.Module):
logger.error(f"Error loading emoji {path}: {e}")
continue
if not stickers:
if not all_emojis:
shutil.rmtree(tmp_dir, ignore_errors=True)
return await message.edit(self.strings("no_valid"))
try:
await self._client(CreateStickerSetRequest(
user_id="me",
title=title,
short_name=short_name,
stickers=stickers,
emojis=True
))
await message.edit(self.strings("done_emoji_pack").format(short_name))
except PackShortNameOccupiedError:
await message.edit(self.strings("already"))
except Exception as e:
error_details = f"❌ Ошибка создания эмодзи-пака:\n<code>{type(e).__name__}: {e}</code>\n"
error_details += f"Пак: {short_name}\nСмайликов: {len(stickers)}\n"
if files:
error_details += f"Последний файл: {files[-1]}\n"
try:
error_details += f"Размер: {Image.open(files[-1]).size}\n"
error_details += f"Вес: {os.path.getsize(files[-1])} байт"
except:
pass
await message.edit(error_details)
logger.exception("Error creating emoji pack")
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
created_packs_links = []
pack_number = 1
for i in range(0, len(all_emojis), EMOJI_LIMIT):
current_pack_emojis = all_emojis[i : i + EMOJI_LIMIT]
short_name, full_title = await self._create_sticker_pack(message, current_pack_emojis, True, pack_number, emoji)
if short_name == "already_exists":
await message.edit(self.strings("already"))
shutil.rmtree(tmp_dir, ignore_errors=True)
return
elif short_name:
created_packs_links.append(f"<a href=\'https://t.me/addstickers/{short_name}\\''>{full_title}</a>")
pack_number += 1
if created_packs_links:
if len(created_packs_links) == 1:
sn = created_packs_links[0].split('/')[-1].split("'")[0]
await message.edit(self.strings("done_emoji_pack").format(sn))
else:
await message.edit(self.strings("done_emoji_packs").format("\n".join(created_packs_links)))
else:
await message.edit(self.strings("no_valid"))
shutil.rmtree(tmp_dir, ignore_errors=True)

View File

@@ -27,4 +27,5 @@ github
stream
placeholders+
PyInstall
IwaAnimation
IwaAnimation
lateban

View File

@@ -0,0 +1,320 @@
# ______ ___ ___ _ _
# ____ | ___ \ | \/ | | | | |
# / __ \| |_/ / _| . . | ___ __| |_ _| | ___
# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \
# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/
# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___|
# \____/ __/ |
# |___/
# На модуль распространяется лицензия "GNU General Public License v3.0"
# https://github.com/all-licenses/GNU-General-Public-License-v3.0
# meta developer: @pymodule
import asyncio
import logging
from datetime import datetime, timezone
from herokutl.tl.functions.channels import (
EditBannedRequest,
GetParticipantsRequest,
)
from herokutl.tl.types import (
ChatBannedRights,
ChannelParticipantsSearch,
MessageService,
MessageActionChatAddUser,
MessageActionChatJoinedByLink,
MessageActionChatJoinedByRequest,
)
from .. import loader, utils
logger = logging.getLogger(__name__)
_BAN = ChatBannedRights(until_date=None, view_messages=True)
@loader.tds
class LateBanMod(loader.Module):
"""Ban all members who joined the chat after a specified date/time"""
strings = {
"name": "LateBan",
"no_args": (
"❌ Specify date/time:\n"
"<code>.lateban DD.MM.YYYY</code>\n"
"<code>.lateban DD.MM.YYYY HH:MM</code>\n"
"<code>.lateban HH:MM</code> — today"
),
"bad_date": (
"❌ Invalid format. Use <code>DD.MM.YYYY</code>, "
"<code>DD.MM.YYYY HH:MM</code> or <code>HH:MM</code>"
),
"not_chat": "❌ Only works in supergroups",
"no_rights": "❌ No permission to ban members",
"scanning": "🔍 Scanning members who joined after <b>{dt}</b>...",
"confirm": (
"⚠️ Found <b>{count}</b> members who joined after <b>{dt}</b>.\n\n"
"Confirm ban:"
),
"btn_ban": "✅ Ban {count} members",
"btn_cancel": "❌ Cancel",
"banning": "⏳ Banning {count} members...",
"progress": "⏳ Banned {done}/{total}...",
"done": (
"✅ Banned: <b>{banned}</b>\n"
"Skipped (errors/bots): <b>{skipped}</b>\n"
"Service messages deleted: <b>{deleted}</b>"
),
"nobody": "✅ No members found who joined after <b>{dt}</b>.",
}
strings_ru = {
"name": "LateBan",
"_cls_doc": "Заблокируйте всех участников, присоединившихся к чату после указанной даты/времени.",
"no_args": (
"❌ Укажи дату/время:\n"
"<code>.lateban DD.MM.YYYY</code>\n"
"<code>.lateban DD.MM.YYYY HH:MM</code>\n"
"<code>.lateban HH:MM</code>"
),
"bad_date": (
"❌ Неверный формат. Используй <code>DD.MM.YYYY</code>, "
"<code>DD.MM.YYYY HH:MM</code> или <code>HH:MM</code>"
),
"not_chat": "❌ Команда работает только в супергруппах",
"no_rights": "❌ Нет прав на бан участников",
"scanning": "🔍 Сканирую участников, вступивших после <b>{dt}</b>...",
"confirm": (
"⚠️ Найдено <b>{count}</b> участников, вступивших после <b>{dt}</b>.\n\n"
"Подтверди бан:"
),
"btn_ban": "✅ Забанить {count} участников",
"btn_cancel": "❌ Отмена",
"banning": "⏳ Баню {count} участников...",
"progress": "⏳ Забанено {done}/{total}...",
"done": (
"✅ Забанено: <b>{banned}</b>\n"
"Пропущено (ошибки/боты): <b>{skipped}</b>\n"
"Удалено сервисных сообщений: <b>{deleted}</b>"
),
"nobody": "✅ Участников, вступивших после <b>{dt}</b>, не найдено.",
}
async def client_ready(self):
pass
@loader.command(ru_doc="<DD.MM.YYYY [HH:MM] | HH:MM> - Забанить всех, кто присоединился после определённой даты/времени.")
async def latebancmd(self, message):
"""<DD.MM.YYYY [HH:MM] | HH:MM> — ban all who joined after this date/time"""
args = utils.get_args_raw(message).strip()
if not args:
return await utils.answer(message, self.strings["no_args"])
cutoff = _parse_dt(args)
if cutoff is None:
return await utils.answer(message, self.strings["bad_date"])
chat = await message.get_chat()
if not getattr(chat, "megagroup", False) and not getattr(chat, "gigagroup", False):
return await utils.answer(message, self.strings["not_chat"])
me = await self._client.get_me()
perms = await self._client.get_permissions(chat, me)
if not getattr(perms, "ban_users", False):
return await utils.answer(message, self.strings["no_rights"])
dt_str = cutoff.strftime("%d.%m.%Y %H:%M")
await utils.answer(message, self.strings["scanning"].format(dt=dt_str))
targets = await self._collect_targets(chat, cutoff, me.id)
if not targets:
return await utils.answer(message, self.strings["nobody"].format(dt=dt_str))
await self.inline.form(
message=message,
text=self.strings["confirm"].format(count=len(targets), dt=dt_str),
reply_markup=[[
{
"text": self.strings["btn_ban"].format(count=len(targets)),
"callback": self._do_ban,
"args": (chat, targets, dt_str, cutoff),
},
{
"text": self.strings["btn_cancel"],
"callback": self._cancel,
},
]],
force_me=True,
)
async def _collect_targets(self, chat, cutoff: datetime, my_id: int) -> list:
targets = []
offset = 0
limit = 200
while True:
res = await self._client(GetParticipantsRequest(
channel=chat,
filter=ChannelParticipantsSearch(""),
offset=offset,
limit=limit,
hash=0,
))
if not res.users:
break
users_map = {u.id: u for u in res.users}
for p in res.participants:
joined = getattr(p, "date", None)
if joined is None:
continue
if joined.tzinfo is None:
joined = joined.replace(tzinfo=timezone.utc)
if joined <= cutoff:
continue
uid = p.user_id
user = users_map.get(uid)
if not user or user.id == my_id:
continue
if getattr(user, "bot", False):
continue
if p.__class__.__name__ in ("ChannelParticipantAdmin", "ChannelParticipantCreator"):
continue
targets.append(uid)
if len(res.participants) < limit:
break
offset += limit
await asyncio.sleep(0.3)
return targets
async def _do_ban(self, call, chat, targets: list, dt_str: str, cutoff: datetime):
await call.edit(self.strings["banning"].format(count=len(targets)))
banned = 0
skipped = 0
banned_ids = set()
for i, uid in enumerate(targets, 1):
try:
await self._client(EditBannedRequest(chat, uid, _BAN))
banned += 1
banned_ids.add(uid)
except Exception as e:
logger.warning("LateBan: skip %s%s", uid, e)
skipped += 1
if i % 10 == 0:
try:
await call.edit(
self.strings["progress"].format(done=i, total=len(targets))
)
except Exception:
pass
await asyncio.sleep(0.4)
deleted = await self._delete_join_messages(chat, banned_ids, cutoff)
await call.edit(self.strings["done"].format(
banned=banned, skipped=skipped, deleted=deleted
))
async def _delete_join_messages(
self, chat, banned_ids: set, cutoff: datetime
) -> int:
_JOIN_ACTIONS = (
MessageActionChatAddUser,
MessageActionChatJoinedByLink,
MessageActionChatJoinedByRequest,
)
to_delete = []
try:
async for msg in self._client.iter_messages(
chat,
filter=MessageService,
reverse=False,
limit=None,
offset_date=None,
):
ts = msg.date
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
if ts < cutoff:
break
action = getattr(msg, "action", None)
if not isinstance(action, _JOIN_ACTIONS):
continue
if isinstance(action, MessageActionChatAddUser):
if any(uid in banned_ids for uid in action.users):
to_delete.append(msg.id)
else:
sender_id = getattr(msg, "from_id", None)
if sender_id is not None:
uid = getattr(sender_id, "user_id", None)
if uid in banned_ids:
to_delete.append(msg.id)
except Exception as e:
logger.warning("LateBan: failed to scan service messages — %s", e)
return 0
deleted = 0
for chunk in _chunks(to_delete, 100):
try:
await self._client.delete_messages(chat, chunk)
deleted += len(chunk)
except Exception as e:
logger.warning("LateBan: delete chunk failed — %s", e)
await asyncio.sleep(0.2)
return deleted
async def _cancel(self, call):
await call.delete()
def _parse_dt(raw: str) -> datetime | None:
"""
Supported formats:
DD.MM.YYYY → 00:00 UTC
DD.MM.YYYY HH:MM → HH:MM UTC
HH:MM → today HH:MM UTC
"""
raw = raw.strip()
today = datetime.now(timezone.utc).date()
try:
return datetime.strptime(raw, "%d.%m.%Y %H:%M").replace(tzinfo=timezone.utc)
except ValueError:
pass
try:
return datetime.strptime(raw, "%d.%m.%Y").replace(tzinfo=timezone.utc)
except ValueError:
pass
try:
t = datetime.strptime(raw, "%H:%M").time()
return datetime(
today.year, today.month, today.day,
t.hour, t.minute, tzinfo=timezone.utc,
)
except ValueError:
pass
return None
def _chunks(lst: list, n: int):
for i in range(0, len(lst), n):
yield lst[i:i + n]

View File

@@ -57,6 +57,12 @@
"gifts":[
{"id": 5969796561943660080, "emoji": "🧸", "name": "Пасхальный мишка", "price": 50}
]
},
"may_1th": {
"name": "🛠 1 Мая",
"gifts":[
{"id": 6026193266406327981, "emoji": "🧸", "name": "1 Мая мишка", "price": 50}
]
}
}
}

128856
modules.json

File diff suppressed because it is too large Load Diff

View File

@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods
# =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods

View File

@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods
# =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods
@@ -44,5 +39,5 @@ class NeofetchMod(loader.Module):
await utils.answer(message, f"<pre>{utils.escape_html(output)}</pre>")
except FileNotFoundError:
await utils.answer(message, self.strings("not_installed"))
await utils.answer(message, self.strings["not_installed"])

View File

@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods
# =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods
# requires: pillow
@@ -95,17 +90,17 @@ class PicToStoriesMod(loader.Module):
args = utils.get_args_raw(message)
reply = await message.get_reply_message()
if not reply or not reply.media:
await utils.answer(message, self.strings("no_rep"))
await utils.answer(message, self.strings["no_rep"])
return
try:
image_bytes = await reply.download_media(file=bytes)
img = Image.open(io.BytesIO(image_bytes))
except Exception as e:
await utils.answer(message, self.strings("err").format(e))
await utils.answer(message, self.strings["err"].format(e))
return
await utils.answer(message, self.strings("work"))
await utils.answer(message, self.strings["work"])
w, h = img.size
curr_ratio = w / h
@@ -208,4 +203,4 @@ class PicToStoriesMod(loader.Module):
)
)
await utils.answer(message, self.strings("done"))
await utils.answer(message, self.strings["done"])

View File

@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods
# =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods
# requires: pillow
@@ -55,23 +50,13 @@ class RandomAnimePicMod(loader.Module):
IMAGES_API_URL = "https://api.nekosapi.com/v4/images"
CATEGORIES_SCAN_LIMIT = 500
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"category",
"",
"Category",
validator=loader.validators.String(),
),
)
@loader.command(ru_doc="- получить рандомную аниме-картинку 👀")
async def rapiccmd(self, message):
"""- fetch random anime-pic 👀"""
await utils.answer(message, self.strings("loading"))
await utils.answer(message, self.strings["loading"])
try:
category = self.config["category"].strip()
category = await utils.get_args_raw().strip()
def fetch_image():
params = {"limit": 1, "rating": ["safe"]}
@@ -111,7 +96,7 @@ class RandomAnimePicMod(loader.Module):
url, file = await asyncio.to_thread(fetch_image)
await utils.answer(
message,
self.strings("img").format(url),
self.strings["img"].format(url),
file=file
)
@@ -120,12 +105,12 @@ class RandomAnimePicMod(loader.Module):
"Error fetching random anime pic: %s",
traceback.format_exc(),
)
await utils.answer(message, self.strings("error"))
await utils.answer(message, self.strings["error"])
@loader.command(ru_doc="- получить список категорий из API 👀")
async def racategoriescmd(self, message):
"""- fetch categories from api 👀"""
await utils.answer(message, self.strings("categories_loading"))
await utils.answer(message, self.strings["categories_loading"])
try:
def fetch_categories() -> list[str]:
@@ -162,15 +147,15 @@ class RandomAnimePicMod(loader.Module):
categories = await asyncio.to_thread(fetch_categories)
if not categories:
await utils.answer(message, self.strings("no_categories"))
await utils.answer(message, self.strings["no_categories"])
return
formatted_categories = "\n".join(
formatted_categories = ", ".join(
f"<code>{category}</code>" for category in categories
)
await utils.answer(
message,
self.strings("categories").format(formatted_categories),
self.strings["categories"].format(formatted_categories),
)
except Exception:
@@ -178,4 +163,4 @@ class RandomAnimePicMod(loader.Module):
"Error fetching categories: %s",
traceback.format_exc(),
)
await utils.answer(message, self.strings("error"))
await utils.answer(message, self.strings["error"])

File diff suppressed because it is too large Load Diff

View File

@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods
# =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods
@@ -43,10 +38,10 @@ class UnbanAllMod(loader.Module):
chat = await message.get_chat()
if not chat.admin_rights and not chat.creator:
await utils.answer(message, self.strings("no_rights"))
await utils.answer(message, self.strings["no_rights"])
return
await utils.answer(message, self.strings("unban_in_process"))
await utils.answer(message, self.strings["unban_in_process"])
no_banned = True
@@ -64,11 +59,11 @@ class UnbanAllMod(loader.Module):
))
except Exception as e:
await utils.answer(message, self.strings("error_occured").format(user.id, e))
await utils.answer(message, self.strings["error_occured"].format(user.id, e))
pass
if no_banned:
await utils.answer(message, self.strings("no_banned"))
await utils.answer(message, self.strings["no_banned"])
return
await utils.answer(message, self.strings("success"))
await utils.answer(message, self.strings["success"])

View File

@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods
# =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods
# scope: ffmpeg

View File

@@ -1,4 +1,4 @@
__version__ = (1, 2, 0, 0)
__version__ = (1, 3, 0, 0)
# This file is a part of Hikka Userbot!
# This product includes software developed by t.me/Fl1yd and t.me/spypm.
@@ -19,6 +19,10 @@ __version__ = (1, 2, 0, 0)
# - Added: Proxy for users from RF
# - Fixed: Correct reply author resolving for forwarded messages
# Changelog v1.3:
# - Added: Message grouping for consecutive messages from the same user (hides avatar/name)
# - Changed: Replaced RU endpoint logic with direct proxy support via module config
# █▄█ █░█ █▀▄▀█ █▀▄▀█ █▄█   █▀▄▀█ █▀█ █▀▄ █▀
# ░█░ █▄█ █░▀░█ █░▀░█ ░█░   █░▀░█ █▄█ █▄▀ ▄█
@@ -155,9 +159,10 @@ class Dick:
return None
@staticmethod
async def post(url: str, data: dict):
async def post(url: str, data: dict, proxy: Optional[str] = None):
try:
return await utils.run_sync(requests.post, url, json=data, timeout=30)
px = {"http": proxy, "https": proxy} if proxy else None
return await utils.run_sync(requests.post, url, json=data, timeout=30, proxies=px)
except Exception:
return None
@@ -199,12 +204,8 @@ class Quotes(loader.Module):
loader.ConfigValue("endpoint","https://kok.gay/gayotes/generate",
lambda:"URL API-эндпоинта (можешь поднять локально - github.com/yummy1gay/quote-api)",
validator=loader.validators.Link()),
loader.ConfigValue("use_rf_proxy", False,
lambda:'Включает прокси для РФ, если основной эндпоинт возвращает ошибку "Нетворк еррорь", и при этом сервер с юзерботом находится в России или ты сам сидишь в России с ограниченным доступом к зарубежным ресурсам (Termux / UserLAnd)',
validator=loader.validators.Boolean()),
loader.ConfigValue("rf_endpoint", "https://ru.kok.gay/gayotes/generate",
lambda:"URL API-эндпоинта для РФ",
validator=loader.validators.Link()))
loader.ConfigValue("proxy", "",
lambda:"Прокси для обхода блокировок (например: http://user:pass@ip:port). Оставь пустым, если не нужно."))
async def client_ready(self, client, db):
self.client=client; self.db=db
@@ -236,11 +237,11 @@ class Quotes(loader.Module):
"format": "webp" if not doc else "png", "type": self.config["type"]}
await utils.answer(st,self.strings["api_processing"])
endpoint=self.config['rf_endpoint'] if self.config['use_rf_proxy'] else self.config['endpoint']
r=await Dick.post(f"{endpoint}.webp",pay)
prx = self.config["proxy"] if self.config["proxy"] else None
r=await Dick.post(f"{self.config['endpoint']}.webp",pay,proxy=prx)
if not r or r.status_code!=200:
try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй включить <code>use_rf_proxy</code> в конфиге)"
except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй включить <code>use_rf_proxy</code> в конфиге)"
try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй указать прокси в конфиге)"
except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй указать прокси в конфиге)"
return await utils.answer(st,self.strings["api_error"].format(err))
buf=io.BytesIO(r.content); buf.name="YgQuote"+(".png" if doc else ".webp")
@@ -270,11 +271,11 @@ class Quotes(loader.Module):
"format": "webp","type":self.config["type"]}
await utils.answer(st,self.strings["api_processing"])
endpoint=self.config['rf_endpoint'] if self.config['use_rf_proxy'] else self.config['endpoint']
r=await Dick.post(f"{endpoint}.webp",dickk)
prx = self.config["proxy"] if self.config["proxy"] else None
r=await Dick.post(f"{self.config['endpoint']}.webp",dickk,proxy=prx)
if not r or r.status_code!=200:
try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй включить <code>use_rf_proxy</code> в конфиге)"
except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй включить <code>use_rf_proxy</code> в конфиге)"
try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй указать прокси в конфиге)"
except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй указать прокси в конфиге)"
return await utils.answer(st,self.strings["api_error"].format(err))
buf=io.BytesIO(r.content); buf.name="YgQuote.webp"
@@ -290,12 +291,18 @@ class Quotes(loader.Module):
return None
out: List[dict]=[]
prev_sender_id = None
for mm in lst:
try:
u=await self.who(mm)
if not u: continue
current_sender_id = getattr(u,"id",0)
is_chained = (current_sender_id == prev_sender_id) if current_sender_id else False
name=telethon.utils.get_display_name(u); f,l=Dick.split(name)
ava=await Dick.ava(self.client,getattr(u,"id",0)) if getattr(u,"id",None) else None
ava = await Dick.ava(self.client,current_sender_id) if (not is_chained and current_sender_id) else None
rb=None
try:
@@ -315,10 +322,16 @@ class Quotes(loader.Module):
txt=mm.raw_text or ""; ad=Dick.desc(mm)
if ad: txt=f"{txt}\n\n{ad}" if txt else ad
item={"from":{"id":getattr(u,"id", 0),"first_name":getattr(u,"first_name","") or f,"last_name":getattr(u,"last_name","") or l,
"username":getattr(u,"username",None),"name":name,"photo":{"url":ava} if ava else {}},
"text":txt,"entities":Dick.ents(mm.entities),"avatar":True}
if is_chained:
item={"from":{"id":current_sender_id,"name":""},
"text":txt,"entities":Dick.ents(mm.entities),"avatar":False}
else:
item={"from":{"id":current_sender_id,"first_name":getattr(u,"first_name","") or f,"last_name":getattr(u,"last_name","") or l,
"username":getattr(u,"username",None),"name":name,"photo":{"url":ava} if ava else {}},
"text":txt,"entities":Dick.ents(mm.entities),"avatar":True}
es=getattr(u,"emoji_status",None)
if getattr(es,"document_id",None): item["from"]["emoji_status"]=str(es.document_id)
try:
if mm.voice:
a = next((a for a in mm.voice.attributes or []
@@ -327,11 +340,10 @@ class Quotes(loader.Module):
except Exception: pass
if med: item["voice" if "voice" in med else "media"] = med.get("voice", med)
es=getattr(u,"emoji_status",None)
if getattr(es,"document_id",None): item["from"]["emoji_status"]=str(es.document_id)
if rb: item["replyMessage"]=rb
out.append(item)
prev_sender_id = current_sender_id
except Exception: continue
return out
@@ -378,6 +390,8 @@ class Quotes(loader.Module):
return await self.fake(f"{getattr(u,'id','')} {args}", None)
out: List[dict]=[]
prev_sender_id = None
for part in args.split("; "):
try:
rb=None
@@ -388,22 +402,32 @@ class Quotes(loader.Module):
if not u1: continue
txt1, ents1 = html.parse(t1) if t1 else ("", [])
current_sender_id = u1.id
is_chained = (current_sender_id == prev_sender_id)
name=telethon.utils.get_display_name(u1); f,l=Dick.split(name)
ava=await Dick.ava(self.client,u1.id)
ava = await Dick.ava(self.client,u1.id) if not is_chained else None
if u2:
txt2, ents2 = html.parse(t2) if t2 else ("", [])
name2=telethon.utils.get_display_name(u2); ava2=await Dick.ava(self.client,u2.id)
rb={"name":name2,"text":txt2,"entities":Dick.ents(ents2),"chatId":u2.id,"from":{"name":name2,"photo":{"url":ava2} if ava2 else {}}}
msg={"from":{"id":u1.id,"first_name":getattr(u1,"first_name","") or f,"last_name":getattr(u1,"last_name","") or l,
"username":getattr(u1,"username",None),"name":name,"photo":{"url":ava} if ava else {}},
"text":txt1,"entities":Dick.ents(ents1), "avatar":True}
es=getattr(u1,"emoji_status",None)
if getattr(es,"document_id",None): msg["from"]["emoji_status"]=str(es.document_id)
if is_chained:
msg={"from":{"id":current_sender_id,"name":""},
"text":txt1,"entities":Dick.ents(ents1), "avatar":False}
else:
msg={"from":{"id":current_sender_id,"first_name":getattr(u1,"first_name","") or f,"last_name":getattr(u1,"last_name","") or l,
"username":getattr(u1,"username",None),"name":name,"photo":{"url":ava} if ava else {}},
"text":txt1,"entities":Dick.ents(ents1), "avatar":True}
es=getattr(u1,"emoji_status",None)
if getattr(es,"document_id",None): msg["from"]["emoji_status"]=str(es.document_id)
if rb: msg["replyMessage"]=rb
out.append(msg)
prev_sender_id = current_sender_id
except Exception: continue
return out