import datetime import io import contextlib import logging import sys import typing import aiohttp from enum import Enum from meval import meval from dataclasses import dataclass, field from hikkatl.errors.rpcerrorlist import MessageIdInvalidError from hikkatl.types import Message from .. import loader, utils from ..inline.types import InlineCall from ..log import HikkaException try: from ..types import HikkaReplyMarkup except ImportError: from ..types import HerokuReplyMarkup as HikkaReplyMarkup # meta banner: https://github.com/sqlmerr/hikka_mods/blob/main/assets/banners/upgradedeval.png?raw=true # meta icon: https://github.com/sqlmerr/hikka_mods/blob/main/assets/icons/upgradedeval.png?raw=true # meta developer: @sqlmerr_m ITEMS_PER_PAGE = 6 EMOJIS = { "python": "🐍", "kotlin": "👩‍💻", "rust": "🦀", "go": "🐹", } class LanguageEnum(str, Enum): python = "python" rust = "rust" kotlin = "kotlin" go = "go" @dataclass(frozen=True) class EvaluationInfo: code: str language: LanguageEnum = field(default=LanguageEnum.python) result: typing.Optional[str] = field(default=None) error: typing.Optional[typing.Union[HikkaException, str]] = field(default=None) is_error: bool = field(default=False) date: datetime.datetime = field(default_factory=datetime.datetime.now) @loader.tds class UpgradedEval(loader.Module): """Just eval with customizable text and stdout""" strings = { "name": "UpgradedEval", "_cfg_text_result": "Text for result", "_cfg_text_error": "Text for error", "_cfg_text_result_and_error": "Text containing both error and result", "_cfg_mode": "Code run mode. stdout is when print works. return, this is standard .e; auto is just a mode that automatically selects stdout or return", } strings_ru = { "_cfg_text_result": "Текст результата", "_cfg_text_error": "Текст ошибки", "_cfg_text_result_and_error": "Текст содержащий и ошибку и результат", "_cfg_mode": "Режим запуска кода. stdout, это когда работает print. return, это стандартный .e; auto - это просто режим, который автоматически выбирает stdout или return", } def __init__(self): self.config = loader.ModuleConfig( loader.ConfigValue( "text_result", ( "Language: {lang}\n" "{emoji} Code:\n" "
{code}
\n\n" " Result:\n" "
{result}
" ), lambda: self.strings("_cfg_text_result"), validator=loader.validators.String(), ), loader.ConfigValue( "text_error", ( "Language: {lang}\n" "{emoji} Code:\n" "
{code}
\n\n" " Error:\n" "
{error}
" ), lambda: self.strings("_cfg_text_error"), validator=loader.validators.String(), ), loader.ConfigValue( "text_result_and_error", ( "Language: {lang}\n" "{emoji} Code:\n" "
{code}
\n\n" " Result:\n" "
{result}
\n\n" " Error:\n" "
{error}
" ), lambda: self.strings("_cfg_text_result_and_error"), validator=loader.validators.String() ), loader.ConfigValue( "mode", "auto", lambda: self.strings("_cfg_mode"), validator=loader.validators.Choice(["stdout", "return", "auto"]), ), ) self._evals: typing.List[EvaluationInfo] = [] async def __inline_open_eval(self, call: InlineCall, eval_info: EvaluationInfo, current_page: int): if not eval_info.is_error: text = self.config["text_result"].format( emoji=EMOJIS[eval_info.language.lower()], lang=eval_info.language.lower(), code=utils.escape_html(eval_info.code) if eval_info.code else "None", result=eval_info.result if eval_info.result else "None", ) else: if eval_info.language == LanguageEnum.python and isinstance(eval_info.error, HikkaException): error = ( self.lookup("Evaluator").censor( ( "\n".join(eval_info.error.full_stack.splitlines()[:-1]) + "\n\n" + "🚫 " + eval_info.error.full_stack.splitlines()[-1] ) ), ) text = self.config["text_error"].format( emoji=EMOJIS["python"], lang="python", code=utils.escape_html(eval_info.code), error=error[0] ) else: error = eval_info.error text = self.config["text_result_and_error"].format( lang=eval_info.language.lower(), code=utils.escape_html(eval_info.code), result=eval_info.result, error=error ) await call.edit( text=text, reply_markup={ "text": "←", "callback": self.__inline_open, "args": (current_page,) } ) async def __inline_open(self, call: InlineCall, page: int = 0): await call.edit( text="📋 Evaluation history", reply_markup=self.__inline_generate_keyboard(self._evals, page), ) def __inline_generate_keyboard(self, evals: typing.List[EvaluationInfo], page: int = 0): if len(evals) == 0: return [] # Sort evals in descending order based on date to ensure newest first sorted_evals = sorted(evals, key=lambda x: x.date, reverse=True) offset = page * ITEMS_PER_PAGE if offset < 0 or offset >= len(sorted_evals): page = 0 offset = 0 # Slice evaluations for the current page page_evals = sorted_evals[offset:offset + ITEMS_PER_PAGE] buttons = [] # Generate buttons for evaluations on current page (no reverse) for e in page_evals: buttons.append( [{ "text": f"{e.date.strftime('%Y-%m-%d %H:%M:%S')} {'✅' if not e.is_error else '❌'} {e.language.capitalize()}", "callback": self.__inline_open_eval, "args": (e, page) }] ) # Navigation buttons nav_buttons = [] if offset > 0: nav_buttons.append({ "text": "<", "callback": self.__inline_open, "args": (page - 1,) }) else: nav_buttons.append({ "text": "X", "data": "empty" }) nav_buttons.append({ "text": f"{page + 1}/{(len(evals) + ITEMS_PER_PAGE - 1) // ITEMS_PER_PAGE}", "data": "empty" }) if offset + ITEMS_PER_PAGE < len(sorted_evals): nav_buttons.append({ "text": ">", "callback": self.__inline_open, "args": (page + 1,) }) else: nav_buttons.append({ "text": "X", "data": "empty" }) if nav_buttons: buttons.append(nav_buttons) return buttons @loader.command(ru_doc="Получить историю (с рестарта юзербота)") async def ehistory(self, message: Message): """Get history (since userbot restart)""" await self.inline.form( text="📋 Evaluation history", message=message, reply_markup=self.__inline_generate_keyboard(self._evals), ) @loader.command(ru_doc="Улучшенный eval") async def ie(self, message: Message): """Upgraded eval""" args = utils.get_args_raw(message) try: attrs = await self.lookup("Evaluator").getattrs(message) stdout = io.StringIO() with contextlib.redirect_stdout(stdout): result = await meval( utils.escape_html(args), globals(), **attrs, ) output = stdout.getvalue() except Exception: item = HikkaException.from_exc_info(*sys.exc_info()) error = ( self.lookup("Evaluator").censor( ( "\n".join(item.full_stack.splitlines()[:-1]) + "\n\n" + "🚫 " + item.full_stack.splitlines()[-1] ) ), ) await utils.answer( message, self.config["text_error"].format( emoji=EMOJIS["python"], lang="python", code=utils.escape_html(utils.get_args_raw(message)), error=error[0] ), ) self._evals.append(EvaluationInfo( args, error=item, is_error=True, )) return mode = self.config["mode"] if mode == "stdout": result = output elif mode == "auto" and output.strip(): result = output if callable(getattr(result, "stringify", None)): with contextlib.suppress(Exception): result = str(result.stringify()) with contextlib.suppress(MessageIdInvalidError): await utils.answer( message, self.config["text_result"].format( emoji=EMOJIS["python"], lang="python", code=utils.escape_html(args) if args else "None", result=result if result else "None", ), ) self._evals.append(EvaluationInfo( args, result=result, )) @loader.command(ru_doc="Запустить код на Rust") async def erust(self, message: Message): """Evaluate Rust code""" code = utils.get_args_raw(message) url = "https://play.rust-lang.org/execute" payload = { "channel": "stable", "mode": "debug", "edition": "2024", "crateType": "bin", "tests": False, "code": code } headers = {"Content-Type": "application/json"} async with aiohttp.ClientSession() as session: async with session.post(url, json=payload, headers=headers, timeout=10) as response: response.raise_for_status() result = await response.json() output = result.get("stdout", "") errors = result.get("stderr", "") with contextlib.suppress(MessageIdInvalidError): await utils.answer( message, self.config["text_result_and_error"].format( emoji=EMOJIS["rust"], lang="rust", code=utils.escape_html(code) if code else "None", result=output, error=errors, ), ) self._evals.append(EvaluationInfo(code, LanguageEnum.rust, result=output, error=errors, is_error=errors!="")) @loader.command(ru_doc="Запустить код на Go") async def ego(self, message: Message): """Evaluate Go code""" code = utils.get_args_raw(message) url = "https://play.golang.org/compile" payload = { "version": 2, "body": code, "withVet": False } headers = {"Content-Type": "application/x-www-form-urlencoded"} async with aiohttp.ClientSession() as session: async with session.post(url, data=payload, headers=headers, timeout=10) as response: response.raise_for_status() result = await response.json() output = "" if result.get("Events"): for event in result["Events"]: if event.get("Kind") == "stdout": output += event.get("Message", "") errors = result.get("Errors", "") if errors: with contextlib.suppress(MessageIdInvalidError): await utils.answer( message, self.config["text_result_and_error"].format( emoji=EMOJIS["go"], lang="go", code=utils.escape_html(code) if code else "None", result=output, error=errors, ), ) else: with contextlib.suppress(MessageIdInvalidError): await utils.answer( message, self.config["text_result"].format( emoji=EMOJIS["go"], lang="go", code=utils.escape_html(code) if code else "None", result=output, ), ) self._evals.append(EvaluationInfo(code, LanguageEnum.go, result=output, error=errors, is_error=errors!="")) @loader.command(ru_doc="Запустить код на Kotlin") async def ekt(self, message: Message): """Evaluate Kotlin code""" code = utils.get_args_raw(message) url = "https://api.kotlinlang.org/api/2.1.20/compiler/run" payload = { "args": "", "conftype": "java", "files": [ { "name": "Main.kt", "publicId": "", "text": code } ] } headers = {"Content-Type": "application/json"} async with aiohttp.ClientSession() as session: async with session.post(url, json=payload, headers=headers, timeout=10) as response: response.raise_for_status() result = await response.json() output = result.get("text", "") errors = result.get("errors", {}).get("Main.kt", []) error = "" for err in errors: error += f"- {err.get('message')}\n" if errors: with contextlib.suppress(MessageIdInvalidError): await utils.answer( message, self.config["text_result_and_error"].format( emoji=EMOJIS["kotlin"], lang="kotlin", code=utils.escape_html(code) if code else "None", result=output, error=error, ), ) else: with contextlib.suppress(MessageIdInvalidError): await utils.answer( message, self.config["text_result"].format( emoji=EMOJIS["kotlin"], lang="kotlin", code=utils.escape_html(code) if code else "None", result=output, ), ) self._evals.append(EvaluationInfo(code, LanguageEnum.kotlin, result=output, error=errors, is_error=errors!=[]))