# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
#
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
#
# You CANNOT edit, distribute or redistribute this file without direct permission from the author.
#
# ORIGINAL MODULE: https://raw.githubusercontent.com/hikariatama/ftg/master/spotify.py
# =======================================
# _ __ __ __ _
# | |/ /___ | \/ | ___ __| |___
# | ' // _ \ | |\/| |/ _ \ / _` / __|
# | . \ __/ | | | | (_) | (_| \__ \
# |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods
# =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
# meta developer: @ke_mods
# requires: telethon spotipy pillow requests yt-dlp
import asyncio
import contextlib
import functools
import io
import logging
import re
import textwrap
import time
import traceback
import os
from types import FunctionType
import requests
import spotipy
from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont, ImageOps
from telethon.errors import FloodWaitError
from telethon.tl.functions.account import UpdateProfileRequest
from telethon.tl.types import Message
from .. import loader, utils
logger = logging.getLogger(__name__)
logging.getLogger("spotipy").setLevel(logging.CRITICAL)
class Banners:
def __init__(
self,
title: str,
artists: list,
duration: int,
progress: int,
track_cover: bytes,
font
):
self.title = title
self.artists = ", ".join(artists) if isinstance(artists, list) else artists
self.duration = duration
self.progress = progress
self.track_cover = track_cover
self.font_url = font
def _get_font(self, size, font_bytes):
return ImageFont.truetype(io.BytesIO(font_bytes), size)
def _prepare_cover(self, size, radius):
cover = Image.open(io.BytesIO(self.track_cover)).convert("RGBA")
cover = cover.resize((size, size), Image.Resampling.LANCZOS)
mask = Image.new("L", (size, size), 0)
draw = ImageDraw.Draw(mask)
draw.rounded_rectangle((0, 0, size, size), radius=radius, fill=255)
output = Image.new("RGBA", (size, size), (0, 0, 0, 0))
output.paste(cover, (0, 0), mask=mask)
return output
def _prepare_background(self, w, h):
bg = Image.open(io.BytesIO(self.track_cover)).convert("RGBA")
bg = bg.resize((w, h), Image.Resampling.BICUBIC)
bg = bg.filter(ImageFilter.GaussianBlur(radius=40))
bg = ImageEnhance.Brightness(bg).enhance(0.4)
return bg
def _draw_progress_bar(self, draw, x, y, w, h, progress_pct, color="white", bg_color="#5e5e5e"):
draw.rounded_rectangle((x, y, x + w, y + h), radius=h/2, fill=bg_color)
fill_w = int(w * progress_pct)
if fill_w > 0:
draw.rounded_rectangle((x, y, x + fill_w, y + h), radius=h/2, fill=color)
dot_radius = h * 1.2
dot_x = x + fill_w
dot_y = y + (h / 2)
draw.ellipse(
(dot_x - dot_radius, dot_y - dot_radius, dot_x + dot_radius, dot_y + dot_radius),
fill=color
)
def horizontal(self):
W, H = 1500, 600
padding = 60
cover_size = 480
font_bytes = requests.get(self.font_url).content
title_font = self._get_font(55, font_bytes)
artist_font = self._get_font(45, font_bytes)
time_font = self._get_font(25, font_bytes)
img = self._prepare_background(W, H)
draw = ImageDraw.Draw(img)
cover = self._prepare_cover(cover_size, 30)
img.paste(cover, (padding, (H - cover_size) // 2), cover)
text_x = padding + cover_size + 60
text_y_start = 100
text_width_limit = W - text_x - padding
display_title = self.title
while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0:
display_title = display_title[:-1]
if len(display_title) < len(self.title): display_title += "…"
display_artist = self.artists
while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0:
display_artist = display_artist[:-1]
if len(display_artist) < len(self.artists): display_artist += "…"
draw.text((text_x, text_y_start), display_title, font=title_font, fill="white")
draw.text((text_x, text_y_start + 70), display_artist, font=artist_font, fill="#B3B3B3")
cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}"
dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}"
cur_w = time_font.getlength(cur_time)
dur_w = time_font.getlength(dur_time)
bar_y = 480
bar_h = 8
gap = 25
draw.text((text_x, bar_y - 12), cur_time, font=time_font, fill="white")
bar_start_x = text_x + cur_w + gap
bar_end_x = text_x + text_width_limit - dur_w - gap
bar_w = bar_end_x - bar_start_x
prog_pct = self.progress / self.duration if self.duration > 0 else 0
self._draw_progress_bar(draw, bar_start_x, bar_y, bar_w, bar_h, prog_pct)
draw.text((bar_end_x + gap, bar_y - 12), dur_time, font=time_font, fill="white")
by = io.BytesIO()
img.save(by, format="PNG")
by.seek(0)
by.name = "banner.png"
return by
def vertical(self):
W, H = 1000, 1500
padding = 80
cover_size = 800
font_bytes = requests.get(self.font_url).content
title_font = self._get_font(60, font_bytes)
artist_font = self._get_font(45, font_bytes)
time_font = self._get_font(35, font_bytes)
img = self._prepare_background(W, H)
draw = ImageDraw.Draw(img)
cover = self._prepare_cover(cover_size, 40)
cover_x = (W - cover_size) // 2
cover_y = 120
img.paste(cover, (cover_x, cover_y), cover)
text_area_y = cover_y + cover_size + 120
text_width_limit = W - (padding * 2)
display_title = self.title
while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0:
display_title = display_title[:-1]
if len(display_title) < len(self.title): display_title += "…"
display_artist = self.artists
while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0:
display_artist = display_artist[:-1]
if len(display_artist) < len(self.artists): display_artist += "…"
title_w = title_font.getlength(display_title)
draw.text(((W - title_w) / 2, text_area_y), display_title, font=title_font, fill="white")
artist_w = artist_font.getlength(display_artist)
draw.text(((W - artist_w) / 2, text_area_y + 75), display_artist, font=artist_font, fill="#B3B3B3")
bar_y = text_area_y + 260
bar_h = 8
bar_w = W - (padding * 2)
prog_pct = self.progress / self.duration if self.duration > 0 else 0
self._draw_progress_bar(draw, padding, bar_y, bar_w, bar_h, prog_pct, color="white", bg_color="#5e5e5e")
cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}"
dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}"
draw.text((padding, bar_y + 40), cur_time, font=time_font, fill="#B3B3B3")
dur_w = time_font.getlength(dur_time)
draw.text((W - padding - dur_w, bar_y + 40), dur_time, font=time_font, fill="#B3B3B3")
by = io.BytesIO()
img.save(by, format="PNG")
by.seek(0)
by.name = "banner.png"
return by
@loader.tds
class SpotifyMod(loader.Module):
"""Card with the currently playing track on Spotify."""
strings = {
"name": "SpotifyMod",
"need_auth": (
"❌ Please execute"
" .sauth before performing this action."
),
"on-repeat": (
"🔄 Set on-repeat."
),
"off-repeat": (
"🔄 Stopped track"
" repeat."
),
"skipped": (
"➡️ Skipped track."
),
"playing": "▶️ Playing...",
"back": (
"⬅️ Switched to previous"
" track"
),
"paused": "❌ Pause",
"restarted": (
"✅️ Playing track"
" from the"
" beginning"
),
"liked": (
"❤️ Liked current"
" playback"
),
"unlike": (
"❌"
" Unliked current playback"
),
"err": (
"❌ An error occurred."
"\n{}"
),
"already_authed": (
"❌ Already authorized"
),
"authed": (
"✅ Authentication"
" successful"
),
"deauth": (
"🚪 Successfully logged out"
" of account"
),
"auth": (
'🔗 Follow this'
" link, allow access, then enter .scode https://... with"
" the link you received."
),
"no_music": (
"❌ No music is playing!"
),
"dl_err": (
"❌ Failed to download"
" track."
),
"volume_changed": (
"🔊"
" Volume changed to {}%."
),
"volume_invalid": (
"❌ Volume level must be"
" a number between 0 and 100."
),
"volume_err": (
"❌ An error occurred while"
" changing volume."
),
"no_volume_arg": (
"❌ Please specify a"
" volume level between 0 and 100."
),
"searching_tracks": (
"🕔 Searching for tracks"
" matching {}..."
),
"no_search_query": (
"❌ Please specify a"
" search query."
),
"no_tracks_found": (
"❌ No tracks found for"
" {}."
),
"search_results": (
"✅ Search results for"
" {}:\n\n{}"
),
"downloading_search_track": (
"🕔 Downloading {}..."
),
"download_success": (
"✅ Successfully downloaded {} - {}"
),
"invalid_track_number": (
"❌ Invalid track number."
" Please search first or provide a valid number from the list."
),
"device_list": (
"📄 Available devices:\n{}"
),
"no_devices_found": (
"❌ No devices found."
),
"device_changed": (
"✅ Playback transferred to"
" {}."
),
"invalid_device_id": (
"❌ Invalid device ID."
" Use .sdevice to see available devices."
),
"search_results_cleared": "✅ Search results cleared",
"autobio": (
"🎧 Spotify autobio {}"
),
"no_ytdlp": "❌ yt-dlp not found... Check config or install yt-dlp ({}terminal pip install yt-dlp)",
"snowt_failed": "\n\n❌ Download failed",
"uploading_banner": "\n\n🕔 Uploading banner...",
"downloading_track": "\n\n🕔 Downloading track...",
"no_playlists": "❌ No playlists found.",
"playlists_list": "📄 Your playlists:\n\n{}",
"added_to_playlist": "✅ Added {} to {}",
"removed_from_playlist": "✅ Removed {} from {}",
"invalid_playlist_index": "❌ Invalid playlist number.",
"no_cached_playlists": "❌ Use .splaylists first.",
"playlist_created": "✅ Playlist {} created.",
"playlist_deleted": "✅ Playlist {} deleted.",
"no_playlist_name": "❌ Please specify a playlist name.",
}
strings_ru = {
"_cls_doc": "Карточка с играющим треком в Spotify.",
"need_auth": (
"❌ Выполни"
" .sauth перед выполнением этого действия."
),
"err": (
"❌ Произошла ошибка."
"\n{}"
),
"on-repeat": (
"🔄 Включен повтор трека."
),
"off-repeat": (
"🔄 Повтор трека отключён."
),
"skipped": (
"➡️ Трек пропущен."
),
"playing": "▶️ Играет...",
"back": (
"⬅️ Переключено на предыдущий трек"
),
"paused": "❌ Пауза",
"restarted": (
"✅️ Воспроизведение трека с начала..."
),
"liked": (
"❤️ Текущий трек добавлен в избранное"
),
"unlike": (
"❌ Убрал лайк с текущего трека"
),
"already_authed": (
"❌ Уже авторизован"
),
"authed": (
"✅ Успешная аутентификация"
),
"deauth": (
"🚪 Успешный выход из аккаунта"
),
"auth": (
'🔗 Пройдите по этой ссылке, разрешите вход, затем введите .scode https://... с ссылкой которую вы получили.'
),
"no_music": (
"❌ Музыка не играет!"
),
"dl_err": (
"❌ Не удалось скачать трек."
),
"volume_changed": (
"🔊"
" Громкость изменена на {}%."
),
"volume_invalid": (
"❌ Уровень громкости должен"
" быть числом от 0 до 100."
),
"volume_err": (
"❌ Произошла ошибка при"
" изменении громкости."
),
"no_volume_arg": (
"❌ Пожалуйста, укажите"
" уровень громкости от 0 до 100."
),
"searching_tracks": (
"🕔 Идет поиск треков"
" по запросу {}..."
),
"no_search_query": (
"❌ Пожалуйста, укажите"
" поисковый запрос."
),
"no_tracks_found": (
"❌ По запросу '{}'"
" ничего не найдено."
),
"search_results": (
"✅ Результаты поиска"
" по запросу {}:\n\n{}"
),
"downloading_search_track": (
"🕔 Скачиваю {}..."
),
"download_success": (
"✅ Трек {} - {} успешно скачан."
),
"invalid_track_number": (
"❌ Некорректный номер трека."
" Сначала выполните поиск или укажите правильный номер из списка."
),
"device_list": (
"📄 Доступные устройства:\n{}"
),
"no_devices_found": (
"❌ Устройства не найдены."
),
"device_changed": (
"✅ Воспроизведение переключено на"
" {}."
),
"invalid_device_id": (
"❌ Некорректный ID устройства."
" Используйте .sdevice , чтобы увидеть доступные устройства."
),
"search_results_cleared": "✅ Результаты поиска очищены",
"autobio": (
"🎧 Обновление био"
" включено {}"
),
"no_ytdlp": "❌ yt-dlp не найден... Проверьте конфиг или установите yt-dlp ({}terminal pip install yt-dlp)",
"snowt_failed": "\n\n❌ Ошибка скачивания.",
"uploading_banner": "\n\n🕔 Загрузка баннера...",
"downloading_track": "\n\n🕔 Скачивание трека...",
"no_playlists": "❌ Плейлисты не найдены.",
"playlists_list": "📄 Ваши плейлисты:\n\n{}",
"added_to_playlist": "✅ Трек {} добавлен в {}",
"removed_from_playlist": "✅ Трек {} удален из {}",
"invalid_playlist_index": "❌ Неверный номер плейлиста.",
"no_cached_playlists": "❌ Сначала используйте .splaylists.",
"playlist_created": "✅ Плейлист {} создан.",
"playlist_deleted": "✅ Плейлист {} удален.",
"no_playlist_name": "❌ Пожалуйста, укажите название плейлиста.",
}
strings_jp = {
"_cls_doc": "Spotify からのメッセージ",
"need_auth": (
"❌ この操作を行う前に "
".sauth を実行してください。"
),
"on-repeat": (
"🔄 リピート再生を設定しました。"
),
"off-repeat": (
"🔄 リピート再生を解除しました。"
),
"skipped": (
"➡️ スキップしました。"
),
"playing": "▶️ 再生中...",
"back": (
"⬅️ 前のトラックに戻りました。"
),
"paused": "❌ 一時停止",
"restarted": (
"✅️ 最初から再生します。"
),
"liked": (
"❤️ お気に入りに追加しました。"
),
"unlike": (
"❌"
" お気に入りから削除しました。"
),
"err": (
"❌ エラーが発生しました。"
"\n{}"
),
"already_authed": (
"❌ 既に認証されています。"
),
"authed": (
"✅ 認証に成功しました。"
),
"deauth": (
"🚪 ログアウトしました。"
),
"auth": (
'🔗 リンクをクリックしてアクセスを許可し、取得したURLを使って .scode https://... を入力してください。'
),
"no_music": (
"❌ 音楽は再生されていません!"
),
"dl_err": (
"❌ トラックのダウンロードに失敗しました。"
),
"volume_changed": (
"🔊"
" 音量を {}% に変更しました。"
),
"volume_invalid": (
"❌ 音量は0から100の数字で指定してください。"
),
"volume_err": (
"❌ 音量の変更中にエラーが発生しました。"
),
"no_volume_arg": (
"❌ 0から100の間で音量を指定してください。"
),
"searching_tracks": (
"🕔 {} を検索中..."
),
"no_search_query": (
"❌ 検索キーワードを指定してください。"
),
"no_tracks_found": (
"❌ {} は見つかりませんでした。"
),
"search_results": (
"✅ {} の検索結果:\n\n{}"
),
"downloading_search_track": (
"🕔 {} をダウンロード中..."
),
"download_success": (
"✅ {} - {} のダウンロードに成功しました。"
),
"invalid_track_number": (
"❌ トラック番号が無効です。"
" 先に検索するか、リストから有効な番号を指定してください。"
),
"device_list": (
"📄 利用可能なデバイス:\n{}"
),
"no_devices_found": (
"❌ デバイスが見つかりません。"
),
"device_changed": (
"✅ 再生デバイスを"
" {} に切り替えました。"
),
"invalid_device_id": (
"❌ デバイスIDが無効です。"
" .sdevice で利用可能なデバイスを確認してください。"
),
"search_results_cleared": "✅ 検索結果をクリアしました。",
"autobio": (
"🎧 Spotify AutoBio: {}"
),
"no_ytdlp": "❌ yt-dlpが見つかりません... 設定を確認するか、インストールしてください ({}terminal pip install yt-dlp)",
"snowt_failed": "\n\n❌ ダウンロードに失敗しました。",
"uploading_banner": "\n\n🕔 バナーをアップロード中...",
"downloading_track": "\n\n🕔 トラックをダウンロード中...",
"no_playlists": "❌ プレイリストが見つかりません。",
"playlists_list": "📄 あなたのプレイリスト:\n\n{}",
"added_to_playlist": "✅ {} を {} に追加しました。",
"removed_from_playlist": "✅ {} を {} から削除しました。",
"invalid_playlist_index": "❌ プレイリスト番号が無効です。",
"no_cached_playlists": "❌ 先に .splaylists を使用してください。",
"playlist_created": "✅ プレイリスト {} を作成しました。",
"playlist_deleted": "✅ プレイリスト {} を削除しました。",
"no_playlist_name": "❌ プレイリスト名を指定してください。",
}
def __init__(self):
self._client_id = "e0708753ab60499c89ce263de9b4f57a"
self._client_secret = "80c927166c664ee98a43a2c0e2981b4a"
self.scope = (
"user-read-playback-state playlist-read-private playlist-read-collaborative"
" user-modify-playback-state user-library-modify"
" playlist-modify-public playlist-modify-private"
)
self.sp_auth = spotipy.oauth2.SpotifyOAuth(
client_id=self._client_id,
client_secret=self._client_secret,
redirect_uri="https://thefsch.github.io/spotify/",
scope=self.scope,
)
self.config = loader.ModuleConfig(
loader.ConfigValue(
"show_banner",
True,
"Show banner with track info",
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"custom_text",
(
"🎧 Now playing: {track} — {artists}\n"
"🔗 song.link"
),
"""Custom text, supports {track}, {artists}, {album}, {playlist}, {playlist_owner}, {spotify_url}, {songlink}, {progress}, {duration}, {device} placeholders""",
validator=loader.validators.String(),
),
loader.ConfigValue(
"font",
"https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf",
"Custom font. Specify URL to .ttf file",
validator=loader.validators.String(),
),
loader.ConfigValue(
"auto_bio_template",
"🎧 {}",
lambda: "Template for Spotify AutoBio",
),
loader.ConfigValue(
"ytdlp_path",
"",
"Path to ytdlp binary",
validator=loader.validators.String(),
),
loader.ConfigValue(
"banner_version",
"horizontal",
lambda: "Banner version",
validator=loader.validators.Choice(["horizontal", "vertical"]),
),
)
async def client_ready(self, client, db):
self.font_ready = asyncio.Event()
self._premium = getattr(await client.get_me(), "premium", False)
try:
self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
except Exception:
self.set("acs_tkn", None)
self.sp = None
if self.get("autobio", False):
self.autobio.start()
def tokenized(func) -> FunctionType:
@functools.wraps(func)
async def wrapped(*args, **kwargs):
if not args[0].get("acs_tkn", False) or not args[0].sp:
await utils.answer(args[1], args[0].strings("need_auth"))
return
return await func(*args, **kwargs)
wrapped.__doc__ = func.__doc__
wrapped.__module__ = func.__module__
return wrapped
def error_handler(func) -> FunctionType:
@functools.wraps(func)
async def wrapped(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception:
logger.exception(traceback.format_exc())
with contextlib.suppress(Exception):
await utils.answer(
args[1],
args[0].strings("err").format(traceback.format_exc()),
)
wrapped.__doc__ = func.__doc__
wrapped.__module__ = func.__module__
return wrapped
@loader.loop(interval=90)
async def autobio(self):
try:
current_playback = self.sp.current_playback()
track = current_playback["item"]["name"]
track = re.sub(r"([(].*?[)])", "", track).strip()
except Exception:
return
bio = self.config["auto_bio_template"].format(f"{track}")
try:
await self._client(
UpdateProfileRequest(about=bio[: 140 if self._premium else 70])
)
except FloodWaitError as e:
logger.info(f"Sleeping {max(e.seconds, 60)} bc of floodwait")
await asyncio.sleep(max(e.seconds, 60))
return
async def _download_track(self, message, query: str, caption: str = ""):
dl_dir = os.path.join(os.getcwd(), "spotifymod")
if not os.path.exists(dl_dir):
os.makedirs(dl_dir, exist_ok=True)
for f in os.listdir(dl_dir):
try:
os.remove(os.path.join(dl_dir, f))
except:
pass
try:
squery = query.replace('"', '').replace("'", "")
cmd = (
f'{self.config["ytdlp_path"]} -x --audio-format mp3 --add-metadata '
f'-o "{dl_dir}/%(title)s [%(id)s].%(ext)s" '
f'"ytsearch1:{squery}"'
)
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
await proc.communicate()
files = [f for f in os.listdir(dl_dir) if f.endswith(".mp3")]
if files:
target_file = os.path.join(dl_dir, files[0])
await utils.answer(message, caption, file=target_file)
else:
await utils.answer(message, self.strings("snowt_failed"))
except Exception as e:
logger.error(e)
await utils.answer(message, self.strings("dl_err"))
finally:
if os.path.exists(dl_dir):
for f in os.listdir(dl_dir):
try:
os.remove(os.path.join(dl_dir, f))
except:
pass
@error_handler
@tokenized
@loader.command(
ru_doc="- ➕ Добавить текущий трек в плейлист (используйте номер из .splaylists)"
)
async def splaylistadd(self, message: Message):
"""- ➕ Add current track to playlist (use number from .splaylists)"""
args = utils.get_args_raw(message)
if not args or not args.isdigit():
await utils.answer(message, self.strings("invalid_playlist_index"))
return
index = int(args) - 1
playlists = self.get("last_playlists", [])
if not playlists:
await utils.answer(message, self.strings("no_cached_playlists"))
return
if index < 0 or index >= len(playlists):
await utils.answer(message, self.strings("invalid_playlist_index"))
return
current = self.sp.current_playback()
if not current or not current.get("item"):
await utils.answer(message, self.strings("no_music"))
return
track_uri = current["item"]["uri"]
track_name = current["item"]["name"]
artists = ", ".join([a["name"] for a in current["item"]["artists"]])
full_track_name = f"{artists} - {track_name}"
playlist_id = playlists[index]["id"]
playlist_name = playlists[index]["name"]
try:
self.sp.playlist_add_items(playlist_id, [track_uri])
except spotipy.exceptions.SpotifyException as e:
if e.http_status == 403 and "Insufficient client scope" in str(e):
await utils.answer(message, self.strings("need_auth"))
return
raise e
await utils.answer(message, self.strings("added_to_playlist").format(utils.escape_html(full_track_name), utils.escape_html(playlist_name)))
@error_handler
@tokenized
@loader.command(
ru_doc="- ➖ Удалить текущий трек из плейлиста (используйте номер из .splaylists)"
)
async def splaylistrem(self, message: Message):
"""- ➖ Remove current track from playlist (use number from .splaylists)"""
args = utils.get_args_raw(message)
if not args or not args.isdigit():
await utils.answer(message, self.strings("invalid_playlist_index"))
return
index = int(args) - 1
playlists = self.get("last_playlists", [])
if not playlists:
await utils.answer(message, self.strings("no_cached_playlists"))
return
if index < 0 or index >= len(playlists):
await utils.answer(message, self.strings("invalid_playlist_index"))
return
current = self.sp.current_playback()
if not current or not current.get("item"):
await utils.answer(message, self.strings("no_music"))
return
track_uri = current["item"]["uri"]
track_name = current["item"]["name"]
artists = ", ".join([a["name"] for a in current["item"]["artists"]])
full_track_name = f"{artists} - {track_name}"
playlist_id = playlists[index]["id"]
playlist_name = playlists[index]["name"]
try:
self.sp.playlist_remove_all_occurrences_of_items(playlist_id, [track_uri])
except spotipy.exceptions.SpotifyException as e:
if e.http_status == 403 and "Insufficient client scope" in str(e):
await utils.answer(message, self.strings("need_auth"))
return
raise e
await utils.answer(message, self.strings("removed_from_playlist").format(utils.escape_html(full_track_name), utils.escape_html(playlist_name)))
@error_handler
@tokenized
@loader.command(
ru_doc="- 🆕 Создать новый плейлист"
)
async def splaylistcreate(self, message: Message):
"""- 🆕 Create a new playlist"""
name = utils.get_args_raw(message)
if not name:
await utils.answer(message, self.strings("no_playlist_name"))
return
user_id = self.sp.me()["id"]
self.sp.user_playlist_create(user_id, name)
await utils.answer(message, self.strings("playlist_created").format(utils.escape_html(name)))
@error_handler
@tokenized
@loader.command(
ru_doc="- 🗑 Удалить плейлист (используйте номер из .splaylists)"
)
async def splaylistdelete(self, message: Message):
"""- 🗑 Delete playlist (use number from .splaylists)"""
args = utils.get_args_raw(message)
if not args or not args.isdigit():
await utils.answer(message, self.strings("invalid_playlist_index"))
return
index = int(args) - 1
playlists = self.get("last_playlists", [])
if not playlists:
await utils.answer(message, self.strings("no_cached_playlists"))
return
if index < 0 or index >= len(playlists):
await utils.answer(message, self.strings("invalid_playlist_index"))
return
playlist_id = playlists[index]["id"]
playlist_name = playlists[index]["name"]
self.sp.current_user_unfollow_playlist(playlist_id)
await utils.answer(message, self.strings("playlist_deleted").format(utils.escape_html(playlist_name)))
@error_handler
@tokenized
@loader.command(
ru_doc="- 📃 Получить все плейлисты"
)
async def splaylists(self, message: Message):
"""- 📃 Get all playlists"""
user_id = self.sp.me()["id"]
playlists = self.sp.current_user_playlists()
editable_playlists = []
for playlist in playlists["items"]:
if playlist["owner"]["id"] == user_id or playlist["collaborative"]:
editable_playlists.append(playlist)
self.set("last_playlists", editable_playlists)
playlist_list_text = ""
for i, playlist in enumerate(editable_playlists):
name = utils.escape_html(playlist["name"])
url = playlist["external_urls"]["spotify"]
count = playlist["tracks"]["total"]
playlist_list_text += f"{i + 1}. {name} ({count} tracks)\n"
if not playlist_list_text:
await utils.answer(message, self.strings("no_playlists"))
else:
await utils.answer(message, self.strings("playlists_list").format(playlist_list_text))
@error_handler
@tokenized
@loader.command(
ru_doc="- ℹ️ Переключить стриминг воспроизведения в био"
)
async def sbiocmd(self, message: Message):
"""- ℹ️ Toggle bio playback streaming"""
current = self.get("autobio", False)
new = not current
self.set("autobio", new)
await utils.answer(
message,
self.strings("autobio").format("enabled" if new else "disabled"),
)
if new:
self.autobio.start()
else:
self.autobio.stop()
@error_handler
@tokenized
@loader.command(
ru_doc="- 🔊 Изменить громкость. .svolume <0-100>"
)
async def svolume(self, message: Message):
"""- 🔊 Change playback volume. .svolume <0-100>"""
try:
args = utils.get_args_raw(message)
if not args:
await utils.answer(message, self.strings("no_volume_arg"))
return
volume_percent = int(args)
if 0 <= volume_percent <= 100:
self.sp.volume(volume_percent)
await utils.answer(message, self.strings("volume_changed").format(volume_percent))
else:
await utils.answer(message, self.strings("volume_invalid"))
except ValueError:
await utils.answer(message, self.strings("volume_invalid"))
except Exception:
await utils.answer(message, self.strings("volume_err"))
@error_handler
@tokenized
@loader.command(
ru_doc=(
"- 🎵 Выбрать устройство для воспроизведения. Например: .sdevice \n"
"- 📝 Показать список устройств: .sdevice"
)
)
async def sdevicecmd(self, message: Message):
"""- 🎵 Set preferred playback device. Usage: .sdevice or .sdevice to list devices"""
args = utils.get_args_raw(message)
devices = self.sp.devices()["devices"]
if not args:
if not devices:
await utils.answer(message, self.strings("no_devices_found"))
return
device_list_text = ""
for i, device in enumerate(devices):
is_active = "(active)" if device["is_active"] else ""
device_list_text += (
f"{i+1}. {device['name']}"
f" ({device['type']}) {is_active}\n"
)
await utils.answer(message, self.strings("device_list").format(device_list_text.strip()))
return
device_id = None
try:
device_number = int(args)
if 0 < device_number <= len(devices):
device_id = devices[device_number - 1]["id"]
device_name = devices[device_number - 1]["name"]
else:
await utils.answer(message, self.strings("invalid_device_id"))
return
except ValueError:
found_device = next((d for d in devices if d["id"] == args.strip()), None)
if found_device:
device_id = found_device["id"]
device_name = found_device["name"]
else:
await utils.answer(message, self.strings("invalid_device_id"))
return
self.sp.transfer_playback(device_id=device_id)
await utils.answer(message, self.strings("device_changed").format(device_name))
@error_handler
@tokenized
@loader.command(
ru_doc="- 💫 Включить повтор трека"
)
async def srepeatcmd(self, message: Message):
"""- 💫 Repeat"""
self.sp.repeat("track")
await utils.answer(message, self.strings("on-repeat"))
@error_handler
@tokenized
@loader.command(
ru_doc="- ✋ Остановить повтор"
)
async def sderepeatcmd(self, message: Message):
"""- ✋ Stop repeat"""
self.sp.repeat("context")
await utils.answer(message, self.strings("off-repeat"))
@error_handler
@tokenized
@loader.command(
ru_doc="- 👉 Следующий трек"
)
async def snextcmd(self, message: Message):
"""- 👉 Next track"""
self.sp.next_track()
await utils.answer(message, self.strings("skipped"))
@error_handler
@tokenized
@loader.command(
ru_doc="- 🤚 Продолжить воспроизведение"
)
async def sresumecmd(self, message: Message):
"""- 🤚 Resume"""
self.sp.start_playback()
await utils.answer(message, self.strings("playing"))
@error_handler
@tokenized
@loader.command(
ru_doc="- 🤚 Пауза"
)
async def spausecmd(self, message: Message):
"""- 🤚 Pause"""
self.sp.pause_playback()
await utils.answer(message, self.strings("paused"))
@error_handler
@tokenized
@loader.command(
ru_doc="- ⏮ Предыдущий трек"
)
async def sbackcmd(self, message: Message):
"""- ⏮ Previous track"""
self.sp.previous_track()
await utils.answer(message, self.strings("back"))
@error_handler
@tokenized
@loader.command(
ru_doc="- ⏪ Перезапустить трек"
)
async def sbegincmd(self, message: Message):
"""- ⏪ Restart track"""
self.sp.seek_track(0)
await utils.answer(message, self.strings("restarted"))
@error_handler
@tokenized
@loader.command(
ru_doc="- ❤️ Лайкнуть играющий трек"
)
async def slikecmd(self, message: Message):
"""- ❤️ Like current track"""
cupl = self.sp.current_playback()
self.sp.current_user_saved_tracks_add([cupl["item"]["id"]])
await utils.answer(message, self.strings("liked"))
@error_handler
@tokenized
@loader.command(
ru_doc="- 💔 Убрать лайк с играющего трека"
)
async def sunlikecmd(self, message: Message):
"""- 💔 Unlike current track"""
cupl = self.sp.current_playback()
self.sp.current_user_saved_tracks_delete([cupl["item"]["id"]])
await utils.answer(message, self.strings("unlike"))
@error_handler
@loader.command(
ru_doc="- Получить ссылку для авторизации"
)
async def sauthcmd(self, message: Message):
"""- Get authorization link"""
if self.get("acs_tkn", False) and not self.sp:
await utils.answer(message, self.strings("already_authed"))
else:
self.sp_auth.get_authorize_url()
await utils.answer(
message,
self.strings("auth").format(self.sp_auth.get_authorize_url()),
)
@error_handler
@loader.command(
ru_doc="- Вставить код авторизации"
)
async def scodecmd(self, message: Message):
"""- Paste authorization code"""
url = message.message.split(" ")[1]
code = self.sp_auth.parse_auth_response_url(url)
self.set("acs_tkn", self.sp_auth.get_access_token(code, True, False))
self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
await utils.answer(message, self.strings("authed"))
@error_handler
@loader.command(
ru_doc="- Выйти из аккаунта"
)
async def unauthcmd(self, message: Message):
"""- Log out of account"""
self.set("acs_tkn", None)
del self.sp
await utils.answer(message, self.strings("deauth"))
@error_handler
@tokenized
@loader.command(
ru_doc="- Обновить токен авторизации"
)
async def stokrefreshcmd(self, message: Message):
"""- Refresh authorization token"""
self.set(
"acs_tkn",
self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]),
)
self.set("NextRefresh", time.time() + 45 * 60)
self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
await utils.answer(message, self.strings("authed"))
@error_handler
@tokenized
@loader.command(
ru_doc="- 🎧 Показать карточку играющего трека"
)
async def snowcmd(self, message: Message):
"""- 🎧 View current track card."""
current_playback = self.sp.current_playback()
if not current_playback or not current_playback.get("is_playing", False):
await utils.answer(message, self.strings("no_music"))
return
track = current_playback["item"]["name"]
track_id = current_playback["item"]["id"]
artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]])
album_name = current_playback["item"]["album"].get("name", "Unknown Album")
duration_ms = current_playback["item"].get("duration_ms", 0)
progress_ms = current_playback.get("progress_ms", 0)
duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}"
progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}"
spotify_url = f"https://open.spotify.com/track/{track_id}"
songlink = f"https://song.link/s/{track_id}"
try:
device_raw = (
current_playback["device"]["name"]
+ " "
+ current_playback["device"]["type"].lower()
)
device = device_raw.replace("computer", "").replace("smartphone", "").strip()
except Exception:
device = None
try:
playlist_id = current_playback["context"]["uri"].split(":")[-1]
playlist = self.sp.playlist(playlist_id)
playlist_name = playlist.get("name", None)
try:
playlist_owner = (
f''
f'{playlist["owner"]["display_name"]}'
)
except KeyError:
playlist_owner = playlist.get("owner", {}).get("display_name", "")
except Exception:
playlist_name = ""
playlist_owner = ""
text = self.config["custom_text"].format(
track=utils.escape_html(track),
artists=utils.escape_html(artists),
album=utils.escape_html(album_name),
duration=duration,
progress=progress,
device=device,
spotify_url=spotify_url,
songlink=songlink,
playlist=utils.escape_html(playlist_name) if playlist_name else "",
playlist_owner=playlist_owner or "",
)
if self.config["show_banner"]:
cover_url = current_playback["item"]["album"]["images"][0]["url"]
tmp_msg = await utils.answer(message, text + self.strings("uploading_banner"))
banners = Banners(
title=track,
artists=artists,
duration=duration_ms,
progress=progress_ms,
track_cover=requests.get(cover_url).content,
font=self.config["font"],
)
file = getattr(banners, self.config["banner_version"], banners.horizontal)()
await utils.answer(tmp_msg, text, file=file)
else:
await utils.answer(message, text)
@error_handler
@tokenized
@loader.command(
ru_doc="- 🎧 Скачать играющий трек"
)
async def snowtcmd(self, message: Message):
"""- 🎧 Download current track."""
current_playback = self.sp.current_playback()
if not current_playback or not current_playback.get("is_playing", False):
await utils.answer(message, self.strings("no_music"))
return
track = current_playback["item"]["name"]
artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]])
album_name = current_playback["item"]["album"].get("name", "Unknown Album")
duration_ms = current_playback["item"].get("duration_ms", 0)
progress_ms = current_playback.get("progress_ms", 0)
duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}"
progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}"
spotify_url = f"https://open.spotify.com/track/{current_playback['item']['id']}"
songlink = f"https://song.link/s/{current_playback['item']['id']}"
try:
device_raw = (
current_playback["device"]["name"]
+ " "
+ current_playback["device"]["type"].lower()
)
device = device_raw.replace("computer", "").replace("smartphone", "").strip()
except Exception:
device = None
try:
playlist_id = current_playback["context"]["uri"].split(":")[-1]
playlist = self.sp.playlist(playlist_id)
playlist_name = playlist.get("name", None)
try:
playlist_owner = (
f''
f'{playlist["owner"]["display_name"]}'
)
except KeyError:
playlist_owner = playlist.get("owner", {}).get("display_name", "")
except Exception:
playlist_name = ""
playlist_owner = ""
text = self.config["custom_text"].format(
track=utils.escape_html(track),
artists=utils.escape_html(artists),
album=utils.escape_html(album_name),
duration=duration,
progress=progress,
device=device,
spotify_url=spotify_url,
songlink=songlink,
playlist=utils.escape_html(playlist_name) if playlist_name else "",
playlist_owner=playlist_owner or "",
)
msg = await utils.answer(message, text + self.strings("downloading_track"))
await self._download_track(msg, f"{artists} {track}", caption=text)
@error_handler
@tokenized
@loader.command(
ru_doc=(
"- 🔍 Поиск треков. Например: .ssearch Imagine Dragons Believer\n"
"- 🎧 Скачать трек: .ssearch 1 (где 1 — номер трека из списка)"
)
)
async def ssearchcmd(self, message: Message):
"""🔍 Search for tracks. Usage: .ssearch or .ssearch to download"""
args = utils.get_args_raw(message)
if not args:
await utils.answer(message, self.strings("no_search_query"))
return
try:
track_number = int(args)
search_results = self.get("last_search_results", [])
if not search_results:
await utils.answer(message, self.strings("no_tracks_found"))
return
if track_number <= 0 or track_number > len(search_results):
raise ValueError
msg = await utils.answer(message, self.strings("downloading_track"))
track_info = search_results[track_number - 1]
track_name = track_info["name"]
artists = ", ".join([a["name"] for a in track_info["artists"]])
caption_text = self.strings("download_success").format(
utils.escape_html(track_name),
utils.escape_html(artists)
)
await self._download_track(msg, f"{artists} {track_name}", caption=caption_text)
return
except ValueError:
await utils.answer(message, self.strings("searching_tracks").format(args))
results = self.sp.search(q=args, limit=5, type="track")
if not results or not results["tracks"]["items"]:
await utils.answer(message, self.strings("no_tracks_found").format(args))
return
self.set("last_search_results", results["tracks"]["items"])
tracks_list = []
for i, track in enumerate(results["tracks"]["items"]):
track_name = track["name"]
artists = ", ".join([artist["name"] for artist in track["artists"]])
track_url = track["external_urls"]["spotify"]
tracks_list.append(
"{number}. {track_name} — {artists}\n🔗 Spotify".format(
number=i + 1,
track_name=utils.escape_html(track_name),
artists=utils.escape_html(artists),
track_url=track_url,
)
)
text = "\n".join(tracks_list)
await utils.answer(message, self.strings("search_results").format(args, text))
@loader.command(
ru_doc="- 🔄 Сброс результатов поиска по трекам"
)
async def ssearchresetcmd(self, message: Message):
"""- 🔄 Reset track search results"""
self.set("last_search_results", [])
await utils.answer(message, self.strings["search_results_cleared"])
async def watcher(self, message: Message):
"""Watcher is used to update token"""
if not self.sp:
return
if self.get("NextRefresh", False):
ttc = self.get("NextRefresh", 0)
crnt = time.time()
if ttc < crnt:
self.set(
"acs_tkn",
self.sp_auth.refresh_access_token(
self.get("acs_tkn")["refresh_token"]
),
)
self.set("NextRefresh", time.time() + 45 * 60)
self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
else:
self.set(
"acs_tkn",
self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]),
)
self.set("NextRefresh", time.time() + 45 * 60)
self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])