""" █▀▀ ▄▀█ █▄▀ █▀▀ █▀ ▀█▀ █░█░█ █ ▀▄▀ █▄▄ █▀█ █░█ ██▄ ▄█ ░█░ ▀▄▀▄▀ █ █░█ Copyleft 2022 t.me/CakesTwix This program is free software; you can redistribute it and/or modify """ __version__ = (1, 0, 0) # requires: spotdl # meta pic: https://cdn-icons-png.flaticon.com/512/2111/2111624.png # meta developer: @cakestwix_mods # scope: hikka_only import os import subprocess import logging import asyncio import spotdl from .. import loader, utils logger = logging.getLogger(__name__) @loader.tds class InlineSpotifyDownloaderMod(loader.Module): """Module for downloading music from Spotify""" strings = { "name": "InlineSpotifyDownloader", "cfg_spotdl_path": "Path to spotdl bin", "no_args": "🎞 You need to specify link", "no_track": "🎞 You need to specify track link", "downloading": "🎞 Downloading...", "no_spotdl": "🚫 Spotdl not found... Check config or install spotdl (.terminal pip install spotdl)", "not_found": "🎧 Music not found...", } def __init__(self): self.config = loader.ModuleConfig( "CONFIG_SPOTDL_PATH", "~/.local/bin/spotdl", lambda: self.strings("cfg_spotdl_path"), ) async def spotdlcmd(self, message): """Download music from Spotify (Only tracks)""" if not (args := utils.get_args_raw(message)): return await utils.answer(message, self.strings["no_args"]) if "https://open.spotify.com/track/" not in args: return await utils.answer(message, self.strings["no_track"]) # Try write command await utils.answer(message, self.strings["downloading"]) proc = await asyncio.create_subprocess_shell( self.config["CONFIG_SPOTDL_PATH"] + " " + args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Pass one line _ = await proc.stdout.readline() # Get console strings line = (await proc.stdout.readline()).decode().rstrip() await proc.wait() # Check "not found" error err = await proc.stderr.readline() if ": not found" in err.decode(): return await utils.answer(message, self.strings["no_spotdl"]) # Try get youtube video id if "https://www.youtube.com/watch?v=" in line and len(line.split("https://www.youtube.com/watch?v=")) == 2: photo_id = line.split("https://www.youtube.com/watch?v=")[1] elif "as it's already downloaded" in line: photo_id = None else: return await utils.answer(message, self.strings["not_found"]) # Get track name and send message track_name = line.split('"')[1] await self.inline.form( text=line, photo=f"https://img.youtube.com/vi/{photo_id}/maxresdefault.jpg" if photo_id else None, message=message, reply_markup=[{"text": "Upload", "callback": self.inline__upload, "args": (track_name, message.chat.id)}] ) async def inline__upload(self, call, track_name, chat_id): with open(track_name + ".mp3","rb",) as input_file: content = input_file.read() input_file.seek(0) await self._client.send_file( chat_id, track_name + ".mp3" ) os.remove(track_name + ".mp3") await call.delete()