# Deletes messages from certain users
# Copyright © 2022 https://t.me/nalinor
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
# meta developer: @nalinormods
import logging
import re
import time
from typing import Any, List
from telethon import TelegramClient
from telethon.hints import Entity
from telethon.tl.custom import Message
from telethon.tl.functions.channels import JoinChannelRequest
from telethon.utils import get_peer_id
from .. import loader, security, utils
logger = logging.getLogger(__name__)
USER_ID_RE = re.compile(r"^(-100)?\d+$")
# pylint: disable=invalid-name
def s2time(string) -> int:
"""Parse time from text `string`"""
r = {} # results
for time_type in ["mon", "w", "d", "h", "m", "s"]:
try:
r[time_type] = int(re.search(rf"(\d+)\s*{time_type}", string)[1])
except TypeError:
r[time_type] = 0
return (
r["mon"] * 86400 * 30
+ r["w"] * 86400 * 7
+ r["d"] * 86400
+ r["h"] * 3600
+ r["m"] * 60
+ r["s"]
)
# pylint: disable=consider-using-f-string
def get_link(user: Entity) -> str:
"""Return permanent link to `user`"""
return "{name}".format(
id=user.id,
name=utils.escape_html(
user.first_name if hasattr(user, "first_name") else user.title
),
)
def plural_number(n: int) -> str:
"""Pluralize number `n`"""
return (
"one"
if n % 10 == 1 and n % 100 != 11
else "few"
if 2 <= n % 10 <= 4 and (n % 100 < 10 or n % 100 >= 20)
else "many"
)
# noinspection PyCallingNonCallable,PyAttributeOutsideInit
# pylint: disable=not-callable,attribute-defined-outside-init,invalid-name
@loader.tds
class SwmuteMod(loader.Module):
"""Deletes messages from certain users"""
strings = {
"name": "Swmute",
"author": "@nalinormods",
"not_group": "🚫 This command is for groups only",
"muted": "🔇 Swmuted {user} for {time}",
"muted_forever": "🔇 Swmuted {user} indefinitely",
"unmuted": "🔉 Removed swmute from {user}",
"not_muted": "🚫 This user wasn't muted",
"invalid_user": "🚫 Provided username/id {entity} is invalid",
"no_mute_target": "🧐 Whom should I mute?",
"no_unmute_target": "🧐 Whom should I unmute?",
"mutes_empty": "😔 There's no mutes in this group",
"muted_users": "📃 Swmuted users at the moment:\n{names}",
"cleared": "🧹 Cleared mutes in this chat",
"cleared_all": "🧹 Cleared all mutes",
"s_one": "second",
"s_few": "seconds",
"s_many": "seconds",
"m_one": "minute",
"m_few": "minutes",
"m_many": "minutes",
"h_one": "hour",
"h_few": "hours",
"h_many": "hours",
"d_one": "day",
"d_few": "days",
"d_many": "days",
}
strings_ru = {
"_cls_doc": "Удаляет сообщения от выбранных пользователей",
"_cmd_doc_swmute": " <время> — Добавить пользователя в список swmute",
"_cmd_doc_swunmute": " — Удалить пользователя из списка swmute",
"_cmd_doc_swmutelist": "Получить пользователей в списке swmute",
"_cmd_doc_swmuteclear": (
" — Удалить всех пользователей из списка swmute в этом/всех чатах"
),
"not_group": "🚫 Эта команда предназначена только для групп",
"muted": "🔇 {user} добавлен в список swmute на {time}",
"muted_forever": "🔇 {user} добавлен в список swmute навсегда",
"unmuted": "🔉 {user} удалён из списка swmute",
"not_muted": "🚫 Этот пользователь не был в муте",
"invalid_user": "🚫 Предоставленный юзернейм/айди {entity} некорректный",
"no_mute_target": "🧐 Кого я должен замутить?",
"no_unmute_target": "🧐 Кого я должен размутить?",
"mutes_empty": "😔 В этой группе никто не в муте",
"muted_users": "📃 Пользователи в списке swmute:\n{names}",
"cleared": "🧹 Муты в этой группе очищены",
"cleared_all": "🧹 Все муты очищены",
"s_one": "секунда",
"s_few": "секунды",
"s_many": "секунд",
"m_one": "минута",
"m_few": "минуты",
"m_many": "минут",
"h_one": "час",
"h_few": "часа",
"h_many": "часов",
"d_one": "день",
"d_few": "дня",
"d_many": "дней",
}
async def client_ready(self, client: TelegramClient, db):
"""client_ready hook"""
self.client = client
self.db = db
await client(JoinChannelRequest(channel=self.strings("author")))
self.cleanup()
def get(self, key: str, default: Any = None):
"""Get value from database"""
return self.db.get(self.strings("name"), key, default)
def set(self, key: str, value: Any):
"""Set value in database"""
return self.db.set(self.strings("name"), key, value)
def format_time(self, seconds: int, max_words: int = None) -> str:
"""Format time to human-readable variant"""
words = []
time_dict = {
"d": seconds // 86400,
"h": seconds % 86400 // 3600,
"m": seconds % 3600 // 60,
"s": seconds % 60,
}
for time_type, count in time_dict.items():
if max_words and len(words) >= max_words:
break
if count != 0:
words.append(
f"{count} {self.strings(time_type + '_' + plural_number(count))}"
)
return " ".join(words)
def mute(self, chat_id: int, user_id: int, until_time: int = 0):
"""Add user to mute list"""
chat_id = str(chat_id)
user_id = str(user_id)
mutes = self.get("mutes", {})
mutes.setdefault(chat_id, {})
mutes[chat_id][user_id] = until_time
self.set("mutes", mutes)
logger.debug("Muted user %s in chat %s", user_id, chat_id)
def unmute(self, chat_id: int, user_id: int):
"""Remove user from mute list"""
chat_id = str(chat_id)
user_id = str(user_id)
mutes = self.get("mutes", {})
if chat_id in mutes and user_id in mutes[chat_id]:
mutes[chat_id].pop(user_id)
self.set("mutes", mutes)
logger.debug("Unmuted user %s in chat %s", user_id, chat_id)
def get_mutes(self, chat_id: int) -> List[int]:
"""Get current mutes for specified chat"""
return [
int(user_id)
for user_id, until_time in self.get("mutes", {})
.get(str(chat_id), {})
.items()
if until_time > time.time() or until_time == 0
]
def get_mute_time(self, chat_id: int, user_id: int) -> int:
"""Get mute expiration timestamp"""
return self.get("mutes", {}).get(str(chat_id), {}).get(str(user_id))
def cleanup(self):
"""Cleanup expired mutes"""
mutes = {}
for chat_id, chat_mutes in self.get("mutes", {}).items():
if new_chat_mutes := {
user_id: until_time
for user_id, until_time in chat_mutes.items()
if until_time == 0 or until_time > time.time()
}:
mutes[chat_id] = new_chat_mutes
self.set("mutes", mutes)
def clear_mutes(self, chat_id: int = None):
"""Clear all mutes for given or all chats"""
if chat_id:
mutes = self.get("mutes", {})
mutes.pop(str(chat_id), None)
self.set("mutes", mutes)
else:
self.set("mutes", {})
async def swmutecmd(self, message: Message):
"""