From bfeb11ff5323836085d470a5bd4d7edb8c56b8cd Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 10 Jun 2026 02:44:47 +0000 Subject: [PATCH] Added and updated repositories 2026-06-10 02:44:47 --- coddrago/modules/YaMusic.py | 81 +++--------- radiocycle/Modules/SpotifyMod.py | 216 ++++++++++++++++++++++++------- 2 files changed, 192 insertions(+), 105 deletions(-) diff --git a/coddrago/modules/YaMusic.py b/coddrago/modules/YaMusic.py index 036e671..01becc2 100644 --- a/coddrago/modules/YaMusic.py +++ b/coddrago/modules/YaMusic.py @@ -26,7 +26,6 @@ from .. import loader, utils logger = logging.getLogger(__name__) - class Banners: def __init__( self, @@ -324,6 +323,16 @@ class YaMusicMod(loader.Module): "name": "YaMusic" } + duration_placeholder = { + "start_duration": "☀️☀️", + "start_full_duration": "☀️☀️", + "closed_duration": "☀️", + "empty_mid": "☀️", + "empty_closed_duration_duration": "☀️", + "end_duration_full": "☀️", + "empty_closed_duration": "☀️", + } + def __init__(self): self.config = loader.ModuleConfig( loader.ConfigValue( @@ -553,67 +562,19 @@ class YaMusicMod(loader.Module): return "0%" percent = (progress / duration) * 100 + fill_logic = int(percent // 16.66) - s_less_10 = ( - "" - "" - "" - "" - "" - "" - ) - - s_10_to_20 = ( - "" - "" - "" - "" - "" - "" - ) - - s_30_to_40 = ( - "" - "" - "" - "" - "" - "" - ) - - s_over_50 = ( - "" - "" - "" - "" - "" - "" - ) - - s_over_80 = ( - "" - "" - "" - "" - "" - "" - ) - - if percent < 10: - return s_less_10 - elif percent < 20: - return s_10_to_20 - elif percent < 30: - return s_10_to_20 - elif percent < 40: - return s_30_to_40 - elif percent < 50: - return s_30_to_40 - elif percent < 80: - return s_over_50 + bar = self.duration_placeholder["start_full_duration"] if fill_logic >= 1 else self.duration_placeholder["start_duration"] + for i in range(2, 6): + if fill_logic >= i: + bar += self.duration_placeholder["closed_duration"] + else: + bar += self.duration_placeholder["empty_mid"] + if fill_logic >= 6: + bar += self.duration_placeholder["end_duration_full"] else: - return s_over_80 - + bar += self.duration_placeholder["empty_closed_duration"] + return bar except Exception as e: return f"Error: {e}" diff --git a/radiocycle/Modules/SpotifyMod.py b/radiocycle/Modules/SpotifyMod.py index af176c6..cf2a30e 100644 --- a/radiocycle/Modules/SpotifyMod.py +++ b/radiocycle/Modules/SpotifyMod.py @@ -17,10 +17,10 @@ # ======================================= # # meta developer: @ke_mods -# requires: telethon spotipy pillow requests yt-dlp curl_cffi +# requires: telethon spotipy pillow requests httpx # scope: ffmpeg -__version__ = (1, 0) +__version__ = (1, 0, 2) import asyncio import contextlib @@ -35,6 +35,7 @@ import os from types import FunctionType import random +import httpx import requests import spotipy from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont, ImageOps @@ -48,6 +49,14 @@ from .. import loader, utils logger = logging.getLogger(__name__) logging.getLogger("spotipy").setLevel(logging.CRITICAL) +headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36", + "Accept": "application/json", + "Content-Type": "application/json", + "Origin": "https://spotmate.online", + "Referer": "https://spotmate.online/en1", +} + class Banners: def __init__( self, @@ -517,7 +526,6 @@ class SpotifyMod(loader.Module): "autobio": ( "🎧 Spotify autobio {}" ), - "no_ytdlp": " yt-dlp not found... Check config or install yt-dlp ({}terminal pip install yt-dlp)", "snowt_failed": "\n\n Download failed", "uploading_banner": "\n\n🕔 Uploading banner...", "downloading_track": "\n\n🕔 Downloading track...", @@ -531,6 +539,12 @@ class SpotifyMod(loader.Module): "playlist_deleted": " Playlist {} deleted.", "no_playlist_name": " Please specify a playlist name.", "device_select": "📄 Select playback device:", + "on-shuffle": ( + "🔀 Shuffle enabled." + ), + "off-shuffle": ( + "🔀 Shuffle disabled." + ), } strings_ru = { @@ -641,7 +655,6 @@ class SpotifyMod(loader.Module): "🎧 Обновление био" " включено {}" ), - "no_ytdlp": " yt-dlp не найден... Проверьте конфиг или установите yt-dlp ({}terminal pip install yt-dlp)", "snowt_failed": "\n\n Ошибка скачивания.", "uploading_banner": "\n\n🕔 Загрузка баннера...", "downloading_track": "\n\n🕔 Скачивание трека...", @@ -655,6 +668,12 @@ class SpotifyMod(loader.Module): "playlist_deleted": " Плейлист {} удален.", "no_playlist_name": " Пожалуйста, укажите название плейлиста.", "device_select": "📄 Выберите устройство для воспроизведения:", + "on-shuffle": ( + "🔀 Перемешивание включено." + ), + "off-shuffle": ( + "🔀 Перемешивание отключено." + ), } def __init__(self): @@ -700,16 +719,10 @@ class SpotifyMod(loader.Module): lambda: "Template for Spotify AutoBio, supports {artist}, {title}", ), loader.ConfigValue( - "ytdlp_path", - "", - "Path to ytdlp binary", - validator=loader.validators.String(), - ), - loader.ConfigValue( - "cookies_path", - "", - "Path to your cookies for yt-dlp", - validator=loader.validators.String(), + "TimeOut", + 60, + "Response timeout in seconds | Время ожидания ответа в секундах", + validator=loader.validators.Integer(minimum=30), ), loader.ConfigValue( "banner_version", @@ -944,36 +957,102 @@ class SpotifyMod(loader.Module): success = False try: - squery = query.replace('"', '').replace("'", "") - cookies = self.config["cookies_path"] - ytdlp_flags = '-x --audio-format mp3 --audio-quality 0 --add-metadata --format "bestaudio/best" --no-playlist' - cookies_flag = f"--cookies {cookies} " if cookies else "" - cmd = ( - f'{self.config["ytdlp_path"]} {ytdlp_flags} {cookies_flag}' - f'-o "{dl_dir}/%(title)s [%(id)s].%(ext)s" ' - f'"ytsearch1:{squery}"' - ) + track_url = (query or "").strip().split("?")[0] + if "spotify:track:" in track_url: + track_url = f"https://open.spotify.com/track/{track_url.split(':')[-1]}" - proc = await asyncio.create_subprocess_shell( - cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - _, stderr = await proc.communicate() + if "track/" not in track_url: + results = await asyncio.to_thread( + self.sp.search, + q=query, + limit=1, + type="track", + ) + items = (results or {}).get("tracks", {}).get("items", []) + if not items: + logger.error("SpotifyMod: Spotify track not found for %r", log_context or query) + await send_text(self.strings["snowt_failed"]) + return False - if proc.returncode: - err_text = stderr.decode(errors="ignore").strip() if stderr else "yt-dlp failed" - logger.error("SpotifyMod: yt-dlp code %s for %r: %s", proc.returncode, log_context or query, err_text[-400:]) + track_data = items[0] + track_url = track_data.get("external_urls", {}).get("spotify") or f"https://open.spotify.com/track/{track_data['id']}" - files = [f for f in os.listdir(dl_dir) if f.endswith(".mp3")] - if files: - success = await send_file(os.path.join(dl_dir, files[0])) + async with httpx.AsyncClient(follow_redirects=True) as client: + csrf = await self.get_session(client) + hdrs = {**headers, "X-CSRF-TOKEN": csrf} + + info_res = await client.post( + "https://spotmate.online/getTrackData", + headers=hdrs, + json={"spotify_url": track_url}, + timeout=self.config["TimeOut"], + ) + info = info_res.json() + if info.get("type") != "track": + logger.error("SpotifyMod: spotmate returned no track for %r", log_context or query) + await send_text(self.strings["snowt_failed"]) + return False + + track_id = info.get("id", track_url.split("/")[-1]) + conv_res = await client.post( + "https://spotmate.online/convert", + headers=hdrs, + json={"urls": track_url}, + timeout=self.config["TimeOut"], + ) + conv = conv_res.json() + download_url = conv.get("url") or conv.get("download_url") + task_id = conv.get("task_id") or conv.get("taskId") + + if not download_url and task_id: + for _ in range(40): + await asyncio.sleep(4.5) + task_res = await client.get( + f"https://spotmate.online/tasks/{task_id}", + headers={**hdrs, "Accept": "application/json"}, + timeout=self.config["TimeOut"], + ) + task = task_res.json() + if task.get("error"): + logger.error("SpotifyMod: task error for %r", log_context or query) + await send_text(self.strings["dl_err"]) + return False + + data = task.get("data") or task.get("result") or {} + status = str(data.get("status") or data.get("state") or "").lower() + if status == "finished": + download_url = ( + data.get("url") + or data.get("download_url") + or (data.get("result") or {}).get("url") + or (data.get("result") or {}).get("download_url") + ) + break + + if status in ("failed", "error", "expired", "cancelled"): + logger.error("SpotifyMod: task failed for %r", log_context or query) + await send_text(self.strings["dl_err"]) + return False + + if not download_url: + logger.error("SpotifyMod: download timeout for %r", log_context or query) + await send_text(self.strings["snowt_failed"]) + return False + + file_res = await client.get( + download_url, + headers={"User-Agent": headers["User-Agent"], "Referer": "https://spotmate.online/en1"}, + timeout=self.config["TimeOut"], + ) + + file_path = os.path.join(dl_dir, f"{track_id}.mp3") + with open(file_path, "wb") as f: + f.write(file_res.content) + + success = await send_file(file_path) if not success: logger.error("SpotifyMod: failed to send %r (target=%s)", log_context or query, type(target).__name__) await send_text(self.strings["dl_err"]) - else: - logger.error("SpotifyMod: yt-dlp produced no mp3 for %r", log_context or query) - await send_text(self.strings["snowt_failed"]) except Exception as e: logger.error("Download track error (%s): %s", log_context or "no context", e, exc_info=True) @@ -986,6 +1065,20 @@ class SpotifyMod(loader.Module): return success + async def get_session(self, client: httpx.AsyncClient) -> str: + res = await client.get( + "https://spotmate.online/en1", + headers={ + "User-Agent": headers["User-Agent"], + "Accept": "text/html", + }, + timeout=self.config["TimeOut"], + ) + match = re.search(r'csrf-token[^>]*content="([^"]+)"', res.text) + if not match: + raise ValueError("CSRF token not found") + return match.group(1) + def _short_text(self, text: str, limit: int = 60) -> str: text = " ".join(text.split()) if len(text) <= limit: @@ -1126,7 +1219,13 @@ class SpotifyMod(loader.Module): tracks = results["tracks"]["items"] store_id = id(tracks) - self._sp_store[store_id] = [(t.get("name", "Unknown"), ", ".join(a.get("name", "") for a in t.get("artists", []) if a.get("name")) or "Unknown Artist") for t in tracks] + self._sp_store[store_id] = [ + ( + t.get("name", "Unknown"), + ", ".join(a.get("name", "") for a in t.get("artists", []) if a.get("name")) or "Unknown Artist", + ) + for t in tracks + ] entries = [] for i, track in enumerate(tracks): @@ -1154,7 +1253,7 @@ class SpotifyMod(loader.Module): async def ssearch(self, query): """ - search Spotify track""" return await self._inline_search_tracks(query) - + @error_handler @tokenized @loader.command( @@ -1427,6 +1526,26 @@ class SpotifyMod(loader.Module): self.sp.repeat("context") await utils.answer(message, self.strings["off-repeat"]) + @error_handler + @tokenized + @loader.command( + ru_doc="- 🔀 Включить перемешивание" + ) + async def sshufflecmd(self, message: Message): + """- 🔀 Enable shuffle""" + self.sp.shuffle(True) + await utils.answer(message, self.strings["on-shuffle"]) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🔀 Отключить перемешивание" + ) + async def sdeshufflecmd(self, message: Message): + """- 🔀 Disable shuffle""" + self.sp.shuffle(False) + await utils.answer(message, self.strings["off-shuffle"]) + @error_handler @tokenized @loader.command( @@ -1730,11 +1849,10 @@ class SpotifyMod(loader.Module): @error_handler @tokenized @loader.command( - ru_doc="| .sq - 🔍 Поиск треков.", - alias="sq" + ru_doc="- 🔍 Поиск треков." ) - async def ssearchcmd(self, message: Message): - """| .sq - 🔍 Search for tracks.""" + async def sqcmd(self, message: Message): + """- 🔍 Search for tracks.""" args = utils.get_args_raw(message) if not args: await utils.answer(message, self.strings["no_search_query"]) @@ -1800,6 +1918,13 @@ class SpotifyMod(loader.Module): ), ) + @error_handler + @tokenized + @loader.command(ru_doc="- 🔍 Поиск треков.") + async def ssearchcmd(self, message: Message): + """- 🔍 Search for tracks.""" + await self.sqcmd(message) + async def watcher(self, message: Message): """Watcher is used to update token""" if not self.sp: @@ -1855,4 +1980,5 @@ class SpotifyMod(loader.Module): refresh_token = await self.invoke("stokrefresh", "", self.inline.bot.id) await refresh_token.delete() else: - self.set("NextRefresh", time.time() + 300) \ No newline at end of file + self.set("NextRefresh", time.time() + 300) +# слендермен