diff --git a/radiocycle/Modules/SpotifyMod.py b/radiocycle/Modules/SpotifyMod.py
index 218588b..7834a05 100644
--- a/radiocycle/Modules/SpotifyMod.py
+++ b/radiocycle/Modules/SpotifyMod.py
@@ -1,1436 +1 @@
-# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
-# █▀█ █ █ █ █▀█ █▀▄ █
-# © Copyright 2022
-#
-# https://t.me/hikariatama
-#
-# 🔒 Licensed under the GNU AGPLv3
-# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
-#
-# You CANNOT edit, distribute or redistribute this file without direct permission from the author.
-#
-# ORIGINAL MODULE: https://raw.githubusercontent.com/hikariatama/ftg/master/spotify.py
-
-# =======================================
-# _ __ __ __ _
-# | |/ /___ | \/ | ___ __| |___
-# | ' // _ \ | |\/| |/ _ \ / _` / __|
-# | . \ __/ | | | | (_) | (_| \__ \
-# |_|\_\___| |_| |_|\___/ \__,_|___/
-# @ke_mods
-# =======================================
-#
-# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
-# --------------------------------------
-# https://creativecommons.org/licenses/by-nd/4.0/legalcode
-# =======================================
-
-# meta developer: @ke_mods
-# requires: telethon spotipy pillow requests yt-dlp
-
-import asyncio
-import contextlib
-import functools
-import io
-import logging
-import re
-import textwrap
-import time
-import traceback
-import os
-from types import FunctionType
-
-import requests
-import spotipy
-from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont, ImageOps
-from telethon.errors import FloodWaitError
-from telethon.tl.functions.account import UpdateProfileRequest
-from telethon.tl.types import Message
-
-from .. import loader, utils
-
-logger = logging.getLogger(__name__)
-logging.getLogger("spotipy").setLevel(logging.CRITICAL)
-
-class Banners:
- def __init__(
- self,
- title: str,
- artists: list,
- duration: int,
- progress: int,
- track_cover: bytes,
- font
- ):
- self.title = title
- self.artists = ", ".join(artists) if isinstance(artists, list) else artists
- self.duration = duration
- self.progress = progress
- self.track_cover = track_cover
- self.font_url = font
-
- def _get_font(self, size, font_bytes):
- return ImageFont.truetype(io.BytesIO(font_bytes), size)
-
- def _prepare_cover(self, size, radius):
- cover = Image.open(io.BytesIO(self.track_cover)).convert("RGBA")
- cover = cover.resize((size, size), Image.Resampling.LANCZOS)
-
- mask = Image.new("L", (size, size), 0)
- draw = ImageDraw.Draw(mask)
- draw.rounded_rectangle((0, 0, size, size), radius=radius, fill=255)
-
- output = Image.new("RGBA", (size, size), (0, 0, 0, 0))
- output.paste(cover, (0, 0), mask=mask)
- return output
-
- def _prepare_background(self, w, h):
- bg = Image.open(io.BytesIO(self.track_cover)).convert("RGBA")
- bg = bg.resize((w, h), Image.Resampling.BICUBIC)
- bg = bg.filter(ImageFilter.GaussianBlur(radius=40))
- bg = ImageEnhance.Brightness(bg).enhance(0.4)
- return bg
-
- def _draw_progress_bar(self, draw, x, y, w, h, progress_pct, color="white", bg_color="#5e5e5e"):
- draw.rounded_rectangle((x, y, x + w, y + h), radius=h/2, fill=bg_color)
-
- fill_w = int(w * progress_pct)
- if fill_w > 0:
- draw.rounded_rectangle((x, y, x + fill_w, y + h), radius=h/2, fill=color)
-
- dot_radius = h * 1.2
- dot_x = x + fill_w
- dot_y = y + (h / 2)
-
- draw.ellipse(
- (dot_x - dot_radius, dot_y - dot_radius, dot_x + dot_radius, dot_y + dot_radius),
- fill=color
- )
-
- def horizontal(self):
- W, H = 1500, 600
- padding = 60
- cover_size = 480
-
- font_bytes = requests.get(self.font_url).content
- title_font = self._get_font(55, font_bytes)
- artist_font = self._get_font(45, font_bytes)
- time_font = self._get_font(25, font_bytes)
-
- img = self._prepare_background(W, H)
- draw = ImageDraw.Draw(img)
-
- cover = self._prepare_cover(cover_size, 30)
- img.paste(cover, (padding, (H - cover_size) // 2), cover)
-
- text_x = padding + cover_size + 60
- text_y_start = 100
- text_width_limit = W - text_x - padding
-
- display_title = self.title
- while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0:
- display_title = display_title[:-1]
- if len(display_title) < len(self.title): display_title += "…"
-
- display_artist = self.artists
- while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0:
- display_artist = display_artist[:-1]
- if len(display_artist) < len(self.artists): display_artist += "…"
-
- draw.text((text_x, text_y_start), display_title, font=title_font, fill="white")
- draw.text((text_x, text_y_start + 70), display_artist, font=artist_font, fill="#B3B3B3")
-
- cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}"
- dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}"
-
- cur_w = time_font.getlength(cur_time)
- dur_w = time_font.getlength(dur_time)
-
- bar_y = 480
- bar_h = 8
- gap = 25
-
- draw.text((text_x, bar_y - 12), cur_time, font=time_font, fill="white")
-
- bar_start_x = text_x + cur_w + gap
- bar_end_x = text_x + text_width_limit - dur_w - gap
- bar_w = bar_end_x - bar_start_x
-
- prog_pct = self.progress / self.duration if self.duration > 0 else 0
- self._draw_progress_bar(draw, bar_start_x, bar_y, bar_w, bar_h, prog_pct)
-
- draw.text((bar_end_x + gap, bar_y - 12), dur_time, font=time_font, fill="white")
-
- by = io.BytesIO()
- img.save(by, format="PNG")
- by.seek(0)
- by.name = "banner.png"
- return by
-
- def vertical(self):
- W, H = 1000, 1500
- padding = 80
- cover_size = 800
-
- font_bytes = requests.get(self.font_url).content
- title_font = self._get_font(60, font_bytes)
- artist_font = self._get_font(45, font_bytes)
- time_font = self._get_font(35, font_bytes)
-
- img = self._prepare_background(W, H)
- draw = ImageDraw.Draw(img)
-
- cover = self._prepare_cover(cover_size, 40)
- cover_x = (W - cover_size) // 2
- cover_y = 120
- img.paste(cover, (cover_x, cover_y), cover)
-
- text_area_y = cover_y + cover_size + 120
- text_width_limit = W - (padding * 2)
-
- display_title = self.title
- while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0:
- display_title = display_title[:-1]
- if len(display_title) < len(self.title): display_title += "…"
-
- display_artist = self.artists
- while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0:
- display_artist = display_artist[:-1]
- if len(display_artist) < len(self.artists): display_artist += "…"
-
- title_w = title_font.getlength(display_title)
- draw.text(((W - title_w) / 2, text_area_y), display_title, font=title_font, fill="white")
-
- artist_w = artist_font.getlength(display_artist)
- draw.text(((W - artist_w) / 2, text_area_y + 75), display_artist, font=artist_font, fill="#B3B3B3")
-
- bar_y = text_area_y + 260
- bar_h = 8
- bar_w = W - (padding * 2)
- prog_pct = self.progress / self.duration if self.duration > 0 else 0
-
- self._draw_progress_bar(draw, padding, bar_y, bar_w, bar_h, prog_pct, color="white", bg_color="#5e5e5e")
-
- cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}"
- dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}"
-
- draw.text((padding, bar_y + 40), cur_time, font=time_font, fill="#B3B3B3")
-
- dur_w = time_font.getlength(dur_time)
- draw.text((W - padding - dur_w, bar_y + 40), dur_time, font=time_font, fill="#B3B3B3")
-
- by = io.BytesIO()
- img.save(by, format="PNG")
- by.seek(0)
- by.name = "banner.png"
- return by
-
-@loader.tds
-class SpotifyMod(loader.Module):
- """Card with the currently playing track on Spotify."""
-
- strings = {
- "name": "SpotifyMod",
- "need_auth": (
- "❌ Please execute"
- " .sauth before performing this action."
- ),
- "on-repeat": (
- "🔄 Set on-repeat."
- ),
- "off-repeat": (
- "🔄 Stopped track"
- " repeat."
- ),
- "skipped": (
- "➡️ Skipped track."
- ),
- "playing": "▶️ Playing...",
- "back": (
- "⬅️ Switched to previous"
- " track"
- ),
- "paused": "❌ Pause",
- "restarted": (
- "✅️ Playing track"
- " from the"
- " beginning"
- ),
- "liked": (
- "❤️ Liked current"
- " playback"
- ),
- "unlike": (
- "❌"
- " Unliked current playback"
- ),
- "err": (
- "❌ An error occurred."
- "\n{}"
- ),
- "already_authed": (
- "❌ Already authorized"
- ),
- "authed": (
- "✅ Authentication"
- " successful"
- ),
- "deauth": (
- "🚪 Successfully logged out"
- " of account"
- ),
- "auth": (
- '🔗 Follow this'
- " link, allow access, then enter .scode https://... with"
- " the link you received."
- ),
- "no_music": (
- "❌ No music is playing!"
- ),
- "dl_err": (
- "❌ Failed to download"
- " track."
- ),
- "volume_changed": (
- "🔊"
- " Volume changed to {}%."
- ),
- "volume_invalid": (
- "❌ Volume level must be"
- " a number between 0 and 100."
- ),
- "volume_err": (
- "❌ An error occurred while"
- " changing volume."
- ),
- "no_volume_arg": (
- "❌ Please specify a"
- " volume level between 0 and 100."
- ),
- "searching_tracks": (
- "🕔 Searching for tracks"
- " matching {}..."
- ),
- "no_search_query": (
- "❌ Please specify a"
- " search query."
- ),
- "no_tracks_found": (
- "❌ No tracks found for"
- " {}."
- ),
- "search_results": (
- "✅ Search results for"
- " {}:\n\n{}"
- ),
- "downloading_search_track": (
- "🕔 Downloading {}..."
- ),
- "download_success": (
- "✅ Successfully downloaded {} - {}"
- ),
- "invalid_track_number": (
- "❌ Invalid track number."
- " Please search first or provide a valid number from the list."
- ),
- "device_list": (
- "📄 Available devices:\n{}"
- ),
- "no_devices_found": (
- "❌ No devices found."
- ),
- "device_changed": (
- "✅ Playback transferred to"
- " {}."
- ),
- "invalid_device_id": (
- "❌ Invalid device ID."
- " Use .sdevice to see available devices."
- ),
- "search_results_cleared": "✅ Search results cleared",
- "autobio": (
- "🎧 Spotify autobio {}"
- ),
- "no_ytdlp": "❌ yt-dlp not found... Check config or install yt-dlp ({}terminal pip install yt-dlp)",
- "snowt_failed": "\n\n❌ Download failed",
- "uploading_banner": "\n\n🕔 Uploading banner...",
- "downloading_track": "\n\n🕔 Downloading track...",
- "no_playlists": "❌ No playlists found.",
- "playlists_list": "📄 Your playlists:\n\n{}",
- "added_to_playlist": "✅ Added {} to {}",
- "removed_from_playlist": "✅ Removed {} from {}",
- "invalid_playlist_index": "❌ Invalid playlist number.",
- "no_cached_playlists": "❌ Use .splaylists first.",
- "playlist_created": "✅ Playlist {} created.",
- "playlist_deleted": "✅ Playlist {} deleted.",
- "no_playlist_name": "❌ Please specify a playlist name.",
- }
-
- strings_ru = {
- "_cls_doc": "Карточка с играющим треком в Spotify.",
- "need_auth": (
- "❌ Выполни"
- " .sauth перед выполнением этого действия."
- ),
- "err": (
- "❌ Произошла ошибка."
- "\n{}"
- ),
- "on-repeat": (
- "🔄 Включен повтор трека."
- ),
- "off-repeat": (
- "🔄 Повтор трека отключён."
- ),
- "skipped": (
- "➡️ Трек пропущен."
- ),
- "playing": "▶️ Играет...",
- "back": (
- "⬅️ Переключено на предыдущий трек"
- ),
- "paused": "❌ Пауза",
- "restarted": (
- "✅️ Воспроизведение трека с начала..."
- ),
- "liked": (
- "❤️ Текущий трек добавлен в избранное"
- ),
- "unlike": (
- "❌ Убрал лайк с текущего трека"
- ),
- "already_authed": (
- "❌ Уже авторизован"
- ),
- "authed": (
- "✅ Успешная аутентификация"
- ),
- "deauth": (
- "🚪 Успешный выход из аккаунта"
- ),
- "auth": (
- '🔗 Пройдите по этой ссылке, разрешите вход, затем введите .scode https://... с ссылкой которую вы получили.'
- ),
- "no_music": (
- "❌ Музыка не играет!"
- ),
- "dl_err": (
- "❌ Не удалось скачать трек."
- ),
- "volume_changed": (
- "🔊"
- " Громкость изменена на {}%."
- ),
- "volume_invalid": (
- "❌ Уровень громкости должен"
- " быть числом от 0 до 100."
- ),
- "volume_err": (
- "❌ Произошла ошибка при"
- " изменении громкости."
- ),
- "no_volume_arg": (
- "❌ Пожалуйста, укажите"
- " уровень громкости от 0 до 100."
- ),
- "searching_tracks": (
- "🕔 Идет поиск треков"
- " по запросу {}..."
- ),
- "no_search_query": (
- "❌ Пожалуйста, укажите"
- " поисковый запрос."
- ),
- "no_tracks_found": (
- "❌ По запросу '{}'"
- " ничего не найдено."
- ),
- "search_results": (
- "✅ Результаты поиска"
- " по запросу {}:\n\n{}"
- ),
- "downloading_search_track": (
- "🕔 Скачиваю {}..."
- ),
- "download_success": (
- "✅ Трек {} - {} успешно скачан."
- ),
- "invalid_track_number": (
- "❌ Некорректный номер трека."
- " Сначала выполните поиск или укажите правильный номер из списка."
- ),
- "device_list": (
- "📄 Доступные устройства:\n{}"
- ),
- "no_devices_found": (
- "❌ Устройства не найдены."
- ),
- "device_changed": (
- "✅ Воспроизведение переключено на"
- " {}."
- ),
- "invalid_device_id": (
- "❌ Некорректный ID устройства."
- " Используйте .sdevice , чтобы увидеть доступные устройства."
- ),
- "search_results_cleared": "✅ Результаты поиска очищены",
- "autobio": (
- "🎧 Обновление био"
- " включено {}"
- ),
- "no_ytdlp": "❌ yt-dlp не найден... Проверьте конфиг или установите yt-dlp ({}terminal pip install yt-dlp)",
- "snowt_failed": "\n\n❌ Ошибка скачивания.",
- "uploading_banner": "\n\n🕔 Загрузка баннера...",
- "downloading_track": "\n\n🕔 Скачивание трека...",
- "no_playlists": "❌ Плейлисты не найдены.",
- "playlists_list": "📄 Ваши плейлисты:\n\n{}",
- "added_to_playlist": "✅ Трек {} добавлен в {}",
- "removed_from_playlist": "✅ Трек {} удален из {}",
- "invalid_playlist_index": "❌ Неверный номер плейлиста.",
- "no_cached_playlists": "❌ Сначала используйте .splaylists.",
- "playlist_created": "✅ Плейлист {} создан.",
- "playlist_deleted": "✅ Плейлист {} удален.",
- "no_playlist_name": "❌ Пожалуйста, укажите название плейлиста.",
- }
- strings_jp = {
- "_cls_doc": "Spotify からのメッセージ",
- "need_auth": (
- "❌ この操作を行う前に "
- ".sauth を実行してください。"
- ),
- "on-repeat": (
- "🔄 リピート再生を設定しました。"
- ),
- "off-repeat": (
- "🔄 リピート再生を解除しました。"
- ),
- "skipped": (
- "➡️ スキップしました。"
- ),
- "playing": "▶️ 再生中...",
- "back": (
- "⬅️ 前のトラックに戻りました。"
- ),
- "paused": "❌ 一時停止",
- "restarted": (
- "✅️ 最初から再生します。"
- ),
- "liked": (
- "❤️ お気に入りに追加しました。"
- ),
- "unlike": (
- "❌"
- " お気に入りから削除しました。"
- ),
- "err": (
- "❌ エラーが発生しました。"
- "\n{}"
- ),
- "already_authed": (
- "❌ 既に認証されています。"
- ),
- "authed": (
- "✅ 認証に成功しました。"
- ),
- "deauth": (
- "🚪 ログアウトしました。"
- ),
- "auth": (
- '🔗 リンクをクリックしてアクセスを許可し、取得したURLを使って .scode https://... を入力してください。'
- ),
- "no_music": (
- "❌ 音楽は再生されていません!"
- ),
- "dl_err": (
- "❌ トラックのダウンロードに失敗しました。"
- ),
- "volume_changed": (
- "🔊"
- " 音量を {}% に変更しました。"
- ),
- "volume_invalid": (
- "❌ 音量は0から100の数字で指定してください。"
- ),
- "volume_err": (
- "❌ 音量の変更中にエラーが発生しました。"
- ),
- "no_volume_arg": (
- "❌ 0から100の間で音量を指定してください。"
- ),
- "searching_tracks": (
- "🕔 {} を検索中..."
- ),
- "no_search_query": (
- "❌ 検索キーワードを指定してください。"
- ),
- "no_tracks_found": (
- "❌ {} は見つかりませんでした。"
- ),
- "search_results": (
- "✅ {} の検索結果:\n\n{}"
- ),
- "downloading_search_track": (
- "🕔 {} をダウンロード中..."
- ),
- "download_success": (
- "✅ {} - {} のダウンロードに成功しました。"
- ),
- "invalid_track_number": (
- "❌ トラック番号が無効です。"
- " 先に検索するか、リストから有効な番号を指定してください。"
- ),
- "device_list": (
- "📄 利用可能なデバイス:\n{}"
- ),
- "no_devices_found": (
- "❌ デバイスが見つかりません。"
- ),
- "device_changed": (
- "✅ 再生デバイスを"
- " {} に切り替えました。"
- ),
- "invalid_device_id": (
- "❌ デバイスIDが無効です。"
- " .sdevice で利用可能なデバイスを確認してください。"
- ),
- "search_results_cleared": "✅ 検索結果をクリアしました。",
- "autobio": (
- "🎧 Spotify AutoBio: {}"
- ),
- "no_ytdlp": "❌ yt-dlpが見つかりません... 設定を確認するか、インストールしてください ({}terminal pip install yt-dlp)",
- "snowt_failed": "\n\n❌ ダウンロードに失敗しました。",
- "uploading_banner": "\n\n🕔 バナーをアップロード中...",
- "downloading_track": "\n\n🕔 トラックをダウンロード中...",
- "no_playlists": "❌ プレイリストが見つかりません。",
- "playlists_list": "📄 あなたのプレイリスト:\n\n{}",
- "added_to_playlist": "✅ {} を {} に追加しました。",
- "removed_from_playlist": "✅ {} を {} から削除しました。",
- "invalid_playlist_index": "❌ プレイリスト番号が無効です。",
- "no_cached_playlists": "❌ 先に .splaylists を使用してください。",
- "playlist_created": "✅ プレイリスト {} を作成しました。",
- "playlist_deleted": "✅ プレイリスト {} を削除しました。",
- "no_playlist_name": "❌ プレイリスト名を指定してください。",
- }
-
- def __init__(self):
- self._client_id = "e0708753ab60499c89ce263de9b4f57a"
- self._client_secret = "80c927166c664ee98a43a2c0e2981b4a"
- self.scope = (
- "user-read-playback-state playlist-read-private playlist-read-collaborative"
- " user-modify-playback-state user-library-modify"
- " playlist-modify-public playlist-modify-private"
- )
- self.sp_auth = spotipy.oauth2.SpotifyOAuth(
- client_id=self._client_id,
- client_secret=self._client_secret,
- redirect_uri="https://thefsch.github.io/spotify/",
- scope=self.scope,
- )
- self.config = loader.ModuleConfig(
- loader.ConfigValue(
- "show_banner",
- True,
- "Show banner with track info",
- validator=loader.validators.Boolean(),
- ),
- loader.ConfigValue(
- "custom_text",
- (
- "🎧 Now playing: {track} — {artists}\n"
- "🔗 song.link"
- ),
- """Custom text, supports {track}, {artists}, {album}, {playlist}, {playlist_owner}, {spotify_url}, {songlink}, {progress}, {duration}, {device} placeholders""",
- validator=loader.validators.String(),
- ),
- loader.ConfigValue(
- "font",
- "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf",
- "Custom font. Specify URL to .ttf file",
- validator=loader.validators.String(),
- ),
- loader.ConfigValue(
- "auto_bio_template",
- "🎧 {}",
- lambda: "Template for Spotify AutoBio",
- ),
- loader.ConfigValue(
- "ytdlp_path",
- "",
- "Path to ytdlp binary",
- validator=loader.validators.String(),
- ),
- loader.ConfigValue(
- "banner_version",
- "horizontal",
- lambda: "Banner version",
- validator=loader.validators.Choice(["horizontal", "vertical"]),
- ),
- )
-
- async def client_ready(self, client, db):
- self.font_ready = asyncio.Event()
-
- self._premium = getattr(await client.get_me(), "premium", False)
- try:
- self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
- except Exception:
- self.set("acs_tkn", None)
- self.sp = None
-
- if self.get("autobio", False):
- self.autobio.start()
-
- def tokenized(func) -> FunctionType:
- @functools.wraps(func)
- async def wrapped(*args, **kwargs):
- if not args[0].get("acs_tkn", False) or not args[0].sp:
- await utils.answer(args[1], args[0].strings("need_auth"))
- return
-
- return await func(*args, **kwargs)
-
- wrapped.__doc__ = func.__doc__
- wrapped.__module__ = func.__module__
-
- return wrapped
-
- def error_handler(func) -> FunctionType:
- @functools.wraps(func)
- async def wrapped(*args, **kwargs):
- try:
- return await func(*args, **kwargs)
- except Exception:
- logger.exception(traceback.format_exc())
- with contextlib.suppress(Exception):
- await utils.answer(
- args[1],
- args[0].strings("err").format(traceback.format_exc()),
- )
-
- wrapped.__doc__ = func.__doc__
- wrapped.__module__ = func.__module__
-
- return wrapped
-
-
- @loader.loop(interval=90)
- async def autobio(self):
- try:
- current_playback = self.sp.current_playback()
- track = current_playback["item"]["name"]
- track = re.sub(r"([(].*?[)])", "", track).strip()
- except Exception:
- return
-
- bio = self.config["auto_bio_template"].format(f"{track}")
-
- try:
- await self._client(
- UpdateProfileRequest(about=bio[: 140 if self._premium else 70])
- )
- except FloodWaitError as e:
- logger.info(f"Sleeping {max(e.seconds, 60)} bc of floodwait")
- await asyncio.sleep(max(e.seconds, 60))
- return
-
- async def _download_track(self, message, query: str, caption: str = ""):
- dl_dir = os.path.join(os.getcwd(), "spotifymod")
- if not os.path.exists(dl_dir):
- os.makedirs(dl_dir, exist_ok=True)
-
- for f in os.listdir(dl_dir):
- try:
- os.remove(os.path.join(dl_dir, f))
- except:
- pass
-
- try:
- squery = query.replace('"', '').replace("'", "")
-
- cmd = (
- f'{self.config["ytdlp_path"]} -x --audio-format mp3 --add-metadata '
- f'-o "{dl_dir}/%(title)s [%(id)s].%(ext)s" '
- f'"ytsearch1:{squery}"'
- )
-
- proc = await asyncio.create_subprocess_shell(
- cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE
- )
- await proc.communicate()
-
- files = [f for f in os.listdir(dl_dir) if f.endswith(".mp3")]
-
- if files:
- target_file = os.path.join(dl_dir, files[0])
- await utils.answer(message, caption, file=target_file)
- else:
- await utils.answer(message, self.strings("snowt_failed"))
-
- except Exception as e:
- logger.error(e)
- await utils.answer(message, self.strings("dl_err"))
-
- finally:
- if os.path.exists(dl_dir):
- for f in os.listdir(dl_dir):
- try:
- os.remove(os.path.join(dl_dir, f))
- except:
- pass
-
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- ➕ Добавить текущий трек в плейлист (используйте номер из .splaylists)"
- )
- async def splaylistadd(self, message: Message):
- """- ➕ Add current track to playlist (use number from .splaylists)"""
- args = utils.get_args_raw(message)
- if not args or not args.isdigit():
- await utils.answer(message, self.strings("invalid_playlist_index"))
- return
-
- index = int(args) - 1
- playlists = self.get("last_playlists", [])
-
- if not playlists:
- await utils.answer(message, self.strings("no_cached_playlists"))
- return
-
- if index < 0 or index >= len(playlists):
- await utils.answer(message, self.strings("invalid_playlist_index"))
- return
-
- current = self.sp.current_playback()
- if not current or not current.get("item"):
- await utils.answer(message, self.strings("no_music"))
- return
-
- track_uri = current["item"]["uri"]
- track_name = current["item"]["name"]
- artists = ", ".join([a["name"] for a in current["item"]["artists"]])
- full_track_name = f"{artists} - {track_name}"
-
- playlist_id = playlists[index]["id"]
- playlist_name = playlists[index]["name"]
-
- try:
- self.sp.playlist_add_items(playlist_id, [track_uri])
- except spotipy.exceptions.SpotifyException as e:
- if e.http_status == 403 and "Insufficient client scope" in str(e):
- await utils.answer(message, self.strings("need_auth"))
- return
- raise e
-
- await utils.answer(message, self.strings("added_to_playlist").format(utils.escape_html(full_track_name), utils.escape_html(playlist_name)))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- ➖ Удалить текущий трек из плейлиста (используйте номер из .splaylists)"
- )
- async def splaylistrem(self, message: Message):
- """- ➖ Remove current track from playlist (use number from .splaylists)"""
- args = utils.get_args_raw(message)
- if not args or not args.isdigit():
- await utils.answer(message, self.strings("invalid_playlist_index"))
- return
-
- index = int(args) - 1
- playlists = self.get("last_playlists", [])
-
- if not playlists:
- await utils.answer(message, self.strings("no_cached_playlists"))
- return
-
- if index < 0 or index >= len(playlists):
- await utils.answer(message, self.strings("invalid_playlist_index"))
- return
-
- current = self.sp.current_playback()
- if not current or not current.get("item"):
- await utils.answer(message, self.strings("no_music"))
- return
-
- track_uri = current["item"]["uri"]
- track_name = current["item"]["name"]
- artists = ", ".join([a["name"] for a in current["item"]["artists"]])
- full_track_name = f"{artists} - {track_name}"
-
- playlist_id = playlists[index]["id"]
- playlist_name = playlists[index]["name"]
-
- try:
- self.sp.playlist_remove_all_occurrences_of_items(playlist_id, [track_uri])
- except spotipy.exceptions.SpotifyException as e:
- if e.http_status == 403 and "Insufficient client scope" in str(e):
- await utils.answer(message, self.strings("need_auth"))
- return
- raise e
-
- await utils.answer(message, self.strings("removed_from_playlist").format(utils.escape_html(full_track_name), utils.escape_html(playlist_name)))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- 🆕 Создать новый плейлист"
- )
- async def splaylistcreate(self, message: Message):
- """- 🆕 Create a new playlist"""
- name = utils.get_args_raw(message)
- if not name:
- await utils.answer(message, self.strings("no_playlist_name"))
- return
-
- user_id = self.sp.me()["id"]
- self.sp.user_playlist_create(user_id, name)
- await utils.answer(message, self.strings("playlist_created").format(utils.escape_html(name)))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- 🗑 Удалить плейлист (используйте номер из .splaylists)"
- )
- async def splaylistdelete(self, message: Message):
- """- 🗑 Delete playlist (use number from .splaylists)"""
- args = utils.get_args_raw(message)
- if not args or not args.isdigit():
- await utils.answer(message, self.strings("invalid_playlist_index"))
- return
-
- index = int(args) - 1
- playlists = self.get("last_playlists", [])
-
- if not playlists:
- await utils.answer(message, self.strings("no_cached_playlists"))
- return
-
- if index < 0 or index >= len(playlists):
- await utils.answer(message, self.strings("invalid_playlist_index"))
- return
-
- playlist_id = playlists[index]["id"]
- playlist_name = playlists[index]["name"]
-
- self.sp.current_user_unfollow_playlist(playlist_id)
- await utils.answer(message, self.strings("playlist_deleted").format(utils.escape_html(playlist_name)))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- 📃 Получить все плейлисты"
- )
- async def splaylists(self, message: Message):
- """- 📃 Get all playlists"""
- user_id = self.sp.me()["id"]
- playlists = self.sp.current_user_playlists()
-
- editable_playlists = []
- for playlist in playlists["items"]:
- if playlist["owner"]["id"] == user_id or playlist["collaborative"]:
- editable_playlists.append(playlist)
-
- self.set("last_playlists", editable_playlists)
-
- playlist_list_text = ""
- for i, playlist in enumerate(editable_playlists):
- name = utils.escape_html(playlist["name"])
- url = playlist["external_urls"]["spotify"]
- count = playlist["tracks"]["total"]
- playlist_list_text += f"{i + 1}. {name} ({count} tracks)\n"
-
- if not playlist_list_text:
- await utils.answer(message, self.strings("no_playlists"))
- else:
- await utils.answer(message, self.strings("playlists_list").format(playlist_list_text))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- ℹ️ Переключить стриминг воспроизведения в био"
- )
- async def sbiocmd(self, message: Message):
- """- ℹ️ Toggle bio playback streaming"""
- current = self.get("autobio", False)
- new = not current
- self.set("autobio", new)
- await utils.answer(
- message,
- self.strings("autobio").format("enabled" if new else "disabled"),
- )
-
- if new:
- self.autobio.start()
- else:
- self.autobio.stop()
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- 🔊 Изменить громкость. .svolume <0-100>"
- )
- async def svolume(self, message: Message):
- """- 🔊 Change playback volume. .svolume <0-100>"""
- try:
- args = utils.get_args_raw(message)
- if not args:
- await utils.answer(message, self.strings("no_volume_arg"))
- return
-
- volume_percent = int(args)
- if 0 <= volume_percent <= 100:
- self.sp.volume(volume_percent)
- await utils.answer(message, self.strings("volume_changed").format(volume_percent))
- else:
- await utils.answer(message, self.strings("volume_invalid"))
- except ValueError:
- await utils.answer(message, self.strings("volume_invalid"))
- except Exception:
- await utils.answer(message, self.strings("volume_err"))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc=(
- "- 🎵 Выбрать устройство для воспроизведения. Например: .sdevice \n"
- "- 📝 Показать список устройств: .sdevice"
- )
- )
- async def sdevicecmd(self, message: Message):
- """- 🎵 Set preferred playback device. Usage: .sdevice or .sdevice to list devices"""
- args = utils.get_args_raw(message)
- devices = self.sp.devices()["devices"]
-
- if not args:
- if not devices:
- await utils.answer(message, self.strings("no_devices_found"))
- return
-
- device_list_text = ""
- for i, device in enumerate(devices):
- is_active = "(active)" if device["is_active"] else ""
- device_list_text += (
- f"{i+1}. {device['name']}"
- f" ({device['type']}) {is_active}\n"
- )
-
- await utils.answer(message, self.strings("device_list").format(device_list_text.strip()))
- return
-
- device_id = None
- try:
- device_number = int(args)
- if 0 < device_number <= len(devices):
- device_id = devices[device_number - 1]["id"]
- device_name = devices[device_number - 1]["name"]
- else:
- await utils.answer(message, self.strings("invalid_device_id"))
- return
- except ValueError:
- found_device = next((d for d in devices if d["id"] == args.strip()), None)
- if found_device:
- device_id = found_device["id"]
- device_name = found_device["name"]
- else:
- await utils.answer(message, self.strings("invalid_device_id"))
- return
-
- self.sp.transfer_playback(device_id=device_id)
- await utils.answer(message, self.strings("device_changed").format(device_name))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- 💫 Включить повтор трека"
- )
- async def srepeatcmd(self, message: Message):
- """- 💫 Repeat"""
- self.sp.repeat("track")
- await utils.answer(message, self.strings("on-repeat"))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- ✋ Остановить повтор"
- )
- async def sderepeatcmd(self, message: Message):
- """- ✋ Stop repeat"""
- self.sp.repeat("context")
- await utils.answer(message, self.strings("off-repeat"))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- 👉 Следующий трек"
- )
- async def snextcmd(self, message: Message):
- """- 👉 Next track"""
- self.sp.next_track()
- await utils.answer(message, self.strings("skipped"))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- 🤚 Продолжить воспроизведение"
- )
- async def sresumecmd(self, message: Message):
- """- 🤚 Resume"""
- self.sp.start_playback()
- await utils.answer(message, self.strings("playing"))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- 🤚 Пауза"
- )
- async def spausecmd(self, message: Message):
- """- 🤚 Pause"""
- self.sp.pause_playback()
- await utils.answer(message, self.strings("paused"))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- ⏮ Предыдущий трек"
- )
- async def sbackcmd(self, message: Message):
- """- ⏮ Previous track"""
- self.sp.previous_track()
- await utils.answer(message, self.strings("back"))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- ⏪ Перезапустить трек"
- )
- async def sbegincmd(self, message: Message):
- """- ⏪ Restart track"""
- self.sp.seek_track(0)
- await utils.answer(message, self.strings("restarted"))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- ❤️ Лайкнуть играющий трек"
- )
- async def slikecmd(self, message: Message):
- """- ❤️ Like current track"""
- cupl = self.sp.current_playback()
- self.sp.current_user_saved_tracks_add([cupl["item"]["id"]])
- await utils.answer(message, self.strings("liked"))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- 💔 Убрать лайк с играющего трека"
- )
- async def sunlikecmd(self, message: Message):
- """- 💔 Unlike current track"""
- cupl = self.sp.current_playback()
- self.sp.current_user_saved_tracks_delete([cupl["item"]["id"]])
- await utils.answer(message, self.strings("unlike"))
-
- @error_handler
- @loader.command(
- ru_doc="- Получить ссылку для авторизации"
- )
- async def sauthcmd(self, message: Message):
- """- Get authorization link"""
- if self.get("acs_tkn", False) and not self.sp:
- await utils.answer(message, self.strings("already_authed"))
- else:
- self.sp_auth.get_authorize_url()
- await utils.answer(
- message,
- self.strings("auth").format(self.sp_auth.get_authorize_url()),
- )
-
- @error_handler
- @loader.command(
- ru_doc="- Вставить код авторизации"
- )
- async def scodecmd(self, message: Message):
- """- Paste authorization code"""
- url = message.message.split(" ")[1]
- code = self.sp_auth.parse_auth_response_url(url)
- self.set("acs_tkn", self.sp_auth.get_access_token(code, True, False))
- self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
- await utils.answer(message, self.strings("authed"))
-
- @error_handler
- @loader.command(
- ru_doc="- Выйти из аккаунта"
- )
- async def unauthcmd(self, message: Message):
- """- Log out of account"""
- self.set("acs_tkn", None)
- del self.sp
- await utils.answer(message, self.strings("deauth"))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- Обновить токен авторизации"
- )
- async def stokrefreshcmd(self, message: Message):
- """- Refresh authorization token"""
- self.set(
- "acs_tkn",
- self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]),
- )
- self.set("NextRefresh", time.time() + 45 * 60)
- self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
- await utils.answer(message, self.strings("authed"))
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- 🎧 Показать карточку играющего трека"
- )
- async def snowcmd(self, message: Message):
- """- 🎧 View current track card."""
- current_playback = self.sp.current_playback()
- if not current_playback or not current_playback.get("is_playing", False):
- await utils.answer(message, self.strings("no_music"))
- return
-
- track = current_playback["item"]["name"]
- track_id = current_playback["item"]["id"]
- artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]])
- album_name = current_playback["item"]["album"].get("name", "Unknown Album")
- duration_ms = current_playback["item"].get("duration_ms", 0)
- progress_ms = current_playback.get("progress_ms", 0)
-
- duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}"
- progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}"
-
- spotify_url = f"https://open.spotify.com/track/{track_id}"
- songlink = f"https://song.link/s/{track_id}"
-
- try:
- device_raw = (
- current_playback["device"]["name"]
- + " "
- + current_playback["device"]["type"].lower()
- )
- device = device_raw.replace("computer", "").replace("smartphone", "").strip()
- except Exception:
- device = None
-
- try:
- playlist_id = current_playback["context"]["uri"].split(":")[-1]
- playlist = self.sp.playlist(playlist_id)
- playlist_name = playlist.get("name", None)
- try:
- playlist_owner = (
- f''
- f'{playlist["owner"]["display_name"]}'
- )
- except KeyError:
- playlist_owner = playlist.get("owner", {}).get("display_name", "")
- except Exception:
- playlist_name = ""
- playlist_owner = ""
-
- text = self.config["custom_text"].format(
- track=utils.escape_html(track),
- artists=utils.escape_html(artists),
- album=utils.escape_html(album_name),
- duration=duration,
- progress=progress,
- device=device,
- spotify_url=spotify_url,
- songlink=songlink,
- playlist=utils.escape_html(playlist_name) if playlist_name else "",
- playlist_owner=playlist_owner or "",
- )
-
- if self.config["show_banner"]:
- cover_url = current_playback["item"]["album"]["images"][0]["url"]
-
- tmp_msg = await utils.answer(message, text + self.strings("uploading_banner"))
-
- banners = Banners(
- title=track,
- artists=artists,
- duration=duration_ms,
- progress=progress_ms,
- track_cover=requests.get(cover_url).content,
- font=self.config["font"],
- )
- file = getattr(banners, self.config["banner_version"], banners.horizontal)()
-
- await utils.answer(tmp_msg, text, file=file)
- else:
- await utils.answer(message, text)
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc="- 🎧 Скачать играющий трек"
- )
- async def snowtcmd(self, message: Message):
- """- 🎧 Download current track."""
- current_playback = self.sp.current_playback()
- if not current_playback or not current_playback.get("is_playing", False):
- await utils.answer(message, self.strings("no_music"))
- return
-
- track = current_playback["item"]["name"]
- artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]])
- album_name = current_playback["item"]["album"].get("name", "Unknown Album")
- duration_ms = current_playback["item"].get("duration_ms", 0)
- progress_ms = current_playback.get("progress_ms", 0)
-
- duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}"
- progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}"
-
- spotify_url = f"https://open.spotify.com/track/{current_playback['item']['id']}"
- songlink = f"https://song.link/s/{current_playback['item']['id']}"
-
- try:
- device_raw = (
- current_playback["device"]["name"]
- + " "
- + current_playback["device"]["type"].lower()
- )
- device = device_raw.replace("computer", "").replace("smartphone", "").strip()
- except Exception:
- device = None
-
- try:
- playlist_id = current_playback["context"]["uri"].split(":")[-1]
- playlist = self.sp.playlist(playlist_id)
- playlist_name = playlist.get("name", None)
- try:
- playlist_owner = (
- f''
- f'{playlist["owner"]["display_name"]}'
- )
- except KeyError:
- playlist_owner = playlist.get("owner", {}).get("display_name", "")
- except Exception:
- playlist_name = ""
- playlist_owner = ""
-
- text = self.config["custom_text"].format(
- track=utils.escape_html(track),
- artists=utils.escape_html(artists),
- album=utils.escape_html(album_name),
- duration=duration,
- progress=progress,
- device=device,
- spotify_url=spotify_url,
- songlink=songlink,
- playlist=utils.escape_html(playlist_name) if playlist_name else "",
- playlist_owner=playlist_owner or "",
- )
-
- msg = await utils.answer(message, text + self.strings("downloading_track"))
-
- await self._download_track(msg, f"{artists} {track}", caption=text)
-
- @error_handler
- @tokenized
- @loader.command(
- ru_doc=(
- "- 🔍 Поиск треков. Например: .ssearch Imagine Dragons Believer\n"
- "- 🎧 Скачать трек: .ssearch 1 (где 1 — номер трека из списка)"
- )
- )
- async def ssearchcmd(self, message: Message):
- """🔍 Search for tracks. Usage: .ssearch or .ssearch to download"""
- args = utils.get_args_raw(message)
- if not args:
- await utils.answer(message, self.strings("no_search_query"))
- return
-
- try:
- track_number = int(args)
- search_results = self.get("last_search_results", [])
-
- if not search_results:
- await utils.answer(message, self.strings("no_tracks_found"))
- return
-
- if track_number <= 0 or track_number > len(search_results):
- raise ValueError
-
- msg = await utils.answer(message, self.strings("downloading_track"))
-
- track_info = search_results[track_number - 1]
- track_name = track_info["name"]
- artists = ", ".join([a["name"] for a in track_info["artists"]])
-
- caption_text = self.strings("download_success").format(
- utils.escape_html(track_name),
- utils.escape_html(artists)
- )
-
- await self._download_track(msg, f"{artists} {track_name}", caption=caption_text)
- return
-
- except ValueError:
- await utils.answer(message, self.strings("searching_tracks").format(args))
-
- results = self.sp.search(q=args, limit=5, type="track")
-
- if not results or not results["tracks"]["items"]:
- await utils.answer(message, self.strings("no_tracks_found").format(args))
- return
-
- self.set("last_search_results", results["tracks"]["items"])
-
- tracks_list = []
- for i, track in enumerate(results["tracks"]["items"]):
- track_name = track["name"]
- artists = ", ".join([artist["name"] for artist in track["artists"]])
- track_url = track["external_urls"]["spotify"]
- tracks_list.append(
- "{number}. {track_name} — {artists}\n🔗 Spotify".format(
- number=i + 1,
- track_name=utils.escape_html(track_name),
- artists=utils.escape_html(artists),
- track_url=track_url,
- )
- )
-
- text = "\n".join(tracks_list)
- await utils.answer(message, self.strings("search_results").format(args, text))
-
-
- @loader.command(
- ru_doc="- 🔄 Сброс результатов поиска по трекам"
- )
- async def ssearchresetcmd(self, message: Message):
- """- 🔄 Reset track search results"""
- self.set("last_search_results", [])
- await utils.answer(message, self.strings["search_results_cleared"])
-
- async def watcher(self, message: Message):
- """Watcher is used to update token"""
- if not self.sp:
- return
-
- if self.get("NextRefresh", False):
- ttc = self.get("NextRefresh", 0)
- crnt = time.time()
- if ttc < crnt:
- self.set(
- "acs_tkn",
- self.sp_auth.refresh_access_token(
- self.get("acs_tkn")["refresh_token"]
- ),
- )
- self.set("NextRefresh", time.time() + 45 * 60)
- self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
- else:
- self.set(
- "acs_tkn",
- self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]),
- )
- self.set("NextRefresh", time.time() + 45 * 60)
- self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
+# License Violation. Original module has GPL-v3 but author of nowhere tried to change it to CC BY-ND 4.0