#             █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
#             █▀█ █ █ █ █▀█ █▀▄ █
#              © Copyright 2022
#           https://t.me/hikariatama
#
# 🔒      Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# ORIGINAL MODULE: https://raw.githubusercontent.com/hikariatama/ftg/master/spotify.py
#
# =======================================
#   _  __       __  __           _
#  | |/ /___   |  \/  | ___   __| |___
#  | ' // _ \  | |\/| |/ _ \ / _` / __|
#  | . \  __/  | |  | | (_) | (_| \__ \
#  |_|\_\___|  |_|  |_|\___/ \__,_|___/
#           @ke_mods
# =======================================
#
# meta developer: @ke_mods
# requires: telethon spotipy pillow requests httpx
# scope: ffmpeg

__version__ = (1, 0, 2)

import asyncio
import contextlib
import functools
import io
import logging
import re
import textwrap
import time
import traceback
import os
from types import FunctionType
import random

import httpx
import requests
import spotipy
from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont, ImageOps
from telethon.errors import FloodWaitError
from telethon.tl.functions.account import UpdateProfileRequest
from telethon.tl.functions.users import GetFullUserRequest
from telethon.tl.types import Message

from .. import loader, utils

logger = logging.getLogger(__name__)
logging.getLogger("spotipy").setLevel(logging.CRITICAL)

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36",
    "Accept": "application/json",
    "Content-Type": "application/json",
    "Origin": "https://spotmate.online",
    "Referer": "https://spotmate.online/en1",
}


