from typing import Union, Dict
import aiohttp
from aiohttp.client_exceptions import ServerTimeoutError
import logging
import difflib
import re
from telethon.tl.types import Message
from telethon import types
from .. import loader, utils
"""
███ ███ ██ ██ ██████ ██ ██ ██ ██████ ███████ ███████
████ ████ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██
██ ████ ██ ██ ██ ██████ ██ ██ ██ ██ ██ ███████ █████
██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██
██ ██ ██████ ██ ██ ██████ ███████ ██████ ███████ ███████
VKMusic
📜 Licensed under the GNU AGPLv3
"""
# meta banner: https://0x0.st/HYVT.jpg
# meta desc: desc
# meta developer: @BruhHikkaModules
# requires: aiohttp difflib
logger = logging.getLogger(__name__)
class VKMusicAPI:
def __init__(self, user_id: str, token: str) -> Union[Dict, int]:
self.token = token
self.user_id = user_id
async def get_music(self):
try:
async with aiohttp.ClientSession() as session:
async with session.post(
f"https://api.vk.com/method/status.get?user_id={self.user_id}&access_token={self.token}&v=5.199"
) as response:
data: dict = await response.json()
if 'error' in data or 'response' not in data:
return 10, None
if data['response'].get('audio') is not None:
return 50, data['response']
else:
return 40, data['response']['text']
except ServerTimeoutError:
return 30
@loader.tds
class VKMusic(loader.Module):
strings = {
"name": "VKMusic",
"no_music": "Music is not playing (not all music is displayed in the status).",
"server_error": "Server of VK does not answering",
"music_form": (
"🎵 Listening now: {title}"
"\n🐱 Artist: {artist}"
),
"instructions": (
"Go to vkhost, open settings, leave anytime access and status,"
"and click get, copy the token and id, and then paste it in properly (in config)."
),
"not_russia": (
"\n🇷🇺 VK gave not all information about"
"the track because your userbot server is outside the Russian Federation."
),
"bot_searching": "Searching via Telegram bot...",
"bot_not_found": "Music not found via Telegram bot.",
"bot_start": "Bot requires /start, initializing...",
"empty_query": "Cannot search: possibly, no music is playing, or music is not broadcasted to the status.",
"invalid_token": "Invalid or expired VK token. Please update your token using .vkmtoken instructions."
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"token",
"token",
lambda: "How get token: .vkmtoken",
validator=loader.validators.Hidden(loader.validators.String()),
),
loader.ConfigValue(
"user_id",
"1278631",
lambda: "Here your userid, (about this in .vkmtoken)",
validator=loader.validators.Hidden(loader.validators.String()),
),
loader.ConfigValue(
"telegram_bot",
"@vkm_bot",
lambda: "Telegram bot username for music search (e.g., @vkmusic_bot)",
validator=loader.validators.String(),
),
)
self._vkmusic = VKMusicAPI(self.config["user_id"], self.config["token"])
def _clean_string(self, text: str) -> str:
text = text.lower().strip()
text = re.sub(r'\(official video\)', '', text, flags=re.IGNORECASE)
text = re.sub(r'\(lyrics\)', '', text, flags=re.IGNORECASE)
text = re.sub(r'\(audio\)', '', text, flags=re.IGNORECASE)
text = re.sub(r'\(live\)', '', text, flags=re.IGNORECASE)
text = re.sub(r'\(remix\)', '', text, flags=re.IGNORECASE)
text = re.sub(r'\(feat\..*?\)', '', text, flags=re.IGNORECASE)
text = re.sub(r'\(russian ver\.?\)', '', text, flags=re.IGNORECASE)
text = re.sub(r'\(english ver\.?\)', '', text, flags=re.IGNORECASE)
text = re.sub(r'\(ver\.?\)', '', text, flags=re.IGNORECASE)
text = re.sub(r'\(version\)', '', text, flags=re.IGNORECASE)
text = re.sub(r'\(edit\)', '', text, flags=re.IGNORECASE)
text = re.sub(r'russian version', '', text, flags=re.IGNORECASE)
text = re.sub(r'english version', '', text, flags=re.IGNORECASE)
text = re.sub(r'—', '-', text)
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _simplify_query(self, query: str) -> str:
simplified = query
parts = simplified.split(' x ')
if len(parts) > 1:
simplified = ' x '.join(parts[1:])
return simplified
async def _get_music_from_bot(self, query: str):
bot_username = self.config["telegram_bot"]
messages_to_delete = []
if not query or query.strip() == "":
return None, None, None
async with self.client.conversation(bot_username) as conv:
try:
request = await conv.send_message(query)
messages_to_delete.append(request)
try:
response = await conv.get_response(timeout=10)
messages_to_delete.append(response)
except TimeoutError:
await conv.send_message("/start")
messages_to_delete.append(await conv.get_response(timeout=5))
await conv.send_message(query)
messages_to_delete.append(await conv.get_response(timeout=10))
response = messages_to_delete[-1]
if not hasattr(response, 'reply_markup') or response.reply_markup is None:
await self.client.delete_messages(bot_username, messages_to_delete)
return None, None, None
if hasattr(response.reply_markup, "rows"):
buttons = []
for row in response.reply_markup.rows:
for button in row.buttons:
if hasattr(button, "text"):
buttons.append((button.text, button))
if not buttons:
await self.client.delete_messages(bot_username, messages_to_delete)
return None, None, None
best_match = None
highest_similarity = 0.0
query_cleaned = self._clean_string(query)
query_simplified = self._simplify_query(query_cleaned)
for button_text, button in buttons:
button_text_cleaned = self._clean_string(button_text)
similarity = difflib.SequenceMatcher(None, query_cleaned, button_text_cleaned).ratio()
if similarity > highest_similarity and similarity >= 0.5:
highest_similarity = similarity
best_match = button
if not best_match and query_simplified != query_cleaned:
for button_text, button in buttons:
button_text_cleaned = self._clean_string(button_text)
similarity = difflib.SequenceMatcher(None, query_simplified, button_text_cleaned).ratio()
if similarity > highest_similarity and similarity >= 0.5:
highest_similarity = similarity
best_match = button
if best_match:
music_response = await response.click(button=best_match)
else:
music_response = await response.click(0)
file_response = await conv.get_response(timeout=10)
messages_to_delete.append(file_response)
if file_response.media and isinstance(file_response.media, types.MessageMediaDocument):
document = file_response.media.document
for attr in document.attributes:
if isinstance(attr, types.DocumentAttributeAudio):
title = attr.title or "Unknown Title"
artist = attr.performer or "Unknown Artist"
return title, artist, document
return None, None, document
return None, None, None
else:
file_response = response
if file_response.media and isinstance(file_response.media, types.MessageMediaDocument):
document = file_response.media.document
for attr in document.attributes:
if isinstance(attr, types.DocumentAttributeAudio):
title = attr.title or "Unknown Title"
artist = attr.performer or "Unknown Artist"
await self.client.delete_messages(bot_username, messages_to_delete)
return title, artist, document
return None, None, document
return None, None, None
except Exception as e:
await self.client.delete_messages(bot_username, messages_to_delete)
return None, None, None
@loader.command(ru_doc=" - Текущая песня")
async def vkmpnow(self, message: Message):
""" - Current song"""
self._vkmusic = VKMusicAPI(str(self.config["user_id"]), str(self.config["token"]))
music = await self._vkmusic.get_music()
if music[0] == 50:
await utils.answer(message, self.strings["bot_searching"])
query = f"{music[1]['audio']['artist']} - {music[1]['audio']['title']}"
query = self._clean_string(query)
if not query:
await utils.answer(message, self.strings["empty_query"])
return
title, artist, document = await self._get_music_from_bot(query)
if document:
file_name = f"{artist or 'Unknown'} - {title or 'Unknown'}.mp3".replace("/", "_").replace("\\", "_").replace(":", "_").strip()
await utils.answer_file(
message,
file=document,
file_name=file_name,
caption=self.strings["music_form"].format(
title=title or "Unknown",
artist=artist or "Unknown"
)
)
else:
await utils.answer(message, self.strings["bot_not_found"])
elif music[0] == 40:
await utils.answer(message, self.strings["bot_searching"])
query = music[1]
query = self._clean_string(query)
if not query:
await utils.answer(message, self.strings["empty_query"])
return
title, artist, document = await self._get_music_from_bot(query)
if document:
file_name = f"{artist or 'Unknown'} - {title or 'Unknown'}.mp3".replace("/", "_").replace("\\", "_").replace(":", "_").strip()
await utils.answer_file(
message,
file=document,
file_name=file_name,
caption=self.strings["music_form"].format(
title=title or "Unknown",
artist=artist or "Unknown"
)
)
else:
await utils.answer(message, self.strings["bot_not_found"])
elif music[0] == 30:
await utils.answer(message, self.strings["server_error"])
elif music[0] == 10:
await utils.answer(message, self.strings["invalid_token"])
else:
await utils.answer(message, self.strings["no_music"])
@loader.command(ru_doc=" - Инструкции для токена и пользовательского идентификатора")
async def vkmtoken(self, message: Message):
"""- Instructions for token and user ID"""
await utils.answer(message, self.strings["instructions"])