# Coded by D4n1l3k300 #
# supplemented by Yahikor0 #
# This code under AGPL-3.0 #

# requires: ffmpeg-python pytgcalls[telethon] youtube-dl ShazamAPI

import io
import os
import re
import logging

import ffmpeg
import pytgcalls
from ShazamAPI import Shazam
from youtube_dl import YoutubeDL
from pytgcalls import GroupCallFactory
from pytgcalls.implementation.group_call_file import GroupCallFile
from telethon import types
from typing import *

from .. import loader, utils


@loader.unrestricted
@loader.ratelimit
@loader.tds
class VoiceMod(loader.Module):
    """Module for working with voicechat """

    # User-facing messages. They are looked up below as
    # ``self.strings('<key>')`` (presumably made callable by ``loader.tds``
    # -- confirm against the loader).
    strings = {
        "name": "VoiceMod",
        "downloading": "[VoiceMod] Downloading...",
        "converting": "[VoiceMod] Converting...",
        "playing": "[VoiceMod] Playing...",
        "plsjoin": "[VoiceMod] You are not joined (type .vjoin)",
        "stop": "[VoiceMod] Playing stopped!",
        "join": "[VoiceMod] Joined!",
        "leave": "[VoiceMod] Leaved!",
        "pause": "[VoiceMod] Paused!",
        "resume": "[VoiceMod] Resumed!",
        "mute": "[VoiceMod] Muted!",
        "unmute": "[VoiceMod] Unmuted!",
        "replay": "[VoiceMod] Replaying...",
        "error": "[VoiceMod] Error: {}",
    }

    # Options handed to YoutubeDL in vplaycmd. The output template is a
    # fixed file name, which vplaycmd relies on ('ytdl_out.mp3').
    ytdlopts = {
        'format': 'bestaudio',
        'addmetadata': True,
        'key': 'FFmpegMetadata',
        'writethumbnail': True,
        'prefer_ffmpeg': True,
        'geo_bypass': True,
        'nocheckcertificate': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '320',
        }],
        'outtmpl': 'ytdl_out.mp3',
        'quiet': True,
        'logtostderr': False
    }

    # Active group calls, keyed by ``str(chat_id)`` (every access in this
    # class stringifies the chat id before indexing).
    group_calls: Dict[str, GroupCallFile] = {}

    # Prefix for the messages produced by shazamcmd.
    tag = "[Shazam] "

    async def get_chat(self, m: types.Message):
        """Resolve the chat a command should act on.

        With no arguments the chat of the message itself is used. Otherwise
        the raw argument string (converted to ``int`` when it is numeric) is
        resolved through ``m.client.get_entity``.

        Returns the chat id, or ``None`` after replying with the error text
        when the entity lookup fails.
        """
        args = utils.get_args_raw(m)
        if not args:
            return m.chat.id

        try:
            chat = int(args)
        except ValueError:
            # Not numeric: pass the raw text (e.g. a username) to get_entity.
            chat = args

        try:
            return (await m.client.get_entity(chat)).id
        except Exception as e:
            await utils.answer(m, self.strings('error').format(str(e)))
            return None

    def _call(self, m: types.Message, chat: int):
        """Create the group-call object for ``chat`` if none is stored yet."""
        key = str(chat)
        if key not in self.group_calls:
            self.group_calls[key] = GroupCallFactory(
                m.client,
                pytgcalls.GroupCallFactory.MTPROTO_CLIENT_TYPE.TELETHON
            ).get_file_group_call()

    async def vplaycmd(self, m: types.Message):
        """.vplay [chat (optional)] Play audio in VC"""
        args = utils.get_args_raw(m)
        r = await m.get_reply_message()
        chat = from_file = link = None

        if args:
            # "<chat> <link>" first, then a lone "<chat>"; anything else is
            # treated as a link for the current chat.
            chat_and_link = re.match(
                r'(-?\d+|@[A-Za-z0-9_]{5,})\s+(.*)', args)
            chat_only = re.match(r'(-?\d+|@[A-Za-z0-9_]{5,})', args)
            if chat_and_link:
                chat = chat_and_link.group(1)
                link = chat_and_link.group(2)
            elif chat_only:
                chat = chat_only.group(1)
            else:
                chat = m.chat.id
                link = args or None

            try:
                chat = int(chat)
            except (TypeError, ValueError):
                # Non-numeric (an @username): leave it as text for get_entity.
                pass

            try:
                chat = (await m.client.get_entity(chat)).id
            except Exception as e:
                return await utils.answer(
                    m, self.strings('error').format(str(e)))
        else:
            chat = m.chat.id

        has_reply_audio = bool(r and r.audio)
        if has_reply_audio and not link:
            from_file = True
        if not link and not has_reply_audio:
            # The original returned the un-awaited coroutine here, so the
            # reply was never actually sent.
            return await utils.answer(m, 'no audio/link')

        if str(chat) not in self.group_calls:
            return await utils.answer(m, self.strings('plsjoin'))
        self._call(m, chat)

        input_file = f'{chat}.raw'
        m = await utils.answer(m, self.strings('downloading'))
        if from_file:
            audio_original = await r.download_media()
        else:
            try:
                with YoutubeDL(self.ytdlopts) as rip:
                    rip.extract_info(link)
            except Exception as e:
                return await utils.answer(
                    m, self.strings('error').format(str(e)))
            # Fixed name, matching 'outtmpl' in ytdlopts.
            audio_original = 'ytdl_out.mp3'

        m = await utils.answer(m, self.strings('converting'))
        try:
            # Convert to raw 16-bit little-endian PCM, 2 channels, 48 kHz.
            ffmpeg.input(audio_original).output(
                input_file,
                format='s16le',
                acodec='pcm_s16le',
                ac=2,
                ar='48k'
            ).overwrite_output().run()
        finally:
            # Remove the downloaded source even when the conversion fails,
            # so a failed run does not leave the file behind.
            if os.path.exists(audio_original):
                os.remove(audio_original)

        await utils.answer(m, self.strings('playing'))
        self.group_calls[str(chat)].input_filename = input_file

    async def vjoincmd(self, m: types.Message):
        """.vjoin Join to the VC"""
        chat = await self.get_chat(m)
        if not chat:
            return
        self._call(m, chat)
        await self.group_calls[str(chat)].start(chat)
        await utils.answer(m, self.strings('join'))

    async def vleavecmd(self, m: types.Message):
        """.vleave Leave from the VC"""
        chat = await self.get_chat(m)
        if not chat:
            return
        self._call(m, chat)
        await self.group_calls[str(chat)].stop()
        del self.group_calls[str(chat)]
        try:
            os.remove(f'{chat}.raw')
        except OSError:
            # Best effort: there may be no converted file for this chat.
            pass
        await utils.answer(m, self.strings('leave'))

    async def vreplaycmd(self, m: types.Message):
        """.vreplay Replay audio in VC"""
        chat = await self.get_chat(m)
        if not chat:
            return
        self._call(m, chat)
        self.group_calls[str(chat)].restart_playout()
        await utils.answer(m, self.strings('replay'))

    async def vstopcmd(self, m: types.Message):
        """.vstop Stop play in VC"""
        chat = await self.get_chat(m)
        if not chat:
            return
        self._call(m, chat)
        self.group_calls[str(chat)].stop_playout()
        await utils.answer(m, self.strings('stop'))

    async def vmutecmd(self, m: types.Message):
        """.vmute Mute player in VC"""
        chat = await self.get_chat(m)
        if not chat:
            return
        self._call(m, chat)
        self.group_calls[str(chat)].set_is_mute(True)
        # Report "Muted!" (the original sent the 'unmute' text here).
        await utils.answer(m, self.strings('mute'))

    async def vunmutecmd(self, m: types.Message):
        """.vunmute Unmute player in VC"""
        chat = await self.get_chat(m)
        if not chat:
            return
        self._call(m, chat)
        self.group_calls[str(chat)].set_is_mute(False)
        # Report "Unmuted!" (the original sent the 'mute' text here).
        await utils.answer(m, self.strings('unmute'))

    async def vpausecmd(self, m: types.Message):
        """.vpause Pause player in VC"""
        chat = await self.get_chat(m)
        if not chat:
            return
        self._call(m, chat)
        self.group_calls[str(chat)].pause_playout()
        await utils.answer(m, self.strings('pause'))

    async def vresumecmd(self, m: types.Message):
        """.vresume Resume player in VC"""
        chat = await self.get_chat(m)
        if not chat:
            return
        self._call(m, chat)
        self.group_calls[str(chat)].resume_playout()
        await utils.answer(m, self.strings('resume'))

    async def vdebugcmd(self, m: types.Message):
        """.vdebug debug"""
        await utils.answer(m, f'DEBUG : {str(self.group_calls)}')

    async def smcmd(self, message):
        """.sm to find music."""
        args = utils.get_args_raw(message)
        reply = await message.get_reply_message()
        if not args:
            return await message.edit("No args.")
        try:
            await message.edit("Loading...")
            # Search through the 'lybot' inline bot and forward its first hit.
            music = await message.client.inline_query('lybot', args)
            await message.delete()
            await message.client.send_file(
                message.to_id,
                music[0].result.document,
                reply_to=reply.id if reply else None)
        except Exception:
            # Any failure (including an empty result list) is reported as
            # "not found".
            return await message.client.send_message(
                message.chat_id, f" Music named {args} not found. ")

    async def shazamcmd(self, message):
        """.shazam - recognize track"""
        s = await get_audio_shazam(message)
        if not s:
            return
        try:
            shazam = Shazam(s.track.read())
            recog = shazam.recognizeSong()
            # Only the first item yielded by recognizeSong() is used.
            track = next(recog)[1]['track']
            await message.client.send_file(
                message.to_id,
                file=track['images']['background'],
                caption=self.tag + "recognized track: "
                + track['share']['subject'],
                reply_to=s.reply.id)
            await message.delete()
        except Exception:
            await message.edit(self.tag + "Could not recognize...")


async def get_audio_shazam(message):
    """Download the audio file that ``message`` replies to.

    Returns an object with ``track`` (an ``io.BytesIO`` holding the
    downloaded bytes) and ``reply`` (the replied-to message), or ``None``
    after answering "reply to audio..." when the reply is missing or its
    MIME type is not ``audio/*``.
    """
    class rct():
        track = io.BytesIO()
        reply = None

    reply = await message.get_reply_message()
    is_audio = (
        reply
        and reply.file
        and reply.file.mime_type.split("/")[0] == "audio"
    )
    if not is_audio:
        await utils.answer(message, "reply to audio...")
        return None

    ae = rct()
    await utils.answer(message, "Downloading...")
    ae.track = io.BytesIO(await reply.download_media(bytes))
    ae.reply = reply
    await message.edit("Recognizing...")
    return ae