diff --git a/KorenbZla/HikkaModules/AuroraBull.py b/KorenbZla/HikkaModules/AuroraBull.py
index 977ebec..8fb0442 100644
--- a/KorenbZla/HikkaModules/AuroraBull.py
+++ b/KorenbZla/HikkaModules/AuroraBull.py
@@ -23,7 +23,7 @@
# meta pic: https://i.postimg.cc/Hx3Zm8rB/logo.png
# meta banner: https://te.legra.ph/file/7612b5506856c1eb34c56.jpg
-version = (1, 0, 0)
+__version__ = (1, 1, 0)
import json
import aiohttp
@@ -42,7 +42,7 @@ class AuroraBullMod(loader.Module):
"error_decoding": "Error: The JSON could not be decoded.",
"error_uploading_data": "Error loading data",
"error_valid_args": "Please enter valid arguments!",
- "launched": "AuroraBull launched!\n\nUse .abulloff to stop the attack.",
+ "launched": "AuroraBull launched!\n\nUse {prefix}abulloff to stop the attack.",
"stopped": "AuroraBull has stopped.",
}
@@ -51,7 +51,7 @@ class AuroraBullMod(loader.Module):
"error_decoding": "Error: не удалось декодировать JSON.",
"error_uploading_data": "Ошибка при загрузке данных",
"error_valid_args": "Введите корректные аргументы!",
- "launched": "AuroraBull запущен!\n\nИспользуйте .abulloff, чтобы остановить атаку.",
+ "launched": "AuroraBull запущен!\n\nИспользуйте {prefix}abulloff, чтобы остановить атаку.",
"stopped": "AuroraBull остановлен.",
}
@@ -60,7 +60,7 @@ class AuroraBullMod(loader.Module):
"error_decoding": "Error: JSON декодлаш муваффақиятли амалга ошмади.",
"error_uploading_data": "Маълумотлар юклаб олинмади",
"error_valid_args": "Iltimos, to'g'ri dalillarni kiriting!",
- "launched": "AuroraBull ishga tushirildi!\n\nHujumni toʻxtatish uchun .abulloff dan foydalaning.",
+ "launched": "AuroraBull ishga tushirildi!\n\nHujumni toʻxtatish uchun {prefix}abulloff dan foydalaning.",
"stopped": "AuroraBull to'xtadi.",
}
@@ -69,7 +69,7 @@ class AuroraBullMod(loader.Module):
"error_decoding": "Error: JSON konnte nicht decodiert werden.",
"error_uploading_data": "Fehler beim Hochladen der Daten",
"error_valid_args": "Bitte geben Sie gültige Argumente ein!",
- "launched": "AuroraBull gestartet!\n\nVerwenden Sie .abulloff, um den Angriff zu stoppen.",
+ "launched": "AuroraBull gestartet!\n\nVerwenden Sie {prefix}abulloff, um den Angriff zu stoppen.",
"stopped": "AuroraBull hat angehalten.",
}
@@ -78,7 +78,7 @@ class AuroraBullMod(loader.Module):
"error_decoding": "Error: No se pudo decodificar JSON.",
"error_uploading_data": "Error al cargar los datos",
"error_valid_args": "¡Por favor ingrese argumentos válidos!",
- "launched": "¡AuroraBull lanzado!\n\nUtiliza .abulloff para detener el ataque.",
+ "launched": "¡AuroraBull lanzado!\n\nUtiliza {prefix}abulloff para detener el ataque.",
"stopped": "AuroraBull se ha detenido.",
}
@@ -133,7 +133,7 @@ class AuroraBullMod(loader.Module):
await utils.answer(message, self.strings("error_valid_args"))
return
- await utils.answer(message, self.strings("launched"))
+ await utils.answer(message, self.strings("launched").format(prefix=self.get_prefix()))
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
@@ -146,10 +146,11 @@ class AuroraBullMod(loader.Module):
bull_text = choice(data["BullText"])
await message.respond(text + bull_text)
await asyncio.sleep(time)
- else:
- await utils.answer(message, self.strings("error_key"))
+ return
else:
- await utils.answer(message, f"{self.strings('error_uploading_data')}: {response.status}")
+ return await utils.answer(message, self.strings("error_key"))
+ else:
+ return await utils.answer(message, f"{self.strings('error_uploading_data')}: {response.status}")
@loader.command(
ru_doc="Остановить оскорбления",
diff --git a/KorenbZla/HikkaModules/IrisFarm.py b/KorenbZla/HikkaModules/IrisFarm.py
index 5ea6744..c312351 100644
--- a/KorenbZla/HikkaModules/IrisFarm.py
+++ b/KorenbZla/HikkaModules/IrisFarm.py
@@ -23,7 +23,7 @@
# meta pic: https://i.postimg.cc/Hx3Zm8rB/logo.png
# meta banner: https://te.legra.ph/file/1d547b05f967c9681b90a.jpg
-__version__ = (3, 1, 1)
+__version__ = (3, 2, 0)
import asyncio
import random
@@ -97,7 +97,7 @@ class IrisFarmMod(loader.Module):
),
loader.ConfigValue(
"random_interval",
- False,
+ True,
lambda: self.strings["cfg_random_interval"],
validator=loader.validators.Boolean()
)
@@ -112,8 +112,8 @@ class IrisFarmMod(loader.Module):
"""{on/off} - turn auto farm on or off"""
args = utils.get_args_raw(message).lower()
- status_result_True = self.db.get("AuroraIrisFarm", "status", True)
- if status_result_True:
+ status_result = self.db.get("AuroraIrisFarm", "status")
+ if status_result is True:
status_result = self.strings("s_1")
else:
status_result = self.strings("s_0")
diff --git a/Ruslan-Isaev/modules/SFTPUploader.py b/Ruslan-Isaev/modules/SFTPUploader.py
deleted file mode 100644
index 8cdd8e9..0000000
--- a/Ruslan-Isaev/modules/SFTPUploader.py
+++ /dev/null
@@ -1,114 +0,0 @@
-# -*- coding: utf-8 -*-
-version = (1, 0, 0)
-
-# meta developer: @RUIS_VlP
-
-import random
-from datetime import timedelta
-from telethon import TelegramClient, events
-from telethon import functions
-from telethon.tl.types import Message
-import os
-from .. import loader, utils
-
-import paramiko
-
-# requires: paramiko
-
-def upload_file_sftp(host, port, username, password, local_file, remote_file):
- try:
- # Создаем экземпляр SSHClient
- client = paramiko.SSHClient()
-
- # Загружаем параметры по умолчанию
- client.load_system_host_keys()
-
- # Разрешаем соединение с сервером, если ключа нет в системе
- client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-
- # Подключаемся к серверу
- client.connect(hostname=host, port=port, username=username, password=password)
-
- # Открываем SFTP сессию
- sftp = client.open_sftp()
-
- try:
- sftp.listdir("SFTP_files")
- except IOError:
- sftp.mkdir("SFTP_files")
-
- # Загружаем файл
- sftp.put(local_file, remote_file)
-
- print(f'Файл {local_file} успешно загружен на {remote_file}')
-
- except Exception as e:
- print(f'Произошла ошибка: {e}')
- finally:
- # Закрываем SFTP сессию и SSH соединение
- if 'sftp' in locals():
- sftp.close()
- client.close()
-
-
-@loader.tds
-class SFTPUploaderMod(loader.Module):
- """Загрузка файлов на SFTP"""
-
- strings = {
- "name": "SFTPUploader",
- }
-
- def __init__(self):
- self.config = loader.ModuleConfig(
- loader.ConfigValue(
- "host",
- "None",
- "IP address or domain",
- validator=loader.validators.String()
- ),
- loader.ConfigValue(
- "username",
- "None",
- "SFTP username",
- validator=loader.validators.String()
- ),
- loader.ConfigValue(
- "password",
- "None",
- "SFTP password",
- validator=loader.validators.Hidden()
- ),
- loader.ConfigValue(
- "Port",
- 22,
- "SFTP port",
- validator=loader.validators.String()
- ),
- )
-
- @loader.command()
- async def sftp(self, message):
- """ - загружает файл на SFPT"""
- host = self.config["host"] or "None"
- username = self.config["username"] or "None"
- password = self.config["password"] or "None"
- port = self.config["Port"] or "None"
- if host == "None" or username == "None" or password == "None" or port == "None":
- await utils.answer(message, "Значения не указаны. Укажите их через команду:\n.config SFTPUploader")
- return
- reply = await message.get_reply_message()
- if reply:
- if reply.media:
- await utils.answer(message, f"Начинаю загрузку....")
- file_path = await message.client.download_media(reply.media)
- sftp_path = f"SFTP_files/{file_path}"
- upld = upload_file_sftp(host, port, username, password, file_path, sftp_path)
- os.remove(file_path)
- await utils.answer(message, f"Файл загружен на SFTP сервер(не факт), расположение файла: ~/SFTP_files/{file_path}")
- else:
- await utils.answer(message, "В сообщении не найдены файлы!")
- else:
- await utils.answer(message, "Команда должна быть ответом на сообщение!")
- return
-
\ No newline at end of file
diff --git a/Ruslan-Isaev/modules/ssh.py b/Ruslan-Isaev/modules/ssh.py
index 52ff91c..abbe2f8 100644
--- a/Ruslan-Isaev/modules/ssh.py
+++ b/Ruslan-Isaev/modules/ssh.py
@@ -1,207 +1,480 @@
-version = (1, 0, 0)
+version = (2, 0, 0)
# meta developer: @RUIS_VlP
# requires: paramiko
-import random
-from datetime import timedelta
-from telethon import TelegramClient, events
-from telethon import functions
-from telethon.tl.types import Message
+import asyncio
import os
+import random
+import string
from .. import loader, utils
import paramiko
-def upload_file_sftp(host, port, username, password, local_file, remote_file):
- try:
- # Создаем экземпляр SSHClient
- client = paramiko.SSHClient()
-
- # Загружаем параметры по умолчанию
- client.load_system_host_keys()
-
- # Разрешаем соединение с сервером, если ключа нет в системе
- client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-
- # Подключаемся к серверу
- client.connect(hostname=host, port=port, username=username, password=password)
-
- # Открываем SFTP сессию
- sftp = client.open_sftp()
-
- try:
- sftp.listdir("sshmod")
- except IOError:
- sftp.mkdir("sshmod")
-
- # Загружаем файл
- sftp.put(local_file, remote_file)
-
- print(f'Файл {local_file} успешно загружен на {remote_file}')
-
- except Exception as e:
- print(f'Произошла ошибка: {e}')
- finally:
- # Закрываем SFTP сессию и SSH соединение
- if 'sftp' in locals():
- sftp.close()
- client.close()
-def execute_ssh_command(host, port, username, password, command):
- try:
- # Создаем экземпляр SSHClient
- client = paramiko.SSHClient()
-
- # Загружаем параметры по умолчанию
- client.load_system_host_keys()
-
- # Разрешаем соединение с сервером, если ключа нет в системе
- client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-
- # Подключаемся к серверу
- client.connect(hostname=host, port=port, username=username, password=password)
-
- # Выполняем команду
- stdin, stdout, stderr = client.exec_command(command)
-
- # Получаем вывод и ошибки
- output = stdout.read().decode()
- error = stderr.read().decode()
- exit_code = stdout.channel.recv_exit_status()
-
- return exit_code, output, error
-
- except Exception as e:
- print(f'Произошла ошибка: {e}')
- return None, None, str(e)
- finally:
- # Закрываем SSH соединение
- client.close()
+class SSHConnection:
+
+ def __init__(self, host, port, username, password=None, key_path=None):
+ self.host = host
+ self.port = port
+ self.username = username
+ self.password = password
+ self.key_path = key_path
+ self.client = None
+
+ async def connect(self):
+ loop = asyncio.get_event_loop()
+ await loop.run_in_executor(None, self._connect_sync)
+
+ def _connect_sync(self):
+ self.client = paramiko.SSHClient()
+ self.client.load_system_host_keys()
+ self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+
+ connect_kwargs = {
+ 'hostname': self.host,
+ 'port': self.port,
+ 'username': self.username
+ }
+
+ if self.key_path and self.key_path != "None":
+ try:
+ private_key = paramiko.RSAKey.from_private_key_file(self.key_path)
+ connect_kwargs['pkey'] = private_key
+ except:
+ try:
+ private_key = paramiko.Ed25519Key.from_private_key_file(self.key_path)
+ connect_kwargs['pkey'] = private_key
+ except:
+ private_key = paramiko.ECDSAKey.from_private_key_file(self.key_path)
+ connect_kwargs['pkey'] = private_key
+ else:
+ connect_kwargs['password'] = self.password
+
+ self.client.connect(**connect_kwargs)
+
+ async def upload_file(self, local_file, remote_file):
+ loop = asyncio.get_event_loop()
+ await loop.run_in_executor(None, self._upload_file_sync, local_file, remote_file)
+
+ def _upload_file_sync(self, local_file, remote_file):
+ sftp = self.client.open_sftp()
+
+ remote_dir = os.path.dirname(remote_file)
+ if remote_dir:
+ self._create_remote_dir(sftp, remote_dir)
+
+ sftp.put(local_file, remote_file)
+ sftp.close()
+
+ def _create_remote_dir(self, sftp, path):
+ dirs = []
+ while path and path != '/':
+ try:
+ sftp.stat(path)
+ break
+ except IOError:
+ dirs.append(path)
+ path = os.path.dirname(path)
+
+ while dirs:
+ dir_path = dirs.pop()
+ try:
+ sftp.mkdir(dir_path)
+ except IOError:
+ pass
+
+ async def download_file(self, remote_file, local_file):
+ loop = asyncio.get_event_loop()
+ await loop.run_in_executor(None, self._download_file_sync, remote_file, local_file)
+
+ def _download_file_sync(self, remote_file, local_file):
+ sftp = self.client.open_sftp()
+ sftp.get(remote_file, local_file)
+ sftp.close()
+
+ async def execute_command_stream(self, command, callback):
+ loop = asyncio.get_event_loop()
+ return await loop.run_in_executor(
+ None,
+ self._execute_command_stream_sync,
+ command,
+ callback
+ )
+
+ def _execute_command_stream_sync(self, command, callback):
+ stdin, stdout, stderr = self.client.exec_command(command, get_pty=True)
+
+ channel = stdout.channel
+ pid = channel.get_id()
+
+ output_lines = []
+ error_lines = []
+
+ while not stdout.channel.exit_status_ready() or stdout.channel.recv_ready():
+ if stdout.channel.recv_ready():
+ line = stdout.readline()
+ if line:
+ output_lines.append(line)
+ callback('stdout', line, pid)
+
+ remaining = stdout.read().decode()
+ if remaining:
+ output_lines.append(remaining)
+ callback('stdout', remaining, pid)
+
+ error_output = stderr.read().decode()
+ if error_output:
+ error_lines.append(error_output)
+ callback('stderr', error_output, pid)
+
+ exit_code = stdout.channel.recv_exit_status()
+
+ return exit_code, ''.join(output_lines), ''.join(error_lines), pid
+
+ def close(self):
+ if self.client:
+ self.client.close()
+
@loader.tds
class SSHMod(loader.Module):
- """SSH module for uploading files and executing commands"""
+ """Модуль для работы с SSH"""
- strings = {
- "name": "SSHMod",
- "cfg_host": "IP address or domain",
- "cfg_username": "SSH username",
- "cfg_password": "SSH password",
- "cfg_port": "SSH port",
- "save_description": " - saves the file to the ~/sshmod directory",
- "save_uploading": "Starting upload....",
- "save_success": "File uploaded to SSH server, file location: ~/sshmod/{}",
- "save_no_file": "No files found in the message!",
- "save_reply_required": "The command must be a reply to a message!",
- "sterminal_description": " - executes a command on the SSH server",
- "sterminal_no_command": "No command specified!",
- "sterminal_output": "⌨️ System command\n{}
\nExit code: {}\n📼 Output:\n{}
",
- "sterminal_error": "⌨️ System command\n{}
\nExit code: {}\n🚫 Errors:\n{}
",
- "sterminal_output_and_error": "⌨️ System command\n{}
\nExit code: {}\n📼 Output:\n{}
\n🚫 Errors:\n{}
",
- "config_not_set": "Values are not set. Set them using the command:\n.config SSHMod",
- }
+ strings = {
+ "name": "SSHMod",
+ "cfg_host": "IP address or domain",
+ "cfg_username": "SSH username",
+ "cfg_password": "SSH password (leave None if using key)",
+ "cfg_key": "Path to private SSH key (leave None if using password)",
+ "cfg_port": "SSH port",
+ "cfg_default_dir": "Default directory for saving files",
+ "sftpsave_description": " [directory] - saves the file to the specified directory",
+ "sftpsave_uploading": "Starting upload....",
+ "sftpsave_success": "File uploaded to SSH server, file location: {}",
+ "sftpsave_no_file": "No files found in the message!",
+ "sftpsave_reply_required": "The command must be a reply to a message!",
+ "sftpupload_description": " - downloads file from SSH server",
+ "sftpupload_no_path": "No file path specified!",
+ "sftpupload_downloading": "Downloading file from SSH server...",
+ "sftpupload_error": "Error downloading file: {}",
+ "sterminal_description": " - executes a command on the SSH server",
+ "sterminal_no_command": "No command specified!",
+ "sterminal_starting": "⌨️ System command\n{}
\nPID: {}\nStatus: Running...\n📼 Output:\n{}
",
+ "sterminal_output": "⌨️ System command\n{}
\nPID: {}\nExit code: {}\n📼 Output:\n{}
",
+ "sterminal_error": "⌨️ System command\n{}
\nPID: {}\nExit code: {}\n🚫 Errors:\n{}
",
+ "sterminal_output_and_error": "⌨️ System command\n{}
\nPID: {}\nExit code: {}\n📼 Output:\n{}
\n🚫 Errors:\n{}
",
+ "sterminal_stopped": "⌨️ System command\n{}
\nPID: {}\nStatus: ⛔ Stopped by user\n📼 Output:\n{}
",
+ "addkey_description": " - saves SSH private key to .ssh directory",
+ "addkey_no_key": "No key content provided!",
+ "addkey_success": "Key saved successfully!\nKey file: {}\nFull path: {}\n\nYou can now set it in config:\n.fcfg SSHMod key_path {}",
+ "addkey_error": "Error saving key: {}",
+ "config_not_set": "Values are not set. Set them using the command:\n.config SSHMod",
+ "stop_button": "⛔ Stop",
+ }
- strings_ru = {
- "name": "SSHMod",
- "cfg_host": "IP-адрес или домен",
- "cfg_username": "Имя пользователя SSH",
- "cfg_password": "Пароль SSH",
- "cfg_port": "Порт SSH",
- "save_description": " - сохраняет файл в директорию ~/sshmod",
- "save_uploading": "Начинаю загрузку....",
- "save_success": "Файл загружен на SSH сервер, расположение файла: ~/sshmod/{}",
- "save_no_file": "В сообщении не найдены файлы!",
- "save_reply_required": "Команда должна быть ответом на сообщение!",
- "sterminal_description": " - выполняет команду на SSH сервере",
- "sterminal_no_command": "Не указана команда для выполнения!",
- "sterminal_output": "⌨️ Системная команда\n{}
\nКод выхода: {}\n📼 Вывод:\n{}
",
- "sterminal_error": "⌨️ Системная команда\n{}
\nКод выхода: {}\n🚫 Ошибки:\n{}
",
- "sterminal_output_and_error": "⌨️ Системная команда\n{}
\nКод выхода: {}\n📼 Вывод:\n{}
\n🚫 Ошибки:\n{}
",
- "config_not_set": "Значения не указаны. Укажите их через команду:\n.config SSHMod",
- }
+ strings_ru = {
+ "name": "SSHMod",
+ "cfg_host": "IP-адрес или домен",
+ "cfg_username": "Имя пользователя SSH",
+ "cfg_password": "Пароль SSH (оставьте None при использовании ключа)",
+ "cfg_key": "Путь к закрытому SSH ключу (оставьте None при использовании пароля)",
+ "cfg_port": "Порт SSH",
+ "cfg_default_dir": "Директория по умолчанию для сохранения файлов",
+ "sftpsave_description": " [директория] - сохраняет файл в указанную директорию",
+ "sftpsave_uploading": "Начинаю загрузку....",
+ "sftpsave_success": "Файл загружен на SSH сервер, расположение файла: {}",
+ "sftpsave_no_file": "В сообщении не найдены файлы!",
+ "sftpsave_reply_required": "Команда должна быть ответом на сообщение!",
+ "sftpupload_description": "<путь_к_файлу> - скачивает файл с SSH сервера",
+ "sftpupload_no_path": "Не указан путь к файлу!",
+ "sftpupload_downloading": "Скачиваю файл с SSH сервера...",
+ "sftpupload_error": "Ошибка при скачивании файла: {}",
+ "sterminal_description": " - выполняет команду на SSH сервере",
+ "sterminal_no_command": "Не указана команда для выполнения!",
+ "sterminal_starting": "⌨️ Системная команда\n{}
\nPID: {}\nСтатус: Выполняется...\n📼 Вывод:\n{}
",
+ "sterminal_output": "⌨️ Системная команда\n{}
\nPID: {}\nКод выхода: {}\n📼 Вывод:\n{}
",
+ "sterminal_error": "⌨️ Системная команда\n{}
\nPID: {}\nКод выхода: {}\n🚫 Ошибки:\n{}
",
+ "sterminal_output_and_error": "⌨️ Системная команда\n{}
\nPID: {}\nКод выхода: {}\n📼 Вывод:\n{}
\n🚫 Ошибки:\n{}
",
+ "sterminal_stopped": "⌨️ Системная команда\n{}
\nPID: {}\nСтатус: ⛔ Остановлено пользователем\n📼 Вывод:\n{}
",
+ "addkey_description": "<содержимое_ключа> - сохраняет SSH ключ в директорию .ssh",
+ "addkey_no_key": "Не указано содержимое ключа!",
+ "addkey_success": "Ключ успешно сохранён!\nИмя файла: {}\nПолный путь: {}\n\nДля применения напишите:\n.fcfg SSHMod key_path {}",
+ "addkey_error": "Ошибка при сохранении ключа: {}",
+ "config_not_set": "Значения не указаны. Укажите их через команду:\n.config SSHMod",
+ "stop_button": "⛔ Остановить",
+ }
- def __init__(self):
- self.config = loader.ModuleConfig(
- loader.ConfigValue(
- "host",
- "None",
- lambda: self.strings["cfg_host"],
- validator=loader.validators.String(),
- ),
- loader.ConfigValue(
- "username",
- "None",
- lambda: self.strings["cfg_username"],
- validator=loader.validators.String(),
- ),
- loader.ConfigValue(
- "password",
- "None",
- lambda: self.strings["cfg_password"],
- validator=loader.validators.Hidden(),
- ),
- loader.ConfigValue(
- "Port",
- 22,
- lambda: self.strings["cfg_port"],
- validator=loader.validators.String(),
- ),
- )
+ def __init__(self):
+ self.config = loader.ModuleConfig(
+ loader.ConfigValue(
+ "host",
+ "None",
+ lambda: self.strings["cfg_host"],
+ validator=loader.validators.String(),
+ ),
+ loader.ConfigValue(
+ "username",
+ "None",
+ lambda: self.strings["cfg_username"],
+ validator=loader.validators.String(),
+ ),
+ loader.ConfigValue(
+ "password",
+ "None",
+ lambda: self.strings["cfg_password"],
+ validator=loader.validators.Hidden(),
+ ),
+ loader.ConfigValue(
+ "key_path",
+ "None",
+ lambda: self.strings["cfg_key"],
+ validator=loader.validators.String(),
+ ),
+ loader.ConfigValue(
+ "Port",
+ 22,
+ lambda: self.strings["cfg_port"],
+ validator=loader.validators.Integer(),
+ ),
+ loader.ConfigValue(
+ "default_directory",
+ "sshmod",
+ lambda: self.strings["cfg_default_dir"],
+ validator=loader.validators.String(),
+ ),
+ )
+ self.active_tasks = {}
- @loader.command(alias="save")
- async def save(self, message):
- """ - saves the file to the ~/sshmod directory"""
- host = self.config["host"] or "None"
- username = self.config["username"] or "None"
- password = self.config["password"] or "None"
- port = self.config["Port"] or "None"
- if host == "None" or username == "None" or password == "None" or port == "None":
- await utils.answer(message, self.strings["config_not_set"])
- return
- reply = await message.get_reply_message()
- if reply:
- if reply.media:
- await utils.answer(message, self.strings["save_uploading"])
- file_path = await message.client.download_media(reply.media)
- sftp_path = f"sshmod/{os.path.basename(file_path)}"
- upload_file_sftp(host, port, username, password, file_path, sftp_path)
- os.remove(file_path)
- await utils.answer(
- message,
- self.strings["save_success"].format(os.path.basename(file_path)),
- )
- else:
- await utils.answer(message, self.strings["save_no_file"])
- else:
- await utils.answer(message, self.strings["save_reply_required"])
+ @loader.command()
+ async def sftpsave(self, message):
+ """ [dir] - сохраняет указанных файл на сервер"""
+ host = self.config["host"]
+ username = self.config["username"]
+ password = self.config["password"]
+ key_path = self.config["key_path"]
+ port = self.config["Port"]
+
+ if host == "None" or username == "None" or (password == "None" and key_path == "None"):
+ await utils.answer(message, self.strings["config_not_set"])
+ return
+
+ reply = await message.get_reply_message()
+ if not reply:
+ await utils.answer(message, self.strings["sftpsave_reply_required"])
+ return
+
+ if not reply.media:
+ await utils.answer(message, self.strings["sftpsave_no_file"])
+ return
+ args = utils.get_args_raw(message)
+ remote_dir = args if args else self.config["default_directory"]
+
+ await utils.answer(message, self.strings["sftpsave_uploading"])
+
+ file_path = await message.client.download_media(reply.media)
+ file_name = os.path.basename(file_path)
+ sftp_path = f"{remote_dir}/{file_name}"
+
+ conn = SSHConnection(host, port, username, password, key_path)
+ await conn.connect()
+ await conn.upload_file(file_path, sftp_path)
+ conn.close()
+
+ os.remove(file_path)
+
+ await utils.answer(
+ message,
+ self.strings["sftpsave_success"].format(sftp_path),
+ )
- @loader.command(alias="sterminal")
- async def sterminal(self, message):
- """ - executes a command on the SSH server"""
- host = self.config["host"] or "None"
- username = self.config["username"] or "None"
- password = self.config["password"] or "None"
- port = self.config["Port"] or "None"
- if host == "None" or username == "None" or password == "None" or port == "None":
- await utils.answer(message, self.strings["config_not_set"])
- return
- command = utils.get_args_raw(message)
- if not command:
- await utils.answer(message, self.strings["sterminal_no_command"])
- return
+ @loader.command()
+ async def sftpdownload(self, message):
+ """ - скачивает указанных файл с сервера"""
+ host = self.config["host"]
+ username = self.config["username"]
+ password = self.config["password"]
+ key_path = self.config["key_path"]
+ port = self.config["Port"]
+
+ if host == "None" or username == "None" or (password == "None" and key_path == "None"):
+ await utils.answer(message, self.strings["config_not_set"])
+ return
+
+ remote_path = utils.get_args_raw(message)
+ if not remote_path:
+ await utils.answer(message, self.strings["sftpupload_no_path"])
+ return
+
+ await utils.answer(message, self.strings["sftpupload_downloading"])
+
+ local_file = f"/tmp/sftp_download_{random.randint(1000, 9999)}_{os.path.basename(remote_path)}"
+
+ try:
+ conn = SSHConnection(host, port, username, password, key_path)
+ await conn.connect()
+ await conn.download_file(remote_path, local_file)
+ conn.close()
+
+ await utils.answer_file(
+ message,
+ local_file,
+ f"📥 File from SSH server: {remote_path}"
+ )
+
+ if os.path.exists(local_file):
+ os.remove(local_file)
+
+ except Exception as e:
+ await utils.answer(message, self.strings["sftpupload_error"].format(str(e)))
+ if os.path.exists(local_file):
+ os.remove(local_file)
- # Выполняем команду на SSH сервере
- exit_code, output, error = execute_ssh_command(host, port, username, password, command)
+ @loader.command()
+ async def addkey(self, message):
+ """<ключ> - сохраняет указанный ssh ключ"""
+ key_content = utils.get_args_raw(message)
+
+ if not key_content:
+ await utils.answer(message, self.strings["addkey_no_key"])
+ return
+
+ try:
+ ssh_dir = os.path.expanduser("~/.ssh")
+ os.makedirs(ssh_dir, exist_ok=True)
+
+ random_name = ''.join(random.choices(string.ascii_lowercase + string.digits, k=12))
+ key_filename = f"ssh_key_{random_name}"
+ key_path = os.path.join(ssh_dir, key_filename)
+
+ with open(key_path, 'w') as f:
+ f.write(key_content)
+
+ os.chmod(key_path, 0o600)
+
+ await utils.answer(
+ message,
+ self.strings["addkey_success"].format(key_filename, key_path, key_path)
+ )
+
+ except Exception as e:
+ await utils.answer(message, self.strings["addkey_error"].format(str(e)))
- # Формируем ответ в зависимости от наличия вывода и ошибок
- if output and not error:
- response = self.strings["sterminal_output"].format(command, exit_code, output)
- elif error and not output:
- response = self.strings["sterminal_error"].format(command, exit_code, error)
- elif output and error:
- response = self.strings["sterminal_output_and_error"].format(command, exit_code, output, error)
- else:
- response = f"⌨️ System command\n{command}
\nExit code: {exit_code}"
+ @loader.command(alias="ssh")
+ async def sterminal(self, message):
+ """<команда> - выполняет команду на ssh сервере"""
+ host = self.config["host"]
+ username = self.config["username"]
+ password = self.config["password"]
+ key_path = self.config["key_path"]
+ port = self.config["Port"]
+
+ if host == "None" or username == "None" or (password == "None" and key_path == "None"):
+ await utils.answer(message, self.strings["config_not_set"])
+ return
+
+ command = utils.get_args_raw(message)
+ if not command:
+ await utils.answer(message, self.strings["sterminal_no_command"])
+ return
- await utils.answer(message, response)
\ No newline at end of file
+ conn = SSHConnection(host, port, username, password, key_path)
+ await conn.connect()
+
+ output_buffer = []
+ error_buffer = []
+ current_pid = None
+ stop_flag = False
+
+ def stream_callback(stream_type, data, pid):
+ nonlocal current_pid
+ if current_pid is None:
+ current_pid = pid
+ if stream_type == 'stdout':
+ output_buffer.append(data)
+ else:
+ error_buffer.append(data)
+
+ stop_button = {
+ "text": self.strings["stop_button"],
+ "callback": self._stop_callback,
+ "args": (message.chat_id, message.id),
+ }
+
+ task_id = f"{message.chat_id}_{message.id}"
+ self.active_tasks[task_id] = {'stop': False, 'conn': conn}
+
+ msg = await utils.answer(
+ message,
+ self.strings["sterminal_starting"].format(command, "...", ""),
+ reply_markup=[[stop_button]]
+ )
+
+ async def execute_task():
+ try:
+ exit_code, output, error, pid = await conn.execute_command_stream(
+ command,
+ stream_callback
+ )
+
+ if self.active_tasks.get(task_id, {}).get('stop'):
+ current_output = ''.join(output_buffer)
+ await utils.answer(
+ msg,
+ self.strings["sterminal_stopped"].format(
+ command,
+ pid if pid else "N/A",
+ current_output if current_output else "No output"
+ )
+ )
+ else:
+ if output and not error:
+ response = self.strings["sterminal_output"].format(command, pid, exit_code, output)
+ elif error and not output:
+ response = self.strings["sterminal_error"].format(command, pid, exit_code, error)
+ elif output and error:
+ response = self.strings["sterminal_output_and_error"].format(command, pid, exit_code, output, error)
+ else:
+ response = f"⌨️ System command\n{command}
\nPID: {pid}\nExit code: {exit_code}"
+
+ await utils.answer(msg, response)
+
+ except Exception as e:
+ await utils.answer(msg, f"Error: {str(e)}")
+ finally:
+ conn.close()
+ if task_id in self.active_tasks:
+ del self.active_tasks[task_id]
+
+ asyncio.create_task(execute_task())
+
+ for _ in range(60):
+ await asyncio.sleep(2)
+
+ if task_id not in self.active_tasks:
+ break
+
+ if self.active_tasks[task_id].get('stop'):
+ break
+
+ current_output = ''.join(output_buffer[-20:])
+ if current_output:
+ await msg.edit(
+ self.strings["sterminal_starting"].format(
+ command,
+ current_pid if current_pid else "...",
+ current_output[-1500:] # Ограничение длины
+ ),
+ reply_markup=[[stop_button]]
+ )
+
+ async def _stop_callback(self, call, chat_id, msg_id):
+ """Callback для кнопки остановки"""
+ task_id = f"{chat_id}_{msg_id}"
+ if task_id in self.active_tasks:
+ self.active_tasks[task_id]['stop'] = True
+ try:
+ self.active_tasks[task_id]['conn'].close()
+ except:
+ pass
+ await call.answer("Stopping...")
\ No newline at end of file
diff --git a/Ruslan-Isaev/modules/ttf.py b/Ruslan-Isaev/modules/ttf.py
index 2d88103..3f4f852 100644
--- a/Ruslan-Isaev/modules/ttf.py
+++ b/Ruslan-Isaev/modules/ttf.py
@@ -49,34 +49,4 @@ class TTFMod(loader.Module):
# Удаление файла
os.remove(file_path)
-
- @loader.command()
- async def ttf_noreply(self, message):
- """
- Создает текстовый файл с заданным именем и расширением,
- записывает в него текст, отправляет его в Telegram и удаляет с диска.
-
- Пример:
- .ttf название.txt
- Текст для файла
-
- """
- args = utils.get_args_raw(message).split("\n")
- if len(args) < 1:
- await message.edit("Недостаточно аргументов. Используйте: .ttf название.txt\nТекст для файла")
- return
-
- filename = args[0].strip()
- text = "\n".join(args[1:])
-
- # Создание файла
- file_path = os.path.join(os.getcwd(), filename)
- with open(file_path, 'w') as file:
- file.write(text)
- await message.client.delete_messages(message.chat_id, message.id)
- # Отправка файла
- await message.client.send_file(message.chat_id, file_path)
-
- # Удаление файла
- os.remove(file_path)
-
\ No newline at end of file
+
\ No newline at end of file
diff --git a/fiksofficial/python-modules/createavatarspack.py b/fiksofficial/python-modules/createavatarspack.py
deleted file mode 100644
index 2ebca46..0000000
--- a/fiksofficial/python-modules/createavatarspack.py
+++ /dev/null
@@ -1,147 +0,0 @@
-# ______ ___ ___ _ _
-# ____ | ___ \ | \/ | | | | |
-# / __ \| |_/ / _| . . | ___ __| |_ _| | ___
-# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \
-# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/
-# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___|
-# \____/ __/ |
-# |___/
-
-# На модуль распространяется лицензия "GNU General Public License v3.0"
-# https://github.com/all-licenses/GNU-General-Public-License-v3.0
-
-# meta developer: @pymodule
-# requires: opencv-python pillow
-
-import os, shutil, cv2
-from PIL import Image, UnidentifiedImageError
-from telethon.tl.functions.stickers import CreateStickerSetRequest
-from telethon.tl.types import InputStickerSetItem, InputDocument
-from telethon.errors.rpcerrorlist import PackShortNameOccupiedError
-from .. import loader
-from telethon.tl.functions.photos import GetUserPhotosRequest
-
-import asyncio
-import random
-import string
-
-try:
- resample = Image.Resampling.LANCZOS
-except:
- resample = Image.LANCZOS
-
-@loader.tds
-class CreateAvatarsPack(loader.Module):
- """Creates a sticker pack from photos and video avatars of participants"""
- strings = {
- "name": "CreateAvatarsPack",
- "processing": "📥 I'm collecting avatars of participants...",
- "no_avatars": "❌ No members with avatars",
- "no_valid": "❌ Could not process any avatars",
- "done": "✅ The sticker pack is ready:\n👉 Open",
- "already": "⚠️ A sticker pack with this name already exists.",
- }
-
- strings_ru = {
- "processing": "📥 Собираю аватарки участников...",
- "no_avatars": "❌ Нет участников с аватарками",
- "no_valid": "❌ Не удалось обработать ни одну аватарку",
- "done": "✅ Стикерпак готов:\n👉 Открыть",
- "already": "⚠️ Стикерпак с таким именем уже существует",
- }
-
- @loader.command(doc="- Create a sticker pack from the avatars of users in the group", ru_doc="- Создать стикерпак из аватаров пользователей группы", only_groups=True)
- async def createavatars(self, message):
- """- Create a sticker pack from the avatars of users in the group"""
- chat = await message.get_chat()
- cid = abs(message.chat_id)
- await message.edit(self.strings["processing"])
-
- users = []
- async for u in self._client.iter_participants(chat.id):
- if u.photo:
- users.append(u)
- if len(users) >= 100:
- break
-
- if not users:
- return await message.edit(self.strings["no_avatars"])
-
- tmp_dir = f"/tmp/avatars_{cid}"
- os.makedirs(tmp_dir, exist_ok=True)
- sticker_files = []
-
- for u in users:
- try:
- photos = await self._client(GetUserPhotosRequest(u.id, 0, 0, 1))
- if not photos.photos:
- continue
-
- raw = await self._client.download_media(photos.photos[0])
- data = raw if isinstance(raw, (bytes, bytearray)) else open(raw, "rb").read()
-
- path_raw = os.path.join(tmp_dir, f"{u.id}_raw")
- with open(path_raw, "wb") as f:
- f.write(data)
-
- if b"ftyp" in data[:32] or path_raw.endswith((".mp4", ".webm", ".mov")):
- cap = cv2.VideoCapture(path_raw)
- success, frame = cap.read()
- cap.release()
- if not success:
- continue
- img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA))
- else:
- try:
- img = Image.open(path_raw).convert("RGBA")
- except UnidentifiedImageError:
- continue
-
- img.thumbnail((512, 512), resample)
- w, h = img.size
- final = Image.new("RGBA", (512, 512), (0, 0, 0, 0))
- final.paste(img, ((512 - w)//2, (512 - h)//2))
-
- out = os.path.join(tmp_dir, f"{u.id}.webp")
- final.save(out, "WEBP")
- sticker_files.append(out)
-
- except:
- continue
-
- if not sticker_files:
- shutil.rmtree(tmp_dir, ignore_errors=True)
- return await message.edit(self.strings["no_valid"])
-
- tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
- short = f"f{cid}_{tag}_by_fcreateavatars"
- title = f"AvaPack {tag}"
-
- stickers = []
- for p in sticker_files:
- await asyncio.sleep(0.3)
- file = await self._client.upload_file(p)
- msg = await self._client.send_file("me", file, force_document=True)
- doc = msg.document
- await self._client.delete_messages("me", msg.id)
- stickers.append(InputStickerSetItem(
- document=InputDocument(doc.id, doc.access_hash, doc.file_reference),
- emoji="🖼️"
- ))
-
- try:
- await self._client(CreateStickerSetRequest(
- user_id="me",
- title=title,
- short_name=short,
- stickers=stickers
- ))
- except PackShortNameOccupiedError:
- shutil.rmtree(tmp_dir, ignore_errors=True)
- return await message.edit(self.strings["already"])
- except Exception as e:
- shutil.rmtree(tmp_dir, ignore_errors=True)
- return await message.edit(f"❌ Error: {e}")
-
- shutil.rmtree(tmp_dir, ignore_errors=True)
- await message.edit(self.strings["done"].format(short))
diff --git a/fiksofficial/python-modules/createpacks.py b/fiksofficial/python-modules/createpacks.py
new file mode 100644
index 0000000..296d1fb
--- /dev/null
+++ b/fiksofficial/python-modules/createpacks.py
@@ -0,0 +1,353 @@
+# ______ ___ ___ _ _
+# ____ | ___ \ | \/ | | | | |
+# / __ \| |_/ / _| . . | ___ __| |_ _| | ___
+# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \
+# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/
+# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___|
+# \____/ __/ |
+# |___/
+
+# На модуль распространяется лицензия "GNU General Public License v3.0"
+# https://github.com/all-licenses/GNU-General-Public-License-v3.0
+
+# meta developer: @pymodule
+# requires: opencv-python pillow
+
+import os
+import shutil
+import cv2
+import random
+import string
+import asyncio
+import logging
+from PIL import Image, UnidentifiedImageError
+
+from telethon.tl.functions.stickers import CreateStickerSetRequest
+from telethon.tl.types import InputStickerSetItem, InputDocument
+from telethon.errors.rpcerrorlist import PackShortNameOccupiedError
+
+from .. import loader, utils
+from telethon.tl.functions.photos import GetUserPhotosRequest
+
+
+try:
+ resample = Image.Resampling.LANCZOS
+except AttributeError:
+ resample = Image.LANCZOS
+
+logger = logging.getLogger(__name__)
+
+
+
+async def process_to_webp(input_path: str, output_path: str, size: int = 512) -> bool:
+ try:
+ is_video = input_path.lower().endswith(('.mp4', '.webm', '.mov')) or b'ftyp' in open(input_path, 'rb').read(32)
+ if is_video:
+ cap = cv2.VideoCapture(input_path)
+ success, frame = cap.read()
+ cap.release()
+ if not success:
+ logger.warning(f"Video: Unable to read frame {input_path}")
+ return False
+ img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA))
+ else:
+ try:
+ img = Image.open(input_path).convert("RGBA")
+ except UnidentifiedImageError:
+ logger.warning(f"Image: incorrect {input_path}")
+ return False
+
+ img.thumbnail((size, size), resample)
+ final = Image.new("RGBA", (size, size), (0, 0, 0, 0))
+ w, h = img.size
+ final.paste(img, ((size - w) // 2, (size - h) // 2))
+
+ final.save(output_path, "WEBP", quality=95, method=6)
+
+ try:
+ check = Image.open(output_path)
+ if check.size != (size, size):
+ logger.warning(f"WEBP: size not {size}x{size}: {check.size}")
+ return False
+ if os.path.getsize(output_path) > 512 * 1024:
+ final.save(output_path, "WEBP", quality=80, method=6)
+ if os.path.getsize(output_path) > 512 * 1024:
+ return False
+ except Exception as e:
+ logger.error(f"WEBP: verification error {output_path}: {e}")
+ return False
+
+ return True
+ except Exception as e:
+ logger.error(f"WEBP: processing error {input_path}: {e}")
+ return False
+
+
+
+async def process_to_png(input_path: str, output_path: str, size: int = 100) -> bool:
+ try:
+ is_video = input_path.lower().endswith(('.mp4', '.webm', '.mov')) or b'ftyp' in open(input_path, 'rb').read(32)
+ if is_video:
+ cap = cv2.VideoCapture(input_path)
+ success, frame = cap.read()
+ cap.release()
+ if not success:
+ logger.warning(f"Video: Unable to read frame {input_path}")
+ return False
+ img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA))
+ else:
+ try:
+ img = Image.open(input_path).convert("RGBA")
+ except UnidentifiedImageError:
+ logger.warning(f"Image: incorrect {input_path}")
+ return False
+
+ img.thumbnail((size, size), resample)
+ final = Image.new("RGBA", (size, size), (0, 0, 0, 0))
+ w, h = img.size
+ final.paste(img, ((size - w) // 2, (size - h) // 2))
+
+ final.save(output_path, "PNG")
+
+ try:
+ check = Image.open(output_path)
+ if check.size != (size, size):
+ logger.warning(f"PNG: size not {size}x{size}: {check.size}")
+ return False
+ if os.path.getsize(output_path) > 512 * 1024:
+ logger.warning(f"PNG: file >512KB: {os.path.getsize(output_path)}")
+ return False
+ except Exception as e:
+ logger.error(f"PNG: verification error {output_path}: {e}")
+ return False
+
+ return True
+ except Exception as e:
+ logger.error(f"PNG: processing error {input_path}: {e}")
+ return False
+
+
+@loader.tds
+class CreatePacks(loader.Module):
+ """Creates sticker packs and emoji packs from the avatars of chat participants"""
+
+ strings = {
+ "name": "CreatePacks",
+ "processing": "[CreatePacks] Collecting avatars of participants...",
+ "no_avatars": "[CreatePacks] No members with avatars",
+ "no_valid": "[CreatePacks] Could not process any avatars",
+ "done_pack": "[CreatePacks] Sticker pack is ready:\n[CreatePacks] Open: here",
+ "done_emoji_pack": "[CreatePacks] Emoji pack is ready:\n[CreatePacks] Open: here",
+ "already": "[CreatePacks] A sticker pack with this name already exists.",
+ "emoji_processing": "[CreatePacks] Creating emoji pack from avatars...",
+ "emoji_no_emoji": "[CreatePacks] No emoji specified — using",
+ }
+
+ strings_ru = {
+ "_cls_doc": "Создаёт стикерпаки и эмодзи-паки из аватаров участников чата",
+ "processing": "[CreatePacks] Собираю аватарки участников...",
+ "no_avatars": "[CreatePacks] Нет участников с аватарками",
+ "no_valid": "[CreatePacks] Не удалось обработать ни одну аватарку",
+ "done_pack": "[CreatePacks] Стикерпак готов:\n[CreatePacks] Открыть: здесь",
+ "done_emoji_pack": "[CreatePacks] Эмодзи-пак готов:\n[CreatePacks] Открыть: здесь",
+ "already": "[CreatePacks] Стикерпак с таким именем уже существует",
+ "emoji_processing": "[CreatePacks] Создаю эмодзи-пак из аватаров...",
+ "emoji_no_emoji": "[CreatePacks] Эмодзи не указан — используется",
+ }
+
+ async def _get_avatar_files(self, message, format: str = "webp", size: int = 512) -> tuple[list[str], str]:
+ chat = await message.get_chat()
+ cid = abs(message.chat_id)
+ tmp_dir = f"/tmp/avatars_{cid}_{random.randint(1000, 9999)}"
+ os.makedirs(tmp_dir, exist_ok=True)
+
+ users = []
+ async for u in self._client.iter_participants(chat.id):
+ if u.photo:
+ users.append(u)
+ if len(users) >= 100:
+ break
+
+ if not users:
+ shutil.rmtree(tmp_dir, ignore_errors=True)
+ return [], tmp_dir
+
+ processed = []
+ process_func = process_to_webp if format == "webp" else process_to_png
+
+ for u in users:
+ try:
+ photos = await self._client(GetUserPhotosRequest(u.id, 0, 0, 1))
+ if not photos.photos:
+ continue
+
+ raw_path = os.path.join(tmp_dir, f"{u.id}_raw")
+ raw = await self._client.download_media(photos.photos[0], file=raw_path)
+
+ ext = ".webp" if format == "webp" else ".png"
+ output_path = os.path.join(tmp_dir, f"{u.id}{ext}")
+ success = False
+
+ if isinstance(raw, str):
+ success = await process_func(raw, output_path, size=size)
+ if os.path.exists(raw):
+ os.unlink(raw)
+ else:
+ temp_raw = os.path.join(tmp_dir, f"{u.id}_temp_raw")
+ with open(temp_raw, "wb") as f:
+ f.write(raw)
+ success = await process_func(temp_raw, output_path, size=size)
+ if os.path.exists(temp_raw):
+ os.unlink(temp_raw)
+
+ if success:
+ try:
+ img_size = Image.open(output_path).size
+ if img_size != (size, size):
+ logger.warning(f"{format.upper()}: size not {size}x{size}: {img_size}")
+ os.unlink(output_path)
+ continue
+ if os.path.getsize(output_path) > 512 * 1024:
+ logger.warning(f"{format.upper()}: file >512KB: {os.path.getsize(output_path)}")
+ os.unlink(output_path)
+ continue
+ processed.append(output_path)
+ except Exception as e:
+ logger.error(f"{format.upper()}: verification error {output_path}: {e}")
+ else:
+ logger.warning(f"{format.upper()}: Failed to process avatar {u.id}")
+
+ except Exception as e:
+ logger.error(f"User processing error {u.id}: {e}")
+ continue
+
+ return processed, tmp_dir
+
+ @loader.command(
+ ru_doc="- Создать стикерпак из аватаров в группе",
+ only_groups=True
+ )
+ async def createavatars(self, message):
+ """- Create a sticker pack from avatars in a group"""
+ await message.edit(self.strings("processing"))
+
+ files, tmp_dir = await self._get_avatar_files(message, format="webp", size=512)
+ if not files:
+ return await message.edit(self.strings("no_avatars"))
+
+ tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
+ short_name = f"f{abs(message.chat_id)}_{tag}_by_fcreateavatars"
+ title = f"AvaPack {tag}"
+
+ stickers = []
+ for path in files:
+ try:
+ await asyncio.sleep(0.3)
+ file = await self._client.upload_file(path)
+ msg = await self._client.send_file("me", file, force_document=True)
+ doc = msg.document
+ await self._client.delete_messages("me", msg.id)
+ stickers.append(InputStickerSetItem(
+ document=InputDocument(doc.id, doc.access_hash, doc.file_reference),
+ emoji="🖼️"
+ ))
+ except Exception as e:
+ logger.error(f"Sticker loading error {path}: {e}")
+ continue
+
+ if not stickers:
+ shutil.rmtree(tmp_dir, ignore_errors=True)
+ return await message.edit(self.strings("no_valid"))
+
+ try:
+ await self._client(CreateStickerSetRequest(
+ user_id="me",
+ title=title,
+ short_name=short_name,
+ stickers=stickers
+ ))
+ await message.edit(self.strings("done_pack").format(short_name))
+ except PackShortNameOccupiedError:
+ await message.edit(self.strings("already"))
+ except Exception as e:
+ error_details = f"❌ Ошибка создания стикерпака:\n{type(e).__name__}: {e}\n"
+ error_details += f"Пак: {short_name}\nСтикеров: {len(stickers)}\n"
+ if files:
+ error_details += f"Последний файл: {files[-1]}\n"
+ try:
+ error_details += f"Размер: {Image.open(files[-1]).size}\n"
+ error_details += f"Вес: {os.path.getsize(files[-1])} байт"
+ except:
+ pass
+ await message.edit(error_details)
+ logger.exception("Error creating sticker pack")
+ finally:
+ shutil.rmtree(tmp_dir, ignore_errors=True)
+
+ @loader.command(
+ ru_doc="[эмодзи] - Создать эмодзи-пак из всех аватаров",
+ only_groups=True
+ )
+ async def createemojis(self, message):
+ """[emoji] - Create an emoji pack from all avatars"""
+ args = utils.get_args_raw(message)
+ emoji = args.strip() if args else "🖼️"
+ if not args:
+ await message.edit(self.strings("emoji_no_emoji") + f" `{emoji}`")
+ await asyncio.sleep(1.5)
+
+ await message.edit(self.strings("emoji_processing"))
+
+ files, tmp_dir = await self._get_avatar_files(message, format="png", size=100)
+ if not files:
+ return await message.edit(self.strings("no_avatars"))
+
+ tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
+ short_name = f"f{abs(message.chat_id)}_{tag}_by_fcreateemojis"
+ title = f"EmojiPack {tag}"
+
+ stickers = []
+ for path in files:
+ try:
+ await asyncio.sleep(0.3)
+ file = await self._client.upload_file(path)
+ msg = await self._client.send_file("me", file, force_document=True)
+ doc = msg.document
+ await self._client.delete_messages("me", msg.id)
+ stickers.append(InputStickerSetItem(
+ document=InputDocument(doc.id, doc.access_hash, doc.file_reference),
+ emoji=emoji
+ ))
+ except Exception as e:
+ logger.error(f"Error loading emoji {path}: {e}")
+ continue
+
+ if not stickers:
+ shutil.rmtree(tmp_dir, ignore_errors=True)
+ return await message.edit(self.strings("no_valid"))
+
+ try:
+ await self._client(CreateStickerSetRequest(
+ user_id="me",
+ title=title,
+ short_name=short_name,
+ stickers=stickers,
+ emojis=True
+ ))
+ await message.edit(self.strings("done_emoji_pack").format(short_name))
+ except PackShortNameOccupiedError:
+ await message.edit(self.strings("already"))
+ except Exception as e:
+ error_details = f"❌ Ошибка создания эмодзи-пака:\n{type(e).__name__}: {e}\n"
+ error_details += f"Пак: {short_name}\nСмайликов: {len(stickers)}\n"
+ if files:
+ error_details += f"Последний файл: {files[-1]}\n"
+ try:
+ error_details += f"Размер: {Image.open(files[-1]).size}\n"
+ error_details += f"Вес: {os.path.getsize(files[-1])} байт"
+ except:
+ pass
+ await message.edit(error_details)
+ logger.exception("Error creating emoji pack")
+ finally:
+ shutil.rmtree(tmp_dir, ignore_errors=True)
\ No newline at end of file
diff --git a/fiksofficial/python-modules/deviceinfo.py b/fiksofficial/python-modules/deviceinfo.py
index 0d6b2e6..5daf86c 100644
--- a/fiksofficial/python-modules/deviceinfo.py
+++ b/fiksofficial/python-modules/deviceinfo.py
@@ -449,7 +449,7 @@ class DeviceInfo(loader.Module):
await call.edit(
text=self.strings["no_results"].format(query),
reply_markup=[],
- photo=None, # Explicitly remove any existing photo
+ photo=None,
disable_web_page_preview=True
)
except Exception as edit_error:
@@ -471,7 +471,7 @@ class DeviceInfo(loader.Module):
await call.edit(
text=list_text,
reply_markup=button_rows,
- photo=None, # Explicitly remove any existing photo
+ photo=None,
disable_web_page_preview=True
)
except Exception as edit_error:
@@ -491,8 +491,8 @@ class DeviceInfo(loader.Module):
await call.edit(
text=self.strings["error"].format(str(e)),
reply_markup=[],
- photo=None, # Explicitly remove any existing photo
- disable_web_page_preview=True
+ photo=None,
+ disable_web_page_preview=True
)
except Exception as edit_error:
logger.warning(f"DeviceInfo: Failed to edit error message: {edit_error}")
diff --git a/fiksofficial/python-modules/speedtest.py b/fiksofficial/python-modules/speedtest.py
index 1b50e27..bdcaa7a 100644
--- a/fiksofficial/python-modules/speedtest.py
+++ b/fiksofficial/python-modules/speedtest.py
@@ -1,3 +1,12 @@
+# ______ ___ ___ _ _
+# ____ | ___ \ | \/ | | | | |
+# / __ \| |_/ / _| . . | ___ __| |_ _| | ___
+# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \
+# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/
+# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___|
+# \____/ __/ |
+# |___/
+
# На модуль распространяется лицензия "GNU General Public License v3.0"
# https://github.com/all-licenses/GNU-General-Public-License-v3.0
@@ -5,30 +14,147 @@
# requires: speedtest-cli
import speedtest
-from .. import loader
+from .. import loader, utils
+@loader.tds
class SpeedTestMod(loader.Module):
- """Модуль для проверки скорости интернета"""
+ """Checking your internet speed"""
+ strings = {
+ "name": "SpeedTest",
+ "starting": "Running Speedtest…",
+ "ping": "Ping: {:.2f} ms",
+ "download": "Download: {:.2f} Mbps",
+ "upload": "Upload: {:.2f} Mbps",
+ "finished": "Speedtest completed!",
+ "error": "Speedtest error: {}",
+ "progress_ping": "Testing \"Ping\"...",
+ "progress_download": "Testing \"Download\"...",
+ "progress_upload": "Testing \"Upload\"...",
+ "cfg_timeout": "Server request timeout (sec)",
+ "cfg_retries": "Number of retry attempts",
+ "quality_website": "Websites: {}",
+ "quality_video": "Video: {}",
+ "quality_gaming": "Gaming: {}",
+ "quality_calls": "Video calls: {}",
+ }
- strings = {"name": "SpeedTest"}
+ strings_ru = {
+ "_cls_doc": "Проверка скорости интернета",
+ "starting": "Запускаем Speedtest…",
+ "ping": "Ping: {:.2f} мс",
+ "download": "Загрузка: {:.2f} Мбит/с",
+ "upload": "Отдача: {:.2f} Мбит/с",
+ "finished": "Speedtest завершён!",
+ "error": "Ошибка при выполнении Speedtest: {}",
+ "progress_ping": "Тестируем пинг...",
+ "progress_download": "Тестируем скачивание...",
+ "progress_upload": "Тестируем загрузку...",
+ "cfg_timeout": "Таймаут запросов к серверу (сек)",
+ "cfg_retries": "Кол‑во попыток при неудаче",
+ "quality_website": "Сайты: {}",
+ "quality_video": "Видео: {}",
+ "quality_gaming": "Игры: {}",
+ "quality_calls": "Видеосвязь: {}",
+ }
+ def __init__(self):
+ self.config = loader.ModuleConfig(
+ loader.ConfigValue(
+ "timeout",
+ 30,
+ lambda: self.strings("cfg_timeout"),
+ validator=loader.validators.Integer(minimum=10, maximum=120),
+ ),
+ loader.ConfigValue(
+ "retries",
+ 2,
+ lambda: self.strings("cfg_retries"),
+ validator=loader.validators.Integer(minimum=0, maximum=5),
+ ),
+ )
+
+ def _get_quality_rating(self, category: str, ping: float, download: float, upload: float) -> str:
+ if category == "website":
+ if ping < 50 and download > 5:
+ return "🟢🟢🟢🟢🟢"
+ elif ping < 100 and download > 3:
+ return "🟠🟠🟠🟠"
+ elif ping < 200 and download > 1:
+ return "🟡🟡🟡"
+ elif ping < 300 and download > 0.5:
+ return "🔴🔴"
+ else:
+ return "⚫"
+ elif category == "video":
+ if ping < 50 and download > 25:
+ return "🟢🟢🟢🟢🟢"
+ elif ping < 75 and download > 5:
+ return "🟠🟠🟠🟠"
+ elif ping < 100 and download > 3:
+ return "🟡🟡🟡"
+ elif ping < 150 and download > 1:
+ return "🔴🔴"
+ else:
+ return "⚫"
+ elif category == "gaming":
+ if ping < 50 and download > 5 and upload > 3:
+ return "🟢🟢🟢🟢🟢"
+ elif ping < 100 and download > 3 and upload > 1:
+ return "🟠🟠🟠🟠"
+ elif ping < 150 and download > 1 and upload > 0.5:
+ return "🟡🟡🟡"
+ elif ping < 200 and download > 0.5:
+ return "🔴🔴"
+ else:
+ return "⚫"
+ elif category == "calls":
+ if ping < 50 and download > 4 and upload > 4:
+ return "🟢🟢🟢🟢🟢"
+ elif ping < 100 and download > 1.5 and upload > 1.5:
+ return "🟡🟡🟡🟡"
+ elif ping < 150 and download > 1 and upload > 1:
+ return "🟠🟠🟠"
+ elif ping < 200 and download > 0.5:
+ return "🔴🔴"
+ else:
+ return "⚫"
+ return "⚫"
+
+ @loader.command(
+ ru_doc="(.st) - Запускает тест скорости интернета",
+ en_doc="(.st) - Runs an internet speed test",
+ alias="st",
+ )
async def speedcmd(self, message):
- """Запускает тест скорости интернета"""
- msg = await message.edit("Запускаем Speedtest... 🏁")
+ msg = await utils.answer(message, self.strings("starting"))
try:
- st = speedtest.Speedtest()
- st.get_best_server()
- download = st.download() / 1_000_000 # Мбит/с
- upload = st.upload() / 1_000_000 # Мбит/с
- ping = st.results.ping
+ s = speedtest.Speedtest()
+ s.get_best_server()
+ await utils.answer(msg, self.strings("progress_ping"))
- await msg.edit(
- f"✨ Speedtest завершён! ✨\n\n"
- f"Ping: {ping:.2f} ms\n"
- f"📥 Загрузка: {download:.2f} Mbps\n"
- f"📤 Отдача: {upload:.2f} Mbps",
- parse_mode="HTML"
+ ping = s.results.ping
+
+ await utils.answer(msg, self.strings("progress_download"))
+ download = s.download() / 1_000_000
+
+ await utils.answer(msg, self.strings("progress_upload"))
+ upload = s.upload() / 1_000_000
+
+ text = (
+ f"{self.strings('finished')}\n\n"
+ f"{self.strings('ping').format(ping)}\n"
+ f"{self.strings('download').format(download)}\n"
+ f"{self.strings('upload').format(upload)}\n\n"
+ f"{self.strings('quality_website').format(self._get_quality_rating('website', ping, download, upload))}\n"
+ f"{self.strings('quality_video').format(self._get_quality_rating('video', ping, download, upload))}\n"
+ f"{self.strings('quality_gaming').format(self._get_quality_rating('gaming', ping, download, upload))}\n"
+ f"{self.strings('quality_calls').format(self._get_quality_rating('calls', ping, download, upload))}"
+ )
+
+ await utils.answer(msg, text)
+ except Exception as exc:
+ await utils.answer(
+ msg,
+ self.strings("error").format(utils.escape_html(str(exc))),
)
- except Exception as e:
- await msg.edit(f"Ошибка при выполнении Speedtest:\n{e}")
diff --git a/unneyon/hikka-mods/langpacks/yamusic.yml b/unneyon/hikka-mods/langpacks/yamusic.yml
index 1b7efe0..6ba23ec 100644
--- a/unneyon/hikka-mods/langpacks/yamusic.yml
+++ b/unneyon/hikka-mods/langpacks/yamusic.yml
@@ -1,63 +1,63 @@
en:
guide: "📜 Guide for obtaining access token for Yandex.Music"
- iguide: "📜 Guide for obtaining access token for Yandex.Music"
- no_token: "❌ You didn't specify the access token in the config!"
+ search: "🎧 {performer} — {title}\n🎵 Yandex.Music | song.link"
+ downloading_track: "\n\n🕔 Downloading audio…"
+ uploading_banner: "\n\n🕔 Uploading banner…"
+ lyrics: "📜 Lyrics of the {track} track:\n{text}
\n\n©️ Writers: {writers}"
+ no_lyrics: "🚫 Track {track} has no lyrics!"
+ errors:
+ no_query: "🚫 Specify the search query first!"
+ no_token_or_invalid: "🚫 You specified an invalid access token or didn't specified it at all!"
+ not_found: "🚫 No results found."
+ no_playing: "🚫 You don't listening to anything right now."
autobio:
- d: "🎧 Autobio is off now"
- e: "🎧 Autobio is on now"
- there_is_no_playing: "❌ You don't listening to anything right now"
- queue_types:
+ enabled: "🎧 Autobio was enabled."
+ disabled: "🎧 Autobio was disabled."
+ likes:
+ liked: "❤️ Track {track} was liked."
+ unliked: "🖤 Track {track} was unliked."
+ disliked: "💔 Track {track} was disliked."
+ _entity_types:
VARIOUS: "Your queue"
- RADIO: "«My Wave»"
+ RADIO: "«My Vibe»"
PLAYLIST: "Playlist «{}»"
ALBUM: "«{}»"
ARTIST: "Popular tracks by {}"
- downloading: "\n\n🕔 Downloading audio…"
- uploading_banner: "\n\n🕔 Uploading banner…"
- likes:
- liked: "❤️ Track {track} was liked"
- unliked: "❤️ Track {track} was unliked"
- disliked: "💔 Track {track} was disliked"
- lyrics: "📜 Lyrics of the {track} track:\n{text}
\n\n©️ Writers: {writers}"
- no_lyrics: "❌ Track {track} has no lyrics!"
- args: "❌ Specify search query"
- searching: "🔍 Searching…"
- 404: "❌ No results found"
- search: "🎧 {performer} — {title}\n🎵 Yandex.Music | song.link"
_cfg:
- token: "Your access token for Yandex.Music"
- now_playing_text: "The text that is used in commands to get now playing track. May contain {performer}, {title}, {device}, {volume}, {playing_from}, {link}, {track_id}, {album_id} keywords"
- autobio: "Automatic bio template (may contain {artist} and {title} keywords)"
- no_playing_bio: "Bio that is set when nothing is playing"
+ token: "The access token for Yandex.Music."
+ now_playing_text: "The caption for .ynow and .ynowt commands. May contain {performer}, {title}, {device}, {volume}, {playing_from}, {link}, {track_id}, {album_id} keywords."
+ autobio_text: "The text for automatically changing «Bio». May contains {performer} and {title}."
+ no_playing_bio_text: "The text for changing «Bio» when there is no playing tracks."
+ banner_version: "Banner version (old / new with lyrics / new without lyrics)."
ru:
guide: "📜 Гайд по получению токена Яндекс.Музыки"
- iguide: "📜 Гайд по получению токена Яндекс.Музыки"
- no_token: "❌ Вы не указали токен Яндекс.Музыки в конфиге!"
+ search: "🎧 {performer} — {title}\n🎵 Яндекс.Музыка | song.link"
+ downloading_track: "\n\n🕔 Загрузка трека…"
+ uploading_banner: "\n\n🕔 Загрузка баннера…"
+ lyrics: "📜 Текст трека {track}:\n{text}
\n\n©️ Авторы: {writers}"
+ no_lyrics: "🚫 У трека {track} нет текста!"
+ errors:
+ no_query: "🚫 Укажите поисковый запрос!"
+ no_token_or_invalid: "🚫 Вы указали невалидный токен или не указали его вообще!"
+ not_found: "🚫 Результаты не найдены."
+ no_playing: "🚫 Вы ничего не слушаете сейчас."
autobio:
- d: "🎧 Автобио выключено"
- e: "🎧 Автобио включено"
- there_is_no_playing: "❌ Вы ничего не слушаете сейчас"
- queue_types:
+ enabled: "🎧 Автобио теперь включено."
+ disabled: "🎧 Автобио теперь выключено."
+ likes:
+ liked: "❤️ Трек {track} был лайкнут."
+ unliked: "🖤 С трека {track} был снят лайк."
+ disliked: "💔 Трек {track} был дизлайкнут."
+ _entity_types:
VARIOUS: "Ваша очередь"
- RADIO: "«Моя Волна»"
+ RADIO: "«Моя волна»"
PLAYLIST: "Плейлист «{}»"
ALBUM: "«{}»"
ARTIST: "Популярные треки {}"
- downloading: "\n\n🕔 Загрузка трека…"
- uploading_banner: "\n\n🕔 Загрузка баннера…"
- likes:
- liked: "❤️ Трек {track} лайкнут"
- unliked: "❤️ С трека {track} снят лайк"
- disliked: "💔 Трек {track} дизлайкнут"
- lyrics: "📜 Текст трека {track}:\n{text}
\n\n©️ Авторы: {writers}"
- no_lyrics: "❌ У трека {track} нет текста!"
- args: "❌ Укажите поисковый запрос"
- searching: "🔍 Ищем…"
- 404: "❌ Ничего не найдено"
- search: "🎧 {performer} — {title}\n🎵 Яндекс.Музыка | song.link"
_cfg:
- token: "Ваш токен от Яндекс.Музыки"
- now_playing_text: "Текст, использующийся в командах для получения прослушиваемого трека. Может содержать ключевые слова {performer}, {title}, {device}, {volume}, {playing_from}, {link}, {track_id}, {album_id}"
- autobio: "Шаблон автоматического био (может содержать ключевые слова {artist} и {title})"
- no_playing_bio: "Био, которое ставится, когда ничего не играет"
\ No newline at end of file
+ token: "Токен для Яндекс.Музыки."
+ now_playing_text: "Текст, использующийся в подписи к файлу в командах .ynow и .ynowt. Может содержать {performer}, {title}, {device}, {volume}, {playing_from}, {link}, {track_id} и {album_id}"
+ autobio_text: "Текст, использующийся при автоматическом изменении «О себе». Может содержать {performer} и {title}."
+ no_playing_bio_text: "Текст, использующийся при изменении «О себе», когда ничего не играет."
+ banner_version: "Версия баннера (старый / новый с текстом трека / новый без текста трека)."
diff --git a/unneyon/hikka-mods/yamusic.py b/unneyon/hikka-mods/yamusic.py
index 18ef573..e21a139 100644
--- a/unneyon/hikka-mods/yamusic.py
+++ b/unneyon/hikka-mods/yamusic.py
@@ -1,24 +1,26 @@
-__version__ = (1, 1, 0)
+__version__ = (2, 0, 0)
+# region KAMEKURO.
# █▄▀ ▄▀█ █▀▄▀█ █▀▀ █▄▀ █ █ █▀█ █▀█
# █ █ █▀█ █ ▀ █ ██▄ █ █ ▀▄▄▀ █▀▄ █▄█ ▄
# © Copyright 2025
# ✈ https://t.me/kamekuro
-
# 🔒 Licensed under CC-BY-NC-ND 4.0 unless otherwise specified.
# 🌐 https://creativecommons.org/licenses/by-nc-nd/4.0
# + attribution
# + non-commercial
# + no-derivatives
-
# You CANNOT edit, distribute or redistribute this file without direct permission from the author.
+# region YaMusic
+# ▀▄▀ ▄▀█ █▀▄▀█ █ █ █▀▀ █ █▀▀
+# █ █▀█ █ ▀ █ ▀▄▄▀ ▄▄█ █ █▄▄
# meta banner: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/banners/yamusic.png
# meta pic: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/icons/yamusic.png
# meta developer: @kamekuro_hmods
# packurl: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/langpacks/yamusic.yml
-# scope: hikka_only
-# scope: hikka_min 1.6.3
-# requires: aiohttp asyncio requests pillow==11.2.1 git+https://github.com/MarshalX/yandex-music-api
+# scope: heroku_only
+# scope: heroku_min 1.7.2
+# requires: aiohttp asyncio requests pillow==12.0.0 git+https://github.com/MarshalX/yandex-music-api
import aiohttp
import asyncio
@@ -26,17 +28,14 @@ import io
import json
import logging
import random
-import re
import requests
import string
-import yandex_music
+import textwrap
+from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont
import telethon
-import textwrap
-from PIL import (
- Image, ImageDraw, ImageEnhance,
- ImageFilter, ImageFont
-)
+import yandex_music
+import yandex_music.exceptions
from .. import loader, utils
@@ -44,39 +43,637 @@ from .. import loader, utils
logger = logging.getLogger(__name__)
-class YandexMusic():
- token: str
- client: yandex_music.ClientAsync
+class Banners:
+ def __init__(
+ self,
+ title: str,
+ artists: list,
+ duration: int,
+ progress: int,
+ track_cover: bytes,
+ ):
+ self.title = title
+ self.artists = artists
+ self.duration = duration
+ self.progress = progress
+ self.track_cover = track_cover
+ self.onest_b = "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf"
+ self.onest_r = "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Regular.ttf"
+ self.ysmusic_hb = "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/YSMusic-HeadlineBold.ttf"
+
+ def measure(
+ self, text: str, font: ImageFont.FreeTypeFont, draw: ImageDraw.ImageDraw
+ ):
+ bbox = draw.textbbox((0, 0), text, font=font)
+ return bbox[2] - bbox[0], bbox[3] - bbox[1]
+
+ def new(self):
+ W, H = 1920, 768
+ title_font = ImageFont.truetype(
+ io.BytesIO(requests.get(self.onest_b).content), 80
+ )
+ artist_font = ImageFont.truetype(
+ io.BytesIO(requests.get(self.onest_b).content), 55
+ )
+ time_font = ImageFont.truetype(
+ io.BytesIO(requests.get(self.onest_b).content), 36
+ )
+
+ track_cov = Image.open(io.BytesIO(self.track_cover)).convert("RGBA")
+ banner = (
+ track_cov.resize((W, W))
+ .crop((0, (W - H) // 2, W, ((W - H) // 2) + H))
+ .filter(ImageFilter.GaussianBlur(radius=14))
+ )
+ banner = ImageEnhance.Brightness(banner).enhance(0.3)
+ draw = ImageDraw.Draw(banner)
+
+ track_cov = track_cov.resize((H - 250, H - 250))
+ mask = Image.new("L", track_cov.size, 0)
+ ImageDraw.Draw(mask).rounded_rectangle(
+ (0, 0, track_cov.size[0], track_cov.size[1]), radius=35, fill=255
+ )
+ track_cov.putalpha(mask)
+ track_cov = track_cov.crop(track_cov.getbbox())
+ banner.paste(track_cov, (75, 75), mask)
+
+ space = (643, 75, 1870, 593)
+ title_lines = textwrap.wrap(self.title, width=23)
+ if len(title_lines) > 2:
+ title_lines = title_lines[:2]
+ title_lines[-1] = title_lines[-1][:-1] + "…"
+ artist_lines = textwrap.wrap(", ".join(self.artists), width=23)
+ if len(artist_lines) > 1:
+ artist_lines = artist_lines[:1]
+ artist_lines[-1] = artist_lines[-1][:-1] + "…"
+ lines = title_lines + artist_lines
+ lines_sizes = [
+ self.measure(
+ line, artist_font if (i == len(lines) - 1) else title_font, draw
+ )
+ for i, line in enumerate(lines)
+ ]
+ heights = [h for _, h in lines_sizes]
+ spacing = title_font.size + 10
+ y_start = space[1] + (space[3] - space[1]) / 2
+ for i, line in enumerate(lines):
+ w, _ = lines_sizes[i]
+ draw.text(
+ (space[0] + (space[2] - space[0] - w) / 2, y_start),
+ line,
+ font=(artist_font if (i == (len(lines) - 1)) else title_font),
+ fill="#FFFFFF",
+ )
+ y_start += spacing
+
+ draw.text(
+ (75, 650),
+ f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}",
+ font=time_font,
+ fill="#FFFFFF",
+ )
+ draw.text(
+ (1745, 650),
+ f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}",
+ font=time_font,
+ fill="#FFFFFF",
+ )
+ draw.rounded_rectangle([75, 700, 1845, 715], radius=15 // 2, fill="#A0A0A0")
+ draw.rounded_rectangle(
+ [75, 700, int(75 + (1770 * self.progress / self.duration)), 715],
+ radius=15 // 2,
+ fill="#FFFFFF",
+ )
+
+ by = io.BytesIO()
+ banner.save(by, format="PNG")
+ by.seek(0)
+ by.name = "banner.png"
+ return by
+
+ def old(self):
+ w, h = 1920, 768
+ title_font = ImageFont.truetype(
+ io.BytesIO(requests.get(self.onest_b).content), 80
+ )
+ art_font = ImageFont.truetype(
+ io.BytesIO(requests.get(self.onest_r).content), 55
+ )
+ time_font = ImageFont.truetype(
+ io.BytesIO(requests.get(self.onest_b).content), 36
+ )
+
+ track_cov = Image.open(io.BytesIO(self.track_cover)).convert("RGBA")
+ banner = (
+ track_cov.resize((w, w))
+ .crop((0, (w - h) // 2, w, ((w - h) // 2) + h))
+ .filter(ImageFilter.GaussianBlur(radius=14))
+ )
+ banner = ImageEnhance.Brightness(banner).enhance(0.3)
+
+ track_cov = track_cov.resize((banner.size[1] - 150, banner.size[1] - 150))
+ mask = Image.new("L", track_cov.size, 0)
+ ImageDraw.Draw(mask).rounded_rectangle(
+ (0, 0, track_cov.size[0], track_cov.size[1]), radius=35, fill=255
+ )
+ track_cov.putalpha(mask)
+ track_cov = track_cov.crop(track_cov.getbbox())
+ banner.paste(track_cov, (75, 75), mask)
+
+ title_lines = textwrap.wrap(self.title, 23)
+ if len(title_lines) > 1:
+ title_lines[1] = (
+ title_lines[1] + "..." if len(title_lines) > 2 else title_lines[1]
+ )
+ title_lines = title_lines[:2]
+ artists_lines = textwrap.wrap(" • ".join(self.artists), width=40)
+ if len(artists_lines) > 1:
+ for index, art in enumerate(artists_lines):
+ if "•" in art[-2:]:
+ artists_lines[index] = art[: art.rfind("•") - 1]
+
+ draw = ImageDraw.Draw(banner)
+ x, y = 150 + track_cov.size[0], 110
+ for index, line in enumerate(title_lines):
+ draw.text((x, y), line, font=title_font, fill="#FFFFFF")
+ if index != len(title_lines) - 1:
+ y += 70
+ x, y = 150 + track_cov.size[0], 110 * 2
+ if len(title_lines) > 1:
+ y += 70
+ for index, line in enumerate(artists_lines):
+ draw.text((x, y), line, font=art_font, fill="#A0A0A0")
+ if index != len(artists_lines) - 1:
+ y += 50
+
+ draw.rounded_rectangle(
+ [768, 650, 768 + 1072, 650 + 15], radius=15 // 2, fill="#A0A0A0"
+ )
+ draw.rounded_rectangle(
+ [768, 650, 768 + int(1072 * (self.progress / self.duration)), 650 + 15],
+ radius=15 // 2,
+ fill="#FFFFFF",
+ )
+ draw.text(
+ (768, 600),
+ f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}",
+ font=time_font,
+ fill="#FFFFFF",
+ )
+ draw.text(
+ (1745, 600),
+ f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}",
+ font=time_font,
+ fill="#FFFFFF",
+ )
+
+ by = io.BytesIO()
+ banner.save(by, format="PNG")
+ by.seek(0)
+ by.name = "banner.png"
+ return by
+
+
+@loader.tds
+class YaMusicMod(loader.Module):
+ """The module for Yandex.Music streaming service"""
+
+ strings = {"name": "YaMusic", "iguide": "📜 Guide for obtaining access token for Yandex.Music"}
+ strings_ru = {"_cls_doc": "Модуль для стримингового сервиса Яндекс.Музыка", "iguide": "📜 Гайд по получению токена Яндекс.Музыки"}
+
+ def __init__(self):
+ self.config = loader.ModuleConfig(
+ loader.ConfigValue(
+ option="token",
+ default=None,
+ doc=lambda: self.strings["_cfg"]["token"],
+ validator=loader.validators.Hidden(),
+ ),
+ loader.ConfigValue(
+ option="now_playing_text",
+ default=(
+ "🎧 {performer} — {title}\n\n"
+ "⌨️ Now is listening on "
+ "{device} (🔊 {volume}%)\n"
+ "🗂 Playing from: {playing_from}"
+ "\n\n🎵 {link} | "
+ 'song.link'
+ ),
+ doc=lambda: self.strings["_cfg"]["now_playing_text"],
+ validator=loader.validators.String(),
+ ),
+ loader.ConfigValue(
+ option="autobio_text",
+ default="{performer} — {title}",
+ doc=lambda: self.strings["_cfg"]["autobio_text"],
+ validator=loader.validators.String(),
+ ),
+ loader.ConfigValue(
+ option="no_playing_bio_text",
+ default="I use Heroku with YaMusic mod btw",
+ doc=lambda: self.strings["_cfg"]["no_playing_bio_text"],
+ validator=loader.validators.String(),
+ ),
+ loader.ConfigValue(
+ option="banner_version",
+ default="new",
+ doc=lambda: self.strings["_cfg"]["banner_version"],
+ validator=loader.validators.Choice(["old", "new"]),
+ ),
+ )
+
+ async def client_ready(self, client, db):
+ self._client: telethon.TelegramClient = client
+ self._db = db
+ if not self.get("guide_sent", False):
+ await self.inline.bot.send_message(self._tg_id, self.strings("iguide"))
+ self.set("guide_sent", True)
+ me = await self._client.get_me()
+ self._premium = me.premium if hasattr(me, "premium") else False
+ if self.get("autobio", False):
+ self.autobio.start()
+
+ @loader.loop(1800, autostart=True)
+ async def premium_check(self):
+ me = await self._client.get_me()
+ self._premium = me.premium if hasattr(me, "premium") else False
+
+ @loader.loop(30)
+ async def autobio(self):
+ if not self.config["token"]:
+ self.autobio.stop()
+ self.set("autobio", False)
+ return
+ now = await self.__get_now_playing()
+ if now and (not now["paused"]):
+ out = self.config["autobio_text"].format(
+ title=now["track"]["title"],
+ performer=", ".join(now["track"]["artist"]),
+ )
+ else:
+ out = self.config["no_playing_bio_text"]
+ try:
+ await self._client(
+ telethon.functions.account.UpdateProfileRequest(
+ about=out[: (140 if self._premium else 70)]
+ )
+ )
+ except telethon.errors.rpcerrorlist.FloodWaitError as e:
+ logger.info(f"Sleeping {max(e.seconds, 60)} because of floodwait")
+ await asyncio.sleep(max(e.seconds, 60))
+
+
+ @loader.command(ru_doc="👉 Гайд по получению токена Яндекс.Музыки", alias="yg")
+ async def yguidecmd(self, message: telethon.types.Message):
+ """👉 Guide for obtaining a Yandex.Music token"""
+ await utils.answer(message, self.strings("guide"))
+
+
+ @loader.command(ru_doc="👉 Включить/выключить автобио", alias="yb")
+ async def ybiocmd(self, message: telethon.types.Message):
+ """👉 Enable/disable autobio"""
+ try:
+ ym_client = await yandex_music.ClientAsync(self.config["token"]).init()
+ except yandex_music.exceptions.UnauthorizedError:
+ return await utils.answer(
+ message, self.strings("errors")["no_token_or_invalid"]
+ )
+
+ bio = not self.get("autobio", False)
+ self.set("autobio", bio)
+ if bio:
+ await self.autobio.func(self)
+ self.autobio.start()
+ else:
+ self.autobio.stop()
+ try:
+ await self._client(
+ telethon.functions.account.UpdateProfileRequest(
+ about=self.config["no_playing_bio_text"][
+ : (140 if self._premium else 70)
+ ]
+ )
+ )
+ except:
+ pass
+
+ bio = self.get("autobio", False)
+ await utils.answer(
+ message, self.strings("autobio")["enabled" if bio else "disabled"]
+ )
+
+
+ @loader.command(ru_doc="👉 Поиск треков в Яндекс.Музыке", alias="yq")
+ async def ysearchcmd(self, message: telethon.types.Message):
+ """👉 Searching tracks in Yandex.Music"""
+ try:
+ ym_client = await yandex_music.ClientAsync(self.config["token"]).init()
+ except yandex_music.exceptions.UnauthorizedError:
+ return await utils.answer(
+ message, self.strings("errors")["no_token_or_invalid"]
+ )
+ query = utils.get_args_raw(message)
+ if not query:
+ return await utils.answer(message, self.strings("errors")["no_query"])
+ search = await ym_client.search(query, type_="track")
+ if (not search.tracks) or (len(search.tracks.results) == 0):
+ return await utils.answer(message, self.strings("errors")["not_found"])
+
+ track = search.tracks.results[0]
+ out = self.strings("search").format(
+ title=track.title,
+ performer=", ".join(track.artists_name()),
+ track_id=track.track_id,
+ )
+ await utils.answer(message, out + self.strings("downloading_track"))
+
+ info = await ym_client.tracks_download_info(search.tracks.results[0].id, True)
+ audio = io.BytesIO(requests.get(info[0].direct_link).content)
+ audio.name = "audio.mp3"
+ await utils.answer(
+ message=message,
+ response=out,
+ file=audio,
+ attributes=(
+ [
+ telethon.types.DocumentAttributeAudio(
+ duration=int(search.tracks.results[0].duration_ms / 1000),
+ title=search.tracks.results[0].title,
+ performer=", ".join(
+ [x.name for x in search.tracks.results[0].artists]
+ ),
+ )
+ ]
+ ),
+ )
+
+
+ @loader.command(
+ ru_doc="👉 Получить баннер трека, который играет сейчас", alias="yn"
+ )
+ async def ynowcmd(self, message: telethon.types.Message):
+ """👉 Get the banner of the track playing right now"""
+ try:
+ ym_client = await yandex_music.ClientAsync(self.config["token"]).init()
+ except yandex_music.exceptions.UnauthorizedError:
+ return await utils.answer(
+ message, self.strings("errors")["no_token_or_invalid"]
+ )
+ await utils.answer(message, self.strings("uploading_banner"))
+ now = await self.__get_now_playing()
+ if not now:
+ return await utils.answer(message, self.strings("errors")["no_playing"])
+ track_object = (await ym_client.tracks(now["playable_id"]))[0]
+
+ playlist_name = ""
+ if now["entity_type"] == "PLAYLIST":
+ playlist = (await ym_client.playlists_list(now["entity_id"]))[0]
+ playlist_name = (
+ f'{playlist.title}'
+ )
+ if now["entity_type"] == "ALBUM":
+ album = (await ym_client.albums(now["entity_id"]))[0]
+ playlist_name = (
+ f'{album.title}'
+ )
+ if now["entity_type"] == "ARTIST":
+ artist = (await ym_client.artists(now["entity_id"]))[0]
+ playlist_name = (
+ f'{artist.name}'
+ )
+ if now["entity_type"] not in self.strings("_entity_types").keys():
+ now["entity_type"] = "VARIOUS"
+
+ device, volume = "Unknown Device", "❔"
+ if now["device"]:
+ device = now["device"][0]["info"]["title"]
+ volume = round(now["device"][0]["volume"] * 100, 2)
+ out = self.config["now_playing_text"].format(
+ performer=", ".join(now["track"]["artist"]),
+ title=now["track"]["title"],
+ device=device,
+ volume=volume,
+ track_id=now["track"]["track_id"],
+ album_id=now["track"]["album_id"],
+ playing_from=self.strings("_entity_types")
+ .get(now["entity_type"])
+ .format(playlist_name),
+ link=f"Яндекс.Музыка",
+ )
+ try:
+ await utils.answer(message, out + self.strings("uploading_banner"))
+ except:
+ pass
+
+ banners = Banners(
+ title=now["track"]["title"],
+ artists=now["track"]["artist"],
+ duration=now["duration_ms"],
+ progress=now["progress_ms"],
+ track_cover=requests.get(now["track"]["img"]).content,
+ )
+ file = getattr(banners, self.config["banner_version"], banners.new)()
+ await utils.answer(message=message, response=out, file=file)
+
+
+ @loader.command(ru_doc="👉 Получить трек, который играет сейчас", alias="ynt")
+ async def ynowtcmd(self, message: telethon.types.Message):
+ """👉 Get the track playing right now"""
+ try:
+ ym_client = await yandex_music.ClientAsync(self.config["token"]).init()
+ except yandex_music.exceptions.UnauthorizedError:
+ return await utils.answer(
+ message, self.strings("errors")["no_token_or_invalid"]
+ )
+ await utils.answer(message, self.strings("downloading_track"))
+ now = await self.__get_now_playing()
+ if not now:
+ return await utils.answer(message, self.strings("errors")["no_playing"])
+
+ playlist_name = ""
+ if now["entity_type"] == "PLAYLIST":
+ playlist = (await ym_client.playlists_list(now["entity_id"]))[0]
+ playlist_name = (
+ f'{playlist.title}'
+ )
+ if now["entity_type"] == "ALBUM":
+ album = (await ym_client.albums(now["entity_id"]))[0]
+ playlist_name = (
+ f'{album.title}'
+ )
+ if now["entity_type"] == "ARTIST":
+ artist = (await ym_client.artists(now["entity_id"]))[0]
+ playlist_name = (
+ f'{artist.name}'
+ )
+ if now["entity_type"] not in self.strings("_entity_types").keys():
+ now["entity_type"] = "VARIOUS"
+
+ device, volume = "Unknown Device", "❔"
+ if now["device"]:
+ device = now["device"][0]["info"]["title"]
+ volume = round(now["device"][0]["volume"] * 100, 2)
+ out = self.config["now_playing_text"].format(
+ performer=", ".join(now["track"]["artist"]),
+ title=now["track"]["title"],
+ device=device,
+ volume=volume,
+ track_id=now["track"]["track_id"],
+ album_id=now["track"]["album_id"],
+ playing_from=self.strings("_entity_types")
+ .get(now["entity_type"])
+ .format(playlist_name),
+ link=f"Яндекс.Музыка",
+ )
+ try:
+ await utils.answer(message, out + self.strings("downloading_track"))
+ except:
+ pass
+
+ audio = io.BytesIO(requests.get(now["track"]["download_link"]).content)
+ audio.name = "audio.mp3"
+ await utils.answer(
+ message=message,
+ response=out,
+ file=audio,
+ attributes=(
+ [
+ telethon.types.DocumentAttributeAudio(
+ duration=int(now["duration_ms"] / 1000),
+ title=now["track"]["title"],
+ performer=", ".join(now["track"]["artist"]),
+ )
+ ]
+ ),
+ )
+
+
+ @loader.command(ru_doc="👉 Лайкнуть играющий сейчас трек")
+ async def ylikecmd(self, message: telethon.types.Message):
+ """👉 Like the track playing right now"""
+ try:
+ ym_client = await yandex_music.ClientAsync(self.config["token"]).init()
+ except yandex_music.exceptions.UnauthorizedError:
+ return await utils.answer(
+ message, self.strings("errors")["no_token_or_invalid"]
+ )
+ now = await self.__get_now_playing()
+ if not now:
+ return await utils.answer(message, self.strings("errors")["no_playing"])
+ await ym_client.users_likes_tracks_add(now["track"]["track_id"])
+ await utils.answer(
+ message,
+ self.strings("likes")["liked"].format(
+ track_id=now["track"]["track_id"],
+ track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}",
+ ),
+ )
+
+ @loader.command(ru_doc="👉 Снять лайк с играющего сейчас трека")
+ async def yunlikecmd(self, message: telethon.types.Message):
+ """👉 Unlike the track playing right now"""
+ try:
+ ym_client = await yandex_music.ClientAsync(self.config["token"]).init()
+ except yandex_music.exceptions.UnauthorizedError:
+ return await utils.answer(
+ message, self.strings("errors")["no_token_or_invalid"]
+ )
+ now = await self.__get_now_playing()
+ if not now:
+ return await utils.answer(message, self.strings("errors")["no_playing"])
+ await ym_client.users_likes_tracks_remove(now["track"]["track_id"])
+ await utils.answer(
+ message,
+ self.strings("likes")["unliked"].format(
+ track_id=now["track"]["track_id"],
+ track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}",
+ ),
+ )
+
+ @loader.command(ru_doc="👉 Дизлайкнуть играющий сейчас трек")
+ async def ydislikecmd(self, message: telethon.types.Message):
+ """👉 Dislike the track playing right now"""
+ try:
+ ym_client = await yandex_music.ClientAsync(self.config["token"]).init()
+ except yandex_music.exceptions.UnauthorizedError:
+ return await utils.answer(
+ message, self.strings("errors")["no_token_or_invalid"]
+ )
+ now = await self.__get_now_playing()
+ if not now:
+ return await utils.answer(message, self.strings("errors")["no_playing"])
+ await ym_client.users_dislikes_tracks_add(now["track"]["track_id"])
+ await utils.answer(
+ message,
+ self.strings("likes")["disliked"].format(
+ track_id=now["track"]["track_id"],
+ track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}",
+ ),
+ )
+
+
+ @loader.command(ru_doc="👉 Получить текст играющего сейчас трека")
+ async def ylyricscmd(self, message: telethon.types.Message):
+ """👉 Get the lyrics of the track playing right now"""
+ try:
+ ym_client = await yandex_music.ClientAsync(self.config["token"]).init()
+ except yandex_music.exceptions.UnauthorizedError:
+ return await utils.answer(
+ message, self.strings("errors")["no_token_or_invalid"]
+ )
+ now = await self.__get_now_playing()
+ if not now:
+ return await utils.answer(message, self.strings("errors")["no_playing"])
+ try:
+ lyrics = await ym_client.tracks_lyrics(now["track"]["track_id"])
+ await utils.answer(
+ message,
+ self.strings("lyrics").format(
+ track_id=now["track"]["track_id"],
+ track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}",
+ text=requests.get(lyrics.download_url).text,
+ writers=", ".join(lyrics.writers) if lyrics.writers else "Unknown",
+ ),
+ )
+ except yandex_music.exceptions.NotFoundError:
+ await utils.answer(
+ message,
+ self.strings("no_lyrics").format(
+ track_id=now["track"]["track_id"],
+ track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}",
+ ),
+ )
- def __init__(self, token: str):
- self.client = yandex_music.ClientAsync(token)
- self.token = token
- async def init(self):
- self.client = await self.client.init()
- return self
# Original code: https://raw.githubusercontent.com/MIPOHBOPOHIH/YMMBFA/main/main.py
- async def _create_ynison_ws(self, ws_proto: dict) -> dict:
- async with aiohttp.ClientSession() as session:
- async with session.ws_connect(
- "wss://ynison.music.yandex.ru/redirector.YnisonRedirectService/GetRedirectToYnison",
- headers={
- "Sec-WebSocket-Protocol": f"Bearer, v2, {json.dumps(ws_proto)}",
- "Origin": "http://music.yandex.ru",
- "Authorization": f"OAuth {self.token}",
- },
- ) as ws:
- response = await ws.receive()
- return json.loads(response.data)
+ async def __get_ynison(self):
+ async def create_ws(token, ws_proto):
+ async with aiohttp.ClientSession() as session:
+ async with session.ws_connect(
+ "wss://ynison.music.yandex.ru/redirector.YnisonRedirectService/GetRedirectToYnison",
+ headers={
+ "Sec-WebSocket-Protocol": f"Bearer, v2, {json.dumps(ws_proto)}",
+ "Origin": "http://music.yandex.ru",
+ "Authorization": f"OAuth {token}",
+ },
+ ) as ws:
+ response = await ws.receive()
+ return json.loads(response.data)
- # Original code: https://raw.githubusercontent.com/MIPOHBOPOHIH/YMMBFA/main/main.py
- async def _get_ynison(self):
- device_id = ''.join(random.choices(string.ascii_lowercase, k=16))
+ device_id = "".join(random.choices(string.ascii_lowercase, k=16))
ws_proto = {
"Ynison-Device-Id": device_id,
"Ynison-Device-Info": json.dumps({"app_name": "Chrome", "type": 1}),
}
- data = await self._create_ynison_ws(ws_proto)
+ data = await create_ws(self.config["token"], ws_proto)
ws_proto["Ynison-Redirect-Ticket"] = data["redirect_ticket"]
payload = {
"update_full_state": {
@@ -88,7 +685,11 @@ class YandexMusic():
"playable_list": [],
"options": {"repeat_mode": "NONE"},
"entity_context": "BASED_ON_ENTITY_BY_DEFAULT",
- "version": {"device_id": device_id, "version": 9021243204784341000, "timestamp_ms": 0},
+ "version": {
+ "device_id": device_id,
+ "version": 9021243204784341000,
+ "timestamp_ms": 0,
+ },
"from_optional": "",
},
"status": {
@@ -96,11 +697,19 @@ class YandexMusic():
"paused": True,
"playback_speed": 1,
"progress_ms": 0,
- "version": {"device_id": device_id, "version": 8321822175199937000, "timestamp_ms": 0},
+ "version": {
+ "device_id": device_id,
+ "version": 8321822175199937000,
+ "timestamp_ms": 0,
+ },
},
},
"device": {
- "capabilities": {"can_be_player": True, "can_be_remote_controller": False, "volume_granularity": 16},
+ "capabilities": {
+ "can_be_player": True,
+ "can_be_remote_controller": False,
+ "volume_granularity": 16,
+ },
"info": {
"device_id": device_id,
"type": "WEB",
@@ -122,650 +731,62 @@ class YandexMusic():
headers={
"Sec-WebSocket-Protocol": f"Bearer, v2, {json.dumps(ws_proto)}",
"Origin": "http://music.yandex.ru",
- "Authorization": f"OAuth {self.token}",
- }
+ "Authorization": f"OAuth {self.config['token']}",
+ },
) as ws:
await ws.send_str(json.dumps(payload))
response = await ws.receive()
ynison: dict = json.loads(response.data)
return ynison
-
- async def get_lyrics(self, track_id: int, with_timecodes: bool = False):
- t = (await self.client.tracks(track_id))[0]
- if with_timecodes:
- if t.lyrics_info.has_available_sync_lyrics:
- lyrics = await self.client.tracks_lyrics(track_id, "LRC")
- return {
- "text": requests.get(lyrics.download_url).text,
- "writers": lyrics.writers
- }
- else:
- if t.lyrics_info.has_available_text_lyrics:
- lyrics = await self.client.tracks_lyrics(track_id, "TEXT")
- return {
- "text": requests.get(lyrics.download_url).text,
- "writers": lyrics.writers
- }
- return None
- async def get_now_playing(self):
- ynison = await self._get_ynison()
- if len(ynison.get("player_state", {}).get("player_queue", {}).get("playable_list", [])) == 0:
+ async def __get_now_playing(self):
+ if not self.config["token"]:
+ return {}
+ try:
+ ym_client = await yandex_music.ClientAsync(self.config["token"]).init()
+ except yandex_music.exceptions.UnauthorizedError:
+ return {}
+
+ ynison = await self.__get_ynison()
+ if (len(ynison.get("player_state", {}).get("player_queue", {}).get("playable_list", [])) == 0):
return {}
raw_track = ynison["player_state"]["player_queue"]["playable_list"][
ynison["player_state"]["player_queue"]["current_playable_index"]
]
- return {
- "paused": ynison["player_state"]["status"]["paused"],
- "duration_ms": int(ynison["player_state"]["status"]["duration_ms"]),
- "progress_ms": int(ynison["player_state"]["status"]["progress_ms"]),
- "entity_id": ynison["player_state"]["player_queue"]["entity_id"],
- "entity_type": ynison["player_state"]["player_queue"]["entity_type"],
- "playable_id": raw_track["playable_id"],
- "device": [
- x for x in ynison['devices']
- if x['info']['device_id'] == ynison.get('active_device_id_optional', "")
- ],
- "track": (await self.client.tracks(raw_track["playable_id"]))[0]
- } if raw_track['playable_type'] != "LOCAL_TRACK" else {}
-
-
-@loader.tds
-class YaMusicMod(loader.Module):
- """The module for Yandex.Music streaming service"""
- strings = {"name": "YaMusic"}
- strings_ru = {"_cls_doc": "Модуль для стримингового сервиса Яндекс.Музыка"}
-
- def __init__(self):
- self.config = loader.ModuleConfig(
- loader.ConfigValue(
- "token",
- None,
- lambda: self.strings["_cfg"]["token"],
- validator=loader.validators.Hidden()
- ),
- loader.ConfigValue(
- "now_playing_text",
- "🎧 {performer} — {title}\n\n" \
- "⌨️ Now is listening on " \
- "{device} (🔊 " \
- "{volume}%)\n🗂 Playing from: " \
- "{playing_from}\n\n🎵 {link} | " \
- "song.link",
- lambda: self.strings["_cfg"]["now_playing_text"],
- validator=loader.validators.String()
- ),
- loader.ConfigValue(
- "autobio",
- "🎧 {artist} - {title}",
- lambda: self.strings["_cfg"]["autobio"],
- validator=loader.validators.String()
- ),
- loader.ConfigValue(
- "no_playing_bio",
- "Hello!",
- lambda: self.strings["_cfg"]["no_playing_bio"],
- validator=loader.validators.String()
- ),
- loader.ConfigValue(
- "banner_version",
- "new",
- "Version of track banner (old/new)",
- validator=loader.validators.Choice(["new", "old"])
- )
+ ym_client = await yandex_music.ClientAsync(self.config["token"]).init()
+ track_object = (await ym_client.tracks(raw_track["playable_id"]))[0]
+ return (
+ {
+ "paused": ynison["player_state"]["status"]["paused"],
+ "playable_id": raw_track["playable_id"],
+ "duration_ms": int(ynison["player_state"]["status"]["duration_ms"]),
+ "progress_ms": int(ynison["player_state"]["status"]["progress_ms"]),
+ "entity_id": ynison["player_state"]["player_queue"]["entity_id"],
+ "entity_type": ynison["player_state"]["player_queue"]["entity_type"],
+ "device": [
+ x
+ for x in ynison["devices"]
+ if x["info"]["device_id"]
+ == ynison.get("active_device_id_optional", "")
+ ],
+ "track": {
+ "track_id": track_object.track_id,
+ "album_id": track_object.albums[0].id,
+ "title": track_object.title,
+ "artist": track_object.artists_name(),
+ "img": f"https://{track_object.cover_uri[:-2]}1000x1000",
+ "duration": track_object.duration_ms // 1000,
+ "minutes": round(track_object.duration_ms / 1000) // 60,
+ "seconds": round(track_object.duration_ms / 1000) % 60,
+ "download_link": (
+ (
+ await ym_client.tracks_download_info(
+ track_object.track_id, get_direct_links=True
+ )
+ )[0].direct_link
+ ),
+ },
+ }
+ if raw_track["playable_type"] != "LOCAL_TRACK"
+ else {}
)
-
- async def on_dlmod(self):
- if not self.get("guide_send", False):
- await self.inline.bot.send_message(self._tg_id, self.strings("iguide"))
- self.set("guide_send", True)
-
- async def client_ready(self, client, db):
- self._client = client
- self._db = db
-
- me = await self._client.get_me()
- self._premium = me.premium if hasattr(me, "premium") else False
- self.premium_check.start()
-
- if self.get("autobio", False):
- self.autobio.start()
-
-
- @loader.loop(1800)
- async def premium_check(self):
- me = await self._client.get_me()
- self._premium = me.premium if hasattr(me, "premium") else False
-
-
- @loader.loop(30)
- async def autobio(self):
- if not self.config['token']:
- self.autobio.stop(); self.set("autobio", False)
- return
- ym = await YandexMusic(self.config['token']).init()
- now = await ym.get_now_playing()
- if now and (not now['paused']):
- out = self.config['autobio'].format(
- title=now['track'].title,
- artist=", ".join([x.name for x in now['track'].artists])
- )[:(140 if self._premium else 70)]
- try:
- await self._client(
- telethon.functions.account.UpdateProfileRequest(about=out)
- )
- except telethon.errors.rpcerrorlist.FloodWaitError as e:
- logger.info(f"Sleeping {max(e.seconds, 60)} because of floodwait")
- await asyncio.sleep(max(e.seconds, 60))
-
-
- @loader.command(
- ru_doc="👉 Гайд по получению токена Яндекс.Музыки",
- alias="yg"
- )
- async def yguidecmd(self, message: telethon.types.Message):
- """👉 Guide for obtaining a Yandex.Music token"""
- await utils.answer(message, self.strings("guide"))
-
-
- @loader.command(
- ru_doc="👉 Включить/выключить автобио",
- alias="yb"
- )
- async def ybiocmd(self, message: telethon.types.Message):
- """👉 Enable/disable autobio"""
-
- if (not self.config['token']) and self.get("autobio", False):
- return await utils.answer(message, self.strings("no_token"))
-
- bio = not self.get("autobio", False)
- self.set("autobio", bio)
- if bio: self.autobio.start()
- else:
- self.autobio.stop()
- try:
- await self._client(
- telethon.functions.account.UpdateProfileRequest(
- about=self.config['no_playing_bio'][:(140 if self._premium else 70)]
- )
- )
- except: pass
-
- await utils.answer(
- message,
- self.strings("autobio")['e' if bio else 'd']
- )
-
-
- @loader.command(
- ru_doc="👉 Получить трек, который играет сейчас (с файлом трека)",
- alias="ynt"
- )
- async def ynowtcmd(self, message: telethon.types.Message):
- """👉 Get now playing track (with track file)"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- ym = await YandexMusic(self.config['token']).init()
- now = await ym.get_now_playing()
- if not now:
- return await utils.answer(message, self.strings("there_is_no_playing"))
- if now['entity_type'] not in self.strings("queue_types").keys():
- now['entity_type'] = "VARIOUS"
-
- playlist_name = ""
- if now['entity_type'] == "PLAYLIST":
- playlist = (await ym.client.playlists_list(now['entity_id']))[0]
- playlist_name = f"{playlist.title}"
- if now['entity_type'] == "ALBUM":
- album = (await ym.client.albums(now['entity_id']))[0]
- playlist_name = f"{album.title}"
- if now['entity_type'] == "ARTIST":
- artist = (await ym.client.artists(now['entity_id']))[0]
- playlist_name = f"{artist.name}"
-
- device, volume = "Unknown Device", "❓"
- if now['device']:
- device=now['device'][0]['info']['title']
- volume=round(now['device'][0]['volume']*100, 2)
-
- out = self.config['now_playing_text'].format(
- title=now['track'].title,
- performer=", ".join([x.name for x in now['track'].artists]),
- device=device, volume=volume,
- playing_from=self.strings("queue_types").get(now['entity_type']).format(playlist_name),
- track_id=now['track'].id,
- album_id=now['track'].albums[0].id,
- link=f"Яндекс.Музыка"
- )
- await utils.answer(
- message, out+self.strings("downloading")
- )
-
- audio = io.BytesIO((await utils.run_sync(requests.get, (await ym.client.tracks_download_info(now['track'].id, get_direct_links=True))[0].direct_link)).content)
- audio.name = "audio.mp3"
- await utils.answer(
- message=message, response=out,
- file=audio,
- attributes=([
- telethon.types.DocumentAttributeAudio(
- duration=now['track'].duration_ms // 1000,
- title=now['track'].title,
- performer=", ".join([x.name for x in now['track'].artists])
- )
- ])
- )
-
-
- @loader.command(
- ru_doc="👉 Получить баннер трека, который играет сейчас",
- alias="yn"
- )
- async def ynowcmd(self, message: telethon.types.Message):
- """👉 Get now playing track's banner"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- ym = await YandexMusic(self.config['token']).init()
- now = await ym.get_now_playing()
- if not now:
- return await utils.answer(message, self.strings("there_is_no_playing"))
- if now['entity_type'] not in self.strings("queue_types").keys():
- now['entity_type'] = "VARIOUS"
-
- playlist_name = ""
- if now['entity_type'] == "PLAYLIST":
- playlist = (await ym.client.playlists_list(now['entity_id']))[0]
- playlist_name = f"{playlist.title}"
- if now['entity_type'] == "ALBUM":
- album = (await ym.client.albums(now['entity_id']))[0]
- playlist_name = f"{album.title}"
- if now['entity_type'] == "ARTIST":
- artist = (await ym.client.artists(now['entity_id']))[0]
- playlist_name = f"{artist.name}"
-
- device, volume = "Unknown Device", "❓"
- if now['device']:
- device=now['device'][0]['info']['title']
- volume=round(now['device'][0]['volume']*100, 2)
-
- out = self.config['now_playing_text'].format(
- title=now['track'].title,
- performer=", ".join([x.name for x in now['track'].artists]),
- device=device, volume=volume,
- playing_from=self.strings("queue_types").get(now['entity_type']).format(playlist_name),
- track_id=now['track'].id,
- album_id=now['track'].albums[0].id,
- link=f"Яндекс.Музыка"
- )
- await utils.answer(
- message, out+self.strings("uploading_banner")
- )
-
- lyrics = await ym.get_lyrics(now['track'].id, True)
- func = self.__create_banner if self.config['banner_version'] == "new" else self.__create_banner_old
- file = func(
- now['track'].title, [x.name for x in now['track'].artists],
- now['duration_ms'], now['progress_ms'],
- requests.get(f"https://{now['track'].cover_uri[:-2]}1000x1000").content,
- lyrics['text'] if lyrics else None
- )
- await utils.answer(
- message=message, response=out, file=file
- )
-
-
- @loader.command(
- ru_doc="👉 Лайкнуть играющий сейчас трек"
- )
- async def ylikecmd(self, message: telethon.types.Message):
- """👉 Like now playing track's banner"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- ym = await YandexMusic(self.config['token']).init()
- now = await ym.get_now_playing()
- if not now:
- return await utils.answer(message, self.strings("there_is_no_playing"))
-
- await ym.client.users_likes_tracks_add(now['track'].id)
- await utils.answer(
- message, self.strings("likes")['liked'].format(
- track_id=now['track'].id, album_id=now['track'].albums[0].id,
- track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}"
- )
- )
-
- @loader.command(
- ru_doc="👉 Убрать лайк с играющего сейчас трека"
- )
- async def yunlikecmd(self, message: telethon.types.Message):
- """👉 Unlike now playing track"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- ym = await YandexMusic(self.config['token']).init()
- now = await ym.get_now_playing()
- if not now:
- return await utils.answer(message, self.strings("there_is_no_playing"))
-
- await ym.client.users_likes_tracks_remove(now['track'].id)
- await utils.answer(
- message, self.strings("likes")['unliked'].format(
- track_id=now['track'].id, album_id=now['track'].albums[0].id,
- track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}"
- )
- )
-
- @loader.command(
- ru_doc="👉 Дизлайкнуть играющий сейчас трек",
- alias="ydis"
- )
- async def ydislikecmd(self, message: telethon.types.Message):
- """👉 Dislike now playing track"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- ym = await YandexMusic(self.config['token']).init()
- now = await ym.get_now_playing()
- if not now:
- return await utils.answer(message, self.strings("there_is_no_playing"))
-
- await ym.client.users_dislikes_tracks_add(now['track'].id)
- await utils.answer(
- message, self.strings("likes")['disliked'].format(
- track_id=now['track'].id, album_id=now['track'].albums[0].id,
- track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}"
- )
- )
-
-
- @loader.command(
- ru_doc="👉 Получить текст играющего сейчас трека"
- )
- async def ylyricscmd(self, message: telethon.types.Message):
- """👉 Get lyrics of the now playing track"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- ym = await YandexMusic(self.config['token']).init()
- now = await ym.get_now_playing()
- if not now:
- return await utils.answer(message, self.strings("there_is_no_playing"))
-
- lyrics = await ym.get_lyrics(now['playable_id'])
- if lyrics:
- await utils.answer(
- message, self.strings("lyrics").format(
- track_id=now['track'].id, album_id=now['track'].albums[0].id,
- track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}",
- text=lyrics['text'],
- writers=", ".join(lyrics['writers'])
- )
- )
- else:
- await utils.answer(
- message, self.strings("no_lyrics").format(
- track_id=now['track'].id, album_id=now['track'].albums[0].id,
- track=f"{', '.join([x.name for x in now['track'].artists])} — {now['track'].title}"
- )
- )
-
-
- @loader.command(
- ru_doc="<запрос> 👉 Поиск трека в Яндекс.Музыке",
- alias="yq"
- )
- async def ysearchcmd(self, message: telethon.types.Message):
- """ 👉 Search track in Yandex.Music"""
-
- if not self.config['token']:
- return await utils.answer(message, self.strings("no_token"))
- ym = await YandexMusic(self.config['token']).init()
-
- query = utils.get_args_raw(message)
- if not query:
- await utils.answer(message, self.strings("args"))
- return
-
- message = await utils.answer(message, self.strings("searching"))
-
- search = await ym.client.search(query, type_="track")
- if (not search.tracks) or (len(search.tracks.results) == 0):
- return await utils.answer(message, self.strings("404"))
-
- out = self.strings("search").format(
- title=search.tracks.results[0].title + (
- f" ({search.tracks.results[0].version})" if search.tracks.results[0].version else ""
- ),
- performer=", ".join([x.name for x in search.tracks.results[0].artists]),
- album_id=search.tracks.results[0].albums[0].id, track_id=search.tracks.results[0].id
- )
- message = await utils.answer(message, out+self.strings("downloading"))
-
- info = await ym.client.tracks_download_info(search.tracks.results[0].id, True)
- link = info[0].direct_link
- audio = None
- audio = io.BytesIO((await utils.run_sync(requests.get, link)).content)
- audio.name = "audio.mp3"
-
- await utils.answer(
- message=message, response=out,
- file=audio,
- attributes=([
- telethon.types.DocumentAttributeAudio(
- duration=int(search.tracks.results[0].duration_ms / 1000),
- title=search.tracks.results[0].title,
- performer=", ".join([x.name for x in search.tracks.results[0].artists])
- )
- ])
- )
-
-
- def __create_banner(
- self,
- title: str, artists: list,
- duration: int, progress: int,
- track_cover: bytes, lyrics: str,
- ):
- # ——————————————— CONSTS ———————————————
- W, H = 1920, 768
- title_font = ImageFont.truetype(io.BytesIO(requests.get(
- "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf"
- ).content), 55)
- artist_font = ImageFont.truetype(io.BytesIO(requests.get(
- "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf"
- ).content), 46)
- time_font = ImageFont.truetype(io.BytesIO(requests.get(
- "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf"
- ).content), 36)
- lyrics_font = ImageFont.truetype(io.BytesIO(requests.get(
- "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/YSMusic-HeadlineBold.ttf"
- ).content), 75)
- nlyrics_font = ImageFont.truetype(io.BytesIO(requests.get(
- "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/YSMusic-HeadlineBold.ttf"
- ).content), 60)
- def measure(t: str, f: ImageFont.FreeTypeFont, d: ImageDraw.ImageDraw):
- bb = d.textbbox((0, 0), t, font=f)
- return bb[2] - bb[0], bb[3] - bb[1]
-
- # ——————————————— BACKGROUND ———————————————
- track_cov = Image.open(io.BytesIO(track_cover)).convert("RGBA")
- banner = (
- track_cov.resize((W, W))
- .crop((0, (W-H) // 2, W, ((W-H) // 2) + H))
- .filter(ImageFilter.GaussianBlur(radius=14))
- )
- banner = ImageEnhance.Brightness(banner).enhance(0.3)
- draw = ImageDraw.Draw(banner)
-
- # ——————————————— TRACK COVER ———————————————
- track_cov = track_cov.resize((H-350, H-350))
- mask = Image.new("L", track_cov.size, 0)
- ImageDraw.Draw(mask).rounded_rectangle(
- (0, 0, track_cov.size[0], track_cov.size[1]), radius=35, fill=255
- )
- track_cov.putalpha(mask)
- track_cov = track_cov.crop(track_cov.getbbox())
- banner.paste(track_cov, (175, 175), mask)
-
- # ——————————————— ARTIST & TITLE ———————————————
- text_width, _ = measure(f"{', '.join(artists)} — {title}", title_font, draw)
- if text_width > 1680:
- lines = [f"{title}", f"{', '.join(artists)}"]
- lsizes = [measure(lines[0], title_font, draw), measure(lines[1], artist_font, draw)]
- else:
- lines = [f"{', '.join(artists)} — {title}"]
- lsizes = [measure(lines[0], title_font, draw)]
- text_h = sum(th for _, th in lsizes) + (len(lines) - 1)
- text_y = (150 - text_h) / 2
- for i, (l, (lw, lh)) in enumerate(zip(lines, lsizes)):
- if len(lines) == 2 and i == 1:
- ftu = artist_font
- else:
- ftu = title_font
- if lw > 1680:
- while lw > 1680 and len(l) > 3:
- l = l[:-4] + "…"
- lw, _ = measure(l, ftu, draw)
- tx = (W - lw) / 2
- draw.text((tx, text_y), l, font=ftu, fill="#A0A0A0")
- text_y += lh + 5
-
- # ——————————————— LYRICS ———————————————
- if lyrics:
- lyrics_lines = []
- for match in re.finditer(r"\[(\d{2}):(\d{2}\.\d{2})\] (.+)", lyrics):
- minutes = int(match.group(1))
- seconds = float(match.group(2))
- text = match.group(3)
- time_ms = int((minutes * 60 + seconds) * 1000)
- lyrics_lines.append((time_ms, text))
- llast, lnext = "", ""
- for i, (time_ms, text) in enumerate(lyrics_lines):
- if time_ms <= progress:
- llast = text
- if i+1 < len(lyrics_lines):
- lnext = lyrics_lines[i+1][1]
- else:
- break
- y_start = None
- if llast:
- lines = textwrap.wrap(llast, width=23)
- if len(lines) > 3:
- lines = lines[:3]
- lines[-1] += "…"
- lines_sizes = [draw.textbbox((0, 0), l, font=lyrics_font) for l in lines]
- line_heights = [bb[3] - bb[1] for bb in lines_sizes]
- total_text_height = sum(line_heights) + (len(lines) - 1) * 10
- y_start = (150 + (track_cov.size[0]-total_text_height)) / 2
- for i, line in enumerate(lines):
- lw = lines_sizes[i][2] - lines_sizes[i][0]
- tx = (track_cov.size[0]+325 + ((W-track_cov.size[0]+285) - lw)) / 2
- draw.text((tx, y_start), line, font=lyrics_font, fill="#FFFFFF")
- y_start += line_heights[i] + 10
- if lnext:
- next_lines = textwrap.wrap(lnext, width=23)
- if len(next_lines) > 2:
- next_lines = next_lines[:2]
- next_lines[-1] += "…"
- next_sizes = [draw.textbbox((0, 0), l, font=nlyrics_font) for l in next_lines]
- next_heights = [bb[3] - bb[1] for bb in next_sizes]
- total_text_height = sum(next_heights) + (len(next_lines) - 1) * 10
- if not y_start:
- y_start = (150 + (track_cov.size[0] - total_text_height))/2 + 150
- for j, line in enumerate(next_lines):
- lw = next_sizes[j][2] - next_sizes[j][0]
- tx = (track_cov.size[0] + 325 + ((W - track_cov.size[0] + 285) - lw)) / 2
- draw.text((tx, y_start + 40), line, font=nlyrics_font, fill="#A0A0A0")
- y_start += next_heights[j] + 10
-
- # ——————————————— STATUS BAR ———————————————
- draw.rounded_rectangle([75, 700, 768 + 1072, 700 + 15], radius=15 // 2, fill="#A0A0A0")
- draw.rounded_rectangle([75, 700, 768 + int(1072 * (progress / duration)), 700 + 15], radius=15 // 2, fill="#FFFFFF")
- draw.text((75, 650), f"{(progress//1000//60):02}:{(progress//1000%60):02}", font=time_font, fill="#FFFFFF")
- draw.text((1745, 650), f"{(duration//1000//60):02}:{(duration//1000%60):02}", font=time_font, fill="#FFFFFF")
-
- # ——————————————— SAVE ———————————————
- by = io.BytesIO()
- banner.save(by, format="PNG"); by.seek(0)
- by.name = "banner.png"
- return by
-
-
- def __create_banner_old(
- self,
- title: str, artists: list,
- duration: int, progress: int,
- track_cover: bytes,
- *args, **kwargs
- ):
- w, h = 1920, 768
- title_font = ImageFont.truetype(io.BytesIO(requests.get(
- "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf"
- ).content), 80)
- art_font = ImageFont.truetype(io.BytesIO(requests.get(
- "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Regular.ttf"
- ).content), 55)
- time_font = ImageFont.truetype(io.BytesIO(requests.get(
- "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf"
- ).content), 36)
-
- # Gen banner (bg)
- track_cov = Image.open(io.BytesIO(track_cover)).convert("RGBA")
- banner = track_cov.resize((w, w)).crop(
- (0, (w-h)//2, w, ((w-h)//2)+h)
- ).filter(ImageFilter.GaussianBlur(radius=14))
- banner = ImageEnhance.Brightness(banner).enhance(0.3)
-
- # Gen track cover and put to bg
- track_cov = track_cov.resize((banner.size[1]-150, banner.size[1]-150))
- mask = Image.new("L", track_cov.size, 0)
- ImageDraw.Draw(mask).rounded_rectangle((0, 0, track_cov.size[0], track_cov.size[1]), radius=35, fill=255)
- track_cov.putalpha(mask)
- track_cov = track_cov.crop(track_cov.getbbox())
- banner.paste(track_cov, (75, 75), mask)
-
- # Editing text
- title_lines = textwrap.wrap(title, 23)
- if len(title_lines) > 1:
- title_lines[1] = title_lines[1] + "..." if len(title_lines) > 2 else title_lines[1]
- title_lines = title_lines[:2]
- artists_lines = textwrap.wrap(" • ".join(artists), width=40)
- if len(artists_lines) > 1:
- for index, art in enumerate(artists_lines):
- if "•" in art[-2:]:
- artists_lines[index] = art[:art.rfind("•") - 1]
-
- # Put title and artists to banner
- draw = ImageDraw.Draw(banner)
- x, y = 150+track_cov.size[0], 110
- for index, line in enumerate(title_lines):
- draw.text((x, y), line, font=title_font, fill="#FFFFFF")
- if index != len(title_lines)-1:
- y += 70
- x, y = 150+track_cov.size[0], 110*2
- if len(title_lines) > 1: y += 70
- for index, line in enumerate(artists_lines):
- draw.text((x, y), line, font=art_font, fill="#A0A0A0")
- if index != len(artists_lines)-1:
- y += 50
-
- # Drawing status bar
- draw.rounded_rectangle([768, 650, 768+1072, 650+15], radius=15//2, fill="#A0A0A0")
- draw.rounded_rectangle([768, 650, 768+int(1072*(progress/duration)), 650+15], radius=15//2, fill="#FFFFFF")
- draw.text((768, 600), f"{(progress//1000//60):02}:{(progress//1000%60):02}", font=time_font, fill="#FFFFFF")
- draw.text((1745, 600), f"{(duration//1000//60):02}:{(duration//1000%60):02}", font=time_font, fill="#FFFFFF")
-
- by = io.BytesIO()
- banner.save(by, format="PNG"); by.seek(0)
- by.name = "banner.png"
- return by
\ No newline at end of file