# .------.------.------.------.------.------.------.------.------.------. # |D.--. |4.--. |N.--. |1.--. |3.--. |L.--. |3.--. |K.--. |0.--. |0.--. | # | :/\: | :/\: | :(): | :/\: | :(): | :/\: | :(): | :/\: | :/\: | :/\: | # | (__) | :\/: | ()() | (__) | ()() | (__) | ()() | :\/: | :\/: | :\/: | # | '--'D| '--'4| '--'N| '--'1| '--'3| '--'L| '--'3| '--'K| '--'0| '--'0| # `------`------`------`------`------`------`------`------`------`------' # # Copyright 2023 t.me/D4n13l3k00 # Licensed under the Creative Commons CC BY-NC-ND 4.0 # # Full license text can be found at: # https://creativecommons.org/licenses/by-nc-nd/4.0/legalcode # # Human-friendly one: # https://creativecommons.org/licenses/by-nc-nd/4.0 # meta developer: @D4n13l3k00 # requires: yt-dlp import os from telethon.tl.types import DocumentAttributeAudio from yt_dlp import YoutubeDL from yt_dlp.utils import ( ContentTooShortError, DownloadError, ExtractorError, GeoRestrictedError, MaxDownloadsReached, PostProcessingError, UnavailableVideoError, XAttrMetadataError, ) from .. import loader, utils # type: ignore @loader.tds class YtDlMod(loader.Module): """Youtube-Dl Module""" strings = { "name": "Youtube-Dl", "preparing": "[YouTube-Dl] Preparing...", "downloading": "[YouTube-Dl] Downloading...", "working": "[YouTube-Dl] Working...", "exporting": "[YouTube-Dl] Exporting...", "reply": "[YouTube-Dl] No link!", "noargs": "[YouTube-Dl] No args!", "content_too_short": "[YouTube-Dl] Downloading content too short!", "geoban": "[YouTube-Dl] The video is not available " "for your geographical location due to geographical " "restrictions set by the website!", "maxdlserr": '[YouTube-Dl] The download limit is as follows: " oh ahah"', "pperr": "[YouTube-Dl] Error in post-processing!", "noformat": "[YouTube-Dl] Media is not available in " "the requested format", "xameerr": "[YouTube-Dl] {0.code}: {0.msg}\n{0.reason}", "exporterr": "[YouTube-Dl] Error when exporting video", "err": "[YouTube-Dl] {}", "err2": "[YouTube-Dl] {}: {}", } async def ripvcmd(self, m): """.ripv - download video""" await self.riper(m, "video") async def ripacmd(self, m): """.ripa - download audio""" await self.riper(m, "audio") async def riper(self, m, type): reply = await m.get_reply_message() args = utils.get_args_raw(m) url = args or reply.raw_text if not url: return await utils.answer(m, self.strings("noargs", m)) m = await utils.answer(m, self.strings("preparing", m)) if type == "audio": opts = { "format": "bestaudio", "addmetadata": True, "key": "FFmpegMetadata", "writethumbnail": True, "prefer_ffmpeg": True, "geo_bypass": True, "nocheckcertificate": True, "postprocessors": [ { "key": "FFmpegExtractAudio", "preferredcodec": "mp3", "preferredquality": "320", } ], "outtmpl": "%(id)s.mp3", "quiet": True, "logtostderr": False, } video = False song = True elif type == "video": opts = { "format": "best", "addmetadata": True, "key": "FFmpegMetadata", "prefer_ffmpeg": True, "geo_bypass": True, "nocheckcertificate": True, "postprocessors": [ {"key": "FFmpegVideoConvertor", "preferedformat": "mp4"} ], "outtmpl": "%(id)s.mp4", "logtostderr": False, "quiet": True, } song = False video = True try: await utils.answer(m, self.strings("downloading", m)) with YoutubeDL(opts) as rip: rip_data = rip.extract_info(url) except DownloadError as DE: return await utils.answer(m, self.strings("err", m).format(str(DE))) except ContentTooShortError: return await utils.answer(m, self.strings("content_too_short", m)) except GeoRestrictedError: return await utils.answer(m, self.strings("geoban", m)) except MaxDownloadsReached: return await utils.answer(m, self.strings("maxdlserr", m)) except PostProcessingError: return await utils.answer(m, self.strings("pperr", m)) except UnavailableVideoError: return await utils.answer(m, self.strings("noformat", m)) except XAttrMetadataError as XAME: return await utils.answer(m, self.strings("xameerr", m).format(XAME)) except ExtractorError: return await utils.answer(m, self.strings("exporterr", m)) except Exception as e: return await utils.answer( m, self.strings("err2", m).format(str(type(e)), str(e)) ) if song: u = rip_data["uploader"] if "uploader" in rip_data else "Northing" await utils.answer( m, open(f"{rip_data['id']}.mp3.mp3", "rb"), supports_streaming=True, reply_to=reply.id if reply else None, attributes=[ DocumentAttributeAudio( duration=int(rip_data["duration"]), title=str(rip_data["title"]), performer=u, ) ], ) os.remove(f"{rip_data['id']}.mp3") elif video: await utils.answer( m, open(f"{rip_data['id']}.mp4", "rb"), reply_to=reply.id if reply else None, supports_streaming=True, caption=rip_data["title"], ) os.remove(f"{rip_data['id']}.mp4")