__version__ = (1, 0, 3)
# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# meta pic: https://static.dan.tatar/spotify_icon.png
# meta banner: https://mods.hikariatama.ru/badges/spotify.jpg
# meta developer: @hikarimods
# scope: hikka_only
# scope: hikka_min 1.2.10
# requires: spotipy Pillow
import asyncio
import contextlib
import functools
import io
import logging
import re
import time
import traceback
from math import ceil
from types import FunctionType
import requests
import spotipy
from hikkatl.errors.rpcerrorlist import FloodWaitError
from hikkatl.tl.functions.account import UpdateProfileRequest
from hikkatl.tl.types import Message
from PIL import Image, ImageDraw, ImageFont
from .. import loader, utils
# Module-level logger; the spotipy library's own logger is raised to CRITICAL
# so that only critical records from it are emitted.
logger = logging.getLogger(__name__)
logging.getLogger("spotipy").setLevel(logging.CRITICAL)
# Size of the badge canvas handed to Image.new(); SIZE[1] minus the vertical
# margins is also used as the edge length of the cover thumbnail.
SIZE = (1200, 320)
# Position at which the cover is pasted onto the canvas and the padding that
# is subtracted from the canvas height when sizing the cover.
INNER_MARGIN = (16, 16)
# Font sizes used when loading the track-title font and the artist font.
TRACK_FS = 48
ARTIST_FS = 32
@loader.tds
class SpotifyMod(loader.Module):
"""Display beautiful spotify now bar. Idea: t.me/fuccsoc. Implementation: t.me/hikariatama"""
strings = {
"name": "SpotifyNow",
"need_auth": (
"🚫 Call"
" .sauth before using this action."
),
"on-repeat": (
"💫 Set on-repeat."
),
"off-repeat": (
"✋ Stopped track"
" repeat."
),
"skipped": (
"👉 Skipped track."
),
"err": (
"🚫 Error occurred. Make"
" sure the track is playing!\n{}"
),
"already_authed": (
"🚫 You are already"
" authentificated"
),
"authed": (
"🎧 Auth successful"
),
"playing": (
"🎧 Playing..."
),
"back": (
"👈 Switched to previous"
" track"
),
"paused": "🤚 Pause",
"deauth": (
"🚪 Unauthentificated"
),
"restarted": (
"👈 Playing track"
" from the"
" beginning"
),
"auth": (
'🔐 Proceed'
" here, approve request, then .scode https://... with"
" redirected url"
),
"liked": (
"❤️ Liked current"
" playback"
),
"autobio": (
"🎧 Spotify autobio"
" {}"
),
"404": "🚫 No results",
"playing_track": (
"🎧 {} added to queue"
),
"no_music": (
"🚫 No music is"
" playing!"
),
"searching": (
"🔎 Searching..."
),
"currently_on": "Currently listening on",
"playlist": "Playlist",
"owner": "Owner",
"quality": "Quality",
}
strings_ru = {
"need_auth": (
"🚫 Выполни"
" .sauth перед выполнением этого действия."
),
"on-repeat": (
"💫 Повторение"
" включено."
),
"off-repeat": (
"✋ Повторение"
" выключено."
),
"skipped": (
"👉 Трек переключен."
),
"err": (
"🚫 Произошла ошибка."
" Убедитесь, что музыка играет!\n{}"
),
"already_authed": (
"🚫 Уже авторизован"
),
"authed": (
"🎧 Успешная"
" аутентификация"
),
"playing": "🎧 Играю...",
"back": (
"👈 Переключил назад"
),
"paused": "🤚 Пауза",
"deauth": (
"🚪 Авторизация"
" отменена"
),
"restarted": (
"👈 Начал трек"
" сначала"
),
"liked": (
'❤️ Поставил "Мне'
' нравится" текущему треку'
),
"autobio": (
"🎧 Обновление био"
" включено {}"
),
"404": (
"🚫 Нет результатов"
),
"playing_track": (
"🎧 {} добавлен в"
" очередь"
),
"no_music": (
"🚫 Музыка не играет!"
),
"_cmd_doc_sfind": "Найти информацию о треке",
"_cmd_doc_sauth": "Первый этап аутентификации",
"_cmd_doc_scode": "Второй этап аутентификации",
"_cmd_doc_unauth": "Отменить аутентификацию",
"_cmd_doc_sbio": "Включить автоматическое био",
"_cmd_doc_stokrefresh": "Принудительное обновление токена",
"_cmd_doc_snow": "Показать карточку текущего трека",
"_cls_doc": (
"Тулкит для Spotify. Автор идеи: @fuccsoc. Реализация: @hikariatama"
),
"currently_on": "Сейчас слушаю на",
"playlist": "Плейлист",
"owner": "Владелец",
"quality": "Качество",
}
strings_de = {
"need_auth": (
"🚫 Führe"
" .sauth aus, bevor du diese Aktion ausführst."
),
"on-repeat": (
"💫 Wiederholung"
" aktiviert."
),
"off-repeat": (
"✋ Wiederholung"
" deaktiviert."
),
"skipped": (
"👉 Track"
" übersprungen."
),
"err": (
"🚫 Ein Fehler ist"
" aufgetreten. Stelle sicher, dass eine Musik abgespielt"
" wird!\n{}"
),
"already_authed": (
"🚫 Bereits"
" authentifiziert"
),
"authed": (
"🎧 Authentifizierung"
" erfolgreich"
),
"playing": (
"🎧 Spielt ab..."
),
"back": (
"👈 Zum vorherigen Track"
" gewechselt"
),
"paused": "🤚 Pausiert",
"deauth": (
"🚪 Authentifizierung"
" aufgehoben"
),
"restarted": (
"👈 Track von vorne"
" gestartet"
),
"liked": (
"❤️ Der aktuelle Track"
" wurde geliked"
),
"autobio": (
"🎧 Spotify Autobio"
" {}"
),
"404": (
"🚫 Keine Ergebnisse"
),
"playing_track": (
"🎧 {} zur Warteschlange"
" hinzugefügt"
),
"no_music": (
"🚫 Es wird keine Musik"
" abgespielt!"
),
"_cmd_doc_sfind": "Finde Informationen über einen Track",
"_cmd_doc_sauth": "Erster Schritt der Authentifizierung",
"_cmd_doc_scode": "Zweiter Schritt der Authentifizierung",
"_cmd_doc_unauth": "Authentifizierung aufheben",
"_cmd_doc_sbio": "Spotify Autobio aktivieren",
"_cmd_doc_stokrefresh": "Token erzwingen",
"_cmd_doc_snow": "Zeige die Karte des aktuellen Tracks",
"_cls_doc": (
"Toolkit für Spotify. Idee von: @fuccsoc. Implementierung von: @hikariatama"
),
"currently_on": "Aktuell auf",
"playlist": "Wiedergabeliste",
"owner": "Besitzer",
"quality": "Qualität",
}
strings_tr = {
"need_auth": (
"🚫 Bu eylemi"
" gerçekleştirmeden önce .sauth komutunu kullanın."
),
"on-repeat": (
"💫 Tekrar açık."
),
"off-repeat": (
"✋ Tekrar kapalı."
),
"skipped": (
"👉 Şarkı atlandı."
),
"err": (
"🚫 Bir hata oluştu."
" Müzik"
" çalmak istediğinizden emin olun!\n{}"
),
"already_authed": (
"🚫 Zaten"
" yetkilendirildi"
),
"authed": (
"🎧 Yetkilendirme"
" başarılı"
),
"playing": (
"🎧 Oynatılıyor..."
),
"back": (
"👈 Önceki şarkıya"
" geçildi"
),
"paused": (
"🤚 Duraklatıldı"
),
"deauth": (
"🚪 Yetkilendirme iptal"
" edildi"
),
"restarted": (
"👈 Şarkı tekrar"
" başlatıldı"
),
"liked": (
"❤️ Geçerli şarkı"
" beğenildi"
),
"autobio": (
"🎧 Spotify Otomatik Bio"
" {}"
),
"404": "🚫 Sonuç yok",
"playing_track": (
"🎧 {} kuyruğa"
" eklendi"
),
"no_music": (
"🚫 Şu anda müzik"
" çalmıyor!"
),
"_cmd_doc_sfind": "Bir şarkı hakkında bilgi bul",
"_cmd_doc_sauth": "Yetkilendirme için ilk adım",
"_cmd_doc_scode": "Yetkilendirme için ikinci adım",
"_cmd_doc_unauth": "Yetkilendirmeyi kaldır",
"_cmd_doc_sbio": "Spotify Otomatik Bio etkinleştir",
"_cmd_doc_stokrefresh": "Zorla token yenile",
"_cmd_doc_snow": "Geçerli şarkı kartını göster",
"_cls_doc": "Spotify için bir araç. Fikir: @fuccsoc. Uygulama: @hikariatama",
"currently_on": "Şu anda dinleniyor",
"playlist": "Çalma listesi",
"owner": "Sahibi",
"quality": "Kalite",
}
strings_uz = {
"need_auth": (
"🚫 Bu harakatni"
" bajarishdan oldin .sauth buyrug'ini ishlating."
),
"on-repeat": (
"💫 Takror yoqilgan."
),
"off-repeat": (
"✋ Takror yopilgan."
),
"skipped": (
"👉 Mashq o'tkazildi."
),
"err": (
"🚫 Xatolik yuz berdi."
" Muzyka oynatilganiga ishonchingiz komilmi?\n{}"
),
"already_authed": (
"🚫 Allaqachon"
" tasdiqlangan"
),
"authed": (
"🎧 Tasdiqlash"
" muvaffaqiyatli bajarildi"
),
"playing": (
"🎧 Oynatilmoqda..."
),
"back": (
"👈 Oldingi mashqa"
" o'tildi"
),
"paused": (
"🤚 To'xtatildi"
),
"deauth": (
"🚪 Tasdiqlash bekor"
" qilindi"
),
"restarted": (
"👈 Mashq qayta"
" boshlandi"
),
"liked": (
"❤️ Joriy mashq"
" yoqildi"
),
"autobio": (
"🎧 Spotify Avtomatik Bio"
" {}"
),
"404": (
"🚫 Natija topilmadi"
),
"playing_track": (
"🎧 {} qo'shildi"
),
"no_music": (
"🚫 Hozircha hech qanday"
" musiqa oynatilmoqda emas!"
),
"_cmd_doc_sfind": "Mashq haqida ma'lumot toping",
"_cmd_doc_sauth": "Tasdiqlash uchun birinchi qadam",
"_cmd_doc_scode": "Tasdiqlash uchun ikkinchi qadam",
"_cmd_doc_unauth": "Tasdiqlashni bekor qilish",
"_cmd_doc_sbio": "Spotify Avtomatik Bio yoqish",
"_cmd_doc_stokrefresh": "Tokenni qo'lda qayta tiklash",
"_cmd_doc_snow": "Joriy mashq kartasini ko'rsatish",
"_cls_doc": "Spotify uchun asbob. Fikr: @fuccsoc. Tuzilishi: @hikariatama",
"currently_on": "Hozircha",
"playlist": "O'ynatiladiganlar",
"owner": "Sahibi",
"quality": "Sifat",
}
strings_es = {
"need_auth": (
"🚫 Para usar este"
" comando, primero usa .sauth."
),
"on-repeat": (
"💫 Reproducción en bucle"
" activada."
),
"off-repeat": (
"✋ Reproducción en bucle"
" desactivada."
),
"skipped": (
"👉 Pista saltada."
),
"err": (
"🚫 Ha ocurrido un error."
" ¿Estás seguro de que hay música sonando?\n{}"
),
"already_authed": (
"🚫 Ya autorizado"
),
"authed": (
"🎧 Autorizado"
" correctamente"
),
"playing": (
"🎧 Reproduciendo..."
),
"back": (
"👈 Volviste a la pista"
" anterior"
),
"paused": "🤚 Pausado",
"deauth": (
"🚪 Autorización"
" desactivada"
),
"restarted": (
"👈 Pista reiniciada"
),
"liked": (
"❤️ Pista actual"
" añadida a favoritos"
),
"autobio": (
"🎧 Spotify Auto Bio"
" {}"
),
"404": (
"🚫 No se encontraron"
" resultados"
),
"playing_track": (
"🎧 {} añadido"
),
"no_music": (
"🚫 ¡No hay música"
" actualmente!"
),
"_cmd_doc_sfind": "Encuentra información sobre una canción",
"_cmd_doc_sauth": "Primer paso para autorizar",
"_cmd_doc_scode": "Segundo paso para autorizar",
"_cmd_doc_unauth": "Desautorizar",
"_cmd_doc_sbio": "Activar Auto Bio de Spotify",
"_cmd_doc_stokrefresh": "Actualizar token en segundo plano",
"_cmd_doc_snow": "Muestra la tarjeta de la canción actual",
"_cls_doc": "Recursos para Spotify. Idea: @fuccsoc. Creado por: @hikariatama",
"currently_on": "Escuchando actualmente en",
"playlist": "Lista de reproducción",
"owner": "Propietario",
"quality": "Calidad",
}
def __init__(self):
    # NOTE(review): the application credentials are hard-coded in the module
    # source; confirm that shipping them this way is intended.
    self._client_id = "e0708753ab60499c89ce263de9b4f57a"
    self._client_secret = "80c927166c664ee98a43a2c0e2981b4a"
    # OAuth scopes requested from Spotify, joined into one space-separated
    # string.
    self.scope = " ".join(
        (
            "user-read-playback-state",
            "playlist-read-private",
            "playlist-read-collaborative",
            "app-remote-control",
            "user-modify-playback-state",
            "user-library-modify",
            "user-library-read",
        )
    )
    oauth_settings = {
        "client_id": self._client_id,
        "client_secret": self._client_secret,
        "redirect_uri": "https://thefsch.github.io/spotify/",
        "scope": self.scope,
    }
    self.sp_auth = spotipy.oauth2.SpotifyOAuth(**oauth_settings)
    # Single user-editable option: the template the auto-bio loop fills with
    # the current track name.
    autobio_template = loader.ConfigValue(
        "AutoBioTemplate",
        "🎧 {} ───○ 🔊 ᴴᴰ",
        lambda: "Template for Spotify AutoBio",
    )
    self.config = loader.ModuleConfig(autobio_template)
