diff --git a/ZetGoHack/nullmod/Gradientor.py b/ZetGoHack/nullmod/Gradientor.py new file mode 100644 index 0000000..91ba9cb --- /dev/null +++ b/ZetGoHack/nullmod/Gradientor.py @@ -0,0 +1,246 @@ +#░░░███░███░███░███░███ +#░░░░░█░█░░░░█░░█░░░█░█ +#░░░░█░░███░░█░░█░█░█░█ +#░░░█░░░█░░░░█░░█░█░█░█ +#░░░███░███░░█░░███░███ + +# meta developer: @ZetGo + +__version__ = (1, 0, 0) + +import io +import math + +from PIL import Image, ImageDraw + +from herokutl.tl.custom import Message +from herokutl.tl.functions.help import ( + GetPeerProfileColorsRequest +) +from herokutl.tl.types import ( + EmojiStatusCollectible +) + +from .. import loader, utils + +def resize_image(image: Image.Image, max_size: int = 1280) -> Image.Image: + w, h = image.size + if max(w, h) <= max_size: + return image + else: + scale = max_size / max(w, h) + new_w = int(w * scale) + new_h = int(h * scale) + + return image.resize((new_w, new_h), Image.LANCZOS) + +# Source: https://gist.github.com/weihanglo/1e754ec47fdd683a42fdf6a272904535#file-draw_gradient_pillow-py +def get_gradient(size: tuple, color1: tuple, color2: tuple, gradient_type: str = "linear") -> Image.Image: + def interpolate(f_co, t_co, interval): + if interval <= 1: + yield list(t_co) + return + + det_co = [(t - f) / (interval - 1) for f, t in zip(f_co, t_co)] + for i in range(interval): + yield [round(f + det * i) for f, det in zip(f_co, det_co)] + + gradient = Image.new('RGB', size, color=(0, 0, 0)) + draw = ImageDraw.Draw(gradient) + + if gradient_type == "linear": + top_color, bottom_color = color1, color2 + + for y, color in enumerate(interpolate(top_color, bottom_color, max(1, size[1]))): + draw.line([(0, y), (size[0], y)], fill=tuple(color), width=1) + + elif gradient_type == "radial": + center_color, edge_color = color1, color2 + + max_radius = math.hypot(size[0], size[1]) / 2.0 + interval = max(1, int(math.ceil(max_radius)) + 1) + + colors = list(interpolate(center_color, edge_color, interval)) + + cx = size[0] / 2 + cy = size[1] / 2 + + for r_index, color in enumerate(colors): + r = interval - 1 - r_index + if r < 0: + continue + bbox = [ + int(round(cx - r)), + int(round(cy - r)), + int(round(cx + r)), + int(round(cy + r)) + ] + draw.ellipse(bbox, fill=tuple(color)) + + return gradient + +def set_gradient(im: io.BytesIO, gradient: Image.Image) -> io.BytesIO: + img = resize_image(Image.open(im).convert('RGBA')) + + max_size = max(img.width, img.height) + gradient = gradient.resize((max_size, max_size), Image.LANCZOS).convert('RGBA') + left = (max_size - img.width) // 2 + top = (max_size - img.height) // 2 + gradient.paste(img, (left, top), img) + buffer = io.BytesIO() + + gradient.save(buffer, format='PNG') + + buffer.seek(0) + return buffer + +def crop_by_bbox(img: Image.Image): + img_w, img_h = img.size + x, y, w, h = BBOX_TGA_TGD + + left = int(round(x * img_w)) + top = int(round(y * img_h)) + right = int(round((x + w) * img_w)) + bottom = int(round((y + h) * img_h)) + + return img.crop((left, top, right, bottom)) + + +def hex_to_rgb(value: int): + return ((value >> 16) & 255, (value >> 8) & 255, value & 255) + +def hexes_to_rgbs(value: list): + if len(value) > 1: + res = list() + for i in value: + res.append(hex_to_rgb(i)) + + return tuple(res) + else: + res = hex_to_rgb(value[0]) + return (res, res) + +SHAPES = { + # TODO: фигуры для создания масок на авы +} + +BBOX_TGA_TGD = ( + 2894 / 8268, + 1260 / 8268, + 2504 / 8268, + 2504 / 8268, +) + + +@loader.translatable_docstring +class Gradientor(loader.Module): + strings = { + "name": "Gradientor", + "_cls_doc": "A module to create your profile picture with a background from your profile", + "gradient_creating": "🔁 Creating gradient...", + "gradient_created": " Gradient created!", + } + strings_ru = { + "_cls_doc": "Модуль для создания вашей аватарки на фоне из вашего профиля", + "gradient_creating": "🔁 Создание градиента...", + "gradient_created": " Градиент создан!", + } + + async def client_ready(self): + self.colors = self.get("PROFILE_COLORS", None) + if not self.colors: + raw_colors = (await self.client(GetPeerProfileColorsRequest(0))).colors + self.colors = { + str(col.color_id): hexes_to_rgbs(col.dark_colors.bg_colors) for col + in raw_colors + } + + self.set("PROFILE_COLORS", self.colors) + + @loader.command( + ru_doc="[фотография/reply] - создать аватарку с градиентом из цвета профиля\n" + "--update-cache - обновить кеш профиля, если вы только что сменили фон профиля\n" + "--linear - использовать линейный градиент" + ) + async def makepp(self, message: Message): + """[photo/reply] - create a profile picture with a gradient from profile color\n + --update-cache - update profile cache if you just changed profile background\n + --linear - use linear gradient""" + reply: Message = await message.get_reply_message() + args = utils.get_args(message) + + if "--update-cache" in args: + upd_cache = True + args.remove("--update-cache") + else: + upd_cache = False + + if "--linear" in args: + force_linear = True + args.remove("--linear") + else: + force_linear = False + + user = None + background_only = False + + if args: + user = await self.client.get_entity(int(args[0]) if args[0].isdigit() else args[0]) + + photo_source = ( + message + if (not reply or not (reply.photo or reply.document and "image/" in getattr(reply.document, "mime_type", ""))) + else reply + ) + if not (photo_source.photo or photo_source.document and "image/" in getattr(photo_source.document, "mime_type", "")): + background_only = True + + if not user: + if upd_cache: + user = self.client.hikka_me = await self.client.get_me() + elif reply: + user = reply.sender + else: + user = self.client.hikka_me + + if not user.premium: + color1, color2 = (28, 28, 28), (28, 28, 28) + + elif user.emoji_status and isinstance(user.emoji_status, EmojiStatusCollectible): + color1, color2 = ( + user.emoji_status.edge_color, user.emoji_status.center_color + ) + color1 = hex_to_rgb(color1) + color2 = hex_to_rgb(color2) + + elif user.profile_color: + color_variant = user.profile_color.color + + color1, color2 = self.colors.get( + str(color_variant), + ((28, 28, 28), (28, 28, 28)) + ) + + else: + color1, color2 = (28, 28, 28), (28, 28, 28) + + await utils.answer(message, self.strings["gradient_creating"]) + + gradient = get_gradient((1280, 1280), color1, color2, "linear" if force_linear else "radial") + gradient = crop_by_bbox(gradient) + + if not background_only: + p_b = await photo_source.download_media(bytes) + p_b_io = io.BytesIO(p_b) + p_b_io.seek(0) + + result = set_gradient(p_b_io, gradient) + + else: + result = io.BytesIO() + gradient.save(result, format='PNG') + result.seek(0) + + result.name = "grad @nullmod.png" + + await utils.answer(message, self.strings["gradient_created"], file=result, force_document=True) diff --git a/ZetGoHack/nullmod/full.txt b/ZetGoHack/nullmod/full.txt index 41a7a9b..bd51138 100644 --- a/ZetGoHack/nullmod/full.txt +++ b/ZetGoHack/nullmod/full.txt @@ -1,3 +1,4 @@ Chess HaremManager -SchedulePlus \ No newline at end of file +SchedulePlus +Gradientor diff --git a/modules.json b/modules.json index e1acf11..4cc5b94 100644 --- a/modules.json +++ b/modules.json @@ -38599,7 +38599,7 @@ "sgamerps": "- Start the game «Rock, Paper, Scissors» | (RU) - Начать игру «Камень, ножницы, бумага» | (UZ) - «Tosh, qog'oz, qaychi» o'yinining boshlanishi | (DE) - Beginn des Spiels «Stein, Papier, Schere» | (ES) - El comienzo del juego «Piedra, papel o tijera»" }, { - "cleargames": "- Complete all running games. | (RU) | (UZ) - Barcha faol o'yinlarni tugatish. | (DE) - Alle aktiven Spiele beenden. | (ES) - Completar todos los juegos en curso." + "cleargames": "- Complete all running games. | (RU) | (UZ) - Barcha faol o'yinlarni tugatish. | (DE) - Alle aktiven Spiele beenden. | (ES) - Completar todos los juegos en curso." } ], "new_commands": [ @@ -38625,7 +38625,7 @@ "original_name": "cleargames", "description": { "default": "- Complete all running games.", - "ru": "", + "ru": "", "uz": "- Barcha faol o'yinlarni tugatish.", "de": "- Alle aktiven Spiele beenden.", "es": "- Completar todos los juegos en curso." @@ -81807,6 +81807,6 @@ }, "meta": { "total_modules": 1017, - "generated_at": "2026-02-05T01:23:15.214009" + "generated_at": "2026-02-06T01:24:40.647732" } } \ No newline at end of file diff --git a/radiocycle/Modules/SpotifyMod.py b/radiocycle/Modules/SpotifyMod.py index 7834a05..218588b 100644 --- a/radiocycle/Modules/SpotifyMod.py +++ b/radiocycle/Modules/SpotifyMod.py @@ -1 +1,1436 @@ -# License Violation. Original module has GPL-v3 but author of nowhere tried to change it to CC BY-ND 4.0 +# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ +# █▀█ █ █ █ █▀█ █▀▄ █ +# © Copyright 2022 +# +# https://t.me/hikariatama +# +# 🔒 Licensed under the GNU AGPLv3 +# 🌐 https://www.gnu.org/licenses/agpl-3.0.html +# +# You CANNOT edit, distribute or redistribute this file without direct permission from the author. +# +# ORIGINAL MODULE: https://raw.githubusercontent.com/hikariatama/ftg/master/spotify.py + +# ======================================= +# _ __ __ __ _ +# | |/ /___ | \/ | ___ __| |___ +# | ' // _ \ | |\/| |/ _ \ / _` / __| +# | . \ __/ | | | | (_) | (_| \__ \ +# |_|\_\___| |_| |_|\___/ \__,_|___/ +# @ke_mods +# ======================================= +# +# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International) +# -------------------------------------- +# https://creativecommons.org/licenses/by-nd/4.0/legalcode +# ======================================= + +# meta developer: @ke_mods +# requires: telethon spotipy pillow requests yt-dlp + +import asyncio +import contextlib +import functools +import io +import logging +import re +import textwrap +import time +import traceback +import os +from types import FunctionType + +import requests +import spotipy +from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont, ImageOps +from telethon.errors import FloodWaitError +from telethon.tl.functions.account import UpdateProfileRequest +from telethon.tl.types import Message + +from .. import loader, utils + +logger = logging.getLogger(__name__) +logging.getLogger("spotipy").setLevel(logging.CRITICAL) + +class Banners: + def __init__( + self, + title: str, + artists: list, + duration: int, + progress: int, + track_cover: bytes, + font + ): + self.title = title + self.artists = ", ".join(artists) if isinstance(artists, list) else artists + self.duration = duration + self.progress = progress + self.track_cover = track_cover + self.font_url = font + + def _get_font(self, size, font_bytes): + return ImageFont.truetype(io.BytesIO(font_bytes), size) + + def _prepare_cover(self, size, radius): + cover = Image.open(io.BytesIO(self.track_cover)).convert("RGBA") + cover = cover.resize((size, size), Image.Resampling.LANCZOS) + + mask = Image.new("L", (size, size), 0) + draw = ImageDraw.Draw(mask) + draw.rounded_rectangle((0, 0, size, size), radius=radius, fill=255) + + output = Image.new("RGBA", (size, size), (0, 0, 0, 0)) + output.paste(cover, (0, 0), mask=mask) + return output + + def _prepare_background(self, w, h): + bg = Image.open(io.BytesIO(self.track_cover)).convert("RGBA") + bg = bg.resize((w, h), Image.Resampling.BICUBIC) + bg = bg.filter(ImageFilter.GaussianBlur(radius=40)) + bg = ImageEnhance.Brightness(bg).enhance(0.4) + return bg + + def _draw_progress_bar(self, draw, x, y, w, h, progress_pct, color="white", bg_color="#5e5e5e"): + draw.rounded_rectangle((x, y, x + w, y + h), radius=h/2, fill=bg_color) + + fill_w = int(w * progress_pct) + if fill_w > 0: + draw.rounded_rectangle((x, y, x + fill_w, y + h), radius=h/2, fill=color) + + dot_radius = h * 1.2 + dot_x = x + fill_w + dot_y = y + (h / 2) + + draw.ellipse( + (dot_x - dot_radius, dot_y - dot_radius, dot_x + dot_radius, dot_y + dot_radius), + fill=color + ) + + def horizontal(self): + W, H = 1500, 600 + padding = 60 + cover_size = 480 + + font_bytes = requests.get(self.font_url).content + title_font = self._get_font(55, font_bytes) + artist_font = self._get_font(45, font_bytes) + time_font = self._get_font(25, font_bytes) + + img = self._prepare_background(W, H) + draw = ImageDraw.Draw(img) + + cover = self._prepare_cover(cover_size, 30) + img.paste(cover, (padding, (H - cover_size) // 2), cover) + + text_x = padding + cover_size + 60 + text_y_start = 100 + text_width_limit = W - text_x - padding + + display_title = self.title + while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0: + display_title = display_title[:-1] + if len(display_title) < len(self.title): display_title += "…" + + display_artist = self.artists + while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0: + display_artist = display_artist[:-1] + if len(display_artist) < len(self.artists): display_artist += "…" + + draw.text((text_x, text_y_start), display_title, font=title_font, fill="white") + draw.text((text_x, text_y_start + 70), display_artist, font=artist_font, fill="#B3B3B3") + + cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}" + dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}" + + cur_w = time_font.getlength(cur_time) + dur_w = time_font.getlength(dur_time) + + bar_y = 480 + bar_h = 8 + gap = 25 + + draw.text((text_x, bar_y - 12), cur_time, font=time_font, fill="white") + + bar_start_x = text_x + cur_w + gap + bar_end_x = text_x + text_width_limit - dur_w - gap + bar_w = bar_end_x - bar_start_x + + prog_pct = self.progress / self.duration if self.duration > 0 else 0 + self._draw_progress_bar(draw, bar_start_x, bar_y, bar_w, bar_h, prog_pct) + + draw.text((bar_end_x + gap, bar_y - 12), dur_time, font=time_font, fill="white") + + by = io.BytesIO() + img.save(by, format="PNG") + by.seek(0) + by.name = "banner.png" + return by + + def vertical(self): + W, H = 1000, 1500 + padding = 80 + cover_size = 800 + + font_bytes = requests.get(self.font_url).content + title_font = self._get_font(60, font_bytes) + artist_font = self._get_font(45, font_bytes) + time_font = self._get_font(35, font_bytes) + + img = self._prepare_background(W, H) + draw = ImageDraw.Draw(img) + + cover = self._prepare_cover(cover_size, 40) + cover_x = (W - cover_size) // 2 + cover_y = 120 + img.paste(cover, (cover_x, cover_y), cover) + + text_area_y = cover_y + cover_size + 120 + text_width_limit = W - (padding * 2) + + display_title = self.title + while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0: + display_title = display_title[:-1] + if len(display_title) < len(self.title): display_title += "…" + + display_artist = self.artists + while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0: + display_artist = display_artist[:-1] + if len(display_artist) < len(self.artists): display_artist += "…" + + title_w = title_font.getlength(display_title) + draw.text(((W - title_w) / 2, text_area_y), display_title, font=title_font, fill="white") + + artist_w = artist_font.getlength(display_artist) + draw.text(((W - artist_w) / 2, text_area_y + 75), display_artist, font=artist_font, fill="#B3B3B3") + + bar_y = text_area_y + 260 + bar_h = 8 + bar_w = W - (padding * 2) + prog_pct = self.progress / self.duration if self.duration > 0 else 0 + + self._draw_progress_bar(draw, padding, bar_y, bar_w, bar_h, prog_pct, color="white", bg_color="#5e5e5e") + + cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}" + dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}" + + draw.text((padding, bar_y + 40), cur_time, font=time_font, fill="#B3B3B3") + + dur_w = time_font.getlength(dur_time) + draw.text((W - padding - dur_w, bar_y + 40), dur_time, font=time_font, fill="#B3B3B3") + + by = io.BytesIO() + img.save(by, format="PNG") + by.seek(0) + by.name = "banner.png" + return by + +@loader.tds +class SpotifyMod(loader.Module): + """Card with the currently playing track on Spotify.""" + + strings = { + "name": "SpotifyMod", + "need_auth": ( + " Please execute" + " .sauth before performing this action." + ), + "on-repeat": ( + "🔄 Set on-repeat." + ), + "off-repeat": ( + "🔄 Stopped track" + " repeat." + ), + "skipped": ( + "➡️ Skipped track." + ), + "playing": "▶️ Playing...", + "back": ( + "⬅️ Switched to previous" + " track" + ), + "paused": " Pause", + "restarted": ( + "✅️ Playing track" + " from the" + " beginning" + ), + "liked": ( + "❤️ Liked current" + " playback" + ), + "unlike": ( + "" + " Unliked current playback" + ), + "err": ( + " An error occurred." + "\n{}" + ), + "already_authed": ( + " Already authorized" + ), + "authed": ( + " Authentication" + " successful" + ), + "deauth": ( + "🚪 Successfully logged out" + " of account" + ), + "auth": ( + '🔗 Follow this' + " link, allow access, then enter .scode https://... with" + " the link you received." + ), + "no_music": ( + " No music is playing!" + ), + "dl_err": ( + " Failed to download" + " track." + ), + "volume_changed": ( + "🔊" + " Volume changed to {}%." + ), + "volume_invalid": ( + " Volume level must be" + " a number between 0 and 100." + ), + "volume_err": ( + " An error occurred while" + " changing volume." + ), + "no_volume_arg": ( + " Please specify a" + " volume level between 0 and 100." + ), + "searching_tracks": ( + "🕔 Searching for tracks" + " matching {}..." + ), + "no_search_query": ( + " Please specify a" + " search query." + ), + "no_tracks_found": ( + " No tracks found for" + " {}." + ), + "search_results": ( + " Search results for" + " {}:\n\n{}" + ), + "downloading_search_track": ( + "🕔 Downloading {}..." + ), + "download_success": ( + " Successfully downloaded {} - {}" + ), + "invalid_track_number": ( + " Invalid track number." + " Please search first or provide a valid number from the list." + ), + "device_list": ( + "📄 Available devices:\n{}" + ), + "no_devices_found": ( + " No devices found." + ), + "device_changed": ( + " Playback transferred to" + " {}." + ), + "invalid_device_id": ( + " Invalid device ID." + " Use .sdevice to see available devices." + ), + "search_results_cleared": " Search results cleared", + "autobio": ( + "🎧 Spotify autobio {}" + ), + "no_ytdlp": " yt-dlp not found... Check config or install yt-dlp ({}terminal pip install yt-dlp)", + "snowt_failed": "\n\n Download failed", + "uploading_banner": "\n\n🕔 Uploading banner...", + "downloading_track": "\n\n🕔 Downloading track...", + "no_playlists": " No playlists found.", + "playlists_list": "📄 Your playlists:\n\n{}", + "added_to_playlist": " Added {} to {}", + "removed_from_playlist": " Removed {} from {}", + "invalid_playlist_index": " Invalid playlist number.", + "no_cached_playlists": " Use .splaylists first.", + "playlist_created": " Playlist {} created.", + "playlist_deleted": " Playlist {} deleted.", + "no_playlist_name": " Please specify a playlist name.", + } + + strings_ru = { + "_cls_doc": "Карточка с играющим треком в Spotify.", + "need_auth": ( + " Выполни" + " .sauth перед выполнением этого действия." + ), + "err": ( + " Произошла ошибка." + "\n{}" + ), + "on-repeat": ( + "🔄 Включен повтор трека." + ), + "off-repeat": ( + "🔄 Повтор трека отключён." + ), + "skipped": ( + "➡️ Трек пропущен." + ), + "playing": "▶️ Играет...", + "back": ( + "⬅️ Переключено на предыдущий трек" + ), + "paused": " Пауза", + "restarted": ( + "✅️ Воспроизведение трека с начала..." + ), + "liked": ( + "❤️ Текущий трек добавлен в избранное" + ), + "unlike": ( + " Убрал лайк с текущего трека" + ), + "already_authed": ( + " Уже авторизован" + ), + "authed": ( + " Успешная аутентификация" + ), + "deauth": ( + "🚪 Успешный выход из аккаунта" + ), + "auth": ( + '🔗 Пройдите по этой ссылке, разрешите вход, затем введите .scode https://... с ссылкой которую вы получили.' + ), + "no_music": ( + " Музыка не играет!" + ), + "dl_err": ( + " Не удалось скачать трек." + ), + "volume_changed": ( + "🔊" + " Громкость изменена на {}%." + ), + "volume_invalid": ( + " Уровень громкости должен" + " быть числом от 0 до 100." + ), + "volume_err": ( + " Произошла ошибка при" + " изменении громкости." + ), + "no_volume_arg": ( + " Пожалуйста, укажите" + " уровень громкости от 0 до 100." + ), + "searching_tracks": ( + "🕔 Идет поиск треков" + " по запросу {}..." + ), + "no_search_query": ( + " Пожалуйста, укажите" + " поисковый запрос." + ), + "no_tracks_found": ( + " По запросу '{}'" + " ничего не найдено." + ), + "search_results": ( + " Результаты поиска" + " по запросу {}:\n\n{}" + ), + "downloading_search_track": ( + "🕔 Скачиваю {}..." + ), + "download_success": ( + " Трек {} - {} успешно скачан." + ), + "invalid_track_number": ( + " Некорректный номер трека." + " Сначала выполните поиск или укажите правильный номер из списка." + ), + "device_list": ( + "📄 Доступные устройства:\n{}" + ), + "no_devices_found": ( + " Устройства не найдены." + ), + "device_changed": ( + " Воспроизведение переключено на" + " {}." + ), + "invalid_device_id": ( + " Некорректный ID устройства." + " Используйте .sdevice , чтобы увидеть доступные устройства." + ), + "search_results_cleared": " Результаты поиска очищены", + "autobio": ( + "🎧 Обновление био" + " включено {}" + ), + "no_ytdlp": " yt-dlp не найден... Проверьте конфиг или установите yt-dlp ({}terminal pip install yt-dlp)", + "snowt_failed": "\n\n Ошибка скачивания.", + "uploading_banner": "\n\n🕔 Загрузка баннера...", + "downloading_track": "\n\n🕔 Скачивание трека...", + "no_playlists": " Плейлисты не найдены.", + "playlists_list": "📄 Ваши плейлисты:\n\n{}", + "added_to_playlist": " Трек {} добавлен в {}", + "removed_from_playlist": " Трек {} удален из {}", + "invalid_playlist_index": " Неверный номер плейлиста.", + "no_cached_playlists": " Сначала используйте .splaylists.", + "playlist_created": " Плейлист {} создан.", + "playlist_deleted": " Плейлист {} удален.", + "no_playlist_name": " Пожалуйста, укажите название плейлиста.", + } + strings_jp = { + "_cls_doc": "Spotify からのメッセージ", + "need_auth": ( + " この操作を行う前に " + ".sauth を実行してください。" + ), + "on-repeat": ( + "🔄 リピート再生を設定しました。" + ), + "off-repeat": ( + "🔄 リピート再生を解除しました。" + ), + "skipped": ( + "➡️ スキップしました。" + ), + "playing": "▶️ 再生中...", + "back": ( + "⬅️ 前のトラックに戻りました。" + ), + "paused": " 一時停止", + "restarted": ( + "✅️ 最初から再生します。" + ), + "liked": ( + "❤️ お気に入りに追加しました。" + ), + "unlike": ( + "" + " お気に入りから削除しました。" + ), + "err": ( + " エラーが発生しました。" + "\n{}" + ), + "already_authed": ( + " 既に認証されています。" + ), + "authed": ( + " 認証に成功しました。" + ), + "deauth": ( + "🚪 ログアウトしました。" + ), + "auth": ( + '🔗 リンクをクリックしてアクセスを許可し、取得したURLを使って .scode https://... を入力してください。' + ), + "no_music": ( + " 音楽は再生されていません!" + ), + "dl_err": ( + " トラックのダウンロードに失敗しました。" + ), + "volume_changed": ( + "🔊" + " 音量を {}% に変更しました。" + ), + "volume_invalid": ( + " 音量は0から100の数字で指定してください。" + ), + "volume_err": ( + " 音量の変更中にエラーが発生しました。" + ), + "no_volume_arg": ( + " 0から100の間で音量を指定してください。" + ), + "searching_tracks": ( + "🕔 {} を検索中..." + ), + "no_search_query": ( + " 検索キーワードを指定してください。" + ), + "no_tracks_found": ( + " {} は見つかりませんでした。" + ), + "search_results": ( + " {} の検索結果:\n\n{}" + ), + "downloading_search_track": ( + "🕔 {} をダウンロード中..." + ), + "download_success": ( + " {} - {} のダウンロードに成功しました。" + ), + "invalid_track_number": ( + " トラック番号が無効です。" + " 先に検索するか、リストから有効な番号を指定してください。" + ), + "device_list": ( + "📄 利用可能なデバイス:\n{}" + ), + "no_devices_found": ( + " デバイスが見つかりません。" + ), + "device_changed": ( + " 再生デバイスを" + " {} に切り替えました。" + ), + "invalid_device_id": ( + " デバイスIDが無効です。" + " .sdevice で利用可能なデバイスを確認してください。" + ), + "search_results_cleared": " 検索結果をクリアしました。", + "autobio": ( + "🎧 Spotify AutoBio: {}" + ), + "no_ytdlp": " yt-dlpが見つかりません... 設定を確認するか、インストールしてください ({}terminal pip install yt-dlp)", + "snowt_failed": "\n\n ダウンロードに失敗しました。", + "uploading_banner": "\n\n🕔 バナーをアップロード中...", + "downloading_track": "\n\n🕔 トラックをダウンロード中...", + "no_playlists": " プレイリストが見つかりません。", + "playlists_list": "📄 あなたのプレイリスト:\n\n{}", + "added_to_playlist": " {} を {} に追加しました。", + "removed_from_playlist": " {} を {} から削除しました。", + "invalid_playlist_index": " プレイリスト番号が無効です。", + "no_cached_playlists": " 先に .splaylists を使用してください。", + "playlist_created": " プレイリスト {} を作成しました。", + "playlist_deleted": " プレイリスト {} を削除しました。", + "no_playlist_name": " プレイリスト名を指定してください。", + } + + def __init__(self): + self._client_id = "e0708753ab60499c89ce263de9b4f57a" + self._client_secret = "80c927166c664ee98a43a2c0e2981b4a" + self.scope = ( + "user-read-playback-state playlist-read-private playlist-read-collaborative" + " user-modify-playback-state user-library-modify" + " playlist-modify-public playlist-modify-private" + ) + self.sp_auth = spotipy.oauth2.SpotifyOAuth( + client_id=self._client_id, + client_secret=self._client_secret, + redirect_uri="https://thefsch.github.io/spotify/", + scope=self.scope, + ) + self.config = loader.ModuleConfig( + loader.ConfigValue( + "show_banner", + True, + "Show banner with track info", + validator=loader.validators.Boolean(), + ), + loader.ConfigValue( + "custom_text", + ( + "🎧 Now playing: {track} — {artists}\n" + "🔗 song.link" + ), + """Custom text, supports {track}, {artists}, {album}, {playlist}, {playlist_owner}, {spotify_url}, {songlink}, {progress}, {duration}, {device} placeholders""", + validator=loader.validators.String(), + ), + loader.ConfigValue( + "font", + "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf", + "Custom font. Specify URL to .ttf file", + validator=loader.validators.String(), + ), + loader.ConfigValue( + "auto_bio_template", + "🎧 {}", + lambda: "Template for Spotify AutoBio", + ), + loader.ConfigValue( + "ytdlp_path", + "", + "Path to ytdlp binary", + validator=loader.validators.String(), + ), + loader.ConfigValue( + "banner_version", + "horizontal", + lambda: "Banner version", + validator=loader.validators.Choice(["horizontal", "vertical"]), + ), + ) + + async def client_ready(self, client, db): + self.font_ready = asyncio.Event() + + self._premium = getattr(await client.get_me(), "premium", False) + try: + self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) + except Exception: + self.set("acs_tkn", None) + self.sp = None + + if self.get("autobio", False): + self.autobio.start() + + def tokenized(func) -> FunctionType: + @functools.wraps(func) + async def wrapped(*args, **kwargs): + if not args[0].get("acs_tkn", False) or not args[0].sp: + await utils.answer(args[1], args[0].strings("need_auth")) + return + + return await func(*args, **kwargs) + + wrapped.__doc__ = func.__doc__ + wrapped.__module__ = func.__module__ + + return wrapped + + def error_handler(func) -> FunctionType: + @functools.wraps(func) + async def wrapped(*args, **kwargs): + try: + return await func(*args, **kwargs) + except Exception: + logger.exception(traceback.format_exc()) + with contextlib.suppress(Exception): + await utils.answer( + args[1], + args[0].strings("err").format(traceback.format_exc()), + ) + + wrapped.__doc__ = func.__doc__ + wrapped.__module__ = func.__module__ + + return wrapped + + + @loader.loop(interval=90) + async def autobio(self): + try: + current_playback = self.sp.current_playback() + track = current_playback["item"]["name"] + track = re.sub(r"([(].*?[)])", "", track).strip() + except Exception: + return + + bio = self.config["auto_bio_template"].format(f"{track}") + + try: + await self._client( + UpdateProfileRequest(about=bio[: 140 if self._premium else 70]) + ) + except FloodWaitError as e: + logger.info(f"Sleeping {max(e.seconds, 60)} bc of floodwait") + await asyncio.sleep(max(e.seconds, 60)) + return + + async def _download_track(self, message, query: str, caption: str = ""): + dl_dir = os.path.join(os.getcwd(), "spotifymod") + if not os.path.exists(dl_dir): + os.makedirs(dl_dir, exist_ok=True) + + for f in os.listdir(dl_dir): + try: + os.remove(os.path.join(dl_dir, f)) + except: + pass + + try: + squery = query.replace('"', '').replace("'", "") + + cmd = ( + f'{self.config["ytdlp_path"]} -x --audio-format mp3 --add-metadata ' + f'-o "{dl_dir}/%(title)s [%(id)s].%(ext)s" ' + f'"ytsearch1:{squery}"' + ) + + proc = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + await proc.communicate() + + files = [f for f in os.listdir(dl_dir) if f.endswith(".mp3")] + + if files: + target_file = os.path.join(dl_dir, files[0]) + await utils.answer(message, caption, file=target_file) + else: + await utils.answer(message, self.strings("snowt_failed")) + + except Exception as e: + logger.error(e) + await utils.answer(message, self.strings("dl_err")) + + finally: + if os.path.exists(dl_dir): + for f in os.listdir(dl_dir): + try: + os.remove(os.path.join(dl_dir, f)) + except: + pass + + + @error_handler + @tokenized + @loader.command( + ru_doc="- ➕ Добавить текущий трек в плейлист (используйте номер из .splaylists)" + ) + async def splaylistadd(self, message: Message): + """- ➕ Add current track to playlist (use number from .splaylists)""" + args = utils.get_args_raw(message) + if not args or not args.isdigit(): + await utils.answer(message, self.strings("invalid_playlist_index")) + return + + index = int(args) - 1 + playlists = self.get("last_playlists", []) + + if not playlists: + await utils.answer(message, self.strings("no_cached_playlists")) + return + + if index < 0 or index >= len(playlists): + await utils.answer(message, self.strings("invalid_playlist_index")) + return + + current = self.sp.current_playback() + if not current or not current.get("item"): + await utils.answer(message, self.strings("no_music")) + return + + track_uri = current["item"]["uri"] + track_name = current["item"]["name"] + artists = ", ".join([a["name"] for a in current["item"]["artists"]]) + full_track_name = f"{artists} - {track_name}" + + playlist_id = playlists[index]["id"] + playlist_name = playlists[index]["name"] + + try: + self.sp.playlist_add_items(playlist_id, [track_uri]) + except spotipy.exceptions.SpotifyException as e: + if e.http_status == 403 and "Insufficient client scope" in str(e): + await utils.answer(message, self.strings("need_auth")) + return + raise e + + await utils.answer(message, self.strings("added_to_playlist").format(utils.escape_html(full_track_name), utils.escape_html(playlist_name))) + + @error_handler + @tokenized + @loader.command( + ru_doc="- ➖ Удалить текущий трек из плейлиста (используйте номер из .splaylists)" + ) + async def splaylistrem(self, message: Message): + """- ➖ Remove current track from playlist (use number from .splaylists)""" + args = utils.get_args_raw(message) + if not args or not args.isdigit(): + await utils.answer(message, self.strings("invalid_playlist_index")) + return + + index = int(args) - 1 + playlists = self.get("last_playlists", []) + + if not playlists: + await utils.answer(message, self.strings("no_cached_playlists")) + return + + if index < 0 or index >= len(playlists): + await utils.answer(message, self.strings("invalid_playlist_index")) + return + + current = self.sp.current_playback() + if not current or not current.get("item"): + await utils.answer(message, self.strings("no_music")) + return + + track_uri = current["item"]["uri"] + track_name = current["item"]["name"] + artists = ", ".join([a["name"] for a in current["item"]["artists"]]) + full_track_name = f"{artists} - {track_name}" + + playlist_id = playlists[index]["id"] + playlist_name = playlists[index]["name"] + + try: + self.sp.playlist_remove_all_occurrences_of_items(playlist_id, [track_uri]) + except spotipy.exceptions.SpotifyException as e: + if e.http_status == 403 and "Insufficient client scope" in str(e): + await utils.answer(message, self.strings("need_auth")) + return + raise e + + await utils.answer(message, self.strings("removed_from_playlist").format(utils.escape_html(full_track_name), utils.escape_html(playlist_name))) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🆕 Создать новый плейлист" + ) + async def splaylistcreate(self, message: Message): + """- 🆕 Create a new playlist""" + name = utils.get_args_raw(message) + if not name: + await utils.answer(message, self.strings("no_playlist_name")) + return + + user_id = self.sp.me()["id"] + self.sp.user_playlist_create(user_id, name) + await utils.answer(message, self.strings("playlist_created").format(utils.escape_html(name))) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🗑 Удалить плейлист (используйте номер из .splaylists)" + ) + async def splaylistdelete(self, message: Message): + """- 🗑 Delete playlist (use number from .splaylists)""" + args = utils.get_args_raw(message) + if not args or not args.isdigit(): + await utils.answer(message, self.strings("invalid_playlist_index")) + return + + index = int(args) - 1 + playlists = self.get("last_playlists", []) + + if not playlists: + await utils.answer(message, self.strings("no_cached_playlists")) + return + + if index < 0 or index >= len(playlists): + await utils.answer(message, self.strings("invalid_playlist_index")) + return + + playlist_id = playlists[index]["id"] + playlist_name = playlists[index]["name"] + + self.sp.current_user_unfollow_playlist(playlist_id) + await utils.answer(message, self.strings("playlist_deleted").format(utils.escape_html(playlist_name))) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 📃 Получить все плейлисты" + ) + async def splaylists(self, message: Message): + """- 📃 Get all playlists""" + user_id = self.sp.me()["id"] + playlists = self.sp.current_user_playlists() + + editable_playlists = [] + for playlist in playlists["items"]: + if playlist["owner"]["id"] == user_id or playlist["collaborative"]: + editable_playlists.append(playlist) + + self.set("last_playlists", editable_playlists) + + playlist_list_text = "" + for i, playlist in enumerate(editable_playlists): + name = utils.escape_html(playlist["name"]) + url = playlist["external_urls"]["spotify"] + count = playlist["tracks"]["total"] + playlist_list_text += f"{i + 1}. {name} ({count} tracks)\n" + + if not playlist_list_text: + await utils.answer(message, self.strings("no_playlists")) + else: + await utils.answer(message, self.strings("playlists_list").format(playlist_list_text)) + + @error_handler + @tokenized + @loader.command( + ru_doc="- ℹ️ Переключить стриминг воспроизведения в био" + ) + async def sbiocmd(self, message: Message): + """- ℹ️ Toggle bio playback streaming""" + current = self.get("autobio", False) + new = not current + self.set("autobio", new) + await utils.answer( + message, + self.strings("autobio").format("enabled" if new else "disabled"), + ) + + if new: + self.autobio.start() + else: + self.autobio.stop() + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🔊 Изменить громкость. .svolume <0-100>" + ) + async def svolume(self, message: Message): + """- 🔊 Change playback volume. .svolume <0-100>""" + try: + args = utils.get_args_raw(message) + if not args: + await utils.answer(message, self.strings("no_volume_arg")) + return + + volume_percent = int(args) + if 0 <= volume_percent <= 100: + self.sp.volume(volume_percent) + await utils.answer(message, self.strings("volume_changed").format(volume_percent)) + else: + await utils.answer(message, self.strings("volume_invalid")) + except ValueError: + await utils.answer(message, self.strings("volume_invalid")) + except Exception: + await utils.answer(message, self.strings("volume_err")) + + @error_handler + @tokenized + @loader.command( + ru_doc=( + "- 🎵 Выбрать устройство для воспроизведения. Например: .sdevice \n" + "- 📝 Показать список устройств: .sdevice" + ) + ) + async def sdevicecmd(self, message: Message): + """- 🎵 Set preferred playback device. Usage: .sdevice or .sdevice to list devices""" + args = utils.get_args_raw(message) + devices = self.sp.devices()["devices"] + + if not args: + if not devices: + await utils.answer(message, self.strings("no_devices_found")) + return + + device_list_text = "" + for i, device in enumerate(devices): + is_active = "(active)" if device["is_active"] else "" + device_list_text += ( + f"{i+1}. {device['name']}" + f" ({device['type']}) {is_active}\n" + ) + + await utils.answer(message, self.strings("device_list").format(device_list_text.strip())) + return + + device_id = None + try: + device_number = int(args) + if 0 < device_number <= len(devices): + device_id = devices[device_number - 1]["id"] + device_name = devices[device_number - 1]["name"] + else: + await utils.answer(message, self.strings("invalid_device_id")) + return + except ValueError: + found_device = next((d for d in devices if d["id"] == args.strip()), None) + if found_device: + device_id = found_device["id"] + device_name = found_device["name"] + else: + await utils.answer(message, self.strings("invalid_device_id")) + return + + self.sp.transfer_playback(device_id=device_id) + await utils.answer(message, self.strings("device_changed").format(device_name)) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 💫 Включить повтор трека" + ) + async def srepeatcmd(self, message: Message): + """- 💫 Repeat""" + self.sp.repeat("track") + await utils.answer(message, self.strings("on-repeat")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- ✋ Остановить повтор" + ) + async def sderepeatcmd(self, message: Message): + """- ✋ Stop repeat""" + self.sp.repeat("context") + await utils.answer(message, self.strings("off-repeat")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 👉 Следующий трек" + ) + async def snextcmd(self, message: Message): + """- 👉 Next track""" + self.sp.next_track() + await utils.answer(message, self.strings("skipped")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🤚 Продолжить воспроизведение" + ) + async def sresumecmd(self, message: Message): + """- 🤚 Resume""" + self.sp.start_playback() + await utils.answer(message, self.strings("playing")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🤚 Пауза" + ) + async def spausecmd(self, message: Message): + """- 🤚 Pause""" + self.sp.pause_playback() + await utils.answer(message, self.strings("paused")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- ⏮ Предыдущий трек" + ) + async def sbackcmd(self, message: Message): + """- ⏮ Previous track""" + self.sp.previous_track() + await utils.answer(message, self.strings("back")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- ⏪ Перезапустить трек" + ) + async def sbegincmd(self, message: Message): + """- ⏪ Restart track""" + self.sp.seek_track(0) + await utils.answer(message, self.strings("restarted")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- ❤️ Лайкнуть играющий трек" + ) + async def slikecmd(self, message: Message): + """- ❤️ Like current track""" + cupl = self.sp.current_playback() + self.sp.current_user_saved_tracks_add([cupl["item"]["id"]]) + await utils.answer(message, self.strings("liked")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 💔 Убрать лайк с играющего трека" + ) + async def sunlikecmd(self, message: Message): + """- 💔 Unlike current track""" + cupl = self.sp.current_playback() + self.sp.current_user_saved_tracks_delete([cupl["item"]["id"]]) + await utils.answer(message, self.strings("unlike")) + + @error_handler + @loader.command( + ru_doc="- Получить ссылку для авторизации" + ) + async def sauthcmd(self, message: Message): + """- Get authorization link""" + if self.get("acs_tkn", False) and not self.sp: + await utils.answer(message, self.strings("already_authed")) + else: + self.sp_auth.get_authorize_url() + await utils.answer( + message, + self.strings("auth").format(self.sp_auth.get_authorize_url()), + ) + + @error_handler + @loader.command( + ru_doc="- Вставить код авторизации" + ) + async def scodecmd(self, message: Message): + """- Paste authorization code""" + url = message.message.split(" ")[1] + code = self.sp_auth.parse_auth_response_url(url) + self.set("acs_tkn", self.sp_auth.get_access_token(code, True, False)) + self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) + await utils.answer(message, self.strings("authed")) + + @error_handler + @loader.command( + ru_doc="- Выйти из аккаунта" + ) + async def unauthcmd(self, message: Message): + """- Log out of account""" + self.set("acs_tkn", None) + del self.sp + await utils.answer(message, self.strings("deauth")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- Обновить токен авторизации" + ) + async def stokrefreshcmd(self, message: Message): + """- Refresh authorization token""" + self.set( + "acs_tkn", + self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]), + ) + self.set("NextRefresh", time.time() + 45 * 60) + self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) + await utils.answer(message, self.strings("authed")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🎧 Показать карточку играющего трека" + ) + async def snowcmd(self, message: Message): + """- 🎧 View current track card.""" + current_playback = self.sp.current_playback() + if not current_playback or not current_playback.get("is_playing", False): + await utils.answer(message, self.strings("no_music")) + return + + track = current_playback["item"]["name"] + track_id = current_playback["item"]["id"] + artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]]) + album_name = current_playback["item"]["album"].get("name", "Unknown Album") + duration_ms = current_playback["item"].get("duration_ms", 0) + progress_ms = current_playback.get("progress_ms", 0) + + duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}" + progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}" + + spotify_url = f"https://open.spotify.com/track/{track_id}" + songlink = f"https://song.link/s/{track_id}" + + try: + device_raw = ( + current_playback["device"]["name"] + + " " + + current_playback["device"]["type"].lower() + ) + device = device_raw.replace("computer", "").replace("smartphone", "").strip() + except Exception: + device = None + + try: + playlist_id = current_playback["context"]["uri"].split(":")[-1] + playlist = self.sp.playlist(playlist_id) + playlist_name = playlist.get("name", None) + try: + playlist_owner = ( + f'' + f'{playlist["owner"]["display_name"]}' + ) + except KeyError: + playlist_owner = playlist.get("owner", {}).get("display_name", "") + except Exception: + playlist_name = "" + playlist_owner = "" + + text = self.config["custom_text"].format( + track=utils.escape_html(track), + artists=utils.escape_html(artists), + album=utils.escape_html(album_name), + duration=duration, + progress=progress, + device=device, + spotify_url=spotify_url, + songlink=songlink, + playlist=utils.escape_html(playlist_name) if playlist_name else "", + playlist_owner=playlist_owner or "", + ) + + if self.config["show_banner"]: + cover_url = current_playback["item"]["album"]["images"][0]["url"] + + tmp_msg = await utils.answer(message, text + self.strings("uploading_banner")) + + banners = Banners( + title=track, + artists=artists, + duration=duration_ms, + progress=progress_ms, + track_cover=requests.get(cover_url).content, + font=self.config["font"], + ) + file = getattr(banners, self.config["banner_version"], banners.horizontal)() + + await utils.answer(tmp_msg, text, file=file) + else: + await utils.answer(message, text) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🎧 Скачать играющий трек" + ) + async def snowtcmd(self, message: Message): + """- 🎧 Download current track.""" + current_playback = self.sp.current_playback() + if not current_playback or not current_playback.get("is_playing", False): + await utils.answer(message, self.strings("no_music")) + return + + track = current_playback["item"]["name"] + artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]]) + album_name = current_playback["item"]["album"].get("name", "Unknown Album") + duration_ms = current_playback["item"].get("duration_ms", 0) + progress_ms = current_playback.get("progress_ms", 0) + + duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}" + progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}" + + spotify_url = f"https://open.spotify.com/track/{current_playback['item']['id']}" + songlink = f"https://song.link/s/{current_playback['item']['id']}" + + try: + device_raw = ( + current_playback["device"]["name"] + + " " + + current_playback["device"]["type"].lower() + ) + device = device_raw.replace("computer", "").replace("smartphone", "").strip() + except Exception: + device = None + + try: + playlist_id = current_playback["context"]["uri"].split(":")[-1] + playlist = self.sp.playlist(playlist_id) + playlist_name = playlist.get("name", None) + try: + playlist_owner = ( + f'' + f'{playlist["owner"]["display_name"]}' + ) + except KeyError: + playlist_owner = playlist.get("owner", {}).get("display_name", "") + except Exception: + playlist_name = "" + playlist_owner = "" + + text = self.config["custom_text"].format( + track=utils.escape_html(track), + artists=utils.escape_html(artists), + album=utils.escape_html(album_name), + duration=duration, + progress=progress, + device=device, + spotify_url=spotify_url, + songlink=songlink, + playlist=utils.escape_html(playlist_name) if playlist_name else "", + playlist_owner=playlist_owner or "", + ) + + msg = await utils.answer(message, text + self.strings("downloading_track")) + + await self._download_track(msg, f"{artists} {track}", caption=text) + + @error_handler + @tokenized + @loader.command( + ru_doc=( + "- 🔍 Поиск треков. Например: .ssearch Imagine Dragons Believer\n" + "- 🎧 Скачать трек: .ssearch 1 (где 1 — номер трека из списка)" + ) + ) + async def ssearchcmd(self, message: Message): + """🔍 Search for tracks. Usage: .ssearch or .ssearch to download""" + args = utils.get_args_raw(message) + if not args: + await utils.answer(message, self.strings("no_search_query")) + return + + try: + track_number = int(args) + search_results = self.get("last_search_results", []) + + if not search_results: + await utils.answer(message, self.strings("no_tracks_found")) + return + + if track_number <= 0 or track_number > len(search_results): + raise ValueError + + msg = await utils.answer(message, self.strings("downloading_track")) + + track_info = search_results[track_number - 1] + track_name = track_info["name"] + artists = ", ".join([a["name"] for a in track_info["artists"]]) + + caption_text = self.strings("download_success").format( + utils.escape_html(track_name), + utils.escape_html(artists) + ) + + await self._download_track(msg, f"{artists} {track_name}", caption=caption_text) + return + + except ValueError: + await utils.answer(message, self.strings("searching_tracks").format(args)) + + results = self.sp.search(q=args, limit=5, type="track") + + if not results or not results["tracks"]["items"]: + await utils.answer(message, self.strings("no_tracks_found").format(args)) + return + + self.set("last_search_results", results["tracks"]["items"]) + + tracks_list = [] + for i, track in enumerate(results["tracks"]["items"]): + track_name = track["name"] + artists = ", ".join([artist["name"] for artist in track["artists"]]) + track_url = track["external_urls"]["spotify"] + tracks_list.append( + "{number}. {track_name} — {artists}\n🔗 Spotify".format( + number=i + 1, + track_name=utils.escape_html(track_name), + artists=utils.escape_html(artists), + track_url=track_url, + ) + ) + + text = "\n".join(tracks_list) + await utils.answer(message, self.strings("search_results").format(args, text)) + + + @loader.command( + ru_doc="- 🔄 Сброс результатов поиска по трекам" + ) + async def ssearchresetcmd(self, message: Message): + """- 🔄 Reset track search results""" + self.set("last_search_results", []) + await utils.answer(message, self.strings["search_results_cleared"]) + + async def watcher(self, message: Message): + """Watcher is used to update token""" + if not self.sp: + return + + if self.get("NextRefresh", False): + ttc = self.get("NextRefresh", 0) + crnt = time.time() + if ttc < crnt: + self.set( + "acs_tkn", + self.sp_auth.refresh_access_token( + self.get("acs_tkn")["refresh_token"] + ), + ) + self.set("NextRefresh", time.time() + 45 * 60) + self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) + else: + self.set( + "acs_tkn", + self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]), + ) + self.set("NextRefresh", time.time() + 45 * 60) + self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])