diff --git a/Ruslan-Isaev/modules/GeoMod.py b/Ruslan-Isaev/modules/GeoMod.py
new file mode 100644
index 0000000..e88b54c
--- /dev/null
+++ b/Ruslan-Isaev/modules/GeoMod.py
@@ -0,0 +1,61 @@
+__version__ = (1, 0, 0)
+
+# meta developer: @RUIS_VlP
+
+from .. import loader, utils
+import aiohttp
+from telethon.tl.types import InputGeoPoint, InputMediaGeoPoint
+from urllib.parse import quote
+
+async def get_coordinates(query: str):
+ base_url = "https://nominatim.openstreetmap.org/search"
+ encoded_query = quote(query)
+
+ url = f"{base_url}?q={encoded_query}&format=json"
+ headers = {
+ "User-Agent": "Heroku-GeoMod/1.0 (https://t.me/RUIS_VlP)"
+ }
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, headers=headers) as resp:
+ if resp.status == 200:
+ data = await resp.json()
+ if data:
+ lat = float(data[0]["lat"])
+ lon = float(data[0]["lon"])
+ return [lat, lon]
+ return None
+
+@loader.tds
+class GeoMod(loader.Module):
+ """Модуль для отправки геолокации с указанным адресом или координатами"""
+
+ strings = {
+ "name": "GeoMod",
+ }
+
+ @loader.command()
+ async def sendgeo(self, message):
+ """<адрес> - отправить геолокацию с указанным адресом или координатами"""
+ args = utils.get_args_raw(message)
+ if not args:
+ await utils.answer(
+ message,
+ "Укажите адрес, например: .sendgeo Москва, Манежная улица, 2"
+ )
+ return
+
+ coords = await get_coordinates(args)
+ if coords:
+ await message.client.send_file(
+ message.chat_id,
+ InputMediaGeoPoint(
+ geo_point=InputGeoPoint(
+ lat=coords[0],
+ long=coords[1],
+ )
+ )
+ )
+ await message.delete()
+ else:
+ await utils.answer(message, "Координаты не найдены.")
\ No newline at end of file
diff --git a/Ruslan-Isaev/modules/photos/cover.txt b/Ruslan-Isaev/modules/photos/cover.txt
new file mode 100644
index 0000000..66de0c0
--- /dev/null
+++ b/Ruslan-Isaev/modules/photos/cover.txt
@@ -0,0 +1,36 @@
+PCFET0NUWVBFIGh0bWw+CjxodG1sPgo8aGVhZD4KICAgIDx0aXRsZT4vdG1wL2ZpbGVzIC0gNjFf
+MjAyNTEwMTMwMjIzMjAuanBnPC90aXRsZT4KCiAgICA8bWV0YSBodHRwLWVxdWl2PSJjb250ZW50
+LVR5cGUiIGNvbnRlbnQ9InRleHQvaHRtbDsgY2hhcnNldD1VVEYtOCIvPgogICAgPG1ldGEgbmFt
+ZT0idmlld3BvcnQiIGNvbnRlbnQ9IndpZHRoPWRldmljZS13aWR0aCwgaW5pdGlhbC1zY2FsZT0x
+LjAsIG1heGltdW0tc2NhbGU9MS4wLCB1c2VyLXNjYWxhYmxlPW5vIj4KCiAgICA8bGluayBocmVm
+PSIvY3NzL3N0eWxlLmNzcyIgbWVkaWE9ImFsbCIgcmVsPSJzdHlsZXNoZWV0IiB0eXBlPSJ0ZXh0
+L2NzcyIvPgogICAgPGxpbmsgaHJlZj0nLy9mb250cy5nb29nbGVhcGlzLmNvbS9jc3M/ZmFtaWx5
+PU9wZW4rU2FucytDb25kZW5zZWQ6MzAwLDcwMCcgcmVsPSdzdHlsZXNoZWV0JyB0eXBlPSd0ZXh0
+L2NzcycvPgoKICAgIDxzY3JpcHQgZGF0YS1jZmFzeW5jPSJmYWxzZSIgc3JjPSIvL2RnYWYybmN5
+NGR0YW4uY2xvdWRmcm9udC5uZXQvP25mYWdkPTEyMTM0NTEiPjwvc2NyaXB0PgoKCiAgICA8IS0t
+IEdsb2JhbCBzaXRlIHRhZyAoZ3RhZy5qcykgLSBHb29nbGUgQW5hbHl0aWNzIC0tPgogICAgPHNj
+cmlwdCBhc3luYyBzcmM9Imh0dHBzOi8vd3d3Lmdvb2dsZXRhZ21hbmFnZXIuY29tL2d0YWcvanM/
+aWQ9VUEtNjYxMTIxNjEtMiI+PC9zY3JpcHQ+CiAgICA8c2NyaXB0PgogICAgICAgIHdpbmRvdy5k
+YXRhTGF5ZXIgPSB3aW5kb3cuZGF0YUxheWVyIHx8IFtdOwoKICAgICAgICBmdW5jdGlvbiBndGFn
+KCkgewogICAgICAgICAgICBkYXRhTGF5ZXIucHVzaChhcmd1bWVudHMpOwogICAgICAgIH0KCiAg
+ICAgICAgZ3RhZygnanMnLCBuZXcgRGF0ZSgpKTsKICAgICAgICBndGFnKCdjb25maWcnLCAnVUEt
+NjYxMTIxNjEtMicpOwogICAgPC9zY3JpcHQ+CjwvaGVhZD4KPGJvZHk+CjxkaXYgaWQ9ImNvbnRh
+aW5lciI+CiAgICA8aGVhZGVyPgogICAgICAgIDxoMT48YSBocmVmPSIvIj4vdG1wL2ZpbGVzPC9h
+PjwvaDE+CiAgICAgICAgPGgyPlRlbXBvcmFyeSBGaWxlIEhvc3Rpbmc8L2gyPgogICAgPC9oZWFk
+ZXI+CiAgICA8c2VjdGlvbj4KICAgICAgICAKICAgIDx0YWJsZSBzdHlsZT0iY29sb3I6ICNmZmZm
+ZmY7IGZvbnQtc2l6ZTogMTJweDsiPgogICAgICAgIDx0cj4KICAgICAgICAgICAgPHRoIHN0eWxl
+PSJ3aWR0aDogODBweDsiPkZpbGVuYW1lPC90aD4KICAgICAgICAgICAgPHRkPjYxXzIwMjUxMDEz
+MDIyMzIwLmpwZzwvdGQ+CiAgICAgICAgPC90cj4KICAgICAgICA8dHI+CiAgICAgICAgICAgIDx0
+aD5TaXplPC90aD4KICAgICAgICAgICAgPHRkPjU1LjI1IEtCPC90ZD4KICAgICAgICA8L3RyPgog
+ICAgICAgIDx0cj4KICAgICAgICAgICAgPHRoPlVSTDwvdGg+CiAgICAgICAgICAgIDx0ZD48YSB0
+YXJnZXQ9Il9ibGFuayIgaHJlZj0iaHR0cDovL3RtcGZpbGVzLm9yZy9kbC8zOTczNjYyLzYxXzIw
+MjUxMDEzMDIyMzIwLmpwZyI+aHR0cDovL3RtcGZpbGVzLm9yZy9kbC8zOTczNjYyLzYxXzIwMjUx
+MDEzMDIyMzIwLmpwZzwvYT48L3RkPgogICAgICAgIDwvdHI+CiAgICAgICAgPHRyPgogICAgICAg
+ICAgICA8dGg+RXhwaXJlcyBhdDwvdGg+CiAgICAgICAgICAgIDx0ZD4yMDI1LTEwLTEzIDAwOjI2
+IFVUQzwvdGQ+CiAgICAgICAgPC90cj4KICAgIDwvdGFibGU+CiAgICA8YnI+CgogICAgICAgICAg
+ICA8aW1nIGlkPSJpbWdfcHJldmlldyIgc3JjPSJodHRwOi8vdG1wZmlsZXMub3JnL2RsLzM5NzM2
+NjIvNjFfMjAyNTEwMTMwMjIzMjAuanBnIi8+CiAgICAKICAgIDwvc2VjdGlvbj4KICAgIDxmb290
+ZXI+CiAgICAgICAgPHVsPgogICAgICAgICAgICA8bGk+PGEgaHJlZj0iLyI+VXBsb2FkPC9hPjwv
+bGk+CiAgICAgICAgICAgIDxsaT48YSBocmVmPSIvYXBpIj5BUEk8L2E+PC9saT4KICAgICAgICAg
+ICAgPGxpPjxhIGhyZWY9Ii9hYm91dCI+QWJvdXQ8L2E+PC9saT4KICAgICAgICA8L3VsPgogICAg
+PC9mb290ZXI+CjwvZGl2Pgo8L2JvZHk+CjwvaHRtbD4K
\ No newline at end of file
diff --git a/Ruslan-Isaev/modules/whois.py b/Ruslan-Isaev/modules/whois.py
index 57b72d3..9d4cbf7 100755
--- a/Ruslan-Isaev/modules/whois.py
+++ b/Ruslan-Isaev/modules/whois.py
@@ -63,7 +63,7 @@ async def get_whois(identifier, API_KEY: str) -> dict:
return response
async def fetch_dns_record(session, domain, record_type):
- url = "https://unfiltered.adguard-dns.com/resolve"
+ url = "https://dns.google/resolve"
headers = {"accept": "application/dns-json"}
params = {"name": domain, "type": record_type}
diff --git a/fajox1/famods/picme.py b/fajox1/famods/picme.py
index e53c993..22ad4e9 100644
--- a/fajox1/famods/picme.py
+++ b/fajox1/famods/picme.py
@@ -69,7 +69,8 @@ class PicMe(loader.Module):
async def watcher(self, event):
try:
if event.from_id != self.tg_id:
- return
+ if event.from_id.user_id != self.tg_id:
+ return
except:
return
if not self.db.get(self.name, "picme", False):
diff --git a/fiksofficial/python-modules/createavatarspack.py b/fiksofficial/python-modules/createavatarspack.py
index afcb4db..2ebca46 100644
--- a/fiksofficial/python-modules/createavatarspack.py
+++ b/fiksofficial/python-modules/createavatarspack.py
@@ -11,7 +11,7 @@
# https://github.com/all-licenses/GNU-General-Public-License-v3.0
# meta developer: @pymodule
-# requires: opencv-python
+# requires: opencv-python pillow
import os, shutil, cv2
from PIL import Image, UnidentifiedImageError
diff --git a/fiksofficial/python-modules/deviceinfo.py b/fiksofficial/python-modules/deviceinfo.py
new file mode 100644
index 0000000..0d6b2e6
--- /dev/null
+++ b/fiksofficial/python-modules/deviceinfo.py
@@ -0,0 +1,503 @@
+# ______ ___ ___ _ _
+# ____ | ___ \ | \/ | | | | |
+# / __ \| |_/ / _| . . | ___ __| |_ _| | ___
+# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \
+# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/
+# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___|
+# \____/ __/ |
+# |___/
+
+# На модуль распространяется лицензия "GNU General Public License v3.0"
+# https://github.com/all-licenses/GNU-General-Public-License-v3.0
+
+# meta developer: @pymodule
+# requires: aiohttp cachetools
+
+import asyncio
+import logging
+from typing import List, Dict, Any
+import aiohttp
+from cachetools import TTLCache
+
+from .. import loader, utils
+from ..inline.types import InlineMessage
+
+logger = logging.getLogger(__name__)
+
+@loader.tds
+class DeviceInfo(loader.Module):
+ """A module for obtaining information about smartphones"""
+
+ strings_ru = {
+ "name": "DeviceInfo",
+ "_cls_doc": "Модуль для получения информации о смартфонах",
+ "searching": "🔍 Ищу устройства по запросу: {}...",
+ "no_query": "❌ Укажи название устройства! Пример: .di iPhone 15",
+ "no_results": "📭 Устройства не найдены для запросу: {}",
+ "device_list": "📱 Найдено {} устройств по запросу {}:",
+ "device_info": "📱 {}\n\n{}",
+ "error": "❌ Ошибка: {}. Попробуй позже или проверь API.",
+ "network": "📡 Сеть: {}\n",
+ "launched": "📅 Дата выпуска:\n Анонс: {}\n Статус: {}\n",
+ "body": "📏 Корпус:\n Размеры: {}\n Вес: {}\n SIM: {}\n Прочее: {}\n",
+ "display": "🖥️ Дисплей:\n Тип: {}\n Размер: {}\n Разрешение: {}\n Защита: {}\n",
+ "platform": "⚙️ Платформа:\n ОС: {}\n Чипсет: {}\n CPU: {}\n GPU: {}\n",
+ "memory": "💾 Память:\n Карта памяти: {}\n Внутренняя: {}\n Прочее: {}\n",
+ "main_camera": "📷 Основная камера:\n Модули: {}\n Функции: {}\n Видео: {}\n",
+ "selfie_camera": "🤳 Фронтальная камера:\n Модули: {}\n Функции: {}\n Видео: {}\n",
+ "sound": "🔊 Звук:\n Динамик: {}\n Аудиоразъём: {}\n Прочее: {}\n",
+ "comms": "🌐 Связь:\n Wi-Fi: {}\n Bluetooth: {}\n GPS: {}\n NFC: {}\n Инфракрасный порт: {}\n Радио: {}\n USB: {}\n",
+ "sensors": "🛠️ Датчики: {}\n",
+ "battery": "🔋 Батарея:\n Тип: {}\n Зарядка: {}\n",
+ "misc": "🎨 Разное:\n Цвета: {}\n Модели: {}\n",
+ "show_body": "📏 Корпус",
+ "show_memory": "💾 Память",
+ "show_cameras": "📷 Камеры",
+ "show_sound": "🔊 Звук",
+ "show_comms": "🌐 Связь",
+ "show_sensors": "🛠️ Датчики",
+ "show_misc": "🎨 Разное",
+ "next_photo": "▶️ След. фото",
+ "prev_photo": "◀️ Пред. фото",
+ "back": "🔙 Назад",
+ "back_to_device": "🔙 К устройству",
+ "config_saved": "✅ Конфигурация сохранена!",
+ "retrying": "🔄 Повторяю запрос... (попытка {}/{} )"
+ }
+
+ strings = {
+ "name": "DeviceInfo",
+ "searching": "🔍 Searching devices for: {}...",
+ "no_query": "❌ Specify a device name! Example: .di iPhone 15",
+ "no_results": "📭 No devices found for query: {}",
+ "device_list": "📱 Found {} devices for query {}:",
+ "device_info": "📱 {}\n\n{}",
+ "error": "❌ Error: {}. Try again later or check the API.",
+ "network": "📡 Network: {}\n",
+ "launched": "📅 Launch:\n Announced: {}\n Status: {}\n",
+ "body": "📏 Body:\n Dimensions: {}\n Weight: {}\n SIM: {}\n Other: {}\n",
+ "display": "🖥️ Display:\n Type: {}\n Size: {}\n Resolution: {}\n Protection: {}\n",
+ "platform": "⚙️ Platform:\n OS: {}\n Chipset: {}\n CPU: {}\n GPU: {}\n",
+ "memory": "💾 Memory:\n Card slot: {}\n Internal: {}\n Other: {}\n",
+ "main_camera": "📷 Main Camera:\n Modules: {}\n Features: {}\n Video: {}\n",
+ "selfie_camera": "🤳 Selfie Camera:\n Modules: {}\n Features: {}\n Video: {}\n",
+ "sound": "🔊 Sound:\n Loudspeaker: {}\n Audio Jack: {}\n Other: {}\n",
+ "comms": "🌐 Comms:\n Wi-Fi: {}\n Bluetooth: {}\n GPS: {}\n NFC: {}\n Infrared: {}\n Radio: {}\n USB: {}\n",
+ "sensors": "🛠️ Sensors: {}\n",
+ "battery": "🔋 Battery:\n Type: {}\n Charging: {}\n",
+ "misc": "🎨 Misc:\n Colors: {}\n Models: {}\n",
+ "show_body": "📏 Body",
+ "show_memory": "💾 Memory",
+ "show_cameras": "📷 Cameras",
+ "show_sound": "🔊 Sound",
+ "show_comms": "🌐 Comms",
+ "show_sensors": "🛠️ Sensors",
+ "show_misc": "🎨 Misc",
+ "next_photo": "▶️ Next Photo",
+ "prev_photo": "◀️ Prev Photo",
+ "back": "🔙 Back",
+ "back_to_device": "🔙 To Device",
+ "config_saved": "✅ Configuration saved!",
+ "retrying": "🔄 Retrying request... (attempt {}/{})"
+ }
+
+ def __init__(self):
+ self.config = loader.ModuleConfig(
+ loader.ConfigValue(
+ "api_base_url",
+ "https://mobilespecs.fiksofficial.fun",
+ lambda: "API Url",
+ validator=loader.validators.String()
+ ),
+ loader.ConfigValue(
+ "max_results",
+ 20,
+ lambda: "Maximum search results to display",
+ validator=loader.validators.Integer(minimum=1, maximum=50)
+ ),
+ loader.ConfigValue(
+ "timeout",
+ 10,
+ lambda: "Timeout for API requests (seconds)",
+ validator=loader.validators.Integer(minimum=5, maximum=30)
+ ),
+ loader.ConfigValue(
+ "retry_attempts",
+ 3,
+ lambda: "Number of retry attempts for API requests",
+ validator=loader.validators.Integer(minimum=1, maximum=5)
+ )
+ )
+ self.cache = TTLCache(maxsize=100, ttl=300)
+ self.session: aiohttp.ClientSession = None
+
+ async def client_ready(self, client, db):
+ """Initialize aiohttp session on client ready"""
+ self.session = aiohttp.ClientSession(
+ timeout=aiohttp.ClientTimeout(total=self.config["timeout"])
+ )
+ logger.info("DeviceInfo: aiohttp session initialized")
+ self.client = client
+
+ async def on_unload(self):
+ """Close aiohttp session on module unload"""
+ if self.session:
+ await self.session.close()
+ logger.info("DeviceInfo: aiohttp session closed")
+
+ async def _resolve_entity(self, call: InlineMessage, message_id: int, chat_id: int = None):
+ """Resolve Telegram entity to Message or int"""
+ if hasattr(call, "message") and call.message:
+ logger.debug("DeviceInfo: Using call.message")
+ return call.message
+ if chat_id:
+ logger.warning(f"DeviceInfo: call.message is None, falling back to chat_id {chat_id}")
+ return chat_id
+ logger.warning(f"DeviceInfo: call.message and chat_id are None, falling back to message_id {message_id}")
+ return message_id
+
+ async def _fetch_json(self, endpoint: str, params: Dict[str, Any] = None) -> Any:
+ """Fetch JSON from API with retry and caching"""
+ cache_key = f"{endpoint}_{params}" if params else endpoint
+ if cache_key in self.cache:
+ logger.debug(f"Cache hit for {cache_key}")
+ return self.cache[cache_key]
+
+ url = f"{self.config['api_base_url']}/gsm/{endpoint}"
+ params_clean = {k: v for k, v in (params or {}).items() if k != "message"}
+ for attempt in range(1, self.config["retry_attempts"] + 1):
+ try:
+ async with self.session.get(url, params=params_clean or None) as resp:
+ if resp.status != 200:
+ error_text = await resp.text()
+ logger.error(f"DeviceInfo: HTTP {resp.status} for {url}: {error_text}")
+ raise aiohttp.ClientError(f"HTTP {resp.status}: {error_text}")
+ content_type = resp.headers.get("Content-Type", "")
+ if "application/json" not in content_type:
+ error_text = await resp.text()
+ logger.error(f"DeviceInfo: Invalid content-type {content_type} for {url}: {error_text}")
+ raise ValueError(f"Invalid content-type: {content_type}")
+ data = await resp.json()
+ if data is None:
+ error_text = await resp.text()
+ logger.error(f"DeviceInfo: API returned None for {url}: {error_text}")
+ if endpoint.startswith("search"):
+ data = []
+ else:
+ data = {}
+ if not isinstance(data, (list, dict)):
+ logger.error(f"DeviceInfo: Unexpected API response type for {url}: {type(data)}")
+ raise ValueError(f"Unexpected API response type: {type(data)}")
+ self.cache[cache_key] = data
+ logger.debug(f"Cache set for {cache_key}")
+ return data
+ except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
+ logger.warning(f"DeviceInfo: Request failed for {endpoint} (attempt {attempt}): {e}")
+ if attempt < self.config["retry_attempts"]:
+ if params and "message" in params:
+ await utils.answer(params["message"], self.strings["retrying"].format(attempt, self.config["retry_attempts"]))
+ await asyncio.sleep(2 * attempt)
+ else:
+ logger.error(f"DeviceInfo: API failed after {self.config['retry_attempts']} attempts: {e}")
+ if endpoint.startswith("search"):
+ return []
+ return {}
+
+ def _format_essential_info(self, device: Dict[str, Any]) -> str:
+ """Format essential device info (name, network, launch, display, platform, battery)"""
+ info_text = ""
+ if "network" in device:
+ info_text += self.strings["network"].format(device.get("network", "N/A"))
+ if "launced" in device:
+ info_text += self.strings["launched"].format(
+ device["launced"].get("announced", "N/A"),
+ device["launced"].get("status", "N/A")
+ )
+ if "display" in device:
+ info_text += self.strings["display"].format(
+ device["display"].get("type", "N/A"),
+ device["display"].get("size", "N/A"),
+ device["display"].get("resolution", "N/A"),
+ device["display"].get("protection", "N/A")
+ )
+ if "platform" in device:
+ info_text += self.strings["platform"].format(
+ device["platform"].get("os", "N/A"),
+ device["platform"].get("chipset", "N/A"),
+ device["platform"].get("cpu", "N/A"),
+ device["platform"].get("gpu", "N/A")
+ )
+ if "battery" in device:
+ info_text += self.strings["battery"].format(
+ device["battery"].get("battType", "N/A"),
+ device["battery"].get("charging", "N/A")
+ )
+ return info_text
+
+ def _format_section(self, section: str, device: Dict[str, Any]) -> str:
+ """Format a specific section of device info"""
+ if section == "body" and "body" in device:
+ return self.strings["body"].format(
+ device["body"].get("dimension", "N/A"),
+ device["body"].get("weight", "N/A"),
+ device["body"].get("sim", "N/A"),
+ device["body"].get("other", "N/A")
+ )
+ if section == "memory" and "memory" in device:
+ memory = {item.get("label", ""): item.get("value", "N/A") for item in device.get("memory", [])}
+ return self.strings["memory"].format(
+ memory.get("card", "N/A"),
+ memory.get("internal", "N/A"),
+ memory.get("otherMemory", "N/A")
+ )
+ if section == "cameras":
+ cam_text = ""
+ if "mainCamera" in device:
+ cam_text += self.strings["main_camera"].format(
+ device["mainCamera"].get("mainModules", "N/A"),
+ device["mainCamera"].get("mainFeatures", "N/A"),
+ device["mainCamera"].get("mainVideo", "N/A")
+ )
+ if "selfieCamera" in device:
+ cam_text += self.strings["selfie_camera"].format(
+ device["selfieCamera"].get("selfieModules", "N/A"),
+ device["selfieCamera"].get("selfieFeatures", "N/A"),
+ device["selfieCamera"].get("selfieVideo", "N/A")
+ )
+ return cam_text
+ if section == "sound" and "sound" in device:
+ return self.strings["sound"].format(
+ device["sound"].get("loudSpeaker", "N/A"),
+ device["sound"].get("audioJack", "N/A"),
+ device["sound"].get("otherSound", "N/A")
+ )
+ if section == "comms" and "comms" in device:
+ return self.strings["comms"].format(
+ device["comms"].get("wlan", "N/A"),
+ device["comms"].get("bluetooth", "N/A"),
+ device["comms"].get("gps", "N/A"),
+ device["comms"].get("nfc", "N/A"),
+ device["comms"].get("infrared", "N/A"),
+ device["comms"].get("radio", "N/A"),
+ device["comms"].get("usb", "N/A")
+ )
+ if section == "sensors" and "sensors" in device:
+ return self.strings["sensors"].format(device.get("sensors", "N/A"))
+ if section == "misc" and "misc" in device:
+ return self.strings["misc"].format(
+ device["misc"].get("colors", "N/A"),
+ device["misc"].get("models", "N/A")
+ )
+ return "N/A"
+
+ @loader.command(ru_doc="(.di) <название устройства> - Получить информацию о смартфоне", alias="di")
+ async def deviceinfo(self, message):
+ """(.di) - Get smartphone info by name"""
+ query = utils.get_args_raw(message).strip()
+ if not query:
+ await utils.answer(message, self.strings["no_query"])
+ return
+
+ await utils.answer(message, self.strings["searching"].format(query))
+ try:
+ devices = await self._fetch_json("search", {"q": query, "message": message})
+ if not devices:
+ await utils.answer(message, self.strings["no_results"].format(query))
+ return
+
+ devices = devices[:self.config["max_results"]]
+ button_rows = [[{"text": device["name"], "callback": self.show_device_info, "args": [device["id"], query, message.id, message.chat_id, 0, None]}] for device in devices]
+ await self.inline.list(
+ message=message,
+ strings=[self.strings["device_list"].format(len(devices), query)],
+ custom_buttons=button_rows,
+ ttl=60,
+ force_me=True,
+ manual_security=True,
+ silent=True
+ )
+ except Exception as e:
+ logger.error(f"DeviceInfo: Failed to fetch search results: {e}")
+ await utils.answer(message, self.strings["error"].format(str(e)))
+
+ async def show_device_info(self, call: InlineMessage, device_id: str, query: str, message_id: int, chat_id: int, photo_idx: int, prev_call_id: str = None):
+ """Handle device selection and show essential info with buttons for details"""
+ message = await self._resolve_entity(call, message_id, chat_id)
+
+ try:
+ device = await self._fetch_json(f"info/{device_id}", {"message": message})
+ if not device:
+ raise ValueError("No device info returned")
+ images_data = await self._fetch_json(f"images/{device_id}", {"message": message})
+ images = images_data.get("images", []) if isinstance(images_data, dict) else []
+
+ info_text = self._format_essential_info(device)
+ full_text = self.strings["device_info"].format(device.get("name", "N/A"), info_text)
+
+ # Truncate for media caption (Telegram limit: 1024 chars)
+ caption = full_text[:1024] + ("..." if len(full_text) > 1024 else "") if images else full_text
+ logger.debug(f"DeviceInfo: Caption length: {len(caption)} characters, photo_idx: {photo_idx}")
+
+ # Buttons for additional sections and photo navigation
+ buttons = [
+ [
+ {"text": self.strings["show_body"], "callback": self.show_section, "args": ["body", device_id, query, message_id, chat_id, photo_idx]},
+ {"text": self.strings["show_memory"], "callback": self.show_section, "args": ["memory", device_id, query, message_id, chat_id, photo_idx]},
+ ],
+ [
+ {"text": self.strings["show_cameras"], "callback": self.show_section, "args": ["cameras", device_id, query, message_id, chat_id, photo_idx]},
+ {"text": self.strings["show_sound"], "callback": self.show_section, "args": ["sound", device_id, query, message_id, chat_id, photo_idx]},
+ ],
+ [
+ {"text": self.strings["show_comms"], "callback": self.show_section, "args": ["comms", device_id, query, message_id, chat_id, photo_idx]},
+ {"text": self.strings["show_sensors"], "callback": self.show_section, "args": ["sensors", device_id, query, message_id, chat_id, photo_idx]},
+ ],
+ [
+ {"text": self.strings["show_misc"], "callback": self.show_section, "args": ["misc", device_id, query, message_id, chat_id, photo_idx]},
+ ],
+ [
+ {"text": self.strings["prev_photo"], "callback": self.show_device_info, "args": [device_id, query, message_id, chat_id, max(0, photo_idx - 1), call.id]} if photo_idx > 0 else None,
+ {"text": self.strings["next_photo"], "callback": self.show_device_info, "args": [device_id, query, message_id, chat_id, min(len(images) - 1, photo_idx + 1), call.id]} if images and photo_idx < len(images) - 1 else None,
+ {"text": self.strings["back"], "callback": self.back_to_search, "args": [query, message_id, chat_id]},
+ ]
+ ]
+ # Filter out None buttons
+ buttons = [[btn for btn in row if btn] for row in buttons if any(row)]
+
+ # Always edit the message (for device selection or photo navigation)
+ logger.debug(f"DeviceInfo: Editing message for device_id: {device_id}, photo_idx: {photo_idx}, call_id: {call.id}")
+ await call.edit(
+ text=caption,
+ reply_markup=buttons,
+ photo=images[photo_idx] if images else None,
+ disable_web_page_preview=True
+ )
+ except Exception as e:
+ logger.error(f"DeviceInfo: Failed to show device info: {e}")
+ await call.edit(
+ text=self.strings["error"].format(str(e)),
+ reply_markup=[],
+ disable_web_page_preview=True
+ )
+
+ async def show_section(self, call: InlineMessage, section: str, device_id: str, query: str, message_id: int, chat_id: int, photo_idx: int):
+ """Show a specific section of device info"""
+ message = await self._resolve_entity(call, message_id, chat_id)
+
+ try:
+ device = await self._fetch_json(f"info/{device_id}", {"message": message})
+ if not device:
+ raise ValueError("No device info returned")
+
+ section_text = self._format_section(section, device)
+ full_text = self.strings["device_info"].format(device.get("name", "N/A"), section_text)
+
+ # Truncate for Telegram message limit (4000 chars)
+ full_text = full_text[:4000] + ("..." if len(full_text) > 4000 else "")
+
+ # Buttons for returning to device info
+ buttons = [
+ [{"text": self.strings["back_to_device"], "callback": self.show_device_info, "args": [device_id, query, message_id, chat_id, photo_idx, call.id]}]
+ ]
+
+ # Try to edit the message
+ try:
+ logger.debug(f"DeviceInfo: Editing message for section: {section}, call_id: {call.id}")
+ await call.edit(
+ text=full_text,
+ reply_markup=buttons,
+ photo=None, # Sections don't include photos to avoid media/text mismatch
+ disable_web_page_preview=True
+ )
+ except Exception as edit_error:
+ logger.warning(f"DeviceInfo: Failed to edit message for section {section}: {edit_error}")
+ # Fallback to new inline form if edit fails
+ await self.inline.form(
+ text=full_text,
+ message=message,
+ reply_markup=buttons,
+ ttl=300,
+ force_me=True,
+ silent=True
+ )
+ except Exception as e:
+ logger.error(f"DeviceInfo: Failed to show section {section}: {e}")
+ try:
+ await call.edit(
+ text=self.strings["error"].format(str(e)),
+ reply_markup=[],
+ disable_web_page_preview=True
+ )
+ except Exception as edit_error:
+ logger.warning(f"DeviceInfo: Failed to edit error message: {edit_error}")
+ await self.inline.form(
+ text=self.strings["error"].format(str(e)),
+ message=message,
+ silent=True
+ )
+
+ async def back_to_search(self, call: InlineMessage, query: str, message_id: int, chat_id: int):
+ """Handle 'Back' button to return to search results"""
+ message = await self._resolve_entity(call, message_id, chat_id)
+
+ try:
+ devices = await self._fetch_json("search", {"q": query, "message": message})
+ logger.debug(f"DeviceInfo: Fetched {len(devices)} devices for query: {query}")
+ if not devices:
+ logger.warning(f"DeviceInfo: No devices found for query: {query}")
+ try:
+ await call.edit(
+ text=self.strings["no_results"].format(query),
+ reply_markup=[],
+ photo=None, # Explicitly remove any existing photo
+ disable_web_page_preview=True
+ )
+ except Exception as edit_error:
+ logger.warning(f"DeviceInfo: Failed to edit no_results message: {edit_error}")
+ await self.inline.form(
+ text=self.strings["no_results"].format(query),
+ message=message,
+ silent=True
+ )
+ return
+
+ devices = devices[:self.config["max_results"]]
+ button_rows = [[{"text": device["name"], "callback": self.show_device_info, "args": [device["id"], query, message_id, chat_id, 0, None]}] for device in devices]
+ list_text = self.strings["device_list"].format(len(devices), query)
+
+ # Try to edit the message
+ try:
+ logger.debug(f"DeviceInfo: Editing message for back_to_search, query: {query}, call_id: {call.id}")
+ await call.edit(
+ text=list_text,
+ reply_markup=button_rows,
+ photo=None, # Explicitly remove any existing photo
+ disable_web_page_preview=True
+ )
+ except Exception as edit_error:
+ logger.warning(f"DeviceInfo: Failed to edit back_to_search message: {edit_error}")
+ await self.inline.list(
+ message=message_id,
+ strings=[list_text],
+ custom_buttons=button_rows,
+ ttl=60,
+ force_me=True,
+ manual_security=True,
+ silent=True
+ )
+ except Exception as e:
+ logger.error(f"DeviceInfo: Failed to return to search: {e}")
+ try:
+ await call.edit(
+ text=self.strings["error"].format(str(e)),
+ reply_markup=[],
+ photo=None, # Explicitly remove any existing photo
+ disable_web_page_preview=True
+ )
+ except Exception as edit_error:
+ logger.warning(f"DeviceInfo: Failed to edit error message: {edit_error}")
+ await self.inline.form(
+ text=self.strings["error"].format(str(e)),
+ message=message,
+ silent=True
+ )
\ No newline at end of file
diff --git a/fiksofficial/python-modules/full.txt b/fiksofficial/python-modules/full.txt
index d49f815..73e0f01 100644
--- a/fiksofficial/python-modules/full.txt
+++ b/fiksofficial/python-modules/full.txt
@@ -17,4 +17,7 @@ qrgen
wiki
checkhost
createavatarspack
-multiunloadmodule
\ No newline at end of file
+multiunloadmodule
+tagall2.0
+point
+deviceinfo
\ No newline at end of file
diff --git a/fiksofficial/python-modules/lyrics.py b/fiksofficial/python-modules/lyrics.py
index 3fb5e6e..d62a880 100644
--- a/fiksofficial/python-modules/lyrics.py
+++ b/fiksofficial/python-modules/lyrics.py
@@ -2,6 +2,8 @@
# https://github.com/all-licenses/GNU-General-Public-License-v3.0
# meta developer: @PyModule
+# requires: lyricsgenius===3.7.0
+
from lyricsgenius import Genius
from .. import loader, utils
diff --git a/fiksofficial/python-modules/point.py b/fiksofficial/python-modules/point.py
new file mode 100644
index 0000000..f6112c7
--- /dev/null
+++ b/fiksofficial/python-modules/point.py
@@ -0,0 +1,160 @@
+# ______ ___ ___ _ _
+# ____ | ___ \ | \/ | | | | |
+# / __ \| |_/ / _| . . | ___ __| |_ _| | ___
+# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \
+# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/
+# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___|
+# \____/ __/ |
+# |___/
+
+# На модуль распространяется лицензия "GNU General Public License v3.0"
+# https://github.com/all-licenses/GNU-General-Public-License-v3.0
+
+# meta developer: @pymodule
+
+from .. import loader, utils
+
+
+@loader.tds
+class PointSentenceCaseMod(loader.Module):
+ """Automatically capitalizes the first letter of each sentence and adds a period at the end of the message (if there isn't one)."""
+
+ strings = {
+ "name": "PointSentenceCase",
+ "enabled": "The module is activated ✅",
+ "disabled": "The module is deactivated ❌",
+ "status": "Current status: {status}\nIgnore channels: {ignore_channels}\n\nUsage:\n.pointcase on|off\n.pointcaseignore on|off",
+ "status_on": "✅ Enabled",
+ "status_off": "❌ Off",
+ "ignore_on": "✅ Ignoring channels",
+ "ignore_off": "❌ Not ignoring channels",
+ }
+
+ strings_ru = {
+ "_cls_doc": "Автоматически делает первую букву каждого предложения заглавной и добавляет точку в конце сообщения (если её нет).",
+ "enabled": "Модуль активирован ✅",
+ "disabled": "Модуль деактивирован ❌",
+ "status": "Текущий статус: {status}\nИгнорировать каналы: {ignore_channels}\n\nИспользование:\n.pointcase on|off\n.pointcaseignore on|off",
+ "status_on": "✅ Включен",
+ "status_off": "❌ Выключен",
+ "ignore_on": "✅ Каналы игнорируются",
+ "ignore_off": "❌ Каналы не игнорируются",
+ }
+
+ async def client_ready(self, client, db):
+ self.client = client
+ self.db = db
+ if self.db.get("PointSentenceCase", "enabled") is None:
+ self.db.set("PointSentenceCase", "enabled", True)
+ if self.db.get("PointSentenceCase", "ignore_channels") is None:
+ self.db.set("PointSentenceCase", "ignore_channels", True)
+
+ @loader.command(ru_doc="{on/off} — включает/выключает модуль")
+ async def pointcase(self, message):
+ """{on/off} - enables/disables the module"""
+ args = utils.get_args_raw(message).lower()
+
+ if args == "on":
+ self.db.set("PointSentenceCase", "enabled", True)
+ await utils.answer(message, self.strings("enabled"))
+ elif args == "off":
+ self.db.set("PointSentenceCase", "enabled", False)
+ await utils.answer(message, self.strings("disabled"))
+ else:
+ status = self.db.get("PointSentenceCase", "enabled", True)
+ ignore = self.db.get("PointSentenceCase", "ignore_channels", True)
+ await utils.answer(
+ message,
+ self.strings("status").format(
+ status=self.strings("status_on") if status else self.strings("status_off"),
+ ignore_channels=self.strings("ignore_on") if ignore else self.strings("ignore_off"),
+ ),
+ )
+
+ @loader.command(ru_doc="{on/off} — включает/выключает игнорирование каналов")
+ async def pointcaseignore(self, message):
+ """{on/off} - enables/disables ignoring channels"""
+ args = utils.get_args_raw(message).lower()
+
+ if args == "on":
+ self.db.set("PointSentenceCase", "ignore_channels", True)
+ await utils.answer(message, self.strings("ignore_on"))
+ elif args == "off":
+ self.db.set("PointSentenceCase", "ignore_channels", False)
+ await utils.answer(message, self.strings("ignore_off"))
+ else:
+ ignore = self.db.get("PointSentenceCase", "ignore_channels", True)
+ await utils.answer(
+ message,
+ self.strings("ignore_on") if ignore else self.strings("ignore_off"),
+ )
+
+ async def watcher(self, message):
+ if not self.db.get("PointSentenceCase", "enabled", True):
+ return
+
+ if not message.out or not message.text:
+ return
+
+ if self.db.get("PointSentenceCase", "ignore_channels", True):
+ try:
+ peer = await message.get_chat()
+ if getattr(peer, "is_channel", False) and not getattr(peer, "is_group", False):
+ return
+ except Exception:
+ pass
+
+ text = message.text.strip()
+ if not text:
+ return
+
+ prefixes = self.get_prefix()
+ if isinstance(prefixes, str):
+ prefixes = [prefixes]
+
+ if any(text.startswith(prefix) for prefix in prefixes):
+ return
+
+ sentence_end_marks = {".", "!", "?", "…"}
+ result = ""
+ capitalize_next = True
+
+ for char in text:
+ if capitalize_next and char.isalpha():
+ result += char.upper()
+ capitalize_next = False
+ else:
+ result += char.lower()
+ if char in sentence_end_marks:
+ capitalize_next = True
+ elif char in {",", ":", "-", "#", "/", '"'}:
+ capitalize_next = False
+
+ last_char = result[-1] if result else ""
+ is_special = not last_char.isalnum() and not self.is_emoji(last_char)
+
+ if (
+ result
+ and last_char not in sentence_end_marks
+ and not self.is_emoji(last_char)
+ and not is_special
+ ):
+ result += "."
+
+ if result != text:
+ await message.edit(result)
+
+ def is_emoji(self, char: str) -> bool:
+ return any([
+ "\U0001F600" <= char <= "\U0001F64F",
+ "\U0001F300" <= char <= "\U0001F5FF",
+ "\U0001F680" <= char <= "\U0001F6FF",
+ "\U0001F700" <= char <= "\U0001F77F",
+ "\U0001F780" <= char <= "\U0001F7FF",
+ "\U0001F800" <= char <= "\U0001F8FF",
+ "\U0001F900" <= char <= "\U0001F9FF",
+ "\U0001FA00" <= char <= "\U0001FA6F",
+ "\U0001FA70" <= char <= "\U0001FAFF",
+ "\U00002702" <= char <= "\U000027B0",
+ "\U000024C2" <= char <= "\U0001F251",
+ ])
\ No newline at end of file
diff --git a/fiksofficial/python-modules/tagall2.0.py b/fiksofficial/python-modules/tagall2.0.py
new file mode 100644
index 0000000..3655629
--- /dev/null
+++ b/fiksofficial/python-modules/tagall2.0.py
@@ -0,0 +1,110 @@
+# ______ ___ ___ _ _
+# ____ | ___ \ | \/ | | | | |
+# / __ \| |_/ / _| . . | ___ __| |_ _| | ___
+# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \
+# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/
+# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___|
+# \____/ __/ |
+# |___/
+
+# На модуль распространяется лицензия "GNU General Public License v3.0"
+# https://github.com/all-licenses/GNU-General-Public-License-v3.0
+
+# meta developer: @pymodule
+
+from .. import loader, utils
+from telethon.tl.types import ChannelParticipantsAdmins, UserStatusRecently, UserStatusOnline, Message
+import typing
+import asyncio
+
+@loader.tds
+class TagAllMod(loader.Module):
+ """TagAll 2.0 — smart mention of chat participants: .tagall {all/admins/online/active} {text}"""
+
+ strings = {
+ "name": "TagAll 2.0",
+ "done": "✅ {} users mentioned",
+ "no_users": "⚠️ No users found matching this filter",
+ "invalid_args": "❌ Invalid command format. Use: .tagall {all/admins/online/active} {text}",
+ }
+
+ strings_ru = {
+ "_cls_doc": "TagAll 2.0 — умное упоминание участников чата: .tagall {all/admins/online/active} {текст}",
+ "done": "✅ Упомянуто {} пользователей",
+ "no_users": "⚠️ Не найдено пользователей по данному фильтру",
+ "invalid_args": "❌ Неверный формат команды. Используйте: .tagall {all/admins/online/active} {текст}",
+ }
+
+ async def client_ready(self, client, db):
+ self.client = client
+
+ @loader.command(ru_doc="Упомянуть участников: .tagall {all/admins/online/active} {текст}")
+ async def tagallcmd(self, message: Message):
+ """Mention members: .tagall {all/admins/online/active} {text}"""
+ args = utils.get_args_raw(message).split(maxsplit=1)
+ mode = args[0].lower() if args else None
+ text = args[1] if len(args) > 1 else "Срочное собрание!"
+
+ valid_modes = {"all", "admins", "online", "active"}
+
+ if mode not in valid_modes:
+ await utils.answer(message, self.strings["invalid_args"])
+ return
+
+ chat = await self.client.get_entity(message.chat_id)
+ tagged = await self._do_tagall(chat, mode, text)
+ if not tagged:
+ await utils.answer(message, self.strings["no_users"])
+ return
+ await utils.answer(message, self.strings["done"].format(tagged))
+
+ async def _do_tagall(self, chat, filter_: str, text: str = "") -> typing.Optional[int]:
+ users = []
+
+ try:
+ if filter_ == "all":
+ async for user in self.client.iter_participants(chat):
+ if not user.bot:
+ users.append(user)
+
+ elif filter_ == "admins":
+ async for user in self.client.iter_participants(chat, filter=ChannelParticipantsAdmins):
+ if not user.bot:
+ users.append(user)
+
+ elif filter_ == "online":
+ async for user in self.client.iter_participants(chat):
+ if not user.bot and isinstance(user.status, (UserStatusRecently, UserStatusOnline)):
+ users.append(user)
+
+ elif filter_ == "active":
+ user_ids = set()
+ async for msg in self.client.iter_messages(chat, limit=50):
+ if msg.sender_id and msg.sender_id not in user_ids:
+ try:
+ user = await self.client.get_entity(msg.sender_id)
+ if not user.bot:
+ users.append(user)
+ user_ids.add(msg.sender_id)
+ except Exception:
+ continue
+
+ if not users:
+ return None
+
+ batch_size = 5
+ tagged = 0
+ for i in range(0, len(users), batch_size):
+ batch = users[i:i + batch_size]
+ mentions = " ".join([f"{u.first_name or 'User'}" for u in batch])
+ msg_text = f"{text}\n{mentions}" if text else mentions
+ await self.client.send_message(chat, msg_text, link_preview=False, parse_mode="html")
+ tagged += len(batch)
+ if i + batch_size < len(users):
+ await asyncio.sleep(2)
+
+ return tagged
+
+ except Exception as e:
+ self._log.error(f"Error in tagall: {e}")
+ return None
\ No newline at end of file
diff --git a/mead0wsss/mead0wsMods/README.md b/mead0wsss/mead0wsMods/README.md
new file mode 100644
index 0000000..8f03577
--- /dev/null
+++ b/mead0wsss/mead0wsMods/README.md
@@ -0,0 +1 @@
+эрон дон дон
diff --git a/mead0wsss/mead0wsMods/SenderGifts.py b/mead0wsss/mead0wsMods/SenderGifts.py
index e56dd86..6b79d4c 100644
--- a/mead0wsss/mead0wsMods/SenderGifts.py
+++ b/mead0wsss/mead0wsMods/SenderGifts.py
@@ -1,5 +1,5 @@
# -- version --
-__version__ = (1, 0, 0)
+__version__ = (1, 2, 0)
# -- version --
@@ -17,95 +17,209 @@ __version__ = (1, 0, 0)
# scope: heroku_only
from .. import loader, utils
-from herokutl.tl.functions.payments import GetPaymentFormRequest, SendStarsFormRequest
+from herokutl.tl.functions.payments import GetPaymentFormRequest, SendStarsFormRequest, GetStarsStatusRequest
from herokutl.tl.types import InputInvoiceStarGift, TextWithEntities
from herokutl.errors.rpcerrorlist import BadRequestError
import logging
@loader.tds
class SenderGifts(loader.Module):
- """Модуль для отправки подарков"""
-
+ """Модуль для отправки подарков Telegram прямиком в чате"""
strings = {
"name": "SenderGifts",
- "usage": "❌ Используйте в формате: .sendgift @username текст",
+ "usage": "❌ Используйте в формате: .sendgift @username текст или реплай + .sendgift текст",
"checking_user": "🔍 Проверка пользователя...",
+ "checking_balance": "🔍 Проверка баланса...",
"user_not_found": "❌ Пользователь не найден",
- "gift_menu": "🎁 Выберите подарок.\n\n👤 Пользователь: {}\n📄 Текст: {}",
+ "gift_menu": "🎁 Выберите категорию подарков.\n\n👤 Пользователь: {}\n📄 Текст: {}\n⭐ Баланс: {} звезд",
+ "category_menu": "🎁 Подарки за {} ⭐\n\n👤 Пользователь: {}\n📄 Текст: {}",
"sending_gift": "🛫 Отправка подарка...",
"gift_sent": "✅ Подарок успешно отправлен!",
"not_enough_stars": "❌ Недостаточно звезд для отправки подарка {}!",
+ "min_stars_error": "❌ Недостаточно звезд для отправки минимального подарка!",
+ "no_available_gifts": "❌ Нет доступных подарков для вашего баланса",
+ "balance_error": "❌ Ошибка при проверке баланса",
}
- gifts = [
- [
- {"id": 5170145012310081615, "stars": 15, "emoji": "❤️", "name": "Сердце"},
- {"id": 5170233102089322756, "stars": 15, "emoji": "🧸", "name": "Мишка"},
- {"id": 5170250947678437525, "stars": 25, "emoji": "🎁", "name": "Подарок"},
+ gift_categories = {
+ 15: [
+ {"id": 5170145012310081615, "emoji": "❤️", "name": "Сердце"},
+ {"id": 5170233102089322756, "emoji": "🧸", "name": "Мишка"},
],
- [
- {"id": 5168103777563050263, "stars": 25, "emoji": "🌹", "name": "Роза"},
- {"id": 5170144170496491616, "stars": 50, "emoji": "🎂", "name": "Тортик"},
- {"id": 5170314324215857265, "stars": 50, "emoji": "💐", "name": "Цветы"},
+ 25: [
+ {"id": 5170250947678437525, "emoji": "🎁", "name": "Подарок"},
+ {"id": 5168103777563050263, "emoji": "🌹", "name": "Роза"},
],
- [
- {"id": 5170564780938756245, "stars": 50, "emoji": "🚀", "name": "Ракета"},
- {"id": 5168043875654172773, "stars": 100, "emoji": "🏆", "name": "Кубок"},
- {"id": 5170690322832818290, "stars": 100, "emoji": "💍", "name": "Кольцо"},
+ 50: [
+ {"id": 5170144170496491616, "emoji": "🎂", "name": "Тортик"},
+ {"id": 5170314324215857265, "emoji": "💐", "name": "Цветы"},
+ {"id": 5170564780938756245, "emoji": "🚀", "name": "Ракета"},
+ ],
+ 100: [
+ {"id": 5168043875654172773, "emoji": "🏆", "name": "Кубок"},
+ {"id": 5170690322832818290, "emoji": "💍", "name": "Кольцо"},
+ {"id": 5170521118301225164, "emoji": "💎", "name": "Алмаз"},
]
- ]
+ }
async def client_ready(self, client, db):
self.client = client
+ async def get_star_balance(self):
+ try:
+ balance_info = (await self.client(GetStarsStatusRequest("me")))
+ return balance_info.balance.amount
+ except Exception as e:
+ logging.error(f"Error getting balance: {e}")
+ return 0
+
@loader.command()
async def sendgift(self, message):
- """Отправить подарок пользователю"""
+ """- - отправить подарок пользователю (* - необязательный параметр.) Поддерживается реплай режим."""
args = utils.get_args_raw(message)
- if not args:
- await utils.answer(message, self.strings["usage"])
- return
-
- parts = args.split(maxsplit=1)
- if len(parts) < 1:
- await utils.answer(message, self.strings["usage"])
- return
-
- username = parts[0]
- text = parts[1] if len(parts) > 1 else ""
- if username.startswith('@'):
- username = username[1:]
- msg = await utils.answer(message, self.strings["checking_user"])
+ reply = await message.get_reply_message()
+ if reply:
+ user = reply.sender
+ text = args if args else ""
+ else:
+ if not args:
+ await utils.answer(message, self.strings["usage"])
+ return
+ parts = args.split(maxsplit=1)
+ if len(parts) < 1:
+ await utils.answer(message, self.strings["usage"])
+ return
+ username = parts[0]
+ text = parts[1] if len(parts) > 1 else ""
+ if username.startswith('@'):
+ username = username[1:]
+ msg = await utils.answer(message, self.strings["checking_user"])
+ try:
+ user = await self.client.get_entity(username)
+ except Exception as e:
+ logging.error(f"User not found: {e}")
+ await utils.answer(msg, self.strings["user_not_found"])
+ return
+
+ balance_msg = await utils.answer(message, self.strings["checking_balance"])
try:
- user = await self.client.get_entity(username)
+ balance = await self.get_star_balance()
except Exception as e:
- logging.error(f"User not found: {e}")
- await utils.answer(msg, self.strings["user_not_found"])
+ logging.error(f"Balance error: {e}")
+ await utils.answer(balance_msg, self.strings["balance_error"])
+ return
+
+ min_price = min(self.gift_categories.keys())
+ if balance < min_price:
+ await utils.answer(balance_msg, self.strings["min_stars_error"])
+ return
+
+ available_categories = [price for price in self.gift_categories.keys() if balance >= price]
+ if not available_categories:
+ await utils.answer(balance_msg, self.strings["no_available_gifts"])
return
buttons = []
- for row in self.gifts:
- btn_row = []
- for gift in row:
- btn_row.append({
- "text": gift["emoji"],
- "callback": self._send_gift,
- "args": (user.id, gift["id"], text, gift["emoji"], msg.id),
- })
- buttons.append(btn_row)
+ row = []
+ for price in sorted(available_categories):
+ row.append({
+ "text": f"{price} ⭐",
+ "callback": self._show_category,
+ "args": (user.id, price, text, balance, message.id),
+ })
+ if len(row) == 2:
+ buttons.append(row)
+ row = []
+
+ if row:
+ buttons.append(row)
+
await utils.answer(
- msg,
+ balance_msg,
self.strings["gift_menu"].format(
f"@{user.username}" if user.username else user.first_name,
+ text if text else "-",
+ balance
+ ),
+ reply_markup=buttons
+ )
+
+ async def _show_category(self, call, user_id, price, text, balance, msg_id):
+ gifts = self.gift_categories[price]
+ buttons = []
+ row = []
+ for gift in gifts:
+ row.append({
+ "text": gift["emoji"],
+ "callback": self._send_gift,
+ "args": (user_id, gift["id"], text, gift["emoji"], msg_id, balance),
+ })
+ if len(row) == 3:
+ buttons.append(row)
+ row = []
+
+ if row:
+ buttons.append(row)
+ buttons.append([{
+ "text": "⬅️ Назад",
+ "callback": self._back_to_categories,
+ "args": (user_id, text, balance, msg_id),
+ }])
+
+ try:
+ user = await self.client.get_entity(user_id)
+ user_display = f"@{user.username}" if user.username else user.first_name
+ except:
+ user_display = f"ID: {user_id}"
+
+ await call.edit(
+ self.strings["category_menu"].format(
+ price,
+ user_display,
text if text else "-"
),
reply_markup=buttons
)
- async def _send_gift(self, call, user_id, gift_id, text, gift_emoji, msg_id):
+
+ async def _back_to_categories(self, call, user_id, text, balance, msg_id):
+ try:
+ user = await self.client.get_entity(user_id)
+ except:
+ await call.answer("Ошибка получения пользователя", show_alert=True)
+ return
+
+ available_categories = [price for price in self.gift_categories.keys() if balance >= price]
+
+ buttons = []
+ row = []
+ for price in sorted(available_categories):
+ row.append({
+ "text": f"{price} ⭐",
+ "callback": self._show_category,
+ "args": (user_id, price, text, balance, msg_id),
+ })
+ if len(row) == 2:
+ buttons.append(row)
+ row = []
+
+ if row:
+ buttons.append(row)
+
+ await call.edit(
+ self.strings["gift_menu"].format(
+ f"@{user.username}" if user.username else user.first_name,
+ text if text else "-",
+ balance
+ ),
+ reply_markup=buttons
+ )
+
+ async def _send_gift(self, call, user_id, gift_id, text, gift_emoji, msg_id, balance):
try:
await call.edit(
self.strings["sending_gift"],
reply_markup=None
)
+
user = await self.client.get_input_entity(user_id)
inv = InputInvoiceStarGift(
user,
@@ -116,6 +230,7 @@ class SenderGifts(loader.Module):
result = await self.client(SendStarsFormRequest(form.form_id, inv))
await call.edit(self.strings["gift_sent"])
+
except BadRequestError as e:
if "BALANCE_TOO_LOW" in str(e):
await call.edit(
@@ -125,13 +240,12 @@ class SenderGifts(loader.Module):
else:
logging.error(f"Error sending gift: {e}")
await call.edit(
- f"❌ Ошибка при отправке подарка: {str(e)}",
+ f"❌ Ошибка при отправке подарка: {str(e)}",
reply_markup=None
)
except Exception as e:
logging.error(f"Error sending gift: {e}")
await call.edit(
- f"❌ Ошибка при отправке подарка: {str(e)}",
+ f"❌ Ошибка при отправке подарка: {str(e)}",
reply_markup=None
)
-# эрон Дон Дон
diff --git a/unneyon/hikka-mods/langpacks/yamusic.yml b/unneyon/hikka-mods/langpacks/yamusic.yml
new file mode 100644
index 0000000..1b7efe0
--- /dev/null
+++ b/unneyon/hikka-mods/langpacks/yamusic.yml
@@ -0,0 +1,63 @@
+en:
+ guide: "📜 Guide for obtaining access token for Yandex.Music"
+ iguide: "📜 Guide for obtaining access token for Yandex.Music"
+ no_token: "❌ You didn't specify the access token in the config!"
+ autobio:
+ d: "🎧 Autobio is off now"
+ e: "🎧 Autobio is on now"
+ there_is_no_playing: "❌ You don't listening to anything right now"
+ queue_types:
+ VARIOUS: "Your queue"
+ RADIO: "«My Wave»"
+ PLAYLIST: "Playlist «{}»"
+ ALBUM: "«{}»"
+ ARTIST: "Popular tracks by {}"
+ downloading: "\n\n🕔 Downloading audio…"
+ uploading_banner: "\n\n🕔 Uploading banner…"
+ likes:
+ liked: "❤️ Track {track} was liked"
+ unliked: "❤️ Track {track} was unliked"
+ disliked: "💔 Track {track} was disliked"
+ lyrics: "📜 Lyrics of the {track} track:\n{text}
\n\n©️ Writers: {writers}"
+ no_lyrics: "❌ Track {track} has no lyrics!"
+ args: "❌ Specify search query"
+ searching: "🔍 Searching…"
+ 404: "❌ No results found"
+ search: "🎧 {performer} — {title}\n🎵 Yandex.Music | song.link"
+ _cfg:
+ token: "Your access token for Yandex.Music"
+ now_playing_text: "The text that is used in commands to get now playing track. May contain {performer}, {title}, {device}, {volume}, {playing_from}, {link}, {track_id}, {album_id} keywords"
+ autobio: "Automatic bio template (may contain {artist} and {title} keywords)"
+ no_playing_bio: "Bio that is set when nothing is playing"
+
+ru:
+ guide: "📜 Гайд по получению токена Яндекс.Музыки"
+ iguide: "📜 Гайд по получению токена Яндекс.Музыки"
+ no_token: "❌ Вы не указали токен Яндекс.Музыки в конфиге!"
+ autobio:
+ d: "🎧 Автобио выключено"
+ e: "🎧 Автобио включено"
+ there_is_no_playing: "❌ Вы ничего не слушаете сейчас"
+ queue_types:
+ VARIOUS: "Ваша очередь"
+ RADIO: "«Моя Волна»"
+ PLAYLIST: "Плейлист «{}»"
+ ALBUM: "«{}»"
+ ARTIST: "Популярные треки {}"
+ downloading: "\n\n🕔 Загрузка трека…"
+ uploading_banner: "\n\n🕔 Загрузка баннера…"
+ likes:
+ liked: "❤️ Трек {track} лайкнут"
+ unliked: "❤️ С трека {track} снят лайк"
+ disliked: "💔 Трек {track} дизлайкнут"
+ lyrics: "📜 Текст трека {track}:\n{text}
\n\n©️ Авторы: {writers}"
+ no_lyrics: "❌ У трека {track} нет текста!"
+ args: "❌ Укажите поисковый запрос"
+ searching: "🔍 Ищем…"
+ 404: "❌ Ничего не найдено"
+ search: "🎧 {performer} — {title}\n🎵 Яндекс.Музыка | song.link"
+ _cfg:
+ token: "Ваш токен от Яндекс.Музыки"
+ now_playing_text: "Текст, использующийся в командах для получения прослушиваемого трека. Может содержать ключевые слова {performer}, {title}, {device}, {volume}, {playing_from}, {link}, {track_id}, {album_id}"
+ autobio: "Шаблон автоматического био (может содержать ключевые слова {artist} и {title})"
+ no_playing_bio: "Био, которое ставится, когда ничего не играет"
\ No newline at end of file
diff --git a/unneyon/hikka-mods/yamusic.py b/unneyon/hikka-mods/yamusic.py
index e012df2..18ef573 100644
--- a/unneyon/hikka-mods/yamusic.py
+++ b/unneyon/hikka-mods/yamusic.py
@@ -1,4 +1,4 @@
-__version__ = (1, 0, 3)
+__version__ = (1, 1, 0)
# █▄▀ ▄▀█ █▀▄▀█ █▀▀ █▄▀ █ █ █▀█ █▀█
# █ █ █▀█ █ ▀ █ ██▄ █ █ ▀▄▄▀ █▀▄ █▄█ ▄
# © Copyright 2025
@@ -15,6 +15,7 @@ __version__ = (1, 0, 3)
# meta banner: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/banners/yamusic.png
# meta pic: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/icons/yamusic.png
# meta developer: @kamekuro_hmods
+# packurl: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/langpacks/yamusic.yml
# scope: hikka_only
# scope: hikka_min 1.6.3
# requires: aiohttp asyncio requests pillow==11.2.1 git+https://github.com/MarshalX/yandex-music-api
@@ -25,6 +26,7 @@ import io
import json
import logging
import random
+import re
import requests
import string
import yandex_music
@@ -35,7 +37,6 @@ from PIL import (
Image, ImageDraw, ImageEnhance,
ImageFilter, ImageFont
)
-import yandex_music.exceptions
from .. import loader, utils
@@ -43,564 +44,40 @@ from .. import loader, utils
logger = logging.getLogger(__name__)
-@loader.tds
-class YaMusicMod(loader.Module):
- """The module for Yandex.Music streaming service"""
-
- strings = {
- "name": "YaMusic",
- "queue_types": {
- "VARIOUS": "Your queue",
- "RADIO": "«My Wave»",
- "PLAYLIST": "Playlist «{}»",
- "ALBUM": "Album «{}»"
- },
- "guide": (
- "📜 Guide for obtaining a Yandex.Music token"
- ),
- "no_token": (
- "❌ You didn't specify "
- "the access token in the config!"
- ),
- "autobio_e": "🎧 Autobio is on now",
- "autobio_d": "🎧 Autobio is off now",
- "there_is_no_playing": (
- "❌ You don't "
- "listening to anything right now."
- ),
- "now": (
- "🎧 {performer} — {title}\n\n"
- "⌨️ Now is listening on {device} "
- "(🔊 {volume}%)\n"
- "🗂 Playing from: {playing_from}\n\n"
- "🎵 Yandex.Music | "
- "song.link"
- ),
- "downloading": "\n\n🕔 Downloading audio…",
- "downloading_banner": "\n\n🕔 Downloading banner…",
- "likes": {
- "liked": (
- "❤️ Track "
- "{track} "
- "was successfully liked"
- ),
- "unliked": (
- "❤️ Track "
- "{track} "
- "was successfully unliked"
- ),
- "disliked": (
- "💔 Track "
- "{track} "
- "was successfully disliked"
- )
- },
- "lyrics": (
- "📜 Lyrics of the {track} track:\n"
- "{text}
\n\n"
- "©️ Writers: {writers}"
- ),
- "no_lyrics": (
- "❌ Track "
- "{track} "
- "has no lyrics!"
- ),
- "search": (
- "🎧 {performer} — {title}\n"
- "🎵 Yandex.Music | "
- "song.link"
- ),
- "args": "❌ Specify search query",
- "404": "❌ No results found",
- "searching": "🔍 Searching…",
- "_cfg_token": "Your access token of Yandex.Music",
- "_cfg_autobio": "Automatic bio template (may contain {artist} and {title})",
- "_cfg_no_playing_bio": "Bio that is set when nothing is playing"
- }
-
- strings_ru = {
- "_cls_doc": "Модуль для стримингового сервиса Яндекс.Музыка",
- "queue_types": {
- "VARIOUS": "Ваша очередь",
- "RADIO": "«Моя Волна»",
- "PLAYLIST": "Плейлист «{}»",
- "ALBUM": "Альбом «{}»"
- },
- "guide": (
- "📜 Гайд по получению токена Яндекс.Музыки"
- ),
- "no_token": (
- "❌ Ты не "
- "указал токен Яндекс.Музыки в конфиге!"
- ),
- "autobio_e": "🎧 Автобио включено",
- "autobio_d": "🎧 Автобио выключено",
- "there_is_no_playing": (
- "❌ Ты ничего "
- "не слушаешь сейчас."
- ),
- "now": (
- "🎧 {performer} — {title}\n\n"
- "⌨️ Сейчас слушаю на {device} "
- " (🔊 {volume}%)\n"
- "🗂 Откуда играет: {playing_from}\n\n"
- "🎵 Яндекс.Музыка | "
- "song.link"
- ),
- "downloading": "\n\n🕔 Загрузка трека…",
- "downloading_banner": "\n\n🕔 Загрузка баннера…",
- "likes": {
- "liked": (
- "❤️ Лайкнул трек "
- "{track}"
- ),
- "unliked": (
- "❤️ Убрал лайк с трека "
- "{track}"
- ),
- "disliked": (
- "💔 Дизлайкнул трек "
- "{track}"
- )
- },
- "lyrics": (
- "📜 Текст трека "
- "{track}:\n"
- "{text}
\n\n"
- "©️ Авторы: {writers}"
- ),
- "no_lyrics": (
- "❌ У трека "
- "{track} "
- "нет текста!"
- ),
- "search": (
- "🎧 {performer} — {title}\n"
- "🎵 Яндекс.Музыка | "
- "song.link"
- ),
- "args": "❌ Укажите поисковый запрос",
- "404": "❌ Ничего не найдено",
- "searching": "🔍 Ищем…",
- "_cfg_token": "Твой токен от Яндекс.Музыки",
- "_cfg_autobio": "Шаблон автоматического био (может содержать {artist} и {title})",
- "_cfg_no_playing_bio": "Био, которое ставится, когда ничего не играет"
- }
-
-
- def __init__(self):
- self.config = loader.ModuleConfig(
- loader.ConfigValue(
- "token",
- None,
- lambda: self.strings["_cfg_token"],
- validator=loader.validators.Hidden()
- ),
- loader.ConfigValue(
- "autobio",
- "🎧 {artist} - {title}",
- lambda: self.strings["_cfg_autobio"],
- validator=loader.validators.String()
- ),
- loader.ConfigValue(
- "no_playing_bio",
- "Hello!",
- lambda: self.strings["_cfg_no_playing_bio"],
- validator=loader.validators.String()
- )
- )
-
- async def on_dlmod(self):
- if not self.get("guide_send", False):
- await self.inline.bot.send_message(
- self._tg_id,
- self.strings("guide").replace("📜", "📜"),
- )
- self.set("guide_send", True)
-
- async def client_ready(self, client, db):
- self._client = client
- self._db = db
-
- me = await self._client.get_me()
- self._premium = me.premium if hasattr(me, "premium") else False
- self.premium_check.start()
-
- if self.get("autobio", False):
- self.autobio.start()
-
-
- @loader.loop(1800)
- async def premium_check(self):
- me = await self._client.get_me()
- self._premium = me.premium if hasattr(me, "premium") else False
-
-
- @loader.loop(30)
- async def autobio(self):
- if not self.config['token']:
- self.autobio.stop(); self.set("autobio", False)
- return
- client = yandex_music.Client(self.config['token']).init()
- now = await self.__get_now_playing(self.config['token'], client)
- out = self.config['no_playing_bio'][:(140 if self._premium else 70)]
- if now and (not now['paused']):
- out = self.config['autobio'].format(
- title=now['track']['title'],
- artist=", ".join(now['track']['artist'])
- )[:(140 if self._premium else 70)]
- try:
- await self._client(
- telethon.functions.account.UpdateProfileRequest(about=out)
- )
- except telethon.errors.rpcerrorlist.FloodWaitError as e:
- logger.info(f"Sleeping {max(e.seconds, 60)} because of floodwait")
- await asyncio.sleep(max(e.seconds, 60))
-
-
- @loader.command(
- ru_doc="👉 Гайд по получению токена Яндекс.Музыки",
- alias="yg"
- )
- async def yguidecmd(self, message: telethon.types.Message):
- """👉 Guide for obtaining a Yandex.Music token"""
- await utils.answer(message, self.strings("guide"))
-
-
- @loader.command(
- ru_doc="👉 Включить/выключить автобио",
- alias="yb"
- )
- async def ybiocmd(self, message: telethon.types.Message):
- """👉 Enable/disable autobio"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
-
- bio_now = self.get("autobio", False)
- self.set("autobio", not bio_now)
- if (not bio_now):
- self.autobio.start()
- else:
- self.autobio.stop()
- try:
- await self._client(
- telethon.functions.account.UpdateProfileRequest(
- about=self.config['no_playing_bio'][:(140 if self._premium else 70)]
- )
- )
- except: pass
-
- await utils.answer(
- message,
- self.strings(f"autobio_{'e' if (not bio_now) else 'd'}")
- )
-
-
- @loader.command(
- ru_doc="👉 Получить трек, который играет сейчас",
- alias="yn"
- )
- async def ynowcmd(self, message: telethon.types.Message):
- """👉 Get now playing track"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- client = yandex_music.Client(self.config['token']).init()
- now = await self.__get_now_playing(self.config['token'], client)
- if not now:
- return await utils.answer(message, self.strings("there_is_no_playing"))
-
- playlist_name = ""
- if now['entity_type'] in ["PLAYLIST", "ALBUM"]:
- func = getattr(
- client,
- "playlists_list" if now['entity_type'] == "PLAYLIST" else "albums"
- )
- if func:
- entity = func(now['entity_id'])[0]
- playlist_name = f"{entity.title}"
- else:
- now['entity_type'] = "RADIO"
-
- device_eid, device, volume = "6039404727542747508", "Unknown Device", "❓"
- if now['device']:
- device=now['device']['info']['title']
- volume=round(now['device']['volume']*100, 2)
- if now['device']['info']['type'] == "ANDROID": device_eid = "5373266788970670174"
- if now['device']['info']['type'] == "IOS": device_eid = "5372908412604525258"
-
- out = self.strings("now").format(
- title=now['track']['title'],
- performer=", ".join(now['track']['artist']),
- device=device, volume=volume, device_eid=device_eid,
- playing_from=self.strings("queue_types").get(now['entity_type'], "VARIOUS").format(playlist_name),
- track_id=now['track']['track_id'],
- album_id=now['track']['album_id']
- )
-
- await utils.answer(
- message, out+self.strings("downloading")
- )
-
- audio = io.BytesIO((await utils.run_sync(requests.get, now['track']['download_link'])).content)
- audio.name = "audio.mp3"
-
- await utils.answer(
- message=message, response=out,
- file=audio,
- attributes=([
- telethon.types.DocumentAttributeAudio(
- duration=now['track']['duration'],
- title=now['track']['title'],
- performer=", ".join(now['track']['artist'])
- )
- ])
- )
-
-
- @loader.command(
- ru_doc="👉 Получить баннер трека, который играет сейчас",
- alias="ynb"
- )
- async def ynowbcmd(self, message: telethon.types.Message):
- """👉 Get now playing track's banner"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- client = yandex_music.Client(self.config['token']).init()
- now = await self.__get_now_playing(self.config['token'], client)
- if not now:
- return await utils.answer(message, self.strings("there_is_no_playing"))
-
- playlist_name = ""
- if now['entity_type'] in ["PLAYLIST", "ALBUM"]:
- func = getattr(
- client,
- "playlists_list" if now['entity_type'] == "PLAYLIST" else "albums"
- )
- if func:
- entity = func(now['entity_id'])[0]
- playlist_name = f"{entity.title}"
- else:
- now['entity_type'] = "RADIO"
-
- device_eid, device, volume = "6039404727542747508", "Unknown Device", "❓"
- if now['device']:
- device=now['device']['info']['title']
- volume=round(now['device']['volume']*100, 2)
- if now['device']['info']['type'] == "ANDROID": device_eid = "5373266788970670174"
- if now['device']['info']['type'] == "IOS": device_eid = "5372908412604525258"
-
- out = self.strings("now").format(
- title=now['track']['title'],
- performer=", ".join(now['track']['artist']),
- device=device, volume=volume, device_eid=device_eid,
- playing_from=self.strings("queue_types").get(now['entity_type'], "VARIOUS").format(playlist_name),
- track_id=now['track']['track_id'],
- album_id=now['track']['album_id']
- )
-
- await utils.answer(
- message, out+self.strings("downloading_banner")
- )
-
- file = self.__create_banner(
- now['track']['title'], now['track']['artist'],
- now['duration_ms'], now['progress_ms'],
- requests.get(now['track']['img']).content
- )
- await utils.answer(
- message=message, response=out, file=file
- )
-
-
- @loader.command(
- ru_doc="👉 Лайкнуть играющий сейчас трек"
- )
- async def ylikecmd(self, message: telethon.types.Message):
- """👉 Like now playing track's banner"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- client = yandex_music.Client(self.config['token']).init()
- now = await self.__get_now_playing(self.config['token'], client)
- if not now:
- return await utils.answer(message, self.strings("there_is_no_playing"))
-
- client.users_likes_tracks_add(now['track']['track_id'])
- await utils.answer(
- message, self.strings("likes")['liked'].format(
- track_id=now['track']['track_id'], album_id=now['track']['album_id'],
- track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}"
- )
- )
-
- @loader.command(
- ru_doc="👉 Убрать лайк с играющего сейчас трека"
- )
- async def yunlikecmd(self, message: telethon.types.Message):
- """👉 Unlike now playing track"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- client = yandex_music.Client(self.config['token']).init()
- now = await self.__get_now_playing(self.config['token'], client)
- if not now:
- return await utils.answer(message, self.strings("there_is_no_playing"))
-
- client.users_likes_tracks_remove(now['track']['track_id'])
- await utils.answer(
- message, self.strings("likes")['unliked'].format(
- track_id=now['track']['track_id'], album_id=now['track']['album_id'],
- track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}"
- )
- )
-
- @loader.command(
- ru_doc="👉 Дизлайкнуть играющий сейчас трек",
- alias="ydis"
- )
- async def ydislikecmd(self, message: telethon.types.Message):
- """👉 Dislike now playing track"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- client = yandex_music.Client(self.config['token']).init()
- now = await self.__get_now_playing(self.config['token'], client)
- if not now:
- return await utils.answer(message, self.strings("there_is_no_playing"))
-
- client.users_dislikes_tracks_add(now['track']['track_id'])
- await utils.answer(
- message, self.strings("likes")['disliked'].format(
- track_id=now['track']['track_id'], album_id=now['track']['album_id'],
- track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}"
- )
- )
-
-
- @loader.command(
- ru_doc="👉 Получить текст играющего сейчас трека"
- )
- async def ylyricscmd(self, message: telethon.types.Message):
- """👉 Get lyrics of the now playing track"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- client = yandex_music.Client(self.config['token']).init()
- now = await self.__get_now_playing(self.config['token'], client)
- if not now:
- return await utils.answer(message, self.strings("there_is_no_playing"))
-
- try:
- lyrics = client.tracks_lyrics(now['track']['track_id'])
- await utils.answer(
- message, self.strings("lyrics").format(
- track_id=now['track']['track_id'], album_id=now['track']['album_id'],
- track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}",
- text=requests.get(lyrics.download_url).text,
- writers=", ".join(lyrics.writers)
- )
- )
- except yandex_music.exceptions.NotFoundError:
- await utils.answer(
- message, self.strings("no_lyrics").format(
- track_id=now['track']['track_id'], album_id=now['track']['album_id'],
- track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}"
- )
- )
-
-
- @loader.command(
- ru_doc="<запрос> 👉 Поиск трека в Яндекс.Музыке",
- alias="yq"
- )
- async def ysearchcmd(self, message: telethon.types.Message):
- """ 👉 Search track in Yandex.Music"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- client = yandex_music.Client(self.config['token']).init()
-
- query = utils.get_args_raw(message)
- if not query:
- await utils.answer(message, self.strings("args"))
- return
-
- message = await utils.answer(message, self.strings("searching"))
-
- search = client.search(query, type_="track")
- if (not search.tracks) or (len(search.tracks.results) == 0):
- return await utils.answer(message, self.strings("404"))
-
- out = self.strings("search").format(
- title=search.tracks.results[0].title + (
- f" ({search.tracks.results[0].version})" if search.tracks.results[0].version else ""
- ),
- performer=", ".join([x.name for x in search.tracks.results[0].artists]),
- album_id=search.tracks.results[0].albums[0].id, track_id=search.tracks.results[0].id
- )
- message = await utils.answer(message, out+self.strings("downloading"))
-
- info = client.tracks_download_info(search.tracks.results[0].id, True)
- link = info[0].direct_link
- audio = None
- audio = io.BytesIO((await utils.run_sync(requests.get, link)).content)
- audio.name = "audio.mp3"
-
- await utils.answer(
- message=message, response=out,
- file=audio,
- attributes=([
- telethon.types.DocumentAttributeAudio(
- duration=int(search.tracks.results[0].duration_ms / 1000),
- title=search.tracks.results[0].title,
- performer=", ".join([x.name for x in search.tracks.results[0].artists])
- )
- ])
- )
-
+class YandexMusic():
+ token: str
+ client: yandex_music.ClientAsync
+ def __init__(self, token: str):
+ self.client = yandex_music.ClientAsync(token)
+ self.token = token
+ async def init(self):
+ self.client = await self.client.init()
+ return self
# Original code: https://raw.githubusercontent.com/MIPOHBOPOHIH/YMMBFA/main/main.py
- async def __create_ynison_ws(self, yamusic_token: str, ws_proto: dict) -> dict:
+ async def _create_ynison_ws(self, ws_proto: dict) -> dict:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(
"wss://ynison.music.yandex.ru/redirector.YnisonRedirectService/GetRedirectToYnison",
headers={
"Sec-WebSocket-Protocol": f"Bearer, v2, {json.dumps(ws_proto)}",
"Origin": "http://music.yandex.ru",
- "Authorization": f"OAuth {yamusic_token}",
+ "Authorization": f"OAuth {self.token}",
},
) as ws:
response = await ws.receive()
return json.loads(response.data)
# Original code: https://raw.githubusercontent.com/MIPOHBOPOHIH/YMMBFA/main/main.py
- async def __get_now_playing(self, yamusic_token: str, client: yandex_music.Client):
+ async def _get_ynison(self):
device_id = ''.join(random.choices(string.ascii_lowercase, k=16))
ws_proto = {
"Ynison-Device-Id": device_id,
"Ynison-Device-Info": json.dumps({"app_name": "Chrome", "type": 1}),
}
- data = await self.__create_ynison_ws(yamusic_token, ws_proto)
-
+ data = await self._create_ynison_ws(ws_proto)
ws_proto["Ynison-Redirect-Ticket"] = data["redirect_ticket"]
-
payload = {
"update_full_state": {
"player_state": {
@@ -639,56 +116,597 @@ class YaMusicMod(loader.Module):
"player_action_timestamp_ms": 0,
"activity_interception_type": "DO_NOT_INTERCEPT_BY_DEFAULT",
}
-
async with aiohttp.ClientSession() as session:
async with session.ws_connect(
f"wss://{data['host']}/ynison_state.YnisonStateService/PutYnisonState",
headers={
"Sec-WebSocket-Protocol": f"Bearer, v2, {json.dumps(ws_proto)}",
"Origin": "http://music.yandex.ru",
- "Authorization": f"OAuth {yamusic_token}",
+ "Authorization": f"OAuth {self.token}",
}
) as ws:
await ws.send_str(json.dumps(payload))
response = await ws.receive()
ynison: dict = json.loads(response.data)
+ return ynison
+
+ async def get_lyrics(self, track_id: int, with_timecodes: bool = False):
+ t = (await self.client.tracks(track_id))[0]
+ if with_timecodes:
+ if t.lyrics_info.has_available_sync_lyrics:
+ lyrics = await self.client.tracks_lyrics(track_id, "LRC")
+ return {
+ "text": requests.get(lyrics.download_url).text,
+ "writers": lyrics.writers
+ }
+ else:
+ if t.lyrics_info.has_available_text_lyrics:
+ lyrics = await self.client.tracks_lyrics(track_id, "TEXT")
+ return {
+ "text": requests.get(lyrics.download_url).text,
+ "writers": lyrics.writers
+ }
+ return None
+ async def get_now_playing(self):
+ ynison = await self._get_ynison()
if len(ynison.get("player_state", {}).get("player_queue", {}).get("playable_list", [])) == 0:
return {}
raw_track = ynison["player_state"]["player_queue"]["playable_list"][
ynison["player_state"]["player_queue"]["current_playable_index"]
]
- track = client.tracks(raw_track["playable_id"])[0]
- device = [
- x for x in ynison['devices'] if x['info']['device_id'] == ynison.get('active_device_id_optional', "")
- ]
-
return {
"paused": ynison["player_state"]["status"]["paused"],
"duration_ms": int(ynison["player_state"]["status"]["duration_ms"]),
"progress_ms": int(ynison["player_state"]["status"]["progress_ms"]),
"entity_id": ynison["player_state"]["player_queue"]["entity_id"],
"entity_type": ynison["player_state"]["player_queue"]["entity_type"],
- "device": device[0] if len(device) > 0 else None,
- "track": {
- "track_id": int(track.track_id.split(":")[0]) if track.track_id.split(":")[0].isdigit() else track.track_id,
- "album_id": track.albums[0].id,
- "title": track.title,
- "artist": track.artists_name(),
- "img": f"https://{track.cover_uri[:-2]}1000x1000",
- "duration": track.duration_ms // 1000,
- "minutes": round(track.duration_ms / 1000) // 60,
- "seconds": round(track.duration_ms / 1000) % 60,
- "download_link": track.get_download_info(get_direct_links=True)[0].direct_link
- }
+ "playable_id": raw_track["playable_id"],
+ "device": [
+ x for x in ynison['devices']
+ if x['info']['device_id'] == ynison.get('active_device_id_optional', "")
+ ],
+ "track": (await self.client.tracks(raw_track["playable_id"]))[0]
} if raw_track['playable_type'] != "LOCAL_TRACK" else {}
+@loader.tds
+class YaMusicMod(loader.Module):
+ """The module for Yandex.Music streaming service"""
+ strings = {"name": "YaMusic"}
+ strings_ru = {"_cls_doc": "Модуль для стримингового сервиса Яндекс.Музыка"}
+
+ def __init__(self):
+ self.config = loader.ModuleConfig(
+ loader.ConfigValue(
+ "token",
+ None,
+ lambda: self.strings["_cfg"]["token"],
+ validator=loader.validators.Hidden()
+ ),
+ loader.ConfigValue(
+ "now_playing_text",
+ "🎧 {performer} — {title}\n\n" \
+ "⌨️ Now is listening on " \
+ "{device} (🔊 " \
+ "{volume}%)\n🗂 Playing from: " \
+ "{playing_from}\n\n🎵 {link} | " \
+ "song.link",
+ lambda: self.strings["_cfg"]["now_playing_text"],
+ validator=loader.validators.String()
+ ),
+ loader.ConfigValue(
+ "autobio",
+ "🎧 {artist} - {title}",
+ lambda: self.strings["_cfg"]["autobio"],
+ validator=loader.validators.String()
+ ),
+ loader.ConfigValue(
+ "no_playing_bio",
+ "Hello!",
+ lambda: self.strings["_cfg"]["no_playing_bio"],
+ validator=loader.validators.String()
+ ),
+ loader.ConfigValue(
+ "banner_version",
+ "new",
+ "Version of track banner (old/new)",
+ validator=loader.validators.Choice(["new", "old"])
+ )
+ )
+
+ async def on_dlmod(self):
+ if not self.get("guide_send", False):
+ await self.inline.bot.send_message(self._tg_id, self.strings("iguide"))
+ self.set("guide_send", True)
+
+ async def client_ready(self, client, db):
+ self._client = client
+ self._db = db
+
+ me = await self._client.get_me()
+ self._premium = me.premium if hasattr(me, "premium") else False
+ self.premium_check.start()
+
+ if self.get("autobio", False):
+ self.autobio.start()
+
+
+ @loader.loop(1800)
+ async def premium_check(self):
+ me = await self._client.get_me()
+ self._premium = me.premium if hasattr(me, "premium") else False
+
+
+ @loader.loop(30)
+ async def autobio(self):
+ if not self.config['token']:
+ self.autobio.stop(); self.set("autobio", False)
+ return
+ ym = await YandexMusic(self.config['token']).init()
+ now = await ym.get_now_playing()
+ if now and (not now['paused']):
+ out = self.config['autobio'].format(
+ title=now['track'].title,
+ artist=", ".join([x.name for x in now['track'].artists])
+ )[:(140 if self._premium else 70)]
+ try:
+ await self._client(
+ telethon.functions.account.UpdateProfileRequest(about=out)
+ )
+ except telethon.errors.rpcerrorlist.FloodWaitError as e:
+ logger.info(f"Sleeping {max(e.seconds, 60)} because of floodwait")
+ await asyncio.sleep(max(e.seconds, 60))
+
+
+ @loader.command(
+ ru_doc="👉 Гайд по получению токена Яндекс.Музыки",
+ alias="yg"
+ )
+ async def yguidecmd(self, message: telethon.types.Message):
+ """👉 Guide for obtaining a Yandex.Music token"""
+ await utils.answer(message, self.strings("guide"))
+
+
+ @loader.command(
+ ru_doc="👉 Включить/выключить автобио",
+ alias="yb"
+ )
+ async def ybiocmd(self, message: telethon.types.Message):
+ """👉 Enable/disable autobio"""
+
+ if (not self.config['token']) and self.get("autobio", False):
+ return await utils.answer(message, self.strings("no_token"))
+
+ bio = not self.get("autobio", False)
+ self.set("autobio", bio)
+ if bio: self.autobio.start()
+ else:
+ self.autobio.stop()
+ try:
+ await self._client(
+ telethon.functions.account.UpdateProfileRequest(
+ about=self.config['no_playing_bio'][:(140 if self._premium else 70)]
+ )
+ )
+ except: pass
+
+ await utils.answer(
+ message,
+ self.strings("autobio")['e' if bio else 'd']
+ )
+
+
+ @loader.command(
+ ru_doc="👉 Получить трек, который играет сейчас (с файлом трека)",
+ alias="ynt"
+ )
+ async def ynowtcmd(self, message: telethon.types.Message):
+ """👉 Get now playing track (with track file)"""
+
+ if not self.config['token']:
+ return await utils.answer(message, self.strings("no_token"))
+ ym = await YandexMusic(self.config['token']).init()
+ now = await ym.get_now_playing()
+ if not now:
+ return await utils.answer(message, self.strings("there_is_no_playing"))
+ if now['entity_type'] not in self.strings("queue_types").keys():
+ now['entity_type'] = "VARIOUS"
+
+ playlist_name = ""
+ if now['entity_type'] == "PLAYLIST":
+ playlist = (await ym.client.playlists_list(now['entity_id']))[0]
+ playlist_name = f"{playlist.title}"
+ if now['entity_type'] == "ALBUM":
+ album = (await ym.client.albums(now['entity_id']))[0]
+ playlist_name = f"{album.title}"
+ if now['entity_type'] == "ARTIST":
+ artist = (await ym.client.artists(now['entity_id']))[0]
+ playlist_name = f"{artist.name}"
+
+ device, volume = "Unknown Device", "❓"
+ if now['device']:
+ device=now['device'][0]['info']['title']
+ volume=round(now['device'][0]['volume']*100, 2)
+
+ out = self.config['now_playing_text'].format(
+ title=now['track'].title,
+ performer=", ".join([x.name for x in now['track'].artists]),
+ device=device, volume=volume,
+ playing_from=self.strings("queue_types").get(now['entity_type']).format(playlist_name),
+ track_id=now['track'].id,
+ album_id=now['track'].albums[0].id,
+ link=f"Яндекс.Музыка"
+ )
+ await utils.answer(
+ message, out+self.strings("downloading")
+ )
+
+ audio = io.BytesIO((await utils.run_sync(requests.get, (await ym.client.tracks_download_info(now['track'].id, get_direct_links=True))[0].direct_link)).content)
+ audio.name = "audio.mp3"
+ await utils.answer(
+ message=message, response=out,
+ file=audio,
+ attributes=([
+ telethon.types.DocumentAttributeAudio(
+ duration=now['track'].duration_ms // 1000,
+ title=now['track'].title,
+ performer=", ".join([x.name for x in now['track'].artists])
+ )
+ ])
+ )
+
+
+ @loader.command(
+ ru_doc="👉 Получить баннер трека, который играет сейчас",
+ alias="yn"
+ )
+ async def ynowcmd(self, message: telethon.types.Message):
+ """👉 Get now playing track's banner"""
+
+ if not self.config['token']:
+ return await utils.answer(message, self.strings("no_token"))
+ ym = await YandexMusic(self.config['token']).init()
+ now = await ym.get_now_playing()
+ if not now:
+ return await utils.answer(message, self.strings("there_is_no_playing"))
+ if now['entity_type'] not in self.strings("queue_types").keys():
+ now['entity_type'] = "VARIOUS"
+
+ playlist_name = ""
+ if now['entity_type'] == "PLAYLIST":
+ playlist = (await ym.client.playlists_list(now['entity_id']))[0]
+ playlist_name = f"{playlist.title}"
+ if now['entity_type'] == "ALBUM":
+ album = (await ym.client.albums(now['entity_id']))[0]
+ playlist_name = f"{album.title}"
+ if now['entity_type'] == "ARTIST":
+ artist = (await ym.client.artists(now['entity_id']))[0]
+ playlist_name = f"{artist.name}"
+
+ device, volume = "Unknown Device", "❓"
+ if now['device']:
+ device=now['device'][0]['info']['title']
+ volume=round(now['device'][0]['volume']*100, 2)
+
+ out = self.config['now_playing_text'].format(
+ title=now['track'].title,
+ performer=", ".join([x.name for x in now['track'].artists]),
+ device=device, volume=volume,
+ playing_from=self.strings("queue_types").get(now['entity_type']).format(playlist_name),
+ track_id=now['track'].id,
+ album_id=now['track'].albums[0].id,
+ link=f"Яндекс.Музыка"
+ )
+ await utils.answer(
+ message, out+self.strings("uploading_banner")
+ )
+
+ lyrics = await ym.get_lyrics(now['track'].id, True)
+ func = self.__create_banner if self.config['banner_version'] == "new" else self.__create_banner_old
+ file = func(
+ now['track'].title, [x.name for x in now['track'].artists],
+ now['duration_ms'], now['progress_ms'],
+ requests.get(f"https://{now['track'].cover_uri[:-2]}1000x1000").content,
+ lyrics['text'] if lyrics else None
+ )
+ await utils.answer(
+ message=message, response=out, file=file
+ )
+
+
+ @loader.command(
+ ru_doc="👉 Лайкнуть играющий сейчас трек"
+ )
+ async def ylikecmd(self, message: telethon.types.Message):
+ """👉 Like now playing track's banner"""
+
+ if not self.config['token']:
+ return await utils.answer(message, self.strings("no_token"))
+ ym = await YandexMusic(self.config['token']).init()
+ now = await ym.get_now_playing()
+ if not now:
+ return await utils.answer(message, self.strings("there_is_no_playing"))
+
+ await ym.client.users_likes_tracks_add(now['track'].id)
+ await utils.answer(
+ message, self.strings("likes")['liked'].format(
+ track_id=now['track'].id, album_id=now['track'].albums[0].id,
+ track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}"
+ )
+ )
+
+ @loader.command(
+ ru_doc="👉 Убрать лайк с играющего сейчас трека"
+ )
+ async def yunlikecmd(self, message: telethon.types.Message):
+ """👉 Unlike now playing track"""
+
+ if not self.config['token']:
+ return await utils.answer(message, self.strings("no_token"))
+ ym = await YandexMusic(self.config['token']).init()
+ now = await ym.get_now_playing()
+ if not now:
+ return await utils.answer(message, self.strings("there_is_no_playing"))
+
+ await ym.client.users_likes_tracks_remove(now['track'].id)
+ await utils.answer(
+ message, self.strings("likes")['unliked'].format(
+ track_id=now['track'].id, album_id=now['track'].albums[0].id,
+ track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}"
+ )
+ )
+
+ @loader.command(
+ ru_doc="👉 Дизлайкнуть играющий сейчас трек",
+ alias="ydis"
+ )
+ async def ydislikecmd(self, message: telethon.types.Message):
+ """👉 Dislike now playing track"""
+
+ if not self.config['token']:
+ return await utils.answer(message, self.strings("no_token"))
+ ym = await YandexMusic(self.config['token']).init()
+ now = await ym.get_now_playing()
+ if not now:
+ return await utils.answer(message, self.strings("there_is_no_playing"))
+
+ await ym.client.users_dislikes_tracks_add(now['track'].id)
+ await utils.answer(
+ message, self.strings("likes")['disliked'].format(
+ track_id=now['track'].id, album_id=now['track'].albums[0].id,
+ track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}"
+ )
+ )
+
+
+ @loader.command(
+ ru_doc="👉 Получить текст играющего сейчас трека"
+ )
+ async def ylyricscmd(self, message: telethon.types.Message):
+ """👉 Get lyrics of the now playing track"""
+
+ if not self.config['token']:
+ return await utils.answer(message, self.strings("no_token"))
+ ym = await YandexMusic(self.config['token']).init()
+ now = await ym.get_now_playing()
+ if not now:
+ return await utils.answer(message, self.strings("there_is_no_playing"))
+
+ lyrics = await ym.get_lyrics(now['playable_id'])
+ if lyrics:
+ await utils.answer(
+ message, self.strings("lyrics").format(
+ track_id=now['track'].id, album_id=now['track'].albums[0].id,
+ track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}",
+ text=lyrics['text'],
+ writers=", ".join(lyrics['writers'])
+ )
+ )
+ else:
+ await utils.answer(
+ message, self.strings("no_lyrics").format(
+ track_id=now['track'].id, album_id=now['track'].albums[0].id,
+ track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}"
+ )
+ )
+
+
+ @loader.command(
+ ru_doc="<запрос> 👉 Поиск трека в Яндекс.Музыке",
+ alias="yq"
+ )
+ async def ysearchcmd(self, message: telethon.types.Message):
+ """ 👉 Search track in Yandex.Music"""
+
+ if not self.config['token']:
+ return await utils.answer(message, self.strings("no_token"))
+ ym = await YandexMusic(self.config['token']).init()
+
+ query = utils.get_args_raw(message)
+ if not query:
+ await utils.answer(message, self.strings("args"))
+ return
+
+ message = await utils.answer(message, self.strings("searching"))
+
+ search = await ym.client.search(query, type_="track")
+ if (not search.tracks) or (len(search.tracks.results) == 0):
+ return await utils.answer(message, self.strings("404"))
+
+ out = self.strings("search").format(
+ title=search.tracks.results[0].title + (
+ f" ({search.tracks.results[0].version})" if search.tracks.results[0].version else ""
+ ),
+ performer=", ".join([x.name for x in search.tracks.results[0].artists]),
+ album_id=search.tracks.results[0].albums[0].id, track_id=search.tracks.results[0].id
+ )
+ message = await utils.answer(message, out+self.strings("downloading"))
+
+ info = await ym.client.tracks_download_info(search.tracks.results[0].id, True)
+ link = info[0].direct_link
+ audio = None
+ audio = io.BytesIO((await utils.run_sync(requests.get, link)).content)
+ audio.name = "audio.mp3"
+
+ await utils.answer(
+ message=message, response=out,
+ file=audio,
+ attributes=([
+ telethon.types.DocumentAttributeAudio(
+ duration=int(search.tracks.results[0].duration_ms / 1000),
+ title=search.tracks.results[0].title,
+ performer=", ".join([x.name for x in search.tracks.results[0].artists])
+ )
+ ])
+ )
+
+
def __create_banner(
self,
title: str, artists: list,
duration: int, progress: int,
- track_cover: bytes
+ track_cover: bytes, lyrics: str,
+ ):
+ # ——————————————— CONSTS ———————————————
+ W, H = 1920, 768
+ title_font = ImageFont.truetype(io.BytesIO(requests.get(
+ "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf"
+ ).content), 55)
+ artist_font = ImageFont.truetype(io.BytesIO(requests.get(
+ "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf"
+ ).content), 46)
+ time_font = ImageFont.truetype(io.BytesIO(requests.get(
+ "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf"
+ ).content), 36)
+ lyrics_font = ImageFont.truetype(io.BytesIO(requests.get(
+ "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/YSMusic-HeadlineBold.ttf"
+ ).content), 75)
+ nlyrics_font = ImageFont.truetype(io.BytesIO(requests.get(
+ "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/YSMusic-HeadlineBold.ttf"
+ ).content), 60)
+ def measure(t: str, f: ImageFont.FreeTypeFont, d: ImageDraw.ImageDraw):
+ bb = d.textbbox((0, 0), t, font=f)
+ return bb[2] - bb[0], bb[3] - bb[1]
+
+ # ——————————————— BACKGROUND ———————————————
+ track_cov = Image.open(io.BytesIO(track_cover)).convert("RGBA")
+ banner = (
+ track_cov.resize((W, W))
+ .crop((0, (W-H) // 2, W, ((W-H) // 2) + H))
+ .filter(ImageFilter.GaussianBlur(radius=14))
+ )
+ banner = ImageEnhance.Brightness(banner).enhance(0.3)
+ draw = ImageDraw.Draw(banner)
+
+ # ——————————————— TRACK COVER ———————————————
+ track_cov = track_cov.resize((H-350, H-350))
+ mask = Image.new("L", track_cov.size, 0)
+ ImageDraw.Draw(mask).rounded_rectangle(
+ (0, 0, track_cov.size[0], track_cov.size[1]), radius=35, fill=255
+ )
+ track_cov.putalpha(mask)
+ track_cov = track_cov.crop(track_cov.getbbox())
+ banner.paste(track_cov, (175, 175), mask)
+
+ # ——————————————— ARTIST & TITLE ———————————————
+ text_width, _ = measure(f"{', '.join(artists)} — {title}", title_font, draw)
+ if text_width > 1680:
+ lines = [f"{title}", f"{', '.join(artists)}"]
+ lsizes = [measure(lines[0], title_font, draw), measure(lines[1], artist_font, draw)]
+ else:
+ lines = [f"{', '.join(artists)} — {title}"]
+ lsizes = [measure(lines[0], title_font, draw)]
+ text_h = sum(th for _, th in lsizes) + (len(lines) - 1)
+ text_y = (150 - text_h) / 2
+ for i, (l, (lw, lh)) in enumerate(zip(lines, lsizes)):
+ if len(lines) == 2 and i == 1:
+ ftu = artist_font
+ else:
+ ftu = title_font
+ if lw > 1680:
+ while lw > 1680 and len(l) > 3:
+ l = l[:-4] + "…"
+ lw, _ = measure(l, ftu, draw)
+ tx = (W - lw) / 2
+ draw.text((tx, text_y), l, font=ftu, fill="#A0A0A0")
+ text_y += lh + 5
+
+ # ——————————————— LYRICS ———————————————
+ if lyrics:
+ lyrics_lines = []
+ for match in re.finditer(r"\[(\d{2}):(\d{2}\.\d{2})\] (.+)", lyrics):
+ minutes = int(match.group(1))
+ seconds = float(match.group(2))
+ text = match.group(3)
+ time_ms = int((minutes * 60 + seconds) * 1000)
+ lyrics_lines.append((time_ms, text))
+ llast, lnext = "", ""
+ for i, (time_ms, text) in enumerate(lyrics_lines):
+ if time_ms <= progress:
+ llast = text
+ if i+1 < len(lyrics_lines):
+ lnext = lyrics_lines[i+1][1]
+ else:
+ break
+ y_start = None
+ if llast:
+ lines = textwrap.wrap(llast, width=23)
+ if len(lines) > 3:
+ lines = lines[:3]
+ lines[-1] += "…"
+ lines_sizes = [draw.textbbox((0, 0), l, font=lyrics_font) for l in lines]
+ line_heights = [bb[3] - bb[1] for bb in lines_sizes]
+ total_text_height = sum(line_heights) + (len(lines) - 1) * 10
+ y_start = (150 + (track_cov.size[0]-total_text_height)) / 2
+ for i, line in enumerate(lines):
+ lw = lines_sizes[i][2] - lines_sizes[i][0]
+ tx = (track_cov.size[0]+325 + ((W-track_cov.size[0]+285) - lw)) / 2
+ draw.text((tx, y_start), line, font=lyrics_font, fill="#FFFFFF")
+ y_start += line_heights[i] + 10
+ if lnext:
+ next_lines = textwrap.wrap(lnext, width=23)
+ if len(next_lines) > 2:
+ next_lines = next_lines[:2]
+ next_lines[-1] += "…"
+ next_sizes = [draw.textbbox((0, 0), l, font=nlyrics_font) for l in next_lines]
+ next_heights = [bb[3] - bb[1] for bb in next_sizes]
+ total_text_height = sum(next_heights) + (len(next_lines) - 1) * 10
+ if not y_start:
+ y_start = (150 + (track_cov.size[0] - total_text_height))/2 + 150
+ for j, line in enumerate(next_lines):
+ lw = next_sizes[j][2] - next_sizes[j][0]
+ tx = (track_cov.size[0] + 325 + ((W - track_cov.size[0] + 285) - lw)) / 2
+ draw.text((tx, y_start + 40), line, font=nlyrics_font, fill="#A0A0A0")
+ y_start += next_heights[j] + 10
+
+ # ——————————————— STATUS BAR ———————————————
+ draw.rounded_rectangle([75, 700, 768 + 1072, 700 + 15], radius=15 // 2, fill="#A0A0A0")
+ draw.rounded_rectangle([75, 700, 768 + int(1072 * (progress / duration)), 700 + 15], radius=15 // 2, fill="#FFFFFF")
+ draw.text((75, 650), f"{(progress//1000//60):02}:{(progress//1000%60):02}", font=time_font, fill="#FFFFFF")
+ draw.text((1745, 650), f"{(duration//1000//60):02}:{(duration//1000%60):02}", font=time_font, fill="#FFFFFF")
+
+ # ——————————————— SAVE ———————————————
+ by = io.BytesIO()
+ banner.save(by, format="PNG"); by.seek(0)
+ by.name = "banner.png"
+ return by
+
+
+ def __create_banner_old(
+ self,
+ title: str, artists: list,
+ duration: int, progress: int,
+ track_cover: bytes,
+ *args, **kwargs
):
w, h = 1920, 768
title_font = ImageFont.truetype(io.BytesIO(requests.get(