import logging
import json
# import telethon
from .. import loader, utils, security
logger = logging.getLogger(__name__)
@loader.tds
class InactiveDetectorMod(loader.Module):
"""Detects inactive users"""
strings = {
"name": "Inactivity detector",
"top_header": "These {un} users wrote {mn} messages or less since joining the group:\n\n",
"top_place": "[{name}](tg://user?id={uid}) ({nmsg})", # FIXME: mentions
"top_delimiter": ", ", # TODO: move to config
"not_int": "Most messages must be integer",
"recount_priv": "I can't recount stats in private messages!",
"recount_started": "Processing recount for chat {}. It may take a lot.",
"recount_db_dumped": "Dumped database to owners and/or saved messages",
"recount_dump": "Database dump for chat {cid}:\n\n{dmp}",
"recount_iter_done": "Iterated over {} messages in this chat",
"recount_finish": "Recount successful!"
}
def __init__(self):
self.config = loader.ModuleConfig(
"default_chat_id", -1001457369532, "Chat ID to get top if command used in PM",
"top_delimiter", ', ', "Separates inactivity top members",
"dump_db_before_recount", False, "Dump database of chat before recounting. "
"Dump will be sent to saved or bot owners"
)
self.name = self.strings['name']
async def client_ready(self, client, db):
self.client = client
self.db = db
self.me = await self.client.get_me()
async def inactivecmd(self, message):
""".inactive """
if message.is_private:
chat_id = self.config['default_chat_id']
else:
chat_id = message.chat_id
args = utils.get_args(message)
if args:
if args[0].isdigit():
most = int(args[0])
else:
await utils.answer(message, self.strings("not_int", message))
return
else:
most = 0
users_db = self.db.get(__name__, str(chat_id), {})
users = {}
async for user in self.client.iter_participants(chat_id):
if not user.bot:
if str(user.id) not in users_db:
users_db[str(user.id)] = self.get_empty_user(user)
users[str(user.id)] = users_db[str(user.id)]
# We won't include users not CURRENTLY in chat,
# but their stats will remain in the database
self.db.set(__name__, str(chat_id), users_db)
def key(x):
return x[1]['cnt']
users = sorted(users.items(), key=key)
text = []
for uid, u in users:
if u['cnt'] <= most:
text.append(self.strings('top_place', message).format(
name=u['name'], uid=uid, nmsg=u['cnt']
))
else:
break
msg = self.strings('top_header', message).format(un=len(text), mn=most)\
+ self.config['top_delimiter'].join(text)
kw = {}
if self.me.id != message.from_id:
kw['silent'] = True
await utils.answer(message, msg, parse_mode="md", **kw)
async def recountcmd(self, message):
if message.is_private:
await utils.answer(message, self.strings('recount_priv', message))
return
chat_id = message.chat_id
await utils.answer(message, self.strings('recount_started', message).format(chat_id))
db = self.db.get(__name__, str(chat_id), {})
json_db = json.dumps(db)
msg = self.strings("recount_dump", message).format(cid=chat_id, dmp=json_db)
logging.debug('Database dump (chat %d): %s', chat_id, json_db)
owners = self.db.get(security.__name__, "owner", ["me"])
if owners:
for owner in owners:
try:
await self.client.send_message(owner, msg)
except Exception:
logger.warning("Dump of chat %d sending to %d failed",
chat_id, owner, exc_info=True)
new_db = {}
n = 0
async for msg in self.client.iter_messages(chat_id, limit=None):
if not msg.sender.bot:
n += 1
from_id = msg.from_id
# Ensure such user exists, or create him
new_db[str(from_id)] = new_db.get(str(from_id),
self.get_empty_user(msg.sender))
new_db[str(from_id)]['cnt'] += 1
await utils.answer(message, self.strings("recount_iter_done", message).format(n))
self.db.set(__name__, str(chat_id), new_db)
await utils.answer(message, self.strings("recount_finish", message))
async def watcher(self, message):
if message.is_private:
return
else:
chat_id = str(message.chat_id)
users = self.db.get(__name__, chat_id, {})
from_id = str(message.from_id)
# this creates user if not exists
if message.sender:
users[from_id] = users.get(from_id, self.get_empty_user(message.sender))
users[from_id]["cnt"] += 1
self.db.set(__name__, chat_id, users)
def get_full_name(self, user):
fn, ln = '', ''
if user.first_name:
fn = user.first_name
if user.last_name: # Can be None, then we get an exception
ln = user.last_name
return (fn + ' ' + ln).strip()
def get_empty_user(self, user):
return {"cnt": 0, "name": self.get_full_name(user)}