mirror of
https://github.com/MuRuLOSE/limoka.git
synced 2026-06-17 23:04:17 +02:00
Commited backup
This commit is contained in:
279
MuRuLOSE/HikkaModulesRepo/VKMusic.py
Normal file
279
MuRuLOSE/HikkaModulesRepo/VKMusic.py
Normal file
@@ -0,0 +1,279 @@
|
||||
from typing import Union, Dict
|
||||
import aiohttp
|
||||
from aiohttp.client_exceptions import ServerTimeoutError
|
||||
import logging
|
||||
import difflib
|
||||
import re
|
||||
|
||||
from telethon.tl.types import Message
|
||||
from telethon import types
|
||||
from .. import loader, utils
|
||||
|
||||
|
||||
"""
|
||||
███ ███ ██ ██ ██████ ██ ██ ██ ██████ ███████ ███████
|
||||
████ ████ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██
|
||||
██ ████ ██ ██ ██ ██████ ██ ██ ██ ██ ██ ███████ █████
|
||||
██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██
|
||||
██ ██ ██████ ██ ██ ██████ ███████ ██████ ███████ ███████
|
||||
|
||||
|
||||
VKMusic
|
||||
📜 Licensed under the GNU AGPLv3
|
||||
"""
|
||||
|
||||
# meta banner: https://0x0.st/HYVT.jpg
|
||||
# meta desc: desc
|
||||
# meta developer: @BruhHikkaModules
|
||||
# requires: aiohttp difflib
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class VKMusicAPI:
|
||||
def __init__(self, user_id: str, token: str) -> Union[Dict, int]:
|
||||
self.token = token
|
||||
self.user_id = user_id
|
||||
|
||||
async def get_music(self):
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
f"https://api.vk.com/method/status.get?user_id={self.user_id}&access_token={self.token}&v=5.199"
|
||||
) as response:
|
||||
data: dict = await response.json()
|
||||
if 'error' in data or 'response' not in data:
|
||||
return 10, None
|
||||
if data['response'].get('audio') is not None:
|
||||
return 50, data['response']
|
||||
else:
|
||||
return 40, data['response']['text']
|
||||
except ServerTimeoutError:
|
||||
return 30
|
||||
|
||||
@loader.tds
|
||||
class VKMusic(loader.Module):
|
||||
strings = {
|
||||
"name": "VKMusic",
|
||||
"no_music": "Music is not playing (not all music is displayed in the status).",
|
||||
"server_error": "Server of VK does not answering",
|
||||
"music_form": (
|
||||
"<emoji document_id=5222175680652917218>🎵</emoji> <b>Listening now:</b> <code>{title}</code>"
|
||||
"\n<emoji document_id=5269537556336222550>🐱</emoji> <b>Artist:</b> <code>{artist}</code>"
|
||||
),
|
||||
"instructions": (
|
||||
"<b>Go to <a href='https://vkhost.github.io/'>vkhost</a>, open settings, leave anytime access and status,"
|
||||
"and click get, copy the token and id, and then paste it in properly (in config).</b>"
|
||||
),
|
||||
"not_russia": (
|
||||
"\n<emoji document_id=5303281542422865331>🇷🇺</emoji> VK gave not all information about"
|
||||
"the track because your userbot server is outside the Russian Federation."
|
||||
),
|
||||
"bot_searching": "Searching via Telegram bot...",
|
||||
"bot_not_found": "Music not found via Telegram bot.",
|
||||
"bot_start": "Bot requires /start, initializing...",
|
||||
"empty_query": "Cannot search: possibly, no music is playing, or music is not broadcasted to the status.",
|
||||
"invalid_token": "Invalid or expired VK token. Please update your token using .vkmtoken instructions."
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self.config = loader.ModuleConfig(
|
||||
loader.ConfigValue(
|
||||
"token",
|
||||
"token",
|
||||
lambda: "How get token: .vkmtoken",
|
||||
validator=loader.validators.Hidden(loader.validators.String()),
|
||||
),
|
||||
loader.ConfigValue(
|
||||
"user_id",
|
||||
"1278631",
|
||||
lambda: "Here your userid, (about this in .vkmtoken)",
|
||||
validator=loader.validators.Hidden(loader.validators.String()),
|
||||
),
|
||||
loader.ConfigValue(
|
||||
"telegram_bot",
|
||||
"@vkm_bot",
|
||||
lambda: "Telegram bot username for music search (e.g., @vkmusic_bot)",
|
||||
validator=loader.validators.String(),
|
||||
),
|
||||
)
|
||||
self._vkmusic = VKMusicAPI(self.config["user_id"], self.config["token"])
|
||||
|
||||
def _clean_string(self, text: str) -> str:
|
||||
text = text.lower().strip()
|
||||
text = re.sub(r'\(official video\)', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'\(lyrics\)', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'\(audio\)', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'\(live\)', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'\(remix\)', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'\(feat\..*?\)', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'\(russian ver\.?\)', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'\(english ver\.?\)', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'\(ver\.?\)', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'\(version\)', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'\(edit\)', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'russian version', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'english version', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'—', '-', text)
|
||||
text = re.sub(r'\s+', ' ', text)
|
||||
return text.strip()
|
||||
|
||||
def _simplify_query(self, query: str) -> str:
|
||||
simplified = query
|
||||
parts = simplified.split(' x ')
|
||||
if len(parts) > 1:
|
||||
simplified = ' x '.join(parts[1:])
|
||||
return simplified
|
||||
|
||||
async def _get_music_from_bot(self, query: str):
|
||||
bot_username = self.config["telegram_bot"]
|
||||
messages_to_delete = []
|
||||
|
||||
if not query or query.strip() == "":
|
||||
return None, None, None
|
||||
|
||||
async with self.client.conversation(bot_username) as conv:
|
||||
try:
|
||||
request = await conv.send_message(query)
|
||||
messages_to_delete.append(request)
|
||||
|
||||
try:
|
||||
response = await conv.get_response(timeout=10)
|
||||
messages_to_delete.append(response)
|
||||
except TimeoutError:
|
||||
await conv.send_message("/start")
|
||||
messages_to_delete.append(await conv.get_response(timeout=5))
|
||||
await conv.send_message(query)
|
||||
messages_to_delete.append(await conv.get_response(timeout=10))
|
||||
response = messages_to_delete[-1]
|
||||
|
||||
if not hasattr(response, 'reply_markup') or response.reply_markup is None:
|
||||
await self.client.delete_messages(bot_username, messages_to_delete)
|
||||
return None, None, None
|
||||
|
||||
if hasattr(response.reply_markup, "rows"):
|
||||
buttons = []
|
||||
for row in response.reply_markup.rows:
|
||||
for button in row.buttons:
|
||||
if hasattr(button, "text"):
|
||||
buttons.append((button.text, button))
|
||||
|
||||
if not buttons:
|
||||
await self.client.delete_messages(bot_username, messages_to_delete)
|
||||
return None, None, None
|
||||
|
||||
best_match = None
|
||||
highest_similarity = 0.0
|
||||
query_cleaned = self._clean_string(query)
|
||||
query_simplified = self._simplify_query(query_cleaned)
|
||||
|
||||
for button_text, button in buttons:
|
||||
button_text_cleaned = self._clean_string(button_text)
|
||||
similarity = difflib.SequenceMatcher(None, query_cleaned, button_text_cleaned).ratio()
|
||||
if similarity > highest_similarity and similarity >= 0.5:
|
||||
highest_similarity = similarity
|
||||
best_match = button
|
||||
|
||||
if not best_match and query_simplified != query_cleaned:
|
||||
for button_text, button in buttons:
|
||||
button_text_cleaned = self._clean_string(button_text)
|
||||
similarity = difflib.SequenceMatcher(None, query_simplified, button_text_cleaned).ratio()
|
||||
if similarity > highest_similarity and similarity >= 0.5:
|
||||
highest_similarity = similarity
|
||||
best_match = button
|
||||
|
||||
if best_match:
|
||||
music_response = await response.click(button=best_match)
|
||||
else:
|
||||
music_response = await response.click(0)
|
||||
|
||||
file_response = await conv.get_response(timeout=10)
|
||||
messages_to_delete.append(file_response)
|
||||
|
||||
if file_response.media and isinstance(file_response.media, types.MessageMediaDocument):
|
||||
document = file_response.media.document
|
||||
for attr in document.attributes:
|
||||
if isinstance(attr, types.DocumentAttributeAudio):
|
||||
title = attr.title or "Unknown Title"
|
||||
artist = attr.performer or "Unknown Artist"
|
||||
return title, artist, document
|
||||
return None, None, document
|
||||
return None, None, None
|
||||
else:
|
||||
file_response = response
|
||||
|
||||
if file_response.media and isinstance(file_response.media, types.MessageMediaDocument):
|
||||
document = file_response.media.document
|
||||
for attr in document.attributes:
|
||||
if isinstance(attr, types.DocumentAttributeAudio):
|
||||
title = attr.title or "Unknown Title"
|
||||
artist = attr.performer or "Unknown Artist"
|
||||
await self.client.delete_messages(bot_username, messages_to_delete)
|
||||
return title, artist, document
|
||||
return None, None, document
|
||||
return None, None, None
|
||||
except Exception as e:
|
||||
await self.client.delete_messages(bot_username, messages_to_delete)
|
||||
return None, None, None
|
||||
|
||||
@loader.command(ru_doc=" - Текущая песня")
|
||||
async def vkmpnow(self, message: Message):
|
||||
""" - Current song"""
|
||||
self._vkmusic = VKMusicAPI(str(self.config["user_id"]), str(self.config["token"]))
|
||||
|
||||
music = await self._vkmusic.get_music()
|
||||
|
||||
if music[0] == 50:
|
||||
await utils.answer(message, self.strings["bot_searching"])
|
||||
query = f"{music[1]['audio']['artist']} - {music[1]['audio']['title']}"
|
||||
query = self._clean_string(query)
|
||||
if not query:
|
||||
await utils.answer(message, self.strings["empty_query"])
|
||||
return
|
||||
title, artist, document = await self._get_music_from_bot(query)
|
||||
|
||||
if document:
|
||||
file_name = f"{artist or 'Unknown'} - {title or 'Unknown'}.mp3".replace("/", "_").replace("\\", "_").replace(":", "_").strip()
|
||||
await utils.answer_file(
|
||||
message,
|
||||
file=document,
|
||||
file_name=file_name,
|
||||
caption=self.strings["music_form"].format(
|
||||
title=title or "Unknown",
|
||||
artist=artist or "Unknown"
|
||||
)
|
||||
)
|
||||
else:
|
||||
await utils.answer(message, self.strings["bot_not_found"])
|
||||
elif music[0] == 40:
|
||||
await utils.answer(message, self.strings["bot_searching"])
|
||||
query = music[1]
|
||||
query = self._clean_string(query)
|
||||
if not query:
|
||||
await utils.answer(message, self.strings["empty_query"])
|
||||
return
|
||||
title, artist, document = await self._get_music_from_bot(query)
|
||||
|
||||
if document:
|
||||
file_name = f"{artist or 'Unknown'} - {title or 'Unknown'}.mp3".replace("/", "_").replace("\\", "_").replace(":", "_").strip()
|
||||
await utils.answer_file(
|
||||
message,
|
||||
file=document,
|
||||
file_name=file_name,
|
||||
caption=self.strings["music_form"].format(
|
||||
title=title or "Unknown",
|
||||
artist=artist or "Unknown"
|
||||
)
|
||||
)
|
||||
else:
|
||||
await utils.answer(message, self.strings["bot_not_found"])
|
||||
elif music[0] == 30:
|
||||
await utils.answer(message, self.strings["server_error"])
|
||||
elif music[0] == 10:
|
||||
await utils.answer(message, self.strings["invalid_token"])
|
||||
else:
|
||||
await utils.answer(message, self.strings["no_music"])
|
||||
|
||||
@loader.command(ru_doc=" - Инструкции для токена и пользовательского идентификатора")
|
||||
async def vkmtoken(self, message: Message):
|
||||
"""- Instructions for token and user ID"""
|
||||
await utils.answer(message, self.strings["instructions"])
|
||||
Reference in New Issue
Block a user