class Banners:
    def __init__(
        self,
        title: str,
        artists: list,
        duration: int,
        progress: int,
        track_cover: bytes,
        font,
        blur,
        album_title: str = "",
        meta_info: str = "",
    ):
        self.title = title
        self.artists = ", ".join(artists) if isinstance(artists, list) else artists
        self.duration = duration
        self.progress = progress
        self.track_cover = track_cover
        self.font_url = font
        self.blur_intensity = blur
        self.album_title = album_title
        self.meta_info = meta_info

    def _get_font(self, size, font_bytes):
        return ImageFont.truetype(io.BytesIO(font_bytes), size)

    def _prepare_cover(self, size, radius):
        cover = Image.open(io.BytesIO(self.track_cover)).convert("RGBA")
        cover = cover.resize((size, size), Image.Resampling.LANCZOS)
        mask = Image.new("L", (size, size), 0)
        draw = ImageDraw.Draw(mask)
        draw.rounded_rectangle((0, 0, size, size), radius=radius, fill=255)
        output = Image.new("RGBA", (size, size), (0, 0, 0, 0))
        output.paste(cover, (0, 0), mask=mask)
        return output

    def _prepare_background(self, w, h):
        bg = Image.open(io.BytesIO(self.track_cover)).convert("RGBA")
        bg = bg.resize((w, h), Image.Resampling.LANCZOS)
        bg = bg.filter(ImageFilter.GaussianBlur(radius=self.blur_intensity))
        bg = ImageEnhance.Brightness(bg).enhance(0.35)
        return bg

    def _draw_progress_bar(self, draw, x, y, w, h, progress_pct, color="white", bg_color="#6b6b6b"):
        draw.rounded_rectangle((x, y, x + w, y + h), radius=h/2, fill=bg_color)
        fill_w = int(w * progress_pct)
        if fill_w > 0:
            draw.rounded_rectangle((x, y, x + fill_w, y + h), radius=h/2, fill=color)

    def horizontal(self):
        W, H = 1500, 600
        padding = 60
        cover_size = 480
        font_bytes = requests.get(self.font_url).content
        title_font = self._get_font(55, font_bytes)
        artist_font = self._get_font(45, font_bytes)
        time_font = self._get_font(25, font_bytes)

        img = self._prepare_background(W, H)
        draw = ImageDraw.Draw(img)

        cover = self._prepare_cover(cover_size, 30)
        img.paste(cover, (padding, (H - cover_size) // 2), cover)

        text_x = padding + cover_size + 60
        text_y_start = 100
        text_width_limit = W - text_x - padding

        wrapper = textwrap.TextWrapper(width=23)
        title_lines = wrapper.wrap(self.title)
        if len(title_lines) > 2:
            title_lines = title_lines[:2]
            title_lines[-1] += "..."

        current_y = text_y_start
        title_height = title_font.getbbox("Ah")[3] + 15
        for line in title_lines:
            draw.text((text_x, current_y), line, font=title_font, fill="white")
            current_y += title_height

        display_artist = self.artists
        while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0:
            display_artist = display_artist[:-1]
        if len(display_artist) < len(self.artists):
            display_artist += "…"

        artist_y = current_y + 10
        draw.text((text_x, artist_y), display_artist, font=artist_font, fill="#b3b3b3")

        cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}"
        dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}"
        cur_w = time_font.getlength(cur_time)
        dur_w = time_font.getlength(dur_time)

        bar_y = 480
        bar_h = 8
        gap = 25

        draw.text((text_x, bar_y - 12), cur_time, font=time_font, fill="white")
        bar_start_x = text_x + cur_w + gap
        bar_end_x = text_x + text_width_limit - dur_w - gap
        bar_w = bar_end_x - bar_start_x
        prog_pct = self.progress / self.duration if self.duration > 0 else 0
        self._draw_progress_bar(draw, bar_start_x, bar_y, bar_w, bar_h, prog_pct)
        draw.text((bar_end_x + gap, bar_y - 12), dur_time, font=time_font, fill="white")

        by = io.BytesIO()
        img.save(by, format="PNG")
        by.seek(0)
        by.name = "banner.png"
        return by

    def vertical(self):
        W, H = 1000, 1500
        padding = 80
        cover_size = 800

        font_bytes = requests.get(self.font_url).content
        title_font = self._get_font(60, font_bytes)
        artist_font = self._get_font(45, font_bytes)
        time_font = self._get_font(35, font_bytes)

        img = self._prepare_background(W, H)
        draw = ImageDraw.Draw(img)

        cover = self._prepare_cover(cover_size, 40)
        cover_x = (W - cover_size) // 2
        cover_y = 120
        img.paste(cover, (cover_x, cover_y), cover)

        text_area_y = cover_y + cover_size + 120
        text_width_limit = W - (padding * 2)

        wrapper = textwrap.TextWrapper(width=23)
        title_lines = wrapper.wrap(self.title)
        if len(title_lines) > 2:
            title_lines = title_lines[:2]
            title_lines[-1] += "..."

        current_y = text_area_y
        title_height = title_font.getbbox("Ah")[3] + 15
        for line in title_lines:
            w = title_font.getlength(line)
            draw.text(((W - w) / 2, current_y), line, font=title_font, fill="white")
            current_y += title_height

        display_artist = self.artists
        while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0:
            display_artist = display_artist[:-1]
        if len(display_artist) < len(self.artists):
            display_artist += "…"

        artist_w = artist_font.getlength(display_artist)
        draw.text(((W - artist_w) / 2, current_y + 15), display_artist, font=artist_font, fill="#b3b3b3")

        bar_y = text_area_y + 260
        if len(title_lines) > 1:
            bar_y += 60
        bar_h = 8
        bar_w = W - (padding * 2)
        prog_pct = self.progress / self.duration if self.duration > 0 else 0
        self._draw_progress_bar(draw, padding, bar_y, bar_w, bar_h, prog_pct, color="white", bg_color="#6b6b6b")

        cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}"
        dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}"
        draw.text((padding, bar_y + 40), cur_time, font=time_font, fill="white")
        dur_w = time_font.getlength(dur_time)
        draw.text((W - padding - dur_w, bar_y + 40), dur_time, font=time_font, fill="white")

        by = io.BytesIO()
        img.save(by, format="PNG")
        by.seek(0)
        by.name = "banner.png"
        return by

    # Ultra banner from YaMusic by @codrago_m
    def ultra(self) -> io.BytesIO:
        WIDTH, HEIGHT = 2560, 1220
        font_bytes = requests.get(self.font_url).content

        def get_font(size):
            try:
                return ImageFont.truetype(io.BytesIO(font_bytes), size)
            except Exception:
                return ImageFont.load_default()

        try:
            original_cover = Image.open(io.BytesIO(self.track_cover)).convert("RGBA")
        except Exception:
            original_cover = Image.new("RGBA", (1000, 1000), "black")

        dominant_color_img = original_cover.resize((1, 1), Image.Resampling.LANCZOS)
        dominant_color = dominant_color_img.getpixel((0, 0))
        r, g, b, a = dominant_color
        brightness = (r * 299 + g * 587 + b * 114) / 1000
        if brightness < 60:
            r = min(255, r + 60)
            g = min(255, g + 60)
            b = min(255, b + 60)
        dominant_color = (r, g, b, 255)

        background = original_cover.copy()
        bg_w, bg_h = background.size
        target_ratio = WIDTH / HEIGHT
        current_ratio = bg_w / bg_h
        if current_ratio > target_ratio:
            new_w = int(bg_h * target_ratio)
            offset = (bg_w - new_w) // 2
            background = background.crop((offset, 0, offset + new_w, bg_h))
        else:
            new_h = int(bg_w / target_ratio)
            offset = (bg_h - new_h) // 2
            background = background.crop((0, offset, bg_w, offset + new_h))

        background = background.resize((WIDTH, HEIGHT), Image.Resampling.LANCZOS)
        if self.blur_intensity > 0:
            background = background.filter(ImageFilter.GaussianBlur(radius=self.blur_intensity))
        dark_overlay = Image.new("RGBA", (WIDTH, HEIGHT), (0, 0, 0, 180))
        background = Image.alpha_composite(background, dark_overlay)

        cover_size = 500
        cover_x = (WIDTH - cover_size) // 2
        cover_y = 160

        glow_layer = Image.new("RGBA", (WIDTH, HEIGHT), (0, 0, 0, 0))
        draw_glow = ImageDraw.Draw(glow_layer)
        glow_rect_size = 620
        g_x = (WIDTH - glow_rect_size) // 2
        g_y = cover_y + (cover_size - glow_rect_size) // 2
        draw_glow.rounded_rectangle(
            (g_x, g_y, g_x + glow_rect_size, g_y + glow_rect_size),
            radius=50,
            fill=dominant_color,
        )
        glow_layer = glow_layer.filter(ImageFilter.GaussianBlur(radius=60))
        glow_layer = ImageEnhance.Brightness(glow_layer).enhance(1.4)
        glow_layer = ImageEnhance.Color(glow_layer).enhance(1.2)
        background = Image.alpha_composite(background, glow_layer)

        cover_img = original_cover.resize((cover_size, cover_size), Image.Resampling.LANCZOS)
        mask = Image.new("L", (cover_size, cover_size), 0)
        draw_mask = ImageDraw.Draw(mask)
        draw_mask.rounded_rectangle((0, 0, cover_size, cover_size), radius=45, fill=255)
        background.paste(cover_img, (cover_x, cover_y), mask)

        draw = ImageDraw.Draw(background)
        center_x = WIDTH // 2
        current_y = cover_y + cover_size + 130

        def draw_text_shadow(text, pos, font, fill="white", anchor="ms"):
            x, y = pos
            draw.text((x + 2, y + 2), text, font=font, fill=(0, 0, 0, 240), anchor=anchor)
            draw.text((x, y), text, font=font, fill=fill, anchor=anchor)

        font_title = get_font(100)
        title_text = self.title if len(self.title) <= 30 else self.title[:30] + "..."
        draw_text_shadow(title_text.upper(), (center_x, current_y), font_title)
        current_y += 85

        font_artist = get_font(65)
        artist_text = self.artists if len(self.artists) <= 45 else self.artists[:45] + "..."
        draw_text_shadow(artist_text.upper(), (center_x, current_y), font_artist, fill=(255, 255, 255, 240))
        current_y += 80

        bar_width = 800
        font_time = get_font(40)
        bar_start_x = center_x - (bar_width // 2)
        bar_end_x = center_x + (bar_width // 2)
        bar_y = current_y

        total_time_str = f"{self.duration // 1000 // 60:02d}:{(self.duration // 1000) % 60:02d}"
        cur_time_str = f"{self.progress // 1000 // 60:02d}:{(self.progress // 1000) % 60:02d}"
        draw_text_shadow(cur_time_str, (bar_start_x - 30, bar_y), font_time, anchor="rm")
        draw_text_shadow(total_time_str, (bar_end_x + 30, bar_y), font_time, anchor="lm")

        old_state = random.getstate()
        random.seed(self.title + str(self.duration))
        num_bars = 65
        bar_spacing = bar_width / num_bars
        bar_w = max(4, int(bar_spacing * 0.5))
        max_h, min_h = 50, 6
        active_bars = int(num_bars * (self.progress / self.duration)) if self.duration > 0 else 0
        for i in range(num_bars):
            base_h = random.randint(min_h, max_h)
            edge_factor = 1.0 - abs((i - num_bars / 2) / (num_bars / 2))
            h = max(min_h, int(base_h * 0.4 + max_h * edge_factor * 0.6))
            x_center = bar_start_x + i * bar_spacing
            color = (255, 255, 255, 255) if i < active_bars else (80, 80, 80, 100)
            draw.rounded_rectangle(
                (x_center - bar_w / 2, bar_y - h / 2, x_center + bar_w / 2, bar_y + h / 2),
                radius=int(bar_w / 2),
                fill=color,
            )
        random.setstate(old_state)
        current_y += 80

        if self.album_title:
            font_album = get_font(50)
            album_text = self.album_title if len(self.album_title) <= 50 else self.album_title[:50] + "..."
            draw_text_shadow(album_text, (center_x, current_y), font_album, fill=(230, 230, 230))
            current_y += 60

        if self.meta_info:
            font_meta = get_font(40)
            draw_text_shadow(self.meta_info, (center_x, current_y), font_meta, fill=(210, 210, 210))

        by = io.BytesIO()
        background.save(by, format="PNG")
        by.seek(0)
        by.name = "banner.png"
        return by


@loader.tds
class SpotifyMod(loader.Module):
    """Card with the currently playing track on Spotify."""

    strings = {
        "name": "SpotifyMod",
        "need_auth": (
            " Please execute"
            " .sauth before performing this action."
        ),
        "on-repeat": (
            "🔄 Set on-repeat."
        ),
        "off-repeat": (
            "🔄 Stopped track"
            " repeat."
        ),
        "skipped": (
            "➡️ Skipped track."
        ),
        "playing": "▶️ Playing...",
        "back": (
            "⬅️ Switched to previous"
            " track"
        ),
        "paused": " Pause",
        "restarted": (
            "✅️ Playing track"
            " from the"
            " beginning"
        ),
        "liked": (
            "❤️ Liked current"
            " playback"
        ),
        "unlike": (
            ""
            " Unliked current playback"
        ),
        "err": (
            " An error occurred."
            "\n{}"
        ),
        "already_authed": (
            " Already authorized"
        ),
        "authed": (
            " Authentication"
            " successful"
        ),
        "deauth": (
            "🚪 Successfully logged out"
            " of account"
        ),
        "auth": (
            '🔗 Follow this'
            " link, allow access, then enter .scode https://... with"
            " the link you received."
        ),
        "no_music": (
            " No music is playing!"
        ),
        "dl_err": (
            " Failed to download"
            " track."
        ),
        "volume_changed": (
            "🔊"
            " Volume changed to {}%."
        ),
        "volume_invalid": (
            " Volume level must be"
            " a number between 0 and 100."
        ),
        "volume_err": (
            " An error occurred while"
            " changing volume."
        ),
        "no_volume_arg": (
            " Please specify a"
            " volume level between 0 and 100."
        ),
        "searching_tracks": (
            "🕔 Searching for tracks"
            " matching {}..."
        ),
        "no_search_query": (
            " Please specify a"
            " search query."
        ),
        "no_tracks_found": (
            " No tracks found for"
            " {}."
        ),
        "search_results": (
            " Search results for"
            " {}:\n\n{}"
        ),
        "search_results_inline": (
            " Found {count} results"
            " for {query}.\nSelect a track:"
        ),
        "downloading_search_track": (
            "🕔 Downloading {}..."
        ),
        "download_success": (
            " Successfully downloaded {} - {}"
        ),
        "invalid_track_number": (
            " Invalid track number."
            " Please search first or provide a valid number from the list."
        ),
        "no_devices_found": (
            " No devices found."
        ),
        "device_changed": (
            " Playback transferred to"
            " {}."
        ),
        "autobio": (
            "🎧 Spotify autobio {}"
        ),
        "snowt_failed": "\n\n Download failed",
        "uploading_banner": "\n\n🕔 Uploading banner...",
        "downloading_track": "\n\n🕔 Downloading track...",
        "no_playlists": " No playlists found.",
        "playlists_list": "📄 Your playlists:\n\n{}",
        "added_to_playlist": " Added {} to {}",
        "removed_from_playlist": " Removed {} from {}",
        "invalid_playlist_index": " Invalid playlist number.",
        "no_cached_playlists": " Use .splaylists first.",
        "playlist_created": " Playlist {} created.",
        "playlist_deleted": " Playlist {} deleted.",
        "no_playlist_name": " Please specify a playlist name.",
        "device_select": "📄 Select playback device:",
        "on-shuffle": (
            "🔀 Shuffle enabled."
        ),
        "off-shuffle": (
            "🔀 Shuffle disabled."
        ),
    }

    strings_ru = {
        "_cls_doc": "Карточка с играющим треком в Spotify.",
        "need_auth": (
            " Выполни"
            " .sauth перед выполнением этого действия."
        ),
        "err": (
            " Произошла ошибка."
            "\n{}"
        ),
        "on-repeat": (
            "🔄 Включен повтор трека."
        ),
        "off-repeat": (
            "🔄 Повтор трека отключён."
        ),
        "skipped": (
            "➡️ Трек пропущен."
        ),
        "playing": "▶️ Играет...",
        "back": (
            "⬅️ Переключено на предыдущий трек"
        ),
        "paused": " Пауза",
        "restarted": (
            "✅️ Воспроизведение трека с начала..."
        ),
        "liked": (
            "❤️ Текущий трек добавлен в избранное"
        ),
        "unlike": (
            " Убрал лайк с текущего трека"
        ),
        "already_authed": (
            " Уже авторизован"
        ),
        "authed": (
            " Успешная аутентификация"
        ),
        "deauth": (
            "🚪 Успешный выход из аккаунта"
        ),
        "auth": (
            '🔗 Пройдите по этой ссылке, разрешите вход, затем введите .scode https://... с ссылкой которую вы получили.'
        ),
        "no_music": (
            " Музыка не играет!"
        ),
        "dl_err": (
            " Не удалось скачать трек."
        ),
        "volume_changed": (
            "🔊"
            " Громкость изменена на {}%."
        ),
        "volume_invalid": (
            " Уровень громкости должен"
            " быть числом от 0 до 100."
        ),
        "volume_err": (
            " Произошла ошибка при"
            " изменении громкости."
        ),
        "no_volume_arg": (
            " Пожалуйста, укажите"
            " уровень громкости от 0 до 100."
        ),
        "searching_tracks": (
            "🕔 Идет поиск треков"
            " по запросу {}..."
        ),
        "no_search_query": (
            " Пожалуйста, укажите"
            " поисковый запрос."
        ),
        "no_tracks_found": (
            " По запросу '{}'"
            " ничего не найдено."
        ),
        "search_results": (
            " Результаты поиска"
            " по запросу {}:\n\n{}"
        ),
        "search_results_inline": (
            " Найдено {count} результатов"
            " по запросу {query}.\nВыберите трек:"
        ),
        "downloading_search_track": (
            "🕔 Скачиваю {}..."
        ),
        "download_success": (
            " Трек {} - {} успешно скачан."
        ),
        "invalid_track_number": (
            " Некорректный номер трека."
            " Сначала выполните поиск или укажите правильный номер из списка."
        ),
        "no_devices_found": (
            " Устройства не найдены."
        ),
        "device_changed": (
            " Воспроизведение переключено на"
            " {}."
        ),
        "autobio": (
            "🎧 Обновление био"
            " включено {}"
        ),
        "snowt_failed": "\n\n Ошибка скачивания.",
        "uploading_banner": "\n\n🕔 Загрузка баннера...",
        "downloading_track": "\n\n🕔 Скачивание трека...",
        "no_playlists": " Плейлисты не найдены.",
        "playlists_list": "📄 Ваши плейлисты:\n\n{}",
        "added_to_playlist": " Трек {} добавлен в {}",
        "removed_from_playlist": " Трек {} удален из {}",
        "invalid_playlist_index": " Неверный номер плейлиста.",
        "no_cached_playlists": " Сначала используйте .splaylists.",
        "playlist_created": " Плейлист {} создан.",
        "playlist_deleted": " Плейлист {} удален.",
        "no_playlist_name": " Пожалуйста, укажите название плейлиста.",
        "device_select": "📄 Выберите устройство для воспроизведения:",
        "on-shuffle": (
            "🔀 Перемешивание включено."
        ),
        "off-shuffle": (
            "🔀 Перемешивание отключено."
        ),
    }

    def __init__(self):
        self._client_id = "e0708753ab60499c89ce263de9b4f57a"
        self._client_secret = "80c927166c664ee98a43a2c0e2981b4a"
        self.sp = None
        self.scope = (
            "user-read-playback-state playlist-read-private playlist-read-collaborative"
            " user-modify-playback-state user-library-modify"
            " playlist-modify-public playlist-modify-private"
        )
        self.sp_auth = spotipy.oauth2.SpotifyOAuth(
            client_id=self._client_id,
            client_secret=self._client_secret,
            redirect_uri="https://thefsch.github.io/spotify/",
            scope=self.scope,
        )
        self.config = loader.ModuleConfig(
            loader.ConfigValue(
                "show_banner",
                True,
                "Show banner with track info",
                validator=loader.validators.Boolean(),
            ),
            loader.ConfigValue(
                "custom_text",
                (
                    "🎧 Now playing: {track} — {artists}\n"
                    "🔗 song.link"
                ),
                "Custom text, supports {track}, {artists}, {album}, {playlist}, {playlist_owner}, {spotify_url}, {songlink}, {progress}, {duration}, {device} placeholders."
                + "\n\n"
                + "ℹ️ Custom placeholders: {}".format(utils.config_placeholders()),
                validator=loader.validators.String(),
            ),
            loader.ConfigValue(
                "font",
                "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf",
                "Custom font. Specify URL to .ttf file",
                validator=loader.validators.String(),
            ),
            loader.ConfigValue(
                "auto_bio_template",
                "🎧 {title} - {artist}",
                lambda: "Template for Spotify AutoBio, supports {artist}, {title}",
            ),
            loader.ConfigValue(
                "TimeOut",
                60,
                "Response timeout in seconds | Время ожидания ответа в секундах",
                validator=loader.validators.Integer(minimum=30),
            ),
            loader.ConfigValue(
                "banner_version",
                "horizontal",
                lambda: "Banner version",
                validator=loader.validators.Choice(["horizontal", "vertical", "ultra"]),
            ),
            loader.ConfigValue(
                "blur_intensity",
                40,
                lambda: "Blur intensity",
                validator=loader.validators.Integer(minimum=0),
            ),
        )
        self._sp_store = {}

    def _init_spotify_client(self) -> bool:
        token = self.get("acs_tkn") or {}
        access_token = token.get("access_token")
        if not access_token:
            self.sp = None
            return False
        try:
            self.sp = spotipy.Spotify(auth=access_token)
            return True
        except Exception:
            self.sp = None
            return False

    async def client_ready(self, client, db):
        self.font_ready = asyncio.Event()
        self._premium = getattr(await client.get_me(), "premium", False)
        if not self._init_spotify_client():
            self.set("acs_tkn", None)
        self.bio_task = None
        if self.get("autobio", False) and self.sp:
            await self.autobio()

    def tokenized(func) -> FunctionType:
        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            if not args[0].get("acs_tkn", False) or not args[0].sp:
                await utils.answer(args[1], args[0].strings("need_auth"))
                return
            return await func(*args, **kwargs)

        wrapped.__doc__ = func.__doc__
        wrapped.__module__ = func.__module__
        return wrapped

    def error_handler(func) -> FunctionType:
        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                error_msg = str(e)
                if "NO_ACTIVE_DEVICE" in error_msg:
                    user_error = "No active device"
                elif "PREMIUM_REQUIRED" in error_msg:
                    user_error = "Spotify Premium is required for this feature"
                elif "Insufficient client scope" in error_msg:
                    user_error = "Insufficient permissions. Please re-authenticate."
                else:
                    user_error = f"{type(e).__name__}: {error_msg[:50]}"
                with contextlib.suppress(Exception):
                    await utils.answer(
                        args[1],
                        args[0].strings("err").format(user_error),
                    )

        wrapped.__doc__ = func.__doc__
        wrapped.__module__ = func.__module__
        return wrapped

    async def autobio(self):
        if getattr(self, "bio_task", None) and not self.bio_task.done():
            self.bio_task.cancel()

        async def _loop():
            while self.get("autobio", False):
                try:
                    if not self.sp and not self._init_spotify_client():
                        self.set("autobio", False)
                        await self._restore_original_bio()
                        break
                    current_playback = await utils.run_sync(self.sp.current_playback)
                    if not current_playback or not current_playback.get("is_playing"):
                        if self.get("last_bio", ""):
                            await self._restore_original_bio(clear_original=False)
                        await asyncio.sleep(10)
                        continue
                    item = current_playback.get("item") or {}
                    title = item.get("name") or ""
                    artists = ", ".join(
                        [a.get("name", "") for a in item.get("artists", []) if a.get("name")]
                    )
                    if not title:
                        await asyncio.sleep(10)
                        continue
                    bio = self.config["auto_bio_template"].format(
                        title=title,
                        artist=artists or "Unknown Artist",
                    ).strip()
                    if len(bio) > 70:
                        bio = bio[:69] + "…"
                    if bio != self.get("last_bio", ""):
                        await self._client(UpdateProfileRequest(about=bio))
                        self.set("last_bio", bio)
                except FloodWaitError as e:
                    await asyncio.sleep(getattr(e, "seconds", 30) + 1)
                except asyncio.CancelledError:
                    break
                except Exception:
                    pass
                await asyncio.sleep(self.config.get("BIO_UPDATE_DELAY", 30))

        self.bio_task = asyncio.create_task(_loop())

    async def _get_current_about(self) -> str:
        full_user = await self._client(GetFullUserRequest("me"))
        return getattr(full_user.full_user, "about", "") or ""

    async def _restore_original_bio(
        self,
        *,
        clear_original: bool = True,
        clear_last: bool = True,
    ):
        original_bio = self.get("original_bio", None)
        if original_bio is None:
            return
        await self._client(UpdateProfileRequest(about=original_bio))
        if clear_original:
            self.set("original_bio", None)
        if clear_last:
            self.set("last_bio", "")

    def _get_chat_id(self, target):
        if isinstance(target, int):
            return target
        if not target:
            return None
        chat_id = getattr(target, "chat_id", None)
        if chat_id:
            return chat_id
        with contextlib.suppress(Exception):
            return utils.get_chat_id(target)
        return None

    def _reply_id(self, message):
        reply_to_id = getattr(message, "reply_to_msg_id", None)
        if reply_to_id:
            return reply_to_id
        reply_to = getattr(message, "reply_to", None)
        return getattr(reply_to, "reply_to_msg_id", None) if reply_to else None

    async def _download_track(
        self,
        target,
        query,
        caption=None,
        track_name=None,
        artists=None,
        log_context=None,
        reply_to_id=None,
    ) -> bool:
        dl_dir = os.path.join(os.getcwd(), "spotifymod")
        os.makedirs(dl_dir, exist_ok=True)
        for f in os.listdir(dl_dir):
            with contextlib.suppress(Exception):
                os.remove(os.path.join(dl_dir, f))

        if caption is None:
            caption = self.strings["download_success"].format(
                utils.escape_html(track_name or "Unknown"),
                utils.escape_html(artists or "Unknown Artist"),
            )

        async def send_text(text: str) -> bool:
            if target is None:
                return False
            if isinstance(target, int):
                await self._client.send_message(target, text, reply_to=reply_to_id)
                return True
            try:
                await utils.answer(target, text)
                return True
            except Exception:
                chat_id = self._get_chat_id(target)
                if chat_id is None:
                    return False
                await self._client.send_message(chat_id, text, reply_to=reply_to_id)
                return True

        async def send_file(file_path: str) -> bool:
            if target is None:
                return False
            if isinstance(target, int):
                await self._client.send_file(target, file_path, caption=caption, reply_to=reply_to_id)
                return True
            try:
                await utils.answer(target, caption, file=file_path)
                return True
            except Exception as e:
                logger.error("SpotifyMod send_file fallback: %s", e, exc_info=True)
                chat_id = self._get_chat_id(target)
                if chat_id is None:
                    return False
                await self._client.send_file(chat_id, file_path, caption=caption, reply_to=reply_to_id)
                return True

        success = False
        try:
            track_url = (query or "").strip().split("?")[0]
            if "spotify:track:" in track_url:
                track_url = f"https://open.spotify.com/track/{track_url.split(':')[-1]}"
            if "track/" not in track_url:
                results = await asyncio.to_thread(
                    self.sp.search,
                    q=query,
                    limit=1,
                    type="track",
                )
                items = (results or {}).get("tracks", {}).get("items", [])
                if not items:
                    logger.error("SpotifyMod: Spotify track not found for %r", log_context or query)
                    await send_text(self.strings["snowt_failed"])
                    return False
                track_data = items[0]
                track_url = track_data.get("external_urls", {}).get("spotify") or f"https://open.spotify.com/track/{track_data['id']}"

            async with httpx.AsyncClient(follow_redirects=True) as client:
                csrf = await self.get_session(client)
                hdrs = {**headers, "X-CSRF-TOKEN": csrf}
                info_res = await client.post(
                    "https://spotmate.online/getTrackData",
                    headers=hdrs,
                    json={"spotify_url": track_url},
                    timeout=self.config["TimeOut"],
                )
                info = info_res.json()
                if info.get("type") != "track":
                    logger.error("SpotifyMod: spotmate returned no track for %r", log_context or query)
                    await send_text(self.strings["snowt_failed"])
                    return False
                track_id = info.get("id", track_url.split("/")[-1])

                conv_res = await client.post(
                    "https://spotmate.online/convert",
                    headers=hdrs,
                    json={"urls": track_url},
                    timeout=self.config["TimeOut"],
                )
                conv = conv_res.json()
                download_url = conv.get("url") or conv.get("download_url")
                task_id = conv.get("task_id") or conv.get("taskId")
                if not download_url and task_id:
                    for _ in range(40):
                        await asyncio.sleep(4.5)
                        task_res = await client.get(
                            f"https://spotmate.online/tasks/{task_id}",
                            headers={**hdrs, "Accept": "application/json"},
                            timeout=self.config["TimeOut"],
                        )
                        task = task_res.json()
                        if task.get("error"):
                            logger.error("SpotifyMod: task error for %r", log_context or query)
                            await send_text(self.strings["dl_err"])
                            return False
                        data = task.get("data") or task.get("result") or {}
                        status = str(data.get("status") or data.get("state") or "").lower()
                        if status == "finished":
                            download_url = (
                                data.get("url") or
                                data.get("download_url")
                                or (data.get("result") or {}).get("url")
                                or (data.get("result") or {}).get("download_url")
                            )
                            break
                        if status in ("failed", "error", "expired", "cancelled"):
                            logger.error("SpotifyMod: task failed for %r", log_context or query)
                            await send_text(self.strings["dl_err"])
                            return False

                if not download_url:
                    logger.error("SpotifyMod: download timeout for %r", log_context or query)
                    await send_text(self.strings["snowt_failed"])
                    return False

                file_res = await client.get(
                    download_url,
                    headers={"User-Agent": headers["User-Agent"], "Referer": "https://spotmate.online/en1"},
                    timeout=self.config["TimeOut"],
                )
                file_path = os.path.join(dl_dir, f"{track_id}.mp3")
                with open(file_path, "wb") as f:
                    f.write(file_res.content)

                success = await send_file(file_path)
                if not success:
                    logger.error("SpotifyMod: failed to send %r (target=%s)", log_context or query, type(target).__name__)
                    await send_text(self.strings["dl_err"])
        except Exception as e:
            logger.error("Download track error (%s): %s", log_context or "no context", e, exc_info=True)
            await send_text(self.strings["dl_err"])
        finally:
            for f in os.listdir(dl_dir):
                with contextlib.suppress(Exception):
                    os.remove(os.path.join(dl_dir, f))
        return success

    async def get_session(self, client: httpx.AsyncClient) -> str:
        res = await client.get(
            "https://spotmate.online/en1",
            headers={
                "User-Agent": headers["User-Agent"],
                "Accept": "text/html",
            },
            timeout=self.config["TimeOut"],
        )
        match = re.search(r'csrf-token[^>]*content="([^"]+)"', res.text)
        if not match:
            raise ValueError("CSRF token not found")
        return match.group(1)

    def _short_text(self, text: str, limit: int = 60) -> str:
        text = " ".join(text.split())
        if len(text) <= limit:
            return text
        if limit <= 3:
            return text[:limit]
        return text[: limit - 3] + "..."

    def _track_info(self, track_info) -> tuple:
        if isinstance(track_info, dict):
            track_name = track_info.get("name", "Unknown")
            artists_list = [
                a.get("name")
                for a in track_info.get("artists", [])
                if a.get("name")
            ]
            artists = ", ".join(artists_list) if artists_list else "Unknown Artist"
            return track_name, artists
        if isinstance(track_info, (list, tuple)):
            track_name = track_info[0] if len(track_info) > 0 else "Unknown"
            artists = track_info[1] if len(track_info) > 1 else "Unknown Artist"
            if not artists:
                artists = "Unknown Artist"
            return track_name or "Unknown", artists
        return "Unknown", "Unknown Artist"

    def _search_keyboard(self, tracks: list, chat_id=None, reply_to_id=None) -> list:
        keyboard = []
        for track in tracks:
            track_name, artists = self._track_info(track)
            label = f"{track_name} — {artists}" if artists else track_name
            keyboard.append(
                [
                    {
                        "text": self._short_text(label),
                        "callback": self._inline_download_track,
                        "args": (track_name, artists, reply_to_id, chat_id),
                    }
                ]
            )
        return keyboard

    async def _inline_download_track(
        self,
        call,
        track_name: str,
        artists: str,
        reply_to_id=None,
        chat_id=None,
    ):
        track_name = track_name or "Unknown"
        artists = artists or "Unknown Artist"
        with contextlib.suppress(Exception):
            await call.answer()
        with contextlib.suppress(Exception):
            await call.edit(self.strings["downloading_track"].lstrip(), reply_markup=None)

        target_message = getattr(call, "message", None)
        if reply_to_id is None:
            reply_to_id = self._reply_id(target_message)
        if chat_id is None:
            chat_id = self._get_chat_id(target_message)
        if chat_id is None:
            chat_id = getattr(call, "chat_id", None)
        if chat_id is None:
            chat_id = self._get_chat_id(call)
        # Nowhere to send the file: report the failure and stop.
        if chat_id is None and target_message is None:
            with contextlib.suppress(Exception):
                await call.edit(self.strings["dl_err"], reply_markup=None)
            return

        target = chat_id if chat_id is not None else target_message
        success = await self._download_track(
            target,
            f"{artists} {track_name}",
            track_name=track_name,
            artists=artists,
            log_context=f"{track_name} - {artists}",
            reply_to_id=reply_to_id,
        )
        if success:
            with contextlib.suppress(Exception):
                await call.delete()
        else:
            with contextlib.suppress(Exception):
                await call.edit(self.strings["dl_err"], reply_markup=None)

    async def _inline_search_tracks(self, query):
        if not self.get("acs_tkn", False) or not self.sp:
            return {
                "title": "Auth required",
                "description": "Run .sauth",
                "message": self.strings["need_auth"],
            }
        query_text = (query.args or "").strip()
        if not query_text:
            return {
                "title": "No query",
                "description": "Provide search query",
                "message": self.strings["no_search_query"],
            }
        try:
            results = await asyncio.to_thread(
                self.sp.search,
                q=query_text,
                limit=5,
                type="track",
            )
        except Exception as e:
            return {
                "title": "Search error",
                "description": "Try again",
                "message": self.strings["err"].format(
                    utils.escape_html(str(e)[:50])
                ),
            }
        if not results or not results["tracks"]["items"]:
            return {
                "title": "No results",
                "description": self._short_text(query_text, limit=60),
                "message": self.strings["no_tracks_found"].format(
                    utils.escape_html(query_text)
                ),
            }

        tracks = results["tracks"]["items"]
        store_id = id(tracks)
        self._sp_store[store_id] = [
            (
                t.get("name", "Unknown"),
                ", ".join(a.get("name", "") for a in t.get("artists", []) if a.get("name")) or "Unknown Artist",
            )
            for t in tracks
        ]
        entries = []
        for i, track in enumerate(tracks):
            track_name, artists = self._track_info(track)
            cover_list = track.get("album", {}).get("images", [])
            thumb = cover_list[0]["url"] if cover_list else None
            entries.append(
                {
                    "title": self._short_text(track_name, limit=60),
                    "description": self._short_text(artists, limit=60) if artists else "",
                    "message": f'{self.strings["downloading_track"].lstrip()}\nspdl_{store_id}_{i}',
                    "thumb": thumb,
                }
            )
        return entries

    @loader.inline_handler(ru_doc="<запрос> - поиск треков Spotify.")
    async def sq(self, query):
        """<query> - search Spotify track"""
        return await self._inline_search_tracks(query)

    @loader.inline_handler(ru_doc="<запрос> - поиск треков Spotify.")
    async def ssearch(self, query):
        """<query> - search Spotify track"""
        return await self._inline_search_tracks(query)

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="| .spla - ➕ Добавить текущий трек в плейлист (используйте номер из .splaylists | .spls)",
        alias="spla"
    )
    async def splaylistadd(self, message: Message):
        """| .spla - ➕ Add current track to playlist (use number from .splaylists | .spls)"""
        args = utils.get_args_raw(message)
        if not args or not args.isdigit():
            await utils.answer(message, self.strings["invalid_playlist_index"])
            return
        index = int(args) - 1
        playlists = self.get("last_playlists", [])
        if not playlists:
            await utils.answer(message, self.strings["no_cached_playlists"])
            return
        if index < 0 or index >= len(playlists):
            await utils.answer(message, self.strings["invalid_playlist_index"])
            return
        current = self.sp.current_playback()
        if not current or not current.get("item"):
            await utils.answer(message, self.strings["no_music"])
            return
        track_uri = current["item"]["uri"]
        track_name = current["item"]["name"]
        artists = ", ".join([a["name"] for a in current["item"]["artists"]])
        full_track_name = f"{artists} - {track_name}"
        playlist_id = playlists[index]["id"]
        playlist_name = playlists[index]["name"]
        self.sp.playlist_add_items(playlist_id, [track_uri])
        await utils.answer(message, self.strings["added_to_playlist"].format(utils.escape_html(full_track_name), utils.escape_html(playlist_name)))

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="| .splr - ➖ Удалить текущий трек из плейлиста (используйте номер из .splaylists | .spls)",
        alias="splr"
    )
    async def splaylistrem(self, message: Message):
        """| .splr - ➖ Remove current track from playlist (use number from .splaylists | .spls)"""
        args = utils.get_args_raw(message)
        if not args or not args.isdigit():
            await utils.answer(message, self.strings["invalid_playlist_index"])
            return
        index = int(args) - 1
        playlists = self.get("last_playlists", [])
        if not playlists:
            await utils.answer(
                message,
self.strings["no_cached_playlists"]) return if index < 0 or index >= len(playlists): await utils.answer(message, self.strings["invalid_playlist_index"]) return current = self.sp.current_playback() if not current or not current.get("item"): await utils.answer(message, self.strings["no_music"]) return track_uri = current["item"]["uri"] track_name = current["item"]["name"] artists = ", ".join([a["name"] for a in current["item"]["artists"]]) full_track_name = f"{artists} - {track_name}" playlist_id = playlists[index]["id"] playlist_name = playlists[index]["name"] self.sp.playlist_remove_all_occurrences_of_items(playlist_id, [track_uri]) await utils.answer(message, self.strings["removed_from_playlist"].format(utils.escape_html(full_track_name), utils.escape_html(playlist_name))) @error_handler @tokenized @loader.command( ru_doc="| .splc - 🆕 Создать новый плейлист", alias="splc" ) async def splaylistcreate(self, message: Message): """| .splc - 🆕 Create a new playlist""" name = utils.get_args_raw(message) if not name: await utils.answer(message, self.strings["no_playlist_name"]) return user_id = self.sp.me()["id"] self.sp.user_playlist_create(user_id, name) await utils.answer(message, self.strings["playlist_created"].format(utils.escape_html(name))) @error_handler @tokenized @loader.command( ru_doc="| .spld - 🗑 Удалить плейлист (используйте номер из .splaylists | .spls)", alias="spld" ) async def splaylistdelete(self, message: Message): """| .spld - 🗑 Delete playlist (use number from .splaylists | .spls)""" args = utils.get_args_raw(message) if not args or not args.isdigit(): await utils.answer(message, self.strings["invalid_playlist_index"]) return index = int(args) - 1 playlists = self.get("last_playlists", []) if not playlists: await utils.answer(message, self.strings["no_cached_playlists"]) return if index < 0 or index >= len(playlists): await utils.answer(message, self.strings["invalid_playlist_index"]) return playlist_id = playlists[index]["id"] playlist_name = 
playlists[index]["name"] self.sp.current_user_unfollow_playlist(playlist_id) await utils.answer(message, self.strings["playlist_deleted"].format(utils.escape_html(playlist_name))) @error_handler @tokenized @loader.command( ru_doc="| .spls - 📃 Получить все плейлисты", alias="spls" ) async def splaylists(self, message: Message): """| .spls - 📃 Get all playlists""" user_id = self.sp.me()["id"] playlists = self.sp.current_user_playlists() editable_playlists = [ p for p in playlists["items"] if p["owner"]["id"] == user_id or p["collaborative"] ] self.set("last_playlists", editable_playlists) playlist_list_text = "" for i, playlist in enumerate(editable_playlists): name = utils.escape_html(playlist["name"]) url = playlist["external_urls"]["spotify"] count = playlist["tracks"]["total"] playlist_list_text += f"{i + 1}. {name} ({count} tracks)\n" if playlist_list_text == "": await utils.answer(message, self.strings["no_playlists"]) else: await utils.answer(message, self.strings["playlists_list"].format(playlist_list_text)) @error_handler @tokenized @loader.command( ru_doc="- ℹ️ Переключить стриминг воспроизведения в био" ) async def sbiocmd(self, message): """- ℹ️ Toggle streaming playback in bio""" if not getattr(self, "sp", None): await utils.answer(message, self.strings["need_auth"]) return state = not self.get("autobio", False) self.set("autobio", state) if state: self.set("original_bio", await self._get_current_about()) self.set("last_bio", "") await self.autobio() else: task = getattr(self, "bio_task", None) if task and not task.done(): task.cancel() self.bio_task = None await self._restore_original_bio() await utils.answer( message, self.strings["autobio"].format("on" if state else "off"), ) @error_handler @tokenized @loader.command( ru_doc="| .sv - 🔊 Изменить громкость. .svolume | .sv <0-100>", alias="sv" ) async def svolume(self, message: Message): """| .sv - 🔊 Change playback volume. 
.svolume | .sv <0-100>""" args = utils.get_args_raw(message) if args == "": await utils.answer(message, self.strings["no_volume_arg"]) else: try: volume_percent = int(args) if 0 <= volume_percent <= 100: self.sp.volume(volume_percent) await utils.answer(message, self.strings["volume_changed"].format(volume_percent)) else: await utils.answer(message, self.strings["volume_invalid"]) except ValueError: await utils.answer(message, self.strings["volume_invalid"]) @error_handler @tokenized @loader.command( ru_doc="| .sd - 🎵 Выбрать устройство для воспроизведения", alias="sd" ) async def sdevicecmd(self, message: Message): """| .sd - 🎵 Select playback device""" devices = self.sp.devices()["devices"] if not devices: await utils.answer(message, self.strings["no_devices_found"]) return async def _switch(call, device_id: str, device_name: str): with contextlib.suppress(Exception): await call.answer() try: self.sp.transfer_playback(device_id=device_id) with contextlib.suppress(Exception): await call.edit( self.strings["device_changed"].format(utils.escape_html(device_name)), reply_markup=None, ) except Exception as e: with contextlib.suppress(Exception): await call.edit( self.strings["err"].format(utils.escape_html(str(e)[:80])), reply_markup=None, ) keyboard = [] for device in devices: active_mark = "> " if device["is_active"] else "" label = f"{active_mark}{device['name']} ({device['type'].lower()})" keyboard.append([{ "text": label, "callback": _switch, "args": (device["id"], device["name"]), }]) await self.inline.form( self.strings["device_select"], message=message, reply_markup=keyboard, ) @error_handler @tokenized @loader.command( ru_doc="- 💫 Включить повтор трека" ) async def srepeatcmd(self, message: Message): """- 💫 Repeat""" self.sp.repeat("track") await utils.answer(message, self.strings["on-repeat"]) @error_handler @tokenized @loader.command( ru_doc="- ✋ Остановить повтор" ) async def sderepeatcmd(self, message: Message): """- ✋ Stop repeat""" 
        self.sp.repeat("context")
        await utils.answer(message, self.strings["off-repeat"])

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="- 🔀 Включить перемешивание"
    )
    async def sshufflecmd(self, message: Message):
        """- 🔀 Enable shuffle"""
        self.sp.shuffle(True)
        await utils.answer(message, self.strings["on-shuffle"])

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="- 🔀 Отключить перемешивание"
    )
    async def sdeshufflecmd(self, message: Message):
        """- 🔀 Disable shuffle"""
        self.sp.shuffle(False)
        await utils.answer(message, self.strings["off-shuffle"])

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="- 👉 Следующий трек"
    )
    async def snextcmd(self, message: Message):
        """- 👉 Next track"""
        self.sp.next_track()
        await utils.answer(message, self.strings["skipped"])

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="- 🤚 Продолжить воспроизведение"
    )
    async def sresumecmd(self, message: Message):
        """- 🤚 Resume"""
        self.sp.start_playback()
        await utils.answer(message, self.strings["playing"])

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="- 🤚 Пауза"
    )
    async def spausecmd(self, message: Message):
        """- 🤚 Pause"""
        self.sp.pause_playback()
        await utils.answer(message, self.strings["paused"])

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="- ⏮ Предыдущий трек"
    )
    async def sbackcmd(self, message: Message):
        """- ⏮ Previous track"""
        self.sp.previous_track()
        await utils.answer(message, self.strings["back"])

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="- ⏪ Перезапустить трек"
    )
    async def sbegincmd(self, message: Message):
        """- ⏪ Restart track"""
        self.sp.seek_track(0)
        await utils.answer(message, self.strings["restarted"])

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="- ❤️ Лайкнуть играющий трек"
    )
    async def slikecmd(self, message: Message):
        """- ❤️ Like current track"""
        cupl = self.sp.current_playback()
        self.sp.current_user_saved_tracks_add([cupl["item"]["id"]])
        await utils.answer(message, self.strings["liked"])

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="- 💔 Убрать лайк с играющего трека"
    )
    async def sunlikecmd(self, message: Message):
        """- 💔 Unlike current track"""
        cupl = self.sp.current_playback()
        self.sp.current_user_saved_tracks_delete([cupl["item"]["id"]])
        await utils.answer(message, self.strings["unlike"])

    @error_handler
    @loader.command(
        ru_doc="- Получить ссылку для авторизации"
    )
    async def sauthcmd(self, message: Message):
        """- Get authorization link"""
        if self.get("acs_tkn", False) and not self.sp:
            await utils.answer(message, self.strings["already_authed"])
        else:
            await utils.answer(
                message,
                self.strings["auth"].format(self.sp_auth.get_authorize_url()),
            )

    @error_handler
    @loader.command(
        ru_doc="- Вставить код авторизации"
    )
    async def scodecmd(self, message: Message):
        """- Paste authorization code"""
        url = message.message.split(" ")[1]
        code = self.sp_auth.parse_auth_response_url(url)
        self.set("acs_tkn", self.sp_auth.get_access_token(code, True, False))
        self._init_spotify_client()
        await utils.answer(message, self.strings["authed"])

    @error_handler
    @loader.command(
        ru_doc="- Выйти из аккаунта"
    )
    async def unauthcmd(self, message: Message):
        """- Log out of account"""
        self.set("acs_tkn", None)
        self.sp = None
        await utils.answer(message, self.strings["deauth"])

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="| .stokr - Обновить токен авторизации",
        alias="stokr"
    )
    async def stokrefreshcmd(self, message: Message):
        """| .stokr - Refresh authorization token"""
        self.set(
            "acs_tkn",
            self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]),
        )
        self.set("NextRefresh", time.time() + 45 * 60)
        self._init_spotify_client()
        await utils.answer(message, self.strings["authed"])

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="| .sn - 🎧 Показать карточку играющего трека",
        alias="sn"
    )
    async def snowcmd(self, message: Message):
        """| .sn - 🎧 View current track card."""
        current_playback = self.sp.current_playback()
        if not current_playback or not current_playback.get("is_playing", False):
            await utils.answer(message, self.strings["no_music"])
            return

        track = current_playback["item"]["name"]
        track_id = current_playback["item"]["id"]
        artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]])
        album_name = current_playback["item"]["album"].get("name", "Unknown Album")
        duration_ms = current_playback["item"].get("duration_ms", 0)
        progress_ms = current_playback.get("progress_ms", 0)

        # Format milliseconds as m:ss.
        duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}"
        progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}"

        spotify_url = f"https://open.spotify.com/track/{track_id}"
        songlink = f"https://song.link/s/{track_id}"

        try:
            device_raw = (
                current_playback["device"]["name"]
                + " "
                + current_playback["device"]["type"].lower()
            )
            device = device_raw.replace("computer", "").replace("smartphone", "").strip()
        except Exception:
            device = None

        try:
            playlist_id = current_playback["context"]["uri"].split(":")[-1]
            playlist = self.sp.playlist(playlist_id)
            playlist_name = playlist.get("name", None)
            try:
                playlist_owner = (
                    f''
                    f'{playlist["owner"]["display_name"]}'
                )
            except KeyError:
                playlist_owner = playlist.get("owner", {}).get("display_name", "")
        except Exception:
            playlist_name = ""
            playlist_owner = ""

        sdata = {
            "track": utils.escape_html(track),
            "artists": utils.escape_html(artists),
            "album": utils.escape_html(album_name),
            "duration": duration,
            "progress": progress,
            "device": device,
            "spotify_url": spotify_url,
            "songlink": songlink,
            "playlist": utils.escape_html(playlist_name) if playlist_name else "",
            "playlist_owner": playlist_owner or "",
        }
        data = await utils.get_placeholders(sdata, self.config["custom_text"])
        text = self.config["custom_text"].format(**data)

        if self.config["show_banner"]:
            cover_url = current_playback["item"]["album"]["images"][0]["url"]
            tmp_msg = await utils.answer(message, text + self.strings["uploading_banner"])
            banners = Banners(
                title=track,
                artists=artists,
                duration=duration_ms,
                progress=progress_ms,
                track_cover=requests.get(cover_url).content,
                font=self.config["font"],
                blur=self.config["blur_intensity"],
                album_title=album_name,
                meta_info="Spotify",
            )
            version = self.config["banner_version"]
            if version == "ultra":
                file = banners.ultra()
            elif version == "vertical":
                file = banners.vertical()
            else:
                file = banners.horizontal()
            await utils.answer(tmp_msg, text, file=file)
        else:
            await utils.answer(message, text)

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="| .snt - 🎧 Скачать играющий трек",
        alias="snt"
    )
    async def snowtcmd(self, message: Message):
        """| .snt - 🎧 Download current track."""
        current_playback = self.sp.current_playback()
        if not current_playback or not current_playback.get("is_playing", False):
            await utils.answer(message, self.strings["no_music"])
            return

        track = current_playback["item"]["name"]
        artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]])
        album_name = current_playback["item"]["album"].get("name", "Unknown Album")
        duration_ms = current_playback["item"].get("duration_ms", 0)
        progress_ms = current_playback.get("progress_ms", 0)

        # Format milliseconds as m:ss.
        duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}"
        progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}"

        spotify_url = f"https://open.spotify.com/track/{current_playback['item']['id']}"
        songlink = f"https://song.link/s/{current_playback['item']['id']}"

        try:
            device_raw = (
                current_playback["device"]["name"]
                + " "
                + current_playback["device"]["type"].lower()
            )
            device = device_raw.replace("computer", "").replace("smartphone", "").strip()
        except Exception:
            device = None

        try:
            playlist_id = current_playback["context"]["uri"].split(":")[-1]
            playlist = self.sp.playlist(playlist_id)
            playlist_name = playlist.get("name", None)
            try:
                playlist_owner = (
                    f''
                    f'{playlist["owner"]["display_name"]}'
                )
            except KeyError:
                playlist_owner = playlist.get("owner", {}).get("display_name", "")
        except Exception:
            playlist_name = ""
            playlist_owner = ""

        sdata = {
            "track": utils.escape_html(track),
            "artists": utils.escape_html(artists),
            "album": utils.escape_html(album_name),
            "duration": duration,
            "progress": progress,
            "device": device,
            "spotify_url": spotify_url,
            "songlink": songlink,
            "playlist": utils.escape_html(playlist_name) if playlist_name else "",
            "playlist_owner": playlist_owner or "",
        }
        data = await utils.get_placeholders(sdata, self.config["custom_text"])
        text = self.config["custom_text"].format(**data)

        msg = await utils.answer(message, text + self.strings["downloading_track"])
        await self._download_track(
            msg,
            f"{artists} {track}",
            caption=text,
            track_name=track,
            artists=artists,
            log_context=f"{track} - {artists}",
        )

    @error_handler
    @tokenized
    @loader.command(
        ru_doc="- 🔍 Поиск треков."
    )
    async def sqcmd(self, message: Message):
        """- 🔍 Search for tracks."""
        args = utils.get_args_raw(message)
        if not args:
            await utils.answer(message, self.strings["no_search_query"])
            return

        # A bare number selects a track from the previous search results.
        search_results = self.get("last_search_results", [])
        is_selection = False
        if args.isdigit():
            track_number = int(args)
            if search_results and 0 < track_number <= len(search_results):
                is_selection = True

        if is_selection:
            track_number = int(args)
            msg = await utils.answer(message, self.strings["downloading_track"])
            track_info = search_results[track_number - 1]
            track_name, artists = self._track_info(track_info)
            reply_to_id = self._reply_id(message)
            chat_id = self._get_chat_id(message)
            target = chat_id if chat_id is not None else msg
            success = await self._download_track(
                target,
                f"{artists} {track_name}",
                track_name=track_name,
                artists=artists,
                log_context=f"{track_name} - {artists}",
                reply_to_id=reply_to_id,
            )
            if success:
                with contextlib.suppress(Exception):
                    await msg.delete()
            self.set("last_search_results", [])
        else:
            results = await asyncio.to_thread(
                self.sp.search,
                q=args,
                limit=5,
                type="track",
            )
            if not results or not results["tracks"]["items"]:
                await utils.answer(
                    message,
                    self.strings["no_tracks_found"].format(utils.escape_html(args)),
                )
                return

            tracks = results["tracks"]["items"]
            self.set("last_search_results", tracks)
            reply_to_id = self._reply_id(message)
            await self.inline.form(
                self.strings["search_results_inline"].format(
                    count=len(tracks),
                    query=utils.escape_html(args),
                ),
                message=message,
                reply_markup=self._search_keyboard(
                    tracks,
                    self._get_chat_id(message),
                    reply_to_id,
                ),
            )

    @error_handler
    @tokenized
    @loader.command(ru_doc="- 🔍 Поиск треков.")
    async def ssearchcmd(self, message: Message):
        """- 🔍 Search for tracks."""
        await self.sqcmd(message)

    async def watcher(self, message: Message):
        """Handle inline download tags and keep the access token refreshed."""
        if not self.sp:
            return

        raw = getattr(message, "raw_text", "") or ""
        if "spdl_" in raw:
            try:
                # The tag has the form "spdl_<store_id>_<index>"; take the first
                # whitespace-delimited token after the prefix. Calling split("")
                # would raise ValueError (empty separator) on every message.
                tag = raw.split("spdl_")[1].split()[0]
                sid, idx = tag.split("_")
                store_id, index = int(sid), int(idx)
            except (IndexError, ValueError):
                return

            data = self._sp_store.pop(store_id, [])
            if not data or index >= len(data):
                return

            track_name, artists = data[index]
            chat_id = self._get_chat_id(message)
            if not chat_id:
                return

            reply_to_id = self._reply_id(message)
            success = await self._download_track(
                chat_id,
                f"{artists} {track_name}",
                track_name=track_name,
                artists=artists,
                log_context=f"{track_name} - {artists}",
                reply_to_id=reply_to_id,
            )
            if success:
                with contextlib.suppress(Exception):
                    await message.delete()
            return

        next_refresh = self.get("NextRefresh")
        if not next_refresh or next_refresh < time.time():
            acs_tkn = self.get("acs_tkn")
            if not acs_tkn or not acs_tkn.get("refresh_token"):
                # No refresh token available; check again in five minutes.
                self.set("NextRefresh", time.time() + 300)
                return

            try:
                new_token = self.sp_auth.refresh_access_token(acs_tkn["refresh_token"])
                self.set("acs_tkn", new_token)
                self.set("NextRefresh", time.time() + 45 * 60)
                if new_token and new_token.get("access_token"):
                    self.sp = spotipy.Spotify(auth=new_token["access_token"])
                logger.debug("Token refreshed successfully")
            except Exception as e:
                logger.error("Token refresh error: %s", e, exc_info=True)
                if "Refresh token revoked" in str(e):
                    logger.warning("Refresh token revoked, re-authenticating")
                    refresh_token = await self.invoke("stokrefresh", "", self.inline.bot.id)
                    await refresh_token.delete()
                else:
                    self.set("NextRefresh", time.time() + 300)
# slenderman