# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# scope: hikka_min 1.2.10
# meta pic: https://img.icons8.com/external-phatplus-lineal-color-phatplus/512/000000/external-rate-email-phatplus-lineal-color-phatplus.png
# meta banner: https://mods.hikariatama.ru/badges/ratemod.jpg
# meta developer: @hikarimods
import asyncio
import hashlib
import re
import requests
from telethon.tl.types import Message
from .. import loader, utils
@loader.tds
class RateModuleMod(loader.Module):
"""Rates module and suggests fixes"""
strings = {
"name": "RateMod",
"template": (
"👮♂️ Mode rating {}:\n{} {} [{}]\n\n{}"
),
"no_file": "What should I check?... 🗿",
"cannot_check_file": "Check error",
}
strings_ru = {
"template": (
"👮♂️ Оценка модуля {}:\n{} {} [{}]\n\n{}"
),
"no_file": "А что проверять то?... 🗿",
"cannot_check_file": "Ошибка проверки",
"_cmd_doc_ratemod": "<код> - Оценить модуль",
"_cls_doc": "Оценивает модуль и дает рекомендации",
}
@loader.unrestricted
async def ratemodcmd(self, message: Message):
""" - Rate code"""
args = utils.get_args_raw(message)
reply = await message.get_reply_message()
if (
not reply
and not getattr(reply, "media", None)
and not getattr(message, "media", None)
and not args
and not utils.check_url(args)
):
return await utils.answer(message, self.strings("no_file"))
checking = (
getattr(reply, "media", None)
if getattr(reply, "media", None) is not None
else (
getattr(message, "media", None)
if getattr(message, "media", None) is not None
else (args if args and utils.check_url(args) else 0)
)
)
if type(checking) is int:
return await utils.answer(message, self.strings("no_file"))
if type(checking) is not str:
try:
file = await self._client.download_file(
(
getattr(reply, "media", None)
if getattr(reply, "media", None) is not None
else getattr(message, "media", None)
),
bytes,
)
except Exception:
return await utils.answer(
message,
self.strings("cannot_check_file"),
)
try:
code = file.decode("utf-8").replace("\r\n", "\n")
except Exception:
return await utils.answer(
message,
self.strings("cannot_check_file"),
)
else:
try:
code = (await utils.run_sync(requests.get, args)).text
except Exception:
return await utils.answer(message, self.strings("cannot_check_file"))
try:
mod_name = re.search(
r"""strings[ ]*=[ ]*{.*?name['"]:[ ]*['"](.*?)['"]""", code, flags=re.S
).group(1)
except Exception:
mod_name = "Unknown"
import_regex = [
r"^[^#]rom ([^\n\r]*) import [^\n\r]*$",
r"^[^#]mport ([^\n\r]*)[^\n\r]*$",
r"""__import__[(]['"]([^'"]*)['"][)]""",
]
imports = [
re.findall(import_re, code, flags=re.M | re.DOTALL)
for import_re in import_regex
]
if ".." in imports:
del imports[imports.index("..")]
splitted = [
_
for _ in list(
zip(
list(
map(
lambda x: len(re.findall(r"[ \t]+(if|elif|else).+:", x)),
re.split(r"[ \t]*async def .*?cmd\(", code),
)
),
[""] + re.findall(r"[ \t]*async def (.*?)cmd\(", code),
)
)
if _[0] > 10
]
comments = ""
score = 4.6
if len(imports) > 10:
comments += (
f"🔻 {{-0.1}} A lot of imports ({len(imports)})"
" [memory]\n"
)
score -= 0.1
if "requests" in imports and "utils.run_sync" not in code:
comments += (
"🔻 {-0.5} Sync requests [blocks runtime]\n"
)
score -= 0.5
if "while True" in code or "while 1" in code:
comments += (
"🔻 {-0.1} While true [block runtime*]\n"
)
score -= 0.1
if ".edit(" in code:
comments += (
"🔻 {-0.3} Classic message.edit [no twink"
" support]\n"
)
score -= 0.3
if re.search(r"@.*?[bB][oO][tT]", code) is not None:
bots = " | ".join(re.findall(r"@.*?[bB][oO][tT]", code))
comments += (
"🔻 {-0.2} Bot-abuse"
f" ({bots})"
" [module will die some day]\n"
)
score -= 0.2
if re.search(r'[ \t]+async def .*?cmd.*\n[ \t]+[^\'" \t]', code) is not None:
undoc = " | ".join(
list(re.findall(r'[ \t]+async def (.*?)cmd.*\n[ \t]+[^" \t]', code))
)
comments += (
f"🔻 {{-0.4}} No docs ({undoc})"
" [all commands must be documented]\n"
)
score -= 0.4
if "time.sleep" in code or "from time import sleep" in code:
comments += (
"🔻 {-2.0} Sync sleep (time.sleep)"
" replace with (await asyncio.sleep) [blocks"
" runtime]\n"
)
score -= 2
if [_ for _ in code.split("\n") if len(_) > 300]:
ll = max(len(_) for _ in code.split("\n") if len(_) > 300)
comments += (
f"🔻 {{-0.1}} Long lines ({ll}) [PEP"
" violation]\n"
)
score -= 0.1
if re.search(r'[\'"] ?\+ ?.*? ?\+ ?[\'"]', code) is not None:
comments += (
"🔻 {-0.1} Avoiding f-строк [causes"
" problems]\n"
)
score -= 0.1
if splitted:
comments += (
"🔻 {-0.2} Big 'if' trees"
f" ({' | '.join([f'{chain} в {fun}' for chain, fun in splitted])})"
" [readability]\n"
)
score -= 0.2
if "== None" in code or "==None" in code:
comments += (
"🔻 {-0.3} Type comparsation via == [google"
" it]\n"
)
score -= 0.3
if "is not None else" in code:
comments += (
"🔻 {-0.1} Unneccessary ternary operator usage"
" (if some_var is not None else another ->"
" some_var or another) [readability]\n"
)
score -= 0.1
if "utils.answer" in code and ".edit(" not in code:
comments += (
"🔸 {+0.3} utils.answer [twinks support]\n"
)
score += 0.3
if re.search(r'[ \t]+async def .*?cmd.*\n[ \t]+[^\'" \t]', code) is None:
comments += (
"🔸 {+0.3} Docs [all commands are"
" documented]\n"
)
score += 0.3
if "requests" in imports and "utils.run_sync" in code or "aiohttp" in imports:
comments += (
"🔸 {+0.3} Async requests [do not stop"
" runtime]\n"
)
score += 0.3
api_endpoint = "https://mods.hikariatama.ru/check?hash="
sha1 = hashlib.sha1()
sha1.update(code.encode("utf-8"))
try:
check_res = (
await utils.run_sync(requests.get, api_endpoint + str(sha1.hexdigest()))
).text
except Exception:
check_res = ""
if check_res in {"yes", "db"}:
comments += (
"🔸 {+1.0} Module is verified [no scam]\n"
)
score += 1.0
score = round(score, 1)
score = min(score, 5.0)
await utils.answer(
message,
self.strings("template").format(
mod_name,
"⭐️" * round(score),
score,
["Shit", "Bad", "Poor", "Normal", "Ok", "Good"][round(score)],
comments,
),
)