# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ # █▀█ █ █ █ █▀█ █▀▄ █ # © Copyright 2022 # https://t.me/hikariatama # # 🔒 Licensed under the GNU AGPLv3 # 🌐 https://www.gnu.org/licenses/agpl-3.0.html # scope: hikka_min 1.2.10 # meta pic: https://img.icons8.com/plasticine/344/apple-settings--v2.png # meta banner: https://mods.hikariatama.ru/badges/systemd.jpg # scope: inline # scope: hikka_only # meta developer: @hikarimods # ⚠️ Please, ensure that userbot has enough rights to control units # Put these lines in /etc/sudoers using visudo command: # # user ALL=(ALL) NOPASSWD: /bin/systemctl # user ALL=(ALL) NOPASSWD: /bin/journalctl # # Where `user` is user on behalf of which the userbot is running import asyncio import io import subprocess from typing import Union from telethon.tl.types import Message from .. import loader, utils from ..inline.types import InlineCall def human_readable_size(size: float, decimal_places: int = 2) -> str: for unit in ["B", "K", "M", "G", "T", "P"]: if size < 1024.0 or unit == "P": break size /= 1024.0 return f"{size:.{decimal_places}f} {unit}" @loader.tds class SystemdMod(loader.Module): """Control systemd units easily""" strings = { "name": "Systemd", "panel": ( "🎛 Here you can control" " your systemd units\n\n{}" ), "unit_doesnt_exist": ( "🚫 Unit" " {} doesn't exist!" ), "args": ( "🚫 No arguments" " specified" ), "unit_added": ( " Unit" " {} with name {} added" ), "unit_removed": ( " Unit" " {} removed" ), "unit_action_done": ( " Action" " {} performed on unit {}" ), "unit_control": ( "🎛 Interacting with unit" " {} ({})\n{} Unit status:" " {}" ), "action_not_found": ( "🚫 Action" " {} not found" ), "unit_renamed": ( " Unit" " {} renamed to {}" ), "stop_btn": "🍎 Stop", "start_btn": "🍏 Start", "restart_btn": "🔄 Restart", "logs_btn": "📄 Logs", "tail_btn": "🚅 Tail", "back_btn": "🔙 Back", "close_btn": "✖️ Close", "refresh_btn": "🔄 Refresh", } strings_ru = { "panel": ( "🎛 Здесь вы можете" " управлять своими юнитами systemd\n\n{}" ), "unit_doesnt_exist": ( "🚫 Юнит" " {} не существует!" ), "args": ( "🚫 Не указаны" " аргументы" ), "unit_added": ( " Юнит" " {} с именем {} добавлен" ), "unit_removed": ( " Юнит" " {} удалён" ), "unit_action_done": ( " Действие" " {} выполнено на юните {}" ), "unit_control": ( "🎛 Взаимодействие с" " юнитом {} ({})\n{} Статус" " юнита: {}" ), "action_not_found": ( "🚫 Действие" " {} не найдено" ), "unit_renamed": ( " Юнит" " {} переименован в {}" ), "stop_btn": "🍎 Стоп", "start_btn": "🍏 Старт", "restart_btn": "🔄 Рестарт", "logs_btn": "📄 Логи", "tail_btn": "🚅 Тейл", "back_btn": "🔙 Назад", "close_btn": "✖️ Закрыть", "refresh_btn": "🔄 Обновить", "_cmd_doc_units": "Показать список юнитов", "_cmd_doc_addunit": " - Добавить юнит", "_cmd_doc_nameunit": " - Переименовать юнит", "_cmd_doc_delunit": " - Удалить юнит", "_cmd_doc_unit": " - Управлять юнитом", "_cls_doc": "Простое и удобное управление юнитами systemd", } strings_de = { "panel": ( "🎛 Hier kannst du deine" " systemd-Einheiten kontrollieren\n\n{}" ), "unit_doesnt_exist": ( "🚫 Einheit" " {} existiert nicht!" ), "args": ( "🚫 Keine Argumente" " angegeben" ), "unit_added": ( " Einheit" " {} mit dem Namen {}" " hinzugefügt" ), "unit_removed": ( " Einheit" " {} entfernt" ), "unit_action_done": ( " Aktion" " {} auf Einheit {} ausgeführt" ), "unit_control": ( "🎛 Interagiere mit" " Einheit {} ({})\n{}" " Einheitsstatus: {}" ), "action_not_found": ( "🚫 Aktion" " {} nicht gefunden" ), "unit_renamed": ( " Einheit" " {} umbenannt zu {}" ), "stop_btn": "🍎 Stop", "start_btn": "🍏 Start", "restart_btn": "🔄 Neustart", "logs_btn": "📄 Logs", "tail_btn": "🚅 Tail", "back_btn": "🔙 Zurück", "close_btn": "✖️ Schließen", "refresh_btn": "🔄 Aktualisieren", "_cmd_doc_units": "Liste der Einheiten anzeigen", "_cmd_doc_addunit": " - Einheit hinzufügen", "_cmd_doc_nameunit": " - Einheit umbenennen", "_cmd_doc_delunit": " - Einheit entfernen", "_cmd_doc_unit": " - Einheit verwalten", "_cls_doc": "Einfache und bequeme Verwaltung von systemd-Einheiten", } strings_hi = { "panel": ( "🎛 यहाँ आप अपने systemd" " इकाइयों का नियंत्रण कर सकते हैं\n\n{}" ), "unit_doesnt_exist": ( "🚫 इकाई" " {} अस्तित्व में नहीं है!" ), "args": ( "🚫 कोई तर्क निर्दिष्ट" " नहीं किया गया" ), "unit_added": ( " इकाई" " {} नाम {} के साथ जोड़ा गया" ), "unit_removed": ( " इकाई" " {} हटा दिया गया" ), "unit_action_done": ( " कार्य" " {} इकाई {} पर किया गया" ), "unit_control": ( "🎛 इकाई" " {} के साथ इंटरैक्ट करें" " ({})\n{} इकाई स्थिति: {}" ), "action_not_found": ( "🚫 कार्य" " {} नहीं मिला" ), "unit_renamed": ( " इकाई" " {} का नाम बदल दिया गया {}" ), "stop_btn": "🍎 रोकें", "start_btn": "🍏 शुरू करें", "restart_btn": "🔄 पुनः शुरू करें", "logs_btn": "📄 लॉग", "tail_btn": "🚅 Tail", "back_btn": "🔙 पीछे जाएँ", "close_btn": "✖️ बंद करें", "refresh_btn": "🔄 ताज़ा करें", "_cmd_doc_units": "इकाइयों की सूची दिखाएँ", "_cmd_doc_addunit": " - इकाई जोड़ें", "_cmd_doc_nameunit": " - इकाई का नाम बदलें", "_cmd_doc_delunit": " - इकाई हटाएँ", "_cmd_doc_unit": " - इकाई प्रबंधित करें", "_cls_doc": "systemd इकाइयों का सरल और सुविधाजनक प्रबंधन", } strings_uz = { "panel": ( "🎛 Bu yerda siz sizning" " systemd birliklaringizni boshqarishingiz mumkin\n\n{}" ), "unit_doesnt_exist": ( "🚫 Birlik" " {} mavjud emas!" ), "args": ( "🚫 Hech qanday" " argumentlar ko'rsatilmadi" ), "unit_added": ( " Birlik" " {} nomi {} qo'shildi" ), "unit_removed": ( " Birlik" " {} o'chirildi" ), "unit_action_done": ( " Amal" " {} birlik {} uchun bajirildi" ), "unit_control": ( "🎛 Birlik" " {} bilan ishlash ({})\n{}" " Birlik holati: {}" ), "action_not_found": ( "🚫 Amal" " {} topilmadi" ), "unit_renamed": ( " Birlik" " {} nomi {} o'zgartirildi" ), "stop_btn": "🍎 To'xtatish", "start_btn": "🍏 Boshlash", "restart_btn": "🔄 Qayta ishga tushirish", "logs_btn": "📄 Jurnal", "tail_btn": "🚅 Tail", "back_btn": "🔙 Orqaga", "close_btn": "✖️ Yopish", "refresh_btn": "🔄 Yangilash", "_cmd_doc_units": "Birliklar ro'yxatini ko'rsatish", "_cmd_doc_addunit": " - Birlik qo'shish", "_cmd_doc_nameunit": " - Birlik nomini o'zgartirish", "_cmd_doc_delunit": " - Birlikni o'chirish", "_cmd_doc_unit": " - Birlikni boshqarish", } def _get_unit_status_text(self, unit: str) -> str: return ( subprocess.run( [ "sudo", "-S", "systemctl", "is-active", unit, ], check=False, stdout=subprocess.PIPE, ) .stdout.decode() .strip() ) def _is_running(self, unit: str) -> bool: return self._get_unit_status_text(unit) == "active" def _unit_exists(self, unit: str) -> bool: return ( subprocess.run( [ "sudo", "-S", "systemctl", "cat", unit, ], check=False, stdout=subprocess.PIPE, ).returncode == 0 ) async def _manage_unit(self, call: Union[InlineCall, int], unit: dict, action: str): if action == "start": subprocess.run( ["sudo", "-S", "systemctl", "start", unit["formal"]], check=True ) elif action == "stop": subprocess.run( ["sudo", "-S", "systemctl", "stop", unit["formal"]], check=True ) elif action == "restart": subprocess.run( ["sudo", "-S", "systemctl", "restart", unit["formal"]], check=True ) elif action in {"logs", "tail"}: logs = ( subprocess.run( [ "sudo", "-S", "journalctl", "-u", unit["formal"], "-n", "1000", ], check=True, stdout=subprocess.PIPE, ) .stdout.decode() .strip() ) hostname = ( subprocess.run(["hostname"], check=True, stdout=subprocess.PIPE) .stdout.decode() .strip() ) logs = logs.replace(f"{hostname} ", "") logs = logs.replace("[" + str(self._get_unit_pid(unit["formal"])) + "]", "") if action == "logs": logs = io.BytesIO(logs.encode()) logs.name = f"{unit['formal']}-logs.txt" await self._client.send_file( call.form["chat"] if not isinstance(call, int) else call, logs ) else: actual_logs = "" logs = list(reversed(logs.splitlines())) while logs: chunk = f"{logs.pop()}\n" if len(actual_logs + chunk) >= 4096: break actual_logs += chunk if isinstance(call, int): await self.inline.form( f"{utils.escape_html(actual_logs)}", call, reply_markup=self._get_unit_markup(unit), ) return await call.edit( f"{utils.escape_html(actual_logs)}", reply_markup=self._get_unit_markup(unit), ) await call.answer("Action complete") return if isinstance(call, int): return await call.answer("Action complete") await asyncio.sleep(2) await self._control_service(call, unit) def _get_unit_markup(self, unit: dict) -> list: return [ [ { "text": self.strings("start_btn"), "callback": self._manage_unit, "args": (unit, "start"), }, { "text": self.strings("stop_btn"), "callback": self._manage_unit, "args": (unit, "stop"), }, { "text": self.strings("restart_btn"), "callback": self._manage_unit, "args": (unit, "restart"), }, ], [ { "text": self.strings("logs_btn"), "callback": self._manage_unit, "args": (unit, "logs"), }, { "text": self.strings("tail_btn"), "callback": self._manage_unit, "args": (unit, "tail"), }, ], [ { "text": self.strings("refresh_btn"), "callback": self._control_service, "args": (unit,), }, { "text": self.strings("back_btn"), "callback": self._control_services, }, ], ] async def _control_service(self, call: InlineCall, unit: dict): await call.edit( self.strings("unit_control").format( unit["name"], unit["formal"], self._get_unit_status_emoji(unit["formal"]), self._get_unit_status_text(unit["formal"]), ), reply_markup=self._get_unit_markup(unit), ) def _get_unit_pid(self, unit: str) -> str: return ( subprocess.run( [ "sudo", "-S", "systemctl", "show", unit, "--property=MainPID", "--value", ], check=False, stdout=subprocess.PIPE, ) .stdout.decode() .strip() ) def _get_unit_resources_consumption(self, unit: str) -> str: if not self._is_running(unit): return "" pid = self._get_unit_pid(unit) ram = human_readable_size( int( subprocess.run( [ "ps", "-p", pid, "-o", "rss", ], check=False, stdout=subprocess.PIPE, ) .stdout.decode() .strip() .split("\n")[1] ) * 1024 ) cpu = ( subprocess.run( [ "ps", "-p", pid, "-o", r"%cpu", ], check=False, stdout=subprocess.PIPE, ) .stdout.decode() .strip() .split("\n")[1] + "%" ) return f"📟 {ram} | 🗃 {cpu}" def _get_panel(self): return self.strings("panel").format( "\n".join( [ f"{self._get_unit_status_emoji(unit['formal'])} {unit['name']}" f" ({unit['formal']}):" f" {self._get_unit_status_text(unit['formal'])} {self._get_unit_resources_consumption(unit['formal'])}" for unit in self.get("services", []) ] ) ) async def _control_services(self, call: InlineCall, refresh: bool = False): await call.edit( self._get_panel(), reply_markup=self._get_services_markup(), ) if refresh: await call.answer("Information updated!") def _get_unit_status_emoji(self, unit: str) -> str: status = self._get_unit_status_text(unit) if status == "active": return "🍏" elif status == "inactive": return "🍎" elif status == "failed": return "🚫" elif status == "activating": return "🔄" else: return "❓" def _get_services_markup(self) -> list: return utils.chunks( [ { "text": ( self._get_unit_status_emoji(service["formal"]) + " " + service["name"] ), "callback": self._control_service, "args": (service,), } for service in self.get("services", []) ], 2, ) + [ [ { "text": self.strings("refresh_btn"), "callback": self._control_services, "args": (True,), }, {"text": self.strings("close_btn"), "action": "close"}, ] ] async def unitscmd(self, message: Message): """Open control panel""" form = await self.inline.form( self._get_panel(), message, reply_markup=self._get_services_markup(), ) async def addunitcmd(self, message: Message): """ - Add new unit""" args = utils.get_args_raw(message) if not args: await utils.answer(message, self.strings("args")) return try: unit, name = args.split(maxsplit=1) except ValueError: unit = args name = args if not self._unit_exists(unit): await utils.answer(message, self.strings("unit_doesnt_exist").format(unit)) return self.set( "services", self.get("services", []) + [{"name": name, "formal": unit}], ) await utils.answer(message, self.strings("unit_added").format(unit, name)) async def delunitcmd(self, message: Message): """ - Delete unit""" args = utils.get_args_raw(message) if not args: await utils.answer(message, self.strings("args")) return if not any(unit["formal"] == args for unit in self.get("services", [])): await utils.answer(message, self.strings("unit_doesnt_exist").format(args)) return self.set( "services", [ service for service in self.get("services", []) if service["formal"] != args ], ) await utils.answer(message, self.strings("unit_removed").format(args)) async def unitcmd(self, message: Message): """ - Perform specific action on unit bypassing main menu""" args = utils.get_args_raw(message) if not args or len(args.split()) < 2: await utils.answer(message, self.strings("args")) return unit, action = args.split(maxsplit=1) if not self._unit_exists(unit): await utils.answer(message, self.strings("unit_doesnt_exist").format(unit)) return if action in {"start", "stop", "restart", "logs"}: await self._manage_unit( utils.get_chat_id(message), {"formal": unit, "name": unit}, action, ) elif action == "tail": await self._manage_unit( utils.get_chat_id(message), {"formal": unit, "name": unit}, "tail", ) else: await utils.answer(message, self.strings("action_not_found").format(action)) return await utils.answer( message, self.strings("unit_action_done").format(action, unit), ) async def nameunitcmd(self, message: Message): """ - Rename unit""" args = utils.get_args_raw(message) if not args or len(args.split()) < 2: await utils.answer(message, self.strings("args")) return unit, name = args.split(maxsplit=1) if not any(unit_["formal"] == unit for unit_ in self.get("services", [])): await utils.answer(message, self.strings("unit_doesnt_exist").format(unit)) return self.set( "services", [ service for service in self.get("services", []) if service["formal"] != unit ] + [{"name": name, "formal": unit}], ) await utils.answer(message, self.strings("unit_renamed").format(unit, name))