__version__ = (2, 0, 0)
# ©️ Dan Gazizullin, 2021-2023
# This file is a part of Hikka Userbot
# Code is licensed under CC-BY-NC-ND 4.0 unless otherwise specified.
# 🌐 https://github.com/hikariatama/Hikka
# 🔑 https://creativecommons.org/licenses/by-nc-nd/4.0/
# + attribution
# + non-commercial
# + no-derivatives
# You CANNOT edit this file without direct permission from the author.
# You can redistribute this file without any changes.
# meta pic: https://0x0.st/Hcj1.png
# meta banner: https://mods.hikariatama.ru/badges/checkege.jpg
# meta developer: @hikarimods
# scope: hikka_only
# scope: hikka_min 1.6.2
import asyncio
import base64
import hashlib
import typing
import warnings
import requests
from .. import loader, utils
warnings.filterwarnings("ignore")
SUBJECT_MAPPING = {
"Русский": "🇷🇺",
"Математика": "➗",
"Физика": "👨🏫",
"География": "🗺",
"Информатика": "💻",
"Английский": "🇬🇧",
"Немецкий": "🇩🇪",
"Французский": "🇫🇷",
"Китайский": "🇨🇳",
"Общество": "👥",
"История": "👵",
"Литература": "📚",
"Химия": "🧪",
"Биология": "😺",
}
@loader.tds
class CheckEge(loader.Module):
"""Checks Russian National Exam results"""
strings = {"name": "CheckEge"}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"CHECKEGE_TOKEN",
None,
(
"Токен CheckEge. Можно получить на https://checkege.rustest.ru из"
" куки Participant. Если заполнить остальные поля конфига, при"
" авторизации заполнится автоматически."
),
validator=loader.validators.Hidden(),
),
loader.ConfigValue(
"FIO",
None,
(
"ФИО, с которым нужно авторизоваться в формате Иванов Иван"
" Иванович. Требует наличия RuCaptcha токена в конфиге."
),
validator=loader.validators.Hidden(),
),
loader.ConfigValue(
"DOCUMENT",
None,
"Номер паспорта без серии. Требует наличия RuCaptcha токена в конфиге.",
validator=loader.validators.Hidden(
loader.validators.RegExp(r"^\d{6}$")
),
),
loader.ConfigValue(
"REGION",
None,
(
"Код региона, в котором вы сдавали ЕГЭ. Можно посмотреть в"
" https://gist.github.com/hikariatama/95f1a92dbe0379a88b6e673a1d79ed17."
" Требует наличия RuCaptcha токена в конфиге."
),
validator=loader.validators.Hidden(
loader.validators.RegExp(r"^\d{1,2}$")
),
),
loader.ConfigValue(
"RUCAPTCHA_TOKEN",
None,
"Токен RuCaptcha. Можно получить на https://rucaptcha.com",
validator=loader.validators.Hidden(),
),
loader.ConfigValue(
"PROXY",
None,
"Прокси в формате http://user:pass@host:port",
validator=loader.validators.Hidden(),
),
)
async def _auth(self):
captcha = (
await utils.run_sync(
requests.get,
"https://checkege.rustest.ru/api/captcha",
proxies={"https": self.config["PROXY"]},
verify=False,
)
).json()
captcha_img = base64.b64decode(captcha["Image"].encode())
captcha_token = captcha["Token"]
captcha_id = (
await utils.run_sync(
requests.post,
"https://rucaptcha.com/in.php",
data={
"key": self.config["RUCAPTCHA_TOKEN"],
"method": "post",
"numeric": 1,
"min_len": 6,
"max_len": 6,
},
files={
"file": ("captcha.png", captcha_img, "image/png"),
},
proxies={"https": self.config["PROXY"]},
)
).text.split("|")[1]
while True:
await asyncio.sleep(3)
captcha_result = (
await utils.run_sync(
requests.get,
"https://rucaptcha.com/res.php",
params={
"key": self.config["RUCAPTCHA_TOKEN"],
"action": "get",
"id": captcha_id,
},
proxies={"https": self.config["PROXY"]},
)
).text
if captcha_result != "CAPCHA_NOT_READY":
break
captcha_result = captcha_result.split("|")[1]
self.config["CHECKEGE_TOKEN"] = dict(
(
await utils.run_sync(
requests.post,
"https://checkege.rustest.ru/api/participant/login",
data={
"Hash": hashlib.md5(
self.config["FIO"].replace(" ", "").lower().encode()
).hexdigest(),
"Code": "",
"Document": f"000000{self.config['DOCUMENT']}",
"Region": self.config["REGION"],
"AgreeCheck": "on",
"Captcha": captcha_result,
"Token": captcha_token,
"reCaptureToken": captcha_result,
},
verify=False,
proxies={"https": self.config["PROXY"]},
)
).cookies
)["Participant"]
async def _get_result(self, retry: bool = True) -> typing.Union[dict, bool]:
if not self.config["CHECKEGE_TOKEN"] and (
not self.config["FIO"]
or not self.config["DOCUMENT"]
or not self.config["REGION"]
):
return False
if not self.config["CHECKEGE_TOKEN"]:
await self._auth()
result = (
await utils.run_sync(
requests.get,
"https://checkege.rustest.ru/api/exam",
cookies={"Participant": self.config["CHECKEGE_TOKEN"]},
verify=False,
proxies={"https": self.config["PROXY"]},
)
).json()
if result.get("Message") == "Authorization has been denied for this request.":
if retry:
await self._auth()
return await self._get_result(retry=False)
return False
return result
async def _format_result(self, result: dict) -> str:
strings = ""
for exam in result["Result"]["Exams"]:
name, has_result, test_mark = (
exam["Subject"],
exam["HasResult"],
exam["TestMark"],
)
emoji = next(
(SUBJECT_MAPPING.get(n) for n in SUBJECT_MAPPING if n in name),
"📕",
)
result = (
(
"👍 зачёт"
if has_result and test_mark
else (
"🚫"
" незачёт"
)
)
if name == "Сочинение"
else (
"👍"
f" {test_mark} балл(-ов)"
if has_result
else (
"🚫 нет"
" результата"
)
)
)
strings += f"{emoji} {name} · {result}\n"
return strings
def _update_current_results(self, result: dict):
self.set(
"have_results",
[
(exam["ExamId"], exam["TestMark"])
for exam in result["Result"]["Exams"]
if exam["HasResult"]
],
)
@loader.command()
async def checkege(self, message):
"""Авторизоваться и вывести результаты ЕГЭ"""
if not self.config["CHECKEGE_TOKEN"] and (
not self.config["FIO"]
or not self.config["DOCUMENT"]
or not self.config["REGION"]
):
await utils.answer(
message,
(
"🚫 Токен"
" CheckEge не установлен.\n\nАвторизуйтесь на"
" https://checkege.rustest.ru и получите его из cookie Participant"
),
)
return
message = await utils.answer(
message,
(
"🔓 Взламываю"
" ФИПИ..."
),
)
if not (result := await self._get_result()):
await utils.answer(
message,
(
"⚰️ Неверный токен"
" / данные авторизации!"
),
)
self.set("authorized", False)
return
await utils.answer(message, await self._format_result(result))
self.set("authorized", True)
@loader.loop(interval=30, autostart=True)
async def check_loop(self):
if not self.get("authorized"):
return
if not (result := await self._get_result()):
await self.inline.bot.send_message(
self._tg_id,
(
"⚰️ Авторизация на CheckEge истекла, авторизоваться не"
" получилось!"
),
)
self.set("authorized", False)
return
for exam in result["Result"]["Exams"]:
if exam["HasResult"] and (exam["ExamId"], exam["TestMark"]) not in self.get(
"have_results", []
):
await self.inline.bot.send_message(
self._tg_id,
(
f"🎉 Получен результат за экзамен {exam['Subject']}:"
f" {exam['TestMark']} балл(-ов)"
),
)
self._update_current_results(result)