@@ -16,11 +16,6 @@
# @ke_mods
# =======================================
#
# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
# --------------------------------------
# https://creativecommons.org/licenses/by-nd/4.0/legalcode
# =======================================
#
# meta developer: @ke_mods
# requires: telethon spotipy pillow requests yt-dlp curl_cffi
# scope: ffmpeg
@@ -39,6 +34,7 @@ import traceback
import os
from types import FunctionType
import random
import requests
import spotipy
from PIL import Image , ImageDraw , ImageEnhance , ImageFilter , ImageFont , ImageOps
@@ -61,7 +57,9 @@ class Banners:
progress : int ,
track_cover : bytes ,
font ,
blur
blur ,
album_title : str = " " ,
meta_info : str = " " ,
) :
self . title = title
self . artists = " , " . join ( artists ) if isinstance ( artists , list ) else artists
@@ -70,6 +68,8 @@ class Banners:
self . track_cover = track_cover
self . font_url = font
self . blur_intensity = blur
self . album_title = album_title
self . meta_info = meta_info
def _get_font ( self , size , font_bytes ) :
return ImageFont . truetype ( io . BytesIO ( font_bytes ) , size )
@@ -237,6 +237,164 @@ class Banners:
by . name = " banner.png "
return by
# Ultra banner from YaMusic by @codrago_m
def ultra ( self ) - > io . BytesIO :
WIDTH , HEIGHT = 2560 , 1220
font_bytes = requests . get ( self . font_url ) . content
def get_font ( size ) :
try :
return ImageFont . truetype ( io . BytesIO ( font_bytes ) , size )
except Exception :
return ImageFont . load_default ( )
try :
original_cover = Image . open ( io . BytesIO ( self . track_cover ) ) . convert ( " RGBA " )
except Exception :
original_cover = Image . new ( " RGBA " , ( 1000 , 1000 ) , " black " )
dominant_color_img = original_cover . resize ( ( 1 , 1 ) , Image . Resampling . LANCZOS )
dominant_color = dominant_color_img . getpixel ( ( 0 , 0 ) )
r , g , b , a = dominant_color
brightness = ( r * 299 + g * 587 + b * 114 ) / 1000
if brightness < 60 :
r = min ( 255 , r + 60 )
g = min ( 255 , g + 60 )
b = min ( 255 , b + 60 )
dominant_color = ( r , g , b , 255 )
background = original_cover . copy ( )
bg_w , bg_h = background . size
target_ratio = WIDTH / HEIGHT
current_ratio = bg_w / bg_h
if current_ratio > target_ratio :
new_w = int ( bg_h * target_ratio )
offset = ( bg_w - new_w ) / / 2
background = background . crop ( ( offset , 0 , offset + new_w , bg_h ) )
else :
new_h = int ( bg_w / target_ratio )
offset = ( bg_h - new_h ) / / 2
background = background . crop ( ( 0 , offset , bg_w , offset + new_h ) )
background = background . resize ( ( WIDTH , HEIGHT ) , Image . Resampling . LANCZOS )
if self . blur_intensity > 0 :
background = background . filter ( ImageFilter . GaussianBlur ( radius = self . blur_intensity ) )
dark_overlay = Image . new ( " RGBA " , ( WIDTH , HEIGHT ) , ( 0 , 0 , 0 , 180 ) )
background = Image . alpha_composite ( background , dark_overlay )
cover_size = 500
cover_x = ( WIDTH - cover_size ) / / 2
cover_y = 160
glow_layer = Image . new ( " RGBA " , ( WIDTH , HEIGHT ) , ( 0 , 0 , 0 , 0 ) )
draw_glow = ImageDraw . Draw ( glow_layer )
glow_rect_size = 620
g_x = ( WIDTH - glow_rect_size ) / / 2
g_y = cover_y + ( cover_size - glow_rect_size ) / / 2
draw_glow . rounded_rectangle (
( g_x , g_y , g_x + glow_rect_size , g_y + glow_rect_size ) ,
radius = 50 ,
fill = dominant_color ,
)
glow_layer = glow_layer . filter ( ImageFilter . GaussianBlur ( radius = 60 ) )
glow_layer = ImageEnhance . Brightness ( glow_layer ) . enhance ( 1.4 )
glow_layer = ImageEnhance . Color ( glow_layer ) . enhance ( 1.2 )
background = Image . alpha_composite ( background , glow_layer )
cover_img = original_cover . resize ( ( cover_size , cover_size ) , Image . Resampling . LANCZOS )
mask = Image . new ( " L " , ( cover_size , cover_size ) , 0 )
draw_mask = ImageDraw . Draw ( mask )
draw_mask . rounded_rectangle ( ( 0 , 0 , cover_size , cover_size ) , radius = 45 , fill = 255 )
background . paste ( cover_img , ( cover_x , cover_y ) , mask )
draw = ImageDraw . Draw ( background )
center_x = WIDTH / / 2
current_y = cover_y + cover_size + 130
def draw_text_shadow ( text , pos , font , fill = " white " , anchor = " ms " ) :
x , y = pos
draw . text ( ( x + 2 , y + 2 ) , text , font = font , fill = ( 0 , 0 , 0 , 240 ) , anchor = anchor )
draw . text ( ( x , y ) , text , font = font , fill = fill , anchor = anchor )
font_title = get_font ( 100 )
title_text = self . title if len ( self . title ) < = 30 else self . title [ : 30 ] + " ... "
draw_text_shadow ( title_text . upper ( ) , ( center_x , current_y ) , font_title )
current_y + = 85
font_artist = get_font ( 65 )
artist_text = self . artists if len ( self . artists ) < = 45 else self . artists [ : 45 ] + " ... "
draw_text_shadow ( artist_text . upper ( ) , ( center_x , current_y ) , font_artist , fill = ( 255 , 255 , 255 , 240 ) )
current_y + = 80
bar_width = 800
font_time = get_font ( 40 )
bar_start_x = center_x - ( bar_width / / 2 )
bar_end_x = center_x + ( bar_width / / 2 )
bar_y = current_y
total_time_str = f " { self . duration / / 1000 / / 60 : 02d } : { ( self . duration / / 1000 ) % 60 : 02d } "
cur_time_str = f " { self . progress / / 1000 / / 60 : 02d } : { ( self . progress / / 1000 ) % 60 : 02d } "
draw_text_shadow ( cur_time_str , ( bar_start_x - 30 , bar_y ) , font_time , anchor = " rm " )
draw_text_shadow ( total_time_str , ( bar_end_x + 30 , bar_y ) , font_time , anchor = " lm " )
old_state = random . getstate ( )
random . seed ( self . title + str ( self . duration ) )
num_bars = 65
bar_spacing = bar_width / num_bars
bar_w = max ( 4 , int ( bar_spacing * 0.5 ) )
max_h , min_h = 50 , 6
active_bars = int ( num_bars * ( self . progress / self . duration ) ) if self . duration > 0 else 0
for i in range ( num_bars ) :
base_h = random . randint ( min_h , max_h )
edge_factor = 1.0 - abs ( ( i - num_bars / 2 ) / ( num_bars / 2 ) )
h = max ( min_h , int ( base_h * 0.4 + max_h * edge_factor * 0.6 ) )
x_center = bar_start_x + i * bar_spacing
color = ( 255 , 255 , 255 , 255 ) if i < active_bars else ( 80 , 80 , 80 , 100 )
draw . rounded_rectangle (
( x_center - bar_w / 2 , bar_y - h / 2 , x_center + bar_w / 2 , bar_y + h / 2 ) ,
radius = int ( bar_w / 2 ) ,
fill = color ,
)
random . setstate ( old_state )
current_y + = 80
if self . album_title :
font_album = get_font ( 50 )
album_text = self . album_title if len ( self . album_title ) < = 50 else self . album_title [ : 50 ] + " ... "
draw_text_shadow ( album_text , ( center_x , current_y ) , font_album , fill = ( 230 , 230 , 230 ) )
current_y + = 60
if self . meta_info :
font_meta = get_font ( 40 )
draw_text_shadow ( self . meta_info , ( center_x , current_y ) , font_meta , fill = ( 210 , 210 , 210 ) )
by = io . BytesIO ( )
background . save ( by , format = " PNG " )
by . seek ( 0 )
by . name = " banner.png "
return by
@loader.tds
class SpotifyMod ( loader . Module ) :
""" Card with the currently playing track on Spotify. """
@@ -349,9 +507,6 @@ class SpotifyMod(loader.Module):
" <tg-emoji emoji-id=5778527486270770928>❌</tg-emoji> <b>Invalid track number. "
" Please search first or provide a valid number from the list.</b> "
) ,
" device_list " : (
" <tg-emoji emoji-id=5956561916573782596>📄</tg-emoji> <b>Available devices:</b> \n {} "
) ,
" no_devices_found " : (
" <tg-emoji emoji-id=5778527486270770928>❌</tg-emoji> <b>No devices found.</b> "
) ,
@@ -359,10 +514,6 @@ class SpotifyMod(loader.Module):
" <tg-emoji emoji-id=5776375003280838798>✅</tg-emoji> <b>Playback transferred to "
" {} .</b> "
) ,
" invalid_device_id " : (
" <tg-emoji emoji-id=5778527486270770928>❌</tg-emoji> <b>Invalid device ID. "
" Use</b> <code>.sdevice</code> <b>to see available devices.</b> "
) ,
" autobio " : (
" <tg-emoji emoji-id=6319076999105087378>🎧</tg-emoji> <b>Spotify autobio {} </b> "
) ,
@@ -379,6 +530,7 @@ class SpotifyMod(loader.Module):
" playlist_created " : " <tg-emoji emoji-id=5776375003280838798>✅</tg-emoji> <b>Playlist {} created.</b> " ,
" playlist_deleted " : " <tg-emoji emoji-id=5776375003280838798>✅</tg-emoji> <b>Playlist {} deleted.</b> " ,
" no_playlist_name " : " <tg-emoji emoji-id=5778527486270770928>❌</tg-emoji> <b>Please specify a playlist name.</b> " ,
" device_select " : " <tg-emoji emoji-id=5956561916573782596>📄</tg-emoji> <b>Select playback device:</b> " ,
}
strings_ru = {
@@ -478,9 +630,6 @@ class SpotifyMod(loader.Module):
" <tg-emoji emoji-id=5778527486270770928>❌</tg-emoji> <b>Некорректный номер трека. "
" Сначала выполните поиск или укажите правильный номер из списка.</b> "
) ,
" device_list " : (
" <tg-emoji emoji-id=5956561916573782596>📄</tg-emoji> <b>Доступные устройства:</b> \n {} "
) ,
" no_devices_found " : (
" <tg-emoji emoji-id=5778527486270770928>❌</tg-emoji> <b>Устройства не найдены.</b> "
) ,
@@ -488,10 +637,6 @@ class SpotifyMod(loader.Module):
" <tg-emoji emoji-id=5776375003280838798>✅</tg-emoji> <b>Воспроизведение переключено на "
" {} .</b> "
) ,
" invalid_device_id " : (
" <tg-emoji emoji-id=5778527486270770928>❌</tg-emoji> <b>Некорректный ID устройства. "
" Используйте</b> <code>.sdevice</code> <b>, чтобы увидеть доступные устройства.</b> "
) ,
" autobio " : (
" <tg-emoji emoji-id=6319076999105087378>🎧</tg-emoji> <b>Обновление био "
" включено {} </b> "
@@ -509,6 +654,7 @@ class SpotifyMod(loader.Module):
" playlist_created " : " <tg-emoji emoji-id=5776375003280838798>✅</tg-emoji> <b>Плейлист {} создан.</b> " ,
" playlist_deleted " : " <tg-emoji emoji-id=5776375003280838798>✅</tg-emoji> <b>Плейлист {} удален.</b> " ,
" no_playlist_name " : " <tg-emoji emoji-id=5778527486270770928>❌</tg-emoji> <b>Пожалуйста, укажите название плейлиста.</b> " ,
" device_select " : " <tg-emoji emoji-id=5956561916573782596>📄</tg-emoji> <b>Выберите устройство для воспроизведения:</b> " ,
}
def __init__ ( self ) :
@@ -569,7 +715,7 @@ class SpotifyMod(loader.Module):
" banner_version " ,
" horizontal " ,
lambda : " Banner version " ,
validator = loader . validators . Choice ( [ " horizontal " , " vertical " ] ) ,
validator = loader . validators . Choice ( [ " horizontal " , " vertical " , " ultra " ]) ,
) ,
loader . ConfigValue (
" blur_intensity " ,
@@ -589,12 +735,11 @@ class SpotifyMod(loader.Module):
try :
self . sp = spotipy . Spotify ( auth = access_token )
return True
except Exception :
self . sp = None
return False
return True
async def client_ready ( self , client , db ) :
self . font_ready = asyncio . Event ( )
@@ -628,8 +773,6 @@ class SpotifyMod(loader.Module):
return await func ( * args , * * kwargs )
except Exception as e :
error_msg = str ( e )
logger . error ( f " Error in { func . __name__ } : { error_msg } " )
if " NO_ACTIVE_DEVICE " in error_msg :
user_error = " No active device "
elif " PREMIUM_REQUIRED " in error_msg :
@@ -697,8 +840,8 @@ class SpotifyMod(loader.Module):
await asyncio . sleep ( getattr ( e , " seconds " , 30 ) + 1 )
except asyncio . CancelledError :
break
except Exception as e :
logger . exception ( " autobio error: %s " , e )
except Exception :
pass
await asyncio . sleep ( self . config . get ( " BIO_UPDATE_DELAY " , 30 ) )
@@ -754,20 +897,17 @@ class SpotifyMod(loader.Module):
reply_to_id = None ,
) - > bool :
dl_dir = os . path . join ( os . getcwd ( ) , " spotifymod " )
if not os . path . exists ( dl_dir ) :
os . makedirs ( dl_dir , exist_ok = True )
os . makedirs ( dl_dir , exist_ok = True )
for f in os . listdir ( dl_dir ) :
try :
with contextlib . suppress ( Exception ) :
os . remove ( os . path . join ( dl_dir , f ) )
except Exception :
pass
success = False
if caption is None :
safe_track = utils . escape_html ( track_name or " Unknown " )
safe_artists = utils . escape_html ( artists or " Unknown Artist " )
caption = self . strings ( " download_success " ) . format ( safe_track , safe_artists )
caption = self . strings [ " download_success " ] . format (
utils . escape_html ( track_name or " Unknown " ) ,
utils . escape_html ( artists or " Unknown Artist " ) ,
)
async def send_text ( text : str ) - > bool :
if target is None :
@@ -789,91 +929,60 @@ class SpotifyMod(loader.Module):
if target is None :
return False
if isinstance ( target , int ) :
await self . _client . send_file (
target ,
file_path ,
caption = caption ,
reply_to = reply_to_id ,
)
await self . _client . send_file ( target , file_path , caption = caption , reply_to = reply_to_id )
return True
try :
await utils . answer ( target , caption , file = file_path )
return True
except Exception :
except Exception as e :
logger . error ( " SpotifyMod send_file fallback: %s " , e , exc_info = True )
chat_id = self . _get_chat_id ( target )
if chat_id is None :
return False
await self . _client . send_file (
chat_id ,
file_path ,
caption = caption ,
reply_to = reply_to_id ,
)
await self . _client . send_file ( chat_id , file_path , caption = caption , reply_to = reply_to_id )
return True
success = False
try :
squery = query . replace ( ' " ' , ' ' ) . replace ( " ' " , " " )
cookies = self . config [ " cookies_path " ]
if cookies :
cmd = (
f ' { self . config [ " ytdlp_path " ] } -x --impersonate= " " --cookies { cookies } --audio-format mp3 --add-metadata '
f ' --audio-quality 0 -o " { dl_dir } /%(title)s [%(id)s].%(ext)s " '
f ' " ytsearch1: { squery } " '
)
else :
cmd = (
f ' { self . config [ " ytdlp_path " ] } -x --impersonate= " " --audio-format mp3 --add-metadata '
f ' --audio-quality 0 -o " { dl_dir } /%(title)s [%(id)s].%(ext)s " '
f ' " ytsearch1: { squery } " '
)
ytdlp_flags = ' -x --audio-format mp3 --audio-quality 0 --add-metadata --format " bestaudio/best " --no-playlist '
cookies_flag = f " --cookies { cookies } " if cookies else " "
cmd = (
f ' { self . config [ " ytdlp_path " ] } { ytdlp_flags } { cookies_flag } '
f ' -o " { dl_dir } /%(title)s [%(id)s].%(ext)s " '
f ' " ytsearch1: { squery } " '
)
proc = await asyncio . create_subprocess_shell (
cmd ,
stdout = asyncio . subprocess . PIPE ,
stderr = asyncio . subprocess . PIPE
stderr = asyncio . subprocess . PIPE ,
)
_ , stderr = await proc . communicate ( )
if proc . returncode and log_context :
err_text = stderr . decode ( errors = " ignore " ) . strip ( ) if stderr else " "
err_text = err_text [ - 400 : ] if err_text else " yt-dlp failed "
logger . error ( " Search download failed ( %s ): %s " , log_context , err_text )
if proc . returncode :
err_text = stderr . decode ( errors = " ignore " ) . strip ( ) if std err else " yt-dlp failed "
logger . error ( " SpotifyMod: yt-dlp code %s for %r : %s " , proc . returncode , log_context or query , err_text [ - 400 : ] )
files = [ f for f in os . listdir ( dl_dir ) if f . endswith ( " .mp3 " ) ]
if files :
first = files [ 0 ]
target_file = os . path . join ( dl_dir , first )
success = await send_file ( target_file )
success = await send_file ( os . path . join ( dl_dir , files [ 0 ] ) )
if not success :
if log_context :
logger . error (
" Search download send failed ( %s ). target= %s chat_id= %s " ,
log_context ,
type ( target ) . __name__ ,
self . _get_chat_id ( target ) ,
)
await send_text ( self . strings ( " dl_err " ) )
logger . error ( " SpotifyMod: failed to send %r (target= %s ) " , log_context or query , type ( target ) . __name__ )
await send_text ( self . strings [ " dl_err " ] )
else :
if log_context :
logger . error ( " Search download produced no files ( %s ) " , log_context )
await send_text ( self . strings ( " snowt_failed " ) )
logger . error ( " SpotifyMod: yt-dlp produced no mp3 for %r " , log_context or query )
await send_text ( self . strings [ " snowt_failed " ] )
except Exception as e :
if log_context :
logger . exception ( " Search download error ( %s ) " , log_context )
else :
logger . error ( e )
await send_text ( self . strings ( " dl_err " ) )
logger . error ( " Download track error ( %s ): %s " , log_context or " no context " , e , exc_info = True )
await send_text ( self . strings [ " dl_err " ] )
finally :
if os . path . ex ists ( dl_dir ) :
for f in os . listdir ( dl_dir ) :
try :
os . remove ( os . path . join ( dl_dir , f ) )
except Exception :
pass
for f in os . l istdir ( dl_dir ) :
with contextlib . suppress ( Exception ) :
os . remove ( os . path . join ( dl_dir , f ) )
return success
@@ -937,7 +1046,7 @@ class SpotifyMod(loader.Module):
await call . answer ( )
with contextlib . suppress ( Exception ) :
await call . edit ( self . strings ( " downloading_track " ) . lstrip ( ) , reply_markup = None )
await call . edit ( self . strings [ " downloading_track " ] . lstrip ( ) , reply_markup = None )
target_message = getattr ( call , " message " , None )
if reply_to_id is None :
@@ -951,9 +1060,9 @@ class SpotifyMod(loader.Module):
chat_id = self . _get_chat_id ( call )
if chat_id is None and target_message is None :
logger . error ( " Inline download missing chat_id ( %s - %s ) " , track_name , artists )
pass
with contextlib . suppress ( Exception ) :
await call . edit ( self . strings ( " dl_err " ) , reply_markup = None )
await call . edit ( self . strings [ " dl_err " ] , reply_markup = None )
return
target = chat_id if chat_id is not None else target_message
@@ -972,14 +1081,14 @@ class SpotifyMod(loader.Module):
await call . delete ( )
else :
with contextlib . suppress ( Exception ) :
await call . edit ( self . strings ( " dl_err " ) , reply_markup = None )
await call . edit ( self . strings [ " dl_err " ] , reply_markup = None )
async def _inline_search_tracks ( self , query ) :
if not self . get ( " acs_tkn " , False ) or not self . sp :
return {
" title " : " Auth required " ,
" description " : " Run .sauth " ,
" message " : self . strings ( " need_auth " ) ,
" message " : self . strings [ " need_auth " ] ,
}
query_text = ( query . args or " " ) . strip ( )
@@ -987,7 +1096,7 @@ class SpotifyMod(loader.Module):
return {
" title " : " No query " ,
" description " : " Provide search query " ,
" message " : self . strings ( " no_search_query " ) ,
" message " : self . strings [ " no_search_query " ] ,
}
try :
@@ -1001,7 +1110,7 @@ class SpotifyMod(loader.Module):
return {
" title " : " Search error " ,
" description " : " Try again " ,
" message " : self . strings ( " err " ) . format (
" message " : self . strings [ " err " ] . format (
utils . escape_html ( str ( e ) [ : 50 ] )
) ,
}
@@ -1010,7 +1119,7 @@ class SpotifyMod(loader.Module):
return {
" title " : " No results " ,
" description " : self . _short_text ( query_text , limit = 60 ) ,
" message " : self . strings ( " no_tracks_found " ) . format (
" message " : self . strings [ " no_tracks_found " ] . format (
utils . escape_html ( query_text )
) ,
}
@@ -1029,7 +1138,7 @@ class SpotifyMod(loader.Module):
{
" title " : self . _short_text ( track_name , limit = 60 ) ,
" description " : self . _short_text ( artists , limit = 60 ) if artists else " " ,
" message " : f " { self . strings ( ' downloading_track' ) . lstrip ( ) } \n <i>spdl_{ store_id } _{ i } </i>" ,
" message " : f ' { self . strings [ " downloading_track" ] . lstrip ( ) } \n <i>spdl_{ store_id } _{ i } </i>' ,
" thumb " : thumb ,
}
)
@@ -1056,22 +1165,22 @@ class SpotifyMod(loader.Module):
""" | .spla - ➕ Add current track to playlist (use number from .splaylists | .spls) """
args = utils . get_args_raw ( message )
if not args or not args . isdigit ( ) :
await utils . answer ( message , self . strings ( " invalid_playlist_index " ) )
await utils . answer ( message , self . strings [ " invalid_playlist_index " ] )
return
index = int ( args ) - 1
playlists = self . get ( " last_playlists " , [ ] )
if not playlists :
await utils . answer ( message , self . strings ( " no_cached_playlists " ) )
await utils . answer ( message , self . strings [ " no_cached_playlists " ] )
return
if index < 0 or index > = len ( playlists ) :
await utils . answer ( message , self . strings ( " invalid_playlist_index " ) )
await utils . answer ( message , self . strings [ " invalid_playlist_index " ] )
return
current = self . sp . current_playback ( )
if not current or not current . get ( " item " ) :
await utils . answer ( message , self . strings ( " no_music " ) )
await utils . answer ( message , self . strings [ " no_music " ] )
return
track_uri = current [ " item " ] [ " uri " ]
@@ -1083,7 +1192,7 @@ class SpotifyMod(loader.Module):
playlist_name = playlists [ index ] [ " name " ]
self . sp . playlist_add_items ( playlist_id , [ track_uri ] )
await utils . answer ( message , self . strings ( " added_to_playlist " ) . format ( utils . escape_html ( full_track_name ) , utils . escape_html ( playlist_name ) ) )
await utils . answer ( message , self . strings [ " added_to_playlist " ] . format ( utils . escape_html ( full_track_name ) , utils . escape_html ( playlist_name ) ) )
@error_handler
@tokenized
@@ -1095,22 +1204,22 @@ class SpotifyMod(loader.Module):
""" | .splr - ➖ Remove current track from playlist (use number from .splaylists | .spls) """
args = utils . get_args_raw ( message )
if not args or not args . isdigit ( ) :
await utils . answer ( message , self . strings ( " invalid_playlist_index " ) )
await utils . answer ( message , self . strings [ " invalid_playlist_index " ] )
return
index = int ( args ) - 1
playlists = self . get ( " last_playlists " , [ ] )
if not playlists :
await utils . answer ( message , self . strings ( " no_cached_playlists " ) )
await utils . answer ( message , self . strings [ " no_cached_playlists " ] )
return
if index < 0 or index > = len ( playlists ) :
await utils . answer ( message , self . strings ( " invalid_playlist_index " ) )
await utils . answer ( message , self . strings [ " invalid_playlist_index " ] )
return
current = self . sp . current_playback ( )
if not current or not current . get ( " item " ) :
await utils . answer ( message , self . strings ( " no_music " ) )
await utils . answer ( message , self . strings [ " no_music " ] )
return
track_uri = current [ " item " ] [ " uri " ]
@@ -1122,7 +1231,7 @@ class SpotifyMod(loader.Module):
playlist_name = playlists [ index ] [ " name " ]
self . sp . playlist_remove_all_occurrences_of_items ( playlist_id , [ track_uri ] )
await utils . answer ( message , self . strings ( " removed_from_playlist " ) . format ( utils . escape_html ( full_track_name ) , utils . escape_html ( playlist_name ) ) )
await utils . answer ( message , self . strings [ " removed_from_playlist " ] . format ( utils . escape_html ( full_track_name ) , utils . escape_html ( playlist_name ) ) )
@error_handler
@tokenized
@@ -1134,12 +1243,12 @@ class SpotifyMod(loader.Module):
""" | .splc - 🆕 Create a new playlist """
name = utils . get_args_raw ( message )
if not name :
await utils . answer ( message , self . strings ( " no_playlist_name " ) )
await utils . answer ( message , self . strings [ " no_playlist_name " ] )
return
user_id = self . sp . me ( ) [ " id " ]
self . sp . user_playlist_create ( user_id , name )
await utils . answer ( message , self . strings ( " playlist_created " ) . format ( utils . escape_html ( name ) ) )
await utils . answer ( message , self . strings [ " playlist_created " ] . format ( utils . escape_html ( name ) ) )
@error_handler
@tokenized
@@ -1151,24 +1260,24 @@ class SpotifyMod(loader.Module):
""" | .spld - 🗑 Delete playlist (use number from .splaylists | .spls) """
args = utils . get_args_raw ( message )
if not args or not args . isdigit ( ) :
await utils . answer ( message , self . strings ( " invalid_playlist_index " ) )
await utils . answer ( message , self . strings [ " invalid_playlist_index " ] )
return
index = int ( args ) - 1
playlists = self . get ( " last_playlists " , [ ] )
if not playlists :
await utils . answer ( message , self . strings ( " no_cached_playlists " ) )
await utils . answer ( message , self . strings [ " no_cached_playlists " ] )
return
if index < 0 or index > = len ( playlists ) :
await utils . answer ( message , self . strings ( " invalid_playlist_index " ) )
await utils . answer ( message , self . strings [ " invalid_playlist_index " ] )
return
playlist_id = playlists [ index ] [ " id " ]
playlist_name = playlists [ index ] [ " name " ]
self . sp . current_user_unfollow_playlist ( playlist_id )
await utils . answer ( message , self . strings ( " playlist_deleted " ) . format ( utils . escape_html ( playlist_name ) ) )
await utils . answer ( message , self . strings [ " playlist_deleted " ] . format ( utils . escape_html ( playlist_name ) ) )
@error_handler
@tokenized
@@ -1196,9 +1305,9 @@ class SpotifyMod(loader.Module):
playlist_list_text + = f " <b> { i + 1 } .</b> <a href= ' { url } ' > { name } </a> ( { count } tracks) \n "
if playlist_list_text == " " :
await utils . answer ( message , self . strings ( " no_playlists " ) )
await utils . answer ( message , self . strings [ " no_playlists " ] )
else :
await utils . answer ( message , self . strings ( " playlists_list " ) . format ( playlist_list_text ) )
await utils . answer ( message , self . strings [ " playlists_list " ] . format ( playlist_list_text ) )
@error_handler
@tokenized
@@ -1208,7 +1317,7 @@ class SpotifyMod(loader.Module):
async def sbiocmd ( self , message ) :
""" - ℹ ️ Toggle streaming playback in bio """
if not getattr ( self , " sp " , None ) :
await utils . answer ( message , self . strings ( " need_auth " ) )
await utils . answer ( message , self . strings [ " need_auth " ] )
return
state = not self . get ( " autobio " , False )
@@ -1227,7 +1336,7 @@ class SpotifyMod(loader.Module):
await utils . answer (
message ,
self . strings ( " autobio " ) . format ( " on " if state else " off " ) ,
self . strings [ " autobio " ] . format ( " on " if state else " off " ) ,
)
@error_handler
@@ -1240,64 +1349,63 @@ class SpotifyMod(loader.Module):
""" | .sv - 🔊 Change playback volume. .svolume | .sv <0-100> """
args = utils . get_args_raw ( message )
if args == " " :
await utils . answer ( message , self . strings ( " no_volume_arg " ) )
await utils . answer ( message , self . strings [ " no_volume_arg " ] )
else :
try :
volume_percent = int ( args )
if 0 < = volume_percent < = 100 :
self . sp . volume ( volume_percent )
await utils . answer ( message , self . strings ( " volume_changed " ) . format ( volume_percent ) )
await utils . answer ( message , self . strings [ " volume_changed " ] . format ( volume_percent ) )
else :
await utils . answer ( message , self . strings ( " volume_invalid " ) )
await utils . answer ( message , self . strings [ " volume_invalid " ] )
except ValueError :
await utils . answer ( message , self . strings ( " volume_invalid " ) )
await utils . answer ( message , self . strings [ " volume_invalid " ] )
@error_handler
@tokenized
@loader.command (
ru_doc = (
" | .sd - 🎵 Выбрать устройство для воспроизведения. Например: .sdevice <ID устройства>или .sdevice | .sd для вывода списка устройств "
) ,
ru_doc = " | .sd - 🎵 Выбрать устройство для воспроизведения " ,
alias = " sd "
)
async def sdevicecmd ( self , message : Message ) :
""" | .sd - 🎵 Set preferred playback device. Usage: .sdevice <device_id> or .sdevice | .sd to list devices """
args = utils . get_args_raw ( message )
""" | .sd - 🎵 Select playback device """
devices = self . sp . devices ( ) [ " devices " ]
if not devices :
await utils . answer ( message , self . strings [ " no_devices_found " ] )
return
if args == " " :
if not devices :
await utils . answer ( message , self . strings ( " no_devices_found " ) )
else :
device_list_text = " "
for i , device in enumerate ( devices ) :
is_active = " (active) " if device [ " is_active " ] else " "
device_list_text + = (
f " <b> { i + 1 } .</b> { device [ ' name ' ] } "
f " ( { device [ ' type ' ] } ) { is_active } \n "
)
await utils . answer ( message , self . strings ( " device_list " ) . format ( device_list_text . strip ( ) ) )
else :
device_id = None
async def _switch ( call , device_id : str , device_name : str ) :
with contextlib . suppress ( Exception ) :
await call . answer ( )
try :
device_number = int ( args )
if 0 < device_number < = len ( devices ) :
device_id = devices [ device_number - 1 ] [ " id " ]
device_name = devices [ device_number - 1 ] [ " name " ]
els e:
await utils . answer ( message , self . strings ( " invalid_device_id " ) )
return
except ValueError :
found_device = next ( ( d for d in devices if d [ " id " ] == args . strip ( ) ) , None )
if found_device :
device_id = found_device [ " id " ]
device_name = found_device [ " name " ]
else :
await utils . answer ( message , self . strings ( " invalid_device_id " ) )
return
self . sp . transfer_playback ( device_id = device_id )
with contextlib . suppress ( Exception ) :
await call . edit (
self . strings [ " device_changed " ] . format ( utils . escape_html ( device_name ) ) ,
reply_markup = Non e ,
)
except Exception as e :
with contextlib . suppress ( Exception ) :
await call . edit (
self . strings [ " err " ] . format ( utils . escape_html ( str ( e ) [ : 80 ] ) ) ,
reply_markup = None ,
)
self . sp . transfer_playback ( device_i d = device_id )
await utils . answer ( message , self . strings ( " device_changed " ) . format ( device_name ) )
keyboar d = [ ]
for device in devices :
active_mark = " > " if device [ " is_active " ] else " "
label = f " { active_mark } { device [ ' name ' ] } ( { device [ ' type ' ] . lower ( ) } ) "
keyboard . append ( [ {
" text " : label ,
" callback " : _switch ,
" args " : ( device [ " id " ] , device [ " name " ] ) ,
} ] )
await self . inline . form (
self . strings [ " device_select " ] ,
message = message ,
reply_markup = keyboard ,
)
@error_handler
@tokenized
@@ -1307,7 +1415,7 @@ class SpotifyMod(loader.Module):
async def srepeatcmd ( self , message : Message ) :
""" - 💫 Repeat """
self . sp . repeat ( " track " )
await utils . answer ( message , self . strings ( " on-repeat " ) )
await utils . answer ( message , self . strings [ " on-repeat " ] )
@error_handler
@tokenized
@@ -1317,7 +1425,7 @@ class SpotifyMod(loader.Module):
async def sderepeatcmd ( self , message : Message ) :
""" - ✋ Stop repeat """
self . sp . repeat ( " context " )
await utils . answer ( message , self . strings ( " off-repeat " ) )
await utils . answer ( message , self . strings [ " off-repeat " ] )
@error_handler
@tokenized
@@ -1327,7 +1435,7 @@ class SpotifyMod(loader.Module):
async def snextcmd ( self , message : Message ) :
""" - 👉 Next track """
self . sp . next_track ( )
await utils . answer ( message , self . strings ( " skipped " ) )
await utils . answer ( message , self . strings [ " skipped " ] )
@error_handler
@tokenized
@@ -1337,7 +1445,7 @@ class SpotifyMod(loader.Module):
async def sresumecmd ( self , message : Message ) :
""" - 🤚 Resume """
self . sp . start_playback ( )
await utils . answer ( message , self . strings ( " playing " ) )
await utils . answer ( message , self . strings [ " playing " ] )
@error_handler
@tokenized
@@ -1347,7 +1455,7 @@ class SpotifyMod(loader.Module):
async def spausecmd ( self , message : Message ) :
""" - 🤚 Pause """
self . sp . pause_playback ( )
await utils . answer ( message , self . strings ( " paused " ) )
await utils . answer ( message , self . strings [ " paused " ] )
@error_handler
@tokenized
@@ -1357,7 +1465,7 @@ class SpotifyMod(loader.Module):
async def sbackcmd ( self , message : Message ) :
""" - ⏮ Previous track """
self . sp . previous_track ( )
await utils . answer ( message , self . strings ( " back " ) )
await utils . answer ( message , self . strings [ " back " ] )
@error_handler
@tokenized
@@ -1367,7 +1475,7 @@ class SpotifyMod(loader.Module):
async def sbegincmd ( self , message : Message ) :
""" - ⏪ Restart track """
self . sp . seek_track ( 0 )
await utils . answer ( message , self . strings ( " restarted " ) )
await utils . answer ( message , self . strings [ " restarted " ] )
@error_handler
@tokenized
@@ -1378,7 +1486,7 @@ class SpotifyMod(loader.Module):
""" - ❤️ Like current track """
cupl = self . sp . current_playback ( )
self . sp . current_user_saved_tracks_add ( [ cupl [ " item " ] [ " id " ] ] )
await utils . answer ( message , self . strings ( " liked " ) )
await utils . answer ( message , self . strings [ " liked " ] )
@error_handler
@tokenized
@@ -1389,7 +1497,7 @@ class SpotifyMod(loader.Module):
""" - 💔 Unlike current track """
cupl = self . sp . current_playback ( )
self . sp . current_user_saved_tracks_delete ( [ cupl [ " item " ] [ " id " ] ] )
await utils . answer ( message , self . strings ( " unlike " ) )
await utils . answer ( message , self . strings [ " unlike " ] )
@error_handler
@loader.command (
@@ -1398,12 +1506,12 @@ class SpotifyMod(loader.Module):
async def sauthcmd ( self , message : Message ) :
""" - Get authorization link """
if self . get ( " acs_tkn " , False ) and not self . sp :
await utils . answer ( message , self . strings ( " already_authed " ) )
await utils . answer ( message , self . strings [ " already_authed " ] )
else :
self . sp_auth . get_authorize_url ( )
await utils . answer (
message ,
self . strings ( " auth " ) . format ( self . sp_auth . get_authorize_url ( ) ) ,
self . strings [ " auth " ] . format ( self . sp_auth . get_authorize_url ( ) ) ,
)
@error_handler
@@ -1416,7 +1524,7 @@ class SpotifyMod(loader.Module):
code = self . sp_auth . parse_auth_response_url ( url )
self . set ( " acs_tkn " , self . sp_auth . get_access_token ( code , True , False ) )
self . _init_spotify_client ( )
await utils . answer ( message , self . strings ( " authed " ) )
await utils . answer ( message , self . strings [ " authed " ] )
@error_handler
@loader.command (
@@ -1426,7 +1534,7 @@ class SpotifyMod(loader.Module):
""" - Log out of account """
self . set ( " acs_tkn " , None )
self . sp = None
await utils . answer ( message , self . strings ( " deauth " ) )
await utils . answer ( message , self . strings [ " deauth " ] )
@error_handler
@tokenized
@@ -1442,7 +1550,7 @@ class SpotifyMod(loader.Module):
)
self . set ( " NextRefresh " , time . time ( ) + 45 * 60 )
self . _init_spotify_client ( )
await utils . answer ( message , self . strings ( " authed " ) )
await utils . answer ( message , self . strings [ " authed " ] )
@error_handler
@tokenized
@@ -1454,7 +1562,7 @@ class SpotifyMod(loader.Module):
""" | .sn - 🎧 View current track card. """
current_playback = self . sp . current_playback ( )
if not current_playback or not current_playback . get ( " is_playing " , False ) :
await utils . answer ( message , self . strings ( " no_music " ) )
await utils . answer ( message , self . strings [ " no_music " ] )
return
track = current_playback [ " item " ] [ " name " ]
@@ -1515,7 +1623,7 @@ class SpotifyMod(loader.Module):
if self . config [ " show_banner " ] :
cover_url = current_playback [ " item " ] [ " album " ] [ " images " ] [ 0 ] [ " url " ]
tmp_msg = await utils . answer ( message , text + self . strings ( " uploading_banner " ) )
tmp_msg = await utils . answer ( message , text + self . strings [ " uploading_banner " ] )
banners = Banners (
title = track ,
@@ -1525,9 +1633,14 @@ class SpotifyMod(loader.Module):
track_cover = requests . get ( cover_url ) . content ,
font = self . config [ " font " ] ,
blur = self . config [ " blur_intensity " ] ,
album_title = album_name ,
meta_info = " Spotify " ,
)
if self . config [ " banner_version " ] == " vertical " :
version = self . config [ " banner_version " ]
if version == " ultra " :
file = banners . ultra ( )
elif version == " vertical " :
file = banners . vertical ( )
else :
file = banners . horizontal ( )
@@ -1546,7 +1659,7 @@ class SpotifyMod(loader.Module):
""" | .snt - 🎧 Download current track. """
current_playback = self . sp . current_playback ( )
if not current_playback or not current_playback . get ( " is_playing " , False ) :
await utils . answer ( message , self . strings ( " no_music " ) )
await utils . answer ( message , self . strings [ " no_music " ] )
return
track = current_playback [ " item " ] [ " name " ]
@@ -1603,9 +1716,16 @@ class SpotifyMod(loader.Module):
text = self . config [ " custom_text " ] . format ( * * data )
msg = await utils . answer ( message , text + self . strings ( " downloading_track " ) )
await self . _download_track ( msg , f " { artists } { track } " , caption = text )
msg = await utils . answer ( message , text + self . strings [ " downloading_track " ] )
await self . _download_track (
msg ,
f " { artists } { track } " ,
caption = text ,
track_name = track ,
artists = artists ,
log_context = f " { track } - { artists } " ,
)
@error_handler
@tokenized
@@ -1617,7 +1737,7 @@ class SpotifyMod(loader.Module):
""" | .sq - 🔍 Search for tracks. """
args = utils . get_args_raw ( message )
if not args :
await utils . answer ( message , self . strings ( " no_search_query " ) )
await utils . answer ( message , self . strings [ " no_search_query " ] )
return
search_results = self . get ( " last_search_results " , [ ] )
@@ -1630,7 +1750,7 @@ class SpotifyMod(loader.Module):
if is_selection :
track_number = int ( args )
msg = await utils . answer ( message , self . strings ( " downloading_track " ) )
msg = await utils . answer ( message , self . strings [ " downloading_track " ] )
track_info = search_results [ track_number - 1 ]
track_name , artists = self . _track_info ( track_info )
reply_to_id = self . _reply_id ( message )
@@ -1659,7 +1779,7 @@ class SpotifyMod(loader.Module):
)
if not results or not results [ " tracks " ] [ " items " ] :
await utils . answer ( message , self . strings ( " no_tracks_found " ) . format ( args ) )
await utils . answer ( message , self . strings [ " no_tracks_found " ] . format ( args ) )
return
tracks = results [ " tracks " ] [ " items " ]
@@ -1668,7 +1788,7 @@ class SpotifyMod(loader.Module):
reply_to_id = self . _reply_id ( message )
await self . inline . form (
self . strings ( " search_results_inline " ) . format (
self . strings [ " search_results_inline " ] . format (
count = len ( tracks ) ,
query = utils . escape_html ( args ) ,
) ,
@@ -1717,17 +1837,22 @@ class SpotifyMod(loader.Module):
next_refresh = self . get ( " NextRefresh " )
if not next_refresh or next_refresh < time . time ( ) :
acs_tkn = self . get ( " acs_tkn " )
if not acs_tkn or not acs_tkn . get ( " refresh_token " ) :
self . set ( " NextRefresh " , time . time ( ) + 300 )
return
try :
self . set (
" acs_tkn " ,
self . sp_auth . refresh_access_token ( self . get ( " acs_tkn " ) [ " refresh_token " ] ) ,
)
new_token = self . sp_auth . refresh_access_token ( acs_tkn [ " refresh_token " ] )
self . set ( " acs_tkn " , new_token )
self . set ( " NextRefresh " , time . time ( ) + 45 * 60 )
self . sp = spotipy . Spotify ( auth = self . get ( " acs_tkn " ) [ " access_token " ] )
if new_token and new_token . get ( " access_token " ) :
self . sp = spotipy . Spotify ( auth = new_token [ " access_token " ] )
logger . debug ( " Token refreshed successfully " )
except Exception as e :
logger . error ( f " Spotify watcher error: { e } " )
logger . error ( " Token refresh error: %s " , e , exc_info = True )
if " Refresh token revoked " in str ( e ) :
logger . warning ( " Refresh token revoked, re-authenticating " )
refresh_token = await self . invoke ( " stokrefresh " , " " , self . inline . bot . id )
await refresh_token . delete ( )
else :
self . set ( " NextRefresh " , time . time ( ) + 300 )
self . set ( " NextRefresh " , time . time ( ) + 300 )