# Friendly Telegram (telegram userbot)
# Copyright (C) 2018-2019 The Authors
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
import logging
import time
from datetime import datetime
from io import BytesIO
from .. import loader, utils
logger = logging.getLogger(__name__)
@loader.test(args=None)
async def dumptest(conv):
m = await conv.send_message("test")
await conv.send_message(".dump", reply_to=m)
r = await conv.get_response()
assert r.message.startswith("Message(") and "test" in r.message, r
@loader.test(args="0")
async def logstest(conv):
r = await conv.get_response()
assert r.message == "Loading media...", r
r2 = await conv.get_response()
assert r2.document, r2
@loader.tds
class TestMod(loader.Module):
"""Self-tests"""
strings = {"name": "Tester",
"pong": ".pong",
"bad_loglevel": "Не верный уровень логов.",
"uploading_logs": "Собираю логи...",
"no_logs": "Логов с уровнем {} нет.",
"logs_filename": "FTG-Logs[{}].txt",
"logs_unsafe": ("Если ты точно хочешь показать логи с уровнем {lvl}, то напиши .logs {lvl} {force}"),
"logs_force": "FORCE_INSECURE",
"suspend_invalid_time": "Invalid time to suspend"}
@loader.test(resp="Pong")
@loader.unrestricted
async def pingcmd(self, message):
"""Does nothing"""
await utils.answer(message, self.strings("pong", message))
@loader.owner
async def pungcmd(self, message):
"""Useless pinger"""
a = 5
r = utils.get_args(message)
if r and r[0].isdigit():
a = int(r[0])
ping_msg = []
ping_data = []
for _ in range(a):
start = datetime.now()
msg = await message.client.send_message("me", "ping")
end = datetime.now()
duration = (end - start).microseconds / 1000
ping_data.append(duration)
ping_msg.append(msg)
ping = sum(ping_data) / len(ping_data)
await message.edit(f"[ ping {str(ping)[0:5]}ms ]")
for i in ping_msg:
await i.delete()
@loader.test(func=dumptest)
async def dumpcmd(self, message):
"""Use in reply to get a dump of a message"""
if not message.is_reply:
return
await utils.answer(message, ""
+ utils.escape_html((await message.get_reply_message()).stringify()) + "")
@loader.test(func=logstest)
async def logscmd(self, message):
""".logs
Dumps logs. Loglevels below WARNING may contain personal info."""
args = utils.get_args(message)
if not len(args) == 1 and not len(args) == 2:
args = ["40"]
try:
lvl = int(args[0])
except ValueError:
# It's not an int. Maybe it's a loglevel
lvl = getattr(logging, args[0].upper(), None)
if not isinstance(lvl, int):
await utils.answer(message, self.strings("bad_loglevel", message))
return
if not (lvl >= logging.WARNING or (len(args) == 2 and args[1] == self.strings("logs_force", message))):
await utils.answer(message,
self.strings("logs_unsafe", message).format(lvl=lvl, force=self.strings("logs_force", message)))
return
[handler] = logging.getLogger().handlers
logs = ("\n".join(handler.dumps(lvl))).encode("utf-8")
if not len(logs) > 0:
await utils.answer(message, self.strings("no_logs", message).format(lvl))
return
logs = BytesIO(logs)
logs.name = self.strings("logs_filename", message).format(lvl)
await utils.answer(message, logs)
@loader.owner
async def suspendcmd(self, message):
""".suspend