# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# meta pic: https://static.dan.tatar/wolfram_icon.png
# meta banner: https://mods.hikariatama.ru/badges/wolfram.jpg
# meta developer: @hikarimods
# requires: Pillow
# scope: hikka_only
# scope: hikka_min 1.2.10
import contextlib
import io
import json
import logging
import os
import random
import tempfile
from urllib.parse import quote_plus
import aiohttp
import requests
from PIL import Image, ImageEnhance, ImageOps
from telethon.tl.types import Message
from .. import loader, utils
logger = logging.getLogger(__name__)
appids = [
"26LQEH-YT3P6T3YY9",
"K49A6Y-4REWHGRWW6",
"J77PG9-UY8A3WQ2PG",
"P3WLYY-2G9GA6RQGE",
"P7JH3K-27RHWR53JQ",
"L349HV-29P5JV8Y7J",
"77PP56-XLQK5GKUAA",
"59EQ3X-HE26TY2W64",
"8Q68TL-QA8W9GEXAA",
"KQRKKJ-8WHPY395HA",
"AAT4HU-Q3RETTGY93",
"7JKH84-T648HW2UV9",
"WYEQU3-2T55JP3WUG",
"T2XT8W-57PJW3L433",
]
async def wolfram_compute(query: str) -> tuple:
try:
async with aiohttp.ClientSession() as session:
async with session.request(
"GET",
(
"https://api.wolframalpha.com/v2/query?"
"scantimeout=30&"
"podtimeout=30&"
"formattimeout=30&"
"parsetimeout=30&"
"totaltimeout=30&"
"podstate=Show+all+steps&"
"output=json&"
f"input={quote_plus(query)}&"
f"appid={random.choice(appids)}"
),
) as resp:
answer = await resp.text()
answer = json.loads(answer)
result = f"🔎 Query: {utils.escape_html(query)}\n"
result += "😼 Result:\n"
images = []
for pod in answer["queryresult"]["pods"]:
if pod["title"] in {"Result", "Solution", "Exact result"}:
result += "\n".join(
[
(
f"{utils.escape_html(subpod['plaintext'])}\n"
if "plaintext" in subpod
else ""
)
for subpod in pod["subpods"]
]
)
for pod in reversed(answer["queryresult"]["pods"]):
if "subpods" in pod and pod["title"] != "Input":
images += [
subpod["img"]["src"]
for subpod in pod["subpods"]
if "img" in subpod and "src" in subpod["img"]
]
if not images:
with contextlib.suppress(Exception):
images = [
next(
subpod["img"]["src"]
for subpod in next(
pod["subpods"]
for pod in answer["queryresult"]["pods"]
if pod["title"] == "Input"
)
if "img" in subpod and "src" in subpod["img"]
)
]
if images:
files = []
for image in images:
with tempfile.TemporaryDirectory() as d:
ImageEnhance.Brightness(
ImageOps.invert(
ImageOps.expand(
Image.open(
io.BytesIO(
(
await utils.run_sync(
requests.get, image
)
).content
)
),
border=50,
fill="white",
).convert("RGB")
)
).enhance(1.2).save(os.path.join(d, "wolfram.png"))
with open(os.path.join(d, "wolfram.png"), "rb") as f:
files += [f.read()]
images = files
return result, images or None
except Exception:
logger.exception("Wolfram query processing error")
@loader.tds
class WolframAlphaMod(loader.Module):
"""Solves hard math questions"""
strings = {
"name": "WolframAlpha",
"computing": (
"🧠 I'm doing my best to solve this problem...|"
"🧠 Meh, mafs again...|"
"🧠 Oh no, it's very hard problem. I need some time to solve it...|"
"🧠 I'm a math god and Im gonna help u...|"
"🧠 ACTIVATING MATH GOD MODE|"
"🧠 Beep-boop calculating|"
"🧠 Can't solve this by urself? Meh, k, I'll help"
),
"hard": "🤯 I don't know how to solve this problem",
}
strings_ru = {
"hard": "🤯 Я не знаю, как решить эту задачу",
"_cmd_doc_wolfram": "Решить математическую задачу",
"_cls_doc": "Решает математические задачи",
"computing": (
"🧠 Делаю все, чтобы решить эту математическую задачу...|"
"🧠 Блин, опять математика...|"
"🧠 Ой, это очень сложная задача. Дай мне немного времени...|"
"🧠 Я бог математики и я помогу тебе...|"
"🧠 АКТИВИРУЮ РЕЖИМ БОГА МАТЕМАТИКИ|"
"🧠 Бип-буп решаю|"
"🧠 Не можешь решить сам? Мэх, ладно, помогу..."
),
}
async def wolframcmd(self, message: Message):
"""Solve mathematic problem"""
args = utils.get_args_raw(message)
if not args:
args = "x ^ 2 + y ^ 2 = 1"
message = await utils.answer(
message,
random.choice(self.strings("computing").split("|")),
)
answer = await wolfram_compute(args)
if not answer:
await utils.answer(message, self.strings("hard"))
return
if not answer[1]:
await utils.answer(message, answer[0])
else:
await message.delete()
await self._client.send_message(message.peer_id, answer[0], file=answer[1])