diff --git a/coddrago/modules/YaMusic.py b/coddrago/modules/YaMusic.py
index 036e671..01becc2 100644
--- a/coddrago/modules/YaMusic.py
+++ b/coddrago/modules/YaMusic.py
@@ -26,7 +26,6 @@ from .. import loader, utils
logger = logging.getLogger(__name__)
-
class Banners:
def __init__(
self,
@@ -324,6 +323,16 @@ class YaMusicMod(loader.Module):
"name": "YaMusic"
}
+ duration_placeholder = {
+ "start_duration": "☀️☀️",
+ "start_full_duration": "☀️☀️",
+ "closed_duration": "☀️",
+ "empty_mid": "☀️",
+ "empty_closed_duration_duration": "☀️",
+ "end_duration_full": "☀️",
+ "empty_closed_duration": "☀️",
+ }
+
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
@@ -553,67 +562,19 @@ class YaMusicMod(loader.Module):
return "0%"
percent = (progress / duration) * 100
+ fill_logic = int(percent // 16.66)
- s_less_10 = (
- "➖"
- "⭐"
- "⭐"
- "⭐"
- "⭐"
- "⭐"
- )
-
- s_10_to_20 = (
- "➖"
- "⭐"
- "⭐"
- "⭐"
- "⭐"
- "⭐"
- )
-
- s_30_to_40 = (
- "➖"
- "➖"
- "➖"
- "⭐"
- "⭐"
- "⭐"
- )
-
- s_over_50 = (
- "➖"
- "➖"
- "➖"
- "➖"
- "⭐"
- "⭐"
- )
-
- s_over_80 = (
- "➖"
- "➖"
- "➖"
- "➖"
- "➖"
- "⭐"
- )
-
- if percent < 10:
- return s_less_10
- elif percent < 20:
- return s_10_to_20
- elif percent < 30:
- return s_10_to_20
- elif percent < 40:
- return s_30_to_40
- elif percent < 50:
- return s_30_to_40
- elif percent < 80:
- return s_over_50
+ bar = self.duration_placeholder["start_full_duration"] if fill_logic >= 1 else self.duration_placeholder["start_duration"]
+ for i in range(2, 6):
+ if fill_logic >= i:
+ bar += self.duration_placeholder["closed_duration"]
+ else:
+ bar += self.duration_placeholder["empty_mid"]
+ if fill_logic >= 6:
+ bar += self.duration_placeholder["end_duration_full"]
else:
- return s_over_80
-
+ bar += self.duration_placeholder["empty_closed_duration"]
+ return bar
except Exception as e:
return f"Error: {e}"
diff --git a/radiocycle/Modules/SpotifyMod.py b/radiocycle/Modules/SpotifyMod.py
index af176c6..cf2a30e 100644
--- a/radiocycle/Modules/SpotifyMod.py
+++ b/radiocycle/Modules/SpotifyMod.py
@@ -17,10 +17,10 @@
# =======================================
#
# meta developer: @ke_mods
-# requires: telethon spotipy pillow requests yt-dlp curl_cffi
+# requires: telethon spotipy pillow requests httpx
# scope: ffmpeg
-__version__ = (1, 0)
+__version__ = (1, 0, 2)
import asyncio
import contextlib
@@ -35,6 +35,7 @@ import os
from types import FunctionType
import random
+import httpx
import requests
import spotipy
from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont, ImageOps
@@ -48,6 +49,14 @@ from .. import loader, utils
logger = logging.getLogger(__name__)
logging.getLogger("spotipy").setLevel(logging.CRITICAL)
+headers = {
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36",
+ "Accept": "application/json",
+ "Content-Type": "application/json",
+ "Origin": "https://spotmate.online",
+ "Referer": "https://spotmate.online/en1",
+}
+
class Banners:
def __init__(
self,
@@ -517,7 +526,6 @@ class SpotifyMod(loader.Module):
"autobio": (
"🎧 Spotify autobio {}"
),
- "no_ytdlp": "❌ yt-dlp not found... Check config or install yt-dlp ({}terminal pip install yt-dlp)",
"snowt_failed": "\n\n❌ Download failed",
"uploading_banner": "\n\n🕔 Uploading banner...",
"downloading_track": "\n\n🕔 Downloading track...",
@@ -531,6 +539,12 @@ class SpotifyMod(loader.Module):
"playlist_deleted": "✅ Playlist {} deleted.",
"no_playlist_name": "❌ Please specify a playlist name.",
"device_select": "📄 Select playback device:",
+ "on-shuffle": (
+ "🔀 Shuffle enabled."
+ ),
+ "off-shuffle": (
+ "🔀 Shuffle disabled."
+ ),
}
strings_ru = {
@@ -641,7 +655,6 @@ class SpotifyMod(loader.Module):
"🎧 Обновление био"
" включено {}"
),
- "no_ytdlp": "❌ yt-dlp не найден... Проверьте конфиг или установите yt-dlp ({}terminal pip install yt-dlp)",
"snowt_failed": "\n\n❌ Ошибка скачивания.",
"uploading_banner": "\n\n🕔 Загрузка баннера...",
"downloading_track": "\n\n🕔 Скачивание трека...",
@@ -655,6 +668,12 @@ class SpotifyMod(loader.Module):
"playlist_deleted": "✅ Плейлист {} удален.",
"no_playlist_name": "❌ Пожалуйста, укажите название плейлиста.",
"device_select": "📄 Выберите устройство для воспроизведения:",
+ "on-shuffle": (
+ "🔀 Перемешивание включено."
+ ),
+ "off-shuffle": (
+ "🔀 Перемешивание отключено."
+ ),
}
def __init__(self):
@@ -700,16 +719,10 @@ class SpotifyMod(loader.Module):
lambda: "Template for Spotify AutoBio, supports {artist}, {title}",
),
loader.ConfigValue(
- "ytdlp_path",
- "",
- "Path to ytdlp binary",
- validator=loader.validators.String(),
- ),
- loader.ConfigValue(
- "cookies_path",
- "",
- "Path to your cookies for yt-dlp",
- validator=loader.validators.String(),
+ "TimeOut",
+ 60,
+ "Response timeout in seconds | Время ожидания ответа в секундах",
+ validator=loader.validators.Integer(minimum=30),
),
loader.ConfigValue(
"banner_version",
@@ -944,36 +957,102 @@ class SpotifyMod(loader.Module):
success = False
try:
- squery = query.replace('"', '').replace("'", "")
- cookies = self.config["cookies_path"]
- ytdlp_flags = '-x --audio-format mp3 --audio-quality 0 --add-metadata --format "bestaudio/best" --no-playlist'
- cookies_flag = f"--cookies {cookies} " if cookies else ""
- cmd = (
- f'{self.config["ytdlp_path"]} {ytdlp_flags} {cookies_flag}'
- f'-o "{dl_dir}/%(title)s [%(id)s].%(ext)s" '
- f'"ytsearch1:{squery}"'
- )
+ track_url = (query or "").strip().split("?")[0]
+ if "spotify:track:" in track_url:
+ track_url = f"https://open.spotify.com/track/{track_url.split(':')[-1]}"
- proc = await asyncio.create_subprocess_shell(
- cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- _, stderr = await proc.communicate()
+ if "track/" not in track_url:
+ results = await asyncio.to_thread(
+ self.sp.search,
+ q=query,
+ limit=1,
+ type="track",
+ )
+ items = (results or {}).get("tracks", {}).get("items", [])
+ if not items:
+ logger.error("SpotifyMod: Spotify track not found for %r", log_context or query)
+ await send_text(self.strings["snowt_failed"])
+ return False
- if proc.returncode:
- err_text = stderr.decode(errors="ignore").strip() if stderr else "yt-dlp failed"
- logger.error("SpotifyMod: yt-dlp code %s for %r: %s", proc.returncode, log_context or query, err_text[-400:])
+ track_data = items[0]
+ track_url = track_data.get("external_urls", {}).get("spotify") or f"https://open.spotify.com/track/{track_data['id']}"
- files = [f for f in os.listdir(dl_dir) if f.endswith(".mp3")]
- if files:
- success = await send_file(os.path.join(dl_dir, files[0]))
+ async with httpx.AsyncClient(follow_redirects=True) as client:
+ csrf = await self.get_session(client)
+ hdrs = {**headers, "X-CSRF-TOKEN": csrf}
+
+ info_res = await client.post(
+ "https://spotmate.online/getTrackData",
+ headers=hdrs,
+ json={"spotify_url": track_url},
+ timeout=self.config["TimeOut"],
+ )
+ info = info_res.json()
+ if info.get("type") != "track":
+ logger.error("SpotifyMod: spotmate returned no track for %r", log_context or query)
+ await send_text(self.strings["snowt_failed"])
+ return False
+
+ track_id = info.get("id", track_url.split("/")[-1])
+ conv_res = await client.post(
+ "https://spotmate.online/convert",
+ headers=hdrs,
+ json={"urls": track_url},
+ timeout=self.config["TimeOut"],
+ )
+ conv = conv_res.json()
+ download_url = conv.get("url") or conv.get("download_url")
+ task_id = conv.get("task_id") or conv.get("taskId")
+
+ if not download_url and task_id:
+ for _ in range(40):
+ await asyncio.sleep(4.5)
+ task_res = await client.get(
+ f"https://spotmate.online/tasks/{task_id}",
+ headers={**hdrs, "Accept": "application/json"},
+ timeout=self.config["TimeOut"],
+ )
+ task = task_res.json()
+ if task.get("error"):
+ logger.error("SpotifyMod: task error for %r", log_context or query)
+ await send_text(self.strings["dl_err"])
+ return False
+
+ data = task.get("data") or task.get("result") or {}
+ status = str(data.get("status") or data.get("state") or "").lower()
+ if status == "finished":
+ download_url = (
+ data.get("url")
+ or data.get("download_url")
+ or (data.get("result") or {}).get("url")
+ or (data.get("result") or {}).get("download_url")
+ )
+ break
+
+ if status in ("failed", "error", "expired", "cancelled"):
+ logger.error("SpotifyMod: task failed for %r", log_context or query)
+ await send_text(self.strings["dl_err"])
+ return False
+
+ if not download_url:
+ logger.error("SpotifyMod: download timeout for %r", log_context or query)
+ await send_text(self.strings["snowt_failed"])
+ return False
+
+ file_res = await client.get(
+ download_url,
+ headers={"User-Agent": headers["User-Agent"], "Referer": "https://spotmate.online/en1"},
+ timeout=self.config["TimeOut"],
+ )
+
+ file_path = os.path.join(dl_dir, f"{track_id}.mp3")
+ with open(file_path, "wb") as f:
+ f.write(file_res.content)
+
+ success = await send_file(file_path)
if not success:
logger.error("SpotifyMod: failed to send %r (target=%s)", log_context or query, type(target).__name__)
await send_text(self.strings["dl_err"])
- else:
- logger.error("SpotifyMod: yt-dlp produced no mp3 for %r", log_context or query)
- await send_text(self.strings["snowt_failed"])
except Exception as e:
logger.error("Download track error (%s): %s", log_context or "no context", e, exc_info=True)
@@ -986,6 +1065,20 @@ class SpotifyMod(loader.Module):
return success
+ async def get_session(self, client: httpx.AsyncClient) -> str:
+ res = await client.get(
+ "https://spotmate.online/en1",
+ headers={
+ "User-Agent": headers["User-Agent"],
+ "Accept": "text/html",
+ },
+ timeout=self.config["TimeOut"],
+ )
+ match = re.search(r'csrf-token[^>]*content="([^"]+)"', res.text)
+ if not match:
+ raise ValueError("CSRF token not found")
+ return match.group(1)
+
def _short_text(self, text: str, limit: int = 60) -> str:
text = " ".join(text.split())
if len(text) <= limit:
@@ -1126,7 +1219,13 @@ class SpotifyMod(loader.Module):
tracks = results["tracks"]["items"]
store_id = id(tracks)
- self._sp_store[store_id] = [(t.get("name", "Unknown"), ", ".join(a.get("name", "") for a in t.get("artists", []) if a.get("name")) or "Unknown Artist") for t in tracks]
+ self._sp_store[store_id] = [
+ (
+ t.get("name", "Unknown"),
+ ", ".join(a.get("name", "") for a in t.get("artists", []) if a.get("name")) or "Unknown Artist",
+ )
+ for t in tracks
+ ]
entries = []
for i, track in enumerate(tracks):
@@ -1154,7 +1253,7 @@ class SpotifyMod(loader.Module):
async def ssearch(self, query):
""" - search Spotify track"""
return await self._inline_search_tracks(query)
-
+
@error_handler
@tokenized
@loader.command(
@@ -1427,6 +1526,26 @@ class SpotifyMod(loader.Module):
self.sp.repeat("context")
await utils.answer(message, self.strings["off-repeat"])
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 🔀 Включить перемешивание"
+ )
+ async def sshufflecmd(self, message: Message):
+ """- 🔀 Enable shuffle"""
+ self.sp.shuffle(True)
+ await utils.answer(message, self.strings["on-shuffle"])
+
+ @error_handler
+ @tokenized
+ @loader.command(
+ ru_doc="- 🔀 Отключить перемешивание"
+ )
+ async def sdeshufflecmd(self, message: Message):
+ """- 🔀 Disable shuffle"""
+ self.sp.shuffle(False)
+ await utils.answer(message, self.strings["off-shuffle"])
+
@error_handler
@tokenized
@loader.command(
@@ -1730,11 +1849,10 @@ class SpotifyMod(loader.Module):
@error_handler
@tokenized
@loader.command(
- ru_doc="| .sq - 🔍 Поиск треков.",
- alias="sq"
+ ru_doc="- 🔍 Поиск треков."
)
- async def ssearchcmd(self, message: Message):
- """| .sq - 🔍 Search for tracks."""
+ async def sqcmd(self, message: Message):
+ """- 🔍 Search for tracks."""
args = utils.get_args_raw(message)
if not args:
await utils.answer(message, self.strings["no_search_query"])
@@ -1800,6 +1918,13 @@ class SpotifyMod(loader.Module):
),
)
+ @error_handler
+ @tokenized
+ @loader.command(ru_doc="- 🔍 Поиск треков.")
+ async def ssearchcmd(self, message: Message):
+ """- 🔍 Search for tracks."""
+ await self.sqcmd(message)
+
async def watcher(self, message: Message):
"""Watcher is used to update token"""
if not self.sp:
@@ -1855,4 +1980,5 @@ class SpotifyMod(loader.Module):
refresh_token = await self.invoke("stokrefresh", "", self.inline.bot.id)
await refresh_token.delete()
else:
- self.set("NextRefresh", time.time() + 300)
\ No newline at end of file
+ self.set("NextRefresh", time.time() + 300)
+# слендермен