mirror of
https://github.com/MuRuLOSE/limoka.git
synced 2026-06-16 14:34:17 +02:00
Added and updated repositories 2025-11-21 01:04:46
This commit is contained in:
@@ -1,147 +0,0 @@
|
||||
# ______ ___ ___ _ _
|
||||
# ____ | ___ \ | \/ | | | | |
|
||||
# / __ \| |_/ / _| . . | ___ __| |_ _| | ___
|
||||
# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \
|
||||
# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/
|
||||
# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___|
|
||||
# \____/ __/ |
|
||||
# |___/
|
||||
|
||||
# На модуль распространяется лицензия "GNU General Public License v3.0"
|
||||
# https://github.com/all-licenses/GNU-General-Public-License-v3.0
|
||||
|
||||
# meta developer: @pymodule
|
||||
# requires: opencv-python pillow
|
||||
|
||||
import os, shutil, cv2
|
||||
from PIL import Image, UnidentifiedImageError
|
||||
from telethon.tl.functions.stickers import CreateStickerSetRequest
|
||||
from telethon.tl.types import InputStickerSetItem, InputDocument
|
||||
from telethon.errors.rpcerrorlist import PackShortNameOccupiedError
|
||||
from .. import loader
|
||||
from telethon.tl.functions.photos import GetUserPhotosRequest
|
||||
|
||||
import asyncio
|
||||
import random
|
||||
import string
|
||||
|
||||
try:
|
||||
resample = Image.Resampling.LANCZOS
|
||||
except:
|
||||
resample = Image.LANCZOS
|
||||
|
||||
@loader.tds
|
||||
class CreateAvatarsPack(loader.Module):
|
||||
"""Creates a sticker pack from photos and video avatars of participants"""
|
||||
strings = {
|
||||
"name": "CreateAvatarsPack",
|
||||
"processing": "📥 I'm collecting avatars of participants...",
|
||||
"no_avatars": "❌ No members with avatars",
|
||||
"no_valid": "❌ Could not process any avatars",
|
||||
"done": "✅ The sticker pack is ready:\n👉 <a href='https://t.me/addstickers/{}'>Open</a>",
|
||||
"already": "⚠️ A sticker pack with this name already exists.",
|
||||
}
|
||||
|
||||
strings_ru = {
|
||||
"processing": "📥 Собираю аватарки участников...",
|
||||
"no_avatars": "❌ Нет участников с аватарками",
|
||||
"no_valid": "❌ Не удалось обработать ни одну аватарку",
|
||||
"done": "✅ Стикерпак готов:\n👉 <a href='https://t.me/addstickers/{}'>Открыть</a>",
|
||||
"already": "⚠️ Стикерпак с таким именем уже существует",
|
||||
}
|
||||
|
||||
@loader.command(doc="- Create a sticker pack from the avatars of users in the group", ru_doc="- Создать стикерпак из аватаров пользователей группы", only_groups=True)
|
||||
async def createavatars(self, message):
|
||||
"""- Create a sticker pack from the avatars of users in the group"""
|
||||
chat = await message.get_chat()
|
||||
cid = abs(message.chat_id)
|
||||
await message.edit(self.strings["processing"])
|
||||
|
||||
users = []
|
||||
async for u in self._client.iter_participants(chat.id):
|
||||
if u.photo:
|
||||
users.append(u)
|
||||
if len(users) >= 100:
|
||||
break
|
||||
|
||||
if not users:
|
||||
return await message.edit(self.strings["no_avatars"])
|
||||
|
||||
tmp_dir = f"/tmp/avatars_{cid}"
|
||||
os.makedirs(tmp_dir, exist_ok=True)
|
||||
sticker_files = []
|
||||
|
||||
for u in users:
|
||||
try:
|
||||
photos = await self._client(GetUserPhotosRequest(u.id, 0, 0, 1))
|
||||
if not photos.photos:
|
||||
continue
|
||||
|
||||
raw = await self._client.download_media(photos.photos[0])
|
||||
data = raw if isinstance(raw, (bytes, bytearray)) else open(raw, "rb").read()
|
||||
|
||||
path_raw = os.path.join(tmp_dir, f"{u.id}_raw")
|
||||
with open(path_raw, "wb") as f:
|
||||
f.write(data)
|
||||
|
||||
if b"ftyp" in data[:32] or path_raw.endswith((".mp4", ".webm", ".mov")):
|
||||
cap = cv2.VideoCapture(path_raw)
|
||||
success, frame = cap.read()
|
||||
cap.release()
|
||||
if not success:
|
||||
continue
|
||||
img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA))
|
||||
else:
|
||||
try:
|
||||
img = Image.open(path_raw).convert("RGBA")
|
||||
except UnidentifiedImageError:
|
||||
continue
|
||||
|
||||
img.thumbnail((512, 512), resample)
|
||||
w, h = img.size
|
||||
final = Image.new("RGBA", (512, 512), (0, 0, 0, 0))
|
||||
final.paste(img, ((512 - w)//2, (512 - h)//2))
|
||||
|
||||
out = os.path.join(tmp_dir, f"{u.id}.webp")
|
||||
final.save(out, "WEBP")
|
||||
sticker_files.append(out)
|
||||
|
||||
except:
|
||||
continue
|
||||
|
||||
if not sticker_files:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
return await message.edit(self.strings["no_valid"])
|
||||
|
||||
tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
|
||||
short = f"f{cid}_{tag}_by_fcreateavatars"
|
||||
title = f"AvaPack {tag}"
|
||||
|
||||
stickers = []
|
||||
for p in sticker_files:
|
||||
await asyncio.sleep(0.3)
|
||||
file = await self._client.upload_file(p)
|
||||
msg = await self._client.send_file("me", file, force_document=True)
|
||||
doc = msg.document
|
||||
await self._client.delete_messages("me", msg.id)
|
||||
stickers.append(InputStickerSetItem(
|
||||
document=InputDocument(doc.id, doc.access_hash, doc.file_reference),
|
||||
emoji="🖼️"
|
||||
))
|
||||
|
||||
try:
|
||||
await self._client(CreateStickerSetRequest(
|
||||
user_id="me",
|
||||
title=title,
|
||||
short_name=short,
|
||||
stickers=stickers
|
||||
))
|
||||
except PackShortNameOccupiedError:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
return await message.edit(self.strings["already"])
|
||||
except Exception as e:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
return await message.edit(f"❌ Error: {e}")
|
||||
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
await message.edit(self.strings["done"].format(short))
|
||||
353
fiksofficial/python-modules/createpacks.py
Normal file
353
fiksofficial/python-modules/createpacks.py
Normal file
@@ -0,0 +1,353 @@
|
||||
# ______ ___ ___ _ _
|
||||
# ____ | ___ \ | \/ | | | | |
|
||||
# / __ \| |_/ / _| . . | ___ __| |_ _| | ___
|
||||
# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \
|
||||
# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/
|
||||
# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___|
|
||||
# \____/ __/ |
|
||||
# |___/
|
||||
|
||||
# На модуль распространяется лицензия "GNU General Public License v3.0"
|
||||
# https://github.com/all-licenses/GNU-General-Public-License-v3.0
|
||||
|
||||
# meta developer: @pymodule
|
||||
# requires: opencv-python pillow
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import cv2
|
||||
import random
|
||||
import string
|
||||
import asyncio
|
||||
import logging
|
||||
from PIL import Image, UnidentifiedImageError
|
||||
|
||||
from telethon.tl.functions.stickers import CreateStickerSetRequest
|
||||
from telethon.tl.types import InputStickerSetItem, InputDocument
|
||||
from telethon.errors.rpcerrorlist import PackShortNameOccupiedError
|
||||
|
||||
from .. import loader, utils
|
||||
from telethon.tl.functions.photos import GetUserPhotosRequest
|
||||
|
||||
|
||||
try:
|
||||
resample = Image.Resampling.LANCZOS
|
||||
except AttributeError:
|
||||
resample = Image.LANCZOS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
async def process_to_webp(input_path: str, output_path: str, size: int = 512) -> bool:
|
||||
try:
|
||||
is_video = input_path.lower().endswith(('.mp4', '.webm', '.mov')) or b'ftyp' in open(input_path, 'rb').read(32)
|
||||
if is_video:
|
||||
cap = cv2.VideoCapture(input_path)
|
||||
success, frame = cap.read()
|
||||
cap.release()
|
||||
if not success:
|
||||
logger.warning(f"Video: Unable to read frame {input_path}")
|
||||
return False
|
||||
img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA))
|
||||
else:
|
||||
try:
|
||||
img = Image.open(input_path).convert("RGBA")
|
||||
except UnidentifiedImageError:
|
||||
logger.warning(f"Image: incorrect {input_path}")
|
||||
return False
|
||||
|
||||
img.thumbnail((size, size), resample)
|
||||
final = Image.new("RGBA", (size, size), (0, 0, 0, 0))
|
||||
w, h = img.size
|
||||
final.paste(img, ((size - w) // 2, (size - h) // 2))
|
||||
|
||||
final.save(output_path, "WEBP", quality=95, method=6)
|
||||
|
||||
try:
|
||||
check = Image.open(output_path)
|
||||
if check.size != (size, size):
|
||||
logger.warning(f"WEBP: size not {size}x{size}: {check.size}")
|
||||
return False
|
||||
if os.path.getsize(output_path) > 512 * 1024:
|
||||
final.save(output_path, "WEBP", quality=80, method=6)
|
||||
if os.path.getsize(output_path) > 512 * 1024:
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"WEBP: verification error {output_path}: {e}")
|
||||
return False
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"WEBP: processing error {input_path}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
|
||||
async def process_to_png(input_path: str, output_path: str, size: int = 100) -> bool:
|
||||
try:
|
||||
is_video = input_path.lower().endswith(('.mp4', '.webm', '.mov')) or b'ftyp' in open(input_path, 'rb').read(32)
|
||||
if is_video:
|
||||
cap = cv2.VideoCapture(input_path)
|
||||
success, frame = cap.read()
|
||||
cap.release()
|
||||
if not success:
|
||||
logger.warning(f"Video: Unable to read frame {input_path}")
|
||||
return False
|
||||
img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA))
|
||||
else:
|
||||
try:
|
||||
img = Image.open(input_path).convert("RGBA")
|
||||
except UnidentifiedImageError:
|
||||
logger.warning(f"Image: incorrect {input_path}")
|
||||
return False
|
||||
|
||||
img.thumbnail((size, size), resample)
|
||||
final = Image.new("RGBA", (size, size), (0, 0, 0, 0))
|
||||
w, h = img.size
|
||||
final.paste(img, ((size - w) // 2, (size - h) // 2))
|
||||
|
||||
final.save(output_path, "PNG")
|
||||
|
||||
try:
|
||||
check = Image.open(output_path)
|
||||
if check.size != (size, size):
|
||||
logger.warning(f"PNG: size not {size}x{size}: {check.size}")
|
||||
return False
|
||||
if os.path.getsize(output_path) > 512 * 1024:
|
||||
logger.warning(f"PNG: file >512KB: {os.path.getsize(output_path)}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"PNG: verification error {output_path}: {e}")
|
||||
return False
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"PNG: processing error {input_path}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
@loader.tds
|
||||
class CreatePacks(loader.Module):
|
||||
"""Creates sticker packs and emoji packs from the avatars of chat participants"""
|
||||
|
||||
strings = {
|
||||
"name": "CreatePacks",
|
||||
"processing": "<b>[CreatePacks]</b> Collecting avatars of participants...",
|
||||
"no_avatars": "<b>[CreatePacks]</b> No members with avatars",
|
||||
"no_valid": "<b>[CreatePacks]</b> Could not process any avatars",
|
||||
"done_pack": "<b>[CreatePacks]</b> Sticker pack is ready:\n<b>[CreatePacks]</b> Open: <a href='https://t.me/addstickers/{}'>here</a>",
|
||||
"done_emoji_pack": "<b>[CreatePacks]</b> Emoji pack is ready:\n<b>[CreatePacks]</b> Open: <a href='https://t.me/addstickers/{}'>here</a>",
|
||||
"already": "<b>[CreatePacks]</b> A sticker pack with this name already exists.",
|
||||
"emoji_processing": "<b>[CreatePacks]</b> Creating emoji pack from avatars...",
|
||||
"emoji_no_emoji": "<b>[CreatePacks]</b> No emoji specified — using",
|
||||
}
|
||||
|
||||
strings_ru = {
|
||||
"_cls_doc": "Создаёт стикерпаки и эмодзи-паки из аватаров участников чата",
|
||||
"processing": "<b>[CreatePacks]</b> Собираю аватарки участников...",
|
||||
"no_avatars": "<b>[CreatePacks]</b> Нет участников с аватарками",
|
||||
"no_valid": "<b>[CreatePacks]</b> Не удалось обработать ни одну аватарку",
|
||||
"done_pack": "<b>[CreatePacks]</b> Стикерпак готов:\n<b>[CreatePacks]</b> Открыть: <a href='https://t.me/addstickers/{}'>здесь</a>",
|
||||
"done_emoji_pack": "<b>[CreatePacks]</b> Эмодзи-пак готов:\n<b>[CreatePacks]</b> Открыть: <a href='https://t.me/addstickers/{}'>здесь</a>",
|
||||
"already": "<b>[CreatePacks]</b> Стикерпак с таким именем уже существует",
|
||||
"emoji_processing": "<b>[CreatePacks]</b> Создаю эмодзи-пак из аватаров...",
|
||||
"emoji_no_emoji": "<b>[CreatePacks]</b> Эмодзи не указан — используется",
|
||||
}
|
||||
|
||||
async def _get_avatar_files(self, message, format: str = "webp", size: int = 512) -> tuple[list[str], str]:
|
||||
chat = await message.get_chat()
|
||||
cid = abs(message.chat_id)
|
||||
tmp_dir = f"/tmp/avatars_{cid}_{random.randint(1000, 9999)}"
|
||||
os.makedirs(tmp_dir, exist_ok=True)
|
||||
|
||||
users = []
|
||||
async for u in self._client.iter_participants(chat.id):
|
||||
if u.photo:
|
||||
users.append(u)
|
||||
if len(users) >= 100:
|
||||
break
|
||||
|
||||
if not users:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
return [], tmp_dir
|
||||
|
||||
processed = []
|
||||
process_func = process_to_webp if format == "webp" else process_to_png
|
||||
|
||||
for u in users:
|
||||
try:
|
||||
photos = await self._client(GetUserPhotosRequest(u.id, 0, 0, 1))
|
||||
if not photos.photos:
|
||||
continue
|
||||
|
||||
raw_path = os.path.join(tmp_dir, f"{u.id}_raw")
|
||||
raw = await self._client.download_media(photos.photos[0], file=raw_path)
|
||||
|
||||
ext = ".webp" if format == "webp" else ".png"
|
||||
output_path = os.path.join(tmp_dir, f"{u.id}{ext}")
|
||||
success = False
|
||||
|
||||
if isinstance(raw, str):
|
||||
success = await process_func(raw, output_path, size=size)
|
||||
if os.path.exists(raw):
|
||||
os.unlink(raw)
|
||||
else:
|
||||
temp_raw = os.path.join(tmp_dir, f"{u.id}_temp_raw")
|
||||
with open(temp_raw, "wb") as f:
|
||||
f.write(raw)
|
||||
success = await process_func(temp_raw, output_path, size=size)
|
||||
if os.path.exists(temp_raw):
|
||||
os.unlink(temp_raw)
|
||||
|
||||
if success:
|
||||
try:
|
||||
img_size = Image.open(output_path).size
|
||||
if img_size != (size, size):
|
||||
logger.warning(f"{format.upper()}: size not {size}x{size}: {img_size}")
|
||||
os.unlink(output_path)
|
||||
continue
|
||||
if os.path.getsize(output_path) > 512 * 1024:
|
||||
logger.warning(f"{format.upper()}: file >512KB: {os.path.getsize(output_path)}")
|
||||
os.unlink(output_path)
|
||||
continue
|
||||
processed.append(output_path)
|
||||
except Exception as e:
|
||||
logger.error(f"{format.upper()}: verification error {output_path}: {e}")
|
||||
else:
|
||||
logger.warning(f"{format.upper()}: Failed to process avatar {u.id}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"User processing error {u.id}: {e}")
|
||||
continue
|
||||
|
||||
return processed, tmp_dir
|
||||
|
||||
@loader.command(
|
||||
ru_doc="- Создать стикерпак из аватаров в группе",
|
||||
only_groups=True
|
||||
)
|
||||
async def createavatars(self, message):
|
||||
"""- Create a sticker pack from avatars in a group"""
|
||||
await message.edit(self.strings("processing"))
|
||||
|
||||
files, tmp_dir = await self._get_avatar_files(message, format="webp", size=512)
|
||||
if not files:
|
||||
return await message.edit(self.strings("no_avatars"))
|
||||
|
||||
tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
|
||||
short_name = f"f{abs(message.chat_id)}_{tag}_by_fcreateavatars"
|
||||
title = f"AvaPack {tag}"
|
||||
|
||||
stickers = []
|
||||
for path in files:
|
||||
try:
|
||||
await asyncio.sleep(0.3)
|
||||
file = await self._client.upload_file(path)
|
||||
msg = await self._client.send_file("me", file, force_document=True)
|
||||
doc = msg.document
|
||||
await self._client.delete_messages("me", msg.id)
|
||||
stickers.append(InputStickerSetItem(
|
||||
document=InputDocument(doc.id, doc.access_hash, doc.file_reference),
|
||||
emoji="🖼️"
|
||||
))
|
||||
except Exception as e:
|
||||
logger.error(f"Sticker loading error {path}: {e}")
|
||||
continue
|
||||
|
||||
if not stickers:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
return await message.edit(self.strings("no_valid"))
|
||||
|
||||
try:
|
||||
await self._client(CreateStickerSetRequest(
|
||||
user_id="me",
|
||||
title=title,
|
||||
short_name=short_name,
|
||||
stickers=stickers
|
||||
))
|
||||
await message.edit(self.strings("done_pack").format(short_name))
|
||||
except PackShortNameOccupiedError:
|
||||
await message.edit(self.strings("already"))
|
||||
except Exception as e:
|
||||
error_details = f"❌ Ошибка создания стикерпака:\n<code>{type(e).__name__}: {e}</code>\n"
|
||||
error_details += f"Пак: {short_name}\nСтикеров: {len(stickers)}\n"
|
||||
if files:
|
||||
error_details += f"Последний файл: {files[-1]}\n"
|
||||
try:
|
||||
error_details += f"Размер: {Image.open(files[-1]).size}\n"
|
||||
error_details += f"Вес: {os.path.getsize(files[-1])} байт"
|
||||
except:
|
||||
pass
|
||||
await message.edit(error_details)
|
||||
logger.exception("Error creating sticker pack")
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
|
||||
@loader.command(
|
||||
ru_doc="[эмодзи] - Создать эмодзи-пак из всех аватаров",
|
||||
only_groups=True
|
||||
)
|
||||
async def createemojis(self, message):
|
||||
"""[emoji] - Create an emoji pack from all avatars"""
|
||||
args = utils.get_args_raw(message)
|
||||
emoji = args.strip() if args else "🖼️"
|
||||
if not args:
|
||||
await message.edit(self.strings("emoji_no_emoji") + f" `{emoji}`")
|
||||
await asyncio.sleep(1.5)
|
||||
|
||||
await message.edit(self.strings("emoji_processing"))
|
||||
|
||||
files, tmp_dir = await self._get_avatar_files(message, format="png", size=100)
|
||||
if not files:
|
||||
return await message.edit(self.strings("no_avatars"))
|
||||
|
||||
tag = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
|
||||
short_name = f"f{abs(message.chat_id)}_{tag}_by_fcreateemojis"
|
||||
title = f"EmojiPack {tag}"
|
||||
|
||||
stickers = []
|
||||
for path in files:
|
||||
try:
|
||||
await asyncio.sleep(0.3)
|
||||
file = await self._client.upload_file(path)
|
||||
msg = await self._client.send_file("me", file, force_document=True)
|
||||
doc = msg.document
|
||||
await self._client.delete_messages("me", msg.id)
|
||||
stickers.append(InputStickerSetItem(
|
||||
document=InputDocument(doc.id, doc.access_hash, doc.file_reference),
|
||||
emoji=emoji
|
||||
))
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading emoji {path}: {e}")
|
||||
continue
|
||||
|
||||
if not stickers:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
return await message.edit(self.strings("no_valid"))
|
||||
|
||||
try:
|
||||
await self._client(CreateStickerSetRequest(
|
||||
user_id="me",
|
||||
title=title,
|
||||
short_name=short_name,
|
||||
stickers=stickers,
|
||||
emojis=True
|
||||
))
|
||||
await message.edit(self.strings("done_emoji_pack").format(short_name))
|
||||
except PackShortNameOccupiedError:
|
||||
await message.edit(self.strings("already"))
|
||||
except Exception as e:
|
||||
error_details = f"❌ Ошибка создания эмодзи-пака:\n<code>{type(e).__name__}: {e}</code>\n"
|
||||
error_details += f"Пак: {short_name}\nСмайликов: {len(stickers)}\n"
|
||||
if files:
|
||||
error_details += f"Последний файл: {files[-1]}\n"
|
||||
try:
|
||||
error_details += f"Размер: {Image.open(files[-1]).size}\n"
|
||||
error_details += f"Вес: {os.path.getsize(files[-1])} байт"
|
||||
except:
|
||||
pass
|
||||
await message.edit(error_details)
|
||||
logger.exception("Error creating emoji pack")
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
@@ -449,7 +449,7 @@ class DeviceInfo(loader.Module):
|
||||
await call.edit(
|
||||
text=self.strings["no_results"].format(query),
|
||||
reply_markup=[],
|
||||
photo=None, # Explicitly remove any existing photo
|
||||
photo=None,
|
||||
disable_web_page_preview=True
|
||||
)
|
||||
except Exception as edit_error:
|
||||
@@ -471,7 +471,7 @@ class DeviceInfo(loader.Module):
|
||||
await call.edit(
|
||||
text=list_text,
|
||||
reply_markup=button_rows,
|
||||
photo=None, # Explicitly remove any existing photo
|
||||
photo=None,
|
||||
disable_web_page_preview=True
|
||||
)
|
||||
except Exception as edit_error:
|
||||
@@ -491,8 +491,8 @@ class DeviceInfo(loader.Module):
|
||||
await call.edit(
|
||||
text=self.strings["error"].format(str(e)),
|
||||
reply_markup=[],
|
||||
photo=None, # Explicitly remove any existing photo
|
||||
disable_web_page_preview=True
|
||||
photo=None,
|
||||
disable_web_page_preview=True
|
||||
)
|
||||
except Exception as edit_error:
|
||||
logger.warning(f"DeviceInfo: Failed to edit error message: {edit_error}")
|
||||
|
||||
@@ -1,3 +1,12 @@
|
||||
# ______ ___ ___ _ _
|
||||
# ____ | ___ \ | \/ | | | | |
|
||||
# / __ \| |_/ / _| . . | ___ __| |_ _| | ___
|
||||
# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \
|
||||
# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/
|
||||
# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___|
|
||||
# \____/ __/ |
|
||||
# |___/
|
||||
|
||||
# На модуль распространяется лицензия "GNU General Public License v3.0"
|
||||
# https://github.com/all-licenses/GNU-General-Public-License-v3.0
|
||||
|
||||
@@ -5,30 +14,147 @@
|
||||
# requires: speedtest-cli
|
||||
|
||||
import speedtest
|
||||
from .. import loader
|
||||
from .. import loader, utils
|
||||
|
||||
@loader.tds
|
||||
class SpeedTestMod(loader.Module):
|
||||
"""Модуль для проверки скорости интернета"""
|
||||
"""Checking your internet speed"""
|
||||
strings = {
|
||||
"name": "SpeedTest",
|
||||
"starting": "Running Speedtest…",
|
||||
"ping": "Ping: <i>{:.2f} ms</i>",
|
||||
"download": "Download: <i>{:.2f} Mbps</i>",
|
||||
"upload": "Upload: <i>{:.2f} Mbps</i>",
|
||||
"finished": "<b>Speedtest completed!</b>",
|
||||
"error": "Speedtest error: <code>{}</code>",
|
||||
"progress_ping": "Testing \"Ping\"...",
|
||||
"progress_download": "Testing \"Download\"...",
|
||||
"progress_upload": "Testing \"Upload\"...",
|
||||
"cfg_timeout": "Server request timeout (sec)",
|
||||
"cfg_retries": "Number of retry attempts",
|
||||
"quality_website": "Websites: {}",
|
||||
"quality_video": "Video: {}",
|
||||
"quality_gaming": "Gaming: {}",
|
||||
"quality_calls": "Video calls: {}",
|
||||
}
|
||||
|
||||
strings = {"name": "SpeedTest"}
|
||||
strings_ru = {
|
||||
"_cls_doc": "Проверка скорости интернета",
|
||||
"starting": "Запускаем Speedtest…",
|
||||
"ping": "Ping: <i>{:.2f} мс</i>",
|
||||
"download": "Загрузка: <i>{:.2f} Мбит/с</i>",
|
||||
"upload": "Отдача: <i>{:.2f} Мбит/с</i>",
|
||||
"finished": "<b>Speedtest завершён!</b>",
|
||||
"error": "Ошибка при выполнении Speedtest: <code>{}</code>",
|
||||
"progress_ping": "Тестируем пинг...",
|
||||
"progress_download": "Тестируем скачивание...",
|
||||
"progress_upload": "Тестируем загрузку...",
|
||||
"cfg_timeout": "Таймаут запросов к серверу (сек)",
|
||||
"cfg_retries": "Кол‑во попыток при неудаче",
|
||||
"quality_website": "Сайты: {}",
|
||||
"quality_video": "Видео: {}",
|
||||
"quality_gaming": "Игры: {}",
|
||||
"quality_calls": "Видеосвязь: {}",
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self.config = loader.ModuleConfig(
|
||||
loader.ConfigValue(
|
||||
"timeout",
|
||||
30,
|
||||
lambda: self.strings("cfg_timeout"),
|
||||
validator=loader.validators.Integer(minimum=10, maximum=120),
|
||||
),
|
||||
loader.ConfigValue(
|
||||
"retries",
|
||||
2,
|
||||
lambda: self.strings("cfg_retries"),
|
||||
validator=loader.validators.Integer(minimum=0, maximum=5),
|
||||
),
|
||||
)
|
||||
|
||||
def _get_quality_rating(self, category: str, ping: float, download: float, upload: float) -> str:
|
||||
if category == "website":
|
||||
if ping < 50 and download > 5:
|
||||
return "🟢🟢🟢🟢🟢"
|
||||
elif ping < 100 and download > 3:
|
||||
return "🟠🟠🟠🟠"
|
||||
elif ping < 200 and download > 1:
|
||||
return "🟡🟡🟡"
|
||||
elif ping < 300 and download > 0.5:
|
||||
return "🔴🔴"
|
||||
else:
|
||||
return "⚫"
|
||||
elif category == "video":
|
||||
if ping < 50 and download > 25:
|
||||
return "🟢🟢🟢🟢🟢"
|
||||
elif ping < 75 and download > 5:
|
||||
return "🟠🟠🟠🟠"
|
||||
elif ping < 100 and download > 3:
|
||||
return "🟡🟡🟡"
|
||||
elif ping < 150 and download > 1:
|
||||
return "🔴🔴"
|
||||
else:
|
||||
return "⚫"
|
||||
elif category == "gaming":
|
||||
if ping < 50 and download > 5 and upload > 3:
|
||||
return "🟢🟢🟢🟢🟢"
|
||||
elif ping < 100 and download > 3 and upload > 1:
|
||||
return "🟠🟠🟠🟠"
|
||||
elif ping < 150 and download > 1 and upload > 0.5:
|
||||
return "🟡🟡🟡"
|
||||
elif ping < 200 and download > 0.5:
|
||||
return "🔴🔴"
|
||||
else:
|
||||
return "⚫"
|
||||
elif category == "calls":
|
||||
if ping < 50 and download > 4 and upload > 4:
|
||||
return "🟢🟢🟢🟢🟢"
|
||||
elif ping < 100 and download > 1.5 and upload > 1.5:
|
||||
return "🟡🟡🟡🟡"
|
||||
elif ping < 150 and download > 1 and upload > 1:
|
||||
return "🟠🟠🟠"
|
||||
elif ping < 200 and download > 0.5:
|
||||
return "🔴🔴"
|
||||
else:
|
||||
return "⚫"
|
||||
return "⚫"
|
||||
|
||||
@loader.command(
|
||||
ru_doc="(.st) - Запускает тест скорости интернета",
|
||||
en_doc="(.st) - Runs an internet speed test",
|
||||
alias="st",
|
||||
)
|
||||
async def speedcmd(self, message):
|
||||
"""Запускает тест скорости интернета"""
|
||||
msg = await message.edit("Запускаем Speedtest... 🏁")
|
||||
msg = await utils.answer(message, self.strings("starting"))
|
||||
|
||||
try:
|
||||
st = speedtest.Speedtest()
|
||||
st.get_best_server()
|
||||
download = st.download() / 1_000_000 # Мбит/с
|
||||
upload = st.upload() / 1_000_000 # Мбит/с
|
||||
ping = st.results.ping
|
||||
s = speedtest.Speedtest()
|
||||
s.get_best_server()
|
||||
await utils.answer(msg, self.strings("progress_ping"))
|
||||
|
||||
await msg.edit(
|
||||
f"<emoji document_id=5325547803936572038>✨</emoji> <b>Speedtest завершён!</b> <emoji document_id=5325547803936572038>✨</emoji>\n\n"
|
||||
f"<b>Ping:</b> <i>{ping:.2f} ms</i>\n"
|
||||
f"<emoji document_id=6041730074376410123>📥</emoji> <b>Загрузка:</b> <i>{download:.2f} Mbps</i>\n"
|
||||
f"<emoji document_id=6041730074376410123>📤</emoji> <b>Отдача:</b> <i>{upload:.2f} Mbps</i>",
|
||||
parse_mode="HTML"
|
||||
ping = s.results.ping
|
||||
|
||||
await utils.answer(msg, self.strings("progress_download"))
|
||||
download = s.download() / 1_000_000
|
||||
|
||||
await utils.answer(msg, self.strings("progress_upload"))
|
||||
upload = s.upload() / 1_000_000
|
||||
|
||||
text = (
|
||||
f"{self.strings('finished')}\n\n"
|
||||
f"{self.strings('ping').format(ping)}\n"
|
||||
f"{self.strings('download').format(download)}\n"
|
||||
f"{self.strings('upload').format(upload)}\n\n"
|
||||
f"{self.strings('quality_website').format(self._get_quality_rating('website', ping, download, upload))}\n"
|
||||
f"{self.strings('quality_video').format(self._get_quality_rating('video', ping, download, upload))}\n"
|
||||
f"{self.strings('quality_gaming').format(self._get_quality_rating('gaming', ping, download, upload))}\n"
|
||||
f"{self.strings('quality_calls').format(self._get_quality_rating('calls', ping, download, upload))}"
|
||||
)
|
||||
|
||||
await utils.answer(msg, text)
|
||||
except Exception as exc:
|
||||
await utils.answer(
|
||||
msg,
|
||||
self.strings("error").format(utils.escape_html(str(exc))),
|
||||
)
|
||||
except Exception as e:
|
||||
await msg.edit(f"<b>Ошибка при выполнении Speedtest:</b>\n<code>{e}</code>")
|
||||
|
||||
Reference in New Issue
Block a user