From 1ced5efae6851e6474eddad1e7573c1ec1aca633 Mon Sep 17 00:00:00 2001 From: Macsim <134152147+MuRuLOSE@users.noreply.github.com> Date: Thu, 5 Feb 2026 20:01:13 +0300 Subject: [PATCH] Update SpotifyMod.py --- radiocycle/Modules/SpotifyMod.py | 1437 +----------------------------- 1 file changed, 1 insertion(+), 1436 deletions(-) diff --git a/radiocycle/Modules/SpotifyMod.py b/radiocycle/Modules/SpotifyMod.py index 218588b..7834a05 100644 --- a/radiocycle/Modules/SpotifyMod.py +++ b/radiocycle/Modules/SpotifyMod.py @@ -1,1436 +1 @@ -# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ -# █▀█ █ █ █ █▀█ █▀▄ █ -# © Copyright 2022 -# -# https://t.me/hikariatama -# -# 🔒 Licensed under the GNU AGPLv3 -# 🌐 https://www.gnu.org/licenses/agpl-3.0.html -# -# You CANNOT edit, distribute or redistribute this file without direct permission from the author. -# -# ORIGINAL MODULE: https://raw.githubusercontent.com/hikariatama/ftg/master/spotify.py - -# ======================================= -# _ __ __ __ _ -# | |/ /___ | \/ | ___ __| |___ -# | ' // _ \ | |\/| |/ _ \ / _` / __| -# | . \ __/ | | | | (_) | (_| \__ \ -# |_|\_\___| |_| |_|\___/ \__,_|___/ -# @ke_mods -# ======================================= -# -# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International) -# -------------------------------------- -# https://creativecommons.org/licenses/by-nd/4.0/legalcode -# ======================================= - -# meta developer: @ke_mods -# requires: telethon spotipy pillow requests yt-dlp - -import asyncio -import contextlib -import functools -import io -import logging -import re -import textwrap -import time -import traceback -import os -from types import FunctionType - -import requests -import spotipy -from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont, ImageOps -from telethon.errors import FloodWaitError -from telethon.tl.functions.account import UpdateProfileRequest -from telethon.tl.types import Message - -from .. import loader, utils - -logger = logging.getLogger(__name__) -logging.getLogger("spotipy").setLevel(logging.CRITICAL) - -class Banners: - def __init__( - self, - title: str, - artists: list, - duration: int, - progress: int, - track_cover: bytes, - font - ): - self.title = title - self.artists = ", ".join(artists) if isinstance(artists, list) else artists - self.duration = duration - self.progress = progress - self.track_cover = track_cover - self.font_url = font - - def _get_font(self, size, font_bytes): - return ImageFont.truetype(io.BytesIO(font_bytes), size) - - def _prepare_cover(self, size, radius): - cover = Image.open(io.BytesIO(self.track_cover)).convert("RGBA") - cover = cover.resize((size, size), Image.Resampling.LANCZOS) - - mask = Image.new("L", (size, size), 0) - draw = ImageDraw.Draw(mask) - draw.rounded_rectangle((0, 0, size, size), radius=radius, fill=255) - - output = Image.new("RGBA", (size, size), (0, 0, 0, 0)) - output.paste(cover, (0, 0), mask=mask) - return output - - def _prepare_background(self, w, h): - bg = Image.open(io.BytesIO(self.track_cover)).convert("RGBA") - bg = bg.resize((w, h), Image.Resampling.BICUBIC) - bg = bg.filter(ImageFilter.GaussianBlur(radius=40)) - bg = ImageEnhance.Brightness(bg).enhance(0.4) - return bg - - def _draw_progress_bar(self, draw, x, y, w, h, progress_pct, color="white", bg_color="#5e5e5e"): - draw.rounded_rectangle((x, y, x + w, y + h), radius=h/2, fill=bg_color) - - fill_w = int(w * progress_pct) - if fill_w > 0: - draw.rounded_rectangle((x, y, x + fill_w, y + h), radius=h/2, fill=color) - - dot_radius = h * 1.2 - dot_x = x + fill_w - dot_y = y + (h / 2) - - draw.ellipse( - (dot_x - dot_radius, dot_y - dot_radius, dot_x + dot_radius, dot_y + dot_radius), - fill=color - ) - - def horizontal(self): - W, H = 1500, 600 - padding = 60 - cover_size = 480 - - font_bytes = requests.get(self.font_url).content - title_font = self._get_font(55, font_bytes) - artist_font = self._get_font(45, font_bytes) - time_font = self._get_font(25, font_bytes) - - img = self._prepare_background(W, H) - draw = ImageDraw.Draw(img) - - cover = self._prepare_cover(cover_size, 30) - img.paste(cover, (padding, (H - cover_size) // 2), cover) - - text_x = padding + cover_size + 60 - text_y_start = 100 - text_width_limit = W - text_x - padding - - display_title = self.title - while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0: - display_title = display_title[:-1] - if len(display_title) < len(self.title): display_title += "…" - - display_artist = self.artists - while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0: - display_artist = display_artist[:-1] - if len(display_artist) < len(self.artists): display_artist += "…" - - draw.text((text_x, text_y_start), display_title, font=title_font, fill="white") - draw.text((text_x, text_y_start + 70), display_artist, font=artist_font, fill="#B3B3B3") - - cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}" - dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}" - - cur_w = time_font.getlength(cur_time) - dur_w = time_font.getlength(dur_time) - - bar_y = 480 - bar_h = 8 - gap = 25 - - draw.text((text_x, bar_y - 12), cur_time, font=time_font, fill="white") - - bar_start_x = text_x + cur_w + gap - bar_end_x = text_x + text_width_limit - dur_w - gap - bar_w = bar_end_x - bar_start_x - - prog_pct = self.progress / self.duration if self.duration > 0 else 0 - self._draw_progress_bar(draw, bar_start_x, bar_y, bar_w, bar_h, prog_pct) - - draw.text((bar_end_x + gap, bar_y - 12), dur_time, font=time_font, fill="white") - - by = io.BytesIO() - img.save(by, format="PNG") - by.seek(0) - by.name = "banner.png" - return by - - def vertical(self): - W, H = 1000, 1500 - padding = 80 - cover_size = 800 - - font_bytes = requests.get(self.font_url).content - title_font = self._get_font(60, font_bytes) - artist_font = self._get_font(45, font_bytes) - time_font = self._get_font(35, font_bytes) - - img = self._prepare_background(W, H) - draw = ImageDraw.Draw(img) - - cover = self._prepare_cover(cover_size, 40) - cover_x = (W - cover_size) // 2 - cover_y = 120 - img.paste(cover, (cover_x, cover_y), cover) - - text_area_y = cover_y + cover_size + 120 - text_width_limit = W - (padding * 2) - - display_title = self.title - while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0: - display_title = display_title[:-1] - if len(display_title) < len(self.title): display_title += "…" - - display_artist = self.artists - while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0: - display_artist = display_artist[:-1] - if len(display_artist) < len(self.artists): display_artist += "…" - - title_w = title_font.getlength(display_title) - draw.text(((W - title_w) / 2, text_area_y), display_title, font=title_font, fill="white") - - artist_w = artist_font.getlength(display_artist) - draw.text(((W - artist_w) / 2, text_area_y + 75), display_artist, font=artist_font, fill="#B3B3B3") - - bar_y = text_area_y + 260 - bar_h = 8 - bar_w = W - (padding * 2) - prog_pct = self.progress / self.duration if self.duration > 0 else 0 - - self._draw_progress_bar(draw, padding, bar_y, bar_w, bar_h, prog_pct, color="white", bg_color="#5e5e5e") - - cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}" - dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}" - - draw.text((padding, bar_y + 40), cur_time, font=time_font, fill="#B3B3B3") - - dur_w = time_font.getlength(dur_time) - draw.text((W - padding - dur_w, bar_y + 40), dur_time, font=time_font, fill="#B3B3B3") - - by = io.BytesIO() - img.save(by, format="PNG") - by.seek(0) - by.name = "banner.png" - return by - -@loader.tds -class SpotifyMod(loader.Module): - """Card with the currently playing track on Spotify.""" - - strings = { - "name": "SpotifyMod", - "need_auth": ( - " Please execute" - " .sauth before performing this action." - ), - "on-repeat": ( - "🔄 Set on-repeat." - ), - "off-repeat": ( - "🔄 Stopped track" - " repeat." - ), - "skipped": ( - "➡️ Skipped track." - ), - "playing": "▶️ Playing...", - "back": ( - "⬅️ Switched to previous" - " track" - ), - "paused": " Pause", - "restarted": ( - "✅️ Playing track" - " from the" - " beginning" - ), - "liked": ( - "❤️ Liked current" - " playback" - ), - "unlike": ( - "" - " Unliked current playback" - ), - "err": ( - " An error occurred." - "\n{}" - ), - "already_authed": ( - " Already authorized" - ), - "authed": ( - " Authentication" - " successful" - ), - "deauth": ( - "🚪 Successfully logged out" - " of account" - ), - "auth": ( - '🔗 Follow this' - " link, allow access, then enter .scode https://... with" - " the link you received." - ), - "no_music": ( - " No music is playing!" - ), - "dl_err": ( - " Failed to download" - " track." - ), - "volume_changed": ( - "🔊" - " Volume changed to {}%." - ), - "volume_invalid": ( - " Volume level must be" - " a number between 0 and 100." - ), - "volume_err": ( - " An error occurred while" - " changing volume." - ), - "no_volume_arg": ( - " Please specify a" - " volume level between 0 and 100." - ), - "searching_tracks": ( - "🕔 Searching for tracks" - " matching {}..." - ), - "no_search_query": ( - " Please specify a" - " search query." - ), - "no_tracks_found": ( - " No tracks found for" - " {}." - ), - "search_results": ( - " Search results for" - " {}:\n\n{}" - ), - "downloading_search_track": ( - "🕔 Downloading {}..." - ), - "download_success": ( - " Successfully downloaded {} - {}" - ), - "invalid_track_number": ( - " Invalid track number." - " Please search first or provide a valid number from the list." - ), - "device_list": ( - "📄 Available devices:\n{}" - ), - "no_devices_found": ( - " No devices found." - ), - "device_changed": ( - " Playback transferred to" - " {}." - ), - "invalid_device_id": ( - " Invalid device ID." - " Use .sdevice to see available devices." - ), - "search_results_cleared": " Search results cleared", - "autobio": ( - "🎧 Spotify autobio {}" - ), - "no_ytdlp": " yt-dlp not found... Check config or install yt-dlp ({}terminal pip install yt-dlp)", - "snowt_failed": "\n\n Download failed", - "uploading_banner": "\n\n🕔 Uploading banner...", - "downloading_track": "\n\n🕔 Downloading track...", - "no_playlists": " No playlists found.", - "playlists_list": "📄 Your playlists:\n\n{}", - "added_to_playlist": " Added {} to {}", - "removed_from_playlist": " Removed {} from {}", - "invalid_playlist_index": " Invalid playlist number.", - "no_cached_playlists": " Use .splaylists first.", - "playlist_created": " Playlist {} created.", - "playlist_deleted": " Playlist {} deleted.", - "no_playlist_name": " Please specify a playlist name.", - } - - strings_ru = { - "_cls_doc": "Карточка с играющим треком в Spotify.", - "need_auth": ( - " Выполни" - " .sauth перед выполнением этого действия." - ), - "err": ( - " Произошла ошибка." - "\n{}" - ), - "on-repeat": ( - "🔄 Включен повтор трека." - ), - "off-repeat": ( - "🔄 Повтор трека отключён." - ), - "skipped": ( - "➡️ Трек пропущен." - ), - "playing": "▶️ Играет...", - "back": ( - "⬅️ Переключено на предыдущий трек" - ), - "paused": " Пауза", - "restarted": ( - "✅️ Воспроизведение трека с начала..." - ), - "liked": ( - "❤️ Текущий трек добавлен в избранное" - ), - "unlike": ( - " Убрал лайк с текущего трека" - ), - "already_authed": ( - " Уже авторизован" - ), - "authed": ( - " Успешная аутентификация" - ), - "deauth": ( - "🚪 Успешный выход из аккаунта" - ), - "auth": ( - '🔗 Пройдите по этой ссылке, разрешите вход, затем введите .scode https://... с ссылкой которую вы получили.' - ), - "no_music": ( - " Музыка не играет!" - ), - "dl_err": ( - " Не удалось скачать трек." - ), - "volume_changed": ( - "🔊" - " Громкость изменена на {}%." - ), - "volume_invalid": ( - " Уровень громкости должен" - " быть числом от 0 до 100." - ), - "volume_err": ( - " Произошла ошибка при" - " изменении громкости." - ), - "no_volume_arg": ( - " Пожалуйста, укажите" - " уровень громкости от 0 до 100." - ), - "searching_tracks": ( - "🕔 Идет поиск треков" - " по запросу {}..." - ), - "no_search_query": ( - " Пожалуйста, укажите" - " поисковый запрос." - ), - "no_tracks_found": ( - " По запросу '{}'" - " ничего не найдено." - ), - "search_results": ( - " Результаты поиска" - " по запросу {}:\n\n{}" - ), - "downloading_search_track": ( - "🕔 Скачиваю {}..." - ), - "download_success": ( - " Трек {} - {} успешно скачан." - ), - "invalid_track_number": ( - " Некорректный номер трека." - " Сначала выполните поиск или укажите правильный номер из списка." - ), - "device_list": ( - "📄 Доступные устройства:\n{}" - ), - "no_devices_found": ( - " Устройства не найдены." - ), - "device_changed": ( - " Воспроизведение переключено на" - " {}." - ), - "invalid_device_id": ( - " Некорректный ID устройства." - " Используйте .sdevice , чтобы увидеть доступные устройства." - ), - "search_results_cleared": " Результаты поиска очищены", - "autobio": ( - "🎧 Обновление био" - " включено {}" - ), - "no_ytdlp": " yt-dlp не найден... Проверьте конфиг или установите yt-dlp ({}terminal pip install yt-dlp)", - "snowt_failed": "\n\n Ошибка скачивания.", - "uploading_banner": "\n\n🕔 Загрузка баннера...", - "downloading_track": "\n\n🕔 Скачивание трека...", - "no_playlists": " Плейлисты не найдены.", - "playlists_list": "📄 Ваши плейлисты:\n\n{}", - "added_to_playlist": " Трек {} добавлен в {}", - "removed_from_playlist": " Трек {} удален из {}", - "invalid_playlist_index": " Неверный номер плейлиста.", - "no_cached_playlists": " Сначала используйте .splaylists.", - "playlist_created": " Плейлист {} создан.", - "playlist_deleted": " Плейлист {} удален.", - "no_playlist_name": " Пожалуйста, укажите название плейлиста.", - } - strings_jp = { - "_cls_doc": "Spotify からのメッセージ", - "need_auth": ( - " この操作を行う前に " - ".sauth を実行してください。" - ), - "on-repeat": ( - "🔄 リピート再生を設定しました。" - ), - "off-repeat": ( - "🔄 リピート再生を解除しました。" - ), - "skipped": ( - "➡️ スキップしました。" - ), - "playing": "▶️ 再生中...", - "back": ( - "⬅️ 前のトラックに戻りました。" - ), - "paused": " 一時停止", - "restarted": ( - "✅️ 最初から再生します。" - ), - "liked": ( - "❤️ お気に入りに追加しました。" - ), - "unlike": ( - "" - " お気に入りから削除しました。" - ), - "err": ( - " エラーが発生しました。" - "\n{}" - ), - "already_authed": ( - " 既に認証されています。" - ), - "authed": ( - " 認証に成功しました。" - ), - "deauth": ( - "🚪 ログアウトしました。" - ), - "auth": ( - '🔗 リンクをクリックしてアクセスを許可し、取得したURLを使って .scode https://... を入力してください。' - ), - "no_music": ( - " 音楽は再生されていません!" - ), - "dl_err": ( - " トラックのダウンロードに失敗しました。" - ), - "volume_changed": ( - "🔊" - " 音量を {}% に変更しました。" - ), - "volume_invalid": ( - " 音量は0から100の数字で指定してください。" - ), - "volume_err": ( - " 音量の変更中にエラーが発生しました。" - ), - "no_volume_arg": ( - " 0から100の間で音量を指定してください。" - ), - "searching_tracks": ( - "🕔 {} を検索中..." - ), - "no_search_query": ( - " 検索キーワードを指定してください。" - ), - "no_tracks_found": ( - " {} は見つかりませんでした。" - ), - "search_results": ( - " {} の検索結果:\n\n{}" - ), - "downloading_search_track": ( - "🕔 {} をダウンロード中..." - ), - "download_success": ( - " {} - {} のダウンロードに成功しました。" - ), - "invalid_track_number": ( - " トラック番号が無効です。" - " 先に検索するか、リストから有効な番号を指定してください。" - ), - "device_list": ( - "📄 利用可能なデバイス:\n{}" - ), - "no_devices_found": ( - " デバイスが見つかりません。" - ), - "device_changed": ( - " 再生デバイスを" - " {} に切り替えました。" - ), - "invalid_device_id": ( - " デバイスIDが無効です。" - " .sdevice で利用可能なデバイスを確認してください。" - ), - "search_results_cleared": " 検索結果をクリアしました。", - "autobio": ( - "🎧 Spotify AutoBio: {}" - ), - "no_ytdlp": " yt-dlpが見つかりません... 設定を確認するか、インストールしてください ({}terminal pip install yt-dlp)", - "snowt_failed": "\n\n ダウンロードに失敗しました。", - "uploading_banner": "\n\n🕔 バナーをアップロード中...", - "downloading_track": "\n\n🕔 トラックをダウンロード中...", - "no_playlists": " プレイリストが見つかりません。", - "playlists_list": "📄 あなたのプレイリスト:\n\n{}", - "added_to_playlist": " {} を {} に追加しました。", - "removed_from_playlist": " {} を {} から削除しました。", - "invalid_playlist_index": " プレイリスト番号が無効です。", - "no_cached_playlists": " 先に .splaylists を使用してください。", - "playlist_created": " プレイリスト {} を作成しました。", - "playlist_deleted": " プレイリスト {} を削除しました。", - "no_playlist_name": " プレイリスト名を指定してください。", - } - - def __init__(self): - self._client_id = "e0708753ab60499c89ce263de9b4f57a" - self._client_secret = "80c927166c664ee98a43a2c0e2981b4a" - self.scope = ( - "user-read-playback-state playlist-read-private playlist-read-collaborative" - " user-modify-playback-state user-library-modify" - " playlist-modify-public playlist-modify-private" - ) - self.sp_auth = spotipy.oauth2.SpotifyOAuth( - client_id=self._client_id, - client_secret=self._client_secret, - redirect_uri="https://thefsch.github.io/spotify/", - scope=self.scope, - ) - self.config = loader.ModuleConfig( - loader.ConfigValue( - "show_banner", - True, - "Show banner with track info", - validator=loader.validators.Boolean(), - ), - loader.ConfigValue( - "custom_text", - ( - "🎧 Now playing: {track} — {artists}\n" - "🔗 song.link" - ), - """Custom text, supports {track}, {artists}, {album}, {playlist}, {playlist_owner}, {spotify_url}, {songlink}, {progress}, {duration}, {device} placeholders""", - validator=loader.validators.String(), - ), - loader.ConfigValue( - "font", - "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf", - "Custom font. Specify URL to .ttf file", - validator=loader.validators.String(), - ), - loader.ConfigValue( - "auto_bio_template", - "🎧 {}", - lambda: "Template for Spotify AutoBio", - ), - loader.ConfigValue( - "ytdlp_path", - "", - "Path to ytdlp binary", - validator=loader.validators.String(), - ), - loader.ConfigValue( - "banner_version", - "horizontal", - lambda: "Banner version", - validator=loader.validators.Choice(["horizontal", "vertical"]), - ), - ) - - async def client_ready(self, client, db): - self.font_ready = asyncio.Event() - - self._premium = getattr(await client.get_me(), "premium", False) - try: - self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) - except Exception: - self.set("acs_tkn", None) - self.sp = None - - if self.get("autobio", False): - self.autobio.start() - - def tokenized(func) -> FunctionType: - @functools.wraps(func) - async def wrapped(*args, **kwargs): - if not args[0].get("acs_tkn", False) or not args[0].sp: - await utils.answer(args[1], args[0].strings("need_auth")) - return - - return await func(*args, **kwargs) - - wrapped.__doc__ = func.__doc__ - wrapped.__module__ = func.__module__ - - return wrapped - - def error_handler(func) -> FunctionType: - @functools.wraps(func) - async def wrapped(*args, **kwargs): - try: - return await func(*args, **kwargs) - except Exception: - logger.exception(traceback.format_exc()) - with contextlib.suppress(Exception): - await utils.answer( - args[1], - args[0].strings("err").format(traceback.format_exc()), - ) - - wrapped.__doc__ = func.__doc__ - wrapped.__module__ = func.__module__ - - return wrapped - - - @loader.loop(interval=90) - async def autobio(self): - try: - current_playback = self.sp.current_playback() - track = current_playback["item"]["name"] - track = re.sub(r"([(].*?[)])", "", track).strip() - except Exception: - return - - bio = self.config["auto_bio_template"].format(f"{track}") - - try: - await self._client( - UpdateProfileRequest(about=bio[: 140 if self._premium else 70]) - ) - except FloodWaitError as e: - logger.info(f"Sleeping {max(e.seconds, 60)} bc of floodwait") - await asyncio.sleep(max(e.seconds, 60)) - return - - async def _download_track(self, message, query: str, caption: str = ""): - dl_dir = os.path.join(os.getcwd(), "spotifymod") - if not os.path.exists(dl_dir): - os.makedirs(dl_dir, exist_ok=True) - - for f in os.listdir(dl_dir): - try: - os.remove(os.path.join(dl_dir, f)) - except: - pass - - try: - squery = query.replace('"', '').replace("'", "") - - cmd = ( - f'{self.config["ytdlp_path"]} -x --audio-format mp3 --add-metadata ' - f'-o "{dl_dir}/%(title)s [%(id)s].%(ext)s" ' - f'"ytsearch1:{squery}"' - ) - - proc = await asyncio.create_subprocess_shell( - cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - await proc.communicate() - - files = [f for f in os.listdir(dl_dir) if f.endswith(".mp3")] - - if files: - target_file = os.path.join(dl_dir, files[0]) - await utils.answer(message, caption, file=target_file) - else: - await utils.answer(message, self.strings("snowt_failed")) - - except Exception as e: - logger.error(e) - await utils.answer(message, self.strings("dl_err")) - - finally: - if os.path.exists(dl_dir): - for f in os.listdir(dl_dir): - try: - os.remove(os.path.join(dl_dir, f)) - except: - pass - - - @error_handler - @tokenized - @loader.command( - ru_doc="- ➕ Добавить текущий трек в плейлист (используйте номер из .splaylists)" - ) - async def splaylistadd(self, message: Message): - """- ➕ Add current track to playlist (use number from .splaylists)""" - args = utils.get_args_raw(message) - if not args or not args.isdigit(): - await utils.answer(message, self.strings("invalid_playlist_index")) - return - - index = int(args) - 1 - playlists = self.get("last_playlists", []) - - if not playlists: - await utils.answer(message, self.strings("no_cached_playlists")) - return - - if index < 0 or index >= len(playlists): - await utils.answer(message, self.strings("invalid_playlist_index")) - return - - current = self.sp.current_playback() - if not current or not current.get("item"): - await utils.answer(message, self.strings("no_music")) - return - - track_uri = current["item"]["uri"] - track_name = current["item"]["name"] - artists = ", ".join([a["name"] for a in current["item"]["artists"]]) - full_track_name = f"{artists} - {track_name}" - - playlist_id = playlists[index]["id"] - playlist_name = playlists[index]["name"] - - try: - self.sp.playlist_add_items(playlist_id, [track_uri]) - except spotipy.exceptions.SpotifyException as e: - if e.http_status == 403 and "Insufficient client scope" in str(e): - await utils.answer(message, self.strings("need_auth")) - return - raise e - - await utils.answer(message, self.strings("added_to_playlist").format(utils.escape_html(full_track_name), utils.escape_html(playlist_name))) - - @error_handler - @tokenized - @loader.command( - ru_doc="- ➖ Удалить текущий трек из плейлиста (используйте номер из .splaylists)" - ) - async def splaylistrem(self, message: Message): - """- ➖ Remove current track from playlist (use number from .splaylists)""" - args = utils.get_args_raw(message) - if not args or not args.isdigit(): - await utils.answer(message, self.strings("invalid_playlist_index")) - return - - index = int(args) - 1 - playlists = self.get("last_playlists", []) - - if not playlists: - await utils.answer(message, self.strings("no_cached_playlists")) - return - - if index < 0 or index >= len(playlists): - await utils.answer(message, self.strings("invalid_playlist_index")) - return - - current = self.sp.current_playback() - if not current or not current.get("item"): - await utils.answer(message, self.strings("no_music")) - return - - track_uri = current["item"]["uri"] - track_name = current["item"]["name"] - artists = ", ".join([a["name"] for a in current["item"]["artists"]]) - full_track_name = f"{artists} - {track_name}" - - playlist_id = playlists[index]["id"] - playlist_name = playlists[index]["name"] - - try: - self.sp.playlist_remove_all_occurrences_of_items(playlist_id, [track_uri]) - except spotipy.exceptions.SpotifyException as e: - if e.http_status == 403 and "Insufficient client scope" in str(e): - await utils.answer(message, self.strings("need_auth")) - return - raise e - - await utils.answer(message, self.strings("removed_from_playlist").format(utils.escape_html(full_track_name), utils.escape_html(playlist_name))) - - @error_handler - @tokenized - @loader.command( - ru_doc="- 🆕 Создать новый плейлист" - ) - async def splaylistcreate(self, message: Message): - """- 🆕 Create a new playlist""" - name = utils.get_args_raw(message) - if not name: - await utils.answer(message, self.strings("no_playlist_name")) - return - - user_id = self.sp.me()["id"] - self.sp.user_playlist_create(user_id, name) - await utils.answer(message, self.strings("playlist_created").format(utils.escape_html(name))) - - @error_handler - @tokenized - @loader.command( - ru_doc="- 🗑 Удалить плейлист (используйте номер из .splaylists)" - ) - async def splaylistdelete(self, message: Message): - """- 🗑 Delete playlist (use number from .splaylists)""" - args = utils.get_args_raw(message) - if not args or not args.isdigit(): - await utils.answer(message, self.strings("invalid_playlist_index")) - return - - index = int(args) - 1 - playlists = self.get("last_playlists", []) - - if not playlists: - await utils.answer(message, self.strings("no_cached_playlists")) - return - - if index < 0 or index >= len(playlists): - await utils.answer(message, self.strings("invalid_playlist_index")) - return - - playlist_id = playlists[index]["id"] - playlist_name = playlists[index]["name"] - - self.sp.current_user_unfollow_playlist(playlist_id) - await utils.answer(message, self.strings("playlist_deleted").format(utils.escape_html(playlist_name))) - - @error_handler - @tokenized - @loader.command( - ru_doc="- 📃 Получить все плейлисты" - ) - async def splaylists(self, message: Message): - """- 📃 Get all playlists""" - user_id = self.sp.me()["id"] - playlists = self.sp.current_user_playlists() - - editable_playlists = [] - for playlist in playlists["items"]: - if playlist["owner"]["id"] == user_id or playlist["collaborative"]: - editable_playlists.append(playlist) - - self.set("last_playlists", editable_playlists) - - playlist_list_text = "" - for i, playlist in enumerate(editable_playlists): - name = utils.escape_html(playlist["name"]) - url = playlist["external_urls"]["spotify"] - count = playlist["tracks"]["total"] - playlist_list_text += f"{i + 1}. {name} ({count} tracks)\n" - - if not playlist_list_text: - await utils.answer(message, self.strings("no_playlists")) - else: - await utils.answer(message, self.strings("playlists_list").format(playlist_list_text)) - - @error_handler - @tokenized - @loader.command( - ru_doc="- ℹ️ Переключить стриминг воспроизведения в био" - ) - async def sbiocmd(self, message: Message): - """- ℹ️ Toggle bio playback streaming""" - current = self.get("autobio", False) - new = not current - self.set("autobio", new) - await utils.answer( - message, - self.strings("autobio").format("enabled" if new else "disabled"), - ) - - if new: - self.autobio.start() - else: - self.autobio.stop() - - @error_handler - @tokenized - @loader.command( - ru_doc="- 🔊 Изменить громкость. .svolume <0-100>" - ) - async def svolume(self, message: Message): - """- 🔊 Change playback volume. .svolume <0-100>""" - try: - args = utils.get_args_raw(message) - if not args: - await utils.answer(message, self.strings("no_volume_arg")) - return - - volume_percent = int(args) - if 0 <= volume_percent <= 100: - self.sp.volume(volume_percent) - await utils.answer(message, self.strings("volume_changed").format(volume_percent)) - else: - await utils.answer(message, self.strings("volume_invalid")) - except ValueError: - await utils.answer(message, self.strings("volume_invalid")) - except Exception: - await utils.answer(message, self.strings("volume_err")) - - @error_handler - @tokenized - @loader.command( - ru_doc=( - "- 🎵 Выбрать устройство для воспроизведения. Например: .sdevice \n" - "- 📝 Показать список устройств: .sdevice" - ) - ) - async def sdevicecmd(self, message: Message): - """- 🎵 Set preferred playback device. Usage: .sdevice or .sdevice to list devices""" - args = utils.get_args_raw(message) - devices = self.sp.devices()["devices"] - - if not args: - if not devices: - await utils.answer(message, self.strings("no_devices_found")) - return - - device_list_text = "" - for i, device in enumerate(devices): - is_active = "(active)" if device["is_active"] else "" - device_list_text += ( - f"{i+1}. {device['name']}" - f" ({device['type']}) {is_active}\n" - ) - - await utils.answer(message, self.strings("device_list").format(device_list_text.strip())) - return - - device_id = None - try: - device_number = int(args) - if 0 < device_number <= len(devices): - device_id = devices[device_number - 1]["id"] - device_name = devices[device_number - 1]["name"] - else: - await utils.answer(message, self.strings("invalid_device_id")) - return - except ValueError: - found_device = next((d for d in devices if d["id"] == args.strip()), None) - if found_device: - device_id = found_device["id"] - device_name = found_device["name"] - else: - await utils.answer(message, self.strings("invalid_device_id")) - return - - self.sp.transfer_playback(device_id=device_id) - await utils.answer(message, self.strings("device_changed").format(device_name)) - - @error_handler - @tokenized - @loader.command( - ru_doc="- 💫 Включить повтор трека" - ) - async def srepeatcmd(self, message: Message): - """- 💫 Repeat""" - self.sp.repeat("track") - await utils.answer(message, self.strings("on-repeat")) - - @error_handler - @tokenized - @loader.command( - ru_doc="- ✋ Остановить повтор" - ) - async def sderepeatcmd(self, message: Message): - """- ✋ Stop repeat""" - self.sp.repeat("context") - await utils.answer(message, self.strings("off-repeat")) - - @error_handler - @tokenized - @loader.command( - ru_doc="- 👉 Следующий трек" - ) - async def snextcmd(self, message: Message): - """- 👉 Next track""" - self.sp.next_track() - await utils.answer(message, self.strings("skipped")) - - @error_handler - @tokenized - @loader.command( - ru_doc="- 🤚 Продолжить воспроизведение" - ) - async def sresumecmd(self, message: Message): - """- 🤚 Resume""" - self.sp.start_playback() - await utils.answer(message, self.strings("playing")) - - @error_handler - @tokenized - @loader.command( - ru_doc="- 🤚 Пауза" - ) - async def spausecmd(self, message: Message): - """- 🤚 Pause""" - self.sp.pause_playback() - await utils.answer(message, self.strings("paused")) - - @error_handler - @tokenized - @loader.command( - ru_doc="- ⏮ Предыдущий трек" - ) - async def sbackcmd(self, message: Message): - """- ⏮ Previous track""" - self.sp.previous_track() - await utils.answer(message, self.strings("back")) - - @error_handler - @tokenized - @loader.command( - ru_doc="- ⏪ Перезапустить трек" - ) - async def sbegincmd(self, message: Message): - """- ⏪ Restart track""" - self.sp.seek_track(0) - await utils.answer(message, self.strings("restarted")) - - @error_handler - @tokenized - @loader.command( - ru_doc="- ❤️ Лайкнуть играющий трек" - ) - async def slikecmd(self, message: Message): - """- ❤️ Like current track""" - cupl = self.sp.current_playback() - self.sp.current_user_saved_tracks_add([cupl["item"]["id"]]) - await utils.answer(message, self.strings("liked")) - - @error_handler - @tokenized - @loader.command( - ru_doc="- 💔 Убрать лайк с играющего трека" - ) - async def sunlikecmd(self, message: Message): - """- 💔 Unlike current track""" - cupl = self.sp.current_playback() - self.sp.current_user_saved_tracks_delete([cupl["item"]["id"]]) - await utils.answer(message, self.strings("unlike")) - - @error_handler - @loader.command( - ru_doc="- Получить ссылку для авторизации" - ) - async def sauthcmd(self, message: Message): - """- Get authorization link""" - if self.get("acs_tkn", False) and not self.sp: - await utils.answer(message, self.strings("already_authed")) - else: - self.sp_auth.get_authorize_url() - await utils.answer( - message, - self.strings("auth").format(self.sp_auth.get_authorize_url()), - ) - - @error_handler - @loader.command( - ru_doc="- Вставить код авторизации" - ) - async def scodecmd(self, message: Message): - """- Paste authorization code""" - url = message.message.split(" ")[1] - code = self.sp_auth.parse_auth_response_url(url) - self.set("acs_tkn", self.sp_auth.get_access_token(code, True, False)) - self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) - await utils.answer(message, self.strings("authed")) - - @error_handler - @loader.command( - ru_doc="- Выйти из аккаунта" - ) - async def unauthcmd(self, message: Message): - """- Log out of account""" - self.set("acs_tkn", None) - del self.sp - await utils.answer(message, self.strings("deauth")) - - @error_handler - @tokenized - @loader.command( - ru_doc="- Обновить токен авторизации" - ) - async def stokrefreshcmd(self, message: Message): - """- Refresh authorization token""" - self.set( - "acs_tkn", - self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]), - ) - self.set("NextRefresh", time.time() + 45 * 60) - self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) - await utils.answer(message, self.strings("authed")) - - @error_handler - @tokenized - @loader.command( - ru_doc="- 🎧 Показать карточку играющего трека" - ) - async def snowcmd(self, message: Message): - """- 🎧 View current track card.""" - current_playback = self.sp.current_playback() - if not current_playback or not current_playback.get("is_playing", False): - await utils.answer(message, self.strings("no_music")) - return - - track = current_playback["item"]["name"] - track_id = current_playback["item"]["id"] - artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]]) - album_name = current_playback["item"]["album"].get("name", "Unknown Album") - duration_ms = current_playback["item"].get("duration_ms", 0) - progress_ms = current_playback.get("progress_ms", 0) - - duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}" - progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}" - - spotify_url = f"https://open.spotify.com/track/{track_id}" - songlink = f"https://song.link/s/{track_id}" - - try: - device_raw = ( - current_playback["device"]["name"] - + " " - + current_playback["device"]["type"].lower() - ) - device = device_raw.replace("computer", "").replace("smartphone", "").strip() - except Exception: - device = None - - try: - playlist_id = current_playback["context"]["uri"].split(":")[-1] - playlist = self.sp.playlist(playlist_id) - playlist_name = playlist.get("name", None) - try: - playlist_owner = ( - f'' - f'{playlist["owner"]["display_name"]}' - ) - except KeyError: - playlist_owner = playlist.get("owner", {}).get("display_name", "") - except Exception: - playlist_name = "" - playlist_owner = "" - - text = self.config["custom_text"].format( - track=utils.escape_html(track), - artists=utils.escape_html(artists), - album=utils.escape_html(album_name), - duration=duration, - progress=progress, - device=device, - spotify_url=spotify_url, - songlink=songlink, - playlist=utils.escape_html(playlist_name) if playlist_name else "", - playlist_owner=playlist_owner or "", - ) - - if self.config["show_banner"]: - cover_url = current_playback["item"]["album"]["images"][0]["url"] - - tmp_msg = await utils.answer(message, text + self.strings("uploading_banner")) - - banners = Banners( - title=track, - artists=artists, - duration=duration_ms, - progress=progress_ms, - track_cover=requests.get(cover_url).content, - font=self.config["font"], - ) - file = getattr(banners, self.config["banner_version"], banners.horizontal)() - - await utils.answer(tmp_msg, text, file=file) - else: - await utils.answer(message, text) - - @error_handler - @tokenized - @loader.command( - ru_doc="- 🎧 Скачать играющий трек" - ) - async def snowtcmd(self, message: Message): - """- 🎧 Download current track.""" - current_playback = self.sp.current_playback() - if not current_playback or not current_playback.get("is_playing", False): - await utils.answer(message, self.strings("no_music")) - return - - track = current_playback["item"]["name"] - artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]]) - album_name = current_playback["item"]["album"].get("name", "Unknown Album") - duration_ms = current_playback["item"].get("duration_ms", 0) - progress_ms = current_playback.get("progress_ms", 0) - - duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}" - progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}" - - spotify_url = f"https://open.spotify.com/track/{current_playback['item']['id']}" - songlink = f"https://song.link/s/{current_playback['item']['id']}" - - try: - device_raw = ( - current_playback["device"]["name"] - + " " - + current_playback["device"]["type"].lower() - ) - device = device_raw.replace("computer", "").replace("smartphone", "").strip() - except Exception: - device = None - - try: - playlist_id = current_playback["context"]["uri"].split(":")[-1] - playlist = self.sp.playlist(playlist_id) - playlist_name = playlist.get("name", None) - try: - playlist_owner = ( - f'' - f'{playlist["owner"]["display_name"]}' - ) - except KeyError: - playlist_owner = playlist.get("owner", {}).get("display_name", "") - except Exception: - playlist_name = "" - playlist_owner = "" - - text = self.config["custom_text"].format( - track=utils.escape_html(track), - artists=utils.escape_html(artists), - album=utils.escape_html(album_name), - duration=duration, - progress=progress, - device=device, - spotify_url=spotify_url, - songlink=songlink, - playlist=utils.escape_html(playlist_name) if playlist_name else "", - playlist_owner=playlist_owner or "", - ) - - msg = await utils.answer(message, text + self.strings("downloading_track")) - - await self._download_track(msg, f"{artists} {track}", caption=text) - - @error_handler - @tokenized - @loader.command( - ru_doc=( - "- 🔍 Поиск треков. Например: .ssearch Imagine Dragons Believer\n" - "- 🎧 Скачать трек: .ssearch 1 (где 1 — номер трека из списка)" - ) - ) - async def ssearchcmd(self, message: Message): - """🔍 Search for tracks. Usage: .ssearch or .ssearch to download""" - args = utils.get_args_raw(message) - if not args: - await utils.answer(message, self.strings("no_search_query")) - return - - try: - track_number = int(args) - search_results = self.get("last_search_results", []) - - if not search_results: - await utils.answer(message, self.strings("no_tracks_found")) - return - - if track_number <= 0 or track_number > len(search_results): - raise ValueError - - msg = await utils.answer(message, self.strings("downloading_track")) - - track_info = search_results[track_number - 1] - track_name = track_info["name"] - artists = ", ".join([a["name"] for a in track_info["artists"]]) - - caption_text = self.strings("download_success").format( - utils.escape_html(track_name), - utils.escape_html(artists) - ) - - await self._download_track(msg, f"{artists} {track_name}", caption=caption_text) - return - - except ValueError: - await utils.answer(message, self.strings("searching_tracks").format(args)) - - results = self.sp.search(q=args, limit=5, type="track") - - if not results or not results["tracks"]["items"]: - await utils.answer(message, self.strings("no_tracks_found").format(args)) - return - - self.set("last_search_results", results["tracks"]["items"]) - - tracks_list = [] - for i, track in enumerate(results["tracks"]["items"]): - track_name = track["name"] - artists = ", ".join([artist["name"] for artist in track["artists"]]) - track_url = track["external_urls"]["spotify"] - tracks_list.append( - "{number}. {track_name} — {artists}\n🔗 Spotify".format( - number=i + 1, - track_name=utils.escape_html(track_name), - artists=utils.escape_html(artists), - track_url=track_url, - ) - ) - - text = "\n".join(tracks_list) - await utils.answer(message, self.strings("search_results").format(args, text)) - - - @loader.command( - ru_doc="- 🔄 Сброс результатов поиска по трекам" - ) - async def ssearchresetcmd(self, message: Message): - """- 🔄 Reset track search results""" - self.set("last_search_results", []) - await utils.answer(message, self.strings["search_results_cleared"]) - - async def watcher(self, message: Message): - """Watcher is used to update token""" - if not self.sp: - return - - if self.get("NextRefresh", False): - ttc = self.get("NextRefresh", 0) - crnt = time.time() - if ttc < crnt: - self.set( - "acs_tkn", - self.sp_auth.refresh_access_token( - self.get("acs_tkn")["refresh_token"] - ), - ) - self.set("NextRefresh", time.time() + 45 * 60) - self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) - else: - self.set( - "acs_tkn", - self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]), - ) - self.set("NextRefresh", time.time() + 45 * 60) - self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) +# License Violation. Original module has GPL-v3 but author of nowhere tried to change it to CC BY-ND 4.0