""" █▀▀ ▄▀█ █▄▀ █▀▀ █▀ ▀█▀ █░█░█ █ ▀▄▀ █▄▄ █▀█ █░█ ██▄ ▄█ ░█░ ▀▄▀▄▀ █ █░█ Copyleft 2022 t.me/CakesTwix This program is free software; you can redistribute it and/or modify """ __version__ = (1, 2, 3) # meta pic: https://img.icons8.com/bubbles/512/000000/youtube-play.png # meta developer: @cakestwix_mods # requires: yt_dlp aiohttp # scope: hikka_min 1.1.11 # scope: hikka_only import asyncio import logging import aiohttp import os from yt_dlp.utils import DownloadError import yt_dlp from telethon.tl.types import Message from telethon import functions, types from .. import loader, utils from ..inline.types import InlineCall logger = logging.getLogger(__name__) def bytes2human(num, suffix="B"): if not num: return 0 for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024.0: return f"{num:3.1f}{unit}{suffix}" num /= 1024.0 return f"{num:.1f}Yi{suffix}" def progressbar(iteration: int, length: int) -> str: percent = ("{0:." + str(1) + "f}").format(100 * (iteration / float(100))) filledLength = int(length * iteration // 100) return "█" * filledLength + "▒" * (length - filledLength) @loader.tds class YouTubeMod(loader.Module): """Download YouTube videos with video and audio quality selection""" strings = { "name": "InlineYouTube", "args": "🎞 You need to specify link", "downloading": "🎞 Downloading...", "not_found": "🎞 Video not found...", "no_qualt":"No quality", "format": "Format:", "ext": "Ext:", "video_codec": "Video codec:", "audio": "Audio", "file_size": "File size:", "uploading": "🎞 Uploading File...", "getting_info": "ℹ️ Getting information about the video...", } strings_ru = { "name": "InlineYouTube", "args": "🎞 Вам необходимо указать ссылку", "downloading": "🎞 Скачиваю...", "not_found": "🎞 Видео не найдено...", "no_qualt":"Нету такого качества", "format": "Формат:", "ext": "Расширение:", "video_codec": "Видео кодек:", "audio": "Аудио", "file_size": "Размер:", "uploading": "🎞 Загружаю...", "getting_info": "ℹ️ ", } async def client_ready(self, client, db): self._db = db self._client = client @loader.unrestricted async def ytcmd(self, message: Message): """[quality(144p/720p/etc)] - Download video from youtube""" args = utils.get_args(message) await utils.answer(message, self.strings("getting_info")) if not args: return await utils.answer(message, self.strings("args")) with yt_dlp.YoutubeDL() as ydl: try: info_dict = ydl.extract_info( args[1] if len(args) >= 2 else args[0], download=False ) except DownloadError as e: return await utils.answer(message, e.msg) formats_list = [{"text": f"{item['format_note']} ({item['video_ext']})", "callback": self.format_change, "args": (item, info_dict, message.chat.id, item["format_id"],),} for item in info_dict["formats"] if item["ext"] in ["mp4", "webm"] and item["vcodec"] != "none" and (len(args) >= 2 and args[0] == item["format_note"] or len(args) < 2)] caption = f"{info_dict['title']}\n\n" # caption += info_dict["description"] await self.inline.form(text=caption if formats_list else self.strings["no_qualt"], photo=f"https://img.youtube.com/vi/{info_dict['id']}/0.jpg", message=message, reply_markup=utils.chunks(formats_list, 2)) async def format_change( self, call: InlineCall, quality: dict, info_dict: dict, chat_id: int, format_id: int): string = f"{self.strings['format']} {quality['format']}\n" string += f"{self.strings['ext']} {quality['ext']}\n" string += f"{self.strings['video_codec']} {quality['vcodec']}\n" string += f"{self.strings['file_size']} {bytes2human(quality['filesize'])}\n" audio_keyboard = [{"text": f"{self.strings['audio']} ({audio['format_note']})", "callback": self.download, "args": (info_dict["id"], quality["ext"], quality["format_id"], audio["format_id"], chat_id,),} for audio in info_dict["formats"] if audio["ext"] == "m4a"] audio_keyboard.append( { "text": "Back", "callback": self.back, "args": ( info_dict, chat_id, ), }, ) await call.edit( text=string, reply_markup=audio_keyboard, ) async def back(self, call: InlineCall, info_dict: dict, chat_id: int): formats_list = [{"text": f"{item['format_note']} ({item['video_ext']})", "callback": self.format_change, "args": (item, info_dict, chat_id, item["format_id"]),} for item in info_dict["formats"] if item["ext"] in ["mp4", "webm"] and item["vcodec"] != "none"] caption = f"{info_dict['title']}\n\n" # caption += info_dict["description"] await call.edit(text=caption, reply_markup=utils.chunks(formats_list, 2)) async def download( self, call: InlineCall, video_id: str, ext: str, video_format: int, audio_format: int, chat_id: int, ): meta = {} def download(): nonlocal meta with yt_dlp.YoutubeDL( { "format": "{}+{}".format(str(video_format), str(audio_format)), "outtmpl": "%(resolution)s.%(id)s.%(ext)s", } ) as yd: meta = yd.extract_info( "https://www.youtube.com/watch?v=" + video_id, download=False ) yd.download("https://www.youtube.com/watch?v=" + video_id) await call.edit( text=f"{self.strings['downloading']}", ) await utils.run_sync(download) await call.edit( text=f"{self.strings['uploading']}", ) # Download thumb for video async with aiohttp.ClientSession() as session: async with session.get(f"https://img.youtube.com/vi/{meta['id']}/0.jpg") as resp: with open(meta['id'] + ".jpg", 'wb') as fd: async for chunk in resp.content.iter_chunked(512): fd.write(chunk) await self._client.send_file( chat_id, "{0}x{1}.{2}.{3}".format( (meta["width"]), (meta["height"]), (meta["id"]), (meta["ext"].replace("webm", "mkv")), ), supports_streaming=True, thumb=meta["id"] + ".jpg" ) os.remove( "{0}x{1}.{2}.{3}".format( (meta["width"]), (meta["height"]), (meta["id"]), (meta["ext"].replace("webm", "mkv")), ) ) os.remove(meta["id"] + ".jpg") await call.delete()