__version__ = (1, 0, 5)
# meta pic: https://static.whypodg.me/mods!lastfm.png
# meta banner: https://mods.whypodg.me/badges/lastfm.jpg
# meta developer: @idiotcoders
# scope: hikka_only
# scope: hikka_min 1.2.10
# requires: pylast
import asyncio
import contextlib
import functools
import logging
import pylast
import requests
from traceback import format_exc
from typing import Optional
from types import FunctionType
from io import BytesIO
from telethon.tl.functions.account import UpdateProfileRequest
from telethon.errors.rpcerrorlist import FloodWaitError
from telethon.tl.types import Message
from .. import loader, utils
logger = logging.getLogger(__name__)
@loader.tds
class LastFMMod(loader.Module):
"""LastFM Now (based on SpotifyNow)"""
strings = {
"name": "LastFM",
"_cfg_token": "Enter API token from your Last.fm account 🤫",
"_cfg_secret": "Enter API secret from your Last.fm account 🤫",
"_cfg_username": "Enter username from your Last.fm account 🤫",
"_cfg_passwd": "Enter password from your Last.fm account 🤫",
"_cfg_autobio": "Enter a template for auto-bio\nArguments: {author} — author of track, {track} — title of track",
"error": "❌ Error occurred. Make sure you are authorized and the track is playing!\n\n{error}",
"no_auth": "❌ You are unauthorized!",
"nothing_playing": "❌ Nothing is playing right now!",
"no_args": "❌ Specify the args!",
"autobioe": "✅ Last.fm bio enabled",
"autobiod": "✅ Last.fm bio disabled",
"top": "🏆 Your top-{count} the most listened tracks:\n{top}",
"nores": "❌ No results!",
"now_playing": "🎧 {author} - {track}",
"search": "🎧 {}"
}
strings_ru = {
"name": "LastFM",
"_cfg_token": "Укажи API токен от аккаунта Last.fm 🤫",
"_cfg_secret": "Укажи API секрет от аккаунта Last.fm 🤫",
"_cfg_username": "Укажи логин от аккаунта Last.fm 🤫",
"_cfg_passwd": "Укажи пароль от аккаунта Last.fm 🤫",
"_cfg_autobio": "Укажи шаблон автобио\nАргументы: {author} — автор, {track} — название трека",
"_cls_doc": "LastFM Now (основан на SpotifyNow)",
"error": "❌ Произошла ошибка. Убедитесь, что Вы автворизованы и музыка играет!\n\n{error}",
"no_auth": "❌ Вы не авторизованы!",
"nothing_playing": "❌ Ничего сейчас не играет!",
"no_args": "❌ Укажите аргументы!",
"autobioe": "✅ Авто-био для Last.fm включено",
"autobiod": "✅ Авто-био для Last.fm выключено",
"top": "🏆 Ваш топ-{count} самых прослушиваемых треков:\n{top}",
"nores": "❌ Результатов не найдено!",
"now_playing": "🎧 {author} - {track}",
"search": "🎧 {}"
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"LastFmToken",
None,
lambda: self.strings["_cfg_token"],
validator=loader.validators.Hidden(),
),
loader.ConfigValue(
"LastFmSecret",
None,
lambda: self.strings["_cfg_secret"],
validator=loader.validators.Hidden(),
),
loader.ConfigValue(
"LastFmLogin",
None,
lambda: self.strings["_cfg_username"],
validator=loader.validators.Hidden(),
),
loader.ConfigValue(
"LastFmPassword",
None,
lambda: self.strings["_cfg_passwd"],
validator=loader.validators.Hidden(),
),
loader.ConfigValue(
"AutoBioTemplate",
"🎧🎵 {author} - {track}",
lambda: self.strings["_cfg_autobio"],
validator=loader.validators.String(),
),
)
async def client_ready(self, client, db):
self._premium = getattr(await client.get_me(), "premium", False)
try:
self._netw = pylast.LastFMNetwork(
api_key=self.config['LastFmToken'],
api_secret=self.config['LastFmSecret'],
username=self.config['LastFmLogin'],
password_hash=pylast.md5(self.config['LastFmPassword'])
)
self._user = self._netw.get_user(self.config['LastFmLogin'])
self.set("auth", True)
except Exception:
self.set("auth", False)
if self.get("autobio", False):
self.autobio.start()
self.musicdl = await self.import_lib(
"https://libs.hikariatama.ru/musicdl.py",
suspend_on_error=True,
)
def error_handler(func) -> FunctionType:
@functools.wraps(func)
async def wrapped(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception:
logger.exception(format_exc())
with contextlib.supperss(Exception):
await utils.answer(
args[1],
args[0].strings['error'].format(error=format_exc())
)
wrapped.__doc__ = func.__doc__
wrapped.__module__ = func.__module__
return wrapped
@loader.loop(interval=90)
async def autobio(self):
if not self.get('auth', False):
return
now = self._user.get_now_playing()
if not now:
return
now = self._user.get_now_playing()
bio = self.config["AutoBioTemplate"].format(
author = str(now.artist),
track = str(now.title)
)
try:
await self._client(
UpdateProfileRequest(about=bio[: 140 if self._premium else 70])
)
except FloodWaitError as e:
logger.info(f"Sleeping {max(e.seconds, 60)} bc of floodwait")
await asyncio.sleep(max(e.seconds, 60))
return
@error_handler
@loader.command(
ru_doc="<название> 👉 Поиск по трекам. Работает без авторизации",
alias="lsch"
)
async def lsearchcmd(self, message: Message):
" 👉 Search for tracks. Works without authorization"
name = utils.get_args_raw(message)
if not name:
await utils.answer(
message,
self.strings['no_args']
)
return
await self._open_track(track=name, message=message)
@error_handler
@loader.command(
ru_doc="[кол-во треков в топе] 👉 Получить топ самых прослушиваемых треков. Вы можете указать кол-во треков в топе (необязательно)"
)
async def ltopcmd(self, message: Message):
"[count of tracks in top] 👉 Get the top most listened tracks. You can enter the count of tracks (optional)"
args = utils.get_args(message)
c, out = 5, ""
if len(args) > 0 and args[0].isdigit():
c = int(args[0])
top = self._user.get_top_tracks(limit=c)
emj = {'1': '🥇', '2': '🥈', '3': '🥉'}
for i in range(len(top)):
out += f"{(str(i+1) + '.') if str(i+1) not in list(emj.keys()) else emj[str(i+1)]} {top[i].item.artist} - {top[i].item.title} — {top[i].weight}\n"
await utils.answer(
message,
self.strings['top'].format(count=c, top=out)
)
@error_handler
@loader.command(
ru_doc="👉 Включить/выключить авто-био"
)
async def lbiocmd(self, message: Message):
"""👉 Toggle bio playback streaming"""
current = self.get("autobio", False)
new = not current
self.set("autobio", new)
await utils.answer(
message,
self.strings[f"autobio{'e' if new else 'd'}"]
)
if new:
self.autobio.start()
else:
self.autobio.stop()
@error_handler
@loader.command(
ru_doc="👉 Покажет проигрываемый сейчас трек"
)
async def lnowcmd(self, message: Message):
"""👉 Shows track, that playing right now"""
now = self._user.get_now_playing()
if now is None:
await utils.answer(
message,
self.strings['nothing_playing']
)
return
artists = str(now.artist).split(', ')
track = {'name': str(now.title), 'artists': []}
track['artists'].append({'name': i for i in artists})
await self._open_track(
track=track, message=message,
override_text=self.strings['now_playing'].format(
author=str(now.artist), track=str(now.title)
)
)
async def _open_track(
self,
track,
message: Message,
override_text: str = None,
):
if type(track) is dict:
name = track.get("name")
artists = [
artist["name"] for artist in track.get("artists", []) if "name" in artist
]
full_song_name = f"{name} - {', '.join(artists)}"
else:
name = ""
artists = []
full_song_name = str(track)
music = await self.musicdl.dl(full_song_name, only_document=True)
if not override_text:
override_text = (
f"🎧 {', '.join(artists)} - {name}"
if artists else
f"🎧 {full_song_name}"
)
try:
await self._client.send_file(
message.peer_id,
music,
caption=override_text
)
except:
await utils.answer(
message,
"Some error!"
)
return
if message.out:
await message.delete()