# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ # █▀█ █ █ █ █▀█ █▀▄ █ # © Copyright 2022 # https://t.me/hikariatama # # 🔒 Licensed under the GNU AGPLv3 # 🌐 https://www.gnu.org/licenses/agpl-3.0.html # meta pic: https://static.dan.tatar/wolfram_icon.png # meta banner: https://mods.hikariatama.ru/badges/wolfram.jpg # meta developer: @hikarimods # requires: Pillow # scope: hikka_only # scope: hikka_min 1.2.10 import contextlib import io import json import logging import os import random import tempfile from urllib.parse import quote_plus import aiohttp import requests from PIL import Image, ImageEnhance, ImageOps from telethon.tl.types import Message from .. import loader, utils logger = logging.getLogger(__name__) appids = [ "26LQEH-YT3P6T3YY9", "K49A6Y-4REWHGRWW6", "J77PG9-UY8A3WQ2PG", "P3WLYY-2G9GA6RQGE", "P7JH3K-27RHWR53JQ", "L349HV-29P5JV8Y7J", "77PP56-XLQK5GKUAA", "59EQ3X-HE26TY2W64", "8Q68TL-QA8W9GEXAA", "KQRKKJ-8WHPY395HA", "AAT4HU-Q3RETTGY93", "7JKH84-T648HW2UV9", "WYEQU3-2T55JP3WUG", "T2XT8W-57PJW3L433", ] async def wolfram_compute(query: str) -> tuple: try: async with aiohttp.ClientSession() as session: async with session.request( "GET", ( "https://api.wolframalpha.com/v2/query?" "scantimeout=30&" "podtimeout=30&" "formattimeout=30&" "parsetimeout=30&" "totaltimeout=30&" "podstate=Show+all+steps&" "output=json&" f"input={quote_plus(query)}&" f"appid={random.choice(appids)}" ), ) as resp: answer = await resp.text() answer = json.loads(answer) result = f"🔎 Query: {utils.escape_html(query)}\n" result += "😼 Result:\n" images = [] for pod in answer["queryresult"]["pods"]: if pod["title"] in {"Result", "Solution", "Exact result"}: result += "\n".join( [ ( f"{utils.escape_html(subpod['plaintext'])}\n" if "plaintext" in subpod else "" ) for subpod in pod["subpods"] ] ) for pod in reversed(answer["queryresult"]["pods"]): if "subpods" in pod and pod["title"] != "Input": images += [ subpod["img"]["src"] for subpod in pod["subpods"] if "img" in subpod and "src" in subpod["img"] ] if not images: with contextlib.suppress(Exception): images = [ next( subpod["img"]["src"] for subpod in next( pod["subpods"] for pod in answer["queryresult"]["pods"] if pod["title"] == "Input" ) if "img" in subpod and "src" in subpod["img"] ) ] if images: files = [] for image in images: with tempfile.TemporaryDirectory() as d: ImageEnhance.Brightness( ImageOps.invert( ImageOps.expand( Image.open( io.BytesIO( ( await utils.run_sync( requests.get, image ) ).content ) ), border=50, fill="white", ).convert("RGB") ) ).enhance(1.2).save(os.path.join(d, "wolfram.png")) with open(os.path.join(d, "wolfram.png"), "rb") as f: files += [f.read()] images = files return result, images or None except Exception: logger.exception("Wolfram query processing error") @loader.tds class WolframAlphaMod(loader.Module): """Solves hard math questions""" strings = { "name": "WolframAlpha", "computing": ( "🧠 I'm doing my best to solve this problem...|" "🧠 Meh, mafs again...|" "🧠 Oh no, it's very hard problem. I need some time to solve it...|" "🧠 I'm a math god and Im gonna help u...|" "🧠 ACTIVATING MATH GOD MODE|" "🧠 Beep-boop calculating|" "🧠 Can't solve this by urself? Meh, k, I'll help" ), "hard": "🤯 I don't know how to solve this problem", } strings_ru = { "hard": "🤯 Я не знаю, как решить эту задачу", "_cmd_doc_wolfram": "Решить математическую задачу", "_cls_doc": "Решает математические задачи", "computing": ( "🧠 Делаю все, чтобы решить эту математическую задачу...|" "🧠 Блин, опять математика...|" "🧠 Ой, это очень сложная задача. Дай мне немного времени...|" "🧠 Я бог математики и я помогу тебе...|" "🧠 АКТИВИРУЮ РЕЖИМ БОГА МАТЕМАТИКИ|" "🧠 Бип-буп решаю|" "🧠 Не можешь решить сам? Мэх, ладно, помогу..." ), } async def wolframcmd(self, message: Message): """Solve mathematic problem""" args = utils.get_args_raw(message) if not args: args = "x ^ 2 + y ^ 2 = 1" message = await utils.answer( message, random.choice(self.strings("computing").split("|")), ) answer = await wolfram_compute(args) if not answer: await utils.answer(message, self.strings("hard")) return if not answer[1]: await utils.answer(message, answer[0]) else: await message.delete() await self._client.send_message(message.peer_id, answer[0], file=answer[1])