diff --git a/ZetGoHack/nullmod/Gradientor.py b/ZetGoHack/nullmod/Gradientor.py
new file mode 100644
index 0000000..91ba9cb
--- /dev/null
+++ b/ZetGoHack/nullmod/Gradientor.py
@@ -0,0 +1,246 @@
+#░░░███░███░███░███░███
+#░░░░░█░█░░░░█░░█░░░█░█
+#░░░░█░░███░░█░░█░█░█░█
+#░░░█░░░█░░░░█░░█░█░█░█
+#░░░███░███░░█░░███░███
+
+# meta developer: @ZetGo
+
+__version__ = (1, 0, 0)
+
+import io
+import math
+
+from PIL import Image, ImageDraw
+
+from herokutl.tl.custom import Message
+from herokutl.tl.functions.help import (
+ GetPeerProfileColorsRequest
+)
+from herokutl.tl.types import (
+ EmojiStatusCollectible
+)
+
+from .. import loader, utils
+
+def resize_image(image: Image.Image, max_size: int = 1280) -> Image.Image:
+ w, h = image.size
+ if max(w, h) <= max_size:
+ return image
+ else:
+ scale = max_size / max(w, h)
+ new_w = int(w * scale)
+ new_h = int(h * scale)
+
+ return image.resize((new_w, new_h), Image.LANCZOS)
+
+# Source: https://gist.github.com/weihanglo/1e754ec47fdd683a42fdf6a272904535#file-draw_gradient_pillow-py
+def get_gradient(size: tuple, color1: tuple, color2: tuple, gradient_type: str = "linear") -> Image.Image:
+ def interpolate(f_co, t_co, interval):
+ if interval <= 1:
+ yield list(t_co)
+ return
+
+ det_co = [(t - f) / (interval - 1) for f, t in zip(f_co, t_co)]
+ for i in range(interval):
+ yield [round(f + det * i) for f, det in zip(f_co, det_co)]
+
+ gradient = Image.new('RGB', size, color=(0, 0, 0))
+ draw = ImageDraw.Draw(gradient)
+
+ if gradient_type == "linear":
+ top_color, bottom_color = color1, color2
+
+ for y, color in enumerate(interpolate(top_color, bottom_color, max(1, size[1]))):
+ draw.line([(0, y), (size[0], y)], fill=tuple(color), width=1)
+
+ elif gradient_type == "radial":
+ center_color, edge_color = color1, color2
+
+ max_radius = math.hypot(size[0], size[1]) / 2.0
+ interval = max(1, int(math.ceil(max_radius)) + 1)
+
+ colors = list(interpolate(center_color, edge_color, interval))
+
+ cx = size[0] / 2
+ cy = size[1] / 2
+
+ for r_index, color in enumerate(colors):
+ r = interval - 1 - r_index
+ if r < 0:
+ continue
+ bbox = [
+ int(round(cx - r)),
+ int(round(cy - r)),
+ int(round(cx + r)),
+ int(round(cy + r))
+ ]
+ draw.ellipse(bbox, fill=tuple(color))
+
+ return gradient
+
+def set_gradient(im: io.BytesIO, gradient: Image.Image) -> io.BytesIO:
+ img = resize_image(Image.open(im).convert('RGBA'))
+
+ max_size = max(img.width, img.height)
+ gradient = gradient.resize((max_size, max_size), Image.LANCZOS).convert('RGBA')
+ left = (max_size - img.width) // 2
+ top = (max_size - img.height) // 2
+ gradient.paste(img, (left, top), img)
+ buffer = io.BytesIO()
+
+ gradient.save(buffer, format='PNG')
+
+ buffer.seek(0)
+ return buffer
+
+def crop_by_bbox(img: Image.Image):
+ img_w, img_h = img.size
+ x, y, w, h = BBOX_TGA_TGD
+
+ left = int(round(x * img_w))
+ top = int(round(y * img_h))
+ right = int(round((x + w) * img_w))
+ bottom = int(round((y + h) * img_h))
+
+ return img.crop((left, top, right, bottom))
+
+
+def hex_to_rgb(value: int):
+ return ((value >> 16) & 255, (value >> 8) & 255, value & 255)
+
+def hexes_to_rgbs(value: list):
+ if len(value) > 1:
+ res = list()
+ for i in value:
+ res.append(hex_to_rgb(i))
+
+ return tuple(res)
+ else:
+ res = hex_to_rgb(value[0])
+ return (res, res)
+
+SHAPES = {
+ # TODO: фигуры для создания масок на авы
+}
+
+BBOX_TGA_TGD = (
+ 2894 / 8268,
+ 1260 / 8268,
+ 2504 / 8268,
+ 2504 / 8268,
+)
+
+
+@loader.translatable_docstring
+class Gradientor(loader.Module):
+ strings = {
+ "name": "Gradientor",
+ "_cls_doc": "A module to create your profile picture with a background from your profile",
+ "gradient_creating": "🔁 Creating gradient...",
+ "gradient_created": "✅ Gradient created!",
+ }
+ strings_ru = {
+ "_cls_doc": "Модуль для создания вашей аватарки на фоне из вашего профиля",
+ "gradient_creating": "🔁 Создание градиента...",
+ "gradient_created": "✅ Градиент создан!",
+ }
+
+ async def client_ready(self):
+ self.colors = self.get("PROFILE_COLORS", None)
+ if not self.colors:
+ raw_colors = (await self.client(GetPeerProfileColorsRequest(0))).colors
+ self.colors = {
+ str(col.color_id): hexes_to_rgbs(col.dark_colors.bg_colors) for col
+ in raw_colors
+ }
+
+ self.set("PROFILE_COLORS", self.colors)
+
+ @loader.command(
+ ru_doc="[фотография/reply] - создать аватарку с градиентом из цвета профиля\n"
+ "--update-cache - обновить кеш профиля, если вы только что сменили фон профиля\n"
+ "--linear - использовать линейный градиент"
+ )
+ async def makepp(self, message: Message):
+ """[photo/reply] - create a profile picture with a gradient from profile color\n
+ --update-cache - update profile cache if you just changed profile background\n
+ --linear - use linear gradient"""
+ reply: Message = await message.get_reply_message()
+ args = utils.get_args(message)
+
+ if "--update-cache" in args:
+ upd_cache = True
+ args.remove("--update-cache")
+ else:
+ upd_cache = False
+
+ if "--linear" in args:
+ force_linear = True
+ args.remove("--linear")
+ else:
+ force_linear = False
+
+ user = None
+ background_only = False
+
+ if args:
+ user = await self.client.get_entity(int(args[0]) if args[0].isdigit() else args[0])
+
+ photo_source = (
+ message
+ if (not reply or not (reply.photo or reply.document and "image/" in getattr(reply.document, "mime_type", "")))
+ else reply
+ )
+ if not (photo_source.photo or photo_source.document and "image/" in getattr(photo_source.document, "mime_type", "")):
+ background_only = True
+
+ if not user:
+ if upd_cache:
+ user = self.client.hikka_me = await self.client.get_me()
+ elif reply:
+ user = reply.sender
+ else:
+ user = self.client.hikka_me
+
+ if not user.premium:
+ color1, color2 = (28, 28, 28), (28, 28, 28)
+
+ elif user.emoji_status and isinstance(user.emoji_status, EmojiStatusCollectible):
+ color1, color2 = (
+ user.emoji_status.edge_color, user.emoji_status.center_color
+ )
+ color1 = hex_to_rgb(color1)
+ color2 = hex_to_rgb(color2)
+
+ elif user.profile_color:
+ color_variant = user.profile_color.color
+
+ color1, color2 = self.colors.get(
+ str(color_variant),
+ ((28, 28, 28), (28, 28, 28))
+ )
+
+ else:
+ color1, color2 = (28, 28, 28), (28, 28, 28)
+
+ await utils.answer(message, self.strings["gradient_creating"])
+
+ gradient = get_gradient((1280, 1280), color1, color2, "linear" if force_linear else "radial")
+ gradient = crop_by_bbox(gradient)
+
+ if not background_only:
+ p_b = await photo_source.download_media(bytes)
+ p_b_io = io.BytesIO(p_b)
+ p_b_io.seek(0)
+
+ result = set_gradient(p_b_io, gradient)
+
+ else:
+ result = io.BytesIO()
+ gradient.save(result, format='PNG')
+ result.seek(0)
+
+ result.name = "grad @nullmod.png"
+
+ await utils.answer(message, self.strings["gradient_created"], file=result, force_document=True)
diff --git a/ZetGoHack/nullmod/full.txt b/ZetGoHack/nullmod/full.txt
index 41a7a9b..bd51138 100644
--- a/ZetGoHack/nullmod/full.txt
+++ b/ZetGoHack/nullmod/full.txt
@@ -1,3 +1,4 @@
Chess
HaremManager
-SchedulePlus
\ No newline at end of file
+SchedulePlus
+Gradientor
diff --git a/radiocycle/Modules/SpotifyMod.py b/radiocycle/Modules/SpotifyMod.py
index 7834a05..218588b 100644
--- a/radiocycle/Modules/SpotifyMod.py
+++ b/radiocycle/Modules/SpotifyMod.py
@@ -1 +1,1436 @@
-# License Violation. Original module has GPL-v3 but author of nowhere tried to change it to CC BY-ND 4.0
+# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
+# █▀█ █ █ █ █▀█ █▀▄ █
+# © Copyright 2022
+#
+# https://t.me/hikariatama
+#
+# 🔒 Licensed under the GNU AGPLv3
+# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
+#
+# You CANNOT edit, distribute or redistribute this file without direct permission from the author.
+#
+# ORIGINAL MODULE: https://raw.githubusercontent.com/hikariatama/ftg/master/spotify.py
+
+# =======================================
+# _ __ __ __ _
+# | |/ /___ | \/ | ___ __| |___
+# | ' // _ \ | |\/| |/ _ \ / _` / __|
+# | . \ __/ | | | | (_) | (_| \__ \
+# |_|\_\___| |_| |_|\___/ \__,_|___/
+# @ke_mods
+# =======================================
+#
+# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
+# --------------------------------------
+# https://creativecommons.org/licenses/by-nd/4.0/legalcode
+# =======================================
+
+# meta developer: @ke_mods
+# requires: telethon spotipy pillow requests yt-dlp
+
+import asyncio
+import contextlib
+import functools
+import io
+import logging
+import re
+import textwrap
+import time
+import traceback
+import os
+from types import FunctionType
+
+import requests
+import spotipy
+from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont, ImageOps
+from telethon.errors import FloodWaitError
+from telethon.tl.functions.account import UpdateProfileRequest
+from telethon.tl.types import Message
+
+from .. import loader, utils
+
+logger = logging.getLogger(__name__)
+logging.getLogger("spotipy").setLevel(logging.CRITICAL)
+
+class Banners:
+ def __init__(
+ self,
+ title: str,
+ artists: list,
+ duration: int,
+ progress: int,
+ track_cover: bytes,
+ font
+ ):
+ self.title = title
+ self.artists = ", ".join(artists) if isinstance(artists, list) else artists
+ self.duration = duration
+ self.progress = progress
+ self.track_cover = track_cover
+ self.font_url = font
+
+ def _get_font(self, size, font_bytes):
+ return ImageFont.truetype(io.BytesIO(font_bytes), size)
+
+ def _prepare_cover(self, size, radius):
+ cover = Image.open(io.BytesIO(self.track_cover)).convert("RGBA")
+ cover = cover.resize((size, size), Image.Resampling.LANCZOS)
+
+ mask = Image.new("L", (size, size), 0)
+ draw = ImageDraw.Draw(mask)
+ draw.rounded_rectangle((0, 0, size, size), radius=radius, fill=255)
+
+ output = Image.new("RGBA", (size, size), (0, 0, 0, 0))
+ output.paste(cover, (0, 0), mask=mask)
+ return output
+
+ def _prepare_background(self, w, h):
+ bg = Image.open(io.BytesIO(self.track_cover)).convert("RGBA")
+ bg = bg.resize((w, h), Image.Resampling.BICUBIC)
+ bg = bg.filter(ImageFilter.GaussianBlur(radius=40))
+ bg = ImageEnhance.Brightness(bg).enhance(0.4)
+ return bg
+
+ def _draw_progress_bar(self, draw, x, y, w, h, progress_pct, color="white", bg_color="#5e5e5e"):
+ draw.rounded_rectangle((x, y, x + w, y + h), radius=h/2, fill=bg_color)
+
+ fill_w = int(w * progress_pct)
+ if fill_w > 0:
+ draw.rounded_rectangle((x, y, x + fill_w, y + h), radius=h/2, fill=color)
+
+ dot_radius = h * 1.2
+ dot_x = x + fill_w
+ dot_y = y + (h / 2)
+
+ draw.ellipse(
+ (dot_x - dot_radius, dot_y - dot_radius, dot_x + dot_radius, dot_y + dot_radius),
+ fill=color
+ )
+
+ def horizontal(self):
+ W, H = 1500, 600
+ padding = 60
+ cover_size = 480
+
+ font_bytes = requests.get(self.font_url).content
+ title_font = self._get_font(55, font_bytes)
+ artist_font = self._get_font(45, font_bytes)
+ time_font = self._get_font(25, font_bytes)
+
+ img = self._prepare_background(W, H)
+ draw = ImageDraw.Draw(img)
+
+ cover = self._prepare_cover(cover_size, 30)
+ img.paste(cover, (padding, (H - cover_size) // 2), cover)
+
+ text_x = padding + cover_size + 60
+ text_y_start = 100
+ text_width_limit = W - text_x - padding
+
+ display_title = self.title
+ while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0:
+ display_title = display_title[:-1]
+ if len(display_title) < len(self.title): display_title += "…"
+
+ display_artist = self.artists
+ while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0:
+ display_artist = display_artist[:-1]
+ if len(display_artist) < len(self.artists): display_artist += "…"
+
+ draw.text((text_x, text_y_start), display_title, font=title_font, fill="white")
+ draw.text((text_x, text_y_start + 70), display_artist, font=artist_font, fill="#B3B3B3")
+
+ cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}"
+ dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}"
+
+ cur_w = time_font.getlength(cur_time)
+ dur_w = time_font.getlength(dur_time)
+
+ bar_y = 480
+ bar_h = 8
+ gap = 25
+
+ draw.text((text_x, bar_y - 12), cur_time, font=time_font, fill="white")
+
+ bar_start_x = text_x + cur_w + gap
+ bar_end_x = text_x + text_width_limit - dur_w - gap
+ bar_w = bar_end_x - bar_start_x
+
+ prog_pct = self.progress / self.duration if self.duration > 0 else 0
+ self._draw_progress_bar(draw, bar_start_x, bar_y, bar_w, bar_h, prog_pct)
+
+ draw.text((bar_end_x + gap, bar_y - 12), dur_time, font=time_font, fill="white")
+
+ by = io.BytesIO()
+ img.save(by, format="PNG")
+ by.seek(0)
+ by.name = "banner.png"
+ return by
+
+ def vertical(self):
+ W, H = 1000, 1500
+ padding = 80
+ cover_size = 800
+
+ font_bytes = requests.get(self.font_url).content
+ title_font = self._get_font(60, font_bytes)
+ artist_font = self._get_font(45, font_bytes)
+ time_font = self._get_font(35, font_bytes)
+
+ img = self._prepare_background(W, H)
+ draw = ImageDraw.Draw(img)
+
+ cover = self._prepare_cover(cover_size, 40)
+ cover_x = (W - cover_size) // 2
+ cover_y = 120
+ img.paste(cover, (cover_x, cover_y), cover)
+
+ text_area_y = cover_y + cover_size + 120
+ text_width_limit = W - (padding * 2)
+
+ display_title = self.title
+ while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0:
+ display_title = display_title[:-1]
+ if len(display_title) < len(self.title): display_title += "…"
+
+ display_artist = self.artists
+ while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0:
+ display_artist = display_artist[:-1]
+ if len(display_artist) < len(self.artists): display_artist += "…"
+
+ title_w = title_font.getlength(display_title)
+ draw.text(((W - title_w) / 2, text_area_y), display_title, font=title_font, fill="white")
+
+ artist_w = artist_font.getlength(display_artist)
+ draw.text(((W - artist_w) / 2, text_area_y + 75), display_artist, font=artist_font, fill="#B3B3B3")
+
+ bar_y = text_area_y + 260
+ bar_h = 8
+ bar_w = W - (padding * 2)
+ prog_pct = self.progress / self.duration if self.duration > 0 else 0
+
+ self._draw_progress_bar(draw, padding, bar_y, bar_w, bar_h, prog_pct, color="white", bg_color="#5e5e5e")
+
+ cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}"
+ dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}"
+
+ draw.text((padding, bar_y + 40), cur_time, font=time_font, fill="#B3B3B3")
+
+ dur_w = time_font.getlength(dur_time)
+ draw.text((W - padding - dur_w, bar_y + 40), dur_time, font=time_font, fill="#B3B3B3")
+
+ by = io.BytesIO()
+ img.save(by, format="PNG")
+ by.seek(0)
+ by.name = "banner.png"
+ return by
+
+@loader.tds
+class SpotifyMod(loader.Module):
+ """Card with the currently playing track on Spotify."""
+
+ strings = {
+ "name": "SpotifyMod",
+ "need_auth": (
+ "❌ Please execute"
+ " .sauth before performing this action."
+ ),
+ "on-repeat": (
+ "🔄 Set on-repeat."
+ ),
+ "off-repeat": (
+ "🔄 Stopped track"
+ " repeat."
+ ),
+ "skipped": (
+ "➡️ Skipped track."
+ ),
+ "playing": "▶️ Playing...",
+ "back": (
+ "⬅️ Switched to previous"
+ " track"
+ ),
+ "paused": "❌ Pause",
+ "restarted": (
+ "✅️ Playing track"
+ " from the"
+ " beginning"
+ ),
+ "liked": (
+ "❤️ Liked current"
+ " playback"
+ ),
+ "unlike": (
+ "❌"
+ " Unliked current playback"
+ ),
+ "err": (
+ "❌ An error occurred."
+ "\n{}"
+ ),
+ "already_authed": (
+ "❌ Already authorized"
+ ),
+ "authed": (
+ "✅ Authentication"
+ " successful"
+ ),
+ "deauth": (
+ "🚪 Successfully logged out"
+ " of account"
+ ),
+ "auth": (
+ '🔗 Follow this'
+ " link, allow access, then enter .scode https://... with"
+ " the link you received."
+ ),
+ "no_music": (
+ "❌ No music is playing!"
+ ),
+ "dl_err": (
+ "❌ Failed to download"
+ " track."
+ ),
+ "volume_changed": (
+ "🔊"
+ " Volume changed to {}%."
+ ),
+ "volume_invalid": (
+ "❌ Volume level must be"
+ " a number between 0 and 100."
+ ),
+ "volume_err": (
+ "❌ An error occurred while"
+ " changing volume."
+ ),
+ "no_volume_arg": (
+ "❌ Please specify a"
+ " volume level between 0 and 100."
+ ),
+ "searching_tracks": (
+ "🕔 Searching for tracks"
+ " matching {}..."
+ ),
+ "no_search_query": (
+ "❌ Please specify a"
+ " search query."
+ ),
+ "no_tracks_found": (
+ "❌ No tracks found for"
+ " {}."
+ ),
+ "search_results": (
+ "✅ Search results for"
+ " {}:\n\n{}"
+ ),
+ "downloading_search_track": (
+ "🕔 Downloading {}..."
+ ),
+ "download_success": (
+ "✅ Successfully downloaded {} - {}"
+ ),
+ "invalid_track_number": (
+ "❌ Invalid track number."
+ " Please search first or provide a valid number from the list."
+ ),
+ "device_list": (
+ "📄 Available devices:\n{}"
+ ),
+ "no_devices_found": (
+ "❌ No devices found."
+ ),
+ "device_changed": (
+ "✅ Playback transferred to"
+ " {}."
+ ),
+ "invalid_device_id": (
+ "❌ Invalid device ID."
+ " Use .sdevice to see available devices."
+ ),
+ "search_results_cleared": "✅ Search results cleared",
+ "autobio": (
+ "🎧 Spotify autobio {}"
+ ),
+ "no_ytdlp": "❌ yt-dlp not found... Check config or install yt-dlp ({}terminal pip install yt-dlp)",
+ "snowt_failed": "\n\n❌ Download failed",
+ "uploading_banner": "\n\n🕔 Uploading banner...",
+ "downloading_track": "\n\n🕔 Downloading track...",
+ "no_playlists": "❌ No playlists found.",
+ "playlists_list": "📄 Your playlists:\n\n{}",
+ "added_to_playlist": "✅ Added {} to {}",
+ "removed_from_playlist": "✅ Removed {} from {}",
+ "invalid_playlist_index": "❌ Invalid playlist number.",
+ "no_cached_playlists": "❌ Use .splaylists first.",
+ "playlist_created": "✅ Playlist {} created.",
+ "playlist_deleted": "✅ Playlist {} deleted.",
+ "no_playlist_name": "❌ Please specify a playlist name.",
+ }
+
+ strings_ru = {
+ "_cls_doc": "Карточка с играющим треком в Spotify.",
+ "need_auth": (
+ "❌ Выполни"
+ " .sauth перед выполнением этого действия."
+ ),
+ "err": (
+ "❌ Произошла ошибка."
+ "\n{}"
+ ),
+ "on-repeat": (
+ "🔄 Включен повтор трека."
+ ),
+ "off-repeat": (
+ "🔄 Повтор трека отключён."
+ ),
+ "skipped": (
+ "➡️ Трек пропущен."
+ ),
+ "playing": "▶️ Играет...",
+ "back": (
+ "⬅️ Переключено на предыдущий трек"
+ ),
+ "paused": "❌ Пауза",
+ "restarted": (
+ "✅️ Воспроизведение трека с начала..."
+ ),
+ "liked": (
+ "❤️ Текущий трек добавлен в избранное"
+ ),
+ "unlike": (
+ "❌ Убрал лайк с текущего трека"
+ ),
+ "already_authed": (
+ "❌ Уже авторизован"
+ ),
+ "authed": (
+ "✅ Успешная аутентификация"
+ ),
+ "deauth": (
+ "🚪 Успешный выход из аккаунта"
+ ),
+ "auth": (
+ '🔗 Пройдите по этой ссылке, разрешите вход, затем введите .scode https://... с ссылкой которую вы получили.'
+ ),
+ "no_music": (
+ "❌ Музыка не играет!"
+ ),
+ "dl_err": (
+ "❌ Не удалось скачать трек."
+ ),
+ "volume_changed": (
+ "🔊"
+ " Громкость изменена на {}%."
+ ),
+ "volume_invalid": (
+ "❌ Уровень громкости должен"
+ " быть числом от 0 до 100."
+ ),
+ "volume_err": (
+ "❌ Произошла ошибка при"
+ " изменении громкости."
+ ),
+ "no_volume_arg": (
+ "❌ Пожалуйста, укажите"
+ " уровень громкости от 0 до 100."
+ ),
+ "searching_tracks": (
+ "🕔 Идет поиск треков"
+ " по запросу {}..."
+ ),
+ "no_search_query": (
+ "❌ Пожалуйста, укажите"
+ " поисковый запрос."
+ ),
+ "no_tracks_found": (
+ "❌ По запросу '{}'"
+ " ничего не найдено."
+ ),
+ "search_results": (
+ "✅ Результаты поиска"
+ " по запросу {}:\n\n{}"
+ ),
+ "downloading_search_track": (
+ "🕔 Скачиваю {}..."
+ ),
+ "download_success": (
+ "✅ Трек {} - {} успешно скачан."
+ ),
+ "invalid_track_number": (
+ "❌ Некорректный номер трека."
+ " Сначала выполните поиск или укажите правильный номер из списка."
+ ),
+ "device_list": (
+ "📄 Доступные устройства:\n{}"
+ ),
+ "no_devices_found": (
+ "❌ Устройства не найдены."
+ ),
+ "device_changed": (
+ "✅ Воспроизведение переключено на"
+ " {}."
+ ),
+ "invalid_device_id": (
+ "❌ Некорректный ID устройства."
+ " Используйте .sdevice , чтобы увидеть доступные устройства."
+ ),
+ "search_results_cleared": "✅ Результаты поиска очищены",
+ "autobio": (
+ "🎧 Обновление био"
+ " включено {}"
+ ),
+ "no_ytdlp": "❌ yt-dlp не найден... Проверьте конфиг или установите yt-dlp ({}terminal pip install yt-dlp)",
+ "snowt_failed": "\n\n❌ Ошибка скачивания.",
+ "uploading_banner": "\n\n🕔 Загрузка баннера...",
+ "downloading_track": "\n\n🕔 Скачивание трека...",
+ "no_playlists": "❌ Плейлисты не найдены.",
+ "playlists_list": "📄 Ваши плейлисты:\n\n{}",
+ "added_to_playlist": "✅ Трек {} добавлен в {}",
+ "removed_from_playlist": "✅ Трек {} удален из {}",
+ "invalid_playlist_index": "❌ Неверный номер плейлиста.",
+ "no_cached_playlists": "❌ Сначала используйте .splaylists.",
+ "playlist_created": "✅ Плейлист {} создан.",
+ "playlist_deleted": "✅ Плейлист {} удален.",
+ "no_playlist_name": "❌ Пожалуйста, укажите название плейлиста.",
+ }
+ strings_jp = {
+ "_cls_doc": "Spotify からのメッセージ",
+ "need_auth": (
+ "❌ この操作を行う前に "
+ ".sauth を実行してください。"
+ ),
+ "on-repeat": (
+ "🔄 リピート再生を設定しました。"
+ ),
+ "off-repeat": (
+ "🔄 リピート再生を解除しました。"
+ ),
+ "skipped": (
+ "➡️ スキップしました。"
+ ),
+ "playing": "▶️ 再生中...",
+ "back": (
+ "⬅️ 前のトラックに戻りました。"
+ ),
+ "paused": "❌ 一時停止",
+ "restarted": (
+ "✅️ 最初から再生します。"
+ ),
+ "liked": (
+ "❤️ お気に入りに追加しました。"
+ ),
+ "unlike": (
+ "❌"
+ " お気に入りから削除しました。"
+ ),
+ "err": (
+ "❌ エラーが発生しました。"
+ "\n{}"
+ ),
+ "already_authed": (
+ "❌ 既に認証されています。"
+ ),
+ "authed": (
+ "✅ 認証に成功しました。"
+ ),
+ "deauth": (
+ "🚪 ログアウトしました。"
+ ),
+ "auth": (
+ '🔗 リンクをクリックしてアクセスを許可し、取得したURLを使って .scode https://... を入力してください。'
+ ),
+ "no_music": (
+ "❌ 音楽は再生されていません!"
+ ),
+ "dl_err": (
+ "❌ トラックのダウンロードに失敗しました。"
+ ),
+ "volume_changed": (
+ "🔊"
+ " 音量を {}% に変更しました。"
+ ),
+ "volume_invalid": (
+ "❌ 音量は0から100の数字で指定してください。"
+ ),
+ "volume_err": (
+ "❌ 音量の変更中にエラーが発生しました。"
+ ),
+ "no_volume_arg": (
+ "❌ 0から100の間で音量を指定してください。"
+ ),
+ "searching_tracks": (
+ "🕔 {} を検索中..."
+ ),
+ "no_search_query": (
+ "❌ 検索キーワードを指定してください。"
+ ),
+ "no_tracks_found": (
+ "❌ {} は見つかりませんでした。"
+ ),
+ "search_results": (
+ "✅ {} の検索結果:\n\n{}"
+ ),
+ "downloading_search_track": (
+ "🕔 {} をダウンロード中..."
+ ),
+ "download_success": (
+ "✅ {} - {} のダウンロードに成功しました。"
+ ),
+ "invalid_track_number": (
+ "❌ トラック番号が無効です。"
+ " 先に検索するか、リストから有効な番号を指定してください。"
+ ),
+ "device_list": (
+ "📄 利用可能なデバイス:\n{}"
+ ),
+ "no_devices_found": (
+ "❌ デバイスが見つかりません。"
+ ),
+ "device_changed": (
+ "✅ 再生デバイスを"
+ " {} に切り替えました。"
+ ),
+ "invalid_device_id": (
+ "❌ デバイスIDが無効です。"
+ " .sdevice で利用可能なデバイスを確認してください。"
+ ),
+ "search_results_cleared": "✅ 検索結果をクリアしました。",
+ "autobio": (
+ "🎧 Spotify AutoBio: {}"
+ ),
+ "no_ytdlp": "❌ yt-dlpが見つかりません... 設定を確認するか、インストールしてください ({}terminal pip install yt-dlp)",
+ "snowt_failed": "\n\n❌ ダウンロードに失敗しました。",
+ "uploading_banner": "\n\n🕔 バナーをアップロード中...",
+ "downloading_track": "\n\n🕔 トラックをダウンロード中...",
+ "no_playlists": "❌ プレイリストが見つかりません。",
+ "playlists_list": "📄 あなたのプレイリスト:\n\n{}",
+ "added_to_playlist": "✅ {} を {} に追加しました。",
+ "removed_from_playlist": "✅ {} を {} から削除しました。",
+ "invalid_playlist_index": "❌ プレイリスト番号が無効です。",
+ "no_cached_playlists": "❌ 先に .splaylists を使用してください。",
+ "playlist_created": "✅ プレイリスト {} を作成しました。",
+ "playlist_deleted": "✅ プレイリスト {} を削除しました。",
+ "no_playlist_name": "❌ プレイリスト名を指定してください。",
+ }
+
+ def __init__(self):
+ self._client_id = "e0708753ab60499c89ce263de9b4f57a"
+ self._client_secret = "80c927166c664ee98a43a2c0e2981b4a"
+ self.scope = (
+ "user-read-playback-state playlist-read-private playlist-read-collaborative"
+ " user-modify-playback-state user-library-modify"
+ " playlist-modify-public playlist-modify-private"
+ )
+ self.sp_auth = spotipy.oauth2.SpotifyOAuth(
+ client_id=self._client_id,
+ client_secret=self._client_secret,
+ redirect_uri="https://thefsch.github.io/spotify/",
+ scope=self.scope,
+ )
+ self.config = loader.ModuleConfig(
+ loader.ConfigValue(
+ "show_banner",
+ True,
+ "Show banner with track info",
+ validator=loader.validators.Boolean(),
+ ),
+ loader.ConfigValue(
+ "custom_text",
+ (
+ "🎧 Now playing: {track} — {artists}\n"
+ "🔗 song.link"
+ ),
+ """Custom text, supports {track}, {artists}, {album}, {playlist}, {playlist_owner}, {spotify_url}, {songlink}, {progress}, {duration}, {device} placeholders""",
+ validator=loader.validators.String(),
+ ),
+ loader.ConfigValue(
+ "font",
+ "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf",
+ "Custom font. Specify URL to .ttf file",
+ validator=loader.validators.String(),
+ ),
+ loader.ConfigValue(
+ "auto_bio_template",
+ "🎧 {}",
+ lambda: "Template for Spotify AutoBio",
+ ),
+ loader.ConfigValue(
+ "ytdlp_path",
+ "",
+ "Path to ytdlp binary",
+ validator=loader.validators.String(),
+ ),
+ loader.ConfigValue(
+ "banner_version",
+ "horizontal",
+ lambda: "Banner version",
+ validator=loader.validators.Choice(["horizontal", "vertical"]),
+ ),
+ )
+
+ async def client_ready(self, client, db):
+ self.font_ready = asyncio.Event()
+
+ self._premium = getattr(await client.get_me(), "premium", False)
+ try:
+ self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
+ except Exception:
+ self.set("acs_tkn", None)
+ self.sp = None
+
+ if self.get("autobio", False):
+ self.autobio.start()
+
+ def tokenized(func) -> FunctionType:
+ @functools.wraps(func)
+ async def wrapped(*args, **kwargs):
+ if not args[0].get("acs_tkn", False) or not args[0].sp:
+ await utils.answer(args[1], args[0].strings("need_auth"))
+ return
+
+ return await func(*args, **kwargs)
+
+ wrapped.__doc__ = func.__doc__
+ wrapped.__module__ = func.__module__
+
+ return wrapped
+
+ def error_handler(func) -> FunctionType:
+ @functools.wraps(func)
+ async def wrapped(*args, **kwargs):
+ try:
+ return await func(*args, **kwargs)
+ except Exception:
+ logger.exception(traceback.format_exc())
+ with contextlib.suppress(Exception):
+ await utils.answer(
+ args[1],
+ args[0].strings("err").format(traceback.format_exc()),
+ )
+
+ wrapped.__doc__ = func.__doc__
+ wrapped.__module__ = func.__module__
+
+ return wrapped
+
+
+ @loader.loop(interval=90)
+ async def autobio(self):
+ try:
+ current_playback = self.sp.current_playback()
+ track = current_playback["item"]["name"]
+ track = re.sub(r"([(].*?[)])", "", track).strip()
+ except Exception:
+ return
+
+ bio = self.config["auto_bio_template"].format(f"{track}")
+
+ try:
+ await self._client(
+ UpdateProfileRequest(about=bio[: 140 if self._premium else 70])
+ )
+ except FloodWaitError as e:
+ logger.info(f"Sleeping {max(e.seconds, 60)} bc of floodwait")
+ await asyncio.sleep(max(e.seconds, 60))
+ return
+
+ async def _download_track(self, message, query: str, caption: str = ""):
+ dl_dir = os.path.join(os.getcwd(), "spotifymod")
+ if not os.path.exists(dl_dir):
+ os.makedirs(dl_dir, exist_ok=True)
+
+ for f in os.listdir(dl_dir):
+ try:
+ os.remove(os.path.join(dl_dir, f))
+ except:
+ pass
+
+ try:
+ squery = query.replace('"', '').replace("'", "")
+
+ cmd = (
+ f'{self.config["ytdlp_path"]} -x --audio-format mp3 --add-metadata '
+ f'-o "{dl_dir}/%(title)s [%(id)s].%(ext)s" '
+ f'"ytsearch1:{squery}"'
+ )
+
+ proc = await asyncio.create_subprocess_shell(
+ cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE
+ )
+ await proc.communicate()
+
+ files = [f for f in os.listdir(dl_dir) if f.endswith(".mp3")]
+
+ if files:
+ target_file = os.path.join(dl_dir, files[0])
+ await utils.answer(message, caption, file=target_file)
+ else:
+ await utils.answer(message, self.strings("snowt_failed"))
+
+ except Exception as e:
+ logger.error(e)
+ await utils.answer(message, self.strings("dl_err"))
+
+ finally:
+ if os.path.exists(dl_dir):
+ for f in os.listdir(dl_dir):
+ try:
+ os.remove(os.path.join(dl_dir, f))
+ except:
+ pass
+
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- ➕ Добавить текущий трек в плейлист (используйте номер из .splaylists)"
+ )
+ async def splaylistadd(self, message: Message):
+ """- ➕ Add current track to playlist (use number from .splaylists)"""
+ args = utils.get_args_raw(message)
+ if not args or not args.isdigit():
+ await utils.answer(message, self.strings("invalid_playlist_index"))
+ return
+
+ index = int(args) - 1
+ playlists = self.get("last_playlists", [])
+
+ if not playlists:
+ await utils.answer(message, self.strings("no_cached_playlists"))
+ return
+
+ if index < 0 or index >= len(playlists):
+ await utils.answer(message, self.strings("invalid_playlist_index"))
+ return
+
+ current = self.sp.current_playback()
+ if not current or not current.get("item"):
+ await utils.answer(message, self.strings("no_music"))
+ return
+
+ track_uri = current["item"]["uri"]
+ track_name = current["item"]["name"]
+ artists = ", ".join([a["name"] for a in current["item"]["artists"]])
+ full_track_name = f"{artists} - {track_name}"
+
+ playlist_id = playlists[index]["id"]
+ playlist_name = playlists[index]["name"]
+
+ try:
+ self.sp.playlist_add_items(playlist_id, [track_uri])
+ except spotipy.exceptions.SpotifyException as e:
+ if e.http_status == 403 and "Insufficient client scope" in str(e):
+ await utils.answer(message, self.strings("need_auth"))
+ return
+ raise e
+
+ await utils.answer(message, self.strings("added_to_playlist").format(utils.escape_html(full_track_name), utils.escape_html(playlist_name)))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- ➖ Удалить текущий трек из плейлиста (используйте номер из .splaylists)"
+ )
+ async def splaylistrem(self, message: Message):
+ """- ➖ Remove current track from playlist (use number from .splaylists)"""
+ args = utils.get_args_raw(message)
+ if not args or not args.isdigit():
+ await utils.answer(message, self.strings("invalid_playlist_index"))
+ return
+
+ index = int(args) - 1
+ playlists = self.get("last_playlists", [])
+
+ if not playlists:
+ await utils.answer(message, self.strings("no_cached_playlists"))
+ return
+
+ if index < 0 or index >= len(playlists):
+ await utils.answer(message, self.strings("invalid_playlist_index"))
+ return
+
+ current = self.sp.current_playback()
+ if not current or not current.get("item"):
+ await utils.answer(message, self.strings("no_music"))
+ return
+
+ track_uri = current["item"]["uri"]
+ track_name = current["item"]["name"]
+ artists = ", ".join([a["name"] for a in current["item"]["artists"]])
+ full_track_name = f"{artists} - {track_name}"
+
+ playlist_id = playlists[index]["id"]
+ playlist_name = playlists[index]["name"]
+
+ try:
+ self.sp.playlist_remove_all_occurrences_of_items(playlist_id, [track_uri])
+ except spotipy.exceptions.SpotifyException as e:
+ if e.http_status == 403 and "Insufficient client scope" in str(e):
+ await utils.answer(message, self.strings("need_auth"))
+ return
+ raise e
+
+ await utils.answer(message, self.strings("removed_from_playlist").format(utils.escape_html(full_track_name), utils.escape_html(playlist_name)))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 🆕 Создать новый плейлист"
+ )
+ async def splaylistcreate(self, message: Message):
+ """- 🆕 Create a new playlist"""
+ name = utils.get_args_raw(message)
+ if not name:
+ await utils.answer(message, self.strings("no_playlist_name"))
+ return
+
+ user_id = self.sp.me()["id"]
+ self.sp.user_playlist_create(user_id, name)
+ await utils.answer(message, self.strings("playlist_created").format(utils.escape_html(name)))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 🗑 Удалить плейлист (используйте номер из .splaylists)"
+ )
+ async def splaylistdelete(self, message: Message):
+ """- 🗑 Delete playlist (use number from .splaylists)"""
+ args = utils.get_args_raw(message)
+ if not args or not args.isdigit():
+ await utils.answer(message, self.strings("invalid_playlist_index"))
+ return
+
+ index = int(args) - 1
+ playlists = self.get("last_playlists", [])
+
+ if not playlists:
+ await utils.answer(message, self.strings("no_cached_playlists"))
+ return
+
+ if index < 0 or index >= len(playlists):
+ await utils.answer(message, self.strings("invalid_playlist_index"))
+ return
+
+ playlist_id = playlists[index]["id"]
+ playlist_name = playlists[index]["name"]
+
+ self.sp.current_user_unfollow_playlist(playlist_id)
+ await utils.answer(message, self.strings("playlist_deleted").format(utils.escape_html(playlist_name)))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 📃 Получить все плейлисты"
+ )
+ async def splaylists(self, message: Message):
+ """- 📃 Get all playlists"""
+ user_id = self.sp.me()["id"]
+ playlists = self.sp.current_user_playlists()
+
+ editable_playlists = []
+ for playlist in playlists["items"]:
+ if playlist["owner"]["id"] == user_id or playlist["collaborative"]:
+ editable_playlists.append(playlist)
+
+ self.set("last_playlists", editable_playlists)
+
+ playlist_list_text = ""
+ for i, playlist in enumerate(editable_playlists):
+ name = utils.escape_html(playlist["name"])
+ url = playlist["external_urls"]["spotify"]
+ count = playlist["tracks"]["total"]
+ playlist_list_text += f"{i + 1}. {name} ({count} tracks)\n"
+
+ if not playlist_list_text:
+ await utils.answer(message, self.strings("no_playlists"))
+ else:
+ await utils.answer(message, self.strings("playlists_list").format(playlist_list_text))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- ℹ️ Переключить стриминг воспроизведения в био"
+ )
+ async def sbiocmd(self, message: Message):
+ """- ℹ️ Toggle bio playback streaming"""
+ current = self.get("autobio", False)
+ new = not current
+ self.set("autobio", new)
+ await utils.answer(
+ message,
+ self.strings("autobio").format("enabled" if new else "disabled"),
+ )
+
+ if new:
+ self.autobio.start()
+ else:
+ self.autobio.stop()
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 🔊 Изменить громкость. .svolume <0-100>"
+ )
+ async def svolume(self, message: Message):
+ """- 🔊 Change playback volume. .svolume <0-100>"""
+ try:
+ args = utils.get_args_raw(message)
+ if not args:
+ await utils.answer(message, self.strings("no_volume_arg"))
+ return
+
+ volume_percent = int(args)
+ if 0 <= volume_percent <= 100:
+ self.sp.volume(volume_percent)
+ await utils.answer(message, self.strings("volume_changed").format(volume_percent))
+ else:
+ await utils.answer(message, self.strings("volume_invalid"))
+ except ValueError:
+ await utils.answer(message, self.strings("volume_invalid"))
+ except Exception:
+ await utils.answer(message, self.strings("volume_err"))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc=(
+ "- 🎵 Выбрать устройство для воспроизведения. Например: .sdevice \n"
+ "- 📝 Показать список устройств: .sdevice"
+ )
+ )
+ async def sdevicecmd(self, message: Message):
+ """- 🎵 Set preferred playback device. Usage: .sdevice or .sdevice to list devices"""
+ args = utils.get_args_raw(message)
+ devices = self.sp.devices()["devices"]
+
+ if not args:
+ if not devices:
+ await utils.answer(message, self.strings("no_devices_found"))
+ return
+
+ device_list_text = ""
+ for i, device in enumerate(devices):
+ is_active = "(active)" if device["is_active"] else ""
+ device_list_text += (
+ f"{i+1}. {device['name']}"
+ f" ({device['type']}) {is_active}\n"
+ )
+
+ await utils.answer(message, self.strings("device_list").format(device_list_text.strip()))
+ return
+
+ device_id = None
+ try:
+ device_number = int(args)
+ if 0 < device_number <= len(devices):
+ device_id = devices[device_number - 1]["id"]
+ device_name = devices[device_number - 1]["name"]
+ else:
+ await utils.answer(message, self.strings("invalid_device_id"))
+ return
+ except ValueError:
+ found_device = next((d for d in devices if d["id"] == args.strip()), None)
+ if found_device:
+ device_id = found_device["id"]
+ device_name = found_device["name"]
+ else:
+ await utils.answer(message, self.strings("invalid_device_id"))
+ return
+
+ self.sp.transfer_playback(device_id=device_id)
+ await utils.answer(message, self.strings("device_changed").format(device_name))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 💫 Включить повтор трека"
+ )
+ async def srepeatcmd(self, message: Message):
+ """- 💫 Repeat"""
+ self.sp.repeat("track")
+ await utils.answer(message, self.strings("on-repeat"))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- ✋ Остановить повтор"
+ )
+ async def sderepeatcmd(self, message: Message):
+ """- ✋ Stop repeat"""
+ self.sp.repeat("context")
+ await utils.answer(message, self.strings("off-repeat"))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 👉 Следующий трек"
+ )
+ async def snextcmd(self, message: Message):
+ """- 👉 Next track"""
+ self.sp.next_track()
+ await utils.answer(message, self.strings("skipped"))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 🤚 Продолжить воспроизведение"
+ )
+ async def sresumecmd(self, message: Message):
+ """- 🤚 Resume"""
+ self.sp.start_playback()
+ await utils.answer(message, self.strings("playing"))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 🤚 Пауза"
+ )
+ async def spausecmd(self, message: Message):
+ """- 🤚 Pause"""
+ self.sp.pause_playback()
+ await utils.answer(message, self.strings("paused"))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- ⏮ Предыдущий трек"
+ )
+ async def sbackcmd(self, message: Message):
+ """- ⏮ Previous track"""
+ self.sp.previous_track()
+ await utils.answer(message, self.strings("back"))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- ⏪ Перезапустить трек"
+ )
+ async def sbegincmd(self, message: Message):
+ """- ⏪ Restart track"""
+ self.sp.seek_track(0)
+ await utils.answer(message, self.strings("restarted"))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- ❤️ Лайкнуть играющий трек"
+ )
+ async def slikecmd(self, message: Message):
+ """- ❤️ Like current track"""
+ cupl = self.sp.current_playback()
+ self.sp.current_user_saved_tracks_add([cupl["item"]["id"]])
+ await utils.answer(message, self.strings("liked"))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 💔 Убрать лайк с играющего трека"
+ )
+ async def sunlikecmd(self, message: Message):
+ """- 💔 Unlike current track"""
+ cupl = self.sp.current_playback()
+ self.sp.current_user_saved_tracks_delete([cupl["item"]["id"]])
+ await utils.answer(message, self.strings("unlike"))
+
+ @error_handler
+ @loader.command(
+ ru_doc="- Получить ссылку для авторизации"
+ )
+ async def sauthcmd(self, message: Message):
+ """- Get authorization link"""
+ if self.get("acs_tkn", False) and not self.sp:
+ await utils.answer(message, self.strings("already_authed"))
+ else:
+ self.sp_auth.get_authorize_url()
+ await utils.answer(
+ message,
+ self.strings("auth").format(self.sp_auth.get_authorize_url()),
+ )
+
+ @error_handler
+ @loader.command(
+ ru_doc="- Вставить код авторизации"
+ )
+ async def scodecmd(self, message: Message):
+ """- Paste authorization code"""
+ url = message.message.split(" ")[1]
+ code = self.sp_auth.parse_auth_response_url(url)
+ self.set("acs_tkn", self.sp_auth.get_access_token(code, True, False))
+ self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
+ await utils.answer(message, self.strings("authed"))
+
+ @error_handler
+ @loader.command(
+ ru_doc="- Выйти из аккаунта"
+ )
+ async def unauthcmd(self, message: Message):
+ """- Log out of account"""
+ self.set("acs_tkn", None)
+ del self.sp
+ await utils.answer(message, self.strings("deauth"))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- Обновить токен авторизации"
+ )
+ async def stokrefreshcmd(self, message: Message):
+ """- Refresh authorization token"""
+ self.set(
+ "acs_tkn",
+ self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]),
+ )
+ self.set("NextRefresh", time.time() + 45 * 60)
+ self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
+ await utils.answer(message, self.strings("authed"))
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 🎧 Показать карточку играющего трека"
+ )
+ async def snowcmd(self, message: Message):
+ """- 🎧 View current track card."""
+ current_playback = self.sp.current_playback()
+ if not current_playback or not current_playback.get("is_playing", False):
+ await utils.answer(message, self.strings("no_music"))
+ return
+
+ track = current_playback["item"]["name"]
+ track_id = current_playback["item"]["id"]
+ artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]])
+ album_name = current_playback["item"]["album"].get("name", "Unknown Album")
+ duration_ms = current_playback["item"].get("duration_ms", 0)
+ progress_ms = current_playback.get("progress_ms", 0)
+
+ duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}"
+ progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}"
+
+ spotify_url = f"https://open.spotify.com/track/{track_id}"
+ songlink = f"https://song.link/s/{track_id}"
+
+ try:
+ device_raw = (
+ current_playback["device"]["name"]
+ + " "
+ + current_playback["device"]["type"].lower()
+ )
+ device = device_raw.replace("computer", "").replace("smartphone", "").strip()
+ except Exception:
+ device = None
+
+ try:
+ playlist_id = current_playback["context"]["uri"].split(":")[-1]
+ playlist = self.sp.playlist(playlist_id)
+ playlist_name = playlist.get("name", None)
+ try:
+ playlist_owner = (
+ f''
+ f'{playlist["owner"]["display_name"]}'
+ )
+ except KeyError:
+ playlist_owner = playlist.get("owner", {}).get("display_name", "")
+ except Exception:
+ playlist_name = ""
+ playlist_owner = ""
+
+ text = self.config["custom_text"].format(
+ track=utils.escape_html(track),
+ artists=utils.escape_html(artists),
+ album=utils.escape_html(album_name),
+ duration=duration,
+ progress=progress,
+ device=device,
+ spotify_url=spotify_url,
+ songlink=songlink,
+ playlist=utils.escape_html(playlist_name) if playlist_name else "",
+ playlist_owner=playlist_owner or "",
+ )
+
+ if self.config["show_banner"]:
+ cover_url = current_playback["item"]["album"]["images"][0]["url"]
+
+ tmp_msg = await utils.answer(message, text + self.strings("uploading_banner"))
+
+ banners = Banners(
+ title=track,
+ artists=artists,
+ duration=duration_ms,
+ progress=progress_ms,
+ track_cover=requests.get(cover_url).content,
+ font=self.config["font"],
+ )
+ file = getattr(banners, self.config["banner_version"], banners.horizontal)()
+
+ await utils.answer(tmp_msg, text, file=file)
+ else:
+ await utils.answer(message, text)
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 🎧 Скачать играющий трек"
+ )
+ async def snowtcmd(self, message: Message):
+ """- 🎧 Download current track."""
+ current_playback = self.sp.current_playback()
+ if not current_playback or not current_playback.get("is_playing", False):
+ await utils.answer(message, self.strings("no_music"))
+ return
+
+ track = current_playback["item"]["name"]
+ artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]])
+ album_name = current_playback["item"]["album"].get("name", "Unknown Album")
+ duration_ms = current_playback["item"].get("duration_ms", 0)
+ progress_ms = current_playback.get("progress_ms", 0)
+
+ duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}"
+ progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}"
+
+ spotify_url = f"https://open.spotify.com/track/{current_playback['item']['id']}"
+ songlink = f"https://song.link/s/{current_playback['item']['id']}"
+
+ try:
+ device_raw = (
+ current_playback["device"]["name"]
+ + " "
+ + current_playback["device"]["type"].lower()
+ )
+ device = device_raw.replace("computer", "").replace("smartphone", "").strip()
+ except Exception:
+ device = None
+
+ try:
+ playlist_id = current_playback["context"]["uri"].split(":")[-1]
+ playlist = self.sp.playlist(playlist_id)
+ playlist_name = playlist.get("name", None)
+ try:
+ playlist_owner = (
+ f''
+ f'{playlist["owner"]["display_name"]}'
+ )
+ except KeyError:
+ playlist_owner = playlist.get("owner", {}).get("display_name", "")
+ except Exception:
+ playlist_name = ""
+ playlist_owner = ""
+
+ text = self.config["custom_text"].format(
+ track=utils.escape_html(track),
+ artists=utils.escape_html(artists),
+ album=utils.escape_html(album_name),
+ duration=duration,
+ progress=progress,
+ device=device,
+ spotify_url=spotify_url,
+ songlink=songlink,
+ playlist=utils.escape_html(playlist_name) if playlist_name else "",
+ playlist_owner=playlist_owner or "",
+ )
+
+ msg = await utils.answer(message, text + self.strings("downloading_track"))
+
+ await self._download_track(msg, f"{artists} {track}", caption=text)
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc=(
+ "- 🔍 Поиск треков. Например: .ssearch Imagine Dragons Believer\n"
+ "- 🎧 Скачать трек: .ssearch 1 (где 1 — номер трека из списка)"
+ )
+ )
+ async def ssearchcmd(self, message: Message):
+ """🔍 Search for tracks. Usage: .ssearch or .ssearch to download"""
+ args = utils.get_args_raw(message)
+ if not args:
+ await utils.answer(message, self.strings("no_search_query"))
+ return
+
+ try:
+ track_number = int(args)
+ search_results = self.get("last_search_results", [])
+
+ if not search_results:
+ await utils.answer(message, self.strings("no_tracks_found"))
+ return
+
+ if track_number <= 0 or track_number > len(search_results):
+ raise ValueError
+
+ msg = await utils.answer(message, self.strings("downloading_track"))
+
+ track_info = search_results[track_number - 1]
+ track_name = track_info["name"]
+ artists = ", ".join([a["name"] for a in track_info["artists"]])
+
+ caption_text = self.strings("download_success").format(
+ utils.escape_html(track_name),
+ utils.escape_html(artists)
+ )
+
+ await self._download_track(msg, f"{artists} {track_name}", caption=caption_text)
+ return
+
+ except ValueError:
+ await utils.answer(message, self.strings("searching_tracks").format(args))
+
+ results = self.sp.search(q=args, limit=5, type="track")
+
+ if not results or not results["tracks"]["items"]:
+ await utils.answer(message, self.strings("no_tracks_found").format(args))
+ return
+
+ self.set("last_search_results", results["tracks"]["items"])
+
+ tracks_list = []
+ for i, track in enumerate(results["tracks"]["items"]):
+ track_name = track["name"]
+ artists = ", ".join([artist["name"] for artist in track["artists"]])
+ track_url = track["external_urls"]["spotify"]
+ tracks_list.append(
+ "{number}. {track_name} — {artists}\n🔗 Spotify".format(
+ number=i + 1,
+ track_name=utils.escape_html(track_name),
+ artists=utils.escape_html(artists),
+ track_url=track_url,
+ )
+ )
+
+ text = "\n".join(tracks_list)
+ await utils.answer(message, self.strings("search_results").format(args, text))
+
+
+ @loader.command(
+ ru_doc="- 🔄 Сброс результатов поиска по трекам"
+ )
+ async def ssearchresetcmd(self, message: Message):
+ """- 🔄 Reset track search results"""
+ self.set("last_search_results", [])
+ await utils.answer(message, self.strings["search_results_cleared"])
+
+ async def watcher(self, message: Message):
+ """Watcher is used to update token"""
+ if not self.sp:
+ return
+
+ if self.get("NextRefresh", False):
+ ttc = self.get("NextRefresh", 0)
+ crnt = time.time()
+ if ttc < crnt:
+ self.set(
+ "acs_tkn",
+ self.sp_auth.refresh_access_token(
+ self.get("acs_tkn")["refresh_token"]
+ ),
+ )
+ self.set("NextRefresh", time.time() + 45 * 60)
+ self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])
+ else:
+ self.set(
+ "acs_tkn",
+ self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]),
+ )
+ self.set("NextRefresh", time.time() + 45 * 60)
+ self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"])