__version__ = (1, 0, 3)
# █▄▀ ▄▀█ █▀▄▀█ █▀▀ █▄▀ █ █ █▀█ █▀█
# █ █ █▀█ █ ▀ █ ██▄ █ █ ▀▄▄▀ █▀▄ █▄█ ▄
# © Copyright 2025
# ✈ https://t.me/kamekuro
# 🔒 Licensed under CC-BY-NC-ND 4.0 unless otherwise specified.
# 🌐 https://creativecommons.org/licenses/by-nc-nd/4.0
# + attribution
# + non-commercial
# + no-derivatives
# You CANNOT edit, distribute or redistribute this file without direct permission from the author.
# meta banner: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/banners/yamusic.png
# meta pic: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/icons/yamusic.png
# meta developer: @kamekuro_hmods
# scope: hikka_only
# scope: hikka_min 1.6.3
# requires: aiohttp asyncio requests pillow==11.2.1 git+https://github.com/MarshalX/yandex-music-api
import aiohttp
import asyncio
import io
import json
import logging
import random
import requests
import string
import yandex_music
import telethon
import textwrap
from PIL import (
Image, ImageDraw, ImageEnhance,
ImageFilter, ImageFont
)
import yandex_music.exceptions
from .. import loader, utils
logger = logging.getLogger(__name__)
@loader.tds
class YaMusicMod(loader.Module):
"""The module for Yandex.Music streaming service"""
strings = {
"name": "YaMusic",
"queue_types": {
"VARIOUS": "Your queue",
"RADIO": "«My Wave»",
"PLAYLIST": "Playlist «{}»",
"ALBUM": "Album «{}»"
},
"guide": (
"📜 Guide for obtaining a Yandex.Music token"
),
"no_token": (
"❌ You didn't specify "
"the access token in the config!"
),
"autobio_e": "🎧 Autobio is on now",
"autobio_d": "🎧 Autobio is off now",
"there_is_no_playing": (
"❌ You don't "
"listening to anything right now."
),
"now": (
"🎧 {performer} — {title}\n\n"
"⌨️ Now is listening on {device} "
"(🔊 {volume}%)\n"
"🗂 Playing from: {playing_from}\n\n"
"🎵 Yandex.Music | "
"song.link"
),
"downloading": "\n\n🕔 Downloading audio…",
"downloading_banner": "\n\n🕔 Downloading banner…",
"likes": {
"liked": (
"❤️ Track "
"{track} "
"was successfully liked"
),
"unliked": (
"❤️ Track "
"{track} "
"was successfully unliked"
),
"disliked": (
"💔 Track "
"{track} "
"was successfully disliked"
)
},
"lyrics": (
"📜 Lyrics of the {track} track:\n"
"
{text}
\n\n"
"©️ Writers: {writers}"
),
"no_lyrics": (
"❌ Track "
"{track} "
"has no lyrics!"
),
"search": (
"🎧 {performer} — {title}\n"
"🎵 Yandex.Music | "
"song.link"
),
"args": "❌ Specify search query",
"404": "❌ No results found",
"searching": "🔍 Searching…",
"_cfg_token": "Your access token of Yandex.Music",
"_cfg_autobio": "Automatic bio template (may contain {artist} and {title})",
"_cfg_no_playing_bio": "Bio that is set when nothing is playing"
}
strings_ru = {
"_cls_doc": "Модуль для стримингового сервиса Яндекс.Музыка",
"queue_types": {
"VARIOUS": "Ваша очередь",
"RADIO": "«Моя Волна»",
"PLAYLIST": "Плейлист «{}»",
"ALBUM": "Альбом «{}»"
},
"guide": (
"📜 Гайд по получению токена Яндекс.Музыки"
),
"no_token": (
"❌ Ты не "
"указал токен Яндекс.Музыки в конфиге!"
),
"autobio_e": "🎧 Автобио включено",
"autobio_d": "🎧 Автобио выключено",
"there_is_no_playing": (
"❌ Ты ничего "
"не слушаешь сейчас."
),
"now": (
"🎧 {performer} — {title}\n\n"
"⌨️ Сейчас слушаю на {device} "
" (🔊 {volume}%)\n"
"🗂 Откуда играет: {playing_from}\n\n"
"🎵 Яндекс.Музыка | "
"song.link"
),
"downloading": "\n\n🕔 Загрузка трека…",
"downloading_banner": "\n\n🕔 Загрузка баннера…",
"likes": {
"liked": (
"❤️ Лайкнул трек "
"{track}"
),
"unliked": (
"❤️ Убрал лайк с трека "
"{track}"
),
"disliked": (
"💔 Дизлайкнул трек "
"{track}"
)
},
"lyrics": (
"📜 Текст трека "
"{track}:\n"
"{text}
\n\n"
"©️ Авторы: {writers}"
),
"no_lyrics": (
"❌ У трека "
"{track} "
"нет текста!"
),
"search": (
"🎧 {performer} — {title}\n"
"🎵 Яндекс.Музыка | "
"song.link"
),
"args": "❌ Укажите поисковый запрос",
"404": "❌ Ничего не найдено",
"searching": "🔍 Ищем…",
"_cfg_token": "Твой токен от Яндекс.Музыки",
"_cfg_autobio": "Шаблон автоматического био (может содержать {artist} и {title})",
"_cfg_no_playing_bio": "Био, которое ставится, когда ничего не играет"
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"token",
None,
lambda: self.strings["_cfg_token"],
validator=loader.validators.Hidden()
),
loader.ConfigValue(
"autobio",
"🎧 {artist} - {title}",
lambda: self.strings["_cfg_autobio"],
validator=loader.validators.String()
),
loader.ConfigValue(
"no_playing_bio",
"Hello!",
lambda: self.strings["_cfg_no_playing_bio"],
validator=loader.validators.String()
)
)
async def on_dlmod(self):
if not self.get("guide_send", False):
await self.inline.bot.send_message(
self._tg_id,
self.strings("guide").replace("📜", "📜"),
)
self.set("guide_send", True)
async def client_ready(self, client, db):
self._client = client
self._db = db
me = await self._client.get_me()
self._premium = me.premium if hasattr(me, "premium") else False
self.premium_check.start()
if self.get("autobio", False):
self.autobio.start()
@loader.loop(1800)
async def premium_check(self):
me = await self._client.get_me()
self._premium = me.premium if hasattr(me, "premium") else False
@loader.loop(30)
async def autobio(self):
if not self.config['token']:
self.autobio.stop(); self.set("autobio", False)
return
client = yandex_music.Client(self.config['token']).init()
now = await self.__get_now_playing(self.config['token'], client)
out = self.config['no_playing_bio'][:(140 if self._premium else 70)]
if now and (not now['paused']):
out = self.config['autobio'].format(
title=now['track']['title'],
artist=", ".join(now['track']['artist'])
)[:(140 if self._premium else 70)]
try:
await self._client(
telethon.functions.account.UpdateProfileRequest(about=out)
)
except telethon.errors.rpcerrorlist.FloodWaitError as e:
logger.info(f"Sleeping {max(e.seconds, 60)} because of floodwait")
await asyncio.sleep(max(e.seconds, 60))
@loader.command(
ru_doc="👉 Гайд по получению токена Яндекс.Музыки",
alias="yg"
)
async def yguidecmd(self, message: telethon.types.Message):
"""👉 Guide for obtaining a Yandex.Music token"""
await utils.answer(message, self.strings("guide"))
@loader.command(
ru_doc="👉 Включить/выключить автобио",
alias="yb"
)
async def ybiocmd(self, message: telethon.types.Message):
"""👉 Enable/disable autobio"""
if not self.config['token']:
return await utils.answer(message, self.strings("no_token"))
bio_now = self.get("autobio", False)
self.set("autobio", not bio_now)
if (not bio_now):
self.autobio.start()
else:
self.autobio.stop()
try:
await self._client(
telethon.functions.account.UpdateProfileRequest(
about=self.config['no_playing_bio'][:(140 if self._premium else 70)]
)
)
except: pass
await utils.answer(
message,
self.strings(f"autobio_{'e' if (not bio_now) else 'd'}")
)
@loader.command(
ru_doc="👉 Получить трек, который играет сейчас",
alias="yn"
)
async def ynowcmd(self, message: telethon.types.Message):
"""👉 Get now playing track"""
if not self.config['token']:
return await utils.answer(message, self.strings("no_token"))
client = yandex_music.Client(self.config['token']).init()
now = await self.__get_now_playing(self.config['token'], client)
if not now:
return await utils.answer(message, self.strings("there_is_no_playing"))
playlist_name = ""
if now['entity_type'] in ["PLAYLIST", "ALBUM"]:
func = getattr(
client,
"playlists_list" if now['entity_type'] == "PLAYLIST" else "albums"
)
if func:
entity = func(now['entity_id'])[0]
playlist_name = f"{entity.title}"
else:
now['entity_type'] = "RADIO"
device_eid, device, volume = "6039404727542747508", "Unknown Device", "❓"
if now['device']:
device=now['device']['info']['title']
volume=round(now['device']['volume']*100, 2)
if now['device']['info']['type'] == "ANDROID": device_eid = "5373266788970670174"
if now['device']['info']['type'] == "IOS": device_eid = "5372908412604525258"
out = self.strings("now").format(
title=now['track']['title'],
performer=", ".join(now['track']['artist']),
device=device, volume=volume, device_eid=device_eid,
playing_from=self.strings("queue_types").get(now['entity_type'], "VARIOUS").format(playlist_name),
track_id=now['track']['track_id'],
album_id=now['track']['album_id']
)
await utils.answer(
message, out+self.strings("downloading")
)
audio = io.BytesIO((await utils.run_sync(requests.get, now['track']['download_link'])).content)
audio.name = "audio.mp3"
await utils.answer(
message=message, response=out,
file=audio,
attributes=([
telethon.types.DocumentAttributeAudio(
duration=now['track']['duration'],
title=now['track']['title'],
performer=", ".join(now['track']['artist'])
)
])
)
@loader.command(
ru_doc="👉 Получить баннер трека, который играет сейчас",
alias="ynb"
)
async def ynowbcmd(self, message: telethon.types.Message):
"""👉 Get now playing track's banner"""
if not self.config['token']:
return await utils.answer(message, self.strings("no_token"))
client = yandex_music.Client(self.config['token']).init()
now = await self.__get_now_playing(self.config['token'], client)
if not now:
return await utils.answer(message, self.strings("there_is_no_playing"))
playlist_name = ""
if now['entity_type'] in ["PLAYLIST", "ALBUM"]:
func = getattr(
client,
"playlists_list" if now['entity_type'] == "PLAYLIST" else "albums"
)
if func:
entity = func(now['entity_id'])[0]
playlist_name = f"{entity.title}"
else:
now['entity_type'] = "RADIO"
device_eid, device, volume = "6039404727542747508", "Unknown Device", "❓"
if now['device']:
device=now['device']['info']['title']
volume=round(now['device']['volume']*100, 2)
if now['device']['info']['type'] == "ANDROID": device_eid = "5373266788970670174"
if now['device']['info']['type'] == "IOS": device_eid = "5372908412604525258"
out = self.strings("now").format(
title=now['track']['title'],
performer=", ".join(now['track']['artist']),
device=device, volume=volume, device_eid=device_eid,
playing_from=self.strings("queue_types").get(now['entity_type'], "VARIOUS").format(playlist_name),
track_id=now['track']['track_id'],
album_id=now['track']['album_id']
)
await utils.answer(
message, out+self.strings("downloading_banner")
)
file = self.__create_banner(
now['track']['title'], now['track']['artist'],
now['duration_ms'], now['progress_ms'],
requests.get(now['track']['img']).content
)
await utils.answer(
message=message, response=out, file=file
)
@loader.command(
ru_doc="👉 Лайкнуть играющий сейчас трек"
)
async def ylikecmd(self, message: telethon.types.Message):
"""👉 Like now playing track's banner"""
if not self.config['token']:
return await utils.answer(message, self.strings("no_token"))
client = yandex_music.Client(self.config['token']).init()
now = await self.__get_now_playing(self.config['token'], client)
if not now:
return await utils.answer(message, self.strings("there_is_no_playing"))
client.users_likes_tracks_add(now['track']['track_id'])
await utils.answer(
message, self.strings("likes")['liked'].format(
track_id=now['track']['track_id'], album_id=now['track']['album_id'],
track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}"
)
)
@loader.command(
ru_doc="👉 Убрать лайк с играющего сейчас трека"
)
async def yunlikecmd(self, message: telethon.types.Message):
"""👉 Unlike now playing track"""
if not self.config['token']:
return await utils.answer(message, self.strings("no_token"))
client = yandex_music.Client(self.config['token']).init()
now = await self.__get_now_playing(self.config['token'], client)
if not now:
return await utils.answer(message, self.strings("there_is_no_playing"))
client.users_likes_tracks_remove(now['track']['track_id'])
await utils.answer(
message, self.strings("likes")['unliked'].format(
track_id=now['track']['track_id'], album_id=now['track']['album_id'],
track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}"
)
)
@loader.command(
ru_doc="👉 Дизлайкнуть играющий сейчас трек",
alias="ydis"
)
async def ydislikecmd(self, message: telethon.types.Message):
"""👉 Dislike now playing track"""
if not self.config['token']:
return await utils.answer(message, self.strings("no_token"))
client = yandex_music.Client(self.config['token']).init()
now = await self.__get_now_playing(self.config['token'], client)
if not now:
return await utils.answer(message, self.strings("there_is_no_playing"))
client.users_dislikes_tracks_add(now['track']['track_id'])
await utils.answer(
message, self.strings("likes")['disliked'].format(
track_id=now['track']['track_id'], album_id=now['track']['album_id'],
track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}"
)
)
@loader.command(
ru_doc="👉 Получить текст играющего сейчас трека"
)
async def ylyricscmd(self, message: telethon.types.Message):
"""👉 Get lyrics of the now playing track"""
if not self.config['token']:
return await utils.answer(message, self.strings("no_token"))
client = yandex_music.Client(self.config['token']).init()
now = await self.__get_now_playing(self.config['token'], client)
if not now:
return await utils.answer(message, self.strings("there_is_no_playing"))
try:
lyrics = client.tracks_lyrics(now['track']['track_id'])
await utils.answer(
message, self.strings("lyrics").format(
track_id=now['track']['track_id'], album_id=now['track']['album_id'],
track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}",
text=requests.get(lyrics.download_url).text,
writers=", ".join(lyrics.writers)
)
)
except yandex_music.exceptions.NotFoundError:
await utils.answer(
message, self.strings("no_lyrics").format(
track_id=now['track']['track_id'], album_id=now['track']['album_id'],
track=f"{', '.join(now['track']['artist'])} — {now['track']['title']}"
)
)
@loader.command(
ru_doc="<запрос> 👉 Поиск трека в Яндекс.Музыке",
alias="yq"
)
async def ysearchcmd(self, message: telethon.types.Message):
""" 👉 Search track in Yandex.Music"""
if not self.config['token']:
return await utils.answer(message, self.strings("no_token"))
client = yandex_music.Client(self.config['token']).init()
query = utils.get_args_raw(message)
if not query:
await utils.answer(message, self.strings("args"))
return
message = await utils.answer(message, self.strings("searching"))
search = client.search(query, type_="track")
if (not search.tracks) or (len(search.tracks.results) == 0):
return await utils.answer(message, self.strings("404"))
out = self.strings("search").format(
title=search.tracks.results[0].title + (
f" ({search.tracks.results[0].version})" if search.tracks.results[0].version else ""
),
performer=", ".join([x.name for x in search.tracks.results[0].artists]),
album_id=search.tracks.results[0].albums[0].id, track_id=search.tracks.results[0].id
)
message = await utils.answer(message, out+self.strings("downloading"))
info = client.tracks_download_info(search.tracks.results[0].id, True)
link = info[0].direct_link
audio = None
audio = io.BytesIO((await utils.run_sync(requests.get, link)).content)
audio.name = "audio.mp3"
await utils.answer(
message=message, response=out,
file=audio,
attributes=([
telethon.types.DocumentAttributeAudio(
duration=int(search.tracks.results[0].duration_ms / 1000),
title=search.tracks.results[0].title,
performer=", ".join([x.name for x in search.tracks.results[0].artists])
)
])
)
# Original code: https://raw.githubusercontent.com/MIPOHBOPOHIH/YMMBFA/main/main.py
async def __create_ynison_ws(self, yamusic_token: str, ws_proto: dict) -> dict:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(
"wss://ynison.music.yandex.ru/redirector.YnisonRedirectService/GetRedirectToYnison",
headers={
"Sec-WebSocket-Protocol": f"Bearer, v2, {json.dumps(ws_proto)}",
"Origin": "http://music.yandex.ru",
"Authorization": f"OAuth {yamusic_token}",
},
) as ws:
response = await ws.receive()
return json.loads(response.data)
# Original code: https://raw.githubusercontent.com/MIPOHBOPOHIH/YMMBFA/main/main.py
async def __get_now_playing(self, yamusic_token: str, client: yandex_music.Client):
device_id = ''.join(random.choices(string.ascii_lowercase, k=16))
ws_proto = {
"Ynison-Device-Id": device_id,
"Ynison-Device-Info": json.dumps({"app_name": "Chrome", "type": 1}),
}
data = await self.__create_ynison_ws(yamusic_token, ws_proto)
ws_proto["Ynison-Redirect-Ticket"] = data["redirect_ticket"]
payload = {
"update_full_state": {
"player_state": {
"player_queue": {
"current_playable_index": -1,
"entity_id": "",
"entity_type": "VARIOUS",
"playable_list": [],
"options": {"repeat_mode": "NONE"},
"entity_context": "BASED_ON_ENTITY_BY_DEFAULT",
"version": {"device_id": device_id, "version": 9021243204784341000, "timestamp_ms": 0},
"from_optional": "",
},
"status": {
"duration_ms": 0,
"paused": True,
"playback_speed": 1,
"progress_ms": 0,
"version": {"device_id": device_id, "version": 8321822175199937000, "timestamp_ms": 0},
},
},
"device": {
"capabilities": {"can_be_player": True, "can_be_remote_controller": False, "volume_granularity": 16},
"info": {
"device_id": device_id,
"type": "WEB",
"title": "Chrome Browser",
"app_name": "Chrome",
},
"volume_info": {"volume": 0},
"is_shadow": True,
},
"is_currently_active": False,
},
"rid": "ac281c26-a047-4419-ad00-e4fbfda1cba3",
"player_action_timestamp_ms": 0,
"activity_interception_type": "DO_NOT_INTERCEPT_BY_DEFAULT",
}
async with aiohttp.ClientSession() as session:
async with session.ws_connect(
f"wss://{data['host']}/ynison_state.YnisonStateService/PutYnisonState",
headers={
"Sec-WebSocket-Protocol": f"Bearer, v2, {json.dumps(ws_proto)}",
"Origin": "http://music.yandex.ru",
"Authorization": f"OAuth {yamusic_token}",
}
) as ws:
await ws.send_str(json.dumps(payload))
response = await ws.receive()
ynison: dict = json.loads(response.data)
if len(ynison.get("player_state", {}).get("player_queue", {}).get("playable_list", [])) == 0:
return {}
raw_track = ynison["player_state"]["player_queue"]["playable_list"][
ynison["player_state"]["player_queue"]["current_playable_index"]
]
track = client.tracks(raw_track["playable_id"])[0]
device = [
x for x in ynison['devices'] if x['info']['device_id'] == ynison.get('active_device_id_optional', "")
]
return {
"paused": ynison["player_state"]["status"]["paused"],
"duration_ms": int(ynison["player_state"]["status"]["duration_ms"]),
"progress_ms": int(ynison["player_state"]["status"]["progress_ms"]),
"entity_id": ynison["player_state"]["player_queue"]["entity_id"],
"entity_type": ynison["player_state"]["player_queue"]["entity_type"],
"device": device[0] if len(device) > 0 else None,
"track": {
"track_id": int(track.track_id.split(":")[0]) if track.track_id.split(":")[0].isdigit() else track.track_id,
"album_id": track.albums[0].id,
"title": track.title,
"artist": track.artists_name(),
"img": f"https://{track.cover_uri[:-2]}1000x1000",
"duration": track.duration_ms // 1000,
"minutes": round(track.duration_ms / 1000) // 60,
"seconds": round(track.duration_ms / 1000) % 60,
"download_link": track.get_download_info(get_direct_links=True)[0].direct_link
}
} if raw_track['playable_type'] != "LOCAL_TRACK" else {}
def __create_banner(
self,
title: str, artists: list,
duration: int, progress: int,
track_cover: bytes
):
w, h = 1920, 768
title_font = ImageFont.truetype(io.BytesIO(requests.get(
"https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf"
).content), 80)
art_font = ImageFont.truetype(io.BytesIO(requests.get(
"https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Regular.ttf"
).content), 55)
time_font = ImageFont.truetype(io.BytesIO(requests.get(
"https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf"
).content), 36)
# Gen banner (bg)
track_cov = Image.open(io.BytesIO(track_cover)).convert("RGBA")
banner = track_cov.resize((w, w)).crop(
(0, (w-h)//2, w, ((w-h)//2)+h)
).filter(ImageFilter.GaussianBlur(radius=14))
banner = ImageEnhance.Brightness(banner).enhance(0.3)
# Gen track cover and put to bg
track_cov = track_cov.resize((banner.size[1]-150, banner.size[1]-150))
mask = Image.new("L", track_cov.size, 0)
ImageDraw.Draw(mask).rounded_rectangle((0, 0, track_cov.size[0], track_cov.size[1]), radius=35, fill=255)
track_cov.putalpha(mask)
track_cov = track_cov.crop(track_cov.getbbox())
banner.paste(track_cov, (75, 75), mask)
# Editing text
title_lines = textwrap.wrap(title, 23)
if len(title_lines) > 1:
title_lines[1] = title_lines[1] + "..." if len(title_lines) > 2 else title_lines[1]
title_lines = title_lines[:2]
artists_lines = textwrap.wrap(" • ".join(artists), width=40)
if len(artists_lines) > 1:
for index, art in enumerate(artists_lines):
if "•" in art[-2:]:
artists_lines[index] = art[:art.rfind("•") - 1]
# Put title and artists to banner
draw = ImageDraw.Draw(banner)
x, y = 150+track_cov.size[0], 110
for index, line in enumerate(title_lines):
draw.text((x, y), line, font=title_font, fill="#FFFFFF")
if index != len(title_lines)-1:
y += 70
x, y = 150+track_cov.size[0], 110*2
if len(title_lines) > 1: y += 70
for index, line in enumerate(artists_lines):
draw.text((x, y), line, font=art_font, fill="#A0A0A0")
if index != len(artists_lines)-1:
y += 50
# Drawing status bar
draw.rounded_rectangle([768, 650, 768+1072, 650+15], radius=15//2, fill="#A0A0A0")
draw.rounded_rectangle([768, 650, 768+int(1072*(progress/duration)), 650+15], radius=15//2, fill="#FFFFFF")
draw.text((768, 600), f"{(progress//1000//60):02}:{(progress//1000%60):02}", font=time_font, fill="#FFFFFF")
draw.text((1745, 600), f"{(duration//1000//60):02}:{(duration//1000%60):02}", font=time_font, fill="#FFFFFF")
by = io.BytesIO()
banner.save(by, format="PNG"); by.seek(0)
by.name = "banner.png"
return by