"""
    █▀▀ ▄▀█ █▄▀ █▀▀ █▀ ▀█▀ █░█░█ █ ▀▄▀
    █▄▄ █▀█ █░█ ██▄ ▄█ ░█░ ▀▄▀▄▀ █ █░█

    Copyleft 2022 t.me/CakesTwix
    This program is free software; you can redistribute it and/or modify

"""

__version__ = (1, 0, 1)

# requires: transmission-rpc
# scope: inline
# scope: geektg_only
# scope: geektg_min 3.1.15
# meta pic: https://img.icons8.com/ios-filled/512/40C057/torrent.png
# meta developer: @cakestwix_mods

import logging
from .. import loader, utils
import asyncio
from telethon.tl.functions.channels import CreateChannelRequest
from ..inline import GeekInlineQuery, rand
from aiogram.types import (
    InlineQueryResultArticle,
    InputTextMessageContent,
    InlineKeyboardMarkup,
    InlineKeyboardButton,
    CallbackQuery,
)
from aiogram.utils.exceptions import MessageNotModified
from transmission_rpc import Client
from transmission_rpc.utils import format_size
from transmission_rpc.error import TransmissionConnectError

logger = logging.getLogger(__name__)


@loader.unrestricted
@loader.ratelimit
@loader.tds
class TransmissionMod(loader.Module):
    """Simple torrent client for Transmission"""

    strings = {
        "name": "Transmission",
        "cfg_username": "Username",
        "cfg_password": "Password",
        "cfg_port": "Port (9091)",
        "cfg_host": "Host (localhost)",
        "cfg_protocol": "Protocol (http)",
        "cfg_rpc": "RPC url (/transmission/)",
        "not_ready": "Pls check config",
        "torrent_name": "Name: ",
        "torrent_status": "Status: ",
        "torrent_hash": "Hash: ",
        "torrent_dir": "Directory: ",
        "torrent_size": "Size: ",
        "kb_update": "🔄 Update",
        "kb_close": "🚫 Close",
        "torrent_eta": "ETA: ",
        "torrent_error": "Torrent not found in result",
        "kb_start": "▶️",
        "kb_stop": "⏹",
        "kb_delete": "❌ Delete torrent ❌",
        "kb_delete_data": "❌ Delete torrent with data ❌",
        "answer_start": "Torrent starting",
        "answer_stop": "Torrent stopped",
        "answer_delete": "Torrent removed",
        "inline_title": "Torrent Manager",
        "inline_desc": "ℹ Click to view the parameters",
        "inline_answer": "ℹ No changes",
    }

    strings_ru = {
        "cfg_username": "Юзернейм",
        "cfg_password": "Пароль",
        "cfg_port": "Порт (9091)",
        "cfg_host": "Хост (localhost)",
        "cfg_protocol": "Протокол (http)",
        "cfg_rpc": "URL-адрес RPC (/transmission/)",
        "not_ready": "Пожалуйста, проверьте конфиг",
        "torrent_name": "Имя: ",
        "torrent_status": "Статус: ",
        "torrent_hash": "Хэш: ",
        "torrent_dir": "Директория: ",
        "torrent_size": "Размер: ",
        "kb_update": "🔄 Обновить",
        "kb_close": "🚫 Закрыть",
        "torrent_eta": "ETA: ",
        "torrent_error": "Torrent not found in result",
        "kb_delete": "❌ Удалить торрент ❌",
        "kb_delete_data": "❌ Удалить торрент с данными ❌",
        "answer_start": "Запуск торрента",
        "answer_stop": "Торрент остановлен",
        "answer_delete": "Торрент удален",
        "inline_title": "Торрент-менеджер",
        "inline_desc": "ℹ Нажмите, чтобы просмотреть параметры",
        "inline_answer": "ℹ Без изменений",
    }

    def stringTorrent(self, torrent):
        # Build the multi-line summary shown for a single torrent
        torrent_text = f"{self.strings['torrent_name']}{torrent.name} \n"
        torrent_text += f"{self.strings['torrent_status']}{torrent.status} \n"
        torrent_text += f"{self.strings['torrent_eta']}{torrent.format_eta()} \n"
        torrent_text += f"{self.strings['torrent_hash']}{torrent.hashString} \n"
        torrent_text += f"{self.strings['torrent_size']}{format_size(torrent.total_size)[0]} {format_size(torrent.total_size)[1]} \n"
        torrent_text += f"{self.strings['torrent_dir']}{torrent.download_dir} \n"
        return torrent_text

    def __init__(self):
        self.config = loader.ModuleConfig(
            "username",
            None,
            lambda m: self.strings("cfg_username", m),
            "password",
            None,
            lambda m: self.strings("cfg_password", m),
            "port",
            9091,
            lambda m: self.strings("cfg_port", m),
            "host",
            "127.0.0.1",
            lambda m: self.strings("cfg_host", m),
            "protocol",
            "http",
            lambda m: self.strings("cfg_protocol", m),
            "rpc",
            "/transmission/",
            lambda m: self.strings("cfg_rpc", m),
        )
        self.name = self.strings["name"]

        # Check Transmission Server
        self.is_ready = False
        try:
            self.TransmissionClientUserBot = Client(
                host=self.config["host"],
                port=self.config["port"],
                username=self.config["username"],
                password=self.config["password"],
                path=self.config["rpc"],
                protocol=self.config["protocol"],
            )
            self.is_ready = True
        except TransmissionConnectError:
            pass

    async def client_ready(self, client, db):
        self._client = client
        self._me = await client.get_me(True)

    @loader.unrestricted
    @loader.ratelimit
    async def tinfocmd(self, message):
        """Useful information about transmission server"""
        if self.is_ready:
            session_stats = self.TransmissionClientUserBot.session_stats()
            timeout = self.TransmissionClientUserBot.timeout
            rpc_version = self.TransmissionClientUserBot.rpc_version
            is_port_open = self.TransmissionClientUserBot.port_test()
            port = session_stats.peer_port
            download_dir = session_stats.download_dir
            free_space = self.TransmissionClientUserBot.free_space(download_dir)

            string = "Info about your Transmission Server\n\n"
            string += f"RPC version : {rpc_version}\n"
            string += f"Current timeout for HTTP queries : {timeout}\n"
            string += f"Port is open : {is_port_open}\n"
            string += f"Port : {port}\n"
            string += f"Download path : {download_dir}\n"
            string += f"Free space : {format_size(free_space)[0]} {format_size(free_space)[1]}\n"

            await utils.answer(message, string)
        else:
            await utils.answer(message, self.strings["not_ready"])
            await asyncio.sleep(5)
            await message.delete()

    @loader.unrestricted
    @loader.ratelimit
    async def tdownloadcmd(self, message):
        """Download Torrent file"""
        reply, args = await message.get_reply_message(), utils.get_args_raw(message)
        # reply.document is None when the replied message carries no document,
        # so a reply to plain text or a photo no longer raises AttributeError
        if (
            reply
            and reply.document
            and reply.document.mime_type == "application/x-bittorrent"
        ):
            path = await self._client.download_media(reply.media, "scam.torrent")
            torrent = self.TransmissionClientUserBot.add_torrent(
                "file://scam.torrent", download_dir=args or None
            )
            kb = [
                [
                    {
                        "text": self.strings["kb_update"],
                        "callback": self.inline_update_torrent,
                        "args": [torrent.id],
                    }
                ],
                [
                    {
                        "text": self.strings["kb_start"],
                        "callback": self.inline__start,
                        "args": [torrent.id],
                    },
                    {
                        "text": self.strings["kb_stop"],
                        "callback": self.inline__stop,
                        "args": [torrent.id],
                    },
                ],
                [
                    {
                        "text": self.strings["kb_delete_data"],
                        "callback": self.inline__delete,
                        "args": [torrent.id, True],
                    },
                    {
                        "text": self.strings["kb_delete"],
                        "callback": self.inline__delete,
                        "args": [torrent.id, False],
                    },
                ],
                [{"text": self.strings["kb_close"], "callback": self.inline__close}],
            ]

            await self.inline.form(
                self.stringTorrent(
                    self.TransmissionClientUserBot.get_torrent(torrent.id)
                ),
                message=message,
                reply_markup=kb,
                always_allow=self._client.dispatcher.security._owner,
            )

    async def transmission_inline_handler(self, query: GeekInlineQuery) -> None:
        """
        General info (Inline)
        """
        args = query.args
        param = {"list": "List of 10 torrents", "search": "Search torrents by name"}
        param_text = "Available parameters: \n"
        for item in param:
            param_text += f"⦁ {item} - {param[item]}\n"

        if not args:
            await query.answer(
                [
                    InlineQueryResultArticle(
                        id=1,
                        title=self.strings["inline_title"],
                        description=self.strings["inline_desc"],
                        input_message_content=InputTextMessageContent(
                            param_text, "HTML", disable_web_page_preview=True
                        ),
                        thumb_url="https://img.icons8.com/ios-filled/128/26e07f/torrent.png",
                        thumb_width=128,
                        thumb_height=128,
                    )
                ],
                cache_time=0,
            )
            return

        if "list" in args:
            kb_torrent_list = []
            for torrent in self.TransmissionClientUserBot.get_torrents():
                torrent_markup = InlineKeyboardMarkup(row_width=3)
                torrent_markup.insert(
                    InlineKeyboardButton(
                        self.strings["kb_update"],
                        callback_data="cake_update" + str(torrent.id),
                    ),
                )
                torrent_markup.add(
                    InlineKeyboardButton(
                        self.strings["kb_start"],
                        callback_data="cake_start" + str(torrent.id),
                    ),
                    InlineKeyboardButton(
                        self.strings["kb_stop"],
                        callback_data="cake_stop" + str(torrent.id),
                    ),
                )
                torrent_markup.add(
                    InlineKeyboardButton(
                        self.strings["kb_delete_data"],
                        callback_data="cake_delete" + str(torrent.id),
                    ),
                    InlineKeyboardButton(
                        self.strings["kb_delete"],
                        callback_data="cake_remove" + str(torrent.id),
                    ),
                )
                kb_torrent_list.append(
                    InlineQueryResultArticle(
                        id=rand(10),
                        title=torrent.name,
                        description=self.strings["inline_desc"],
                        input_message_content=InputTextMessageContent(
                            self.stringTorrent(
                                self.TransmissionClientUserBot.get_torrent(torrent.id)
                            ),
                            "HTML",
                            disable_web_page_preview=True,
                        ),
                        reply_markup=torrent_markup,
                    )
                )
                # Show at most 10 torrents
                if len(kb_torrent_list) == 10:
                    break

            await query.answer(kb_torrent_list[:10], cache_time=0)
            return

        if "search" in args:
            search_arg = " ".join(args.split()[1:])  # transmission search BlaBlaBla
            kb_torrent_list = []
            for torrent in self.TransmissionClientUserBot.get_torrents():
                if search_arg in torrent.name:
                    torrent_markup = InlineKeyboardMarkup(row_width=3)
                    torrent_markup.insert(
                        InlineKeyboardButton(
                            self.strings["kb_update"],
                            callback_data="cake_update" + str(torrent.id),
                        ),
                    )
                    torrent_markup.add(
                        InlineKeyboardButton(
                            self.strings["kb_start"],
                            callback_data="cake_start" + str(torrent.id),
                        ),
                        InlineKeyboardButton(
                            self.strings["kb_stop"],
                            callback_data="cake_stop" + str(torrent.id),
                        ),
                    )
                    torrent_markup.add(
                        InlineKeyboardButton(
                            self.strings["kb_delete_data"],
                            callback_data="cake_delete" + str(torrent.id),
                        ),
                        InlineKeyboardButton(
                            self.strings["kb_delete"],
                            callback_data="cake_remove" + str(torrent.id),
                        ),
                    )
                    kb_torrent_list.append(
                        InlineQueryResultArticle(
                            id=rand(20),
                            title=torrent.name,
                            description=self.strings["inline_desc"],
                            input_message_content=InputTextMessageContent(
                                self.stringTorrent(
                                    self.TransmissionClientUserBot.get_torrent(
                                        torrent.id
                                    )
                                ),
                                "HTML",
                                disable_web_page_preview=True,
                            ),
                            reply_markup=torrent_markup,
                        )
                    )

            return await query.answer(kb_torrent_list[:10], cache_time=0)

    # Inline button handler

    async def inline__close(self, call) -> None:
        await call.delete()

    async def inline_update_torrent(self, call, torrent_id) -> None:
        kb = [
            [
                {
                    "text": self.strings["kb_update"],
                    "callback": self.inline_update_torrent,
                    "args": [torrent_id],
                }
            ],
            [
                {
                    "text": self.strings["kb_start"],
                    "callback": self.inline__start,
                    "args": [torrent_id],
                },
                {
                    "text": self.strings["kb_stop"],
                    "callback": self.inline__stop,
                    "args": [torrent_id],
                },
            ],
            [
                {
                    "text": self.strings["kb_delete_data"],
                    "callback": self.inline__delete,
                    "args": [torrent_id, True],
                },
                {
                    "text": self.strings["kb_delete"],
                    "callback": self.inline__delete,
                    "args": [torrent_id, False],
                },
            ],
            [{"text": self.strings["kb_close"], "callback": self.inline__close}],
        ]
        try:
            await call.edit(
                self.stringTorrent(
                    self.TransmissionClientUserBot.get_torrent(torrent_id)
                ),
                reply_markup=kb,
            )
        except KeyError:
            await call.edit(self.strings["torrent_error"])

    async def inline__start(self, call, torrent_id) -> None:
        self.TransmissionClientUserBot.get_torrent(torrent_id).start()
        await call.answer(self.strings["answer_start"])

    async def inline__stop(self, call, torrent_id) -> None:
        self.TransmissionClientUserBot.get_torrent(torrent_id).stop()
        await call.answer(self.strings["answer_stop"])

    async def inline__delete(self, call, torrent_id, delete_data) -> None:
        self.TransmissionClientUserBot.remove_torrent(torrent_id, delete_data)
        await call.answer(self.strings["answer_delete"])

    # Callback buttons (for Inline search)

    async def button_callback_handler(self, call: CallbackQuery) -> None:
        """
        Process button presses
        """
        # await call.answer(call.data, show_alert=True) # Debug

        # Update
        if call.data[:11] == "cake_update":
            # The torrent id is everything after the 11-character prefix;
            # every rebuilt button must carry that same id
            torrent_id = call.data[11:]
            torrent_markup = InlineKeyboardMarkup(row_width=3)
            torrent_markup.insert(
                InlineKeyboardButton(
                    self.strings["kb_update"],
                    callback_data="cake_update" + torrent_id,
                ),
            )
            torrent_markup.add(
                InlineKeyboardButton(
                    self.strings["kb_start"],
                    callback_data="cake_start" + torrent_id,
                ),
                InlineKeyboardButton(
                    self.strings["kb_stop"],
                    callback_data="cake_stop" + torrent_id,
                ),
            )
            torrent_markup.add(
                InlineKeyboardButton(
                    self.strings["kb_delete_data"],
                    callback_data="cake_delete" + torrent_id,
                ),
                InlineKeyboardButton(
                    self.strings["kb_delete"],
                    callback_data="cake_remove" + torrent_id,
                ),
            )
            try:
                torrent = self.TransmissionClientUserBot.get_torrent(int(torrent_id))
                await self.inline.bot.edit_message_text(
                    self.stringTorrent(torrent),
                    reply_markup=torrent_markup,
                    inline_message_id=call.inline_message_id,
                    parse_mode="HTML",
                )
            except KeyError:
                await self.inline.bot.edit_message_text(
                    self.strings["torrent_error"],
                    inline_message_id=call.inline_message_id,
                    parse_mode="HTML",
                )
            except MessageNotModified:
                await call.answer(self.strings["inline_answer"])
            return

        # Start ("cake_start" is 10 characters long, so the id starts at index 10)
        if call.data[:10] == "cake_start":
            self.TransmissionClientUserBot.get_torrent(int(call.data[10:])).start()
            return await call.answer(self.strings["answer_start"])

        # Stop ("cake_stop" is 9 characters long, so the id starts at index 9)
        if call.data[:9] == "cake_stop":
            self.TransmissionClientUserBot.get_torrent(int(call.data[9:])).stop()
            return await call.answer(self.strings["answer_stop"])

        # Delete torrent with data
        if call.data[:11] == "cake_delete":
            self.TransmissionClientUserBot.remove_torrent(int(call.data[11:]), True)
            return await call.answer(self.strings["answer_delete"])

        # Just delete torrent
        if call.data[:11] == "cake_remove":
            self.TransmissionClientUserBot.remove_torrent(int(call.data[11:]), False)
            return await call.answer(self.strings["answer_delete"])
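

# A minimal, hypothetical sketch (not part of the original module and not
# called by it): button_callback_handler above recovers the torrent id by
# slicing call.data at hard-coded offsets, which is easy to get wrong because
# the prefixes differ in length. CALLBACK_PREFIXES and split_callback_data are
# illustrative names, assuming callback data always has the form
# "<prefix><decimal torrent id>".
CALLBACK_PREFIXES = (
    "cake_update",
    "cake_start",
    "cake_stop",
    "cake_delete",
    "cake_remove",
)


def split_callback_data(data, prefixes=CALLBACK_PREFIXES):
    """Return (prefix, torrent_id) for recognised callback data, else None."""
    for prefix in prefixes:
        # Slice by the prefix's own length instead of a fixed number
        suffix = data[len(prefix):]
        if data.startswith(prefix) and suffix.isdecimal():
            return prefix, int(suffix)
    return None


# Possible usage inside a handler, under the same assumptions:
#     parsed = split_callback_data(call.data)
#     if parsed is not None:
#         action, torrent_id = parsed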