Files
limoka/vsecoder/hikka_modules/ymnow.py
2025-07-10 21:02:34 +03:00

456 lines
18 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

__version__ = (3, 1, 1)
"""
_
__ _____ ___ ___ ___ __| | ___ _ __
\ \ / / __|/ _ \/ __/ _ \ / _` |/ _ \ '__|
\ V /\__ \ __/ (_| (_) | (_| | __/ |
\_/ |___/\___|\___\___/ \__,_|\___|_|
Copyleft 2022 t.me/vsecoder
This program is free software; you can redistribute it and/or modify
"""
# meta developer: @vsecoder_m
# requires: yandex-music aiohttp
# meta desc: Module for yandex music. Based on SpotifyNow, YaNow and WakaTime [beta]
# meta pic: https://img.freepik.com/premium-vector/yandex-music-logo_578229-242.jpg
# meta banner: https://chojuu.vercel.app/api/banner?img=https://img.freepik.com/premium-vector/yandex-music-logo_578229-242.jpg&title=YMNow&description=Module%20for%20yandex%20music
import logging
import asyncio
import logging
import aiohttp
import random
import json
import string
from asyncio import sleep
from yandex_music import ClientAsync
from telethon import TelegramClient
from telethon.tl.types import Message
from telethon.errors.rpcerrorlist import FloodWaitError, MessageNotModifiedError
from telethon.tl.functions.account import UpdateProfileRequest
from .. import loader, utils # type: ignore
logger = logging.getLogger(__name__)
logging.getLogger("yandex_music").propagate = False
# https://github.com/FozerG/YandexMusicRPC/blob/main/main.py#L133
async def get_current_track(client, token):
device_info = {
"app_name": "Chrome",
"type": 1,
}
ws_proto = {
"Ynison-Device-Id": "".join(
[random.choice(string.ascii_lowercase) for _ in range(16)]
),
"Ynison-Device-Info": json.dumps(device_info),
}
timeout = aiohttp.ClientTimeout(total=15, connect=10)
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.ws_connect(
url="wss://ynison.music.yandex.ru/redirector.YnisonRedirectService/GetRedirectToYnison",
headers={
"Sec-WebSocket-Protocol": f"Bearer, v2, {json.dumps(ws_proto)}",
"Origin": "http://music.yandex.ru",
"Authorization": f"OAuth {token}",
},
timeout=10,
) as ws:
recv = await ws.receive()
data = json.loads(recv.data)
if "redirect_ticket" not in data or "host" not in data:
print(f"Invalid response structure: {data}")
return {"success": False}
new_ws_proto = ws_proto.copy()
new_ws_proto["Ynison-Redirect-Ticket"] = data["redirect_ticket"]
to_send = {
"update_full_state": {
"player_state": {
"player_queue": {
"current_playable_index": -1,
"entity_id": "",
"entity_type": "VARIOUS",
"playable_list": [],
"options": {"repeat_mode": "NONE"},
"entity_context": "BASED_ON_ENTITY_BY_DEFAULT",
"version": {
"device_id": ws_proto["Ynison-Device-Id"],
"version": 9021243204784341000,
"timestamp_ms": 0,
},
"from_optional": "",
},
"status": {
"duration_ms": 0,
"paused": True,
"playback_speed": 1,
"progress_ms": 0,
"version": {
"device_id": ws_proto["Ynison-Device-Id"],
"version": 8321822175199937000,
"timestamp_ms": 0,
},
},
},
"device": {
"capabilities": {
"can_be_player": True,
"can_be_remote_controller": False,
"volume_granularity": 16,
},
"info": {
"device_id": ws_proto["Ynison-Device-Id"],
"type": "WEB",
"title": "Chrome Browser",
"app_name": "Chrome",
},
"volume_info": {"volume": 0},
"is_shadow": True,
},
"is_currently_active": False,
},
"rid": "ac281c26-a047-4419-ad00-e4fbfda1cba3",
"player_action_timestamp_ms": 0,
"activity_interception_type": "DO_NOT_INTERCEPT_BY_DEFAULT",
}
async with session.ws_connect(
url=f"wss://{data['host']}/ynison_state.YnisonStateService/PutYnisonState",
headers={
"Sec-WebSocket-Protocol": f"Bearer, v2, {json.dumps(new_ws_proto)}",
"Origin": "http://music.yandex.ru",
"Authorization": f"OAuth {token}",
},
timeout=10,
method="GET",
) as ws:
await ws.send_str(json.dumps(to_send))
recv = await asyncio.wait_for(ws.receive(), timeout=10)
ynison = json.loads(recv.data)
track_index = ynison["player_state"]["player_queue"][
"current_playable_index"
]
if track_index == -1:
print("No track is currently playing.")
return {"success": False}
track = ynison["player_state"]["player_queue"]["playable_list"][
track_index
]
await session.close()
info = await client.tracks_download_info(track["playable_id"], True)
track = await client.tracks(track["playable_id"])
return {
"paused": ynison["player_state"]["status"]["paused"],
"duration_ms": ynison["player_state"]["status"]["duration_ms"],
"progress_ms": ynison["player_state"]["status"]["progress_ms"],
"entity_id": ynison["player_state"]["player_queue"]["entity_id"],
"repeat_mode": ynison["player_state"]["player_queue"]["options"][
"repeat_mode"
],
"entity_type": ynison["player_state"]["player_queue"]["entity_type"],
"track": track,
"info": info,
"success": True,
}
except Exception as e:
return {"success": False, "error": str(e), "track": None}
@loader.tds
class YmNowBetaMod(loader.Module):
"""
Module for yandex music. Based on SpotifyNow, YaNow and WakaTime. [BETA]
Now on Ynison API.
"""
strings = {
"name": "YmNow",
"no_token": "<b><emoji document_id=5843952899184398024>🚫</emoji> Specify a token in config!</b>",
"playing": "<b><emoji document_id=5188705588925702510>🎶</emoji> Now playing: </b><code>{}</code><b> - </b><code>{}</code>\n<b>🕐 {}</b>",
"no_args": "<b><emoji document_id=5843952899184398024>🚫</emoji> Provide arguments!</b>",
"state": "🙂 <b>Widgets are now {}</b>\n{}",
"tutorial": (
" <b>To enable widget, send a message to a preffered chat with text"
" </b><code>{YANDEXMUSIC}</code>"
),
"no_results": "<b><emoji document_id=5285037058220372959>☹️</emoji> No results found :(</b>",
"autobioe": "<b>🔁 Autobio enabled</b>",
"autobiod": "<b>🔁 Autobio disabled</b>",
"_cfg_yandexmusictoken": "Yandex.Music account token",
"_cfg_autobiotemplate": "Template for AutoBio",
"_cfg_automesgtemplate": "Template for AutoMessage",
"_cfg_update_interval": "Update interval",
"guide": (
'<a href="https://github.com/MarshalX/yandex-music-api/discussions/513#discussioncomment-2729781">'
"Instructions for obtaining a Yandex.Music token</a>"
),
"configuring": "🙂 <b>Widget is ready and will be updated soon</b>",
}
strings_ru = {
"no_token": "<b><emoji document_id=5843952899184398024>🚫</emoji> Укажи токен в конфиге!</b>",
"playing": "<b><emoji document_id=5188705588925702510>🎶</emoji> Сейчас играет: </b><code>{}</code><b> - </b><code>{}</code>\n<b>🕐 {}</b>",
"no_args": "<b><emoji document_id=5843952899184398024>🚫</emoji> Укажи аргументы!</b>",
"state": "🙂 <b>Виджеты теперь {}</b>\n{}",
"tutorial": (
" <b>Чтобы включить виджет, отправь сообщение в нужный чат с текстом"
" </b><code>{YANDEXMUSIC}</code>"
),
"no_results": "<b><emoji document_id=5285037058220372959>☹️</emoji> Ничего не найдено :(</b>",
"autobioe": "<b>🔁 Autobio включен</b>",
"autobiod": "<b>🔁 Autobio выключен</b>",
"_cls_doc": "Модуль для Яндекс.Музыка. Основан на SpotifyNow, YaNow и WakaTime. [BETA]",
"_cfg_yandexmusictoken": "Токен аккаунта Яндекс.Музыка",
"_cfg_autobiotemplate": "Шаблон для AutoBio, параметры: {artists}, {track}, {time}",
"_cfg_automesgtemplate": "Шаблон для AutoMessage, параметры: {artists}, {track}, {time}, {link}",
"_cfg_update_interval": "Интервал обновления виджета",
"guide": (
'<a href="https://github.com/MarshalX/yandex-music-api/discussions/513#discussioncomment-2729781">'
"Инструкция по получению токена Яндекс.Музыка</a>"
),
"configuring": "🙂 <b>Виджет готов и скоро будет обновлен</b>",
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"YandexMusicToken",
None,
lambda: self.strings["_cfg_yandexmusictoken"],
validator=loader.validators.Hidden(),
),
loader.ConfigValue(
"AutoBioTemplate",
"🎧 {artists} - {track} / {time}",
lambda: self.strings["_cfg_autobiotemplate"],
validator=loader.validators.String(),
),
loader.ConfigValue(
"AutoMessageTemplate",
"🎧 {artists} - {track} / {time} {link}",
lambda: self.strings["_cfg_automesgtemplate"],
validator=loader.validators.String(),
),
loader.ConfigValue(
"update_interval",
300,
lambda: self.strings["_cfg_update_interval"],
validator=loader.validators.Integer(minimum=100),
),
)
async def on_dlmod(self):
if not self.get("guide_send", False):
await self.inline.bot.send_message(
self._tg_id,
self.strings["guide"],
)
self.set("guide_send", True)
async def client_ready(self, client: TelegramClient, db):
self.client = client
self.db = db
self._premium = getattr(await self.client.get_me(), "premium", False)
self.set("widgets", list(map(tuple, self.get("widgets", []))))
self._task = asyncio.ensure_future(self._parse())
if self.get("autobio", False):
self.autobio.start()
@loader.command()
async def ynowcmd(self, message: Message):
"""Get now playing track"""
if not self.config["YandexMusicToken"]:
await utils.answer(message, self.strings["no_token"])
return
try:
client = ClientAsync(self.config["YandexMusicToken"])
await client.init()
except: # noqa: E722
await utils.answer(message, self.strings["no_token"])
return
res = await get_current_track(client, self.config["YandexMusicToken"])
if not res["success"]:
await utils.answer(message, self.strings["no_results"])
return
track = res["track"][0] # type: ignore
link = res["info"][0]["direct_link"] # type: ignore
title = track["title"]
artists = [artist["name"] for artist in track["artists"]]
duration_ms = int(track["duration_ms"])
caption = self.strings["playing"].format(
utils.escape_html(", ".join(artists)),
utils.escape_html(title),
f"{duration_ms // 1000 // 60:02}:{duration_ms // 1000 % 60:02}",
)
lnk = track["id"]
await self.inline.form(
message=message,
text=caption,
reply_markup={
"text": "song.link",
"url": f"https://song.link/ya/{lnk}",
},
silent=True,
audio={
"url": link,
"title": utils.escape_html(title),
"performer": utils.escape_html(", ".join(artists)),
},
)
@loader.command()
async def ybio(self, message: Message):
"""Show now playing track in your bio"""
if not self.config["YandexMusicToken"]:
await utils.answer(message, self.strings["no_token"])
return
try:
client = ClientAsync(self.config["YandexMusicToken"])
await client.init()
except: # noqa: E722
await utils.answer(message, self.strings["no_token"])
return
current = self.get("autobio", False)
new = not current
self.set("autobio", new)
if new:
await utils.answer(message, self.strings["autobioe"])
self.autobio.start()
else:
await utils.answer(message, self.strings["autobiod"])
self.autobio.stop()
@loader.command()
async def automsgcmd(self, message: Message):
"""Toggle YandexMusic widgets' updates"""
state = not self.get("state", False)
self.set("state", state)
await utils.answer(
message,
self.strings["state"].format(
"on" if state else "off", self.strings("tutorial") if state else ""
),
)
@loader.loop(interval=60)
async def autobio(self):
client = ClientAsync(self.config["YandexMusicToken"])
await client.init()
res = await get_current_track(client, self.config["YandexMusicToken"])
track = res["track"][0] # type: ignore
title = track["title"]
artists = [artist["name"] for artist in track["artists"]]
duration_ms = int(track["duration_ms"])
text = self.config["AutoBioTemplate"].format(
artists=utils.escape_html(", ".join(artists)),
track=utils.escape_html(title),
time=f"{duration_ms // 1000 // 60:02}:{duration_ms // 1000 % 60:02}",
)
try:
await self.client(
UpdateProfileRequest(about=text[: 140 if self._premium else 70])
)
except FloodWaitError as e:
logger.info(f"Sleeping {e.seconds}")
await sleep(e.seconds)
return
async def _parse(self, do_not_loop: bool = False):
while True:
for widget in self.get("widgets", []):
client = ClientAsync(self.config["YandexMusicToken"])
await client.init()
res = await get_current_track(client, self.config["YandexMusicToken"])
track = res["track"][0] # type: ignore
title = track["title"]
artists = [artist["name"] for artist in track["artists"]]
duration_ms = int(track["duration_ms"])
try:
await self._client.edit_message(
*widget[:2],
self.config["AutoMessageTemplate"].format(
artists=utils.escape_html(", ".join(artists)),
track=utils.escape_html(title),
time=f"{duration_ms // 1000 // 60:02}:{duration_ms // 1000 % 60:02}",
link=f"https://song.link/ya/{track['id']}",
),
)
except MessageNotModifiedError:
pass
except FloodWaitError:
pass
except Exception:
logger.debug("YmNow widget update failed")
self.set(
"widgets", list(set(self.get("widgets", [])) - set([widget]))
)
continue
if do_not_loop:
break
await asyncio.sleep(int(self.config["update_interval"]))
async def on_unload(self):
self._task.cancel()
async def watcher(self, message: Message):
try:
if "{YANDEXMUSIC}" not in getattr(message, "text", "") or not message.out:
return
chat_id = utils.get_chat_id(message)
message_id = message.id
self.set(
"widgets",
self.get("widgets", []) + [(chat_id, message_id, message.text)], # type: ignore
)
await utils.answer(message, self.strings["configuring"])
await self._parse(do_not_loop=True)
except Exception as e:
logger.exception("Can't send widget")
await utils.respond(message, self.strings["error"].format(e))