__version__ = (1, 0, 5) # meta pic: https://static.whypodg.me/mods!lastfm.png # meta banner: https://mods.whypodg.me/badges/lastfm.jpg # meta developer: @idiotcoders # scope: hikka_only # scope: hikka_min 1.2.10 # requires: pylast import asyncio import contextlib import functools import logging import pylast import requests from traceback import format_exc from typing import Optional from types import FunctionType from io import BytesIO from telethon.tl.functions.account import UpdateProfileRequest from telethon.errors.rpcerrorlist import FloodWaitError from telethon.tl.types import Message from .. import loader, utils logger = logging.getLogger(__name__) @loader.tds class LastFMMod(loader.Module): """LastFM Now (based on SpotifyNow)""" strings = { "name": "LastFM", "_cfg_token": "Enter API token from your Last.fm account 🤫", "_cfg_secret": "Enter API secret from your Last.fm account 🤫", "_cfg_username": "Enter username from your Last.fm account 🤫", "_cfg_passwd": "Enter password from your Last.fm account 🤫", "_cfg_autobio": "Enter a template for auto-bio\nArguments: {author} — author of track, {track} — title of track", "error": " Error occurred. Make sure you are authorized and the track is playing!\n\n{error}", "no_auth": " You are unauthorized!", "nothing_playing": " Nothing is playing right now!", "no_args": " Specify the args!", "autobioe": " Last.fm bio enabled", "autobiod": " Last.fm bio disabled", "top": "🏆 Your top-{count} the most listened tracks:\n{top}", "nores": " No results!", "now_playing": "🎧 {author} - {track}", "search": "🎧 {}" } strings_ru = { "name": "LastFM", "_cfg_token": "Укажи API токен от аккаунта Last.fm 🤫", "_cfg_secret": "Укажи API секрет от аккаунта Last.fm 🤫", "_cfg_username": "Укажи логин от аккаунта Last.fm 🤫", "_cfg_passwd": "Укажи пароль от аккаунта Last.fm 🤫", "_cfg_autobio": "Укажи шаблон автобио\nАргументы: {author} — автор, {track} — название трека", "_cls_doc": "LastFM Now (основан на SpotifyNow)", "error": " Произошла ошибка. Убедитесь, что Вы автворизованы и музыка играет!\n\n{error}", "no_auth": " Вы не авторизованы!", "nothing_playing": " Ничего сейчас не играет!", "no_args": " Укажите аргументы!", "autobioe": " Авто-био для Last.fm включено", "autobiod": " Авто-био для Last.fm выключено", "top": "🏆 Ваш топ-{count} самых прослушиваемых треков:\n{top}", "nores": " Результатов не найдено!", "now_playing": "🎧 {author} - {track}", "search": "🎧 {}" } def __init__(self): self.config = loader.ModuleConfig( loader.ConfigValue( "LastFmToken", None, lambda: self.strings["_cfg_token"], validator=loader.validators.Hidden(), ), loader.ConfigValue( "LastFmSecret", None, lambda: self.strings["_cfg_secret"], validator=loader.validators.Hidden(), ), loader.ConfigValue( "LastFmLogin", None, lambda: self.strings["_cfg_username"], validator=loader.validators.Hidden(), ), loader.ConfigValue( "LastFmPassword", None, lambda: self.strings["_cfg_passwd"], validator=loader.validators.Hidden(), ), loader.ConfigValue( "AutoBioTemplate", "🎧🎵 {author} - {track}", lambda: self.strings["_cfg_autobio"], validator=loader.validators.String(), ), ) async def client_ready(self, client, db): self._premium = getattr(await client.get_me(), "premium", False) try: self._netw = pylast.LastFMNetwork( api_key=self.config['LastFmToken'], api_secret=self.config['LastFmSecret'], username=self.config['LastFmLogin'], password_hash=pylast.md5(self.config['LastFmPassword']) ) self._user = self._netw.get_user(self.config['LastFmLogin']) self.set("auth", True) except Exception: self.set("auth", False) if self.get("autobio", False): self.autobio.start() self.musicdl = await self.import_lib( "https://libs.hikariatama.ru/musicdl.py", suspend_on_error=True, ) def error_handler(func) -> FunctionType: @functools.wraps(func) async def wrapped(*args, **kwargs): try: return await func(*args, **kwargs) except Exception: logger.exception(format_exc()) with contextlib.supperss(Exception): await utils.answer( args[1], args[0].strings['error'].format(error=format_exc()) ) wrapped.__doc__ = func.__doc__ wrapped.__module__ = func.__module__ return wrapped @loader.loop(interval=90) async def autobio(self): if not self.get('auth', False): return now = self._user.get_now_playing() if not now: return now = self._user.get_now_playing() bio = self.config["AutoBioTemplate"].format( author = str(now.artist), track = str(now.title) ) try: await self._client( UpdateProfileRequest(about=bio[: 140 if self._premium else 70]) ) except FloodWaitError as e: logger.info(f"Sleeping {max(e.seconds, 60)} bc of floodwait") await asyncio.sleep(max(e.seconds, 60)) return @error_handler @loader.command( ru_doc="<название> 👉 Поиск по трекам. Работает без авторизации", alias="lsch" ) async def lsearchcmd(self, message: Message): " 👉 Search for tracks. Works without authorization" name = utils.get_args_raw(message) if not name: await utils.answer( message, self.strings['no_args'] ) return await self._open_track(track=name, message=message) @error_handler @loader.command( ru_doc="[кол-во треков в топе] 👉 Получить топ самых прослушиваемых треков. Вы можете указать кол-во треков в топе (необязательно)" ) async def ltopcmd(self, message: Message): "[count of tracks in top] 👉 Get the top most listened tracks. You can enter the count of tracks (optional)" args = utils.get_args(message) c, out = 5, "" if len(args) > 0 and args[0].isdigit(): c = int(args[0]) top = self._user.get_top_tracks(limit=c) emj = {'1': '🥇', '2': '🥈', '3': '🥉'} for i in range(len(top)): out += f"{(str(i+1) + '.') if str(i+1) not in list(emj.keys()) else emj[str(i+1)]} {top[i].item.artist} - {top[i].item.title} — {top[i].weight}\n" await utils.answer( message, self.strings['top'].format(count=c, top=out) ) @error_handler @loader.command( ru_doc="👉 Включить/выключить авто-био" ) async def lbiocmd(self, message: Message): """👉 Toggle bio playback streaming""" current = self.get("autobio", False) new = not current self.set("autobio", new) await utils.answer( message, self.strings[f"autobio{'e' if new else 'd'}"] ) if new: self.autobio.start() else: self.autobio.stop() @error_handler @loader.command( ru_doc="👉 Покажет проигрываемый сейчас трек" ) async def lnowcmd(self, message: Message): """👉 Shows track, that playing right now""" now = self._user.get_now_playing() if now is None: await utils.answer( message, self.strings['nothing_playing'] ) return artists = str(now.artist).split(', ') track = {'name': str(now.title), 'artists': []} track['artists'].append({'name': i for i in artists}) await self._open_track( track=track, message=message, override_text=self.strings['now_playing'].format( author=str(now.artist), track=str(now.title) ) ) async def _open_track( self, track, message: Message, override_text: str = None, ): if type(track) is dict: name = track.get("name") artists = [ artist["name"] for artist in track.get("artists", []) if "name" in artist ] full_song_name = f"{name} - {', '.join(artists)}" else: name = "" artists = [] full_song_name = str(track) music = await self.musicdl.dl(full_song_name, only_document=True) if not override_text: override_text = ( f"🎧 {', '.join(artists)} - {name}" if artists else f"🎧 {full_song_name}" ) try: await self._client.send_file( message.peer_id, music, caption=override_text ) except: await utils.answer( message, "Some error!" ) return if message.out: await message.delete()