From 5de86d648b76790fc1bb3ae927de9b33014ca985 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Fri, 21 Nov 2025 01:04:46 +0000 Subject: [PATCH] Added and updated repositories 2025-11-21 01:04:46 --- KorenbZla/HikkaModules/AuroraBull.py | 21 +- KorenbZla/HikkaModules/IrisFarm.py | 8 +- Ruslan-Isaev/modules/SFTPUploader.py | 114 -- Ruslan-Isaev/modules/ssh.py | 645 +++++--- Ruslan-Isaev/modules/ttf.py | 32 +- .../python-modules/createavatarspack.py | 147 -- fiksofficial/python-modules/createpacks.py | 353 +++++ fiksofficial/python-modules/deviceinfo.py | 8 +- fiksofficial/python-modules/speedtest.py | 162 +- unneyon/hikka-mods/langpacks/yamusic.yml | 92 +- unneyon/hikka-mods/yamusic.py | 1375 +++++++++-------- 11 files changed, 1720 insertions(+), 1237 deletions(-) delete mode 100644 Ruslan-Isaev/modules/SFTPUploader.py delete mode 100644 fiksofficial/python-modules/createavatarspack.py create mode 100644 fiksofficial/python-modules/createpacks.py diff --git a/KorenbZla/HikkaModules/AuroraBull.py b/KorenbZla/HikkaModules/AuroraBull.py index 977ebec..8fb0442 100644 --- a/KorenbZla/HikkaModules/AuroraBull.py +++ b/KorenbZla/HikkaModules/AuroraBull.py @@ -23,7 +23,7 @@ # meta pic: https://i.postimg.cc/Hx3Zm8rB/logo.png # meta banner: https://te.legra.ph/file/7612b5506856c1eb34c56.jpg -version = (1, 0, 0) +__version__ = (1, 1, 0) import json import aiohttp @@ -42,7 +42,7 @@ class AuroraBullMod(loader.Module): "error_decoding": "Error: The JSON could not be decoded.", "error_uploading_data": "Error loading data", "error_valid_args": "Please enter valid arguments!", - "launched": "AuroraBull launched!\n\nUse .abulloff to stop the attack.", + "launched": "AuroraBull launched!\n\nUse {prefix}abulloff to stop the attack.", "stopped": "AuroraBull has stopped.", } @@ -51,7 +51,7 @@ class AuroraBullMod(loader.Module): "error_decoding": "Error: не удалось декодировать JSON.", "error_uploading_data": "Ошибка при загрузке данных", "error_valid_args": "Введите корректные аргументы!", - "launched": "AuroraBull запущен!\n\nИспользуйте .abulloff, чтобы остановить атаку.", + "launched": "AuroraBull запущен!\n\nИспользуйте {prefix}abulloff, чтобы остановить атаку.", "stopped": "AuroraBull остановлен.", } @@ -60,7 +60,7 @@ class AuroraBullMod(loader.Module): "error_decoding": "Error: JSON декодлаш муваффақиятли амалга ошмади.", "error_uploading_data": "Маълумотлар юклаб олинмади", "error_valid_args": "Iltimos, to'g'ri dalillarni kiriting!", - "launched": "AuroraBull ishga tushirildi!\n\nHujumni toʻxtatish uchun .abulloff dan foydalaning.", + "launched": "AuroraBull ishga tushirildi!\n\nHujumni toʻxtatish uchun {prefix}abulloff dan foydalaning.", "stopped": "AuroraBull to'xtadi.", } @@ -69,7 +69,7 @@ class AuroraBullMod(loader.Module): "error_decoding": "Error: JSON konnte nicht decodiert werden.", "error_uploading_data": "Fehler beim Hochladen der Daten", "error_valid_args": "Bitte geben Sie gültige Argumente ein!", - "launched": "AuroraBull gestartet!\n\nVerwenden Sie .abulloff, um den Angriff zu stoppen.", + "launched": "AuroraBull gestartet!\n\nVerwenden Sie {prefix}abulloff, um den Angriff zu stoppen.", "stopped": "AuroraBull hat angehalten.", } @@ -78,7 +78,7 @@ class AuroraBullMod(loader.Module): "error_decoding": "Error: No se pudo decodificar JSON.", "error_uploading_data": "Error al cargar los datos", "error_valid_args": "¡Por favor ingrese argumentos válidos!", - "launched": "¡AuroraBull lanzado!\n\nUtiliza .abulloff para detener el ataque.", + "launched": "¡AuroraBull lanzado!\n\nUtiliza {prefix}abulloff para detener el ataque.", "stopped": "AuroraBull se ha detenido.", } @@ -133,7 +133,7 @@ class AuroraBullMod(loader.Module): await utils.answer(message, self.strings("error_valid_args")) return - await utils.answer(message, self.strings("launched")) + await utils.answer(message, self.strings("launched").format(prefix=self.get_prefix())) async with aiohttp.ClientSession() as session: async with session.get(url) as response: @@ -146,10 +146,11 @@ class AuroraBullMod(loader.Module): bull_text = choice(data["BullText"]) await message.respond(text + bull_text) await asyncio.sleep(time) - else: - await utils.answer(message, self.strings("error_key")) + return else: - await utils.answer(message, f"{self.strings('error_uploading_data')}: {response.status}") + return await utils.answer(message, self.strings("error_key")) + else: + return await utils.answer(message, f"{self.strings('error_uploading_data')}: {response.status}") @loader.command( ru_doc="Остановить оскорбления", diff --git a/KorenbZla/HikkaModules/IrisFarm.py b/KorenbZla/HikkaModules/IrisFarm.py index 5ea6744..c312351 100644 --- a/KorenbZla/HikkaModules/IrisFarm.py +++ b/KorenbZla/HikkaModules/IrisFarm.py @@ -23,7 +23,7 @@ # meta pic: https://i.postimg.cc/Hx3Zm8rB/logo.png # meta banner: https://te.legra.ph/file/1d547b05f967c9681b90a.jpg -__version__ = (3, 1, 1) +__version__ = (3, 2, 0) import asyncio import random @@ -97,7 +97,7 @@ class IrisFarmMod(loader.Module): ), loader.ConfigValue( "random_interval", - False, + True, lambda: self.strings["cfg_random_interval"], validator=loader.validators.Boolean() ) @@ -112,8 +112,8 @@ class IrisFarmMod(loader.Module): """{on/off} - turn auto farm on or off""" args = utils.get_args_raw(message).lower() - status_result_True = self.db.get("AuroraIrisFarm", "status", True) - if status_result_True: + status_result = self.db.get("AuroraIrisFarm", "status") + if status_result is True: status_result = self.strings("s_1") else: status_result = self.strings("s_0") diff --git a/Ruslan-Isaev/modules/SFTPUploader.py b/Ruslan-Isaev/modules/SFTPUploader.py deleted file mode 100644 index 8cdd8e9..0000000 --- a/Ruslan-Isaev/modules/SFTPUploader.py +++ /dev/null @@ -1,114 +0,0 @@ -# -*- coding: utf-8 -*- -version = (1, 0, 0) - -# meta developer: @RUIS_VlP - -import random -from datetime import timedelta -from telethon import TelegramClient, events -from telethon import functions -from telethon.tl.types import Message -import os -from .. import loader, utils - -import paramiko - -# requires: paramiko - -def upload_file_sftp(host, port, username, password, local_file, remote_file): - try: - # Создаем экземпляр SSHClient - client = paramiko.SSHClient() - - # Загружаем параметры по умолчанию - client.load_system_host_keys() - - # Разрешаем соединение с сервером, если ключа нет в системе - client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - # Подключаемся к серверу - client.connect(hostname=host, port=port, username=username, password=password) - - # Открываем SFTP сессию - sftp = client.open_sftp() - - try: - sftp.listdir("SFTP_files") - except IOError: - sftp.mkdir("SFTP_files") - - # Загружаем файл - sftp.put(local_file, remote_file) - - print(f'Файл {local_file} успешно загружен на {remote_file}') - - except Exception as e: - print(f'Произошла ошибка: {e}') - finally: - # Закрываем SFTP сессию и SSH соединение - if 'sftp' in locals(): - sftp.close() - client.close() - - -@loader.tds -class SFTPUploaderMod(loader.Module): - """Загрузка файлов на SFTP""" - - strings = { - "name": "SFTPUploader", - } - - def __init__(self): - self.config = loader.ModuleConfig( - loader.ConfigValue( - "host", - "None", - "IP address or domain", - validator=loader.validators.String() - ), - loader.ConfigValue( - "username", - "None", - "SFTP username", - validator=loader.validators.String() - ), - loader.ConfigValue( - "password", - "None", - "SFTP password", - validator=loader.validators.Hidden() - ), - loader.ConfigValue( - "Port", - 22, - "SFTP port", - validator=loader.validators.String() - ), - ) - - @loader.command() - async def sftp(self, message): - """ - загружает файл на SFPT""" - host = self.config["host"] or "None" - username = self.config["username"] or "None" - password = self.config["password"] or "None" - port = self.config["Port"] or "None" - if host == "None" or username == "None" or password == "None" or port == "None": - await utils.answer(message, "Значения не указаны. Укажите их через команду:\n.config SFTPUploader") - return - reply = await message.get_reply_message() - if reply: - if reply.media: - await utils.answer(message, f"Начинаю загрузку....") - file_path = await message.client.download_media(reply.media) - sftp_path = f"SFTP_files/{file_path}" - upld = upload_file_sftp(host, port, username, password, file_path, sftp_path) - os.remove(file_path) - await utils.answer(message, f"Файл загружен на SFTP сервер(не факт), расположение файла: ~/SFTP_files/{file_path}") - else: - await utils.answer(message, "В сообщении не найдены файлы!") - else: - await utils.answer(message, "Команда должна быть ответом на сообщение!") - return - \ No newline at end of file diff --git a/Ruslan-Isaev/modules/ssh.py b/Ruslan-Isaev/modules/ssh.py index 52ff91c..abbe2f8 100644 --- a/Ruslan-Isaev/modules/ssh.py +++ b/Ruslan-Isaev/modules/ssh.py @@ -1,207 +1,480 @@ -version = (1, 0, 0) +version = (2, 0, 0) # meta developer: @RUIS_VlP # requires: paramiko -import random -from datetime import timedelta -from telethon import TelegramClient, events -from telethon import functions -from telethon.tl.types import Message +import asyncio import os +import random +import string from .. import loader, utils import paramiko -def upload_file_sftp(host, port, username, password, local_file, remote_file): - try: - # Создаем экземпляр SSHClient - client = paramiko.SSHClient() - - # Загружаем параметры по умолчанию - client.load_system_host_keys() - - # Разрешаем соединение с сервером, если ключа нет в системе - client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - # Подключаемся к серверу - client.connect(hostname=host, port=port, username=username, password=password) - - # Открываем SFTP сессию - sftp = client.open_sftp() - - try: - sftp.listdir("sshmod") - except IOError: - sftp.mkdir("sshmod") - - # Загружаем файл - sftp.put(local_file, remote_file) - - print(f'Файл {local_file} успешно загружен на {remote_file}') - - except Exception as e: - print(f'Произошла ошибка: {e}') - finally: - # Закрываем SFTP сессию и SSH соединение - if 'sftp' in locals(): - sftp.close() - client.close() -def execute_ssh_command(host, port, username, password, command): - try: - # Создаем экземпляр SSHClient - client = paramiko.SSHClient() - - # Загружаем параметры по умолчанию - client.load_system_host_keys() - - # Разрешаем соединение с сервером, если ключа нет в системе - client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - # Подключаемся к серверу - client.connect(hostname=host, port=port, username=username, password=password) - - # Выполняем команду - stdin, stdout, stderr = client.exec_command(command) - - # Получаем вывод и ошибки - output = stdout.read().decode() - error = stderr.read().decode() - exit_code = stdout.channel.recv_exit_status() - - return exit_code, output, error - - except Exception as e: - print(f'Произошла ошибка: {e}') - return None, None, str(e) - finally: - # Закрываем SSH соединение - client.close() +class SSHConnection: + + def __init__(self, host, port, username, password=None, key_path=None): + self.host = host + self.port = port + self.username = username + self.password = password + self.key_path = key_path + self.client = None + + async def connect(self): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self._connect_sync) + + def _connect_sync(self): + self.client = paramiko.SSHClient() + self.client.load_system_host_keys() + self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + connect_kwargs = { + 'hostname': self.host, + 'port': self.port, + 'username': self.username + } + + if self.key_path and self.key_path != "None": + try: + private_key = paramiko.RSAKey.from_private_key_file(self.key_path) + connect_kwargs['pkey'] = private_key + except: + try: + private_key = paramiko.Ed25519Key.from_private_key_file(self.key_path) + connect_kwargs['pkey'] = private_key + except: + private_key = paramiko.ECDSAKey.from_private_key_file(self.key_path) + connect_kwargs['pkey'] = private_key + else: + connect_kwargs['password'] = self.password + + self.client.connect(**connect_kwargs) + + async def upload_file(self, local_file, remote_file): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self._upload_file_sync, local_file, remote_file) + + def _upload_file_sync(self, local_file, remote_file): + sftp = self.client.open_sftp() + + remote_dir = os.path.dirname(remote_file) + if remote_dir: + self._create_remote_dir(sftp, remote_dir) + + sftp.put(local_file, remote_file) + sftp.close() + + def _create_remote_dir(self, sftp, path): + dirs = [] + while path and path != '/': + try: + sftp.stat(path) + break + except IOError: + dirs.append(path) + path = os.path.dirname(path) + + while dirs: + dir_path = dirs.pop() + try: + sftp.mkdir(dir_path) + except IOError: + pass + + async def download_file(self, remote_file, local_file): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self._download_file_sync, remote_file, local_file) + + def _download_file_sync(self, remote_file, local_file): + sftp = self.client.open_sftp() + sftp.get(remote_file, local_file) + sftp.close() + + async def execute_command_stream(self, command, callback): + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + self._execute_command_stream_sync, + command, + callback + ) + + def _execute_command_stream_sync(self, command, callback): + stdin, stdout, stderr = self.client.exec_command(command, get_pty=True) + + channel = stdout.channel + pid = channel.get_id() + + output_lines = [] + error_lines = [] + + while not stdout.channel.exit_status_ready() or stdout.channel.recv_ready(): + if stdout.channel.recv_ready(): + line = stdout.readline() + if line: + output_lines.append(line) + callback('stdout', line, pid) + + remaining = stdout.read().decode() + if remaining: + output_lines.append(remaining) + callback('stdout', remaining, pid) + + error_output = stderr.read().decode() + if error_output: + error_lines.append(error_output) + callback('stderr', error_output, pid) + + exit_code = stdout.channel.recv_exit_status() + + return exit_code, ''.join(output_lines), ''.join(error_lines), pid + + def close(self): + if self.client: + self.client.close() + @loader.tds class SSHMod(loader.Module): - """SSH module for uploading files and executing commands""" + """Модуль для работы с SSH""" - strings = { - "name": "SSHMod", - "cfg_host": "IP address or domain", - "cfg_username": "SSH username", - "cfg_password": "SSH password", - "cfg_port": "SSH port", - "save_description": " - saves the file to the ~/sshmod directory", - "save_uploading": "Starting upload....", - "save_success": "File uploaded to SSH server, file location: ~/sshmod/{}", - "save_no_file": "No files found in the message!", - "save_reply_required": "The command must be a reply to a message!", - "sterminal_description": " - executes a command on the SSH server", - "sterminal_no_command": "No command specified!", - "sterminal_output": "⌨️ System command\n
{}
\nExit code: {}\n📼 Output:\n
{}
", - "sterminal_error": "⌨️ System command\n
{}
\nExit code: {}\n🚫 Errors:\n
{}
", - "sterminal_output_and_error": "⌨️ System command\n
{}
\nExit code: {}\n📼 Output:\n
{}
\n🚫 Errors:\n
{}
", - "config_not_set": "Values are not set. Set them using the command:\n.config SSHMod", - } + strings = { + "name": "SSHMod", + "cfg_host": "IP address or domain", + "cfg_username": "SSH username", + "cfg_password": "SSH password (leave None if using key)", + "cfg_key": "Path to private SSH key (leave None if using password)", + "cfg_port": "SSH port", + "cfg_default_dir": "Default directory for saving files", + "sftpsave_description": " [directory] - saves the file to the specified directory", + "sftpsave_uploading": "Starting upload....", + "sftpsave_success": "File uploaded to SSH server, file location: {}", + "sftpsave_no_file": "No files found in the message!", + "sftpsave_reply_required": "The command must be a reply to a message!", + "sftpupload_description": " - downloads file from SSH server", + "sftpupload_no_path": "No file path specified!", + "sftpupload_downloading": "Downloading file from SSH server...", + "sftpupload_error": "Error downloading file: {}", + "sterminal_description": " - executes a command on the SSH server", + "sterminal_no_command": "No command specified!", + "sterminal_starting": "⌨️ System command\n
{}
\nPID: {}\nStatus: Running...\n📼 Output:\n
{}
", + "sterminal_output": "⌨️ System command\n
{}
\nPID: {}\nExit code: {}\n📼 Output:\n
{}
", + "sterminal_error": "⌨️ System command\n
{}
\nPID: {}\nExit code: {}\n🚫 Errors:\n
{}
", + "sterminal_output_and_error": "⌨️ System command\n
{}
\nPID: {}\nExit code: {}\n📼 Output:\n
{}
\n🚫 Errors:\n
{}
", + "sterminal_stopped": "⌨️ System command\n
{}
\nPID: {}\nStatus: ⛔ Stopped by user\n📼 Output:\n
{}
", + "addkey_description": " - saves SSH private key to .ssh directory", + "addkey_no_key": "No key content provided!", + "addkey_success": "Key saved successfully!\nKey file: {}\nFull path: {}\n\nYou can now set it in config:\n.fcfg SSHMod key_path {}", + "addkey_error": "Error saving key: {}", + "config_not_set": "Values are not set. Set them using the command:\n.config SSHMod", + "stop_button": "⛔ Stop", + } - strings_ru = { - "name": "SSHMod", - "cfg_host": "IP-адрес или домен", - "cfg_username": "Имя пользователя SSH", - "cfg_password": "Пароль SSH", - "cfg_port": "Порт SSH", - "save_description": " - сохраняет файл в директорию ~/sshmod", - "save_uploading": "Начинаю загрузку....", - "save_success": "Файл загружен на SSH сервер, расположение файла: ~/sshmod/{}", - "save_no_file": "В сообщении не найдены файлы!", - "save_reply_required": "Команда должна быть ответом на сообщение!", - "sterminal_description": " - выполняет команду на SSH сервере", - "sterminal_no_command": "Не указана команда для выполнения!", - "sterminal_output": "⌨️ Системная команда\n
{}
\nКод выхода: {}\n📼 Вывод:\n
{}
", - "sterminal_error": "⌨️ Системная команда\n
{}
\nКод выхода: {}\n🚫 Ошибки:\n
{}
", - "sterminal_output_and_error": "⌨️ Системная команда\n
{}
\nКод выхода: {}\n📼 Вывод:\n
{}
\n🚫 Ошибки:\n
{}
", - "config_not_set": "Значения не указаны. Укажите их через команду:\n.config SSHMod", - } + strings_ru = { + "name": "SSHMod", + "cfg_host": "IP-адрес или домен", + "cfg_username": "Имя пользователя SSH", + "cfg_password": "Пароль SSH (оставьте None при использовании ключа)", + "cfg_key": "Путь к закрытому SSH ключу (оставьте None при использовании пароля)", + "cfg_port": "Порт SSH", + "cfg_default_dir": "Директория по умолчанию для сохранения файлов", + "sftpsave_description": " [директория] - сохраняет файл в указанную директорию", + "sftpsave_uploading": "Начинаю загрузку....", + "sftpsave_success": "Файл загружен на SSH сервер, расположение файла: {}", + "sftpsave_no_file": "В сообщении не найдены файлы!", + "sftpsave_reply_required": "Команда должна быть ответом на сообщение!", + "sftpupload_description": "<путь_к_файлу> - скачивает файл с SSH сервера", + "sftpupload_no_path": "Не указан путь к файлу!", + "sftpupload_downloading": "Скачиваю файл с SSH сервера...", + "sftpupload_error": "Ошибка при скачивании файла: {}", + "sterminal_description": " - выполняет команду на SSH сервере", + "sterminal_no_command": "Не указана команда для выполнения!", + "sterminal_starting": "⌨️ Системная команда\n
{}
\nPID: {}\nСтатус: Выполняется...\n📼 Вывод:\n
{}
", + "sterminal_output": "⌨️ Системная команда\n
{}
\nPID: {}\nКод выхода: {}\n📼 Вывод:\n
{}
", + "sterminal_error": "⌨️ Системная команда\n
{}
\nPID: {}\nКод выхода: {}\n🚫 Ошибки:\n
{}
", + "sterminal_output_and_error": "⌨️ Системная команда\n
{}
\nPID: {}\nКод выхода: {}\n📼 Вывод:\n
{}
\n🚫 Ошибки:\n
{}
", + "sterminal_stopped": "⌨️ Системная команда\n
{}
\nPID: {}\nСтатус: ⛔ Остановлено пользователем\n📼 Вывод:\n
{}
", + "addkey_description": "<содержимое_ключа> - сохраняет SSH ключ в директорию .ssh", + "addkey_no_key": "Не указано содержимое ключа!", + "addkey_success": "Ключ успешно сохранён!\nИмя файла: {}\nПолный путь: {}\n\nДля применения напишите:\n.fcfg SSHMod key_path {}", + "addkey_error": "Ошибка при сохранении ключа: {}", + "config_not_set": "Значения не указаны. Укажите их через команду:\n.config SSHMod", + "stop_button": "⛔ Остановить", + } - def __init__(self): - self.config = loader.ModuleConfig( - loader.ConfigValue( - "host", - "None", - lambda: self.strings["cfg_host"], - validator=loader.validators.String(), - ), - loader.ConfigValue( - "username", - "None", - lambda: self.strings["cfg_username"], - validator=loader.validators.String(), - ), - loader.ConfigValue( - "password", - "None", - lambda: self.strings["cfg_password"], - validator=loader.validators.Hidden(), - ), - loader.ConfigValue( - "Port", - 22, - lambda: self.strings["cfg_port"], - validator=loader.validators.String(), - ), - ) + def __init__(self): + self.config = loader.ModuleConfig( + loader.ConfigValue( + "host", + "None", + lambda: self.strings["cfg_host"], + validator=loader.validators.String(), + ), + loader.ConfigValue( + "username", + "None", + lambda: self.strings["cfg_username"], + validator=loader.validators.String(), + ), + loader.ConfigValue( + "password", + "None", + lambda: self.strings["cfg_password"], + validator=loader.validators.Hidden(), + ), + loader.ConfigValue( + "key_path", + "None", + lambda: self.strings["cfg_key"], + validator=loader.validators.String(), + ), + loader.ConfigValue( + "Port", + 22, + lambda: self.strings["cfg_port"], + validator=loader.validators.Integer(), + ), + loader.ConfigValue( + "default_directory", + "sshmod", + lambda: self.strings["cfg_default_dir"], + validator=loader.validators.String(), + ), + ) + self.active_tasks = {} - @loader.command(alias="save") - async def save(self, message): - """ - saves the file to the ~/sshmod directory""" - host = self.config["host"] or "None" - username = self.config["username"] or "None" - password = self.config["password"] or "None" - port = self.config["Port"] or "None" - if host == "None" or username == "None" or password == "None" or port == "None": - await utils.answer(message, self.strings["config_not_set"]) - return - reply = await message.get_reply_message() - if reply: - if reply.media: - await utils.answer(message, self.strings["save_uploading"]) - file_path = await message.client.download_media(reply.media) - sftp_path = f"sshmod/{os.path.basename(file_path)}" - upload_file_sftp(host, port, username, password, file_path, sftp_path) - os.remove(file_path) - await utils.answer( - message, - self.strings["save_success"].format(os.path.basename(file_path)), - ) - else: - await utils.answer(message, self.strings["save_no_file"]) - else: - await utils.answer(message, self.strings["save_reply_required"]) + @loader.command() + async def sftpsave(self, message): + """ [dir] - сохраняет указанных файл на сервер""" + host = self.config["host"] + username = self.config["username"] + password = self.config["password"] + key_path = self.config["key_path"] + port = self.config["Port"] + + if host == "None" or username == "None" or (password == "None" and key_path == "None"): + await utils.answer(message, self.strings["config_not_set"]) + return + + reply = await message.get_reply_message() + if not reply: + await utils.answer(message, self.strings["sftpsave_reply_required"]) + return + + if not reply.media: + await utils.answer(message, self.strings["sftpsave_no_file"]) + return + args = utils.get_args_raw(message) + remote_dir = args if args else self.config["default_directory"] + + await utils.answer(message, self.strings["sftpsave_uploading"]) + + file_path = await message.client.download_media(reply.media) + file_name = os.path.basename(file_path) + sftp_path = f"{remote_dir}/{file_name}" + + conn = SSHConnection(host, port, username, password, key_path) + await conn.connect() + await conn.upload_file(file_path, sftp_path) + conn.close() + + os.remove(file_path) + + await utils.answer( + message, + self.strings["sftpsave_success"].format(sftp_path), + ) - @loader.command(alias="sterminal") - async def sterminal(self, message): - """ - executes a command on the SSH server""" - host = self.config["host"] or "None" - username = self.config["username"] or "None" - password = self.config["password"] or "None" - port = self.config["Port"] or "None" - if host == "None" or username == "None" or password == "None" or port == "None": - await utils.answer(message, self.strings["config_not_set"]) - return - command = utils.get_args_raw(message) - if not command: - await utils.answer(message, self.strings["sterminal_no_command"]) - return + @loader.command() + async def sftpdownload(self, message): + """ - скачивает указанных файл с сервера""" + host = self.config["host"] + username = self.config["username"] + password = self.config["password"] + key_path = self.config["key_path"] + port = self.config["Port"] + + if host == "None" or username == "None" or (password == "None" and key_path == "None"): + await utils.answer(message, self.strings["config_not_set"]) + return + + remote_path = utils.get_args_raw(message) + if not remote_path: + await utils.answer(message, self.strings["sftpupload_no_path"]) + return + + await utils.answer(message, self.strings["sftpupload_downloading"]) + + local_file = f"/tmp/sftp_download_{random.randint(1000, 9999)}_{os.path.basename(remote_path)}" + + try: + conn = SSHConnection(host, port, username, password, key_path) + await conn.connect() + await conn.download_file(remote_path, local_file) + conn.close() + + await utils.answer_file( + message, + local_file, + f"📥 File from SSH server: {remote_path}" + ) + + if os.path.exists(local_file): + os.remove(local_file) + + except Exception as e: + await utils.answer(message, self.strings["sftpupload_error"].format(str(e))) + if os.path.exists(local_file): + os.remove(local_file) - # Выполняем команду на SSH сервере - exit_code, output, error = execute_ssh_command(host, port, username, password, command) + @loader.command() + async def addkey(self, message): + """<ключ> - сохраняет указанный ssh ключ""" + key_content = utils.get_args_raw(message) + + if not key_content: + await utils.answer(message, self.strings["addkey_no_key"]) + return + + try: + ssh_dir = os.path.expanduser("~/.ssh") + os.makedirs(ssh_dir, exist_ok=True) + + random_name = ''.join(random.choices(string.ascii_lowercase + string.digits, k=12)) + key_filename = f"ssh_key_{random_name}" + key_path = os.path.join(ssh_dir, key_filename) + + with open(key_path, 'w') as f: + f.write(key_content) + + os.chmod(key_path, 0o600) + + await utils.answer( + message, + self.strings["addkey_success"].format(key_filename, key_path, key_path) + ) + + except Exception as e: + await utils.answer(message, self.strings["addkey_error"].format(str(e))) - # Формируем ответ в зависимости от наличия вывода и ошибок - if output and not error: - response = self.strings["sterminal_output"].format(command, exit_code, output) - elif error and not output: - response = self.strings["sterminal_error"].format(command, exit_code, error) - elif output and error: - response = self.strings["sterminal_output_and_error"].format(command, exit_code, output, error) - else: - response = f"⌨️ System command\n
{command}
\nExit code: {exit_code}" + @loader.command(alias="ssh") + async def sterminal(self, message): + """<команда> - выполняет команду на ssh сервере""" + host = self.config["host"] + username = self.config["username"] + password = self.config["password"] + key_path = self.config["key_path"] + port = self.config["Port"] + + if host == "None" or username == "None" or (password == "None" and key_path == "None"): + await utils.answer(message, self.strings["config_not_set"]) + return + + command = utils.get_args_raw(message) + if not command: + await utils.answer(message, self.strings["sterminal_no_command"]) + return - await utils.answer(message, response) \ No newline at end of file + conn = SSHConnection(host, port, username, password, key_path) + await conn.connect() + + output_buffer = [] + error_buffer = [] + current_pid = None + stop_flag = False + + def stream_callback(stream_type, data, pid): + nonlocal current_pid + if current_pid is None: + current_pid = pid + if stream_type == 'stdout': + output_buffer.append(data) + else: + error_buffer.append(data) + + stop_button = { + "text": self.strings["stop_button"], + "callback": self._stop_callback, + "args": (message.chat_id, message.id), + } + + task_id = f"{message.chat_id}_{message.id}" + self.active_tasks[task_id] = {'stop': False, 'conn': conn} + + msg = await utils.answer( + message, + self.strings["sterminal_starting"].format(command, "...", ""), + reply_markup=[[stop_button]] + ) + + async def execute_task(): + try: + exit_code, output, error, pid = await conn.execute_command_stream( + command, + stream_callback + ) + + if self.active_tasks.get(task_id, {}).get('stop'): + current_output = ''.join(output_buffer) + await utils.answer( + msg, + self.strings["sterminal_stopped"].format( + command, + pid if pid else "N/A", + current_output if current_output else "No output" + ) + ) + else: + if output and not error: + response = self.strings["sterminal_output"].format(command, pid, exit_code, output) + elif error and not output: + response = self.strings["sterminal_error"].format(command, pid, exit_code, error) + elif output and error: + response = self.strings["sterminal_output_and_error"].format(command, pid, exit_code, output, error) + else: + response = f"⌨️ System command\n
{command}
\nPID: {pid}\nExit code: {exit_code}" + + await utils.answer(msg, response) + + except Exception as e: + await utils.answer(msg, f"Error: {str(e)}") + finally: + conn.close() + if task_id in self.active_tasks: + del self.active_tasks[task_id] + + asyncio.create_task(execute_task()) + + for _ in range(60): + await asyncio.sleep(2) + + if task_id not in self.active_tasks: + break + + if self.active_tasks[task_id].get('stop'): + break + + current_output = ''.join(output_buffer[-20:]) + if current_output: + await msg.edit( + self.strings["sterminal_starting"].format( + command, + current_pid if current_pid else "...", + current_output[-1500:] # Ограничение длины + ), + reply_markup=[[stop_button]] + ) + + async def _stop_callback(self, call, chat_id, msg_id): + """Callback для кнопки остановки""" + task_id = f"{chat_id}_{msg_id}" + if task_id in self.active_tasks: + self.active_tasks[task_id]['stop'] = True + try: + self.active_tasks[task_id]['conn'].close() + except: + pass + await call.answer("Stopping...") \ No newline at end of file diff --git a/Ruslan-Isaev/modules/ttf.py b/Ruslan-Isaev/modules/ttf.py index 2d88103..3f4f852 100644 --- a/Ruslan-Isaev/modules/ttf.py +++ b/Ruslan-Isaev/modules/ttf.py @@ -49,34 +49,4 @@ class TTFMod(loader.Module): # Удаление файла os.remove(file_path) - - @loader.command() - async def ttf_noreply(self, message): - """ - Создает текстовый файл с заданным именем и расширением, - записывает в него текст, отправляет его в Telegram и удаляет с диска. - - Пример: - .ttf название.txt - Текст для файла - - """ - args = utils.get_args_raw(message).split("\n") - if len(args) < 1: - await message.edit("Недостаточно аргументов. Используйте: .ttf название.txt\nТекст для файла") - return - - filename = args[0].strip() - text = "\n".join(args[1:]) - - # Создание файла - file_path = os.path.join(os.getcwd(), filename) - with open(file_path, 'w') as file: - file.write(text) - await message.client.delete_messages(message.chat_id, message.id) - # Отправка файла - await message.client.send_file(message.chat_id, file_path) - - # Удаление файла - os.remove(file_path) - \ No newline at end of file + \ No newline at end of file diff --git a/fiksofficial/python-modules/createavatarspack.py b/fiksofficial/python-modules/createavatarspack.py deleted file mode 100644 index 2ebca46..0000000 --- a/fiksofficial/python-modules/createavatarspack.py +++ /dev/null @@ -1,147 +0,0 @@ -# ______ ___ ___ _ _ -# ____ | ___ \ | \/ | | | | | -# / __ \| |_/ / _| . . | ___ __| |_ _| | ___ -# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \ -# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/ -# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___| -# \____/ __/ | -# |___/ - -# На модуль распространяется лицензия "GNU General Public License v3.0" -# https://github.com/all-licenses/GNU-General-Public-License-v3.0 - -# meta developer: @pymodule -# requires: opencv-python pillow - -import os, shutil, cv2 -from PIL import Image, UnidentifiedImageError -from telethon.tl.functions.stickers import CreateStickerSetRequest -from telethon.tl.types import InputStickerSetItem, InputDocument -from telethon.errors.rpcerrorlist import PackShortNameOccupiedError -from .. import loader -from telethon.tl.functions.photos import GetUserPhotosRequest - -import asyncio -import random -import string - -try: - resample = Image.Resampling.LANCZOS -except: - resample = Image.LANCZOS - -@loader.tds -class CreateAvatarsPack(loader.Module): - """Creates a sticker pack from photos and video avatars of participants""" - strings = { - "name": "CreateAvatarsPack", - "processing": "📥 I'm collecting avatars of participants...", - "no_avatars": "❌ No members with avatars", - "no_valid": "❌ Could not process any avatars", - "done": "✅ The sticker pack is ready:\n👉 Open", - "already": "⚠️ A sticker pack with this name already exists.", - } - - strings_ru = { - "processing": "📥 Собираю аватарки участников...", - "no_avatars": "❌ Нет участников с аватарками", - "no_valid": "❌ Не удалось обработать ни одну аватарку", - "done": "✅ Стикерпак готов:\n👉 Открыть", - "already": "⚠️ Стикерпак с таким именем уже существует", - } - - @loader.command(doc="- Create a sticker pack from the avatars of users in the group", ru_doc="- Создать стикерпак из аватаров пользователей группы", only_groups=True) - async def createavatars(self, message): - """- Create a sticker pack from the avatars of users in the group""" - chat = await message.get_chat() - cid = abs(message.chat_id) - await message.edit(self.strings["processing"]) - - users = [] - async for u in self._client.iter_participants(chat.id): - if u.photo: - users.append(u) - if len(users) >= 100: - break - - if not users: - return await message.edit(self.strings["no_avatars"]) - - tmp_dir = f"/tmp/avatars_{cid}" - os.makedirs(tmp_dir, exist_ok=True) - sticker_files = [] - - for u in users: - try: - photos = await self._client(GetUserPhotosRequest(u.id, 0, 0, 1)) - if not photos.photos: - continue - - raw = await self._client.download_media(photos.photos[0]) - data = raw if isinstance(raw, (bytes, bytearray)) else open(raw, "rb").read() - - path_raw = os.path.join(tmp_dir, f"{u.id}_raw") - with open(path_raw, "wb") as f: - f.write(data) - - if b"ftyp" in data[:32] or path_raw.endswith((".mp4", ".webm", ".mov")): - cap = cv2.VideoCapture(path_raw) - success, frame = cap.read() - cap.release() - if not success: - continue - img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)) - else: - try: - img = Image.open(path_raw).convert("RGBA") - except UnidentifiedImageError: - continue - - img.thumbnail((512, 512), resample) - w, h = img.size - final = Image.new("RGBA", (512, 512), (0, 0, 0, 0)) - final.paste(img, ((512 - w)//2, (512 - h)//2)) - - out = os.path.join(tmp_dir, f"{u.id}.webp") - final.save(out, "WEBP") - sticker_files.append(out) - - except: - continue - - if not sticker_files: - shutil.rmtree(tmp_dir, ignore_errors=True) - return await message.edit(self.strings["no_valid"]) - - tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4)) - short = f"f{cid}_{tag}_by_fcreateavatars" - title = f"AvaPack {tag}" - - stickers = [] - for p in sticker_files: - await asyncio.sleep(0.3) - file = await self._client.upload_file(p) - msg = await self._client.send_file("me", file, force_document=True) - doc = msg.document - await self._client.delete_messages("me", msg.id) - stickers.append(InputStickerSetItem( - document=InputDocument(doc.id, doc.access_hash, doc.file_reference), - emoji="🖼️" - )) - - try: - await self._client(CreateStickerSetRequest( - user_id="me", - title=title, - short_name=short, - stickers=stickers - )) - except PackShortNameOccupiedError: - shutil.rmtree(tmp_dir, ignore_errors=True) - return await message.edit(self.strings["already"]) - except Exception as e: - shutil.rmtree(tmp_dir, ignore_errors=True) - return await message.edit(f"❌ Error: {e}") - - shutil.rmtree(tmp_dir, ignore_errors=True) - await message.edit(self.strings["done"].format(short)) diff --git a/fiksofficial/python-modules/createpacks.py b/fiksofficial/python-modules/createpacks.py new file mode 100644 index 0000000..296d1fb --- /dev/null +++ b/fiksofficial/python-modules/createpacks.py @@ -0,0 +1,353 @@ +# ______ ___ ___ _ _ +# ____ | ___ \ | \/ | | | | | +# / __ \| |_/ / _| . . | ___ __| |_ _| | ___ +# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \ +# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/ +# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___| +# \____/ __/ | +# |___/ + +# На модуль распространяется лицензия "GNU General Public License v3.0" +# https://github.com/all-licenses/GNU-General-Public-License-v3.0 + +# meta developer: @pymodule +# requires: opencv-python pillow + +import os +import shutil +import cv2 +import random +import string +import asyncio +import logging +from PIL import Image, UnidentifiedImageError + +from telethon.tl.functions.stickers import CreateStickerSetRequest +from telethon.tl.types import InputStickerSetItem, InputDocument +from telethon.errors.rpcerrorlist import PackShortNameOccupiedError + +from .. import loader, utils +from telethon.tl.functions.photos import GetUserPhotosRequest + + +try: + resample = Image.Resampling.LANCZOS +except AttributeError: + resample = Image.LANCZOS + +logger = logging.getLogger(__name__) + + + +async def process_to_webp(input_path: str, output_path: str, size: int = 512) -> bool: + try: + is_video = input_path.lower().endswith(('.mp4', '.webm', '.mov')) or b'ftyp' in open(input_path, 'rb').read(32) + if is_video: + cap = cv2.VideoCapture(input_path) + success, frame = cap.read() + cap.release() + if not success: + logger.warning(f"Video: Unable to read frame {input_path}") + return False + img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)) + else: + try: + img = Image.open(input_path).convert("RGBA") + except UnidentifiedImageError: + logger.warning(f"Image: incorrect {input_path}") + return False + + img.thumbnail((size, size), resample) + final = Image.new("RGBA", (size, size), (0, 0, 0, 0)) + w, h = img.size + final.paste(img, ((size - w) // 2, (size - h) // 2)) + + final.save(output_path, "WEBP", quality=95, method=6) + + try: + check = Image.open(output_path) + if check.size != (size, size): + logger.warning(f"WEBP: size not {size}x{size}: {check.size}") + return False + if os.path.getsize(output_path) > 512 * 1024: + final.save(output_path, "WEBP", quality=80, method=6) + if os.path.getsize(output_path) > 512 * 1024: + return False + except Exception as e: + logger.error(f"WEBP: verification error {output_path}: {e}") + return False + + return True + except Exception as e: + logger.error(f"WEBP: processing error {input_path}: {e}") + return False + + + +async def process_to_png(input_path: str, output_path: str, size: int = 100) -> bool: + try: + is_video = input_path.lower().endswith(('.mp4', '.webm', '.mov')) or b'ftyp' in open(input_path, 'rb').read(32) + if is_video: + cap = cv2.VideoCapture(input_path) + success, frame = cap.read() + cap.release() + if not success: + logger.warning(f"Video: Unable to read frame {input_path}") + return False + img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)) + else: + try: + img = Image.open(input_path).convert("RGBA") + except UnidentifiedImageError: + logger.warning(f"Image: incorrect {input_path}") + return False + + img.thumbnail((size, size), resample) + final = Image.new("RGBA", (size, size), (0, 0, 0, 0)) + w, h = img.size + final.paste(img, ((size - w) // 2, (size - h) // 2)) + + final.save(output_path, "PNG") + + try: + check = Image.open(output_path) + if check.size != (size, size): + logger.warning(f"PNG: size not {size}x{size}: {check.size}") + return False + if os.path.getsize(output_path) > 512 * 1024: + logger.warning(f"PNG: file >512KB: {os.path.getsize(output_path)}") + return False + except Exception as e: + logger.error(f"PNG: verification error {output_path}: {e}") + return False + + return True + except Exception as e: + logger.error(f"PNG: processing error {input_path}: {e}") + return False + + +@loader.tds +class CreatePacks(loader.Module): + """Creates sticker packs and emoji packs from the avatars of chat participants""" + + strings = { + "name": "CreatePacks", + "processing": "[CreatePacks] Collecting avatars of participants...", + "no_avatars": "[CreatePacks] No members with avatars", + "no_valid": "[CreatePacks] Could not process any avatars", + "done_pack": "[CreatePacks] Sticker pack is ready:\n[CreatePacks] Open: here", + "done_emoji_pack": "[CreatePacks] Emoji pack is ready:\n[CreatePacks] Open: here", + "already": "[CreatePacks] A sticker pack with this name already exists.", + "emoji_processing": "[CreatePacks] Creating emoji pack from avatars...", + "emoji_no_emoji": "[CreatePacks] No emoji specified — using", + } + + strings_ru = { + "_cls_doc": "Создаёт стикерпаки и эмодзи-паки из аватаров участников чата", + "processing": "[CreatePacks] Собираю аватарки участников...", + "no_avatars": "[CreatePacks] Нет участников с аватарками", + "no_valid": "[CreatePacks] Не удалось обработать ни одну аватарку", + "done_pack": "[CreatePacks] Стикерпак готов:\n[CreatePacks] Открыть: здесь", + "done_emoji_pack": "[CreatePacks] Эмодзи-пак готов:\n[CreatePacks] Открыть: здесь", + "already": "[CreatePacks] Стикерпак с таким именем уже существует", + "emoji_processing": "[CreatePacks] Создаю эмодзи-пак из аватаров...", + "emoji_no_emoji": "[CreatePacks] Эмодзи не указан — используется", + } + + async def _get_avatar_files(self, message, format: str = "webp", size: int = 512) -> tuple[list[str], str]: + chat = await message.get_chat() + cid = abs(message.chat_id) + tmp_dir = f"/tmp/avatars_{cid}_{random.randint(1000, 9999)}" + os.makedirs(tmp_dir, exist_ok=True) + + users = [] + async for u in self._client.iter_participants(chat.id): + if u.photo: + users.append(u) + if len(users) >= 100: + break + + if not users: + shutil.rmtree(tmp_dir, ignore_errors=True) + return [], tmp_dir + + processed = [] + process_func = process_to_webp if format == "webp" else process_to_png + + for u in users: + try: + photos = await self._client(GetUserPhotosRequest(u.id, 0, 0, 1)) + if not photos.photos: + continue + + raw_path = os.path.join(tmp_dir, f"{u.id}_raw") + raw = await self._client.download_media(photos.photos[0], file=raw_path) + + ext = ".webp" if format == "webp" else ".png" + output_path = os.path.join(tmp_dir, f"{u.id}{ext}") + success = False + + if isinstance(raw, str): + success = await process_func(raw, output_path, size=size) + if os.path.exists(raw): + os.unlink(raw) + else: + temp_raw = os.path.join(tmp_dir, f"{u.id}_temp_raw") + with open(temp_raw, "wb") as f: + f.write(raw) + success = await process_func(temp_raw, output_path, size=size) + if os.path.exists(temp_raw): + os.unlink(temp_raw) + + if success: + try: + img_size = Image.open(output_path).size + if img_size != (size, size): + logger.warning(f"{format.upper()}: size not {size}x{size}: {img_size}") + os.unlink(output_path) + continue + if os.path.getsize(output_path) > 512 * 1024: + logger.warning(f"{format.upper()}: file >512KB: {os.path.getsize(output_path)}") + os.unlink(output_path) + continue + processed.append(output_path) + except Exception as e: + logger.error(f"{format.upper()}: verification error {output_path}: {e}") + else: + logger.warning(f"{format.upper()}: Failed to process avatar {u.id}") + + except Exception as e: + logger.error(f"User processing error {u.id}: {e}") + continue + + return processed, tmp_dir + + @loader.command( + ru_doc="- Создать стикерпак из аватаров в группе", + only_groups=True + ) + async def createavatars(self, message): + """- Create a sticker pack from avatars in a group""" + await message.edit(self.strings("processing")) + + files, tmp_dir = await self._get_avatar_files(message, format="webp", size=512) + if not files: + return await message.edit(self.strings("no_avatars")) + + tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4)) + short_name = f"f{abs(message.chat_id)}_{tag}_by_fcreateavatars" + title = f"AvaPack {tag}" + + stickers = [] + for path in files: + try: + await asyncio.sleep(0.3) + file = await self._client.upload_file(path) + msg = await self._client.send_file("me", file, force_document=True) + doc = msg.document + await self._client.delete_messages("me", msg.id) + stickers.append(InputStickerSetItem( + document=InputDocument(doc.id, doc.access_hash, doc.file_reference), + emoji="🖼️" + )) + except Exception as e: + logger.error(f"Sticker loading error {path}: {e}") + continue + + if not stickers: + shutil.rmtree(tmp_dir, ignore_errors=True) + return await message.edit(self.strings("no_valid")) + + try: + await self._client(CreateStickerSetRequest( + user_id="me", + title=title, + short_name=short_name, + stickers=stickers + )) + await message.edit(self.strings("done_pack").format(short_name)) + except PackShortNameOccupiedError: + await message.edit(self.strings("already")) + except Exception as e: + error_details = f"❌ Ошибка создания стикерпака:\n{type(e).__name__}: {e}\n" + error_details += f"Пак: {short_name}\nСтикеров: {len(stickers)}\n" + if files: + error_details += f"Последний файл: {files[-1]}\n" + try: + error_details += f"Размер: {Image.open(files[-1]).size}\n" + error_details += f"Вес: {os.path.getsize(files[-1])} байт" + except: + pass + await message.edit(error_details) + logger.exception("Error creating sticker pack") + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) + + @loader.command( + ru_doc="[эмодзи] - Создать эмодзи-пак из всех аватаров", + only_groups=True + ) + async def createemojis(self, message): + """[emoji] - Create an emoji pack from all avatars""" + args = utils.get_args_raw(message) + emoji = args.strip() if args else "🖼️" + if not args: + await message.edit(self.strings("emoji_no_emoji") + f" `{emoji}`") + await asyncio.sleep(1.5) + + await message.edit(self.strings("emoji_processing")) + + files, tmp_dir = await self._get_avatar_files(message, format="png", size=100) + if not files: + return await message.edit(self.strings("no_avatars")) + + tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4)) + short_name = f"f{abs(message.chat_id)}_{tag}_by_fcreateemojis" + title = f"EmojiPack {tag}" + + stickers = [] + for path in files: + try: + await asyncio.sleep(0.3) + file = await self._client.upload_file(path) + msg = await self._client.send_file("me", file, force_document=True) + doc = msg.document + await self._client.delete_messages("me", msg.id) + stickers.append(InputStickerSetItem( + document=InputDocument(doc.id, doc.access_hash, doc.file_reference), + emoji=emoji + )) + except Exception as e: + logger.error(f"Error loading emoji {path}: {e}") + continue + + if not stickers: + shutil.rmtree(tmp_dir, ignore_errors=True) + return await message.edit(self.strings("no_valid")) + + try: + await self._client(CreateStickerSetRequest( + user_id="me", + title=title, + short_name=short_name, + stickers=stickers, + emojis=True + )) + await message.edit(self.strings("done_emoji_pack").format(short_name)) + except PackShortNameOccupiedError: + await message.edit(self.strings("already")) + except Exception as e: + error_details = f"❌ Ошибка создания эмодзи-пака:\n{type(e).__name__}: {e}\n" + error_details += f"Пак: {short_name}\nСмайликов: {len(stickers)}\n" + if files: + error_details += f"Последний файл: {files[-1]}\n" + try: + error_details += f"Размер: {Image.open(files[-1]).size}\n" + error_details += f"Вес: {os.path.getsize(files[-1])} байт" + except: + pass + await message.edit(error_details) + logger.exception("Error creating emoji pack") + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) \ No newline at end of file diff --git a/fiksofficial/python-modules/deviceinfo.py b/fiksofficial/python-modules/deviceinfo.py index 0d6b2e6..5daf86c 100644 --- a/fiksofficial/python-modules/deviceinfo.py +++ b/fiksofficial/python-modules/deviceinfo.py @@ -449,7 +449,7 @@ class DeviceInfo(loader.Module): await call.edit( text=self.strings["no_results"].format(query), reply_markup=[], - photo=None, # Explicitly remove any existing photo + photo=None, disable_web_page_preview=True ) except Exception as edit_error: @@ -471,7 +471,7 @@ class DeviceInfo(loader.Module): await call.edit( text=list_text, reply_markup=button_rows, - photo=None, # Explicitly remove any existing photo + photo=None, disable_web_page_preview=True ) except Exception as edit_error: @@ -491,8 +491,8 @@ class DeviceInfo(loader.Module): await call.edit( text=self.strings["error"].format(str(e)), reply_markup=[], - photo=None, # Explicitly remove any existing photo - disable_web_page_preview=True + photo=None, + disable_web_page_preview=True ) except Exception as edit_error: logger.warning(f"DeviceInfo: Failed to edit error message: {edit_error}") diff --git a/fiksofficial/python-modules/speedtest.py b/fiksofficial/python-modules/speedtest.py index 1b50e27..bdcaa7a 100644 --- a/fiksofficial/python-modules/speedtest.py +++ b/fiksofficial/python-modules/speedtest.py @@ -1,3 +1,12 @@ +# ______ ___ ___ _ _ +# ____ | ___ \ | \/ | | | | | +# / __ \| |_/ / _| . . | ___ __| |_ _| | ___ +# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \ +# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/ +# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___| +# \____/ __/ | +# |___/ + # На модуль распространяется лицензия "GNU General Public License v3.0" # https://github.com/all-licenses/GNU-General-Public-License-v3.0 @@ -5,30 +14,147 @@ # requires: speedtest-cli import speedtest -from .. import loader +from .. import loader, utils +@loader.tds class SpeedTestMod(loader.Module): - """Модуль для проверки скорости интернета""" + """Checking your internet speed""" + strings = { + "name": "SpeedTest", + "starting": "Running Speedtest…", + "ping": "Ping: {:.2f} ms", + "download": "Download: {:.2f} Mbps", + "upload": "Upload: {:.2f} Mbps", + "finished": "Speedtest completed!", + "error": "Speedtest error: {}", + "progress_ping": "Testing \"Ping\"...", + "progress_download": "Testing \"Download\"...", + "progress_upload": "Testing \"Upload\"...", + "cfg_timeout": "Server request timeout (sec)", + "cfg_retries": "Number of retry attempts", + "quality_website": "Websites: {}", + "quality_video": "Video: {}", + "quality_gaming": "Gaming: {}", + "quality_calls": "Video calls: {}", + } - strings = {"name": "SpeedTest"} + strings_ru = { + "_cls_doc": "Проверка скорости интернета", + "starting": "Запускаем Speedtest…", + "ping": "Ping: {:.2f} мс", + "download": "Загрузка: {:.2f} Мбит/с", + "upload": "Отдача: {:.2f} Мбит/с", + "finished": "Speedtest завершён!", + "error": "Ошибка при выполнении Speedtest: {}", + "progress_ping": "Тестируем пинг...", + "progress_download": "Тестируем скачивание...", + "progress_upload": "Тестируем загрузку...", + "cfg_timeout": "Таймаут запросов к серверу (сек)", + "cfg_retries": "Кол‑во попыток при неудаче", + "quality_website": "Сайты: {}", + "quality_video": "Видео: {}", + "quality_gaming": "Игры: {}", + "quality_calls": "Видеосвязь: {}", + } + def __init__(self): + self.config = loader.ModuleConfig( + loader.ConfigValue( + "timeout", + 30, + lambda: self.strings("cfg_timeout"), + validator=loader.validators.Integer(minimum=10, maximum=120), + ), + loader.ConfigValue( + "retries", + 2, + lambda: self.strings("cfg_retries"), + validator=loader.validators.Integer(minimum=0, maximum=5), + ), + ) + + def _get_quality_rating(self, category: str, ping: float, download: float, upload: float) -> str: + if category == "website": + if ping < 50 and download > 5: + return "🟢🟢🟢🟢🟢" + elif ping < 100 and download > 3: + return "🟠🟠🟠🟠" + elif ping < 200 and download > 1: + return "🟡🟡🟡" + elif ping < 300 and download > 0.5: + return "🔴🔴" + else: + return "⚫" + elif category == "video": + if ping < 50 and download > 25: + return "🟢🟢🟢🟢🟢" + elif ping < 75 and download > 5: + return "🟠🟠🟠🟠" + elif ping < 100 and download > 3: + return "🟡🟡🟡" + elif ping < 150 and download > 1: + return "🔴🔴" + else: + return "⚫" + elif category == "gaming": + if ping < 50 and download > 5 and upload > 3: + return "🟢🟢🟢🟢🟢" + elif ping < 100 and download > 3 and upload > 1: + return "🟠🟠🟠🟠" + elif ping < 150 and download > 1 and upload > 0.5: + return "🟡🟡🟡" + elif ping < 200 and download > 0.5: + return "🔴🔴" + else: + return "⚫" + elif category == "calls": + if ping < 50 and download > 4 and upload > 4: + return "🟢🟢🟢🟢🟢" + elif ping < 100 and download > 1.5 and upload > 1.5: + return "🟡🟡🟡🟡" + elif ping < 150 and download > 1 and upload > 1: + return "🟠🟠🟠" + elif ping < 200 and download > 0.5: + return "🔴🔴" + else: + return "⚫" + return "⚫" + + @loader.command( + ru_doc="(.st) - Запускает тест скорости интернета", + en_doc="(.st) - Runs an internet speed test", + alias="st", + ) async def speedcmd(self, message): - """Запускает тест скорости интернета""" - msg = await message.edit("Запускаем Speedtest... 🏁") + msg = await utils.answer(message, self.strings("starting")) try: - st = speedtest.Speedtest() - st.get_best_server() - download = st.download() / 1_000_000 # Мбит/с - upload = st.upload() / 1_000_000 # Мбит/с - ping = st.results.ping + s = speedtest.Speedtest() + s.get_best_server() + await utils.answer(msg, self.strings("progress_ping")) - await msg.edit( - f" Speedtest завершён! \n\n" - f"Ping: {ping:.2f} ms\n" - f"📥 Загрузка: {download:.2f} Mbps\n" - f"📤 Отдача: {upload:.2f} Mbps", - parse_mode="HTML" + ping = s.results.ping + + await utils.answer(msg, self.strings("progress_download")) + download = s.download() / 1_000_000 + + await utils.answer(msg, self.strings("progress_upload")) + upload = s.upload() / 1_000_000 + + text = ( + f"{self.strings('finished')}\n\n" + f"{self.strings('ping').format(ping)}\n" + f"{self.strings('download').format(download)}\n" + f"{self.strings('upload').format(upload)}\n\n" + f"{self.strings('quality_website').format(self._get_quality_rating('website', ping, download, upload))}\n" + f"{self.strings('quality_video').format(self._get_quality_rating('video', ping, download, upload))}\n" + f"{self.strings('quality_gaming').format(self._get_quality_rating('gaming', ping, download, upload))}\n" + f"{self.strings('quality_calls').format(self._get_quality_rating('calls', ping, download, upload))}" + ) + + await utils.answer(msg, text) + except Exception as exc: + await utils.answer( + msg, + self.strings("error").format(utils.escape_html(str(exc))), ) - except Exception as e: - await msg.edit(f"Ошибка при выполнении Speedtest:\n{e}") diff --git a/unneyon/hikka-mods/langpacks/yamusic.yml b/unneyon/hikka-mods/langpacks/yamusic.yml index 1b7efe0..6ba23ec 100644 --- a/unneyon/hikka-mods/langpacks/yamusic.yml +++ b/unneyon/hikka-mods/langpacks/yamusic.yml @@ -1,63 +1,63 @@ en: guide: "📜 Guide for obtaining access token for Yandex.Music" - iguide: "📜 Guide for obtaining access token for Yandex.Music" - no_token: " You didn't specify the access token in the config!" + search: "🎧 {performer} — {title}\n🎵 Yandex.Music | song.link" + downloading_track: "\n\n🕔 Downloading audio…" + uploading_banner: "\n\n🕔 Uploading banner…" + lyrics: "📜 Lyrics of the {track} track:\n
{text}
\n\n©️ Writers: {writers}" + no_lyrics: "🚫 Track {track} has no lyrics!" + errors: + no_query: "🚫 Specify the search query first!" + no_token_or_invalid: "🚫 You specified an invalid access token or didn't specified it at all!" + not_found: "🚫 No results found." + no_playing: "🚫 You don't listening to anything right now." autobio: - d: "🎧 Autobio is off now" - e: "🎧 Autobio is on now" - there_is_no_playing: " You don't listening to anything right now" - queue_types: + enabled: "🎧 Autobio was enabled." + disabled: "🎧 Autobio was disabled." + likes: + liked: "❤️ Track {track} was liked." + unliked: "🖤 Track {track} was unliked." + disliked: "💔 Track {track} was disliked." + _entity_types: VARIOUS: "Your queue" - RADIO: "«My Wave»" + RADIO: "«My Vibe»" PLAYLIST: "Playlist «{}»" ALBUM: "«{}»" ARTIST: "Popular tracks by {}" - downloading: "\n\n🕔 Downloading audio…" - uploading_banner: "\n\n🕔 Uploading banner…" - likes: - liked: "❤️ Track {track} was liked" - unliked: "❤️ Track {track} was unliked" - disliked: "💔 Track {track} was disliked" - lyrics: "📜 Lyrics of the {track} track:\n
{text}
\n\n©️ Writers: {writers}" - no_lyrics: " Track {track} has no lyrics!" - args: " Specify search query" - searching: "🔍 Searching…" - 404: " No results found" - search: "🎧 {performer} — {title}\n🎵 Yandex.Music | song.link" _cfg: - token: "Your access token for Yandex.Music" - now_playing_text: "The text that is used in commands to get now playing track. May contain {performer}, {title}, {device}, {volume}, {playing_from}, {link}, {track_id}, {album_id} keywords" - autobio: "Automatic bio template (may contain {artist} and {title} keywords)" - no_playing_bio: "Bio that is set when nothing is playing" + token: "The access token for Yandex.Music." + now_playing_text: "The caption for .ynow and .ynowt commands. May contain {performer}, {title}, {device}, {volume}, {playing_from}, {link}, {track_id}, {album_id} keywords." + autobio_text: "The text for automatically changing «Bio». May contains {performer} and {title}." + no_playing_bio_text: "The text for changing «Bio» when there is no playing tracks." + banner_version: "Banner version (old / new with lyrics / new without lyrics)." ru: guide: "📜 Гайд по получению токена Яндекс.Музыки" - iguide: "📜 Гайд по получению токена Яндекс.Музыки" - no_token: " Вы не указали токен Яндекс.Музыки в конфиге!" + search: "🎧 {performer} — {title}\n🎵 Яндекс.Музыка | song.link" + downloading_track: "\n\n🕔 Загрузка трека…" + uploading_banner: "\n\n🕔 Загрузка баннера…" + lyrics: "📜 Текст трека {track}:\n
{text}
\n\n©️ Авторы: {writers}" + no_lyrics: "🚫 У трека {track} нет текста!" + errors: + no_query: "🚫 Укажите поисковый запрос!" + no_token_or_invalid: "🚫 Вы указали невалидный токен или не указали его вообще!" + not_found: "🚫 Результаты не найдены." + no_playing: "🚫 Вы ничего не слушаете сейчас." autobio: - d: "🎧 Автобио выключено" - e: "🎧 Автобио включено" - there_is_no_playing: " Вы ничего не слушаете сейчас" - queue_types: + enabled: "🎧 Автобио теперь включено." + disabled: "🎧 Автобио теперь выключено." + likes: + liked: "❤️ Трек {track} был лайкнут." + unliked: "🖤 С трека {track} был снят лайк." + disliked: "💔 Трек {track} был дизлайкнут." + _entity_types: VARIOUS: "Ваша очередь" - RADIO: "«Моя Волна»" + RADIO: "«Моя волна»" PLAYLIST: "Плейлист «{}»" ALBUM: "«{}»" ARTIST: "Популярные треки {}" - downloading: "\n\n🕔 Загрузка трека…" - uploading_banner: "\n\n🕔 Загрузка баннера…" - likes: - liked: "❤️ Трек {track} лайкнут" - unliked: "❤️ С трека {track} снят лайк" - disliked: "💔 Трек {track} дизлайкнут" - lyrics: "📜 Текст трека {track}:\n
{text}
\n\n©️ Авторы: {writers}" - no_lyrics: " У трека {track} нет текста!" - args: " Укажите поисковый запрос" - searching: "🔍 Ищем…" - 404: " Ничего не найдено" - search: "🎧 {performer} — {title}\n🎵 Яндекс.Музыка | song.link" _cfg: - token: "Ваш токен от Яндекс.Музыки" - now_playing_text: "Текст, использующийся в командах для получения прослушиваемого трека. Может содержать ключевые слова {performer}, {title}, {device}, {volume}, {playing_from}, {link}, {track_id}, {album_id}" - autobio: "Шаблон автоматического био (может содержать ключевые слова {artist} и {title})" - no_playing_bio: "Био, которое ставится, когда ничего не играет" \ No newline at end of file + token: "Токен для Яндекс.Музыки." + now_playing_text: "Текст, использующийся в подписи к файлу в командах .ynow и .ynowt. Может содержать {performer}, {title}, {device}, {volume}, {playing_from}, {link}, {track_id} и {album_id}" + autobio_text: "Текст, использующийся при автоматическом изменении «О себе». Может содержать {performer} и {title}." + no_playing_bio_text: "Текст, использующийся при изменении «О себе», когда ничего не играет." + banner_version: "Версия баннера (старый / новый с текстом трека / новый без текста трека)." diff --git a/unneyon/hikka-mods/yamusic.py b/unneyon/hikka-mods/yamusic.py index 18ef573..e21a139 100644 --- a/unneyon/hikka-mods/yamusic.py +++ b/unneyon/hikka-mods/yamusic.py @@ -1,24 +1,26 @@ -__version__ = (1, 1, 0) +__version__ = (2, 0, 0) +# region KAMEKURO. # █▄▀ ▄▀█ █▀▄▀█ █▀▀ █▄▀ █ █ █▀█ █▀█ # █ █ █▀█ █ ▀ █ ██▄ █ █ ▀▄▄▀ █▀▄ █▄█ ▄ # © Copyright 2025 # ✈ https://t.me/kamekuro - # 🔒 Licensed under CC-BY-NC-ND 4.0 unless otherwise specified. # 🌐 https://creativecommons.org/licenses/by-nc-nd/4.0 # + attribution # + non-commercial # + no-derivatives - # You CANNOT edit, distribute or redistribute this file without direct permission from the author. +# region YaMusic +# ▀▄▀ ▄▀█ █▀▄▀█ █ █ █▀▀ █ █▀▀ +# █ █▀█ █ ▀ █ ▀▄▄▀ ▄▄█ █ █▄▄ # meta banner: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/banners/yamusic.png # meta pic: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/icons/yamusic.png # meta developer: @kamekuro_hmods # packurl: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/langpacks/yamusic.yml -# scope: hikka_only -# scope: hikka_min 1.6.3 -# requires: aiohttp asyncio requests pillow==11.2.1 git+https://github.com/MarshalX/yandex-music-api +# scope: heroku_only +# scope: heroku_min 1.7.2 +# requires: aiohttp asyncio requests pillow==12.0.0 git+https://github.com/MarshalX/yandex-music-api import aiohttp import asyncio @@ -26,17 +28,14 @@ import io import json import logging import random -import re import requests import string -import yandex_music +import textwrap +from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont import telethon -import textwrap -from PIL import ( - Image, ImageDraw, ImageEnhance, - ImageFilter, ImageFont -) +import yandex_music +import yandex_music.exceptions from .. import loader, utils @@ -44,39 +43,637 @@ from .. import loader, utils logger = logging.getLogger(__name__) -class YandexMusic(): - token: str - client: yandex_music.ClientAsync +class Banners: + def __init__( + self, + title: str, + artists: list, + duration: int, + progress: int, + track_cover: bytes, + ): + self.title = title + self.artists = artists + self.duration = duration + self.progress = progress + self.track_cover = track_cover + self.onest_b = "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf" + self.onest_r = "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Regular.ttf" + self.ysmusic_hb = "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/YSMusic-HeadlineBold.ttf" + + def measure( + self, text: str, font: ImageFont.FreeTypeFont, draw: ImageDraw.ImageDraw + ): + bbox = draw.textbbox((0, 0), text, font=font) + return bbox[2] - bbox[0], bbox[3] - bbox[1] + + def new(self): + W, H = 1920, 768 + title_font = ImageFont.truetype( + io.BytesIO(requests.get(self.onest_b).content), 80 + ) + artist_font = ImageFont.truetype( + io.BytesIO(requests.get(self.onest_b).content), 55 + ) + time_font = ImageFont.truetype( + io.BytesIO(requests.get(self.onest_b).content), 36 + ) + + track_cov = Image.open(io.BytesIO(self.track_cover)).convert("RGBA") + banner = ( + track_cov.resize((W, W)) + .crop((0, (W - H) // 2, W, ((W - H) // 2) + H)) + .filter(ImageFilter.GaussianBlur(radius=14)) + ) + banner = ImageEnhance.Brightness(banner).enhance(0.3) + draw = ImageDraw.Draw(banner) + + track_cov = track_cov.resize((H - 250, H - 250)) + mask = Image.new("L", track_cov.size, 0) + ImageDraw.Draw(mask).rounded_rectangle( + (0, 0, track_cov.size[0], track_cov.size[1]), radius=35, fill=255 + ) + track_cov.putalpha(mask) + track_cov = track_cov.crop(track_cov.getbbox()) + banner.paste(track_cov, (75, 75), mask) + + space = (643, 75, 1870, 593) + title_lines = textwrap.wrap(self.title, width=23) + if len(title_lines) > 2: + title_lines = title_lines[:2] + title_lines[-1] = title_lines[-1][:-1] + "…" + artist_lines = textwrap.wrap(", ".join(self.artists), width=23) + if len(artist_lines) > 1: + artist_lines = artist_lines[:1] + artist_lines[-1] = artist_lines[-1][:-1] + "…" + lines = title_lines + artist_lines + lines_sizes = [ + self.measure( + line, artist_font if (i == len(lines) - 1) else title_font, draw + ) + for i, line in enumerate(lines) + ] + heights = [h for _, h in lines_sizes] + spacing = title_font.size + 10 + y_start = space[1] + (space[3] - space[1]) / 2 + for i, line in enumerate(lines): + w, _ = lines_sizes[i] + draw.text( + (space[0] + (space[2] - space[0] - w) / 2, y_start), + line, + font=(artist_font if (i == (len(lines) - 1)) else title_font), + fill="#FFFFFF", + ) + y_start += spacing + + draw.text( + (75, 650), + f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}", + font=time_font, + fill="#FFFFFF", + ) + draw.text( + (1745, 650), + f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}", + font=time_font, + fill="#FFFFFF", + ) + draw.rounded_rectangle([75, 700, 1845, 715], radius=15 // 2, fill="#A0A0A0") + draw.rounded_rectangle( + [75, 700, int(75 + (1770 * self.progress / self.duration)), 715], + radius=15 // 2, + fill="#FFFFFF", + ) + + by = io.BytesIO() + banner.save(by, format="PNG") + by.seek(0) + by.name = "banner.png" + return by + + def old(self): + w, h = 1920, 768 + title_font = ImageFont.truetype( + io.BytesIO(requests.get(self.onest_b).content), 80 + ) + art_font = ImageFont.truetype( + io.BytesIO(requests.get(self.onest_r).content), 55 + ) + time_font = ImageFont.truetype( + io.BytesIO(requests.get(self.onest_b).content), 36 + ) + + track_cov = Image.open(io.BytesIO(self.track_cover)).convert("RGBA") + banner = ( + track_cov.resize((w, w)) + .crop((0, (w - h) // 2, w, ((w - h) // 2) + h)) + .filter(ImageFilter.GaussianBlur(radius=14)) + ) + banner = ImageEnhance.Brightness(banner).enhance(0.3) + + track_cov = track_cov.resize((banner.size[1] - 150, banner.size[1] - 150)) + mask = Image.new("L", track_cov.size, 0) + ImageDraw.Draw(mask).rounded_rectangle( + (0, 0, track_cov.size[0], track_cov.size[1]), radius=35, fill=255 + ) + track_cov.putalpha(mask) + track_cov = track_cov.crop(track_cov.getbbox()) + banner.paste(track_cov, (75, 75), mask) + + title_lines = textwrap.wrap(self.title, 23) + if len(title_lines) > 1: + title_lines[1] = ( + title_lines[1] + "..." if len(title_lines) > 2 else title_lines[1] + ) + title_lines = title_lines[:2] + artists_lines = textwrap.wrap(" • ".join(self.artists), width=40) + if len(artists_lines) > 1: + for index, art in enumerate(artists_lines): + if "•" in art[-2:]: + artists_lines[index] = art[: art.rfind("•") - 1] + + draw = ImageDraw.Draw(banner) + x, y = 150 + track_cov.size[0], 110 + for index, line in enumerate(title_lines): + draw.text((x, y), line, font=title_font, fill="#FFFFFF") + if index != len(title_lines) - 1: + y += 70 + x, y = 150 + track_cov.size[0], 110 * 2 + if len(title_lines) > 1: + y += 70 + for index, line in enumerate(artists_lines): + draw.text((x, y), line, font=art_font, fill="#A0A0A0") + if index != len(artists_lines) - 1: + y += 50 + + draw.rounded_rectangle( + [768, 650, 768 + 1072, 650 + 15], radius=15 // 2, fill="#A0A0A0" + ) + draw.rounded_rectangle( + [768, 650, 768 + int(1072 * (self.progress / self.duration)), 650 + 15], + radius=15 // 2, + fill="#FFFFFF", + ) + draw.text( + (768, 600), + f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}", + font=time_font, + fill="#FFFFFF", + ) + draw.text( + (1745, 600), + f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}", + font=time_font, + fill="#FFFFFF", + ) + + by = io.BytesIO() + banner.save(by, format="PNG") + by.seek(0) + by.name = "banner.png" + return by + + +@loader.tds +class YaMusicMod(loader.Module): + """The module for Yandex.Music streaming service""" + + strings = {"name": "YaMusic", "iguide": "📜 Guide for obtaining access token for Yandex.Music"} + strings_ru = {"_cls_doc": "Модуль для стримингового сервиса Яндекс.Музыка", "iguide": "📜 Гайд по получению токена Яндекс.Музыки"} + + def __init__(self): + self.config = loader.ModuleConfig( + loader.ConfigValue( + option="token", + default=None, + doc=lambda: self.strings["_cfg"]["token"], + validator=loader.validators.Hidden(), + ), + loader.ConfigValue( + option="now_playing_text", + default=( + "🎧 {performer} — {title}\n\n" + "⌨️ Now is listening on " + "{device} (🔊 {volume}%)\n" + "🗂 Playing from: {playing_from}" + "\n\n🎵 {link} | " + 'song.link' + ), + doc=lambda: self.strings["_cfg"]["now_playing_text"], + validator=loader.validators.String(), + ), + loader.ConfigValue( + option="autobio_text", + default="{performer} — {title}", + doc=lambda: self.strings["_cfg"]["autobio_text"], + validator=loader.validators.String(), + ), + loader.ConfigValue( + option="no_playing_bio_text", + default="I use Heroku with YaMusic mod btw", + doc=lambda: self.strings["_cfg"]["no_playing_bio_text"], + validator=loader.validators.String(), + ), + loader.ConfigValue( + option="banner_version", + default="new", + doc=lambda: self.strings["_cfg"]["banner_version"], + validator=loader.validators.Choice(["old", "new"]), + ), + ) + + async def client_ready(self, client, db): + self._client: telethon.TelegramClient = client + self._db = db + if not self.get("guide_sent", False): + await self.inline.bot.send_message(self._tg_id, self.strings("iguide")) + self.set("guide_sent", True) + me = await self._client.get_me() + self._premium = me.premium if hasattr(me, "premium") else False + if self.get("autobio", False): + self.autobio.start() + + @loader.loop(1800, autostart=True) + async def premium_check(self): + me = await self._client.get_me() + self._premium = me.premium if hasattr(me, "premium") else False + + @loader.loop(30) + async def autobio(self): + if not self.config["token"]: + self.autobio.stop() + self.set("autobio", False) + return + now = await self.__get_now_playing() + if now and (not now["paused"]): + out = self.config["autobio_text"].format( + title=now["track"]["title"], + performer=", ".join(now["track"]["artist"]), + ) + else: + out = self.config["no_playing_bio_text"] + try: + await self._client( + telethon.functions.account.UpdateProfileRequest( + about=out[: (140 if self._premium else 70)] + ) + ) + except telethon.errors.rpcerrorlist.FloodWaitError as e: + logger.info(f"Sleeping {max(e.seconds, 60)} because of floodwait") + await asyncio.sleep(max(e.seconds, 60)) + + + @loader.command(ru_doc="👉 Гайд по получению токена Яндекс.Музыки", alias="yg") + async def yguidecmd(self, message: telethon.types.Message): + """👉 Guide for obtaining a Yandex.Music token""" + await utils.answer(message, self.strings("guide")) + + + @loader.command(ru_doc="👉 Включить/выключить автобио", alias="yb") + async def ybiocmd(self, message: telethon.types.Message): + """👉 Enable/disable autobio""" + try: + ym_client = await yandex_music.ClientAsync(self.config["token"]).init() + except yandex_music.exceptions.UnauthorizedError: + return await utils.answer( + message, self.strings("errors")["no_token_or_invalid"] + ) + + bio = not self.get("autobio", False) + self.set("autobio", bio) + if bio: + await self.autobio.func(self) + self.autobio.start() + else: + self.autobio.stop() + try: + await self._client( + telethon.functions.account.UpdateProfileRequest( + about=self.config["no_playing_bio_text"][ + : (140 if self._premium else 70) + ] + ) + ) + except: + pass + + bio = self.get("autobio", False) + await utils.answer( + message, self.strings("autobio")["enabled" if bio else "disabled"] + ) + + + @loader.command(ru_doc="👉 Поиск треков в Яндекс.Музыке", alias="yq") + async def ysearchcmd(self, message: telethon.types.Message): + """👉 Searching tracks in Yandex.Music""" + try: + ym_client = await yandex_music.ClientAsync(self.config["token"]).init() + except yandex_music.exceptions.UnauthorizedError: + return await utils.answer( + message, self.strings("errors")["no_token_or_invalid"] + ) + query = utils.get_args_raw(message) + if not query: + return await utils.answer(message, self.strings("errors")["no_query"]) + search = await ym_client.search(query, type_="track") + if (not search.tracks) or (len(search.tracks.results) == 0): + return await utils.answer(message, self.strings("errors")["not_found"]) + + track = search.tracks.results[0] + out = self.strings("search").format( + title=track.title, + performer=", ".join(track.artists_name()), + track_id=track.track_id, + ) + await utils.answer(message, out + self.strings("downloading_track")) + + info = await ym_client.tracks_download_info(search.tracks.results[0].id, True) + audio = io.BytesIO(requests.get(info[0].direct_link).content) + audio.name = "audio.mp3" + await utils.answer( + message=message, + response=out, + file=audio, + attributes=( + [ + telethon.types.DocumentAttributeAudio( + duration=int(search.tracks.results[0].duration_ms / 1000), + title=search.tracks.results[0].title, + performer=", ".join( + [x.name for x in search.tracks.results[0].artists] + ), + ) + ] + ), + ) + + + @loader.command( + ru_doc="👉 Получить баннер трека, который играет сейчас", alias="yn" + ) + async def ynowcmd(self, message: telethon.types.Message): + """👉 Get the banner of the track playing right now""" + try: + ym_client = await yandex_music.ClientAsync(self.config["token"]).init() + except yandex_music.exceptions.UnauthorizedError: + return await utils.answer( + message, self.strings("errors")["no_token_or_invalid"] + ) + await utils.answer(message, self.strings("uploading_banner")) + now = await self.__get_now_playing() + if not now: + return await utils.answer(message, self.strings("errors")["no_playing"]) + track_object = (await ym_client.tracks(now["playable_id"]))[0] + + playlist_name = "" + if now["entity_type"] == "PLAYLIST": + playlist = (await ym_client.playlists_list(now["entity_id"]))[0] + playlist_name = ( + f'{playlist.title}' + ) + if now["entity_type"] == "ALBUM": + album = (await ym_client.albums(now["entity_id"]))[0] + playlist_name = ( + f'{album.title}' + ) + if now["entity_type"] == "ARTIST": + artist = (await ym_client.artists(now["entity_id"]))[0] + playlist_name = ( + f'{artist.name}' + ) + if now["entity_type"] not in self.strings("_entity_types").keys(): + now["entity_type"] = "VARIOUS" + + device, volume = "Unknown Device", "❔" + if now["device"]: + device = now["device"][0]["info"]["title"] + volume = round(now["device"][0]["volume"] * 100, 2) + out = self.config["now_playing_text"].format( + performer=", ".join(now["track"]["artist"]), + title=now["track"]["title"], + device=device, + volume=volume, + track_id=now["track"]["track_id"], + album_id=now["track"]["album_id"], + playing_from=self.strings("_entity_types") + .get(now["entity_type"]) + .format(playlist_name), + link=f"Яндекс.Музыка", + ) + try: + await utils.answer(message, out + self.strings("uploading_banner")) + except: + pass + + banners = Banners( + title=now["track"]["title"], + artists=now["track"]["artist"], + duration=now["duration_ms"], + progress=now["progress_ms"], + track_cover=requests.get(now["track"]["img"]).content, + ) + file = getattr(banners, self.config["banner_version"], banners.new)() + await utils.answer(message=message, response=out, file=file) + + + @loader.command(ru_doc="👉 Получить трек, который играет сейчас", alias="ynt") + async def ynowtcmd(self, message: telethon.types.Message): + """👉 Get the track playing right now""" + try: + ym_client = await yandex_music.ClientAsync(self.config["token"]).init() + except yandex_music.exceptions.UnauthorizedError: + return await utils.answer( + message, self.strings("errors")["no_token_or_invalid"] + ) + await utils.answer(message, self.strings("downloading_track")) + now = await self.__get_now_playing() + if not now: + return await utils.answer(message, self.strings("errors")["no_playing"]) + + playlist_name = "" + if now["entity_type"] == "PLAYLIST": + playlist = (await ym_client.playlists_list(now["entity_id"]))[0] + playlist_name = ( + f'{playlist.title}' + ) + if now["entity_type"] == "ALBUM": + album = (await ym_client.albums(now["entity_id"]))[0] + playlist_name = ( + f'{album.title}' + ) + if now["entity_type"] == "ARTIST": + artist = (await ym_client.artists(now["entity_id"]))[0] + playlist_name = ( + f'{artist.name}' + ) + if now["entity_type"] not in self.strings("_entity_types").keys(): + now["entity_type"] = "VARIOUS" + + device, volume = "Unknown Device", "❔" + if now["device"]: + device = now["device"][0]["info"]["title"] + volume = round(now["device"][0]["volume"] * 100, 2) + out = self.config["now_playing_text"].format( + performer=", ".join(now["track"]["artist"]), + title=now["track"]["title"], + device=device, + volume=volume, + track_id=now["track"]["track_id"], + album_id=now["track"]["album_id"], + playing_from=self.strings("_entity_types") + .get(now["entity_type"]) + .format(playlist_name), + link=f"Яндекс.Музыка", + ) + try: + await utils.answer(message, out + self.strings("downloading_track")) + except: + pass + + audio = io.BytesIO(requests.get(now["track"]["download_link"]).content) + audio.name = "audio.mp3" + await utils.answer( + message=message, + response=out, + file=audio, + attributes=( + [ + telethon.types.DocumentAttributeAudio( + duration=int(now["duration_ms"] / 1000), + title=now["track"]["title"], + performer=", ".join(now["track"]["artist"]), + ) + ] + ), + ) + + + @loader.command(ru_doc="👉 Лайкнуть играющий сейчас трек") + async def ylikecmd(self, message: telethon.types.Message): + """👉 Like the track playing right now""" + try: + ym_client = await yandex_music.ClientAsync(self.config["token"]).init() + except yandex_music.exceptions.UnauthorizedError: + return await utils.answer( + message, self.strings("errors")["no_token_or_invalid"] + ) + now = await self.__get_now_playing() + if not now: + return await utils.answer(message, self.strings("errors")["no_playing"]) + await ym_client.users_likes_tracks_add(now["track"]["track_id"]) + await utils.answer( + message, + self.strings("likes")["liked"].format( + track_id=now["track"]["track_id"], + track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}", + ), + ) + + @loader.command(ru_doc="👉 Снять лайк с играющего сейчас трека") + async def yunlikecmd(self, message: telethon.types.Message): + """👉 Unlike the track playing right now""" + try: + ym_client = await yandex_music.ClientAsync(self.config["token"]).init() + except yandex_music.exceptions.UnauthorizedError: + return await utils.answer( + message, self.strings("errors")["no_token_or_invalid"] + ) + now = await self.__get_now_playing() + if not now: + return await utils.answer(message, self.strings("errors")["no_playing"]) + await ym_client.users_likes_tracks_remove(now["track"]["track_id"]) + await utils.answer( + message, + self.strings("likes")["unliked"].format( + track_id=now["track"]["track_id"], + track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}", + ), + ) + + @loader.command(ru_doc="👉 Дизлайкнуть играющий сейчас трек") + async def ydislikecmd(self, message: telethon.types.Message): + """👉 Dislike the track playing right now""" + try: + ym_client = await yandex_music.ClientAsync(self.config["token"]).init() + except yandex_music.exceptions.UnauthorizedError: + return await utils.answer( + message, self.strings("errors")["no_token_or_invalid"] + ) + now = await self.__get_now_playing() + if not now: + return await utils.answer(message, self.strings("errors")["no_playing"]) + await ym_client.users_dislikes_tracks_add(now["track"]["track_id"]) + await utils.answer( + message, + self.strings("likes")["disliked"].format( + track_id=now["track"]["track_id"], + track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}", + ), + ) + + + @loader.command(ru_doc="👉 Получить текст играющего сейчас трека") + async def ylyricscmd(self, message: telethon.types.Message): + """👉 Get the lyrics of the track playing right now""" + try: + ym_client = await yandex_music.ClientAsync(self.config["token"]).init() + except yandex_music.exceptions.UnauthorizedError: + return await utils.answer( + message, self.strings("errors")["no_token_or_invalid"] + ) + now = await self.__get_now_playing() + if not now: + return await utils.answer(message, self.strings("errors")["no_playing"]) + try: + lyrics = await ym_client.tracks_lyrics(now["track"]["track_id"]) + await utils.answer( + message, + self.strings("lyrics").format( + track_id=now["track"]["track_id"], + track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}", + text=requests.get(lyrics.download_url).text, + writers=", ".join(lyrics.writers) if lyrics.writers else "Unknown", + ), + ) + except yandex_music.exceptions.NotFoundError: + await utils.answer( + message, + self.strings("no_lyrics").format( + track_id=now["track"]["track_id"], + track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}", + ), + ) - def __init__(self, token: str): - self.client = yandex_music.ClientAsync(token) - self.token = token - async def init(self): - self.client = await self.client.init() - return self # Original code: https://raw.githubusercontent.com/MIPOHBOPOHIH/YMMBFA/main/main.py - async def _create_ynison_ws(self, ws_proto: dict) -> dict: - async with aiohttp.ClientSession() as session: - async with session.ws_connect( - "wss://ynison.music.yandex.ru/redirector.YnisonRedirectService/GetRedirectToYnison", - headers={ - "Sec-WebSocket-Protocol": f"Bearer, v2, {json.dumps(ws_proto)}", - "Origin": "http://music.yandex.ru", - "Authorization": f"OAuth {self.token}", - }, - ) as ws: - response = await ws.receive() - return json.loads(response.data) + async def __get_ynison(self): + async def create_ws(token, ws_proto): + async with aiohttp.ClientSession() as session: + async with session.ws_connect( + "wss://ynison.music.yandex.ru/redirector.YnisonRedirectService/GetRedirectToYnison", + headers={ + "Sec-WebSocket-Protocol": f"Bearer, v2, {json.dumps(ws_proto)}", + "Origin": "http://music.yandex.ru", + "Authorization": f"OAuth {token}", + }, + ) as ws: + response = await ws.receive() + return json.loads(response.data) - # Original code: https://raw.githubusercontent.com/MIPOHBOPOHIH/YMMBFA/main/main.py - async def _get_ynison(self): - device_id = ''.join(random.choices(string.ascii_lowercase, k=16)) + device_id = "".join(random.choices(string.ascii_lowercase, k=16)) ws_proto = { "Ynison-Device-Id": device_id, "Ynison-Device-Info": json.dumps({"app_name": "Chrome", "type": 1}), } - data = await self._create_ynison_ws(ws_proto) + data = await create_ws(self.config["token"], ws_proto) ws_proto["Ynison-Redirect-Ticket"] = data["redirect_ticket"] payload = { "update_full_state": { @@ -88,7 +685,11 @@ class YandexMusic(): "playable_list": [], "options": {"repeat_mode": "NONE"}, "entity_context": "BASED_ON_ENTITY_BY_DEFAULT", - "version": {"device_id": device_id, "version": 9021243204784341000, "timestamp_ms": 0}, + "version": { + "device_id": device_id, + "version": 9021243204784341000, + "timestamp_ms": 0, + }, "from_optional": "", }, "status": { @@ -96,11 +697,19 @@ class YandexMusic(): "paused": True, "playback_speed": 1, "progress_ms": 0, - "version": {"device_id": device_id, "version": 8321822175199937000, "timestamp_ms": 0}, + "version": { + "device_id": device_id, + "version": 8321822175199937000, + "timestamp_ms": 0, + }, }, }, "device": { - "capabilities": {"can_be_player": True, "can_be_remote_controller": False, "volume_granularity": 16}, + "capabilities": { + "can_be_player": True, + "can_be_remote_controller": False, + "volume_granularity": 16, + }, "info": { "device_id": device_id, "type": "WEB", @@ -122,650 +731,62 @@ class YandexMusic(): headers={ "Sec-WebSocket-Protocol": f"Bearer, v2, {json.dumps(ws_proto)}", "Origin": "http://music.yandex.ru", - "Authorization": f"OAuth {self.token}", - } + "Authorization": f"OAuth {self.config['token']}", + }, ) as ws: await ws.send_str(json.dumps(payload)) response = await ws.receive() ynison: dict = json.loads(response.data) return ynison - - async def get_lyrics(self, track_id: int, with_timecodes: bool = False): - t = (await self.client.tracks(track_id))[0] - if with_timecodes: - if t.lyrics_info.has_available_sync_lyrics: - lyrics = await self.client.tracks_lyrics(track_id, "LRC") - return { - "text": requests.get(lyrics.download_url).text, - "writers": lyrics.writers - } - else: - if t.lyrics_info.has_available_text_lyrics: - lyrics = await self.client.tracks_lyrics(track_id, "TEXT") - return { - "text": requests.get(lyrics.download_url).text, - "writers": lyrics.writers - } - return None - async def get_now_playing(self): - ynison = await self._get_ynison() - if len(ynison.get("player_state", {}).get("player_queue", {}).get("playable_list", [])) == 0: + async def __get_now_playing(self): + if not self.config["token"]: + return {} + try: + ym_client = await yandex_music.ClientAsync(self.config["token"]).init() + except yandex_music.exceptions.UnauthorizedError: + return {} + + ynison = await self.__get_ynison() + if (len(ynison.get("player_state", {}).get("player_queue", {}).get("playable_list", [])) == 0): return {} raw_track = ynison["player_state"]["player_queue"]["playable_list"][ ynison["player_state"]["player_queue"]["current_playable_index"] ] - return { - "paused": ynison["player_state"]["status"]["paused"], - "duration_ms": int(ynison["player_state"]["status"]["duration_ms"]), - "progress_ms": int(ynison["player_state"]["status"]["progress_ms"]), - "entity_id": ynison["player_state"]["player_queue"]["entity_id"], - "entity_type": ynison["player_state"]["player_queue"]["entity_type"], - "playable_id": raw_track["playable_id"], - "device": [ - x for x in ynison['devices'] - if x['info']['device_id'] == ynison.get('active_device_id_optional', "") - ], - "track": (await self.client.tracks(raw_track["playable_id"]))[0] - } if raw_track['playable_type'] != "LOCAL_TRACK" else {} - - -@loader.tds -class YaMusicMod(loader.Module): - """The module for Yandex.Music streaming service""" - strings = {"name": "YaMusic"} - strings_ru = {"_cls_doc": "Модуль для стримингового сервиса Яндекс.Музыка"} - - def __init__(self): - self.config = loader.ModuleConfig( - loader.ConfigValue( - "token", - None, - lambda: self.strings["_cfg"]["token"], - validator=loader.validators.Hidden() - ), - loader.ConfigValue( - "now_playing_text", - "🎧 {performer} — {title}\n\n" \ - "⌨️ Now is listening on " \ - "{device} (🔊 " \ - "{volume}%)\n🗂 Playing from: " \ - "{playing_from}\n\n🎵 {link} | " \ - "song.link", - lambda: self.strings["_cfg"]["now_playing_text"], - validator=loader.validators.String() - ), - loader.ConfigValue( - "autobio", - "🎧 {artist} - {title}", - lambda: self.strings["_cfg"]["autobio"], - validator=loader.validators.String() - ), - loader.ConfigValue( - "no_playing_bio", - "Hello!", - lambda: self.strings["_cfg"]["no_playing_bio"], - validator=loader.validators.String() - ), - loader.ConfigValue( - "banner_version", - "new", - "Version of track banner (old/new)", - validator=loader.validators.Choice(["new", "old"]) - ) + ym_client = await yandex_music.ClientAsync(self.config["token"]).init() + track_object = (await ym_client.tracks(raw_track["playable_id"]))[0] + return ( + { + "paused": ynison["player_state"]["status"]["paused"], + "playable_id": raw_track["playable_id"], + "duration_ms": int(ynison["player_state"]["status"]["duration_ms"]), + "progress_ms": int(ynison["player_state"]["status"]["progress_ms"]), + "entity_id": ynison["player_state"]["player_queue"]["entity_id"], + "entity_type": ynison["player_state"]["player_queue"]["entity_type"], + "device": [ + x + for x in ynison["devices"] + if x["info"]["device_id"] + == ynison.get("active_device_id_optional", "") + ], + "track": { + "track_id": track_object.track_id, + "album_id": track_object.albums[0].id, + "title": track_object.title, + "artist": track_object.artists_name(), + "img": f"https://{track_object.cover_uri[:-2]}1000x1000", + "duration": track_object.duration_ms // 1000, + "minutes": round(track_object.duration_ms / 1000) // 60, + "seconds": round(track_object.duration_ms / 1000) % 60, + "download_link": ( + ( + await ym_client.tracks_download_info( + track_object.track_id, get_direct_links=True + ) + )[0].direct_link + ), + }, + } + if raw_track["playable_type"] != "LOCAL_TRACK" + else {} ) - - async def on_dlmod(self): - if not self.get("guide_send", False): - await self.inline.bot.send_message(self._tg_id, self.strings("iguide")) - self.set("guide_send", True) - - async def client_ready(self, client, db): - self._client = client - self._db = db - - me = await self._client.get_me() - self._premium = me.premium if hasattr(me, "premium") else False - self.premium_check.start() - - if self.get("autobio", False): - self.autobio.start() - - - @loader.loop(1800) - async def premium_check(self): - me = await self._client.get_me() - self._premium = me.premium if hasattr(me, "premium") else False - - - @loader.loop(30) - async def autobio(self): - if not self.config['token']: - self.autobio.stop(); self.set("autobio", False) - return - ym = await YandexMusic(self.config['token']).init() - now = await ym.get_now_playing() - if now and (not now['paused']): - out = self.config['autobio'].format( - title=now['track'].title, - artist=", ".join([x.name for x in now['track'].artists]) - )[:(140 if self._premium else 70)] - try: - await self._client( - telethon.functions.account.UpdateProfileRequest(about=out) - ) - except telethon.errors.rpcerrorlist.FloodWaitError as e: - logger.info(f"Sleeping {max(e.seconds, 60)} because of floodwait") - await asyncio.sleep(max(e.seconds, 60)) - - - @loader.command( - ru_doc="👉 Гайд по получению токена Яндекс.Музыки", - alias="yg" - ) - async def yguidecmd(self, message: telethon.types.Message): - """👉 Guide for obtaining a Yandex.Music token""" - await utils.answer(message, self.strings("guide")) - - - @loader.command( - ru_doc="👉 Включить/выключить автобио", - alias="yb" - ) - async def ybiocmd(self, message: telethon.types.Message): - """👉 Enable/disable autobio""" - - if (not self.config['token']) and self.get("autobio", False): - return await utils.answer(message, self.strings("no_token")) - - bio = not self.get("autobio", False) - self.set("autobio", bio) - if bio: self.autobio.start() - else: - self.autobio.stop() - try: - await self._client( - telethon.functions.account.UpdateProfileRequest( - about=self.config['no_playing_bio'][:(140 if self._premium else 70)] - ) - ) - except: pass - - await utils.answer( - message, - self.strings("autobio")['e' if bio else 'd'] - ) - - - @loader.command( - ru_doc="👉 Получить трек, который играет сейчас (с файлом трека)", - alias="ynt" - ) - async def ynowtcmd(self, message: telethon.types.Message): - """👉 Get now playing track (with track file)""" - - if not self.config['token']: - return await utils.answer(message, self.strings("no_token")) - ym = await YandexMusic(self.config['token']).init() - now = await ym.get_now_playing() - if not now: - return await utils.answer(message, self.strings("there_is_no_playing")) - if now['entity_type'] not in self.strings("queue_types").keys(): - now['entity_type'] = "VARIOUS" - - playlist_name = "" - if now['entity_type'] == "PLAYLIST": - playlist = (await ym.client.playlists_list(now['entity_id']))[0] - playlist_name = f"{playlist.title}" - if now['entity_type'] == "ALBUM": - album = (await ym.client.albums(now['entity_id']))[0] - playlist_name = f"{album.title}" - if now['entity_type'] == "ARTIST": - artist = (await ym.client.artists(now['entity_id']))[0] - playlist_name = f"{artist.name}" - - device, volume = "Unknown Device", "❓" - if now['device']: - device=now['device'][0]['info']['title'] - volume=round(now['device'][0]['volume']*100, 2) - - out = self.config['now_playing_text'].format( - title=now['track'].title, - performer=", ".join([x.name for x in now['track'].artists]), - device=device, volume=volume, - playing_from=self.strings("queue_types").get(now['entity_type']).format(playlist_name), - track_id=now['track'].id, - album_id=now['track'].albums[0].id, - link=f"Яндекс.Музыка" - ) - await utils.answer( - message, out+self.strings("downloading") - ) - - audio = io.BytesIO((await utils.run_sync(requests.get, (await ym.client.tracks_download_info(now['track'].id, get_direct_links=True))[0].direct_link)).content) - audio.name = "audio.mp3" - await utils.answer( - message=message, response=out, - file=audio, - attributes=([ - telethon.types.DocumentAttributeAudio( - duration=now['track'].duration_ms // 1000, - title=now['track'].title, - performer=", ".join([x.name for x in now['track'].artists]) - ) - ]) - ) - - - @loader.command( - ru_doc="👉 Получить баннер трека, который играет сейчас", - alias="yn" - ) - async def ynowcmd(self, message: telethon.types.Message): - """👉 Get now playing track's banner""" - - if not self.config['token']: - return await utils.answer(message, self.strings("no_token")) - ym = await YandexMusic(self.config['token']).init() - now = await ym.get_now_playing() - if not now: - return await utils.answer(message, self.strings("there_is_no_playing")) - if now['entity_type'] not in self.strings("queue_types").keys(): - now['entity_type'] = "VARIOUS" - - playlist_name = "" - if now['entity_type'] == "PLAYLIST": - playlist = (await ym.client.playlists_list(now['entity_id']))[0] - playlist_name = f"{playlist.title}" - if now['entity_type'] == "ALBUM": - album = (await ym.client.albums(now['entity_id']))[0] - playlist_name = f"{album.title}" - if now['entity_type'] == "ARTIST": - artist = (await ym.client.artists(now['entity_id']))[0] - playlist_name = f"{artist.name}" - - device, volume = "Unknown Device", "❓" - if now['device']: - device=now['device'][0]['info']['title'] - volume=round(now['device'][0]['volume']*100, 2) - - out = self.config['now_playing_text'].format( - title=now['track'].title, - performer=", ".join([x.name for x in now['track'].artists]), - device=device, volume=volume, - playing_from=self.strings("queue_types").get(now['entity_type']).format(playlist_name), - track_id=now['track'].id, - album_id=now['track'].albums[0].id, - link=f"Яндекс.Музыка" - ) - await utils.answer( - message, out+self.strings("uploading_banner") - ) - - lyrics = await ym.get_lyrics(now['track'].id, True) - func = self.__create_banner if self.config['banner_version'] == "new" else self.__create_banner_old - file = func( - now['track'].title, [x.name for x in now['track'].artists], - now['duration_ms'], now['progress_ms'], - requests.get(f"https://{now['track'].cover_uri[:-2]}1000x1000").content, - lyrics['text'] if lyrics else None - ) - await utils.answer( - message=message, response=out, file=file - ) - - - @loader.command( - ru_doc="👉 Лайкнуть играющий сейчас трек" - ) - async def ylikecmd(self, message: telethon.types.Message): - """👉 Like now playing track's banner""" - - if not self.config['token']: - return await utils.answer(message, self.strings("no_token")) - ym = await YandexMusic(self.config['token']).init() - now = await ym.get_now_playing() - if not now: - return await utils.answer(message, self.strings("there_is_no_playing")) - - await ym.client.users_likes_tracks_add(now['track'].id) - await utils.answer( - message, self.strings("likes")['liked'].format( - track_id=now['track'].id, album_id=now['track'].albums[0].id, - track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}" - ) - ) - - @loader.command( - ru_doc="👉 Убрать лайк с играющего сейчас трека" - ) - async def yunlikecmd(self, message: telethon.types.Message): - """👉 Unlike now playing track""" - - if not self.config['token']: - return await utils.answer(message, self.strings("no_token")) - ym = await YandexMusic(self.config['token']).init() - now = await ym.get_now_playing() - if not now: - return await utils.answer(message, self.strings("there_is_no_playing")) - - await ym.client.users_likes_tracks_remove(now['track'].id) - await utils.answer( - message, self.strings("likes")['unliked'].format( - track_id=now['track'].id, album_id=now['track'].albums[0].id, - track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}" - ) - ) - - @loader.command( - ru_doc="👉 Дизлайкнуть играющий сейчас трек", - alias="ydis" - ) - async def ydislikecmd(self, message: telethon.types.Message): - """👉 Dislike now playing track""" - - if not self.config['token']: - return await utils.answer(message, self.strings("no_token")) - ym = await YandexMusic(self.config['token']).init() - now = await ym.get_now_playing() - if not now: - return await utils.answer(message, self.strings("there_is_no_playing")) - - await ym.client.users_dislikes_tracks_add(now['track'].id) - await utils.answer( - message, self.strings("likes")['disliked'].format( - track_id=now['track'].id, album_id=now['track'].albums[0].id, - track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}" - ) - ) - - - @loader.command( - ru_doc="👉 Получить текст играющего сейчас трека" - ) - async def ylyricscmd(self, message: telethon.types.Message): - """👉 Get lyrics of the now playing track""" - - if not self.config['token']: - return await utils.answer(message, self.strings("no_token")) - ym = await YandexMusic(self.config['token']).init() - now = await ym.get_now_playing() - if not now: - return await utils.answer(message, self.strings("there_is_no_playing")) - - lyrics = await ym.get_lyrics(now['playable_id']) - if lyrics: - await utils.answer( - message, self.strings("lyrics").format( - track_id=now['track'].id, album_id=now['track'].albums[0].id, - track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}", - text=lyrics['text'], - writers=", ".join(lyrics['writers']) - ) - ) - else: - await utils.answer( - message, self.strings("no_lyrics").format( - track_id=now['track'].id, album_id=now['track'].albums[0].id, - track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}" - ) - ) - - - @loader.command( - ru_doc="<запрос> 👉 Поиск трека в Яндекс.Музыке", - alias="yq" - ) - async def ysearchcmd(self, message: telethon.types.Message): - """ 👉 Search track in Yandex.Music""" - - if not self.config['token']: - return await utils.answer(message, self.strings("no_token")) - ym = await YandexMusic(self.config['token']).init() - - query = utils.get_args_raw(message) - if not query: - await utils.answer(message, self.strings("args")) - return - - message = await utils.answer(message, self.strings("searching")) - - search = await ym.client.search(query, type_="track") - if (not search.tracks) or (len(search.tracks.results) == 0): - return await utils.answer(message, self.strings("404")) - - out = self.strings("search").format( - title=search.tracks.results[0].title + ( - f" ({search.tracks.results[0].version})" if search.tracks.results[0].version else "" - ), - performer=", ".join([x.name for x in search.tracks.results[0].artists]), - album_id=search.tracks.results[0].albums[0].id, track_id=search.tracks.results[0].id - ) - message = await utils.answer(message, out+self.strings("downloading")) - - info = await ym.client.tracks_download_info(search.tracks.results[0].id, True) - link = info[0].direct_link - audio = None - audio = io.BytesIO((await utils.run_sync(requests.get, link)).content) - audio.name = "audio.mp3" - - await utils.answer( - message=message, response=out, - file=audio, - attributes=([ - telethon.types.DocumentAttributeAudio( - duration=int(search.tracks.results[0].duration_ms / 1000), - title=search.tracks.results[0].title, - performer=", ".join([x.name for x in search.tracks.results[0].artists]) - ) - ]) - ) - - - def __create_banner( - self, - title: str, artists: list, - duration: int, progress: int, - track_cover: bytes, lyrics: str, - ): - # ——————————————— CONSTS ——————————————— - W, H = 1920, 768 - title_font = ImageFont.truetype(io.BytesIO(requests.get( - "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf" - ).content), 55) - artist_font = ImageFont.truetype(io.BytesIO(requests.get( - "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf" - ).content), 46) - time_font = ImageFont.truetype(io.BytesIO(requests.get( - "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf" - ).content), 36) - lyrics_font = ImageFont.truetype(io.BytesIO(requests.get( - "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/YSMusic-HeadlineBold.ttf" - ).content), 75) - nlyrics_font = ImageFont.truetype(io.BytesIO(requests.get( - "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/YSMusic-HeadlineBold.ttf" - ).content), 60) - def measure(t: str, f: ImageFont.FreeTypeFont, d: ImageDraw.ImageDraw): - bb = d.textbbox((0, 0), t, font=f) - return bb[2] - bb[0], bb[3] - bb[1] - - # ——————————————— BACKGROUND ——————————————— - track_cov = Image.open(io.BytesIO(track_cover)).convert("RGBA") - banner = ( - track_cov.resize((W, W)) - .crop((0, (W-H) // 2, W, ((W-H) // 2) + H)) - .filter(ImageFilter.GaussianBlur(radius=14)) - ) - banner = ImageEnhance.Brightness(banner).enhance(0.3) - draw = ImageDraw.Draw(banner) - - # ——————————————— TRACK COVER ——————————————— - track_cov = track_cov.resize((H-350, H-350)) - mask = Image.new("L", track_cov.size, 0) - ImageDraw.Draw(mask).rounded_rectangle( - (0, 0, track_cov.size[0], track_cov.size[1]), radius=35, fill=255 - ) - track_cov.putalpha(mask) - track_cov = track_cov.crop(track_cov.getbbox()) - banner.paste(track_cov, (175, 175), mask) - - # ——————————————— ARTIST & TITLE ——————————————— - text_width, _ = measure(f"{', '.join(artists)} — {title}", title_font, draw) - if text_width > 1680: - lines = [f"{title}", f"{', '.join(artists)}"] - lsizes = [measure(lines[0], title_font, draw), measure(lines[1], artist_font, draw)] - else: - lines = [f"{', '.join(artists)} — {title}"] - lsizes = [measure(lines[0], title_font, draw)] - text_h = sum(th for _, th in lsizes) + (len(lines) - 1) - text_y = (150 - text_h) / 2 - for i, (l, (lw, lh)) in enumerate(zip(lines, lsizes)): - if len(lines) == 2 and i == 1: - ftu = artist_font - else: - ftu = title_font - if lw > 1680: - while lw > 1680 and len(l) > 3: - l = l[:-4] + "…" - lw, _ = measure(l, ftu, draw) - tx = (W - lw) / 2 - draw.text((tx, text_y), l, font=ftu, fill="#A0A0A0") - text_y += lh + 5 - - # ——————————————— LYRICS ——————————————— - if lyrics: - lyrics_lines = [] - for match in re.finditer(r"\[(\d{2}):(\d{2}\.\d{2})\] (.+)", lyrics): - minutes = int(match.group(1)) - seconds = float(match.group(2)) - text = match.group(3) - time_ms = int((minutes * 60 + seconds) * 1000) - lyrics_lines.append((time_ms, text)) - llast, lnext = "", "" - for i, (time_ms, text) in enumerate(lyrics_lines): - if time_ms <= progress: - llast = text - if i+1 < len(lyrics_lines): - lnext = lyrics_lines[i+1][1] - else: - break - y_start = None - if llast: - lines = textwrap.wrap(llast, width=23) - if len(lines) > 3: - lines = lines[:3] - lines[-1] += "…" - lines_sizes = [draw.textbbox((0, 0), l, font=lyrics_font) for l in lines] - line_heights = [bb[3] - bb[1] for bb in lines_sizes] - total_text_height = sum(line_heights) + (len(lines) - 1) * 10 - y_start = (150 + (track_cov.size[0]-total_text_height)) / 2 - for i, line in enumerate(lines): - lw = lines_sizes[i][2] - lines_sizes[i][0] - tx = (track_cov.size[0]+325 + ((W-track_cov.size[0]+285) - lw)) / 2 - draw.text((tx, y_start), line, font=lyrics_font, fill="#FFFFFF") - y_start += line_heights[i] + 10 - if lnext: - next_lines = textwrap.wrap(lnext, width=23) - if len(next_lines) > 2: - next_lines = next_lines[:2] - next_lines[-1] += "…" - next_sizes = [draw.textbbox((0, 0), l, font=nlyrics_font) for l in next_lines] - next_heights = [bb[3] - bb[1] for bb in next_sizes] - total_text_height = sum(next_heights) + (len(next_lines) - 1) * 10 - if not y_start: - y_start = (150 + (track_cov.size[0] - total_text_height))/2 + 150 - for j, line in enumerate(next_lines): - lw = next_sizes[j][2] - next_sizes[j][0] - tx = (track_cov.size[0] + 325 + ((W - track_cov.size[0] + 285) - lw)) / 2 - draw.text((tx, y_start + 40), line, font=nlyrics_font, fill="#A0A0A0") - y_start += next_heights[j] + 10 - - # ——————————————— STATUS BAR ——————————————— - draw.rounded_rectangle([75, 700, 768 + 1072, 700 + 15], radius=15 // 2, fill="#A0A0A0") - draw.rounded_rectangle([75, 700, 768 + int(1072 * (progress / duration)), 700 + 15], radius=15 // 2, fill="#FFFFFF") - draw.text((75, 650), f"{(progress//1000//60):02}:{(progress//1000%60):02}", font=time_font, fill="#FFFFFF") - draw.text((1745, 650), f"{(duration//1000//60):02}:{(duration//1000%60):02}", font=time_font, fill="#FFFFFF") - - # ——————————————— SAVE ——————————————— - by = io.BytesIO() - banner.save(by, format="PNG"); by.seek(0) - by.name = "banner.png" - return by - - - def __create_banner_old( - self, - title: str, artists: list, - duration: int, progress: int, - track_cover: bytes, - *args, **kwargs - ): - w, h = 1920, 768 - title_font = ImageFont.truetype(io.BytesIO(requests.get( - "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf" - ).content), 80) - art_font = ImageFont.truetype(io.BytesIO(requests.get( - "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Regular.ttf" - ).content), 55) - time_font = ImageFont.truetype(io.BytesIO(requests.get( - "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf" - ).content), 36) - - # Gen banner (bg) - track_cov = Image.open(io.BytesIO(track_cover)).convert("RGBA") - banner = track_cov.resize((w, w)).crop( - (0, (w-h)//2, w, ((w-h)//2)+h) - ).filter(ImageFilter.GaussianBlur(radius=14)) - banner = ImageEnhance.Brightness(banner).enhance(0.3) - - # Gen track cover and put to bg - track_cov = track_cov.resize((banner.size[1]-150, banner.size[1]-150)) - mask = Image.new("L", track_cov.size, 0) - ImageDraw.Draw(mask).rounded_rectangle((0, 0, track_cov.size[0], track_cov.size[1]), radius=35, fill=255) - track_cov.putalpha(mask) - track_cov = track_cov.crop(track_cov.getbbox()) - banner.paste(track_cov, (75, 75), mask) - - # Editing text - title_lines = textwrap.wrap(title, 23) - if len(title_lines) > 1: - title_lines[1] = title_lines[1] + "..." if len(title_lines) > 2 else title_lines[1] - title_lines = title_lines[:2] - artists_lines = textwrap.wrap(" • ".join(artists), width=40) - if len(artists_lines) > 1: - for index, art in enumerate(artists_lines): - if "•" in art[-2:]: - artists_lines[index] = art[:art.rfind("•") - 1] - - # Put title and artists to banner - draw = ImageDraw.Draw(banner) - x, y = 150+track_cov.size[0], 110 - for index, line in enumerate(title_lines): - draw.text((x, y), line, font=title_font, fill="#FFFFFF") - if index != len(title_lines)-1: - y += 70 - x, y = 150+track_cov.size[0], 110*2 - if len(title_lines) > 1: y += 70 - for index, line in enumerate(artists_lines): - draw.text((x, y), line, font=art_font, fill="#A0A0A0") - if index != len(artists_lines)-1: - y += 50 - - # Drawing status bar - draw.rounded_rectangle([768, 650, 768+1072, 650+15], radius=15//2, fill="#A0A0A0") - draw.rounded_rectangle([768, 650, 768+int(1072*(progress/duration)), 650+15], radius=15//2, fill="#FFFFFF") - draw.text((768, 600), f"{(progress//1000//60):02}:{(progress//1000%60):02}", font=time_font, fill="#FFFFFF") - draw.text((1745, 600), f"{(duration//1000//60):02}:{(duration//1000%60):02}", font=time_font, fill="#FFFFFF") - - by = io.BytesIO() - banner.save(by, format="PNG"); by.seek(0) - by.name = "banner.png" - return by \ No newline at end of file