"""
█▀▀ ▄▀█ █▄▀ █▀▀ █▀ ▀█▀ █░█░█ █ ▀▄▀
█▄▄ █▀█ █░█ ██▄ ▄█ ░█░ ▀▄▀▄▀ █ █░█
Copyleft 2022 t.me/CakesTwix
This program is free software; you can redistribute it and/or modify
"""
__version__ = (1, 2, 3)
# meta pic: https://img.icons8.com/bubbles/512/000000/youtube-play.png
# meta developer: @cakestwix_mods
# requires: yt_dlp aiohttp
# scope: hikka_min 1.1.11
# scope: hikka_only
import asyncio
import logging
import aiohttp
import os
from yt_dlp.utils import DownloadError
import yt_dlp
from telethon.tl.types import Message
from telethon import functions, types
from .. import loader, utils
from ..inline.types import InlineCall
logger = logging.getLogger(__name__)
def bytes2human(num, suffix="B"):
if not num:
return 0
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
if abs(num) < 1024.0:
return f"{num:3.1f}{unit}{suffix}"
num /= 1024.0
return f"{num:.1f}Yi{suffix}"
def progressbar(iteration: int, length: int) -> str:
percent = ("{0:." + str(1) + "f}").format(100 * (iteration / float(100)))
filledLength = int(length * iteration // 100)
return "█" * filledLength + "▒" * (length - filledLength)
@loader.tds
class YouTubeMod(loader.Module):
"""Download YouTube videos with video and audio quality selection"""
strings = {
"name": "InlineYouTube",
"args": "🎞 You need to specify link",
"downloading": "🎞 Downloading...",
"not_found": "🎞 Video not found...",
"no_qualt":"No quality",
"format": "Format:",
"ext": "Ext:",
"video_codec": "Video codec:",
"audio": "Audio",
"file_size": "File size:",
"uploading": "🎞 Uploading File...",
"getting_info": "ℹ️ Getting information about the video...",
}
strings_ru = {
"name": "InlineYouTube",
"args": "🎞 Вам необходимо указать ссылку",
"downloading": "🎞 Скачиваю...",
"not_found": "🎞 Видео не найдено...",
"no_qualt":"Нету такого качества",
"format": "Формат:",
"ext": "Расширение:",
"video_codec": "Видео кодек:",
"audio": "Аудио",
"file_size": "Размер:",
"uploading": "🎞 Загружаю...",
"getting_info": "ℹ️ ",
}
async def client_ready(self, client, db):
self._db = db
self._client = client
@loader.unrestricted
async def ytcmd(self, message: Message):
"""[quality(144p/720p/etc)] - Download video from youtube"""
args = utils.get_args(message)
await utils.answer(message, self.strings("getting_info"))
if not args:
return await utils.answer(message, self.strings("args"))
with yt_dlp.YoutubeDL() as ydl:
try:
info_dict = ydl.extract_info(
args[1] if len(args) >= 2 else args[0], download=False
)
except DownloadError as e:
return await utils.answer(message, e.msg)
formats_list = [{"text": f"{item['format_note']} ({item['video_ext']})", "callback": self.format_change, "args": (item, info_dict, message.chat.id, item["format_id"],),} for item in info_dict["formats"] if item["ext"] in ["mp4", "webm"] and item["vcodec"] != "none" and (len(args) >= 2 and args[0] == item["format_note"] or len(args) < 2)]
caption = f"{info_dict['title']}\n\n"
# caption += info_dict["description"]
await self.inline.form(text=caption if formats_list else self.strings["no_qualt"], photo=f"https://img.youtube.com/vi/{info_dict['id']}/0.jpg", message=message, reply_markup=utils.chunks(formats_list, 2))
async def format_change(
self,
call: InlineCall,
quality: dict,
info_dict: dict,
chat_id: int,
format_id: int):
string = f"{self.strings['format']} {quality['format']}\n"
string += f"{self.strings['ext']} {quality['ext']}\n"
string += f"{self.strings['video_codec']} {quality['vcodec']}\n"
string += f"{self.strings['file_size']} {bytes2human(quality['filesize'])}\n"
audio_keyboard = [{"text": f"{self.strings['audio']} ({audio['format_note']})", "callback": self.download, "args": (info_dict["id"], quality["ext"], quality["format_id"], audio["format_id"], chat_id,),} for audio in info_dict["formats"] if audio["ext"] == "m4a"]
audio_keyboard.append(
{
"text": "Back",
"callback": self.back,
"args": (
info_dict,
chat_id,
),
},
)
await call.edit(
text=string,
reply_markup=audio_keyboard,
)
async def back(self, call: InlineCall, info_dict: dict, chat_id: int):
formats_list = [{"text": f"{item['format_note']} ({item['video_ext']})", "callback": self.format_change, "args": (item, info_dict, chat_id, item["format_id"]),} for item in info_dict["formats"] if item["ext"] in ["mp4", "webm"] and item["vcodec"] != "none"]
caption = f"{info_dict['title']}\n\n"
# caption += info_dict["description"]
await call.edit(text=caption, reply_markup=utils.chunks(formats_list, 2))
async def download(
self,
call: InlineCall,
video_id: str,
ext: str,
video_format: int,
audio_format: int,
chat_id: int,
):
meta = {}
def download():
nonlocal meta
with yt_dlp.YoutubeDL(
{
"format": "{}+{}".format(str(video_format), str(audio_format)),
"outtmpl": "%(resolution)s.%(id)s.%(ext)s",
}
) as yd:
meta = yd.extract_info(
"https://www.youtube.com/watch?v=" + video_id, download=False
)
yd.download("https://www.youtube.com/watch?v=" + video_id)
await call.edit(
text=f"{self.strings['downloading']}",
)
await utils.run_sync(download)
await call.edit(
text=f"{self.strings['uploading']}",
)
# Download thumb for video
async with aiohttp.ClientSession() as session:
async with session.get(f"https://img.youtube.com/vi/{meta['id']}/0.jpg") as resp:
with open(meta['id'] + ".jpg", 'wb') as fd:
async for chunk in resp.content.iter_chunked(512):
fd.write(chunk)
await self._client.send_file(
chat_id,
"{0}x{1}.{2}.{3}".format(
(meta["width"]),
(meta["height"]),
(meta["id"]),
(meta["ext"].replace("webm", "mkv")),
),
supports_streaming=True,
thumb=meta["id"] + ".jpg"
)
os.remove(
"{0}x{1}.{2}.{3}".format(
(meta["width"]),
(meta["height"]),
(meta["id"]),
(meta["ext"].replace("webm", "mkv")),
)
)
os.remove(meta["id"] + ".jpg")
await call.delete()