# -*- coding: future_fstrings -*-
# Friendly Telegram (telegram userbot)
# By Magical Unicorn (based on official Anti PM & AFK Friendly Telegram modules)
# Copyright (C) 2020 Magical Unicorn
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
from .. import loader, utils
import logging
import datetime
import time
from telethon import functions, types
logger = logging.getLogger(__name__)
def register(cb):
cb(DoNotDisturbMod())
@loader.tds
class DoNotDisturbMod(loader.Module):
"""
DND (Do Not Disturb) :
-> Prevents people sending you unsolicited private messages.
-> Prevents disturbing when you are unavailable.\n
Commands :
"""
strings = {"name": "DND",
"afk": "I'm AFK right now (since {} ago).",
"afk_back": "I'm goin' BACK !",
"afk_gone": "I'm goin' AFK !",
"afk_no_group_off": "AFK status message enabled for group chats.",
"afk_no_group_on": "AFK status message disabled for group chats.",
"afk_no_pm_off": "AFK status message enabled for PMs.",
"afk_no_pm_on": "AFK status message disabled for PMs.",
"afk_notif_off": "Notifications are now disabled during AFK time.",
"afk_notif_on": "Notifications are now enabled during AFK time.",
"afk_rate_limit_off": "AFK status message rate limit disabled.",
"afk_rate_limit_on": ("AFK status message rate limit enabled."
"\n\nOne AFK status message max will be sent per chat."),
"afk_reason": ("I'm AFK right now (since {} ago)."
"\n\nReason : {}"),
"arg_on_off": "Argument must be 'off' or 'on' !",
"pm_off": ("Automatic answer for denied PMs disabled."
"\n\nUsers are now free to PM !"),
"pm_on": "An automatic answer is now sent for denied PMs.",
"pm_allowed": "I have allowed you to PM now.",
"pm_blocked": ("I don't want any PM from you, "
"so you have been blocked !"),
"pm_denied": "I have denied you to PM now.",
"pm_go_away": ("Hey there! Unfortunately, I don't accept private messages from strangers."
"\n\nPlease contact me in a group, or wait for me to approve you."),
"pm_reported": "You just got reported to spam !",
"pm_limit_arg": "Argument must be 'off', 'on' or a number between 10 and 1000 !",
"pm_limit_off": "Not allowed users are now free to PM without be automatically blocked.",
"pm_limit_on": "Not allowed users are now blocked after {} PMs.",
"pm_limit_current": "Current limit is {}.",
"pm_limit_current_no": "Automatic user blocking is currently disabled.",
"pm_limit_reset": "Limit reseted to {}.",
"pm_limit_set": "Limit set to {}.",
"pm_notif_off": "Notifications from denied PMs are now disabled.",
"pm_notif_on": "Notifications from denied PMs are now enabled.",
"pm_triggered": ("Hey! I don't appreciate you barging into my PM like this !"
"\nDid you even ask me for approving you to PM ? No ?"
"\nGoodbye then."
"\n\nPS: You've been reported as spam."),
"pm_unblocked": ("Alright fine! I'll forgive them this time. PM has been unblocked for "
"this user."),
"unknow": ("An unknow problem as occured."
"\n\nPlease report problem with logs on "
"Github."),
"who_to_allow": "Who shall I allow to PM ?",
"who_to_block": "Specify who to block.",
"who_to_deny": "Who shall I deny to PM ?",
"who_to_report": "Who shall I report ?",
"who_to_unblock": "Specify who to unblock."}
def __init__(self):
self._me = None
self.default_pm_limit = 50
def config_complete(self):
self.name = self.strings["name"]
async def client_ready(self, client, db):
self._db = db
self._client = client
self._me = await client.get_me(True)
async def afkbackcmd(self, message):
"""Remove the AFK status.\n """
self._db.set(__name__, "afk", False)
self._db.set(__name__, "afk_gone", None)
self._db.set(__name__, "afk_rate", [])
await utils.answer(message, self.strings["afk_back"])
async def afkgocmd(self, message):
"""
.afkgo : Enable AFK status.
.afkgo [message] : Enable AFK status and add a reason.
"""
if utils.get_args_raw(message):
self._db.set(__name__, "afk", utils.get_args_raw(message))
else:
self._db.set(__name__, "afk", True)
self._db.set(__name__, "afk_gone", time.time())
self._db.set(__name__, "afk_rate", [])
await utils.answer(message, self.strings["afk_gone"])
async def afknogroupcmd(self, message):
"""
.afknogroup : Disable/Enable AFK status message for group chats.
.afknogroup off : Enable AFK status message for group chats.
.afknogroup on : Disable AFK status message for group chats.
"""
if utils.get_args_raw(message):
afknogroup_arg = utils.get_args_raw(message)
if afknogroup_arg == "off":
self._db.set(__name__, "afk_no_group", False)
await utils.answer(message, self.strings["afk_no_group_off"])
elif afknogroup_arg == "on":
self._db.set(__name__, "afk_no_group", True)
await utils.answer(message, self.strings["afk_no_group_on"])
else:
await utils.answer(message, self.strings["arg_on_off"])
else:
afknogroup_current = self._db.get(__name__, "afk_no_group")
if afknogroup_current is None or afknogroup_current is False:
self._db.set(__name__, "afk_no_group", True)
await utils.answer(message, self.strings["afk_no_group_on"])
elif afknogroup_current is True:
self._db.set(__name__, "afk_no_group", False)
await utils.answer(message, self.strings["afk_no_group_off"])
else:
await utils.answer(message, self.strings["unknow"])
async def afknopmcmd(self, message):
"""
.afknopm : Disable/Enable AFK status message for PMs.
.afknopm off : Enable AFK status message for PMs.
.afknopm on : Disable AFK status message for PMs.
"""
if utils.get_args_raw(message):
afknopm_arg = utils.get_args_raw(message)
if afknopm_arg == "off":
self._db.set(__name__, "afk_no_pm", False)
await utils.answer(message, self.strings["afk_no_pm_off"])
elif afknopm_arg == "on":
self._db.set(__name__, "afk_no_pm", True)
await utils.answer(message, self.strings["afk_no_pm_on"])
else:
await utils.answer(message, self.strings["arg_on_off"])
else:
afknopm_current = self._db.get(__name__, "afk_no_pm")
if afknopm_current is None or afknopm_current is False:
self._db.set(__name__, "afk_no_pm", True)
await utils.answer(message, self.strings["afk_no_pm_on"])
elif afknopm_current is True:
self._db.set(__name__, "afk_no_pm", False)
await utils.answer(message, self.strings["afk_no_pm_off"])
else:
await utils.answer(message, self.strings["unknow"])
async def afknotifcmd(self, message):
"""
.afknotif : Disable/Enable the notifications during AFK time.
.afknotif off : Disable the notifications during AFK time.
.afknotif on : Enable the notifications during AFK time.
"""
if utils.get_args_raw(message):
afknotif_arg = utils.get_args_raw(message)
if afknotif_arg == "off":
self._db.set(__name__, "afk_notif", False)
await utils.answer(message, self.strings["afk_notif_off"])
elif afknotif_arg == "on":
self._db.set(__name__, "afk_notif", True)
await utils.answer(message, self.strings["afk_notif_on"])
else:
await utils.answer(message, self.strings["arg_on_off"])
else:
afknotif_current = self._db.get(__name__, "afk_notif")
if afknotif_current is None or afknotif_current is False:
self._db.set(__name__, "afk_notif", True)
await utils.answer(message, self.strings["afk_notif_on"])
elif afknotif_current is True:
self._db.set(__name__, "afk_notif", False)
await utils.answer(message, self.strings["afk_notif_off"])
else:
await utils.answer(message, self.strings["unknow"])
async def afkratecmd(self, message):
"""
.afkrate : Disable/Enable AFK rate limit.
.afkrate off : Disable AFK rate limit.
.afkrate on : Enable AFK rate limit. One AFK status message max will be sent per chat.
"""
if utils.get_args_raw(message):
afkrate_arg = utils.get_args_raw(message)
if afkrate_arg == "off":
self._db.set(__name__, "afk_rate_limit", False)
await utils.answer(message, self.strings["afk_rate_limit_off"])
elif afkrate_arg == "on":
self._db.set(__name__, "afk_rate_limit", True)
await utils.answer(message, self.strings["afk_rate_limit_on"])
else:
await utils.answer(message, self.strings["arg_on_off"])
else:
afkrate_current = self._db.get(__name__, "afk_rate_limit")
if afkrate_current is None or afkrate_current is False:
self._db.set(__name__, "afk_rate_limit", True)
await utils.answer(message, self.strings["afk_rate_limit_on"])
elif afkrate_current is True:
self._db.set(__name__, "afk_rate_limit", False)
await utils.answer(message, self.strings["afk_rate_limit_off"])
else:
await utils.answer(message, self.strings["unknow"])
async def allowcmd(self, message):
"""Allow this user to PM.\n """
user = await utils.get_target(message)
if not user:
await utils.answer(message, self.strings["who_to_allow"])
return
self._db.set(__name__, "allow", list(set(self._db.get(__name__, "allow", [])).union({user})))
await utils.answer(message, self.strings["pm_allowed"].format(user))
async def blockcmd(self, message):
"""Block this user to PM without being warned.\n """
user = await utils.get_target(message)
if not user:
await utils.answer(message, self.strings["who_to_block"])
return
await message.client(functions.contacts.BlockRequest(user))
await utils.answer(message, self.strings["pm_blocked"].format(user))
async def denycmd(self, message):
"""Deny this user to PM without being warned.\n """
user = await utils.get_target(message)
if not user:
await utils.answer(message, self.strings["who_to_deny"])
return
self._db.set(__name__, "allow", list(set(self._db.get(__name__, "allow", [])).difference({user})))
await utils.answer(message, self.strings["pm_denied"].format(user))
async def pmcmd(self, message):
"""
.pm : Disable/Enable automatic answer for denied PMs.
.pm off : Disable automatic answer for denied PMs.
.pm on : Enable automatic answer for denied PMs.
"""
if utils.get_args_raw(message):
pm_arg = utils.get_args_raw(message)
if pm_arg == "off":
self._db.set(__name__, "pm", True)
await utils.answer(message, self.strings["pm_off"])
elif pm_arg == "on":
self._db.set(__name__, "pm", False)
await utils.answer(message, self.strings["pm_on"])
else:
await utils.answer(message, self.strings["arg_on_off"])
else:
pm_current = self._db.get(__name__, "pm")
if pm_current is None or pm_current is False:
self._db.set(__name__, "pm", True)
await utils.answer(message, self.strings["pm_off"])
elif pm_current is True:
self._db.set(__name__, "pm", False)
await utils.answer(message, self.strings["pm_on"])
else:
await utils.answer(message, self.strings["unknow"])
async def pmlimitcmd(self, message):
"""
.pmlimit : Get current max number of PMs before automatically block not allowed user.
.pmlimit off : Disable automatic user blocking.
.pmlimit on : Enable automatic user blocking.
.pmlimit reset : Reset max number of PMs before automatically block not allowed user.
.pmlimit [number] : Modify max number of PMs before automatically block not allowed user.
"""
if utils.get_args_raw(message):
pmlimit_arg = utils.get_args_raw(message)
if pmlimit_arg == "off":
self._db.set(__name__, "pm_limit", False)
await utils.answer(message, self.strings["pm_limit_off"])
return
elif pmlimit_arg == "on":
self._db.set(__name__, "pm_limit", True)
pmlimit_on = self.strings["pm_limit_on"].format(self.get_current_limit())
await utils.answer(message, pmlimit_on)
return
elif pmlimit_arg == "reset":
self._db.set(__name__, "pm_limit_max", self.default_pm_limit)
pmlimit_reset = self.strings["pm_limit_reset"].format(self.get_current_pm_limit())
await utils.answer(message, pmlimit_reset)
return
else:
try:
pmlimit_number = int(pmlimit_arg)
if pmlimit_number >= 10 and pmlimit_number <= 1000:
self._db.set(__name__, "pm_limit_max", pmlimit_number)
pmlimit_new = self.strings["pm_limit_set"].format(self.get_current_pm_limit())
await utils.answer(message, pmlimit_new)
return
else:
await utils.answer(message, self.strings["pm_limit_arg"])
return
except ValueError:
await utils.answer(message, self.strings["pm_limit_arg"])
return
await utils.answer(message, self.strings["limit_arg"])
else:
pmlimit = self._db.get(__name__, "pm_limit")
if pmlimit is None or pmlimit is False:
pmlimit_current = self.strings["pm_limit_current_no"]
elif pmlimit is True:
pmlimit_current = self.strings["pm_limit_current"].format(self.get_current_pm_limit())
else:
await utils.answer(message, self.strings["unknow"])
return
await utils.answer(message, pmlimit_current)
async def pmnotifcmd(self, message):
"""
.pmnotif : Disable/Enable the notifications from denied PMs.
.pmnotif off : Disable the notifications from denied PMs.
.pmnotif on : Enable the notifications from denied PMs.
"""
if utils.get_args_raw(message):
pmnotif_arg = utils.get_args_raw(message)
if pmnotif_arg == "off":
self._db.set(__name__, "pm_notif", False)
await utils.answer(message, self.strings["pm_notif_off"])
elif pmnotif_arg == "on":
self._db.set(__name__, "pm_notif", True)
await utils.answer(message, self.strings["pm_notif_on"])
else:
await utils.answer(message, self.strings["arg_on_off"])
else:
pmnotif_current = self._db.get(__name__, "pm_notif")
if pmnotif_current is None or pmnotif_current is False:
self._db.set(__name__, "pm_notif", True)
await utils.answer(message, self.strings["pm_notif_on"])
elif pmnotif_current is True:
self._db.set(__name__, "pm_notif", False)
await utils.answer(message, self.strings["pm_notif_off"])
else:
await utils.answer(message, self.strings["unknow"])
async def reportcmd(self, message):
"""Report the user to spam. Use only in PM.\n """
user = await utils.get_target(message)
if not user:
await utils.answer(message, self.strings["who_to_report"])
return
self._db.set(__name__, "allow", list(set(self._db.get(__name__, "allow", [])).difference({user})))
if message.is_reply and isinstance(message.to_id, types.PeerChannel):
await message.client(functions.messages.ReportRequest(peer=message.chat_id,
id=[message.reply_to_msg_id],
reason=types.InputReportReasonSpam()))
else:
await message.client(functions.messages.ReportSpamRequest(peer=message.to_id))
await utils.answer(message, self.strings["pm_reported"])
async def unblockcmd(self, message):
"""Unblock this user to PM."""
user = await utils.get_target(message)
if not user:
await utils.answer(message, self.strings["who_to_unblock"])
return
await message.client(functions.contacts.UnblockRequest(user))
await utils.answer(message, self.strings["pm_unblocked"].format(user))
async def watcher(self, message):
user = await utils.get_user(message)
pm = self._db.get(__name__, "pm")
if getattr(message.to_id, "user_id", None) == self._me.user_id and (pm is None or pm is False):
if not user.is_self and not user.bot and not user.verified and not self.get_allowed(message.from_id):
await utils.answer(message, self.strings["pm_go_away"])
if self._db.get(__name__, "pm_limit") is True:
pms = self._db.get(__name__, "pms", {})
pm_limit = self._db.get(__name__, "pm_limit_max")
pm_user = pms.get(message.from_id, 0)
if isinstance(pm_limit, int) and pm_limit >= 10 and pm_limit <= 1000 and pm_user >= pm_limit:
await utils.answer(message, self.strings["pm_triggered"])
await message.client(functions.contacts.BlockRequest(message.from_id))
await message.client(functions.messages.ReportSpamRequest(peer=message.from_id))
del pms[message.from_id]
self._db.set(__name__, "pms", pms)
else:
self._db.set(__name__, "pms", {**pms, message.from_id: pms.get(message.from_id, 0) + 1})
pm_notif = self._db.get(__name__, "pm_notif")
if pm_notif is None or pm_notif is False:
await message.client.send_read_acknowledge(message.chat_id)
return
if message.mentioned or getattr(message.to_id, "user_id", None) == self._me.user_id:
afk_status = self._db.get(__name__, "afk")
if user.is_self or user.bot or user.verified or afk_status is False:
return
if message.mentioned and self._db.get(__name__, "afk_no_group") is True:
return
afk_no_pm = self._db.get(__name__, "afk_no_pm")
if getattr(message.to_id, "user_id", None) == self._me.user_id and afk_no_pm is True:
return
if self._db.get(__name__, "afk_rate_limit") is True:
afk_rate = self._db.get(__name__, "afk_rate", [])
if utils.get_chat_id(message) in afk_rate:
return
else:
self._db.setdefault(__name__, {}).setdefault("afk_rate", []).append(utils.get_chat_id(message))
self._db.save()
now = datetime.datetime.now().replace(microsecond=0)
gone = datetime.datetime.fromtimestamp(self._db.get(__name__, "afk_gone")).replace(microsecond=0)
diff = now - gone
if afk_status is True:
afk_message = self.strings["afk"].format(diff)
elif afk_status is not False:
afk_message = self.strings["afk_reason"].format(diff, afk_status)
await utils.answer(message, afk_message)
_notif = self._db.get(__name__, "_notif")
if _notif is None or _notif is False:
await message.client.send_read_acknowledge(message.chat_id)
def get_allowed(self, id):
return id in self._db.get(__name__, "allow", [])
def get_current_pm_limit(self):
pm_limit = self._db.get(__name__, "pm_limit_max")
if not isinstance(pm_limit, int) or pm_limit < 10 or pm_limit > 1000:
pm_limit = self.default_pm_limit
self._db.set(__name__, "pm_limit_max", pm_limit)
return pm_limit