def create_bar(self, current_playback: dict) -> str:
    """Render a ten-cell text progress bar followed by ``MM:SS / MM:SS``.

    Args:
        current_playback: Mapping that provides ``progress_ms`` and
            ``item["duration_ms"]`` (both in milliseconds).

    Returns:
        The bar with a ``🞆`` knob at the current position and the elapsed /
        total time, or a fixed placeholder bar when the timing data is
        missing or unusable.
    """
    bar_width = 10
    try:
        progress_ms = current_playback["progress_ms"]
        duration_ms = current_playback["item"]["duration_ms"]
        percentage = ceil(progress_ms / duration_ms * 100)
        # Clamp the knob position: without it a progress of 0 yields -1
        # (an 11-cell bar) and a progress past the duration overflows the bar.
        bar_filled = min(max(ceil(percentage / 10) - 1, 0), bar_width - 1)
        bar_empty = bar_width - bar_filled - 1
        elapsed_s = progress_ms // 1000
        total_s = duration_ms // 1000
        return (
            "─" * bar_filled
            + "🞆"
            + "─" * bar_empty
            + f" {elapsed_s // 60:02}:{elapsed_s % 60:02} /"
            + f" {total_s // 60:02}:{total_s % 60:02}"
        )
    except (LookupError, TypeError, ZeroDivisionError):
        # No playback, no item, or a zero-length item: show the placeholder.
        return "──────🞆─── 0:00 / 0:00"
