__version__ = (13, 0, 3)
# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# meta pic: https://static.dan.tatar/hikarichat_icon.png
# meta banner: https://mods.hikariatama.ru/badges/hikarichat.jpg
# meta desc: Chat administrator toolkit, now with powerful free version
# meta developer: @hikarimods
# scope: disable_onload_docs
# scope: inline
# scope: hikka_only
# scope: hikka_min 1.3.0
# requires: aiohttp websockets
import abc
import asyncio
import contextlib
import functools
import imghdr
import io
import json
import logging
import random
import re
import time
import typing
from math import ceil
from types import FunctionType
import aiohttp
import requests
import websockets
from aiogram.types import CallbackQuery, ChatPermissions
from aiogram.utils.exceptions import MessageCantBeDeleted, MessageToDeleteNotFound
from telethon.errors import ChatAdminRequiredError, UserAdminInvalidError
from telethon.errors.rpcerrorlist import WebpageCurlFailedError
from telethon.tl.functions.channels import (
EditAdminRequest,
EditBannedRequest,
GetFullChannelRequest,
GetParticipantRequest,
InviteToChannelRequest,
)
from telethon.tl.functions.messages import EditChatDefaultBannedRightsRequest
from telethon.tl.types import (
Channel,
ChannelParticipantCreator,
Chat,
ChatAdminRights,
ChatBannedRights,
DocumentAttributeAnimated,
Message,
MessageEntitySpoiler,
MessageMediaUnsupported,
User,
UserStatusOnline,
)
from .. import loader, utils
from ..inline.types import InlineCall, InlineMessage
try:
from PIL import Image, ImageDraw, ImageFont
except ImportError:
PIL_AVAILABLE = False
else:
PIL_AVAILABLE = True
logger = logging.getLogger(__name__)
version = f"v{__version__[0]}.{__version__[1]}.{__version__[2]}stable"
ver = f"HikariChat {version}"
FLOOD_TIMEOUT = 0.8
FLOOD_TRESHOLD = 4
PROTECTS = {
"antinsfw": "🔞 AntiNSFW",
"antiarab": "🇵🇸 AntiArab",
"antitagall": "🐵 AntiTagAll",
"antihelp": "🐺 AntiHelp",
"antiflood": "⏱ AntiFlood",
"antichannel": "📯 AntiChannel",
"antispoiler": "👻 AntiSpoiler",
"report": "📣 Report",
"antiexplicit": "🤬 AntiExplicit",
"antiservice": "⚙️ AntiService",
"antigif": "🎑 AntiGIF",
"antizalgo": "🌀 AntiZALGO",
"antistick": "🎨 AntiStick",
"antilagsticks": "⚰️ AntiLagSticks",
"cas": "🛡 CAS",
"bnd": "💬 BND",
"antiraid": "🚪 AntiRaid",
"banninja": "🥷 BanNinja",
"welcome": "👋 Welcome",
"captcha": "🚥 Captcha",
}
def fit(line: str, max_size: int) -> str:
if len(line) >= max_size:
return line
offsets_sum = max_size - len(line)
return f"{' ' * ceil(offsets_sum / 2 - 1)}{line}{' ' * int(offsets_sum / 2 - 1)}"
def gen_table(t: typing.List[typing.List[str]]) -> bytes:
table = ""
header = t[0]
rows_sizes = [len(i) + 2 for i in header]
for row in t[1:]:
rows_sizes = [max(len(j) + 2, rows_sizes[i]) for i, j in enumerate(row)]
rows_lines = ["━" * i for i in rows_sizes]
table += f"┏{('┯'.join(rows_lines))}┓\n"
for line in t:
table += (
f"┃ {' ┃ '.join([fit(row, rows_sizes[k]) for k, row in enumerate(line)])} ┃\n"
)
table += "┠"
for row in rows_sizes:
table += f"{'─' * row}┼"
table = table[:-1] + "┫\n"
return "\n".join(table.splitlines()[:-1]) + "\n" + f"┗{('┷'.join(rows_lines))}┛\n"
def get_first_name(user: typing.Union[User, Channel]) -> str:
"""Returns first name of user or channel title"""
return utils.escape_html(
user.first_name if isinstance(user, User) else user.title
).strip()
def get_full_name(user: typing.Union[User, Channel]) -> str:
return utils.escape_html(
user.title
if isinstance(user, Channel)
else (
f"{user.first_name} "
+ (user.last_name if getattr(user, "last_name", False) else "")
)
).strip()
BANNED_RIGHTS = {
"view_messages": False,
"send_messages": False,
"send_media": False,
"send_stickers": False,
"send_gifs": False,
"send_games": False,
"send_inline": False,
"send_polls": False,
"change_info": False,
"invite_users": False,
}
class HikariChatAPI:
def __init__(self):
self._bot = "@hikka_userbot"
self._queue = []
self.feds = {}
self.chats = {}
self.variables = {}
self.init_done = asyncio.Event()
self._show_warning = True
self._connected = False
self._inited = False
self._local = False
async def init(
self,
client: "CustomTelegramClient", # type: ignore
db: "Database", # type: ignore
module: loader.Module,
):
"""Entry point"""
self._client = client
self._db = db
self.module = module
if not self.module.get("token"):
await self._get_token()
self._task = asyncio.ensure_future(self._connect())
await self.init_done.wait()
async def _wss(self):
async with websockets.connect(
f"wss://hikarichat.hikariatama.ru/ws/{self.module.get('token')}"
) as wss:
init = json.loads(await wss.recv())
logger.debug(f"HikariChat connection debug info {init}")
if init["event"] == "startup":
self.variables = init["variables"]
elif init["event"] == "license_violation":
await wss.close()
raise Exception("local")
self.init_done.set()
logger.debug("HikariChat connected")
self._show_warning = True
self._connected = True
self._inited = True
while True:
ans = json.loads(await wss.recv())
if ans["event"] == "update_info":
self.chats = ans["chats"]
self.feds = ans["feds"]
await wss.send(json.dumps({"ok": True, "queue": self._queue}))
self._queue = []
for chat in self.chats:
if str(chat) not in self.module._linked_channels:
channel = (
await self._client(GetFullChannelRequest(int(chat)))
).full_chat.linked_chat_id
self.module._linked_channels[str(chat)] = channel or False
if ans["event"] == "queue_status":
await self._client.edit_message(
ans["chat_id"],
ans["message_id"],
ans["text"],
)
async def _connect(self):
while True:
try:
await self._wss()
except Exception:
logger.debug("HikariChat disconnection traceback", exc_info=True)
if not self._inited:
self._local = True
self.variables = json.loads(
(
await utils.run_sync(
requests.get,
"https://gist.githubusercontent.com/hikariatama/31a8246c9c6ad0b451324969d6ff2940/raw/608509efd7fee6fa876227e1c8c3c7dc0a952892/variables.json",
)
).text
)
self._feds = self.module.get("feds", {})
delattr(self, "feds")
self.chats = self.module.get("chats", {})
self._processor_task = asyncio.ensure_future(
self._queue_processor()
)
self.init_done.set()
self._task.cancel()
return
self._connected = False
if self._show_warning:
logger.debug("HikariChat disconnected, retry in 5 sec")
self._show_warning = False
await asyncio.sleep(5)
def request(self, payload: dict, message: typing.Optional[Message] = None):
if isinstance(message, Message):
payload = {
**payload,
**{
"chat_id": utils.get_chat_id(message),
"message_id": message.id,
},
}
self._queue += [payload]
def should_protect(self, chat_id: typing.Union[str, int], protection: str) -> bool:
return (
str(chat_id) in self.chats
and protection in self.chats[str(chat_id)]
and str(self.chats[str(chat_id)][protection][1]) == str(self.module._tg_id)
)
async def nsfw(self, photo: bytes) -> str:
if not self.module.get("token"):
logger.warning("Token is not sent, NSFW check forbidden")
return "sfw"
async with aiohttp.ClientSession() as session:
async with session.request(
"POST",
"https://hikarichat.hikariatama.ru/check_nsfw",
headers={"Authorization": f"Bearer {self.module.get('token')}"},
data={"file": photo},
) as resp:
r = await resp.text()
try:
r = json.loads(r)
except Exception:
logger.exception("Failed to check NSFW")
return "sfw"
if "error" in r and "Rate limit" in r["error"]:
logger.warning("NSFW checker ratelimit exceeded")
return "sfw"
if "success" not in r:
logger.error(f"API error {json.dumps(r, indent=4)}")
return "sfw"
return r["verdict"]
async def _get_token(self):
async with self._client.conversation(self._bot) as conv:
m = await conv.send_message("/token")
r = await conv.get_response()
token = r.raw_text
await m.delete()
await r.delete()
if not token.startswith("kirito_") and not token.startswith("asuna_"):
raise loader.LoadError("Can't get token")
self.module.set("token", token)
await self._client.delete_dialog(self._bot)
def __getattr__(self, attribute: str):
if self._local and attribute == "feds":
return {fed["shortname"]: fed for fed in self._feds.values()}
raise AttributeError
async def _queue_processor(self):
while True:
if not self._queue:
await asyncio.sleep(1)
continue
ERROR = (
"🚫 API Error:"
" {}"
)
async def assert_arguments(args: set, item: dict) -> bool:
if any(i not in item.get("args", {}) for i in args):
if "chat_id" in item:
await self._client.edit_message(
item["chat_id"],
item["message_id"],
(
"🚫"
" Bad API arguments, PROKAZNIK!"
),
)
return False
return True
async def error(msg: str, item: dict):
if "chat_id" in item:
await self._client.edit_message(
item["chat_id"],
item["message_id"],
ERROR.format(msg),
)
feds_copy = self._feds.copy()
chats_copy = self.chats.copy()
item = self._queue.pop(0)
u = str(self._client._tg_id)
if item["action"] == "create federation":
if not await assert_arguments({"shortname", "name"}, item):
continue
t = "fed_" + "".join(
[
random.choice(list("abcdefghijklmnopqrstuvwyz1234567890"))
for _ in range(32)
]
)
self._feds[t] = {
"shortname": item["args"]["shortname"],
"name": item["args"]["name"],
"chats": [],
"warns": {},
"admins": [u],
"owner": u,
"fdef": [],
"notes": {},
"uid": t,
}
if item["action"] == "add chat to federation":
if not await assert_arguments({"uid", "cid"}, item):
continue
if item["args"]["uid"] not in self._feds:
await error("Federation doesn't exist", item)
continue
if str(item["args"]["cid"]) in self._feds[item["args"]["uid"]]["chats"]:
await error("Chat is already in this federation", item)
continue
self._feds[item["args"]["uid"]]["chats"] += [str(item["args"]["cid"])]
if item["action"] == "remove chat from federation":
if not await assert_arguments({"uid", "cid"}, item):
continue
if item["args"]["uid"] not in self._feds:
await error("Federation doesn't exist", item)
continue
if (
str(item["args"]["cid"])
not in self._feds[item["args"]["uid"]]["chats"]
):
await error("Chat is not in this federation", item)
continue
self._feds[item["args"]["uid"]]["chats"].remove(
str(item["args"]["cid"])
)
if item["action"] == "update protections":
if not await assert_arguments({"protection", "state", "chat"}, item):
continue
chat, protection, state = (
str(item["args"]["chat"]),
item["args"]["protection"],
item["args"]["state"],
)
if protection not in self.variables["protections"] + ["welcome"]:
await error("Unknown protection type", item)
continue
if (
protection in self.variables["argumented_protects"]
and state not in self.variables["protect_actions"]
or protection not in self.variables["argumented_protects"]
and protection in self.variables["protections"]
and state not in {"on", "off"}
):
await error("Protection state invalid", item)
continue
if chat not in self.chats:
self.chats[chat] = {}
if state == "off":
if protection in self.chats[chat]:
del self.chats[chat][protection]
else:
self.chats[chat][protection] = [state, u]
if item["action"] == "delete federation":
if not await assert_arguments({"uid"}, item):
continue
uid = item["args"]["uid"]
if uid not in self._feds:
await error("Federation doesn't exist", item)
continue
del self._feds[uid]
if item["action"] == "rename federation":
if not await assert_arguments({"uid", "name"}, item):
continue
uid, name = item["args"]["uid"], item["args"]["name"]
if uid not in self._feds:
await error("Federation doesn't exist", item)
continue
self._feds[uid]["name"] = name
if item["action"] == "protect user":
if not await assert_arguments({"uid", "user"}, item):
continue
uid, user = item["args"]["uid"], item["args"]["user"]
user = str(user)
if not user.isdigit():
await error("Unexpected format for user", item)
continue
if uid not in self._feds:
await error("Federation doesn't exist", item)
continue
if user in self._feds[uid]["fdef"]:
self._feds[uid]["fdef"].remove(user)
else:
self._feds[uid]["fdef"] += [user]
if item["action"] == "warn user":
if not await assert_arguments({"uid", "user", "reason"}, item):
continue
uid, user, reason = (
item["args"]["uid"],
item["args"]["user"],
item["args"]["reason"],
)
user = str(user)
if not user.isdigit():
await error("Unexpected format for user", item)
continue
if uid not in self._feds:
await error("Federation doesn't exist", item)
continue
if user not in self._feds[uid]["warns"]:
self._feds[uid]["warns"][user] = []
self._feds[uid]["warns"][user] += [reason]
if item["action"] == "forgive user warn":
if not await assert_arguments({"uid", "user"}, item):
continue
uid, user = item["args"]["uid"], item["args"]["user"]
user = str(user)
if not user.isdigit():
await error("Unexpected format for user", item)
continue
if uid not in self._feds:
await error("Federation doesn't exist", item)
continue
if (
user not in self._feds[uid]["warns"]
or not self._feds[uid]["warns"][user]
):
await error("This user has no warns yet", item)
continue
del self._feds[uid]["warns"][user][-1]
if item["action"] == "clear all user warns":
if not await assert_arguments({"uid", "user"}, item):
continue
uid, user = item["args"]["uid"], item["args"]["user"]
user = str(user)
if not user.isdigit():
if not item["args"].get("silent", False):
await error("Unexpected format for user", item)
continue
if uid not in self._feds:
if not item["args"].get("silent", False):
await error("Federation doesn't exist", item)
continue
if not self._feds[uid].get("warns", {}).get(user):
if not item["args"].get("silent", False):
await error("This user has no warns yet", item)
continue
del self._feds[uid]["warns"][user]
if item["action"] == "clear federation warns":
if not await assert_arguments({"uid"}, item):
continue
uid = item["args"]["uid"]
if uid not in self._feds:
await error("Federation doesn't exist", item)
continue
if not self._feds[uid].get("warns"):
await error("This federation has no warns yet", item)
continue
del self._feds[uid]["warns"]
if item["action"] == "new note":
if not await assert_arguments({"uid", "shortname", "note"}, item):
continue
uid, shortname, note = (
item["args"]["uid"],
item["args"]["shortname"],
item["args"]["note"],
)
if uid not in self._feds:
await error("Federation doesn't exist", item)
continue
self._feds[uid]["notes"][shortname] = {"creator": u, "text": note}
if item["action"] == "delete note":
if not await assert_arguments({"uid", "shortname"}, item):
continue
uid, shortname = item["args"]["uid"], item["args"]["shortname"]
if uid not in self._feds:
await error("Federation doesn't exist", item)
continue
if shortname not in self._feds[uid]["notes"]:
await error(f"Note not found ({uid=}, {shortname=})", item)
continue
del self._feds[uid]["notes"][shortname]
if feds_copy != self._feds:
self.module.set("feds", self._feds)
if chats_copy != self.chats:
self.module.set("chats", self.chats)
api = HikariChatAPI()
def reverse_dict(d: dict) -> dict:
return {val: key for key, val in d.items()}
@loader.tds
class HikariChatMod(loader.Module):
"""
Advanced chat admin toolkit
"""
__metaclass__ = abc.ABCMeta
strings = {
"name": "HikariChat",
"args": (
"🚫 Args are"
" incorrect"
),
"no_reason": "Not specified",
"antitagall_on": (
"🐵 AntiTagAll is now on"
" in this chat\nAction: {}"
),
"antitagall_off": (
"🐵 AntiTagAll is now off"
" in this chat"
),
"antiarab_on": (
"🇵🇸 AntiArab is now on in"
" this chat\nAction: {}"
),
"antiarab_off": (
"🇵🇸 AntiArab is now off"
" in this chat"
),
"antilagsticks_on": (
"💣 Destructive stickers"
" protection is now on in this chat"
),
"antilagsticks_off": (
"💣 Destructive stickers"
" protection is now off in this chat"
),
"antizalgo_on": (
"🌀 AntiZALGO is now"
" on in"
" this chat\nAction: {}"
),
"antizalgo_off": (
"🌀 AntiZALGO is now off"
" in this chat"
),
"antistick_on": (
"🎨 AntiStick is now"
" on in"
" this chat\nAction: {}"
),
"antistick_off": (
"🎨 AntiStick is now off"
" in this chat"
),
"antihelp_on": (
"🐺 AntiHelp is now on in"
" this chat"
),
"antihelp_off": (
"🐺 AntiHelp is now"
" off in"
" this chat"
),
"antiraid_on": (
"🚪 AntiRaid is now on"
" in this chat\nAction: {}"
),
"antiraid_off": (
"🚪 AntiRaid is now off"
" in this chat"
),
"bnd_on": (
"💬 Block-Non-Discussion"
" is now on in this chat\nAction: {}"
),
"bnd_off": (
"💬 Block-Non-Discussion"
" is now off in this chat"
),
"antiraid": (
"🚪 AntiRaid is On."
" I {}"
' {} in chat {}'
),
"antichannel_on": (
"📯 AntiChannel is now on"
" in this chat"
),
"antichannel_off": (
"📯 AntiChannel is"
" now off"
" in this chat"
),
"report_on": (
"📣 Report is now on in"
" this chat"
),
"report_off": (
"📣 Report is now off in"
" this chat"
),
"antiflood_on": (
"⏱ AntiFlood is now on in"
" this chat\nAction: {}"
),
"antiflood_off": (
"⏱ AntiFlood is now off"
" in this chat"
),
"antispoiler_on": (
"👻 AntiSpoiler is now on"
" in this chat"
),
"antispoiler_off": (
"👻 AntiSpoiler is"
" now off"
" in this chat"
),
"antigif_on": (
"🎑 AntiGIF is now on in"
" this chat"
),
"antigif_off": (
"🎑 AntiGIF is now off in"
" this chat"
),
"antiservice_on": (
"⚙️ AntiService is now on"
" in this chat"
),
"antiservice_off": (
"⚙️ AntiService is now"
" off in this chat"
),
"banninja_on": (
"🥷 BanNinja is now on in"
" this chat"
),
"banninja_off": (
"🥷 BanNinja is now"
" off in"
" this chat"
),
"antiexplicit_on": (
"🤬 AntiExplicit is"
" now on"
" in this chat\nAction: {}"
),
"antiexplicit_off": (
"🤬 AntiExplicit is now"
" off in this chat"
),
"captcha_on": (
"🚥 Captcha is now on in"
" this chat\nAction: {}"
),
"captcha_off": (
"🚥 Captcha is now off in"
" this chat"
),
"cas_on": (
"🛡 CAS is now on in this"
" chat\nAction: {}"
),
"cas_off": (
"🛡 CAS is now off in this"
" chat"
),
"antinsfw_on": (
"🔞 AntiNSFW is now on in"
" this chat\nAction: {}"
),
"antinsfw_off": (
"🔞 AntiNSFW is now"
" off in"
" this chat"
),
"arabic_nickname": (
'🇵🇸 {}'
" has hieroglyphics in his nickname.\n👊 Action: I {}"
),
"zalgo": (
'🌀 {}'
" has ZALGO in his nickname.\n👊 Action: I {}"
),
"bnd": (
'💬 {}'
" sent a message to channel comments without being chat member.\n👊 Action:"
" I {}"
),
"cas": (
'🛡 {}'
" appears to be in Combat Anti Spam database.\n👊 Action: I {}"
),
"stick": (
"🎨 {} is'
" flooding stickers.\n👊 Action: I {}"
),
"explicit": (
'🤬 {}'
" sent explicit content.\n👊 Action: I {}"
),
"destructive_stick": (
'🚫 {}'
" sent destructive sticker.\n👊 Action: I {}"
),
"nsfw_content": (
'🔞 {}'
" sent NSFW content.\n👊 Action: I {}"
),
"flood": (
'⏱ {} is'
" flooding.\n👊 Action: I {}"
),
"tagall": (
'🐵 {}'
" used TagAll.\n👊 Action: I {}"
),
"fwarn": (
"👮♀️💼 {} got'
" {}/{} federative warn\nReason: {}\n\n{}"
),
"no_fed_warns": (
"👮♀️ This federation has"
" no warns yet"
),
"no_warns": (
'👮♀️ {}'
" has no warns yet"
),
"warns": (
'👮♀️ {}'
" has {}/{} warns\n\n{}"
),
"warns_adm_fed": (
"👮♀️ Warns in this"
" federation:\n"
),
"dwarn_fed": (
"👮♀️ Forgave last"
' federative warn of {}'
),
"clrwarns_fed": (
"👮♀️ Forgave all"
' federative warns of {}'
),
"warns_limit": (
'👮♀️ {}'
" reached warns limit.\nAction: I {}"
),
"welcome": (
"👋 Now I will greet"
" people in this chat\n{}"
),
"unwelcome": (
"👋 Now I will not greet"
" people in this chat"
),
"chat404": "🔓 I am not protecting this chat yet.\n",
"protections": (
"🇵🇸"
" .AntiArab - Bans spammy arabs\n🐺 .AntiHelp -"
" Removes frequent userbot commands\n🐵 .AntiTagAll -"
" Restricts tagging all members\n👋 .Welcome - Greets"
" new members\n🚪 .AntiRaid"
" - Bans all new members\n📯 .AntiChannel -"
" Restricts writing on behalf of channels\n👻 .AntiSpoiler -"
" Restricts spoilers\n🎑"
" .AntiGIF - Restricts GIFs\n🍓 .AntiNSFW -"
" Restricts NSFW photos and stickers\n⏱ .AntiFlood -"
" Prevents flooding\n🤬"
" .AntiExplicit - Restricts explicit content\n⚙️ .AntiService -"
" Removes service messages\n🌀 .AntiZALGO -"
" Penalty for users with ZALGO in nickname\n🎨 .AntiStick -"
" Prevents stickers flood\n🚥 .Captcha -"
" Requires every new participant to complete captcha\n🛡 .CAS - Check every"
" new participant through Combat Anti Spam\n💬 .BND - Restricts"
" messages from users, which are not a participants of chat"
" (comments)\n🥷"
" .BanNinja - Automatic version of AntiRaid\n⚰️"
" .AntiLagSticks - Bans laggy stickers\n👾 Admin:"
" .ban .kick"
" .mute\n.unban .unmute - Admin"
" tools\n👮♀️"
" Warns: .warn .warns\n.dwarn"
" .clrwarns - Warning system\n💼 Federations:"
" .fadd .frm"
" .newfed\n.namefed .fban"
" .rmfed .feds\n.fpromote"
" .fdemote\n.fdef .fdeflist -"
" Controlling multiple chats\n🗒 Notes: .fsave"
" .fstop .fnotes - Federative notes"
),
"not_admin": "🤷♂️ I'm not admin here, or don't have enough rights",
"mute": (
'🤐 {}'
" was muted {}. Reason: {}\n\n{}"
),
"mute_log": (
'🤐 {}'
' was muted {} in {}. Reason: {}\n\n{}'
),
"ban": (
'🔒 {}'
" was banned {}. Reason: {}\n\n{}"
),
"ban_log": (
'🔒 {}'
' was banned {} in {}. Reason: {}\n\n{}'
),
"kick": (
'🚪 {}'
" was kicked. Reason: {}\n\n{}"
),
"kick_log": (
'🚪 {}'
' was kicked in {}. Reason: {}\n\n{}'
),
"unmuted": (
'🎉 {}'
" was unmuted"
),
"unmuted_log": (
'🎉 {}'
' was unmuted in {}'
),
"unban": (
'🪄 {}'
" was unbanned"
),
"unban_log": (
'🪄 {}'
' was unbanned in {}'
),
"defense": (
"🛡 Shield for {} is now {}'
),
"no_defense": (
"🛡 Federative defense"
" list is empty"
),
"defense_list": (
"🛡 Federative defense"
" list:\n{}"
),
"fadded": (
"💼 Current chat added to"
' federation "{}"'
),
"newfed": (
"💼 Created federation"
' "{}"'
),
"rmfed": (
"💼 Removed federation"
' "{}"'
),
"fed404": (
"💼 Federation not"
" found"
),
"frem": (
"💼 Current chat removed"
' from federation "{}"'
),
"f404": (
"💼 Current chat is"
" not in"
' federation "{}"'
),
"fexists": (
"💼 Current chat is"
' already in federation "{}"'
),
"fedexists": (
"💼 Federation exists"
),
"joinfed": (
"💼 Federation joined"
),
"namedfed": (
"💼 Federation renamed to"
" {}"
),
"nofed": (
"💼 Current chat is"
" not in"
" any federation"
),
"fban": (
'💼 {}'
" was banned in federation {} {}\nReason: {}\n{}"
),
"gban": (
'🖕 {}'
" was gbanned.\nReason: {}\n\n{}"
),
"gbanning": (
"🖕 Gbanning {}...'
),
"gunban": (
'🤗 {}'
" was gunbanned.\n\n{}"
),
"gunbanning": (
"🤗 Gunbanning {}...'
),
"in_n_chats": (
"👎 Banned in {}"
" chat(-s)"
),
"unbanned_in_n_chats": (
"✋️ Unbanned in {}"
" chat(-s)"
),
"fmute": (
'💼 {}'
" muted in federation {} {}\nReason: {}\n{}"
),
"funban": (
'💼 {}'
" unbanned in federation {}\n"
),
"funmute": (
'💼 {}'
" unmuted in federation {}\n"
),
"feds_header": (
"💼 Federations:\n\n"
),
"fed": (
'💼 Federation "{}"'
" info:\n🔰 Chats:\n{}\n🔰 Channels:\n{}\n🔰"
" Admins:\n{}\n🔰 Warns: {}\n"
),
"no_fed": (
"💼 This chat is not in"
" any federation"
),
"fpromoted": (
'💼 {}'
" promoted in federation {}"
),
"fdemoted": (
'💼 {}'
" demoted in federation {}"
),
"api_error": (
"🚫 api.hikariatama.ru"
" Error!\n{}"
),
"fsave_args": (
"💼 Usage: .fsave"
" shortname <reply>"
),
"fstop_args": (
"💼 Usage: .fstop"
" shortname"
),
"fsave": (
"💼 Federative note"
" {} saved!"
),
"fstop": (
"💼 Federative note"
" {} removed!"
),
"fnotes": (
"💼 Federative"
" notes:\n{}"
),
"usage": "ℹ️ Usage: .{} <on/off>",
"chat_only": "ℹ️ This command is for chats only",
"version": (
"🎢 {}\n\n🤘 Author:"
" t.me/hikariatama\n☺️"
" Downloaded from @hikarimods\n{}"
),
"error": (
"💀 HikariChat Issued"
" error"
),
"reported": (
'💼 {}'
" reported this message to admins\nReason: {}"
),
"no_federations": (
"💼 You have no active"
" federations"
),
"clrallwarns_fed": (
"👮♀️ Forgave all"
" federative warns of federation"
),
"cleaning": (
"🫥 Looking for Deleted"
" accounts..."
),
"deleted": (
"🫥 Removed {} Deleted"
" accounts"
),
"fcleaning": (
"🫥 Looking for Deleted"
" accounts in federation..."
),
"btn_unban": "🔓 Unban (ADM)",
"btn_unmute": "🔈 Unmute (ADM)",
"btn_unwarn": "♻️ De-Warn (ADM)",
"inline_unbanned": (
'🔓 {} unbanned by {}'
),
"inline_unmuted": (
'🔈 {} unmuted by {}'
),
"inline_unwarned": (
'♻️ Forgave last warn of {} by {}'
),
"inline_funbanned": (
'🔓 {} unbanned in federation by {}'
),
"inline_funmuted": (
'🔈 {} unmuted in federation by {}'
),
"btn_funmute": "🔈 Fed Unmute (ADM)",
"btn_funban": "🔓 Fed Unban (ADM)",
"btn_mute": "🙊 Mute",
"btn_ban": "🔒 Ban",
"btn_fban": "💼 Fed Ban",
"btn_del": "🗑 Delete",
"inline_fbanned": (
'💼 {}'
' banned in federation by {}'
),
"inline_muted": '🙊 {} muted by {}',
"inline_banned": (
'🔒 {}'
' banned by {}'
),
"inline_deleted": '🗑 Deleted by {}',
"sync": "🔄 Syncing chats and feds with server in force mode...",
"sync_complete": "😌 Successfully synced",
"rename_noargs": (
"🚫 Specify new"
" federation"
" name"
),
"rename_success": '😇 Federation renamed to "{}"',
"suffix_removed": "📼 Punishment suffix removed",
"suffix_updated": "📼 New punishment suffix saved\n\n{}",
"processing_myrights": "😌 Processing chats",
"logchat_removed": "📲 Log chat disabled",
"logchat_invalid": (
"🚫 Log chat invalid"
),
"logchat_set": "📲 Log chat updated to {}",
"clnraid_args": (
"🥷 Example usage:"
" .clnraid 10"
),
"clnraid_admin": (
"🥷 Error occured while"
" promoting cleaner. Please, ensure you have enough rights in chat"
),
"clnraid_started": (
"🥷 RaidCleaner is in"
" progress... Found {} users to kick..."
),
"clnraid_confirm": (
"🥷 Please, confirm that"
" you want to start RaidCleaner on {} users"
),
"clnraid_yes": "🥷 Start",
"clnraid_cancel": "🔻 Cancel",
"clnraid_stop": "🚨 Stop",
"clnraid_complete": (
"🥷 RaidCleaner complete!"
" Removed: {} user(-s)"
),
"clnraid_cancelled": (
"🥷 RaidCleaner"
" cancelled."
" Removed: {} user(-s)"
),
"smart_anti_raid_active": (
"🥷 BanNinja is working"
" hard to prevent intrusion to this chat.\n\n{}Deleted {}"
" bot(-s)"
),
"smart_anti_raid_off": "🚨 Stop",
"smart_anti_raid_stopped": (
"🥷 BanNinja Stopped"
),
"banninja_report": (
"🥷 BanNinja has done his"
" job.\nDeleted {} bot(-s)\n\n🏹 «BanNinja can handle any"
" size"
" of attack» © @hikariatama"
),
"forbid_messages": (
"⚠️ I've forbidden sending messages until attack is fully"
" released\n\n"
),
"confirm_rmfed": (
"⚠️ Warning! This operation can't be reverted! Are you sure, "
"you want to delete federation {}?"
),
"confirm_rmfed_btn": "🗑 Delete",
"decline_rmfed_btn": "🔻 Cancel",
"pil_unavailable": (
"🚫 Pillow package"
" unavailable"
),
"action": "",
"configure": "Configure",
"toggle": "Toggle",
"no_protects": (
"🚫 This chat has no"
" active protections to show"
),
"from_where": (
"🚫 Reply to a message to"
" purge from"
),
"no_notes": (
"🚫 No notes found"
),
"complete_captcha": (
"🚥 {}, please, complete captcha within 5'
" minutes"
),
"captcha_timeout": (
'🚥 {}'
" have not completed captcha in time.\n👊 Action: I {}"
),
"captcha_failed": (
'🚥 {}'
" failed captcha.\n👊 Action: I {}"
),
"fdef403": (
"🛡 You can't {} this"
" user, because he is under federative protection"
),
}
strings_ru = {
"complete_captcha": (
"🚥 {}, пожалуйста, пройди капчу в течение 5'
" минут"
),
"captcha_timeout": (
"🚥 {} не'
" прошел капчу вовремя.\n👊 Действие: {}"
),
"captcha_failed": (
"🚥 {} не'
" прошел капчу.\n👊 Действие: {}"
),
"cas_on": (
"🛡 CAS теперь включен в"
" этом чате\nДействие: {}"
),
"cas_off": (
"🛡 CAS теперь выключен в"
" этом чате"
),
"cas": (
'🛡 {}'
" appears to be in Combat Anti Spam database.\n👊 Action: I {}"
),
"from_where": (
"🚫 Ответь на сообщение,"
" начиная с которого надо удалить."
),
"smart_anti_raid_active": (
"🥷 BanNinja работает в"
" поте лица, отбивая атаку на этот чат.\n\n{}Удалено {} бот(-ов)"
),
"forbid_messages": (
"⚠️ Я запретил отправку сообщений, пока атака не будет полностью"
" отражена\n\n"
),
"smart_anti_raid_off": "🚨 Остановить",
"smart_anti_raid_stopped": (
"🥷 BanNinja"
" остановлен"
),
"error": "😵 Произошла ошибка HikariChat",
"args": (
"🚫 Неверные"
" аргументы"
),
"no_reason": "Не указана",
"antitagall_on": (
"🐵 AntiTagAll теперь"
" включен в этом чате\nДействие: {}"
),
"antitagall_off": (
"🐵 AntiTagAll теперь"
" выключен в этом чате"
),
"antiarab_on": (
"🇵🇸 AntiArab теперь"
" включен в этом чате\nДействие: {}"
),
"antiarab_off": (
"🇵🇸 AntiArab теперь"
" выключен в этом чате"
),
"antizalgo_on": (
"🌀 AntiZALGO теперь"
" включен в этом чате\nДействие: {}"
),
"antizalgo_off": (
"🌀 AntiZALGO теперь"
" выключен в этом чате"
),
"antistick_on": (
"🎨 AntiStick теперь"
" включен в этом чате\nДействие: {}"
),
"antistick_off": (
"🎨 AntiStick теперь"
" выключен в этом чате"
),
"antihelp_on": (
"🐺 AntiHelp теперь"
" включен в этом чате"
),
"antihelp_off": (
"🐺 AntiHelp теперь"
" выключен в этом чате"
),
"antiraid_on": (
"🚪 AntiRaid теперь"
" включен в этом чате\nДействие: {}"
),
"antiraid_off": (
"🚪 AntiRaid теперь"
" выключен в этом чате"
),
"bnd_on": (
"💬 Block-Non-Discussion"
" теперь включен в этом чате\nДействие: {}"
),
"bnd_off": (
"💬 Block-Non-Discussion"
" теперь выключен в этом чате"
),
"antichannel_on": (
"📯 AntiChannel теперь"
" включен в этом чате"
),
"antichannel_off": (
"📯 AntiChannel теперь"
" выключен в этом чате"
),
"report_on": (
"📣 Report теперь включен"
" в этом чате"
),
"report_off": (
"📣 Report теперь"
" выключен"
" в этом чате"
),
"antiflood_on": (
"⏱ AntiFlood теперь"
" включен в этом чате\nДействие: {}"
),
"antiflood_off": (
"⏱ AntiFlood теперь"
" выключен в этом чате"
),
"antispoiler_on": (
"👻 AntiSpoiler теперь"
" включен в этом чате"
),
"antispoiler_off": (
"👻 AntiSpoiler теперь"
" выключен в этом чате"
),
"antigif_on": (
"🎑 AntiGIF теперь"
" включен"
" в этом чате"
),
"antigif_off": (
"🎑 AntiGIF теперь"
" выключен в этом чате"
),
"antiservice_on": (
"⚙️ AntiService теперь"
" включен в этом чате"
),
"antiservice_off": (
"⚙️ AntiService теперь"
" выключен в этом чате"
),
"banninja_on": (
"🥷 BanNinja теперь"
" включен в этом чате"
),
"banninja_off": (
"🥷 BanNinja теперь"
" выключен в этом чате"
),
"antiexplicit_on": (
"🤬 AntiExplicit теперь"
" включен в этом чате\nДействие: {}"
),
"antiexplicit_off": (
"🤬 AntiExplicit теперь"
" выключен в этом чате"
),
"antinsfw_on": (
"🔞 AntiNSFW теперь"
" включен в этом чате\nДействие: {}"
),
"antinsfw_off": (
"🔞 AntiNSFW теперь"
" выключен в этом чате"
),
"captcha_on": (
"🚥 Captcha теперь"
" включена в этом чате\nДействие: {}"
),
"captcha_off": (
"🚥 Captcha теперь"
" выключена в этом чате"
),
"no_fed_warns": (
"👮♀️ This federation has"
" no warns yet"
),
"warns_adm_fed": (
"👮♀️ Warns in this"
" federation:\n"
),
"welcome": (
"👋 Теперь я буду"
" приветствовать людей в этом чате\n{}"
),
"unwelcome": (
"👋 Я больше не буду"
" приветствовать людей в этом чате"
),
"chat404": "🔓 Этот чат еще не защищен.\n",
"not_admin": "🤷♂️ Я здесь не админ, или у меня недостаточно прав",
"no_defense": (
"🛡 Федеративный список"
" защиты пуст"
),
"defense_list": (
"🛡 Федеративный список"
" защиты:\n{}"
),
"fed404": (
"💼 Федерация не"
" найдена"
),
"fedexists": (
"💼 Федерация"
" существует"
),
"joinfed": (
"💼 Присоединился к"
" федерации"
),
"namedfed": (
"💼 Федерация"
" переименована в {}"
),
"nofed": (
"💼 Этот чат не находится"
" ни в одной из федераций"
),
"feds_header": (
"💼 Федерации:\n\n"
),
"no_fed": (
"💼 Этот чат не находится"
" ни в одной из федераций"
),
"api_error": (
"🚫 Ошибка"
" api.hikariatama.ru!\n{}"
),
"fsave_args": (
"💼 Пример: .fsave"
" shortname <reply>"
),
"fstop_args": (
"💼 Пример: .fstop"
" shortname"
),
"fsave": (
"💼 Федеративная заметка"
" {} сохранена!"
),
"fstop": (
"💼 Федеративная заметка"
" {} удалена!"
),
"fnotes": (
"💼 Федеративные"
" заметки:\n{}"
),
"usage": "ℹ️ Пример: .{} <on/off>",
"chat_only": "ℹ️ Эта команда предназначена для чатов",
"no_federations": (
"💼 Нет активных"
" федераций"
),
"clrallwarns_fed": (
"👮♀️ Прощены все"
" предупреждения в федерации"
),
"cleaning": (
"🫥 Поиск удаленных"
" аккаунтов..."
),
"deleted": (
"🫥 Удалено {} удаленных"
" аккаунтов"
),
"fcleaning": (
"🫥 Поиск удаленных"
" аккаунтов в федерации..."
),
"btn_unban": "🔓 Разбанить (админ)",
"btn_unmute": "🔈 Размутить (админ)",
"btn_unwarn": "♻️ Удалить предупреждение (админ)",
"btn_funmute": "🔈 Размутить в федерации (админ)",
"btn_funban": "🔓 Разбанить в федерации (админ)",
"btn_mute": "🙊 Мут",
"btn_ban": "🔒 Бан",
"btn_fban": "💼 Фед. бан",
"btn_del": "🗑 Удалить",
"sync": (
"🔄 Принудительная синхронизация федераций и чатов с сервером..."
),
"sync_complete": "😌 Сихнронизирован",
"rename_noargs": (
"🚫 Укажи имя"
" федерации"
),
"suffix_removed": "📼 Суффикс предупреждения удален",
"suffix_updated": "📼 Установлен новый суффикс предупреждения\n\n{}",
"processing_myrights": "😌 Обработка чатов",
"logchat_removed": "📲 Логирование отключено",
"logchat_invalid": (
"🚫 Неверный чат"
" логирования"
),
"logchat_set": "📲 Чат логирования установлен на {}",
"clnraid_args": (
"🥷 Пример:"
" .clnraid 10"
),
"clnraid_admin": (
"🥷 Ошибка выдачи прав"
" боту. Убедись, что у тебя достаточно прав"
),
"clnraid_started": (
"🥷 RaidCleaner"
" активен..."
" Найдено {} пользователей для бана..."
),
"clnraid_confirm": (
"🥷 Подтвердите запуск"
" RaidCleaner на {} пользователях"
),
"clnraid_yes": "🥷 Начать",
"banninja_report": (
"🥷 BanNinja закончил"
" работу.\nУдалено {} бот(-ов)\n\n🏹 «BanNinja can handle any"
" size of attack» © @hikariatama"
),
"clnraid_cancel": "🔻 Отмена",
"clnraid_stop": "🚨 Остановить",
"clnraid_complete": (
"🥷 RaidCleaner закончил"
" работу! Удалено: {} бот(-ов)"
),
"clnraid_cancelled": (
"🥷 RaidCleaner"
" остановлен. Удалено: {} бот(-ов)"
),
"confirm_rmfed_btn": "🗑 Удалить",
"decline_rmfed_btn": "🔻 Отмена",
"pil_unavailable": (
"🚫 Библиотека Pillow"
" недоступна"
),
"_cmd_doc_version": "Получить информацию о модуле",
"_cmd_doc_deleted": "Очистка удалнных аккаунтов в чате",
"_cmd_doc_fclean": "Очистка удаленных аккаунтов в федерации",
"_cmd_doc_newfed": " <имя> - Создать новую федерацию",
"_cmd_doc_rmfed": " - Удалить федерацию",
"_cmd_doc_fpromote": "<пользователь> - Выдать пользователю права в федерации",
"_cmd_doc_fdemote": (
" <пользователь> - Забрать у пользователя права в федерации"
),
"_cmd_doc_fadd": "<федерация> - Добавить чат в федерацию",
"_cmd_doc_frm": "Удалить чат из федерации",
"_cmd_doc_fban": "<пользователь> [причина] - Забанить пользователя в федерации",
"_cmd_doc_punishsuff": "Установить новый суффикс наказания",
"_cmd_doc_sethclog": "Установить чат логирования",
"_cmd_doc_funban": (
"<пользователь> [причина] - Разбанить пользователя в федерации"
),
"_cmd_doc_fmute": (
"<пользователь> [причина] - Замутить пользователя в федерации"
),
"_cmd_doc_funmute": (
"<пользователь> [причина] - Разбанить пользователя в федерации"
),
"_cmd_doc_kick": "<пользователь> [причина] - Кикнуть пользователя",
"_cmd_doc_ban": "<пользователь> [причина] - Забанить пользователя",
"_cmd_doc_mute": "<пользователь> [время] [причина] - Замутить пользователя",
"_cmd_doc_unmute": "<пользователь> - Размутить пользователя",
"_cmd_doc_unban": "<пользователь> - Разбанить пользователя",
"_cmd_doc_protects": "Показать доступные защиты",
"_cmd_doc_feds": "Показать федерации",
"_cmd_doc_fed": " - Информация о федерации",
"_cmd_doc_pchat": "Показать защиты в чате",
"_cmd_doc_warn": "<пользователь> - Предупредить пользователя",
"_cmd_doc_warns": (
"[пользователь] - Показать предупреждения в чате \\ у пользователя"
),
"_cmd_doc_delwarn": "<пользователь> - Простить последнее предупреждение",
"_cmd_doc_clrwarns": (
"<пользователь> - Простить все предупреждения пользователя"
),
"_cmd_doc_clrallwarns": "Простить все предупреждения в федерации",
"_cmd_doc_welcome": " - Изменить текст приветствовия",
"_cmd_doc_fdef": (
"<пользователь> - Включить\\выключить федеративную защиту пользователя"
),
"_cmd_doc_fsave": " - Сохранить федеративную заметку",
"_cmd_doc_fstop": " - Удалить федеративную заметку",
"_cmd_doc_fnotes": "Показать федеративные заметки",
"_cmd_doc_fdeflist": "Показать федеративный список защиты",
"_cmd_doc_dmute": "Удалить и замутить",
"_cmd_doc_dban": "Удалить и забанить",
"_cmd_doc_dwarn": "Удалить и предупредить",
"_cmd_doc_fsync": "Принудительная синхронизация федераций и чатов с сервером",
"_cmd_doc_frename": "Переименовать федерацию",
"_cmd_doc_myrights": "Показать все права администратора во всех чатах",
"action": "<действие>",
"configure": "Настроить",
"toggle": "Включить\\выключить",
"fed": (
"💼 Федерация"
' "{}":\n🔰'
" Чаты:\n{}\n🔰 Каналы:\n{}\n🔰"
" Админы:\n{}\n🔰 Предупреждения: {}\n"
),
"confirm_rmfed": (
"⚠️ Внимание! Это действие нельзя отменить! Ты уверен, что хочешь"
" удалить федерацию {}?"
),
"_cls_doc": "Must-have модуль администратора чата",
"no_notes": (
"🚫 Нет заметок"
),
}
def __init__(self):
self._punish_queue = []
self._raid_cleaners = []
self._global_queue = []
self._captcha_db = {}
self._captcha_messages = {}
self._ban_ninja = {}
self._ban_ninja_messages = []
self._ban_ninja_forms = {}
self._ban_ninja_progress = {}
self._ban_ninja_tasks = {}
self._ban_ninja_default_rights = {}
self.flood_timeout = FLOOD_TIMEOUT
self.flood_threshold = FLOOD_TRESHOLD
self._my_protects = {}
self._linked_channels = {}
self._sticks_ratelimit = {}
self._flood_fw_protection = {}
self._ratelimit = {"notes": {}, "report": {}}
self._delete_soon = []
self._gban_cache = {}
self.config = loader.ModuleConfig(
loader.ConfigValue(
"silent",
False,
lambda: "Do not notify about protections actions",
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"join_ratelimit",
10,
lambda: (
"How many users per minute need to join until BanNinja activates"
),
validator=loader.validators.Integer(minimum=1),
),
loader.ConfigValue(
"banninja_cooldown",
300,
lambda: "How long is BanNinja supposed to be active in seconds",
validator=loader.validators.Integer(minimum=15),
),
loader.ConfigValue(
"warns_limit",
7,
lambda: "How many warns can be issued before ban",
validator=loader.validators.Integer(minimum=1),
),
loader.ConfigValue(
"close_on_raid",
True,
lambda: "Close chat on raid with active BanNinja",
validator=loader.validators.Boolean(),
),
)
def render_table(self, t: typing.List[typing.List[str]]) -> bytes:
table = gen_table(t)
fnt = ImageFont.truetype(io.BytesIO(self.font), 20, encoding="utf-8")
def get_t_size(text, fnt):
if "\n" not in text:
return fnt.getsize(text)
w, h = 0, 0
for line in text.split("\n"):
line_size = fnt.getsize(line)
if line_size[0] > w:
w = line_size[0]
h += line_size[1]
w += 10
h += 10
return (w, h)
t_size = get_t_size(table, fnt)
img = Image.new("RGB", t_size, (30, 30, 30))
d = ImageDraw.Draw(img)
d.text((5, 5), table, font=fnt, fill=(200, 200, 200))
imgByteArr = io.BytesIO()
img.save(imgByteArr, format="PNG")
imgByteArr = imgByteArr.getvalue()
return imgByteArr
async def on_unload(self):
with contextlib.suppress(Exception):
self.api._task.cancel()
with contextlib.suppress(Exception):
self._pt_task.cancel()
with contextlib.suppress(Exception):
self.api._processor_task.cancel()
with contextlib.suppress(Exception):
for _, form in self._ban_ninja_forms.items():
with contextlib.suppress(Exception):
await form.delete()
def lookup(self, modname: str):
return next(
(
mod
for mod in self.allmodules.modules
if mod.name.lower() == modname.lower()
),
False,
)
async def check_admin(
self,
chat_id: typing.Union[Chat, Channel, int],
user_id: typing.Union[User, int],
) -> bool:
"""
Checks if user is admin in target chat
"""
try:
return (await self._client.get_permissions(chat_id, user_id)).is_admin
# We could've ignored only ValueError to check
# entity for validity, but there are many errors
# possible to occur, so we ignore all of them, bc
# actually we don't give a fuck about 'em
except Exception:
return (
user_id in self._client.dispatcher.security._owner
or user_id in self._client.dispatcher.security._sudo
)
def chat_command(function) -> FunctionType:
"""
Decorator to allow execution of certain commands in chat only
"""
@functools.wraps(function)
async def wrapped(*args, **kwargs):
if len(args) < 2 or not isinstance(args[1], Message):
return await function(*args, **kwargs)
if args[1].is_private:
await utils.answer(args[1], args[0].strings("chat_only"))
return
return await function(*args, **kwargs)
wrapped.__doc__ = function.__doc__
wrapped.__module__ = function.__module__
return wrapped
def error_handler(function) -> FunctionType:
"""
Decorator to handle functions' errors
"""
@functools.wraps(function)
async def wrapped(*args, **kwargs):
try:
return await function(*args, **kwargs)
except Exception:
logger.exception("Exception caught in HikariChat")
if function.__name__.startswith("p__"):
return
if function.__name__ == "watcher":
return
wrapped.__doc__ = function.__doc__
wrapped.__module__ = function.__module__
return wrapped
async def get_config(self, chat: typing.Union[str, int]) -> tuple:
info = self.api.chats[str(chat)]
cinfo = await self._client.get_entity(int(chat))
answer_message = (
f"🪆 HikariChat protection\n{get_full_name(cinfo)}\n\n"
)
protections = {
key: value
for key, value in PROTECTS.items()
if key in self.api.variables["protections"]
}
btns = []
for protection, style in protections.items():
answer_message += (
f" {style}: {info[protection][0]}\n"
if protection in info
else ""
)
style = style if protection in info else style[2:]
btns += [
{
"text": style,
"callback": self._change_protection_state,
"args": (chat, protection),
}
]
fed = None
for info in self.api.feds.values():
if str(chat) in info["chats"]:
fed = info
answer_message += (
f"\n💼 {fed['name']}"
if fed
else ""
)
btns = utils.chunks(btns, 3) + [[{"text": "❌ Close", "action": "close"}]]
return {"text": answer_message, "reply_markup": btns}
async def _inline_config(self, call: CallbackQuery, chat: typing.Union[str, int]):
await call.edit(**(await self.get_config(chat)))
async def _change_protection_state(
self,
call: CallbackQuery,
chat: typing.Union[str, int],
protection: str,
state: typing.Optional[str] = None,
):
if protection == "welcome":
await call.answer("Use .welcome to configure this option!", show_alert=True)
return
if protection in self.api.variables["argumented_protects"]:
if state is None:
cinfo = await self._client.get_entity(int(chat))
markup = utils.chunks(
[
{
"text": "🔒 Ban",
"callback": self._change_protection_state,
"args": (chat, protection, "ban"),
},
{
"text": "🙊 Mute",
"callback": self._change_protection_state,
"args": (chat, protection, "mute"),
},
{
"text": "🤕 Warn",
"callback": self._change_protection_state,
"args": (chat, protection, "warn"),
},
{
"text": "🚪 Kick",
"callback": self._change_protection_state,
"args": (chat, protection, "kick"),
},
{
"text": "😶🌫️ Delmsg",
"callback": self._change_protection_state,
"args": (chat, protection, "delmsg"),
},
{
"text": "🚫 Off",
"callback": self._change_protection_state,
"args": (chat, protection, "off"),
},
],
3,
) + [
[
{
"text": "🔙 Back",
"callback": self._inline_config,
"args": (chat,),
}
]
]
current_state = (
"off"
if protection not in self.api.chats[str(chat)]
else self.api.chats[str(chat)][protection][0]
)
await call.edit(
(
f"🌁 {get_full_name(cinfo)}:"
f" {PROTECTS[protection]} (now: {current_state})"
),
reply_markup=markup,
)
else:
self.api.request(
{
"action": "update protections",
"args": {
"chat": chat,
"protection": protection,
"state": state,
},
}
)
await call.answer("Configuration value saved")
if state != "off":
self.api.chats[str(chat)][protection] = [state, str(self._tg_id)]
else:
del self.api.chats[str(chat)][protection]
await self._inline_config(call, chat)
else:
current_state = protection in self.api.chats[str(chat)]
self.api.request(
{
"action": "update protections",
"args": {
"chat": chat,
"protection": protection,
"state": "off" if current_state else "on",
},
}
)
await call.answer(
f"{PROTECTS[protection]} -> {'off' if current_state else 'on'}"
)
if current_state:
del self.api.chats[str(chat)][protection]
else:
self.api.chats[str(chat)][protection] = ["on", str(self._tg_id)]
await self._inline_config(call, chat)
@error_handler
async def protect(self, message: Message, protection: str):
"""
Protection toggle handler
"""
args = utils.get_args_raw(message)
chat = utils.get_chat_id(message)
await self._promote_bot(chat)
if protection in self.api.variables["argumented_protects"]:
if args not in self.api.variables["protect_actions"] or args == "off":
args = "off"
await utils.answer(message, self.strings(f"{protection}_off"))
else:
await utils.answer(
message,
self.strings(f"{protection}_on").format(args),
)
elif args == "on":
await utils.answer(message, self.strings(f"{protection}_on"))
elif args == "off":
await utils.answer(
message,
self.strings(f"{protection}_off").format(args),
)
else:
await utils.answer(message, self.strings("usage").format(protection))
return
self.api.request(
{
"action": "update protections",
"args": {"protection": protection, "state": args, "chat": chat},
},
message,
)
def protection_template(self, protection: str) -> FunctionType:
"""
Template for protection toggler
For internal use only
"""
comments = self.api.variables["named_protects"]
func_name = f"{protection}cmd"
function = functools.partial(self.protect, protection=protection)
function.__module__ = self.__module__
function.__name__ = func_name
function.__self__ = self
args = (
self.strings("action")
if protection in self.api.variables["argumented_protects"]
else ""
)
action = (
self.strings("configure")
if protection in self.api.variables["argumented_protects"]
else self.strings("toggle")
)
function.__doc__ = f"{args} - {action} {comments[protection]}"
return function
@staticmethod
def convert_time(t: str) -> int:
"""
Tries to export time from text
"""
try:
if not str(t)[:-1].isdigit():
return 0
if "d" in str(t):
t = int(t[:-1]) * 60 * 60 * 24
if "h" in str(t):
t = int(t[:-1]) * 60 * 60
if "m" in str(t):
t = int(t[:-1]) * 60
if "s" in str(t):
t = int(t[:-1])
t = int(re.sub(r"[^0-9]", "", str(t)))
except ValueError:
return 0
return t
async def ban(
self,
chat: typing.Union[Chat, int],
user: typing.Union[User, Channel, int],
period: int = 0,
reason: str = None,
message: typing.Optional[Message] = None,
silent: bool = False,
):
"""Ban user in chat"""
if str(user).isdigit():
user = int(user)
if reason is None:
reason = self.strings("no_reason")
try:
await self.inline.bot.kick_chat_member(
int(f"-100{getattr(chat, 'id', chat)}"),
int(getattr(user, "id", user)),
)
except Exception:
logger.debug("Can't ban with bot", exc_info=True)
await self._client.edit_permissions(
chat,
user,
until_date=(time.time() + period) if period else 0,
**BANNED_RIGHTS,
)
if silent:
return
msg = self.strings("ban").format(
utils.get_link(user),
get_full_name(user),
f"for {period // 60} min(-s)" if period else "forever",
reason,
self.get("punish_suffix", ""),
)
if self._is_inline:
if self.get("logchat"):
if not isinstance(chat, (Chat, Channel)):
chat = await self._client.get_entity(chat)
await self.inline.form(
message=self.get("logchat"),
text=self.strings("ban_log").format(
utils.get_link(user),
get_full_name(user),
f"for {period // 60} min(-s)" if period else "forever",
utils.get_link(chat),
get_full_name(chat),
reason,
"",
),
reply_markup={
"text": self.strings("btn_unban"),
"data": (
f"ub/{chat.id if isinstance(chat, (Chat, Channel)) else chat}/{user.id}"
),
},
silent=True,
)
if isinstance(message, Message):
await utils.answer(message, msg)
else:
await self._client.send_message(chat.id, msg)
else:
await self.inline.form(
message=(
message
if isinstance(message, Message)
else getattr(chat, "id", chat)
),
text=msg,
reply_markup={
"text": self.strings("btn_unban"),
"data": (
f"ub/{chat.id if isinstance(chat, (Chat, Channel)) else chat}/{user.id}"
),
},
silent=True,
)
else:
await (utils.answer if message else self._client.send_message)(
message or chat.id, msg
)
async def mute(
self,
chat: typing.Union[Chat, int],
user: typing.Union[User, Channel, int],
period: int = 0,
reason: str = None,
message: typing.Optional[Message] = None,
silent: bool = False,
):
"""Mute user in chat"""
if str(user).isdigit():
user = int(user)
if reason is None:
reason = self.strings("no_reason")
try:
await self.inline.bot.restrict_chat_member(
int(f"-100{getattr(chat, 'id', chat)}"),
int(getattr(user, "id", user)),
permissions=ChatPermissions(can_send_messages=False),
until_date=time.time() + period,
)
except Exception:
logger.debug("Can't mute with bot", exc_info=True)
await self._client.edit_permissions(
chat,
user,
until_date=time.time() + period,
send_messages=False,
)
if silent:
return
msg = self.strings("mute").format(
utils.get_link(user),
get_full_name(user),
f"for {period // 60} min(-s)" if period else "forever",
reason,
self.get("punish_suffix", ""),
)
if self._is_inline:
if self.get("logchat"):
if not isinstance(chat, (Chat, Channel)):
chat = await self._client.get_entity(chat)
await self.inline.form(
message=self.get("logchat"),
text=self.strings("mute_log").format(
utils.get_link(user),
get_full_name(user),
f"for {period // 60} min(-s)" if period else "forever",
utils.get_link(chat),
get_full_name(chat),
reason,
"",
),
reply_markup={
"text": self.strings("btn_unmute"),
"data": (
f"um/{chat.id if isinstance(chat, (Chat, Channel)) else chat}/{user.id}"
),
},
silent=True,
)
if isinstance(message, Message):
await utils.answer(message, msg)
else:
await self._client.send_message(chat.id, msg)
else:
await self.inline.form(
message=(
message
if isinstance(message, Message)
else getattr(chat, "id", chat)
),
text=msg,
reply_markup={
"text": self.strings("btn_unmute"),
"data": (
f"um/{chat.id if isinstance(chat, (Chat, Channel)) else chat}/{user.id}"
),
},
silent=True,
)
else:
await (utils.answer if message else self._client.send_message)(
message or chat.id, msg
)
@loader.inline_everyone
async def actions_callback_handler(self, call: CallbackQuery):
"""
Handles unmute, unban, unwarn etc. button clicks
"""
if not re.match(r"[fbmudw]{1,3}\/[-0-9]+\/[-#0-9]+", call.data):
return
action, chat, user = call.data.split("/")
msg_id = None
try:
user, msg_id = user.split("#")
msg_id = int(msg_id)
except Exception:
pass
chat, user = int(chat), int(user)
if not await self.check_admin(chat, call.from_user.id):
await call.answer("You are not admin")
return
try:
user = await self._client.get_entity(user)
except Exception:
await call.answer("Unable to resolve entity")
return
try:
adm = await self._client.get_entity(call.from_user.id)
except Exception:
await call.answer("Unable to resolve admin entity")
return
p = (
await self._client(GetParticipantRequest(chat, call.from_user.id))
).participant
owner = isinstance(p, ChannelParticipantCreator)
if action == "ub":
if not owner and not p.admin_rights.ban_users:
await call.answer("Not enough rights!")
return
await self._client.edit_permissions(
chat,
user,
until_date=0,
**{right: True for right in BANNED_RIGHTS.keys()},
)
msg = self.strings("inline_unbanned").format(
utils.get_link(user),
get_full_name(user),
utils.get_link(adm),
get_full_name(adm),
)
try:
await self.inline.bot.edit_message_text(
msg,
inline_message_id=call.inline_message_id,
parse_mode="HTML",
disable_web_page_preview=False,
)
except Exception:
await self._client.send_message(chat, msg)
elif action == "um":
if not owner and not p.admin_rights.ban_users:
await call.answer("Not enough rights!")
return
await self._client.edit_permissions(
chat,
user,
until_date=0,
send_messages=True,
)
msg = self.strings("inline_unmuted").format(
utils.get_link(user),
get_full_name(user),
utils.get_link(adm),
get_full_name(adm),
)
try:
await self.inline.bot.edit_message_text(
msg,
inline_message_id=call.inline_message_id,
parse_mode="HTML",
disable_web_page_preview=False,
)
except Exception:
await self._client.send_message(chat, msg)
elif action == "dw":
if not owner and not p.admin_rights.ban_users:
await call.answer("Not enough rights!")
return
fed = await self.find_fed(chat)
self.api.request(
{
"action": "forgive user warn",
"args": {"uid": self.api.feds[fed]["uid"], "user": user.id},
}
)
msg = self.strings("inline_unwarned").format(
utils.get_link(user),
get_full_name(user),
utils.get_link(adm),
get_full_name(adm),
)
try:
await self.inline.bot.edit_message_text(
msg,
inline_message_id=call.inline_message_id,
parse_mode="HTML",
disable_web_page_preview=False,
)
except Exception:
await self._client.send_message(chat, msg)
elif action == "ufb":
if not owner and not p.admin_rights.ban_users:
await call.answer("Not enough rights!")
return
m = await self._client.send_message(
chat, f"{self.get_prefix()}funban {user.id}"
)
await self.funbancmd(m)
await m.delete()
msg = self.strings("inline_funbanned").format(
utils.get_link(user),
get_full_name(user),
utils.get_link(adm),
get_full_name(adm),
)
try:
await self.inline.bot.edit_message_text(
msg,
inline_message_id=call.inline_message_id,
parse_mode="HTML",
disable_web_page_preview=False,
)
except Exception:
await self._client.send_message(chat, msg)
elif action == "ufm":
if not owner and not p.admin_rights.ban_users:
await call.answer("Not enough rights!")
return
m = await self._client.send_message(
chat, f"{self.get_prefix()}funmute {user.id}"
)
await self.funmutecmd(m)
await m.delete()
msg = self.strings("inline_funmuted").format(
utils.get_link(user),
get_full_name(user),
utils.get_link(adm),
get_full_name(adm),
)
try:
await self.inline.bot.edit_message_text(
msg,
inline_message_id=call.inline_message_id,
parse_mode="HTML",
disable_web_page_preview=False,
)
except Exception:
await self._client.send_message(chat, msg)
elif action == "fb":
if not owner and not p.admin_rights.ban_users:
await call.answer("Not enough rights!")
return
m = await self._client.send_message(
chat, f"{self.get_prefix()}fban {user.id}"
)
await self.fbancmd(m)
await m.delete()
msg = self.strings("inline_fbanned").format(
utils.get_link(user),
get_full_name(user),
utils.get_link(adm),
get_full_name(adm),
)
try:
await self.inline.bot.edit_message_text(
msg,
inline_message_id=call.inline_message_id,
parse_mode="HTML",
disable_web_page_preview=False,
)
except Exception:
await self._client.send_message(chat, msg)
elif action == "m":
if not owner and not p.admin_rights.ban_users:
await call.answer("Not enough rights!")
return
await self.mute(chat, user, 0, silent=True)
msg = self.strings("inline_muted").format(
utils.get_link(user),
get_full_name(user),
utils.get_link(adm),
get_full_name(adm),
)
try:
await self.inline.bot.edit_message_text(
msg,
inline_message_id=call.inline_message_id,
parse_mode="HTML",
disable_web_page_preview=False,
)
except Exception:
await self._client.send_message(chat, msg)
elif action == "d":
if not owner and not p.admin_rights.delete_messages:
await call.answer("Not enough rights!")
return
msg = self.strings("inline_deleted").format(
utils.get_link(adm),
get_full_name(adm),
)
await self.inline.bot.edit_message_text(
msg,
inline_message_id=call.inline_message_id,
parse_mode="HTML",
disable_web_page_preview=False,
)
else:
return
if msg_id is not None:
await self._client.delete_messages(chat, message_ids=[msg_id])
async def args_parser(
self,
message: Message,
include_force: bool = False,
include_silent: bool = False,
) -> tuple:
"""Get args from message"""
args = " " + utils.get_args_raw(message)
if include_force and " -f" in args:
force = True
args = args.replace(" -f", "")
else:
force = False
if include_silent and " -s" in args:
silent = True
args = args.replace(" -s", "")
else:
silent = False
args = args.strip()
reply = await message.get_reply_message()
if reply and not args:
return (
(await self._client.get_entity(reply.sender_id)),
0,
utils.escape_html(self.strings("no_reason")).strip(),
*((force,) if include_force else []),
*((silent,) if include_silent else []),
)
try:
a = args.split()[0]
if str(a).isdigit():
a = int(a)
user = await self._client.get_entity(a)
except Exception:
try:
user = await self._client.get_entity(reply.sender_id)
except Exception:
return False
t = ([arg for arg in args.split() if self.convert_time(arg)] or ["0"])[0]
args = args.replace(t, "").replace(" ", " ")
t = self.convert_time(t)
if not reply:
try:
args = " ".join(args.split()[1:])
except Exception:
pass
if time.time() + t >= 2208978000: # 01.01.2040 00:00:00
t = 0
return (
user,
t,
utils.escape_html(args or self.strings("no_reason")).strip(),
*((force,) if include_force else []),
*((silent,) if include_silent else []),
)
async def find_fed(self, message: typing.Union[Message, int]) -> str:
"""Find if chat belongs to any federation"""
return next(
(
federation
for federation, info in self.api.feds.items()
if str(
utils.get_chat_id(message)
if isinstance(message, Message)
else message
)
in list(map(str, info["chats"]))
),
None,
)
@error_handler
async def punish(
self,
chat_id: int,
user: typing.Union[int, Channel, User],
violation: str,
action: str,
user_name: str,
fulltime: bool = False,
message: Message = None,
):
"""
Callback, called if the protection is triggered
Queue is being used to prevent spammy behavior
It is being processed in a loop `_punish_queue_handler`
"""
self._punish_queue += [
[chat_id, user, violation, action, user_name, fulltime, message]
]
@error_handler
async def purgecmd(self, message: Message):
"""[user(-s)] - Clean message history starting from replied one"""
if not message.is_reply:
await utils.answer(message, self.strings("from_where", message))
return
from_users = set()
args = utils.get_args(message)
for arg in args:
try:
entity = await message.client.get_entity(arg)
if isinstance(entity, User):
from_users.add(entity.id)
except ValueError:
pass
messages = []
async for msg in self._client.iter_messages(
entity=message.peer_id,
min_id=message.reply_to_msg_id - 1,
reverse=True,
):
logger.debug(msg)
if (not from_users or msg.sender_id in from_users) and (
not getattr(message.reply_to, "forum_topic", False)
or msg.reply_to
and (msg.reply_to.reply_to_top_id or msg.reply_to.reply_to_msg_id)
== (
message.reply_to.reply_to_top_id or message.reply_to.reply_to_msg_id
)
):
messages += [msg.id]
if len(messages) >= 99:
await self._client.delete_messages(message.peer_id, messages)
messages.clear()
if messages:
await self._client.delete_messages(message.peer_id, messages)
async def delcmd(self, message):
"""Delete the replied message"""
await self._client.delete_messages(
message.peer_id,
[
(
(
await self._client.iter_messages(
message.peer_id, 1, max_id=message.id
).__anext__()
)
if not message.is_reply
else (await message.get_reply_message())
).id,
message.id,
],
)
@loader.loop(interval=0.5, autostart=True)
async def _punish_queue_handler(self):
while self._punish_queue:
(
chat_id,
user,
violation,
action,
user_name,
fulltime,
message,
) = self._punish_queue.pop()
if str(chat_id) not in self._flood_fw_protection:
self._flood_fw_protection[str(chat_id)] = {}
if (
self._flood_fw_protection[str(chat_id)].get(str(user.id), 0)
>= time.time()
):
continue
comment = None
if action == "ban":
comment = "banned him"
await self.ban(
chat_id,
user,
0,
violation,
silent=str(chat_id) in self._ban_ninja or self.config["silent"],
)
elif action == "fban":
comment = "f-banned him"
await self.fbancmd(
await self._client.send_message(
chat_id,
f"{self.get_prefix()}fban {user.id} {violation}",
)
)
elif action == "delmsg":
# Do nothing...
...
elif action == "kick":
comment = "kicked him"
await self._client.kick_participant(chat_id, user)
elif action == "mute":
if fulltime:
comment = "muted him forever"
await self.mute(
chat_id,
user,
0,
violation,
silent=str(chat_id) in self._ban_ninja or self.config["silent"],
)
else:
comment = "muted him for 1 hour"
await self.mute(
chat_id,
user,
60 * 60,
violation,
silent=str(chat_id) in self._ban_ninja or self.config["silent"],
)
elif action == "warn":
comment = "warned him"
warn_msg = await self._client.send_message(
chat_id, f".warn {user.id} {violation}"
)
await self.allmodules.commands["warn"](warn_msg)
await warn_msg.delete()
if message is not None:
try:
await self.inline.bot.delete_message(
int(f"-100{chat_id}"),
message.id,
)
except Exception:
with contextlib.suppress(Exception):
await message.delete()
if not comment:
continue
if str(chat_id) not in self._ban_ninja and not self.config["silent"]:
self._flood_fw_protection[str(chat_id)][str(user.id)] = round(
time.time() + 10
)
await self._client.send_message(
chat_id,
self.strings(violation).format(
utils.get_link(user),
user_name,
comment,
),
)
@error_handler
async def versioncmd(self, message: Message):
"""Get module info"""
await utils.answer(
message,
self.strings("version").format(
ver,
(
"✅ Connected"
if self.api._connected
else ("🔁 Connecting..." if self.api._inited else "🗃 Local")
),
),
)
@error_handler
@chat_command
async def deletedcmd(self, message: Message):
"""Remove deleted accounts from chat"""
chat = await message.get_chat()
if not chat.admin_rights and not chat.creator:
await utils.answer(message, self.strings("not_admin"))
return
kicked = 0
message = await utils.answer(message, self.strings("cleaning"))
async for user in self._client.iter_participants(chat):
if user.deleted:
try:
await self._client.kick_participant(chat, user)
await self._client.edit_permissions(
chat,
user,
until_date=0,
**{right: True for right in BANNED_RIGHTS.keys()},
)
kicked += 1
except Exception:
pass
await utils.answer(message, self.strings("deleted").format(kicked))
@error_handler
@chat_command
async def fcleancmd(self, message: Message):
"""Remove deleted accounts from federation"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
chats = self.api.feds[fed]["chats"]
cleaned_in = []
cleaned_in_c = []
message = await utils.answer(message, self.strings("fcleaning"))
overall = 0
for c in chats:
try:
if str(c).isdigit():
c = int(c)
chat = await self._client.get_entity(c)
except Exception:
continue
if not chat.admin_rights and not chat.creator:
continue
try:
kicked = 0
async for user in self._client.iter_participants(chat):
if user.deleted:
try:
await self._client.kick_participant(chat, user)
await self._client.edit_permissions(
chat,
user,
until_date=0,
**{right: True for right in BANNED_RIGHTS.keys()},
)
kicked += 1
except Exception:
pass
overall += kicked
cleaned_in += [
"👥 {utils.escape_html(chat.title)}'
f" - {kicked}"
]
except UserAdminInvalidError:
pass
if str(c) in self._linked_channels and self._linked_channels[str(c)]:
channel = await self._client.get_entity(self._linked_channels[str(c)])
kicked = 0
try:
async for user in self._client.iter_participants(
self._linked_channels[str(c)]
):
if user.deleted:
try:
await self._client.kick_participant(
self._linked_channels[str(c)],
user,
)
await self._client.edit_permissions(
self._linked_channels[str(c)],
user,
until_date=0,
**{right: True for right in BANNED_RIGHTS.keys()},
)
kicked += 1
except Exception:
pass
overall += kicked
cleaned_in_c += [
"📣 {utils.escape_html(channel.title)}'
f" - {kicked}"
]
except ChatAdminRequiredError:
pass
await utils.answer(
message,
self.strings("deleted").format(overall)
+ "\n\n"
+ "\n".join(cleaned_in)
+ ""
+ "\n\n"
+ "\n".join(cleaned_in_c)
+ "",
)
@error_handler
@chat_command
async def newfedcmd(self, message: Message):
""" - Create new federation"""
args = utils.get_args_raw(message)
if not args or args.count(" ") == 0:
await utils.answer(message, self.strings("args"))
return
shortname, name = args.split(maxsplit=1)
if shortname in self.api.feds:
await utils.answer(message, self.strings("fedexists"))
return
self.api.request(
{
"action": "create federation",
"args": {"shortname": shortname, "name": name},
},
message,
)
await utils.answer(message, self.strings("newfed").format(name))
async def inline__confirm_rmfed(self, call: CallbackQuery, args: str):
name = self.api.feds[args]["name"]
self.api.request(
{"action": "delete federation", "args": {"uid": self.api.feds[args]["uid"]}}
)
await call.edit(self.strings("rmfed").format(name))
@error_handler
@chat_command
async def rmfedcmd(self, message: Message):
""" - Remove federation"""
args = utils.get_args_raw(message)
if not args:
await utils.answer(message, self.strings("args"))
return
if args not in self.api.feds:
await utils.answer(message, self.strings("fed404"))
return
await self.inline.form(
self.strings("confirm_rmfed").format(
utils.escape_html(self.api.feds[args]["name"])
),
message=message,
reply_markup=[
{
"text": self.strings("confirm_rmfed_btn"),
"callback": self.inline__confirm_rmfed,
"args": (args,),
},
{
"text": self.strings("decline_rmfed_btn"),
"action": "close",
},
],
silent=True,
)
@error_handler
@chat_command
async def fpromotecmd(self, message: Message):
""" - Promote user in federation"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
reply = await message.get_reply_message()
args = utils.get_args_raw(message)
if not reply and not args:
await utils.answer(message, self.strings("args"))
return
user = reply.sender_id if reply else args
try:
try:
if str(user).isdigit():
user = int(user)
obj = await self._client.get_entity(user)
except Exception:
await utils.answer(message, self.strings("args"))
return
name = get_full_name(obj)
except Exception:
await utils.answer(message, self.strings("args"))
return
self.api.request(
{
"action": "promote user in federation",
"args": {"uid": self.api.feds[fed]["uid"], "user": obj.id},
},
message,
)
await utils.answer(
message,
self.strings("fpromoted").format(
utils.get_link(obj),
name,
self.api.feds[fed]["name"],
),
)
@error_handler
@chat_command
async def fdemotecmd(self, message: Message):
""" - Demote user in federation"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
reply = await message.get_reply_message()
args = utils.get_args_raw(message)
if not reply and not args:
await utils.answer(message, self.strings("args"))
return
user = reply.sender_id if reply else args
try:
try:
if str(user).isdigit():
user = int(user)
obj = await self._client.get_entity(user)
except Exception:
await utils.answer(message, self.strings("args"))
return
user = obj.id
name = get_full_name(obj)
except Exception:
logger.exception("Parsing entity exception")
name = "User"
self.api.request(
{
"action": "demote user in federation",
"args": {"uid": self.api.feds[fed]["uid"], "user": obj.id},
},
message,
)
await utils.answer(
message,
self.strings("fdemoted").format(
user,
name,
self.api.feds[fed]["name"],
),
)
@error_handler
@chat_command
async def faddcmd(self, message: Message):
""" - Add chat to federation"""
args = utils.get_args_raw(message)
if not args:
await utils.answer(message, self.strings("args"))
return
if args not in self.api.feds:
await utils.answer(message, self.strings("fed404"))
return
chat = utils.get_chat_id(message)
self.api.request(
{
"action": "add chat to federation",
"args": {"uid": self.api.feds[args]["uid"], "cid": chat},
},
message,
)
await utils.answer(
message,
self.strings("fadded").format(
self.api.feds[args]["name"],
),
)
@error_handler
@chat_command
async def frmcmd(self, message: Message):
"""Remove chat from federation"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("fed404"))
return
chat = utils.get_chat_id(message)
self.api.request(
{
"action": "remove chat from federation",
"args": {"uid": self.api.feds[fed]["uid"], "cid": chat},
},
message,
)
await utils.answer(
message,
self.strings("frem").format(
self.api.feds[fed]["name"],
),
)
@loader.command(
ru_doc=(
"<реплай | юзер> [причина] [-s] - Заблокировать пользователя во всех чатах,"
" где ты админ"
)
)
async def gban(self, message: Message):
""" [reason] [-s] - Ban user in all chats where you are admin"""
reply = await message.get_reply_message()
args = utils.get_args_raw(message)
if not reply and not args:
await utils.answer(message, self.strings("args"))
return
a = await self.args_parser(message, include_silent=True)
if not a:
await utils.answer(message, self.strings("args"))
return
user, t, reason, silent = a
message = await utils.answer(
message,
self.strings("gbanning").format(
utils.get_entity_url(user),
utils.escape_html(get_full_name(user)),
),
)
if not self._gban_cache or self._gban_cache["exp"] < time.time():
self._gban_cache = {
"exp": int(time.time()) + 10 * 60,
"chats": [
chat.entity.id
async for chat in self._client.iter_dialogs()
if (
(
isinstance(chat.entity, Chat)
or (
isinstance(chat.entity, Channel)
and getattr(chat.entity, "megagroup", False)
)
)
and chat.entity.admin_rights
and chat.entity.participants_count > 5
and chat.entity.admin_rights.ban_users
)
],
}
chats = ""
counter = 0
for chat in self._gban_cache["chats"]:
try:
await self.ban(chat, user, 0, reason, silent=True)
except Exception:
pass
else:
chats += '▫️ {}\n'.format(
utils.get_entity_url(await self._client.get_entity(chat, exp=0)),
utils.escape_html(
get_full_name(await self._client.get_entity(chat, exp=0))
),
)
counter += 1
await utils.answer(
message,
self.strings("gban").format(
utils.get_entity_url(user),
utils.escape_html(get_full_name(user)),
reason,
self.strings("in_n_chats").format(counter) if silent else chats,
),
)
@loader.command(
ru_doc=(
"<реплай | юзер> [причина] [-s] - Разблокировать пользователя во всех"
" чатах, где ты админ"
)
)
async def gunban(self, message: Message):
""" [reason] [-s] - Unban user in all chats where you are admin"""
reply = await message.get_reply_message()
args = utils.get_args_raw(message)
if not reply and not args:
await utils.answer(message, self.strings("args"))
return
a = await self.args_parser(message, include_silent=True)
if not a:
await utils.answer(message, self.strings("args"))
return
user, t, reason, silent = a
message = await utils.answer(
message,
self.strings("gunbanning").format(
utils.get_entity_url(user),
utils.escape_html(get_full_name(user)),
),
)
if not self._gban_cache or self._gban_cache["exp"] < time.time():
self._gban_cache = {
"exp": int(time.time()) + 10 * 60,
"chats": [
chat.entity.id
async for chat in self._client.iter_dialogs()
if (
(
isinstance(chat.entity, Chat)
or (
isinstance(chat.entity, Channel)
and getattr(chat.entity, "megagroup", False)
)
)
and chat.entity.admin_rights
and chat.entity.participants_count > 5
and chat.entity.admin_rights.ban_users
)
],
}
chats = ""
counter = 0
for chat in self._gban_cache["chats"]:
try:
await self._client.edit_permissions(
chat,
user,
until_date=0,
**{right: True for right in BANNED_RIGHTS.keys()},
)
except Exception:
pass
else:
chats += '▫️ {}\n'.format(
utils.get_entity_url(await self._client.get_entity(chat, exp=0)),
utils.escape_html(
get_full_name(await self._client.get_entity(chat, exp=0))
),
)
counter += 1
await utils.answer(
message,
self.strings("gunban").format(
utils.get_entity_url(user),
utils.escape_html(get_full_name(user)),
(
self.strings("unbanned_in_n_chats").format(counter)
if silent
else chats
),
),
)
@error_handler
@chat_command
async def fbancmd(self, message: Message):
""" [reason] - Ban user in federation"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
a = await self.args_parser(message, include_force=True)
if not a:
await utils.answer(message, self.strings("args"))
return
user, t, reason, force = a
if not force and user.id in list(map(int, self.api.feds[fed]["fdef"])):
await utils.answer(message, self.strings("fdef403").format("fban"))
return
chats = self.api.feds[fed]["chats"]
banned_in = []
for c in chats:
try:
if str(c).isdigit():
c = int(c)
chat = await self._client.get_entity(c)
except Exception:
continue
if not chat.admin_rights and not chat.creator:
continue
try:
await self.ban(chat, user, t, reason, message, silent=True)
banned_in += [
f'{get_full_name(chat)}'
]
except Exception:
pass
msg = (
self.strings("fban").format(
utils.get_link(user),
get_first_name(user),
self.api.feds[fed]["name"],
f"for {t // 60} min(-s)" if t else "forever",
reason,
self.get("punish_suffix", ""),
)
+ "\n\n"
+ "\n".join(banned_in)
+ ""
)
if self._is_inline:
punishment_info = {
"reply_markup": {
"text": self.strings("btn_funban"),
"data": f"ufb/{utils.get_chat_id(message)}/{user.id}",
},
}
if self.get("logchat"):
await utils.answer(message, msg)
await self.inline.form(
text=self.strings("fban").format(
utils.get_link(user),
get_first_name(user),
self.api.feds[fed]["name"],
f"for {t // 60} min(-s)" if t else "forever",
reason,
"",
)
+ ""
+ "\n".join(banned_in)
+ "",
message=self.get("logchat"),
**punishment_info,
silent=True,
)
else:
await self.inline.form(
text=msg, message=message, **punishment_info, silent=True
)
else:
await utils.answer(message, msg)
self.api.request(
{
"action": "clear all user warns",
"args": {
"uid": self.api.feds[fed]["uid"],
"user": user.id,
"silent": True,
},
},
message,
)
reply = await message.get_reply_message()
if reply:
await reply.delete()
@error_handler
@chat_command
async def punishsuffcmd(self, message: Message):
"""Set new punishment suffix"""
if not utils.get_args_raw(message):
self.set("punish_suffix", "")
await utils.answer(message, self.strings("suffix_removed"))
else:
suffix = utils.get_args_raw(message)
self.set("punish_suffix", suffix)
await utils.answer(message, self.strings("suffix_updated").format(suffix))
@error_handler
@chat_command
async def sethclogcmd(self, message: Message):
"""Set logchat"""
if not utils.get_args_raw(message):
self.set("logchat", "")
await utils.answer(message, self.strings("logchat_removed"))
return
logchat = utils.get_args_raw(message)
if logchat.isdigit():
logchat = int(logchat)
try:
logchat = await self._client.get_entity(logchat)
except Exception:
await utils.answer(message, self.strings("logchat_invalid"))
return
self.set("logchat", logchat.id)
await utils.answer(
message,
self.strings("logchat_set").format(utils.escape_html(logchat.title)),
)
@error_handler
@chat_command
async def funbancmd(self, message: Message):
""" [reason] - Unban user in federation"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
a = await self.args_parser(message)
if not a:
await utils.answer(message, self.strings("args"))
return
user, _, _ = a
chats = self.api.feds[fed]["chats"]
unbanned_in = []
for c in chats:
try:
if str(c).isdigit():
c = int(c)
chat = await self._client.get_entity(c)
except Exception:
continue
if not chat.admin_rights and not chat.creator:
continue
try:
await self._client.edit_permissions(
chat,
user,
until_date=0,
**{right: True for right in BANNED_RIGHTS.keys()},
)
unbanned_in += [chat.title]
except UserAdminInvalidError:
pass
m = (
self.strings("funban").format(
utils.get_link(user),
get_first_name(user),
self.api.feds[fed]["name"],
)
+ ""
+ "\n".join(unbanned_in)
+ ""
)
if self.get("logchat"):
await self._client.send_message(self.get("logchat"), m)
await utils.answer(message, m)
@error_handler
@chat_command
async def fmutecmd(self, message: Message):
""" [reason] - Mute user in federation"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
a = await self.args_parser(message, include_force=True)
if not a:
await utils.answer(message, self.strings("args"))
return
user, t, reason, force = a
if not force and user.id in list(map(int, self.api.feds[fed]["fdef"])):
await utils.answer(message, self.strings("fdef403").format("fmute"))
return
chats = self.api.feds[fed]["chats"]
muted_in = []
for c in chats:
try:
if str(c).isdigit():
c = int(c)
chat = await self._client.get_entity(c)
except Exception:
continue
if not chat.admin_rights and not chat.creator:
continue
try:
await self.mute(chat, user, t, reason, message, silent=True)
muted_in += [
f'{get_full_name(chat)}'
]
except Exception:
pass
msg = (
self.strings("fmute").format(
utils.get_link(user),
get_first_name(user),
self.api.feds[fed]["name"],
f"for {t // 60} min(-s)" if t else "forever",
reason,
self.get("punish_suffix", ""),
)
+ "\n\n"
+ "\n".join(muted_in)
+ ""
)
if self._is_inline:
punishment_info = {
"reply_markup": {
"text": self.strings("btn_funmute"),
"data": f"ufm/{utils.get_chat_id(message)}/{user.id}",
},
}
if self.get("logchat"):
await utils.answer(message, msg)
await self.inline.form(
text=self.strings("fmute").format(
utils.get_link(user),
get_first_name(user),
self.api.feds[fed]["name"],
f"for {t // 60} min(-s)" if t else "forever",
reason,
"",
)
+ "\n\n"
+ "\n".join(muted_in)
+ "",
message=self.get("logchat"),
**punishment_info,
silent=True,
)
else:
await self.inline.form(
text=msg, message=message, **punishment_info, silent=True
)
else:
await utils.answer(message, msg)
@error_handler
@chat_command
async def funmutecmd(self, message: Message):
""" [reason] - Unban user in federation"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
a = await self.args_parser(message)
if not a:
await utils.answer(message, self.strings("args"))
return
user, _, _ = a
chats = self.api.feds[fed]["chats"]
unbanned_in = []
for c in chats:
try:
if str(c).isdigit():
c = int(c)
chat = await self._client.get_entity(c)
except Exception:
continue
if not chat.admin_rights and not chat.creator:
continue
try:
await self._client.edit_permissions(
chat,
user,
until_date=0,
**{right: True for right in BANNED_RIGHTS.keys()},
)
unbanned_in += [chat.title]
except UserAdminInvalidError:
pass
msg = (
self.strings("funmute").format(
utils.get_link(user),
get_first_name(user),
self.api.feds[fed]["name"],
)
+ "\n\n"
+ "\n".join(unbanned_in)
+ ""
)
await utils.answer(message, msg)
if self.get("logchat"):
await self._client.send_message(self.get("logchat"), msg)
@error_handler
@chat_command
async def kickcmd(self, message: Message):
""" [reason] - Kick user"""
chat = await message.get_chat()
if not chat.admin_rights and not chat.creator:
await utils.answer(message, self.strings("not_admin"))
return
reply = await message.get_reply_message()
args = utils.get_args_raw(message)
user, reason = None, None
try:
if reply:
user = await self._client.get_entity(reply.sender_id)
reason = args or self.strings
else:
uid = args.split(maxsplit=1)[0]
if str(uid).isdigit():
uid = int(uid)
user = await self._client.get_entity(uid)
reason = (
args.split(maxsplit=1)[1]
if len(args.split(maxsplit=1)) > 1
else self.strings("no_reason")
)
except Exception:
await utils.answer(message, self.strings("args"))
return
try:
await self._client.kick_participant(utils.get_chat_id(message), user)
msg = self.strings("kick").format(
utils.get_link(user),
get_first_name(user),
reason,
self.get("punish_suffix", ""),
)
await utils.answer(message, msg)
if self.get("logchat"):
await self._client.send_message(
self.get("logchat"),
self.strings("kick_log").format(
utils.get_link(user),
get_first_name(user),
utils.get_link(chat),
get_first_name(chat),
reason,
"",
),
)
except UserAdminInvalidError:
await utils.answer(message, self.strings("not_admin"))
return
@error_handler
@chat_command
async def bancmd(self, message: Message):
""" [reason] - Ban user"""
chat = await message.get_chat()
a = await self.args_parser(message, include_force=True)
if not a:
await utils.answer(message, self.strings("args"))
return
user, t, reason, force = a
if not chat.admin_rights and not chat.creator:
await utils.answer(message, self.strings("not_admin"))
return
fed = await self.find_fed(message)
if (
not force
and fed in self.api.feds
and user.id in list(map(int, self.api.feds[fed]["fdef"]))
):
await utils.answer(message, self.strings("fdef403").format("ban"))
return
try:
await self.ban(chat, user, t, reason, message)
except UserAdminInvalidError:
await utils.answer(message, self.strings("not_admin"))
return
@error_handler
@chat_command
async def mutecmd(self, message: Message):
""" [time] [reason] - Mute user"""
chat = await message.get_chat()
a = await self.args_parser(message, include_force=True)
if not a:
await utils.answer(message, self.strings("args"))
return
user, t, reason, force = a
if not chat.admin_rights and not chat.creator:
await utils.answer(message, self.strings("not_admin"))
return
fed = await self.find_fed(message)
if (
not force
and fed in self.api.feds
and user.id in list(map(int, self.api.feds[fed]["fdef"]))
):
await utils.answer(message, self.strings("fdef403").format("mute"))
return
try:
await self.mute(chat, user, t, reason, message)
except UserAdminInvalidError:
await utils.answer(message, self.strings("not_admin"))
return
@error_handler
@chat_command
async def unmutecmd(self, message: Message):
""" - Unmute user"""
chat = await message.get_chat()
if not chat.admin_rights and not chat.creator:
await utils.answer(message, self.strings("not_admin"))
return
reply = await message.get_reply_message()
args = utils.get_args_raw(message)
user = None
try:
if args.isdigit():
args = int(args)
user = await self._client.get_entity(args)
except Exception:
try:
user = await self._client.get_entity(reply.sender_id)
except Exception:
await utils.answer(message, self.strings("args"))
return
try:
await self._client.edit_permissions(
chat,
user,
until_date=0,
send_messages=True,
)
msg = self.strings("unmuted").format(
utils.get_link(user), get_first_name(user)
)
await utils.answer(message, msg)
if self.get("logchat"):
await self._client.send_message(
self.get("logchat"),
self.strings("unmuted_log").format(
utils.get_link(user),
get_first_name(user),
utils.get_link(chat),
get_first_name(chat),
),
)
except UserAdminInvalidError:
await utils.answer(message, self.strings("not_admin"))
return
@error_handler
@chat_command
async def unbancmd(self, message: Message):
""" - Unban user"""
chat = await message.get_chat()
if not chat.admin_rights and not chat.creator:
await utils.answer(message, self.strings("not_admin"))
return
reply = await message.get_reply_message()
args = utils.get_args_raw(message)
user = None
try:
if args.isdigit():
args = int(args)
user = await self._client.get_entity(args)
except Exception:
try:
user = await self._client.get_entity(reply.sender_id)
except Exception:
await utils.answer(message, self.strings("args"))
return
try:
await self._client.edit_permissions(
chat,
user,
until_date=0,
**{right: True for right in BANNED_RIGHTS.keys()},
)
msg = self.strings("unban").format(
utils.get_link(user), get_first_name(user)
)
await utils.answer(message, msg)
if self.get("logchat"):
await self._client.send_message(
self.get("logchat"),
self.strings("unban_log").format(
utils.get_link(user),
get_first_name(user),
utils.get_link(chat),
get_first_name(chat),
),
)
except UserAdminInvalidError:
await utils.answer(message, self.strings("not_admin"))
return
@error_handler
async def protectscmd(self, message: Message):
"""typing.List available filters"""
await utils.answer(
message,
(
self.strings("protections")
if self.api._inited
else "\n".join(
[
line
for line in self.strings("protections").splitlines()
if "antinsfw" not in line.lower()
and "report" not in line.lower()
]
)
),
)
@error_handler
async def fedscmd(self, message: Message):
"""typing.List federations"""
res = self.strings("feds_header")
if not self.api.feds:
await utils.answer(message, self.strings("no_federations"))
return
for shortname, config in self.api.feds.copy().items():
res += f" ☮️ {config['name']} ({shortname})"
for chat in config["chats"]:
try:
if str(chat).isdigit():
chat = int(chat)
c = await self._client.get_entity(chat)
except Exception:
continue
res += (
"\n - {c.title}"
)
res += (
"\n 👮♀️"
f" {len(config.get('warns', []))} warns\n\n"
)
await utils.answer(message, res)
@error_handler
@chat_command
async def fedcmd(self, message: Message):
""" - Info about federation"""
args = utils.get_args_raw(message)
chat = utils.get_chat_id(message)
fed = await self.find_fed(message)
if (not args or args not in self.api.feds) and not fed:
await utils.answer(message, self.strings("no_fed"))
return
if not args or args not in self.api.feds:
args = fed
res = self.strings("fed")
fed = args
admins = ""
for admin in self.api.feds[fed]["admins"]:
try:
if str(admin).isdigit():
admin = int(admin)
user = await self._client.get_entity(admin)
except Exception:
continue
name = get_full_name(user)
status = (
" 🧃 online"
if isinstance(getattr(user, "status", None), UserStatusOnline)
else ""
)
admins += (
f' 👤 {name}{status}\n'
)
chats = ""
channels = ""
for chat in self.api.feds[fed]["chats"]:
try:
if str(chat).isdigit():
chat = int(chat)
c = await self._client.get_entity(chat)
except Exception:
continue
if str(chat) in self._linked_channels:
try:
channel = await self._client.get_entity(
self._linked_channels[str(chat)]
)
channels += (
" 📣 {utils.escape_html(channel.title)}\n'
)
except Exception:
pass
chats += (
" 🫂 {utils.escape_html(c.title)}\n'
)
await utils.answer(
message,
res.format(
self.api.feds[fed]["name"],
chats or "-",
channels or "-",
admins or "-",
len(self.api.feds[fed].get("warns", [])),
),
)
@error_handler
@chat_command
async def pchatcmd(self, message: Message):
"""typing.List protection for current chat"""
chat_id = utils.get_chat_id(message)
try:
await self.inline.form(
message=message,
**(await self.get_config(chat_id)),
manual_security=True,
silent=True,
)
except KeyError:
await utils.answer(message, self.strings("no_protects"))
@error_handler
@chat_command
async def warncmd(self, message: Message):
""" - Warn user"""
chat = await message.get_chat()
if not chat.admin_rights and not chat.creator:
await utils.answer(message, self.strings("not_admin"))
return
args = utils.get_args_raw(message)
if " -f" in args:
args = args.replace(" -f", "")
force = True
else:
force = False
reply = await message.get_reply_message()
user = None
if reply:
user = await self._client.get_entity(reply.sender_id)
reason = args or self.strings("no_reason")
else:
try:
u = args.split(maxsplit=1)[0]
if u.isdigit():
u = int(u)
user = await self._client.get_entity(u)
except IndexError:
await utils.answer(message, self.strings("args"))
return
try:
reason = args.split(maxsplit=1)[1]
except IndexError:
reason = self.strings("no_reason")
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
if not force and user.id in list(map(int, self.api.feds[fed]["fdef"])):
await utils.answer(message, self.strings("fdef403").format("warn"))
return
self.api.request(
{
"action": "warn user",
"args": {
"uid": self.api.feds[fed]["uid"],
"user": user.id,
"reason": reason,
},
},
message,
)
warns = self.api.feds[fed].get("warns", {}).get(str(user.id), []) + [reason]
if len(warns) >= self.config["warns_limit"]:
user_name = get_first_name(user)
chats = self.api.feds[fed]["chats"]
for c in chats:
if str(c).isdigit():
c = int(str(c))
await self._client(
EditBannedRequest(
c,
user,
ChatBannedRights(
until_date=time.time() + 60**2 * 24 * 7,
send_messages=True,
),
)
)
if c == utils.get_chat_id(message):
await self._client.send_message(
c,
self.strings("warns_limit").format(
utils.get_link(user),
user_name,
"muted him in federation for 7 days",
),
)
if message.out:
await message.delete()
self.api.request(
{
"action": "clear all user warns",
"args": {"uid": self.api.feds[fed]["uid"], "user": user.id},
},
message,
)
else:
msg = self.strings("fwarn", message).format(
utils.get_link(user),
get_first_name(user),
len(warns),
self.config["warns_limit"],
reason,
self.get("punish_suffix", ""),
)
if self._is_inline:
punishment_info = {
"reply_markup": {
"text": self.strings("btn_unwarn"),
"data": f"dw/{utils.get_chat_id(message)}/{user.id}",
},
}
if self.get("logchat"):
await utils.answer(message, msg)
await self.inline.form(
text=self.strings("fwarn", message).format(
utils.get_link(user),
get_first_name(user),
len(warns),
self.config["warns_limit"],
reason,
"",
),
message=self.get("logchat"),
**punishment_info,
silent=True,
)
else:
await self.inline.form(
text=msg, message=message, **punishment_info, silent=True
)
else:
await utils.answer(message, msg)
@error_handler
@chat_command
async def warnscmd(self, message: Message):
"""[user] - Show warns in chat \\ of user"""
chat_id = utils.get_chat_id(message)
fed = await self.find_fed(message)
async def check_member(user_id):
try:
await self._client.get_permissions(chat_id, user_id)
return True
except Exception:
return False
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
warns = self.api.feds[fed].get("warns", {})
if not warns:
await utils.answer(message, self.strings("no_fed_warns"))
return
async def send_user_warns(usid):
try:
if int(usid) < 0:
usid = int(str(usid)[4:])
except Exception:
pass
if not warns:
await utils.answer(message, self.strings("no_fed_warns"))
return
if str(usid) not in warns or not warns[str(usid)]:
user_obj = await self._client.get_entity(usid)
await utils.answer(
message,
self.strings("no_warns").format(
utils.get_link(user_obj), get_full_name(user_obj)
),
)
else:
user_obj = await self._client.get_entity(usid)
_warns = ""
processed = []
for warn in warns[str(usid)].copy():
if warn in processed:
continue
processed += [warn]
_warns += (
"🛑 "
+ warn
+ (
f" [x{warns[str(usid)].count(warn)}]"
if warns[str(usid)].count(warn) > 1
else ""
)
+ "\n"
)
await utils.answer(
message,
self.strings("warns").format(
utils.get_link(user_obj),
get_full_name(user_obj),
len(warns[str(usid)]),
self.config["warns_limit"],
_warns,
),
)
if not await self.check_admin(chat_id, message.sender_id):
await send_user_warns(message.sender_id)
else:
reply = await message.get_reply_message()
args = utils.get_args_raw(message)
if not reply and not args:
res = self.strings("warns_adm_fed")
for user, _warns in warns.copy().items():
try:
user_obj = await self._client.get_entity(int(user))
except Exception:
continue
if isinstance(user_obj, User):
try:
name = get_full_name(user_obj)
except TypeError:
continue
else:
name = user_obj.title
res += (
"🐺 '
+ name
+ "\n"
)
processed = []
for warn in _warns.copy():
if warn in processed:
continue
processed += [warn]
res += (
" 🏴 "
+ warn
+ (
f" [x{_warns.count(warn)}]"
if _warns.count(warn) > 1
else ""
)
+ "\n"
)
await utils.answer(message, res)
return
elif reply:
await send_user_warns(reply.sender_id)
elif args:
await send_user_warns(args)
@error_handler
@chat_command
async def delwarncmd(self, message: Message):
""" - Forgave last warn"""
args = utils.get_args_raw(message)
reply = await message.get_reply_message()
user = None
if reply:
user = await self._client.get_entity(reply.sender_id)
else:
if args.isdigit():
args = int(args)
try:
user = await self._client.get_entity(args)
except IndexError:
await utils.answer(message, self.strings("args"))
return
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
self.api.request(
{
"action": "forgive user warn",
"args": {"uid": self.api.feds[fed]["uid"], "user": user.id},
},
message,
)
msg = self.strings("dwarn_fed").format(
utils.get_link(user), get_first_name(user)
)
await utils.answer(message, msg)
if self.get("logchat", False):
await self._client.send_message(self.get("logchat"), msg)
@error_handler
@chat_command
async def clrwarnscmd(self, message: Message):
""" - Remove all warns from user"""
args = utils.get_args_raw(message)
reply = await message.get_reply_message()
user = None
if reply:
user = await self._client.get_entity(reply.sender_id)
else:
if args.isdigit():
args = int(args)
try:
user = await self._client.get_entity(args)
except IndexError:
await utils.answer(message, self.strings("args"))
return
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
self.api.request(
{
"action": "clear all user warns",
"args": {"uid": self.api.feds[fed]["uid"], "user": user.id},
},
message,
)
await utils.answer(
message,
self.strings("clrwarns_fed").format(
utils.get_link(user), get_first_name(user)
),
)
@error_handler
@chat_command
async def clrallwarnscmd(self, message: Message):
"""Remove all warns from current federation"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
self.api.request(
{
"action": "clear federation warns",
"args": {"uid": self.api.feds[fed]["uid"]},
},
message,
)
await utils.answer(message, self.strings("clrallwarns_fed"))
@error_handler
@chat_command
async def welcomecmd(self, message: Message):
""" - Change welcome text"""
chat_id = utils.get_chat_id(message)
args = utils.get_args_raw(message) or "off"
self.api.request(
{
"action": "update protections",
"args": {"protection": "welcome", "state": args, "chat": chat_id},
},
message,
)
if args and args != "off":
await utils.answer(message, self.strings("welcome").format(args))
else:
await utils.answer(message, self.strings("unwelcome"))
@error_handler
@chat_command
async def fdefcmd(self, message: Message):
""" - Toggle global user invulnerability"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
args = utils.get_args_raw(message)
reply = await message.get_reply_message()
user = None
if reply:
user = await self._client.get_entity(reply.sender_id)
else:
if str(args).isdigit():
args = int(args)
try:
user = await self._client.get_entity(args)
except Exception:
await utils.answer(message, self.strings("args"))
return
self.api.request(
{
"action": "protect user",
"args": {"uid": self.api.feds[fed]["uid"], "user": user.id},
},
message,
)
await utils.answer(
message,
self.strings("defense").format(
utils.get_link(user),
get_first_name(user),
"on" if str(user.id) not in self.api.feds[fed]["fdef"] else "off",
),
)
@error_handler
@chat_command
async def fsavecmd(self, message: Message):
""" - Save federative note"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
args = utils.get_args_raw(message)
reply = await message.get_reply_message()
if not reply or not args or not reply.text:
await utils.answer(message, self.strings("fsave_args"))
return
self.api.request(
{
"action": "new note",
"args": {
"uid": self.api.feds[fed]["uid"],
"shortname": args,
"note": reply.text,
},
},
message,
)
await utils.answer(message, self.strings("fsave").format(args))
@error_handler
@chat_command
async def fstopcmd(self, message: Message):
""" - Remove federative note"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
args = utils.get_args_raw(message)
if not args:
await utils.answer(message, self.strings("fstop_args"))
return
self.api.request(
{
"action": "delete note",
"args": {"uid": self.api.feds[fed]["uid"], "shortname": args},
},
message,
)
await utils.answer(message, self.strings("fstop").format(args))
@error_handler
@chat_command
async def fnotescmd(self, message: Message, from_watcher: bool = False):
"""Show federative notes"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
res = {}
cache = {}
for shortname, note in self.api.feds[fed].get("notes", {}).items():
if int(note["creator"]) != self._tg_id and from_watcher:
continue
try:
if int(note["creator"]) not in cache:
obj = await self._client.get_entity(int(note["creator"]))
cache[int(note["creator"])] = obj.first_name or obj.title
key = (
f'{cache[int(note["creator"])]}'
)
if key not in res:
res[key] = ""
res[key] += f" {shortname}\n"
except Exception:
key = "unknown"
if key not in res:
res[key] = ""
res[key] += f" {shortname}\n"
notes = "".join(f"\nby {owner}:\n{note}" for owner, note in res.items())
if not notes and not from_watcher:
await utils.answer(message, self.strings("no_notes"))
return
if not notes:
return
await utils.answer(message, self.strings("fnotes").format(notes))
@error_handler
@chat_command
async def fdeflistcmd(self, message: Message):
"""Show global invulnerable users"""
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
if not self.api.feds[fed].get("fdef", []):
await utils.answer(message, self.strings("no_defense"))
return
res = ""
for user in self.api.feds[fed].get("fdef", []).copy():
try:
u = await self._client.get_entity(int(user), exp=0)
except Exception:
self.api.request(
{
"action": "protect user",
"args": {"uid": self.api.feds[fed]["uid"], "user": user},
},
message,
)
await asyncio.sleep(0.2)
continue
tit = get_full_name(u)
res += f' 🇻🇦 {tit}\n'
await utils.answer(message, self.strings("defense_list").format(res))
return
@error_handler
@chat_command
async def dmutecmd(self, message: Message):
"""Delete and mute"""
reply = await message.get_reply_message()
await self.mutecmd(message)
await reply.delete()
@error_handler
@chat_command
async def dbancmd(self, message: Message):
"""Delete and ban"""
reply = await message.get_reply_message()
await self.bancmd(message)
await reply.delete()
@error_handler
@chat_command
async def dwarncmd(self, message: Message):
"""Delete and warn"""
reply = await message.get_reply_message()
await self.warncmd(message)
await reply.delete()
@error_handler
@chat_command
async def frenamecmd(self, message: Message):
"""Rename federation"""
args = utils.get_args_raw(message)
fed = await self.find_fed(message)
if not fed:
await utils.answer(message, self.strings("no_fed"))
return
if not args:
await utils.answer(message, self.strings("rename_noargs"))
return
self.api.request(
{
"action": "rename federation",
"args": {"uid": self.api.feds[fed]["uid"], "name": args},
},
message,
)
await utils.answer(
message,
self.strings("rename_success").format(utils.escape_html(args)),
)
@error_handler
@chat_command
async def clnraidcmd(self, message: Message):
""" - Clean raid"""
args = utils.get_args_raw(message)
if not args or not args.isdigit():
await utils.answer(message, self.strings("clnraid_args"))
return
args = min(int(args), 10000)
await self.inline.form(
message=message,
text=self.strings("clnraid_confirm").format(args),
reply_markup=[
{
"text": self.strings("clnraid_yes"),
"callback": self._clnraid,
"args": (utils.get_chat_id(message), args),
},
{
"text": self.strings("clnraid_cancel"),
"action": "close",
},
],
silent=True,
)
async def _clnraid(
self,
call: typing.Union[InlineCall, InlineMessage],
chat_id: int,
quantity: int,
) -> InlineCall:
if call is not None:
await call.edit(self.strings("clnraid_started").format(quantity))
deleted = 0
actually_deleted = 0
async for log_msg in self._client.iter_admin_log(chat_id, join=True):
if deleted >= quantity:
break
deleted += 1
try:
await self.inline.bot.kick_chat_member(
int(f"-100{chat_id}"),
log_msg.user.id,
)
except Exception:
logger.debug("Can't kick member", exc_info=True)
else:
actually_deleted += 1
if call is not None:
await call.edit(self.strings("clnraid_complete").format(actually_deleted))
return call
@error_handler
async def myrightscmd(self, message: Message):
"""typing.List your admin rights in all chats"""
if not PIL_AVAILABLE:
await utils.answer(message, self.strings("pil_unavailable"))
return
message = await utils.answer(message, self.strings("processing_myrights"))
rights = []
async for chat in self._client.iter_dialogs():
ent = chat.entity
if (
not (
isinstance(ent, Chat)
or (isinstance(ent, Channel) and getattr(ent, "megagroup", False))
)
or not ent.admin_rights
or ent.participants_count < 5
):
continue
r = ent.admin_rights
rights += [
[
ent.title if len(ent.title) < 30 else f"{ent.title[:30]}...",
"YES" if r.change_info else "-----",
"YES" if r.delete_messages else "-----",
"YES" if r.ban_users else "-----",
"YES" if r.invite_users else "-----",
"YES" if r.pin_messages else "-----",
"YES" if r.add_admins else "-----",
]
]
await self._client.send_file(
message.peer_id,
self.render_table(
[
[
"Chat",
"change_info",
"delete_messages",
"ban_users",
"invite_users",
"pin_messages",
"add_admins",
]
]
+ rights
),
)
if message.out:
await message.delete()
@error_handler
async def p__antiservice(self, chat_id: typing.Union[str, int], message: Message):
if (
self.api.should_protect(chat_id, "antiservice")
and str(chat_id) not in self._ban_ninja
and getattr(message, "action_message", False)
):
if self.api.should_protect(chat_id, "captcha") and (
getattr(message, "user_joined", False)
or getattr(message, "user_added", False)
):
self._delete_soon += [(message, time.time() + 5 * 60)]
return
try:
await self.inline.bot.delete_message(
int(f"-100{chat_id}"),
message.action_message.id,
)
except Exception:
await message.delete()
async def _update_ban_ninja(self, chat_id: str):
while (
chat_id in self._ban_ninja_forms and self._ban_ninja[chat_id] > time.time()
):
try:
await self._ban_ninja_forms[chat_id].edit(
self.strings("smart_anti_raid_active").format(
(
self.strings("forbid_messages")
if self.config["close_on_raid"]
else ""
),
self._ban_ninja_progress[chat_id],
),
{
"text": self.strings("smart_anti_raid_off"),
"callback": self.disable_smart_anti_raid,
"args": (chat_id,),
},
)
except Exception:
pass
await asyncio.sleep(15)
try:
await self.disable_smart_anti_raid(None, chat_id)
except Exception:
pass
@error_handler
async def p__banninja(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
message: Message,
) -> bool:
if not (
self.api.should_protect(chat_id, "banninja")
and (
getattr(message, "user_joined", False)
or getattr(message, "user_added", False)
)
):
return False
chat_id = str(chat_id)
if chat_id in self._ban_ninja:
if self._ban_ninja[chat_id] > time.time():
self._ban_ninja[chat_id] = time.time() + int(
self.config["banninja_cooldown"]
)
await self.inline.bot.kick_chat_member(int(f"-100{chat_id}"), user_id)
self._ban_ninja_progress[chat_id] += 1
try:
await self.inline.bot.delete_message(
int(f"-100{chat_id}"),
message.action_message.id,
)
except MessageToDeleteNotFound:
pass
except MessageCantBeDeleted:
await self._promote_bot(chat_id)
await self.inline.bot.delete_message(
int(f"-100{chat_id}"),
message.action_message.id,
)
logger.debug(
f"BanNinja is active in chat {chat_id=}, I kicked {user_id=}"
)
return True
await self.disable_smart_anti_raid(None, chat_id)
if chat_id not in self._join_ratelimit:
self._join_ratelimit[chat_id] = []
self._join_ratelimit[chat_id] += [[user_id, round(time.time())]]
processed = []
for u, t in self._join_ratelimit[chat_id].copy():
if u in processed or t + 60 < time.time():
self._join_ratelimit[chat_id].remove([u, t])
else:
processed += [u]
self.set("join_ratelimit", self._join_ratelimit)
if len(self._join_ratelimit[chat_id]) >= self.config["join_ratelimit"]:
if chat_id in self._ban_ninja:
return False
self._ban_ninja[chat_id] = (
round(time.time()) + self.config["banninja_cooldown"]
)
form = await self.inline.form(
self.strings("smart_anti_raid_active").format(
(
self.strings("forbid_messages")
if self.config["close_on_raid"]
else ""
),
self.config["join_ratelimit"],
),
message=int(chat_id),
reply_markup={
"text": self.strings("smart_anti_raid_off"),
"callback": self.disable_smart_anti_raid,
"args": (chat_id,),
},
silent=True,
)
if self.config["close_on_raid"]:
try:
chat = await message.get_chat()
self._ban_ninja_default_rights[chat_id] = chat.default_banned_rights
await self._client(
EditChatDefaultBannedRightsRequest(
chat.id,
ChatBannedRights(
send_messages=True, until_date=2**31 - 1
),
)
)
except Exception:
pass
self._ban_ninja_forms[chat_id] = form
self._ban_ninja_progress[chat_id] = self.config["join_ratelimit"]
self._ban_ninja_tasks[chat_id] = asyncio.ensure_future(
self._update_ban_ninja(chat_id)
)
await (
await self._clnraid(
call=(
await self.inline.form(
self.strings("clnraid_started").format("*loading*"),
message=int(chat_id),
reply_markup={"text": ".", "action": "close"},
silent=True,
)
),
chat_id=int(chat_id),
quantity=self.config["join_ratelimit"],
)
).delete()
messages = []
users = []
for u, m in self._ban_ninja_messages:
if u not in users:
if len(users) > self.config["join_ratelimit"]:
break
users += [u]
messages += [m]
for m in messages:
try:
await self.inline.bot.delete_message(
int(f"-100{utils.get_chat_id(m)}"),
m.id,
)
except MessageToDeleteNotFound:
pass
except MessageCantBeDeleted:
await self._promote_bot(utils.get_chat_id(m))
await self.inline.bot.delete_message(
int(f"-100{utils.get_chat_id(m)}"),
m.id,
)
except Exception:
await m.delete()
try:
await self._client.pin_message(int(chat_id), form.form["message_id"])
except Exception:
pass
return False
async def disable_smart_anti_raid(self, call: InlineCall, chat_id: int):
chat_id = str(chat_id)
if chat_id in self._ban_ninja:
del self._ban_ninja[chat_id]
if call:
await call.edit(self.strings("smart_anti_raid_stopped"))
if call:
await call.answer("Success")
try:
await self._client.unpin_message(
int(chat_id),
self._ban_ninja_forms[str(chat_id)].form["message_id"],
)
except Exception:
pass
if self.config["close_on_raid"]:
try:
await self._client(
EditChatDefaultBannedRightsRequest(
int(chat_id),
self._ban_ninja_default_rights[chat_id],
)
)
del self._ban_ninja_default_rights[chat_id]
except Exception:
pass
await self._client.send_message(
int(chat_id),
self.strings("banninja_report").format(
self._ban_ninja_progress[chat_id]
),
)
if chat_id in self._ban_ninja_forms:
await self._ban_ninja_forms[chat_id].delete()
del self._ban_ninja_forms[chat_id]
if chat_id in self._ban_ninja_progress:
del self._ban_ninja_progress[chat_id]
if chat_id in self._ban_ninja_tasks:
self._ban_ninja_tasks[chat_id].cancel()
del self._ban_ninja_tasks[chat_id]
return
await call.answer("Already stopped")
@error_handler
async def p__antiraid(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
chat: typing.Union[Chat, Channel],
) -> bool:
if self.api.should_protect(chat_id, "antiraid") and (
getattr(message, "user_joined", False)
or getattr(message, "user_added", False)
):
action = self.api.chats[str(chat_id)]["antiraid"][0]
if action == "kick":
await self._client.send_message(
"me",
self.strings("antiraid").format(
"kicked",
user.id,
get_full_name(user),
utils.escape_html(chat.title),
),
)
await self._client.kick_participant(chat_id, user)
elif action == "ban":
await self._client.send_message(
"me",
self.strings("antiraid").format(
"banned",
user.id,
get_full_name(user),
utils.escape_html(chat.title),
),
)
await self.ban(chat, user, 0, "antiraid")
elif action == "mute":
await self._client.send_message(
"me",
self.strings("antiraid").format(
"muted",
user.id,
get_full_name(user),
utils.escape_html(chat.title),
),
)
await self.mute(chat, user, 0, "antiraid")
return True
return False
async def _captcha_invalid(
self,
call: InlineCall,
chat_id: int,
user: User,
):
if call.from_user.id != user.id:
await call.answer("Not for you....")
return
with contextlib.suppress(KeyError):
del self._captcha_db[chat_id][user.id]
await call.answer("Sorry ☹️")
await self.punish(
chat_id,
user,
"captcha_failed",
self.api.chats[str(chat_id)]["captcha"][0],
get_full_name(user),
fulltime=True,
message=None,
)
with contextlib.suppress(Exception):
await self._captcha_messages[chat_id][user.id].delete()
async def _captcha_valid(self, call: InlineCall, chat_id: int, user_id: int):
if call.from_user.id != user_id:
await call.answer("Not for you....")
return
if self._captcha_db[chat_id][user_id]["unmute"]:
await self._client.edit_permissions(
int(chat_id),
int(user_id),
until_date=0,
send_messages=True,
)
with contextlib.suppress(KeyError):
del self._captcha_db[chat_id][user_id]
with contextlib.suppress(Exception):
await self._captcha_messages[chat_id][user_id].delete()
await call.answer("Welcome!")
@error_handler
async def p__captcha(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
chat: Chat,
) -> bool:
if not (
self.api.should_protect(chat_id, "captcha")
and str(chat_id) not in self._ban_ninja
and (
getattr(message, "user_joined", False)
or getattr(message, "user_added", False)
)
):
return False
valid = utils.rand(6)
invalid = [utils.rand(6) for _ in range(5)]
markup = [
{
"text": i,
"callback": self._captcha_invalid,
"args": (chat_id, user),
}
for i in invalid
] + [
{"text": valid, "callback": self._captcha_valid, "args": (chat_id, user_id)}
]
random.shuffle(markup)
markup = utils.chunks(markup, 2)
unmute = False
if not (
await self._client.get_permissions(int(chat_id), int(user_id))
).is_banned:
unmute = True
await self.mute(chat, user, 15 * 60, "captcha_processing", silent=True)
for _ in range(5):
try:
m = await self.inline.form(
message=(await message.reply("🪄 Loading captcha...")),
text=self.strings("complete_captcha").format(
user.id,
get_full_name(user),
),
photo=f"https://hikarichat.hikariatama.ru/captcha/{valid}",
reply_markup=markup,
disable_security=True,
)
except WebpageCurlFailedError:
await asyncio.sleep(0.5)
else:
break
if chat_id not in self._captcha_db:
self._captcha_db[chat_id] = {}
if chat_id not in self._captcha_messages:
self._captcha_messages[chat_id] = {}
self._captcha_db[chat_id][user_id] = {
"time": time.time() + 5 * 60,
"user": user,
"unmute": unmute,
}
self._captcha_messages[chat_id][user_id] = m
self._ban_ninja_messages = [(user_id, m)] + self._ban_ninja_messages
@error_handler
async def p__cas(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
chat: Chat,
) -> bool:
if not (
self.api.should_protect(chat_id, "cas")
and str(chat_id) not in self._ban_ninja
and (
getattr(message, "user_joined", False)
or getattr(message, "user_added", False)
)
):
return False
return (
self.api.chats[str(chat_id)]["cas"][0]
if (
(
await utils.run_sync(
requests.get,
f"https://api.cas.chat/check?user_id={user_id}",
)
)
.json()
.get("result", {})
.get("offenses", False)
)
else False
)
@error_handler
async def p__welcome(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
chat: Chat,
) -> bool:
if not (
self.api.should_protect(chat_id, "welcome")
and str(chat_id) not in self._ban_ninja
and (
getattr(message, "user_joined", False)
or getattr(message, "user_added", False)
)
):
return False
m = await self._client.send_message(
chat_id,
self.api.chats[str(chat_id)]["welcome"][0]
.replace("{user}", get_full_name(user))
.replace("{chat}", utils.escape_html(chat.title))
.replace(
"{mention}",
f'{get_full_name(user)}',
),
reply_to=message.action_message.id,
)
self._ban_ninja_messages = [(user_id, m)] + self._ban_ninja_messages
return True
@error_handler
async def p__report(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
):
if not self.api.should_protect(chat_id, "report") or not getattr(
message,
"reply_to_msg_id",
False,
):
return
reply = await message.get_reply_message()
if (
str(user_id) not in self._ratelimit["report"]
or self._ratelimit["report"][str(user_id)] < time.time()
) and (
(
message.raw_text.startswith("#report")
or message.raw_text.startswith("/report")
)
and reply
):
fed = await self.find_fed(message)
if fed in self.api.feds and reply.sender_id in list(
map(int, self.api.feds[fed]["fdef"])
):
await utils.answer(message, self.strings("fdef403").format("report"))
return
chat = await message.get_chat()
reason = (
message.raw_text.split(maxsplit=1)[1]
if message.raw_text.count(" ") >= 1
else self.strings("no_reason")
)
self.api.request(
{
"action": "report",
"args": {
"chat": chat_id,
"reason": reason,
"link": await utils.get_message_link(reply, chat),
"user_link": utils.get_link(user),
"user_name": get_full_name(user),
"text_thumbnail": (getattr(reply, "raw_text", "") or "")[
:1024
] or "",
},
},
message,
)
msg = self.strings("reported").format(
utils.get_link(user),
get_full_name(user),
reason,
)
if self._is_inline:
m = await self._client.send_message(
chat.id,
"🌘 Reporting message to admins...",
reply_to=message.reply_to_msg_id,
)
await self.inline.form(
message=m,
text=msg,
reply_markup=[
[
{
"text": self.strings("btn_mute"),
"data": f"m/{chat.id}/{reply.sender_id}#{reply.id}",
},
{
"text": self.strings("btn_ban"),
"data": f"b/{chat.id}/{reply.sender_id}#{reply.id}",
},
],
[
{
"text": self.strings("btn_fban"),
"data": f"fb/{chat.id}/{reply.sender_id}#{reply.id}",
},
{
"text": self.strings("btn_del"),
"data": f"d/{chat.id}/{reply.sender_id}#{reply.id}",
},
],
],
silent=True,
)
else:
await (utils.answer if message else self._client.send_message)(
message or chat.id,
msg,
)
self._ratelimit["report"][str(user_id)] = time.time() + 30
try:
await self.inline.bot.delete_message(
int(f"-100{chat_id}"),
getattr(message, "action_message", message).id,
)
except MessageToDeleteNotFound:
pass
except MessageCantBeDeleted:
await self._promote_bot(chat_id)
await self.inline.bot.delete_message(
int(f"-100{chat_id}"),
getattr(message, "action_message", message).id,
)
@error_handler
async def _promote_bot(self, chat_id: int):
try:
await self._client(
InviteToChannelRequest(
int(chat_id),
[self.inline.bot_username],
)
)
except Exception:
logger.warning(
"Unable to invite cleaner to chat. Maybe he's already there?"
)
try:
await self._client(
EditAdminRequest(
channel=int(chat_id),
user_id=self.inline.bot_username,
admin_rights=ChatAdminRights(ban_users=True, delete_messages=True),
rank="HikariChat",
)
)
except Exception:
logger.exception("Cleaner promotion failed!")
@error_handler
async def p__antiflood(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
) -> typing.Union[bool, str]:
if self.api.should_protect(chat_id, "antiflood"):
if str(chat_id) not in self._flood_cache:
self._flood_cache[str(chat_id)] = {}
if str(user_id) not in self._flood_cache[str(chat_id)]:
self._flood_cache[str(chat_id)][str(user_id)] = []
for item in self._flood_cache[str(chat_id)][str(user_id)].copy():
if time.time() - item > self.flood_timeout:
self._flood_cache[str(chat_id)][str(user_id)].remove(item)
self._flood_cache[str(chat_id)][str(user_id)].append(
round(time.mktime(message.date.timetuple()))
if getattr(message, "date", False)
else round(time.time())
)
self.set("flood_cache", self._flood_cache)
if (
len(self._flood_cache[str(chat_id)][str(user_id)])
>= self.flood_threshold
):
return self.api.chats[str(chat_id)]["antiflood"][0]
return False
@error_handler
async def p__antichannel(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
) -> bool:
if (
self.api.should_protect(chat_id, "antichannel")
and getattr(message, "sender_id", 0) < 0
):
await self.ban(chat_id, user_id, 0, "", None, True)
try:
await self.inline.bot.delete_message(int(f"-100{chat_id}"), message.id)
except Exception:
await message.delete()
return True
return False
@error_handler
async def p__antigif(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
) -> bool:
if self.api.should_protect(chat_id, "antigif"):
try:
if (
message.media
and DocumentAttributeAnimated() in message.media.document.attributes
):
await message.delete()
return True
except Exception:
pass
return False
@error_handler
async def p__antispoiler(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
) -> bool:
if self.api.should_protect(chat_id, "antispoiler"):
try:
if any(isinstance(_, MessageEntitySpoiler) for _ in message.entities):
await message.delete()
return True
except Exception:
pass
return False
@error_handler
async def p__antiexplicit(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
) -> typing.Union[bool, str]:
if self.api.should_protect(chat_id, "antiexplicit"):
text = getattr(message, "raw_text", "")
P = "пПnPp"
I = "иИiI1uІИ́Їіи́ї" # noqa: E741
E = "еЕeEЕ́е́"
D = "дДdD"
Z = "зЗ3zZ3"
M = "мМmM"
U = "уУyYuUУ́у́"
O = "оОoO0О́о́" # noqa: E741
L = "лЛlL1"
A = "аАaAА́а́@"
N = "нНhH"
G = "гГgG"
K = "кКkK"
R = "рРpPrR"
H = "хХxXhH"
YI = "йЙyуУY"
YA = "яЯЯ́я́"
YO = "ёЁ"
YU = "юЮЮ́ю́"
B = "бБ6bB"
T = "тТtT1"
HS = "ъЪ"
SS = "ьЬ"
Y = "ыЫ"
occurrences = re.findall(
rf"""\b[0-9]*(\w*[{P}][{I}{E}][{Z}][{D}]\w*|(?:[^{I}{U}\s]+|{N}{I})?(? typing.Union[bool, str]:
if not self.api.should_protect(chat_id, "antinsfw"):
return False
media = False
if getattr(message, "sticker", False):
media = message.sticker
elif getattr(message, "media", False):
media = message.media
if not media:
return False
photo = io.BytesIO()
await self._client.download_media(message.media, photo)
photo.seek(0)
if imghdr.what(photo) not in self.api.variables["image_types"]:
return False
response = await self.api.nsfw(photo)
if response != "nsfw":
return False
todel = []
async for _ in self._client.iter_messages(
message.peer_id,
reverse=True,
offset_id=message.id - 1,
):
todel += [_]
if _.sender_id != message.sender_id:
break
await self._client.delete_messages(
message.peer_id,
message_ids=todel,
revoke=True,
)
return self.api.chats[str(chat_id)]["antinsfw"][0]
@error_handler
async def p__antitagall(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
) -> typing.Union[bool, str]:
return (
self.api.chats[str(chat_id)]["antitagall"][0]
if self.api.should_protect(chat_id, "antitagall")
and getattr(message, "text", False)
and message.text.count("tg://user?id=") >= 5
else False
)
@error_handler
async def p__antihelp(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
) -> bool:
if not self.api.should_protect(chat_id, "antihelp") or not getattr(
message, "text", False
):
return False
search = message.text
if "@" in search:
search = search[: search.find("@")]
if (
not search.split()
or search.split()[0][1:] not in self.api.variables["blocked_commands"]
):
return False
await message.delete()
return True
@error_handler
async def p__antiarab(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
) -> typing.Union[bool, str]:
return (
self.api.chats[str(chat_id)]["antiarab"][0]
if (
self.api.should_protect(chat_id, "antiarab")
and (
getattr(message, "user_joined", False)
or getattr(message, "user_added", False)
)
and (
len(re.findall("[\u4e00-\u9fff]+", get_full_name(user))) != 0
or len(re.findall("[\u0621-\u064a]+", get_full_name(user))) != 0
)
)
else False
)
@error_handler
async def p__antizalgo(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
) -> typing.Union[bool, str]:
return (
self.api.chats[str(chat_id)]["antizalgo"][0]
if (
self.api.should_protect(chat_id, "antizalgo")
and len(
re.findall(
"[\u200f\u200e\u0300-\u0361\u0316-\u0362\u0334-\u0338\u0363-\u036f\u3164\ud83d\udd07\u0020\u00a0\u2000-\u2009\u200a\u2028\u205f\u1160\ufff4]",
get_full_name(user),
)
)
/ len(get_full_name(user))
>= 0.6
)
else False
)
@error_handler
async def p__bnd(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
) -> typing.Union[bool, str]:
if not self.api.should_protect(chat_id, "bnd"):
return False
if (
self.get("bnd_cache", {}).get(str(chat_id), {}).get(str(user_id), 0)
>= time.time()
):
return False
try:
assert (
(
await self.inline.bot.get_chat_member(
int(f"-100{chat_id}"),
int(user_id),
)
).status
) not in {"left", "kicked"}
except Exception:
return self.api.chats[str(chat_id)]["bnd"][0]
else:
bnd_cache = self.get("bnd_cache", {})
bnd_cache.setdefault(str(chat_id), {}).update(
{str(user_id): round(time.time()) + 60}
)
self.set("bnd_cache", bnd_cache)
return False
@error_handler
async def p__antistick(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
) -> typing.Union[bool, str]:
if not self.api.should_protect(chat_id, "antistick") or not (
getattr(message, "sticker", False)
or getattr(message, "media", False)
and isinstance(message.media, MessageMediaUnsupported)
):
return False
sender = user.id
if sender not in self._sticks_ratelimit:
self._sticks_ratelimit[sender] = []
self._sticks_ratelimit[sender] += [round(time.time())]
for timing in self._sticks_ratelimit[sender].copy():
if time.time() - timing > 60:
self._sticks_ratelimit[sender].remove(timing)
if len(self._sticks_ratelimit[sender]) > self._sticks_limit:
return self.api.chats[str(chat_id)]["antistick"][0]
@error_handler
async def p__antilagsticks(
self,
chat_id: typing.Union[str, int],
user_id: typing.Union[str, int],
user: typing.Union[User, Channel],
message: Message,
) -> typing.Union[bool, str]:
res = (
self.api.should_protect(chat_id, "antilagsticks")
and getattr(message, "sticker", False)
and getattr(message.sticker, "id", False)
in self.api.variables["destructive_sticks"]
)
if res:
await message.delete()
return res
@error_handler
async def watcher(self, message: Message):
self._global_queue += [message]
@error_handler
async def _global_queue_handler(self):
while True:
while self._global_queue:
await self._global_queue_handler_process(self._global_queue.pop(0))
for chat_id, info in self._captcha_db.copy().items():
for user_id, captcha in info.copy().items():
if captcha["time"] < time.time():
del self._captcha_db[chat_id][user_id]
await self.punish(
chat_id,
captcha["user"],
"captcha_timeout",
self.api.chats[str(chat_id)]["captcha"][0],
get_full_name(captcha["user"]),
fulltime=True,
message=None,
)
with contextlib.suppress(Exception):
await self._captcha_messages[chat_id][user_id].delete()
for message, deletion_ts in self._delete_soon.copy():
if deletion_ts < time.time():
with contextlib.suppress(Exception):
await message.delete()
with contextlib.suppress(Exception):
self._delete_soon.remove((message, deletion_ts))
await asyncio.sleep(0.01)
@error_handler
async def _global_queue_handler_process(self, message: Message):
if not isinstance(getattr(message, "chat", 0), (Chat, Channel)):
return
chat_id = utils.get_chat_id(message)
if (
isinstance(getattr(message, "chat", 0), Channel)
and not getattr(message, "megagroup", False)
and int(chat_id) in reverse_dict(self._linked_channels)
):
actual_chat = str(reverse_dict(self._linked_channels)[int(chat_id)])
await self.p__antiservice(actual_chat, message)
return
await self.p__antiservice(chat_id, message)
try:
user_id = (
getattr(message, "sender_id", False)
or message.action_message.action.users[0]
)
except Exception:
try:
user_id = message.action_message.action.from_id.user_id
except Exception:
try:
user_id = message.from_id.user_id
except Exception:
try:
user_id = message.action_message.from_id.user_id
except Exception:
try:
user_id = message.action.from_user.id
except Exception:
try:
user_id = (await message.get_user()).id
except Exception:
logger.debug(
f"Can't extract entity from event {type(message)}"
)
return
user_id = (
int(str(user_id)[4:]) if str(user_id).startswith("-100") else int(user_id)
)
if await self.p__banninja(chat_id, user_id, message):
return
fed = await self.find_fed(message)
if fed in self.api.feds:
if (
getattr(message, "raw_text", False)
and (
str(user_id) not in self._ratelimit["notes"]
or self._ratelimit["notes"][str(user_id)] < time.time()
)
and not message.raw_text.startswith(self.get_prefix())
):
logger.debug("Checking message for notes...")
if message.raw_text.lower().strip() in ["#заметки", "#notes", "/notes"]:
self._ratelimit["notes"][str(user_id)] = time.time() + 3
if any(
str(note["creator"]) == str(self._tg_id)
for _, note in self.api.feds[fed]["notes"].items()
):
await self.fnotescmd(
await message.reply(
f"{self.get_prefix()}fnotes"
),
True,
)
for note, note_info in self.api.feds[fed]["notes"].items():
if str(note_info["creator"]) != str(self._tg_id):
continue
if note.lower() in message.raw_text.lower():
txt = note_info["text"]
self._ratelimit["notes"][str(user_id)] = time.time() + 3
if not txt.startswith("@inline"):
await utils.answer(message, txt)
break
txt = "\n".join(txt.splitlines()[1:])
buttons = []
button_re = r"\[(.+)\]\((https?://.*)\)"
txt_r = []
for line in txt.splitlines():
if re.match(button_re, re.sub(r"<.*?>", "", line).strip()):
match = re.search(
button_re, re.sub(r"<.*?>", "", line).strip()
)
buttons += [
[{"text": match.group(1), "url": match.group(2)}]
]
else:
txt_r += [line]
if not buttons:
await utils.answer(message, txt)
break
await self.inline.form(
message=message,
text="\n".join(txt_r),
reply_markup=buttons,
silent=True,
)
if int(user_id) in (
list(map(int, self.api.feds[fed]["fdef"]))
+ list(self._linked_channels.values())
):
return
if str(chat_id) not in self.api.chats or not self.api.chats[str(chat_id)]:
return
try:
user = await self._client.get_entity(user_id)
except ValueError:
return
chat = await message.get_chat()
user_name = get_full_name(user)
args = (chat_id, user_id, user, message)
await self.p__report(*args)
try:
if (
await self._client.get_perms_cached(chat_id, message.sender_id)
).is_admin:
return
except Exception:
pass
if await self.p__antiraid(*args, chat):
return
cas_result = False
if self.api.should_protect(chat_id, "cas"):
cas_result = await self.p__cas(*args, chat)
if cas_result:
await self.punish(
chat_id,
user,
"cas",
cas_result,
user_name,
message=message,
)
return
r = await self.p__antiarab(*args)
if r:
await self.punish(
chat_id,
user,
"arabic_nickname",
r,
user_name,
message=message,
)
return
if await self.p__welcome(*args, chat) and not self.api.should_protect(
chat_id,
"captcha",
):
return
if await self.p__captcha(*args, chat):
return
if getattr(message, "action", ""):
return
await self.p__report(*args)
r = await self.p__bnd(*args)
if r:
await self.punish(chat_id, user, "bnd", r, user_name, message=message)
return
r = await self.p__antiflood(*args)
if r:
await self.punish(chat_id, user, "flood", r, user_name, message=message)
return
if await self.p__antichannel(*args):
return
r = await self.p__antizalgo(*args)
if r:
await self.punish(chat_id, user, "zalgo", r, user_name, message=message)
return
if await self.p__antigif(*args):
return
r = await self.p__antilagsticks(*args)
if r:
await self.punish(
chat_id, user, "destructive_stick", "ban", user_name, message=message
)
return
r = await self.p__antistick(*args)
if r:
await self.punish(chat_id, user, "stick", r, user_name, message=message)
return
if await self.p__antispoiler(*args):
return
r = await self.p__antiexplicit(*args)
if r:
await self.punish(chat_id, user, "explicit", r, user_name, message=message)
return
r = await self.p__antinsfw(*args)
if r:
await self.punish(
chat_id,
user,
"nsfw_content",
r,
user_name,
message=message,
)
return
r = await self.p__antitagall(*args)
if r:
await self.punish(chat_id, user, "tagall", r, user_name, message=message)
return
await self.p__antihelp(*args)
async def client_ready(
self,
client: "CustomTelegramClient", # type: ignore
db: "hikka.database.Database", # type: ignore
):
"""Entry point"""
global api
self._is_inline = self.inline.init_complete
self._sticks_limit = 7
self._join_ratelimit = self.get("join_ratelimit", {})
self._flood_cache = self.get("flood_cache", {})
self.api = api
await api.init(client, db, self)
for protection in self.api.variables["protections"]:
setattr(self, f"{protection}cmd", self.protection_template(protection))
# We can override class docstings because of abc meta
self.__doc__ = (
"Advanced chat admin toolkit\nNow became free...\n\n💻 Developer:"
" t.me/hikariatama\n📣"
" Downloaded from: @hikarimods\n\n"
+ f"📦Version: {version}\n"
+ ("🗃 Local" if not self.api._inited else "⭐️ Full")
)
self._pt_task = asyncio.ensure_future(self._global_queue_handler())
if PIL_AVAILABLE:
asyncio.ensure_future(self._download_font())
async def _download_font(self):
self.font = (
await utils.run_sync(
requests.get,
"https://github.com/hikariatama/assets/raw/master/EversonMono.ttf",
)
).content