# ╔╗╔┌─┐┬─┐┌─┐┬ ┬
# ║║║├┤ ├┬┘│ └┬┘
# ╝╚╝└─┘┴└─└─┘ ┴
# Code is licensed under CC-BY-NC-ND 4.0 unless otherwise specified.
# https://creativecommons.org/licenses/by-nc-nd/4.0/
# You CANNOT edit this file without direct permission from the author.
# You can redistribute this file without any changes.
# meta developer: @nercymods
# scope: hikka_min 1.6.2
# requires: matplotlib
from hikkatl.types import Message, PeerUser, PeerChat, PeerChannel
from .. import loader, utils
from collections import defaultdict
import matplotlib.pyplot as plt
import io
import asyncio
import warnings
import numpy as np
from matplotlib.colors import LinearSegmentedColormap
from telethon.tl.functions.messages import SearchRequest, GetHistoryRequest
from telethon.tl.types import InputMessagesFilterEmpty
plt.style.use('dark_background')
@loader.tds
class Top(loader.Module):
"""Module for viewing the top list in chat"""
strings = {
"name": "Top",
"top": "Top users by message count",
"topchat": "💬Top users in",
"msgcount": "Message count",
"loading": "🕒Message counting has started, please wait, it may take a long time if there are a lot of messages in the chat",
"private_chat": "💬Message count in private chat with"
}
strings_ru = {
"top": "Топ пользователей по количеству сообщений",
"topchat": "💬Топ пользователей в",
"msgcount": "Количество сообщений",
"loading": "🕒Подсчет сообщений начался, пожалуйста подождите, это может занять много времени если в чате много сообщений",
"private_chat": "💬Количество сообщений в личном чате с"
}
@loader.command(ru_doc="Посмотреть топ в чате")
async def top(self, m: Message):
"""View top in the chat"""
await utils.answer(m, self.strings['loading'])
client = self.client
if isinstance(m.peer_id, PeerUser):
chat_type = 'private'
chat_id = m.peer_id.user_id
elif isinstance(m.peer_id, PeerChat) or isinstance(m.peer_id, PeerChannel):
chat_type = 'chat'
chat_id = m.chat.id
else:
await utils.answer(m, "Unsupported chat type.")
return
if chat_type == 'chat':
users = await client.get_participants(chat_id)
users_dict = {user.id: (user.username or user.first_name) for user in users}
message_count = defaultdict(int)
for user_id in users_dict:
result = await client(SearchRequest(
peer=chat_id,
q='',
filter=InputMessagesFilterEmpty(),
from_id=user_id,
limit=0,
min_date=None,
max_date=None,
offset_id=0,
add_offset=0,
max_id=0,
min_id=0,
hash=0
))
message_count[user_id] = result.count
sorted_message_count = sorted(message_count.items(), key=lambda item: item[1], reverse=True)
top_users = sorted_message_count[:20]
usernames = [users_dict[user_id] or "Unknown" for user_id, _ in top_users]
counts = [count for _, count in top_users]
fig, ax = plt.subplots(figsize=(10, 5))
colors = self._generate_gradient('#8A2BE2', '#4B0082', len(usernames))
bars = ax.barh(usernames, counts, color=colors, edgecolor='black', linewidth=0.5)
for bar in bars:
bar.set_alpha(0.8)
bar.set_hatch('///')
ax.set_xlabel(self.strings['msgcount'], fontsize=12, color='white')
ax.set_title(self.strings['top'], fontsize=14, color='white', pad=20)
ax.invert_yaxis()
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['left'].set_color('#8A2BE2')
ax.spines['bottom'].set_color('#8A2BE2')
ax.grid(True, linestyle='--', alpha=0.6, color='gray')
for i, (bar, username) in enumerate(zip(bars, usernames)):
if i < 3:
bar.set_color('#FFD700')
ax.text(bar.get_width() + 5, bar.get_y() + bar.get_height() / 2,
f'#{i+1}', va='center', ha='left', color='#FFD700', fontsize=12)
buf = io.BytesIO()
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
plt.savefig(buf, format='png', bbox_inches='tight', dpi=100)
buf.seek(0)
caption = f"{self.strings['topchat']} {m.chat.title}:\n"
caption += "\n".join([f"{i+1}. {user} - {count}" for i, (user, count) in enumerate(zip(usernames, counts))])
await utils.answer_file(m, buf, caption, force_document=False)
else:
me = await client.get_me()
target = await client.get_entity(chat_id)
my_count, their_count = await asyncio.gather(
self._get_message_count_fast(client, chat_id, me.id),
self._get_message_count_fast(client, chat_id, target.id)
)
message_counts = [(me.first_name, my_count), (target.first_name, their_count)]
sorted_message_counts = sorted(message_counts, key=lambda item: item[1], reverse=True)
usernames = [user for user, _ in sorted_message_counts]
counts = [count for _, count in sorted_message_counts]
fig, ax = plt.subplots(figsize=(10, 5))
colors = self._generate_gradient('#8A2BE2', '#4B0082', len(usernames))
bars = ax.barh(usernames, counts, color=colors, edgecolor='black', linewidth=0.5)
for bar in bars:
bar.set_alpha(0.8)
bar.set_hatch('///')
ax.set_xlabel(self.strings['msgcount'], fontsize=12, color='white')
ax.set_title(self.strings['top'], fontsize=14, color='white', pad=20)
ax.invert_yaxis()
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['left'].set_color('#8A2BE2')
ax.spines['bottom'].set_color('#8A2BE2')
ax.grid(True, linestyle='--', alpha=0.6, color='gray')
for i, (bar, username) in enumerate(zip(bars, usernames)):
if i < 3:
bar.set_color('#FFD700')
ax.text(bar.get_width() + 5, bar.get_y() + bar.get_height() / 2,
f'#{i+1}', va='center', ha='left', color='#FFD700', fontsize=12)
buf = io.BytesIO()
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
plt.savefig(buf, format='png', bbox_inches='tight', dpi=100)
buf.seek(0)
caption = f"{self.strings['private_chat']} {target.first_name}:\n"
caption += "\n".join([f'"{user}" - {count}' for user, count in zip(usernames, counts)])
await utils.answer_file(m, buf, caption, force_document=False)
async def _get_message_count_fast(self, client, chat_id, user_id):
"""Получает количество сообщений от конкретного пользователя с использованием GetHistoryRequest"""
total_count = 0
offset_id = 0
limit = 100
while True:
history = await client(GetHistoryRequest(
peer=chat_id,
offset_id=offset_id,
offset_date=None,
add_offset=0,
limit=limit,
max_id=0,
min_id=0,
hash=0
))
if not history.messages:
break
for message in history.messages:
if message.sender_id == user_id:
total_count += 1
offset_id = history.messages[-1].id
if len(history.messages) < limit:
break
return total_count
def _generate_gradient(self, start_color, end_color, n):
"""Генерация градиента между двумя цветами"""
cmap = LinearSegmentedColormap.from_list('custom_gradient', [start_color, end_color], N=n)
return [cmap(i) for i in np.linspace(0, 1, n)]