@staticmethod
def create_vol(vol: int) -> str:
volume = "─" * (vol * 4 // 100)
volume += "○"
volume += "─" * (4 - vol * 4 // 100)
return volume
async def create_badge(self, thumb_url: str, title: str, artist: str) -> bytes:
    """Draw a PNG badge with the cover on the left and title/artist beside it.

    Args:
        thumb_url: URL the cover image is downloaded from.
        title: Text drawn with the larger font.
        artist: Text drawn below the title with the smaller font.

    Returns:
        The encoded PNG as bytes.

    Raises:
        asyncio.TimeoutError: If the fonts are still not loaded after the
            wait below.
    """
    # The fonts are fetched in the background by _dl_font(), which sets
    # font_ready when done. Wait for it instead of touching attributes that
    # may not exist yet; the timeout keeps a failed download from hanging
    # the command forever.
    await asyncio.wait_for(self.font_ready.wait(), timeout=60)

    response = await utils.run_sync(requests.get, thumb_url)
    thumb = Image.open(io.BytesIO(response.content))

    im = Image.new("RGB", SIZE, (30, 30, 30))
    draw = ImageDraw.Draw(im)

    # Square cover filling the canvas height minus both vertical margins.
    thumb_size = SIZE[1] - INNER_MARGIN[1] * 2
    thumb = thumb.resize((thumb_size, thumb_size))
    im.paste(thumb, INNER_MARGIN)

    # Text block starts to the right of the cover and is vertically centred
    # relative to the cover height.
    text_x = INNER_MARGIN[0] + thumb_size + INNER_MARGIN[0] + 8
    text_y = thumb_size // 2 - (TRACK_FS + ARTIST_FS) // 2
    draw.text((text_x, text_y), title, (255, 255, 255), font=self.font)
    draw.text(
        (text_x, text_y + TRACK_FS + 8),
        artist,
        (180, 180, 180),
        font=self.font_smaller,
    )

    img = io.BytesIO()
    im.save(img, format="PNG")
    return img.getvalue()
@loader.loop(interval=90)
async def autobio(self):
    # Periodic job: put the currently playing track name into the profile
    # "about" text using the configured template.
    try:
        playback = self.sp.current_playback()
        # Drop parenthesised parts of the title before using it.
        title = re.sub(r"([(].*?[)])", "", playback["item"]["name"]).strip()
    except Exception:
        # Nothing playing / not authorised: skip this iteration.
        return

    about = self.config["AutoBioTemplate"].format(title)
    # The bio is cut to 140 characters for premium accounts, 70 otherwise.
    limit = 140 if self._premium else 70
    try:
        await self._client(UpdateProfileRequest(about=about[:limit]))
    except FloodWaitError as e:
        delay = max(e.seconds, 60)
        logger.info(f"Sleeping {delay} bc of floodwait")
        await asyncio.sleep(delay)
async def _dl_font(self):
    """Download the badge font and build both font objects from it.

    Sets ``self.font_smaller`` (artist size) and ``self.font`` (track size),
    then signals ``self.font_ready``.
    """
    response = await utils.run_sync(
        requests.get,
        "https://github.com/hikariatama/assets/raw/master/ARIALUNI.TTF",
    )
    raw_font = response.content
    # Each font object gets its own BytesIO over the same downloaded bytes.
    for attr, size in (("font_smaller", ARTIST_FS), ("font", TRACK_FS)):
        setattr(
            self,
            attr,
            ImageFont.truetype(io.BytesIO(raw_font), size, encoding="UTF-8"),
        )
    self.font_ready.set()
async def client_ready(self, client, db):
    # Start-up hook: prepares fonts, account info, the Spotify client, the
    # auto-bio loop and the music download helper.
    self.font_ready = asyncio.Event()
    # The font download runs in the background; font_ready is set when done.
    asyncio.ensure_future(self._dl_font())

    me = await client.get_me()
    self._premium = getattr(me, "premium", False)

    try:
        stored_token = self.get("acs_tkn")
        self.sp = spotipy.Spotify(auth=stored_token["access_token"])
    except Exception:
        # No usable stored token: reset to the unauthenticated state.
        self.set("acs_tkn", None)
        self.sp = None

    if self.get("autobio", False):
        self.autobio.start()

    # Best effort only; failures here are ignored.
    try:
        await utils.dnd(client, "@DirectLinkGenerator_Bot", archive=True)
    except Exception:
        pass

    self.musicdl = await self.import_lib(
        "https://libs.hikariatama.ru/musicdl.py",
        suspend_on_error=True,
    )
def tokenized(func) -> FunctionType:
    """Decorator that lets a command run only when the module is authorised.

    The wrapped coroutine is called as ``(module, message, ...)``. If the
    module has no stored ``acs_tkn`` or no Spotify client, the ``need_auth``
    string is sent as the answer and the command body is skipped.
    """

    # functools.wraps already carries over __doc__ and __module__ (as well as
    # the name), so no manual copying is needed afterwards.
    @functools.wraps(func)
    async def guard(*args, **kwargs):
        module = args[0]
        if module.get("acs_tkn", False) and module.sp:
            return await func(*args, **kwargs)
        await utils.answer(args[1], module.strings("need_auth"))
        return None

    return guard
def error_handler(func) -> FunctionType:
    """Decorator that logs any exception of a command and reports it in chat.

    The wrapped coroutine is called as ``(module, message, ...)``. On success
    its result is returned unchanged. On failure the exception is logged, the
    module's ``err`` string (filled with the traceback) is sent as the answer
    on a best-effort basis, and ``None`` is returned.
    """

    @functools.wraps(func)
    async def wrapped(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception:
            # logger.exception() attaches the active traceback on its own, so
            # the message only needs to identify the failing command.
            logger.exception("Spotify command %s failed", func.__name__)
            with contextlib.suppress(Exception):
                # Tracebacks contain "<...>" fragments (e.g. "<module>").
                # This module escapes other dynamic text with
                # utils.escape_html before sending, so do the same here;
                # otherwise the reply itself may be rejected and silently
                # suppressed.
                details = utils.escape_html(traceback.format_exc())
                await utils.answer(
                    args[1],
                    args[0].strings("err").format(details),
                )

    return wrapped
def autodelete(func) -> FunctionType:
    """Decorator that deletes the command message ten seconds after it ran.

    The wrapped coroutine is called as ``(module, message, ...)``; its result
    is returned after the delayed, best-effort deletion of ``message``.
    """

    # functools.wraps already carries over __doc__ and __module__.
    @functools.wraps(func)
    async def delete_afterwards(*args, **kwargs):
        result = await func(*args, **kwargs)
        try:
            await asyncio.sleep(10)
            await args[1].delete()
        except Exception:
            # The message may already be gone; deletion is best effort.
            pass
        return result

    return delete_afterwards
@error_handler
@tokenized
@autodelete
async def srepeatcmd(self, message: Message):
    """💫 Repeat"""
    # The spotipy call is a synchronous HTTP request. Offload it through
    # utils.run_sync (as this module already does for requests.get) instead
    # of calling it directly inside the coroutine.
    await utils.run_sync(self.sp.repeat, "track")
    await utils.answer(message, self.strings("on-repeat"))
@error_handler
@tokenized
@autodelete
async def sderepeatcmd(self, message: Message):
    """✋ Stop repeat"""
    # Switches the repeat mode from "track" back to "context". The spotipy
    # call is a synchronous HTTP request, so it is offloaded through
    # utils.run_sync (as this module already does for requests.get).
    await utils.run_sync(self.sp.repeat, "context")
    await utils.answer(message, self.strings("off-repeat"))
@error_handler
@tokenized
@autodelete
async def snextcmd(self, message: Message):
    """👉 Skip"""
    # The spotipy call is a synchronous HTTP request. Offload it through
    # utils.run_sync (as this module already does for requests.get) instead
    # of calling it directly inside the coroutine.
    await utils.run_sync(self.sp.next_track)
    await utils.answer(message, self.strings("skipped"))
@error_handler
@tokenized
@autodelete
async def spausecmd(self, message: Message):
    """🤚 Pause"""
    # The spotipy call is a synchronous HTTP request. Offload it through
    # utils.run_sync (as this module already does for requests.get) instead
    # of calling it directly inside the coroutine.
    await utils.run_sync(self.sp.pause_playback)
    await utils.answer(message, self.strings("paused"))
@error_handler
@tokenized
@autodelete
async def splaycmd(self, message: Message, from_sq: bool = False):
    """▶️ Play"""
    # Behaviour:
    #   * no argument and no Spotify track link in the replied message:
    #     resume playback;
    #   * otherwise resolve the argument (or the replied link) to a track,
    #     queue it and, unless from_sq is set, skip to it right away.
    # Blocking spotipy calls are offloaded through utils.run_sync, as this
    # module already does for requests.get.
    args = utils.get_args_raw(message)
    reply = await message.get_reply_message()

    if not args:
        # Match the track id whether the link is plain text or sits inside
        # an href attribute. The previous pattern required a trailing double
        # quote and failed with a TypeError on plain links.
        link = (
            re.search(
                r"https://open\.spotify\.com/track/([A-Za-z0-9]+)",
                reply.text or "",
            )
            if reply
            else None
        )
        if not link:
            await utils.run_sync(self.sp.start_playback)
            await utils.answer(message, self.strings("playing"))
            return
        args = link[1]

    try:
        track = await utils.run_sync(self.sp.track, args)
    except Exception:
        # Not a track id/URL: fall back to a text search.
        search = await utils.run_sync(
            functools.partial(self.sp.search, q=args, type="track", limit=1)
        )
        items = ((search or {}).get("tracks") or {}).get("items") or []
        if not items:
            # Answer once and stop (the "404" reply used to be sent twice).
            await utils.answer(message, self.strings("404"))
            return
        track = items[0]

    await utils.run_sync(self.sp.add_to_queue, track["id"])
    if not from_sq:
        await utils.run_sync(self.sp.next_track)

    await message.delete()
    await self._client.send_file(
        message.peer_id,
        await self.create_badge(
            track["album"]["images"][0]["url"],
            track["name"],
            ", ".join(artist["name"] for artist in track["artists"]),
        ),
        caption=self.strings("playing_track").format(track["name"]),
    )
@error_handler
@tokenized
@autodelete
async def sfindcmd(self, message: Message):
    """Find info about track"""
    args = utils.get_args_raw(message)
    if not args:
        # Nothing to look up: answer and stop instead of searching for an
        # empty query.
        await utils.answer(message, self.strings("404"))
        return

    message = await utils.answer(message, self.strings("searching"))

    # Blocking spotipy calls are offloaded through utils.run_sync, as this
    # module already does for requests.get.
    try:
        track = await utils.run_sync(self.sp.track, args)
    except Exception:
        # Not a track id/URL: fall back to a text search.
        search = await utils.run_sync(
            functools.partial(self.sp.search, q=args, type="track", limit=1)
        )
        items = ((search or {}).get("tracks") or {}).get("items") or []
        if not items or not items[0]:
            # Answer once and stop (the "404" reply used to be sent twice).
            await utils.answer(message, self.strings("404"))
            return
        track = items[0]

    await self._open_track(track, message)
async def _open_track(
    self,
    track: dict,
    message: Message,
    override_text: str = None,
):
    """Download the audio for ``track`` and send it to the message's chat.

    Args:
        track: Track mapping; ``name`` and ``artists[*].name`` are read.
        message: Message whose ``peer_id`` receives the file; it is deleted
            afterwards when it is an outgoing message.
        override_text: Optional caption template. It is passed through
            ``str.format(is_flac=...)``, so it may contain an ``{is_flac}``
            placeholder and must have any other braces doubled.
    """
    name = track.get("name")
    artists = [
        artist["name"] for artist in track.get("artists", []) if "name" in artist
    ]
    # Without artists use the bare title; previously the query ended with a
    # dangling " - " and the caption showed the whole track dict.
    full_song_name = f"{name} - {', '.join(artists)}" if artists else f"{name}"

    music = await self.musicdl.dl(full_song_name, only_document=True)

    flac_note = (
        f"\n😎 FLAC {self.strings('quality')}"
        if getattr(music, "is_flac", False)
        else ""
    )
    if override_text:
        caption = override_text.format(is_flac=flac_note)
    elif track:
        # Built by concatenation rather than str.format() so that braces in
        # a track title cannot be mistaken for placeholders.
        caption = f"🗽 {utils.escape_html(full_song_name)}{flac_note}"
    else:
        caption = flac_note

    await self._client.send_file(message.peer_id, music, caption=caption)

    if message.out:
        await message.delete()
@error_handler
@tokenized
async def sqcmd(self, message: Message):
    """🔎"""
    # Same flow as splaycmd, but with from_sq set: the track is only added
    # to the queue and playback is not skipped to it.
    await self.splaycmd(message, from_sq=True)
@error_handler
@tokenized
@autodelete
async def sbackcmd(self, message: Message):
    """⏮"""
    # The spotipy call is a synchronous HTTP request. Offload it through
    # utils.run_sync (as this module already does for requests.get) instead
    # of calling it directly inside the coroutine.
    await utils.run_sync(self.sp.previous_track)
    await utils.answer(message, self.strings("back"))
@error_handler
@tokenized
@autodelete
async def sbegincmd(self, message: Message):
    """⏪"""
    # Seeks to position 0 of the current track. The spotipy call is a
    # synchronous HTTP request, so it is offloaded through utils.run_sync
    # (as this module already does for requests.get).
    await utils.run_sync(self.sp.seek_track, 0)
    await utils.answer(message, self.strings("restarted"))
@error_handler
@tokenized
@autodelete
async def slikecmd(self, message: Message):
    """❤️"""
    # Both spotipy calls are synchronous HTTP requests; offload them through
    # utils.run_sync (as this module already does for requests.get).
    playback = await utils.run_sync(self.sp.current_playback)
    # If nothing is playing the lookup below raises; error_handler then
    # reports it with the module's "err" string.
    track_id = playback["item"]["id"]
    await utils.run_sync(self.sp.current_user_saved_tracks_add, [track_id])
    await utils.answer(message, self.strings("liked"))
@error_handler
async def sauthcmd(self, message: Message):
    """First stage of auth"""
    # "Already authorised" means a stored token AND a live client. The check
    # used to be inverted (`and not self.sp`), which made this branch
    # unreachable in practice. getattr() also covers the case where the
    # attribute is missing altogether.
    if self.get("acs_tkn", False) and getattr(self, "sp", None):
        await utils.answer(message, self.strings("already_authed"))
        return

    # The authorisation URL is built once (a second, discarded call was
    # removed).
    await utils.answer(
        message,
        self.strings("auth").format(self.sp_auth.get_authorize_url()),
    )
@error_handler
@autodelete
async def scodecmd(self, message: Message):
    """Second stage of auth"""
    # Take the redirected URL from the command arguments. Splitting the raw
    # text on a single space raised IndexError when the argument was missing
    # and broke on extra whitespace.
    url = utils.get_args_raw(message).strip()
    if not url:
        # No URL given: show the first-stage instructions again instead of
        # failing with an unrelated playback error.
        await utils.answer(
            message,
            self.strings("auth").format(self.sp_auth.get_authorize_url()),
        )
        return

    code = self.sp_auth.parse_auth_response_url(url)
    # The token exchange is a synchronous HTTP request; offload it through
    # utils.run_sync (as this module already does for requests.get).
    # Positional arguments are (code, as_dict, check_cache).
    token = await utils.run_sync(self.sp_auth.get_access_token, code, True, False)
    self.set("acs_tkn", token)
    self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
    await utils.answer(message, self.strings("authed"))
@error_handler
@autodelete
async def unauthcmd(self, message: Message):
    """Deauth from Spotify API"""
    self.set("acs_tkn", None)
    # Reset the client to None rather than deleting the attribute: other
    # code in this module tests `self.sp` for truthiness and would raise
    # AttributeError if the attribute were gone.
    self.sp = None
    await utils.answer(message, self.strings("deauth"))
@error_handler
@tokenized
@autodelete
async def sbiocmd(self, message: Message):
    """Toggle bio playback streaming"""
    # Flip the stored flag, report the new state, then start or stop the
    # periodic bio updater to match.
    enabled = not self.get("autobio", False)
    self.set("autobio", enabled)

    state = "enabled" if enabled else "disabled"
    await utils.answer(message, self.strings("autobio").format(state))

    if not enabled:
        self.autobio.stop()
        return
    self.autobio.start()
@error_handler
@tokenized
@autodelete
async def stokrefreshcmd(self, message: Message):
    """Force refresh token"""
    # Exchange the stored refresh token for a new token, store it, schedule
    # the next refresh 45 minutes from now and rebuild the client.
    refresh_token = self.get("acs_tkn")["refresh_token"]
    renewed = self.sp_auth.refresh_access_token(refresh_token)
    self.set("acs_tkn", renewed)

    next_refresh = time.time() + 45 * 60
    self.set("NextRefresh", next_refresh)

    access_token = self.get("acs_tkn")["access_token"]
    self.sp = spotipy.Spotify(auth=access_token)
    await utils.answer(message, self.strings("authed"))
@error_handler
@tokenized
async def snowcmd(self, message: Message):
    """Show current playback badge"""
    # Flow: read the playback state, build a text card (track, device,
    # playlist, owner), post it with a "loading" note, then hand over to
    # _open_track, which sends the audio using the card as caption template.
    # @tokenized is applied like on the other playback commands, so an
    # unauthorised call gets the "need_auth" answer instead of a traceback.

    def _safe(text) -> str:
        # HTML-escape dynamic text and double its braces: the card is later
        # passed through str.format(is_flac=...), where a stray "{" or "}"
        # from a title would otherwise raise.
        return utils.escape_html(str(text)).replace("{", "{{").replace("}", "}}")

    # Blocking spotipy calls are offloaded through utils.run_sync, as this
    # module already does for requests.get.
    current_playback = await utils.run_sync(self.sp.current_playback)

    # Check for a playing item first. With no playback the state is None,
    # which used to crash further down before this answer could be given.
    try:
        item = current_playback["item"]
        track = item["name"]
        _ = item["id"]  # a missing id is also treated as "nothing playing"
    except (KeyError, TypeError):
        await utils.answer(message, self.strings("no_music"))
        return

    try:
        device_info = current_playback["device"]
        device = device_info["name"] + " " + device_info["type"].lower()
    except (KeyError, TypeError, AttributeError):
        device = None

    playlist_name = None
    playlist_owner = None
    try:
        playlist_id = current_playback["context"]["uri"].split(":")[-1]
        playlist = await utils.run_sync(self.sp.playlist, playlist_id)
        playlist_name = playlist.get("name", None)
        playlist_owner = (playlist.get("owner") or {}).get("display_name")
    except Exception:
        # No context, or the lookup failed (e.g. the context is not a
        # playlist): omit the playlist lines altogether.
        playlist_name = None
        playlist_owner = None

    artists = [
        artist["name"] for artist in item.get("artists", []) if "name" in artist
    ]

    parts = []
    if track:
        title = f"🎶 {_safe(track)}"
        if artists:
            title += f" - {_safe(' '.join(artists))}"
        parts.append(title)
    # Placeholder filled in by _open_track when the download is FLAC.
    parts.append("{is_flac}")
    if device:
        icon = "💻" if "computer" in str(device) else "📱"
        parts.append(
            f"\n\n{icon} {self.strings('currently_on')} {_safe(device)}"
        )
    if playlist_name:
        parts.append(f"\n🗂 {self.strings('playlist')}: {_safe(playlist_name)}")
    if playlist_owner:
        parts.append(f"\n👑 {self.strings('owner')}: {_safe(playlist_owner)}")
    parts.append("\n\n🎵 Spotify")
    result = "".join(parts)

    message = await utils.answer(
        message,
        result.format(is_flac="") + "\n\n🕔 Loading audio file...",
    )
    await self._open_track(item, message, result)
async def watcher(self, message: Message):
    """Watcher is used to update token"""
    # getattr() instead of plain attribute access: the attribute can be
    # removed elsewhere in this module (unauth), and this hook would then
    # raise AttributeError on every call.
    if not getattr(self, "sp", None):
        return

    # A missing or falsy "NextRefresh" counts as "due now", which covers
    # both branches the original spelled out separately.
    next_refresh = self.get("NextRefresh", 0) or 0
    if next_refresh >= time.time():
        return

    # The refresh stays a direct call on purpose: the next-refresh timestamp
    # is only stored after it returns, so awaiting here would let several
    # invocations start a refresh at the same time.
    self.set(
        "acs_tkn",
        self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]),
    )
    self.set("NextRefresh", time.time() + 45 * 60)
    self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
async def on_unload(self):
    # Stop the periodic bio updater when the module goes away; any failure
    # while stopping it is ignored.
    try:
        self.autobio.stop()
    except Exception:
        pass