mirror of
https://github.com/MuRuLOSE/limoka.git
synced 2026-06-16 06:24:18 +02:00
1781 lines
67 KiB
Python
Executable File
1781 lines
67 KiB
Python
Executable File
__version__ = (3, 0, 2)
|
||
|
||
# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
|
||
# █▀█ █ █ █ █▀█ █▀▄ █
|
||
# © Copyright 2022
|
||
# https://t.me/hikariatama
|
||
#
|
||
# 🔒 Licensed under the GNU AGPLv3
|
||
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
|
||
|
||
# meta pic: https://img.icons8.com/fluency/240/000000/sticker.png
|
||
# meta banner: https://mods.hikariatama.ru/badges/sticks.jpg
|
||
# meta developer: @hikarimods
|
||
# scope: ffmpeg
|
||
# scope: disable_onload_docs
|
||
# requires: Pillow moviepy emoji==2.1.0 requests_toolbelt
|
||
# scope: hikka_min 1.3.3
|
||
|
||
import asyncio
|
||
import io
|
||
import logging
|
||
import math
|
||
import os
|
||
import random
|
||
import re
|
||
import time
|
||
|
||
import emoji
|
||
import grapheme
|
||
import moviepy.editor as mp
|
||
import requests
|
||
from PIL import Image, ImageChops, ImageDraw, ImageFont
|
||
from requests_toolbelt import MultipartEncoder
|
||
from telethon.errors.rpcerrorlist import RPCError
|
||
from telethon.tl.functions.messages import (
|
||
ClearRecentStickersRequest,
|
||
GetStickerSetRequest,
|
||
InstallStickerSetRequest,
|
||
UninstallStickerSetRequest,
|
||
UploadMediaRequest,
|
||
)
|
||
from telethon.tl.types import (
|
||
InputDocument,
|
||
InputMediaUploadedDocument,
|
||
InputPeerSelf,
|
||
InputStickerSetShortName,
|
||
Message,
|
||
)
|
||
from telethon.utils import get_input_document
|
||
|
||
from .. import loader, utils
|
||
from ..inline.types import InlineCall
|
||
|
||
# https://gist.github.com/turicas/1455973
|
||
|
||
|
||
class ImageText:
    """Blank Pillow canvas with helpers to measure and draw TrueType text.

    Based on https://gist.github.com/turicas/1455973. Fonts are always passed
    as seekable binary streams and re-opened with ``ImageFont.truetype`` for
    every measurement or draw call.
    """

    def __init__(
        self,
        size: tuple,
        mode: str = "RGBA",
        background: tuple = (0, 0, 0, 0),
        encoding: str = "utf8",
    ):
        """Create a ``size`` image in ``mode`` filled with ``background``."""
        self.size = size
        self.image = Image.new(mode, self.size, color=background)
        self.draw = ImageDraw.Draw(self.image)
        # Stored only; no method of this class reads it.
        self.encoding = encoding

    def get_font_size(
        self,
        text: str,
        font: io.BytesIO,
        max_width: int = None,
        max_height: int = None,
    ) -> int:
        """Return the largest font size at which ``text`` stays inside the limits.

        :param text: Text to measure.
        :param font: Font stream, forwarded to :meth:`get_text_size`.
        :param max_width: Width limit in pixels, or ``None`` for no limit.
        :param max_height: Height limit in pixels, or ``None`` for no limit.
        :raises ValueError: If both limits are ``None``, or if the text already
            exceeds a limit at font size 1.
        """
        if max_width is None and max_height is None:
            raise ValueError("You need to pass max_width or max_height")

        font_size = 1
        text_size = self.get_text_size(font, font_size, text)
        if (max_width is not None and text_size[0] > max_width) or (
            max_height is not None and text_size[1] > max_height
        ):
            raise ValueError("Text can't be filled in only (%dpx, %dpx)" % text_size)

        # Grow one point at a time until a limit is reached, then step back.
        while True:
            if (max_width is not None and text_size[0] >= max_width) or (
                max_height is not None and text_size[1] >= max_height
            ):
                return font_size - 1

            font_size += 1
            text_size = self.get_text_size(font, font_size, text)

    def write_text(
        self,
        coordinates: tuple,
        text: str,
        font_stream: io.BytesIO,
        font_size: int = 11,
        color: tuple = (0, 0, 0),
        max_width: int = None,
        max_height: int = None,
    ) -> tuple:
        """Draw ``text`` and return its measured ``(width, height)``.

        :param coordinates: ``(x, y)``; either item may be the string
            ``"center"`` to centre the text on that axis of the canvas.
        :param font_size: Size in points, or ``"fill"`` to pick the largest
            size that fits ``max_width`` / ``max_height``.
        """
        x, y = coordinates
        if font_size == "fill" and (max_width is not None or max_height is not None):
            font_size = self.get_font_size(text, font_stream, max_width, max_height)

        text_size = self.get_text_size(font_stream, font_size, text)
        # Measuring consumed the stream; rewind before loading the font again.
        font_stream.seek(0)
        font = ImageFont.truetype(font_stream, font_size)
        if x == "center":
            x = (self.size[0] - text_size[0]) / 2

        if y == "center":
            y = (self.size[1] - text_size[1]) / 2

        self.draw.text((x, y), text, font=font, fill=color)
        return text_size

    def get_text_size(
        self,
        font_stream: io.BytesIO,
        font_size: int,
        text: str,
    ) -> tuple:
        """Return the ``(width, height)`` of ``text`` at ``font_size``."""
        font_stream.seek(0)
        font = ImageFont.truetype(font_stream, font_size)
        if hasattr(font, "getsize"):
            return font.getsize(text)

        # Pillow 10 removed ``getsize``. Rebuild the same pair from the
        # bounding box; the height is taken from the drawing origin, which is
        # intended to match what the legacy method reported.
        left, _top, right, bottom = font.getbbox(text)
        return (right - left, bottom)

    def write_text_box(
        self,
        coordinates: tuple,
        text: str,
        box_width: int,
        font_stream: io.BytesIO,
        font_size: int = 11,
        color: tuple = (0, 0, 0),
        place: str = "center",
        justify_last_line: bool = False,
    ):
        """Word-wrap ``text`` into ``box_width`` pixels and draw every line.

        :param coordinates: ``(x, y)`` of the box; the first line is drawn one
            line height below ``y``.
        :param place: ``"left"``, ``"right"``, ``"center"`` or ``"justify"``.
        :param justify_last_line: With ``place="justify"``, also stretch the
            final line.
        :return: ``(box_width, total_height_used)``.
        """
        x, y = coordinates

        # Greedy wrap: keep appending words while the joined line still fits.
        lines = []
        line = []
        for word in text.split():
            size = self.get_text_size(font_stream, font_size, " ".join(line + [word]))
            # The height of the most recently measured line is reused as the
            # step between all lines below.
            text_height = size[1]
            if size[0] <= box_width:
                line.append(word)
            else:
                lines.append(line)
                line = [word]

        if line:
            lines.append(line)

        lines = [" ".join(line) for line in lines if line]

        height = y
        for index, line in enumerate(lines):
            height += text_height
            if place == "left":
                self.write_text((x, height), line, font_stream, font_size, color)
            elif place == "right":
                total_size = self.get_text_size(font_stream, font_size, line)
                x_left = x + box_width - total_size[0]
                self.write_text((x_left, height), line, font_stream, font_size, color)
            elif place == "center":
                total_size = self.get_text_size(font_stream, font_size, line)
                x_left = int(x + ((box_width - total_size[0]) / 2))
                self.write_text((x_left, height), line, font_stream, font_size, color)
            elif place == "justify":
                words = line.split()
                is_last_line = index == len(lines) - 1
                if (is_last_line and not justify_last_line) or len(words) == 1:
                    # Nothing to stretch: draw the line flush left.
                    self.write_text((x, height), line, font_stream, font_size, color)
                    continue

                total_size = self.get_text_size(
                    font_stream, font_size, "".join(words)
                )
                # Spread the leftover width evenly over the gaps between words.
                space_width = (box_width - total_size[0]) / (len(words) - 1.0)
                start_x = x
                for word in words[:-1]:
                    self.write_text(
                        (start_x, height), word, font_stream, font_size, color
                    )
                    word_size = self.get_text_size(font_stream, font_size, word)
                    start_x += word_size[0] + space_width

                # The last word is pinned to the right edge of the box.
                last_word_size = self.get_text_size(font_stream, font_size, words[-1])
                last_word_x = x + box_width - last_word_size[0]
                self.write_text(
                    (last_word_x, height), words[-1], font_stream, font_size, color
                )

        return (box_width, height - y)
|
||
|
||
|
||
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# The helper is looked up under two spellings; "distinct_emoji_lis" wins when
# both exist (presumably the name used by older ``emoji`` releases - confirm).
try:
    distinct_emoji_list = emoji.distinct_emoji_lis
except AttributeError:
    distinct_emoji_list = getattr(emoji, "distinct_emoji_list", None)

# Refuse to load when the installed ``emoji`` package offers neither name.
if distinct_emoji_list is None:
    raise ImportError
|
||
|
||
|
||
class HikariException(Exception):
    """Error raised by this module when a scripted step gets an unexpected reply."""
|
||
|
||
|
||
@loader.tds
|
||
class StickManagerMod(loader.Module):
|
||
"""Sticker manager with video stickers support and friendly design"""
|
||
|
||
strings = {
|
||
"name": "StickManager",
|
||
"no_args": "🚫 <b>This command requires arguments</b>",
|
||
"no_such_pack": "🚫 <b>Stickerset not found</b>",
|
||
"stickersets_added": (
|
||
"🌁 <code>{}</code><b> stickerset(-s) added, </b><code>{}</code><b>"
|
||
" removed!</b>"
|
||
),
|
||
"no_stickersets_to_import": "🚫 <b>No stickersets to import</b>",
|
||
"no_stickersets": "🚫 <b>You have no stickersets</b>",
|
||
"alias_removed": "✅ <b>Alias </b><code>{}</code><b> removed</b>",
|
||
"remove_alias_404": "🚫 <b>No pack has alias </b><code>{}</code>",
|
||
"pack404": "🚫 <b>Pack </b><code>{}</code><b> not found</b>",
|
||
"created_alias": (
|
||
"{} <b>Created alias for {}. Access it with </b><code>{}</code>"
|
||
),
|
||
"packs_header": "👨🎤 <b>Active Stickerpacks:</b>\n\n",
|
||
"default": "{} <b>Set pack {} as default</b>",
|
||
"packremoved": "{} <b>Removed pack {}</b>",
|
||
"error": "🚫 <b>{}</b>",
|
||
"kang": (
|
||
"<emoji document_id='6334599075337340489'>🍵</emoji> <b>Sticker added to <a"
|
||
' href="https://t.me/addstickers/{}">pack</a></b>'
|
||
),
|
||
"created": (
|
||
"<emoji document_id='5370900768796711127'>🍾</emoji> <b>Created new pack {}"
|
||
' <a href="https://t.me/addstickers/{}">add</a></b>'
|
||
),
|
||
"alias_exists": "🚫 <b>Alias </b><code>{}</code><b> exists</b>",
|
||
"stickrm": "{} <b>Sticker removed from pack</b>",
|
||
"need_reply": "🚫 <b>Reply to use this command</b>",
|
||
"cleaned": "⏳ <b>Recents cleared.</b>",
|
||
"processing": "👩🎤 <b>Processing media...</b>",
|
||
"processing_gif": "🧑🏻🎤 <b>Processing video...</b>",
|
||
"rmbg": (
|
||
"<emoji document_id='6048696253632482685'>✂️</emoji> <b>Removing"
|
||
" background...</b>"
|
||
),
|
||
"trimming": (
|
||
"<emoji document_id='6037132221691727143'>✂️</emoji> <b>Trimming...</b>"
|
||
),
|
||
"outline": (
|
||
"<emoji document_id='6048640560791555243'>🖌</emoji> <b>Adding"
|
||
" outline...</b>"
|
||
),
|
||
"adding_text": (
|
||
"<emoji document_id='6048366494633430880'>🅰️</emoji> <b>Adding text...</b>"
|
||
),
|
||
"exporting": (
|
||
"<emoji document_id='6048887676029898150'>📥</emoji> <b>Exporting...</b>"
|
||
),
|
||
"confirm_remove": "🚫 <b>Are you sure you want to delete pack {}?</b>",
|
||
"remove": "🚫 Delete",
|
||
"cancel": "🔻 Cancel",
|
||
"deleting_pack": "😓 <b>Deleting pack...</b>",
|
||
}
|
||
|
||
strings_ru = {
|
||
"no_args": "🚫 <b>Эта команда требует аргументы</b>",
|
||
"no_such_pack": "🚫 <b>Стикерпак не найден</b>",
|
||
"stickersets_added": (
|
||
"🌁 <code>{}</code><b> стикерпак(-ов) добавлено, </b><code>{}</code><b>"
|
||
" удалено!</b>"
|
||
),
|
||
"no_stickersets_to_import": "🚫 <b>Нет стикерпаков для импорта</b>",
|
||
"no_stickersets": "🚫 <b>У тебя нет стикерпаков</b>",
|
||
"alias_removed": "✅ <b>Алиас </b><code>{}</code><b> удален</b>",
|
||
"remove_alias_404": "🚫 <b>Нет стикерпака с алиасом </b><code>{}</code>",
|
||
"pack404": "🚫 <b>Стикерпак </b><code>{}</code><b> не найден</b>",
|
||
"created_alias": "{} <b>Создан алиас для {}. Алиас: </b><code>{}</code>",
|
||
"packs_header": "👨🎤 <b>Активные стикерпаки:</b>\n\n",
|
||
"default": "{} <b>Пак {} установлен в качестве основного</b>",
|
||
"packremoved": "{} <b>Пак {} удален</b>",
|
||
"error": "🚫 <b>{}</b>",
|
||
"alias_exists": "🚫 <b>Алиас </b><code>{}</code><b> уже существует</b>",
|
||
"stickrm": "{} <b>Стикер удален из пака</b>",
|
||
"_cls_doc": (
|
||
"Управление стикерпаками с поддержкой видеопаков и дружелюбным интерфейсом"
|
||
),
|
||
"need_reply": "🚫 <b>Необходим ответ на сообщение</b>",
|
||
"cleaned": "⏳ <b>Недавние очищены.</b>",
|
||
"processing": "👩🎤 <b>Обрабатываю медиа...</b>",
|
||
"processing_gif": "🧑🏻🎤 <b>Обрабатываю видео...</b>",
|
||
"rmbg": (
|
||
"<emoji document_id='6048696253632482685'>✂️</emoji> <b>Убираю фон...</b>"
|
||
),
|
||
"trimming": (
|
||
"<emoji document_id='6037132221691727143'>✂️</emoji> <b>Подрезаю края...</b>"
|
||
),
|
||
"outline": (
|
||
"<emoji document_id='6048640560791555243'>🖌</emoji> <b>Добавлю"
|
||
" окантовку...</b>"
|
||
),
|
||
"adding_text": (
|
||
"<emoji document_id='6048366494633430880'>🅰️</emoji> <b>Накладываю"
|
||
" текст...</b>"
|
||
),
|
||
"exporting": (
|
||
"<emoji document_id='6048887676029898150'>📥</emoji> <b>Экспортирую...</b>"
|
||
),
|
||
"confirm_remove": "🚫 <b>Вы уверены, что хотите удалить пак {}?</b>",
|
||
"remove": "🚫 Удалить",
|
||
"cancel": "🔻 Отмена",
|
||
"deleting_pack": "😓 <b>Удаляю пак...</b>",
|
||
}
|
||
|
||
strings_de = {
|
||
"no_args": "🚫 <b>Diese Befehl benötigt Argumente</b>",
|
||
"no_such_pack": "🚫 <b>Stickerpack nicht gefunden</b>",
|
||
"stickersets_added": (
|
||
"🌁 <code>{}</code><b> Stickerpack(-s) hinzugefügt, </b><code>{}</code><b>"
|
||
" entfernt!</b>"
|
||
),
|
||
"no_stickersets_to_import": "🚫 <b>Keine Stickerpacks zum Importieren</b>",
|
||
"no_stickersets": "🚫 <b>Du hast keine Stickerpacks</b>",
|
||
"alias_removed": "✅ <b>Alias </b><code>{}</code><b> entfernt</b>",
|
||
"remove_alias_404": "🚫 <b>Kein Stickerpack mit Alias </b><code>{}</code>",
|
||
"pack404": "🚫 <b>Stickerpack </b><code>{}</code><b> nicht gefunden</b>",
|
||
"created_alias": "{} <b>Alias für {} erstellt. Alias: </b><code>{}</code>",
|
||
"packs_header": "👨🎤 <b>Aktive Stickerpacks:</b>\n\n",
|
||
"default": "{} <b>Pack {} als Standard festgelegt</b>",
|
||
"packremoved": "{} <b>Pack {} entfernt</b>",
|
||
"error": "🚫 <b>{}</b>",
|
||
"alias_exists": "🚫 <b>Alias </b><code>{}</code><b> existiert bereits</b>",
|
||
"stickrm": "{} <b>Sticker aus Pack entfernt</b>",
|
||
"_cls_doc": (
|
||
"Verwalte Stickerpacks mit Unterstützung für Videopacks und freundliches"
|
||
" Interface"
|
||
),
|
||
"need_reply": "🚫 <b>Antwort auf Nachricht erforderlich</b>",
|
||
"cleaned": "⏳ <b>Recent gelöscht.</b>",
|
||
"processing": "👩🎤 <b>Verarbeite Medien...</b>",
|
||
"processing_gif": "🧑🏻🎤 <b>Verarbeite Video...</b>",
|
||
"rmbg": (
|
||
"<emoji document_id='6048696253632482685'>✂️</emoji> <b>Hintergrund"
|
||
" entfernen...</b>"
|
||
),
|
||
"trimming": (
|
||
"<emoji document_id='6037132221691727143'>✂️</emoji> <b>Ränder"
|
||
" zuschneiden...</b>"
|
||
),
|
||
"outline": (
|
||
"<emoji document_id='6048640560791555243'>🖌</emoji> <b>Umrandung"
|
||
" hinzufügen...</b>"
|
||
),
|
||
"adding_text": (
|
||
"<emoji document_id='6048366494633430880'>🅰️</emoji> <b>Text"
|
||
" hinzufügen...</b>"
|
||
),
|
||
"exporting": (
|
||
"<emoji document_id='6048887676029898150'>📥</emoji> <b>Exportieren...</b>"
|
||
),
|
||
"confirm_remove": (
|
||
"🚫 <b>Sind Sie sicher, dass Sie Pack {} entfernen möchten?</b>"
|
||
),
|
||
"remove": "🚫 Entfernen",
|
||
"cancel": "🔻 Abbrechen",
|
||
"deleting_pack": "😓 <b>Pack entfernen...</b>",
|
||
}
|
||
|
||
strings_tr = {
|
||
"no_args": "🚫 <b>Bu komut için argüman gerekli.</b>",
|
||
"no_such_pack": "🚫 <b>Böyle bir paket bulunamadı.</b>",
|
||
"stickersets_added": (
|
||
"🌁 <code>{}</code><b> paket eklendi, </b><code>{}</code><b>"
|
||
" kaldırıldı!</b>"
|
||
),
|
||
"no_stickersets_to_import": "🚫 <b>İçe aktarılacak paket yok.</b>",
|
||
"no_stickersets": "🚫 <b>Paketiniz yok.</b>",
|
||
"alias_removed": (
|
||
"✅ <b> </b><code>{}</code><b> adlı paketin takma adı kaldırıldı.</b>"
|
||
),
|
||
"remove_alias_404": "🚫 <b> </b><code>{}</code><b> adlı paket yok.</b>",
|
||
"pack404": "🚫 <b> </b><code>{}</code><b> adlı paket bulunamadı.</b>",
|
||
"created_alias": (
|
||
"{} <b> </b><code>{}</code><b> adlı paket için takma ad oluşturuldu. Takma"
|
||
" ad: </b><code>{}</code>"
|
||
),
|
||
"packs_header": "👨🎤 <b>Aktif paketler:</b>\n\n",
|
||
"default": "{} <b> </b><code>{}</code><b> adlı paket varsayılan yapıldı.</b>",
|
||
"packremoved": "{} <b> </b><code>{}</code><b> adlı paket kaldırıldı.</b>",
|
||
"error": "🚫 <b>{}</b>",
|
||
"alias_exists": "🚫 <b> </b><code>{}</code><b> adlı takma ad zaten var.</b>",
|
||
"stickrm": "{} <b>Paketten çıkartıldı.</b>",
|
||
"_cls_doc": "Video paketleri ve kullanıcı dostu arayüz ile paketleri yönetin.",
|
||
"need_reply": "🚫 <b>Bir mesaja yanıt vermeniz gerekiyor.</b>",
|
||
"cleaned": "⏳ <b>Temizlendi.</b>",
|
||
"processing": "👩🎤 <b>Medya işleniyor...</b>",
|
||
"processing_gif": "🧑🏻🎤 <b>Video işleniyor...</b>",
|
||
"rmbg": (
|
||
"<emoji document_id='6048696253632482685'>✂️</emoji> <b>Arkaplan"
|
||
" kaldırılıyor...</b>"
|
||
),
|
||
"trimming": (
|
||
"<emoji document_id='6037132221691727143'>✂️</emoji> <b>Kenarlar"
|
||
" kırpılıyor...</b>"
|
||
),
|
||
"outline": (
|
||
"<emoji document_id='6048640560791555243'>🖌</emoji> <b>Anahat"
|
||
" ekleniyor...</b>"
|
||
),
|
||
"adding_text": (
|
||
"<emoji document_id='6048366494633430880'>🅰️</emoji> <b>Metin"
|
||
" ekleniyor...</b>"
|
||
),
|
||
"exporting": (
|
||
"<emoji document_id='6048887676029898150'>📥</emoji> <b>Dışa"
|
||
" aktarılıyor...</b>"
|
||
),
|
||
"confirm_remove": (
|
||
"🚫 <b> </b><code>{}</code><b> adlı paketi kaldırmak istediğinizden emin"
|
||
" misiniz?</b>"
|
||
),
|
||
"remove": "🚫 Kaldır",
|
||
"cancel": "🔻 İptal",
|
||
"deleting_pack": "😓 <b>Paket siliniyor...</b>",
|
||
}
|
||
|
||
strings_hi = {
|
||
"no_args": "🚫 <b>इस कमांड के लिए एर्गुमेंट आवश्यक है।</b>",
|
||
"no_such_pack": "🚫 <b>कोई ऐसा पैक नहीं मिला।</b>",
|
||
"stickersets_added": (
|
||
"🌁 <code>{}</code><b> पैक जोड़ा गया, </b><code>{}</code><b> हटा दिया"
|
||
" गया!</b>"
|
||
),
|
||
"no_stickersets_to_import": "🚫 <b>इम्पोर्ट करने के लिए कोई पैक नहीं।</b>",
|
||
"no_stickersets": "🚫 <b>आपके पास कोई पैक नहीं है।</b>",
|
||
"alias_removed": (
|
||
"✅ <b> </b><code>{}</code><b> नाम के पैक का उपनाम हटा दिया गया।</b>"
|
||
),
|
||
"remove_alias_404": "🚫 <b> </b><code>{}</code><b> नाम का कोई पैक नहीं।</b>",
|
||
"pack404": "🚫 <b> </b><code>{}</code><b> नाम का कोई पैक नहीं मिला।</b>",
|
||
"created_alias": (
|
||
"{} <b> </b><code>{}</code><b> नाम के पैक के लिए उपनाम बनाया गया। उपनाम:"
|
||
" </b><code>{}</code>"
|
||
),
|
||
"packs_header": "👨🎤 <b>सक्रिय पैक:</b>\n\n",
|
||
"default": "{} <b> </b><code>{}</code><b> नाम के पैक को डिफ़ॉल्ट किया गया।</b>",
|
||
"packremoved": "{} <b> </b><code>{}</code><b> नाम के पैक को हटा दिया गया।</b>",
|
||
"error": "🚫 <b>{}</b>",
|
||
"alias_exists": "🚫 <b> </b><code>{}</code><b> नाम का उपनाम पहले से ही है।</b>",
|
||
"stickrm": "{} <b>स्टिकर को पैक से हटा दिया गया।</b>",
|
||
"_cls_doc": "वीडियो पैक और उपयोगकर्ता अनुकूल प्रणाली के साथ पैक का प्रबंधन करें।",
|
||
"need_reply": "🚫 <b>आपको एक संदेश पर उत्तर देना चाहिए।</b>",
|
||
"cleaned": "⏳ <b>सफाई की गई।</b>",
|
||
"processing": "👩🎤 <b>मीडिया प्रोसेस हो रहा है...</b>",
|
||
"processing_gif": "🧑🏻🎤 <b>वीडियो प्रोसेस हो रहा है...</b>",
|
||
"rmbg": (
|
||
"<emoji document_id='6048696253632482685'>✂️</emoji> <b>बैकग्राउंड हटा रहा"
|
||
" हूँ...</b>"
|
||
),
|
||
"exporting": (
|
||
"<emoji document_id='6048887676029898150'>📥</emoji> <b>निर्यात कर रहा"
|
||
" हूँ...</b>"
|
||
),
|
||
"confirm_remove": (
|
||
"🚫 <b>क्या आप वाकई </b><code>{}</code><b> पैक को हटाना चाहते हैं?</b>"
|
||
),
|
||
"remove": "🚫 हटाएं",
|
||
"cancel": "🔻 रद्द करें",
|
||
"deleting_pack": "😓 <b>पैक हटा रहा हूँ...</b>",
|
||
}
|
||
|
||
def find(self, args: str) -> "dict | bool":
    """Look up a tracked stickerset by short name or by alias.

    :param args: Short name or alias to search for.
    :return: A copy of the stored pack info with an extra ``"shortname"``
        key, or ``False`` when nothing matches.
    """
    if args in self.stickersets:
        p = self.stickersets[args].copy()
        p["shortname"] = args
        return p

    for shortname, info in self.stickersets.copy().items():
        if info["alias"] == args:
            # The outer copy is shallow, so ``info`` is still the stored
            # dict. Copy it before adding the key, otherwise "shortname"
            # would be written back into the stored pack data.
            found = info.copy()
            found["shortname"] = shortname
            return found

    return False
|
||
|
||
@staticmethod
def trim(image: Image) -> Image:
    """Crop ``image`` to the area that differs from its top-left pixel.

    A flat image filled with the colour at ``(0, 0)`` is subtracted from the
    input; the bounding box of what remains is widened by 5 pixels per side
    (clamped to the image bounds) and used as the crop. If no bounding box is
    found the input is returned unchanged.
    """
    margin = 5
    backdrop = Image.new(image.mode, image.size, image.getpixel((0, 0)))
    delta = ImageChops.difference(image, backdrop)
    # Amplify the difference and drop low values before taking the bbox.
    delta = ImageChops.add(delta, delta, 2.0, -100)
    box = delta.getbbox()
    if not box:
        return image

    left, top, right, bottom = box
    return image.crop(
        (
            max(left - margin, 0),
            max(top - margin, 0),
            min(right + margin, image.width),
            min(bottom + margin, image.height),
        )
    )
|
||
|
||
@staticmethod
def stroke(
    image: Image,
    strokeSize: int = 5,
    color: tuple = (255, 255, 255),
) -> Image:
    """Return a copy of ``image`` with a solid-colour outline behind it.

    The fourth channel of every pixel is dilated with a soft-edged disc
    kernel; the result becomes the opacity of a ``color`` layer onto which
    the original image is pasted.

    :param image: Source picture. Pixels are indexed with ``[3]``, so a
        4-band image is required (presumably RGBA - confirm with callers).
    :param strokeSize: Outline radius; the kernel spans
        ``2 * ceil(strokeSize) + 1`` pixels per side.
    :param color: Outline colour; only the first three items are used.
    :return: New image of the same size as ``image``.
    """
    # Create a disc kernel
    kernel = []
    kernelSize = math.ceil(strokeSize) * 2 + 1  # Should always be odd
    kernelRadius = strokeSize + 0.5
    kernelCenter = kernelSize / 2 - 1
    # Radius of a circle whose area equals one pixel (pi * r**2 == 1).
    pixelRadius = 1 / math.sqrt(math.pi)
    for x in range(kernelSize):
        kernel.append([])
        for y in range(kernelSize):
            distanceToCenter = math.sqrt(
                (kernelCenter - x + 0.5) ** 2 + (kernelCenter - y + 0.5) ** 2
            )
            if distanceToCenter <= kernelRadius - pixelRadius:
                value = 1  # This pixel is fully inside the circle
            elif distanceToCenter <= kernelRadius:
                value = min(
                    1,
                    (kernelRadius - distanceToCenter + pixelRadius)
                    / (pixelRadius * 2),
                )  # Mostly inside
            elif distanceToCenter <= kernelRadius + pixelRadius:
                value = min(
                    1,
                    (pixelRadius - (distanceToCenter - kernelRadius))
                    / (pixelRadius * 2),
                )  # Mostly outside
            else:
                value = 0  # This pixel is fully outside the circle
            kernel[x].append(value)
    kernelExtent = int(len(kernel) / 2)
    imageWidth, imageHeight = image.size
    # Start the outline layer as a fully cleared copy of the input.
    outline = image.copy()
    outline.paste((0, 0, 0, 0), [0, 0, imageWidth, imageHeight])
    imagePixels = image.load()
    outlinePixels = outline.load()
    # Morphological grayscale dilation
    for x in range(imageWidth):
        for y in range(imageHeight):
            highestValue = 0
            for kx in range(-kernelExtent, kernelExtent + 1):
                for ky in range(-kernelExtent, kernelExtent + 1):
                    kernelValue = kernel[kx + kernelExtent][ky + kernelExtent]
                    # Skip neighbours outside the image and zero-weight cells.
                    if (
                        x + kx >= 0
                        and y + ky >= 0
                        and x + kx < imageWidth
                        and y + ky < imageHeight
                        and kernelValue > 0
                    ):
                        highestValue = max(
                            highestValue,
                            min(
                                255,
                                int(
                                    round(
                                        imagePixels[x + kx, y + ky][3] * kernelValue
                                    )
                                ),
                            ),
                        )
            # The strongest weighted neighbour becomes this pixel's opacity.
            outlinePixels[x, y] = (color[0], color[1], color[2], highestValue)
    # Put the original picture on top, using itself as the paste mask.
    outline.paste(image, (0, 0), image)
    return outline
|
||
|
||
@loader.command(
    ru_doc="<ответ> - Убрать фон с картинки",
    de_doc="<antwort> - Hintergrund von Bild entfernen",
    tr_doc="<yanıt> - Resimden arka planı kaldır",
    hi_doc="<उत्तर> - छवि से पृष्ठभूमि निकालें",
)
async def rmbg(self, message: Message):
    """<reply> - Remove background from image"""
    reply = await message.get_reply_message()
    # The command only works on a reply that carries a photo or a sticker.
    if not (reply and (reply.photo or reply.sticker)):
        await utils.answer(message, self.strings("need_reply"))
        return

    message = await utils.answer(message, self.strings("rmbg"))

    # The "use_api" setting selects which background remover is called.
    remover = self.remove_bg_api if self.config["use_api"] else self.remove_bg
    source = io.BytesIO(await self._client.download_media(reply, bytes))
    cutout = await remover(source)

    # Re-encode the result as WEBP into a named in-memory file.
    photo = io.BytesIO()
    photo.name = "photo.webp"
    Image.open(cutout).save(photo, "WEBP")
    photo.seek(0)

    await self._client.send_file(
        message.peer_id,
        photo,
        reply_to=reply.id,
    )

    # Remove the status message when it is one of our own.
    if message.out:
        await message.delete()
|
||
|
||
async def prepare(
    self,
    status: Message,
    message: Message,
    outline: bool = False,
    remove_bg: bool = False,
    text: str = None,
    only_bytes: bool = False,
) -> InputDocument:
    """Turn the media of ``message`` into a sticker-sized PNG.

    The image is downloaded, optionally stripped of its background, trimmed,
    optionally outlined and captioned, then scaled so that its longer side is
    512 pixels.

    :param status: Message that is edited to show the current step.
    :param message: Message whose media is processed.
    :param outline: Add an outline of ``config["stroke_size"]``.
    :param remove_bg: Remove the background first.
    :param text: Optional caption drawn above the picture.
    :param only_bytes: Return the PNG bytes instead of uploading them.
    :return: The uploaded document, or raw PNG ``bytes`` when ``only_bytes``
        is true.
    """
    dl = io.BytesIO(await self._client.download_file(message.media, bytes))
    dl.seek(0)

    if remove_bg:
        status = await utils.answer(status, self.strings("rmbg"))
        dl = await (
            self.remove_bg_api if self.config["use_api"] else self.remove_bg
        )(dl)
        dl.seek(0)

    status = await utils.answer(status, self.strings("trimming"))
    img = await utils.run_sync(self.trim, Image.open(dl))

    if outline:
        status = await utils.answer(status, self.strings("outline"))
        img = await utils.run_sync(self.stroke, img, self.config["stroke_size"])

    if text:
        status = await utils.answer(status, self.strings("adding_text"))
        # The font is downloaded in the background; wait until it is there.
        await self._font_ready.wait()
        new_img = Image.new("RGBA", (img.width, img.height), (255, 255, 255, 0))
        proportions = img.width / img.height
        image_text = ImageText(new_img.size)
        self._raw_font.seek(0)

        # Shorter captions get a larger font, never below the configured size.
        text_length = len(text)
        if text_length < 5:
            auto_size = 40
        elif text_length < 10:
            auto_size = 35
        elif text_length < 15:
            auto_size = 30
        elif text_length < 20:
            auto_size = 25
        else:
            auto_size = self.config["font_size"]
        font_size = max(self.config["font_size"], auto_size)

        image_text.write_text_box(
            (0, 0),
            text,
            new_img.width - 8,
            self._raw_font,
            font_size,
            (255, 255, 255),
            "center",
        )
        image_text.image = await utils.run_sync(self.trim, image_text.image)

        # Shrink the picture by the caption height, keeping its aspect ratio.
        img = img.resize(
            (
                round((img.height - image_text.image.height) * proportions),
                round(img.height - image_text.image.height),
            )
        )
        new_img.paste(
            img,
            ((new_img.width - img.width) // 2, image_text.image.height + 8),
            mask=img,
        )
        caption_left = (new_img.width - image_text.image.width) // 2
        new_img.paste(
            image_text.image,
            (
                caption_left,
                4,
                caption_left + image_text.image.width,
                image_text.image.height + 4,
            ),
            mask=image_text.image,
        )
        img = new_img

    # ``Image.ANTIALIAS`` was removed in Pillow 10; ``LANCZOS`` is the same
    # filter under its current name.
    w, h = img.size
    if w > h:
        img = img.resize((512, int(h * (512 / w))), Image.LANCZOS)
    else:
        img = img.resize((int((w * (512 / h))), 512), Image.LANCZOS)

    dst = io.BytesIO()
    img.save(dst, "PNG")
    mime = "image/png"

    status = await utils.answer(status, self.strings("exporting"))

    if only_bytes:
        return dst.getvalue()

    file = await self._client.upload_file(dst.getvalue())
    file = InputMediaUploadedDocument(file, mime, [])
    document = await self._client(UploadMediaRequest(InputPeerSelf(), file))
    document = get_input_document(document)

    return document
|
||
|
||
async def prepare_vid(self, message: Message) -> str:
    """Convert the media of ``message`` into a WEBM video sticker file.

    The media is written to ``sticker.mp4`` in the working directory, cut to
    its first three seconds, scaled so that the longer side is 512 pixels and
    encoded with VP9 at 15 fps without audio.

    :param message: Message whose media is converted.
    :return: Path of the produced file, always ``"sticker.webm"``.
    """
    dl = await self._client.download_file(message.media, bytes)

    with open("sticker.mp4", "wb") as f:
        f.write(dl)

    try:
        clip = mp.VideoFileClip("sticker.mp4")
        try:
            clip = clip.subclip(0, 3)

            w, h = clip.size
            size = (
                f"512:{int(h * (512 / w))}" if w > h else f"{int(w * (512 / h))}:512"
            )

            clip.write_videofile(
                "sticker.webm",
                audio=False,
                codec="libvpx-vp9",
                logger=None,
                fps=15,
                preset="faster",
                ffmpeg_params=["-pix_fmt", "yuv420p", "-vf", f"scale={size}"],
            )
        finally:
            # Release the clip even when cutting or encoding fails.
            clip.close()
    finally:
        # The temporary source file must not outlive a failed conversion.
        os.remove("sticker.mp4")

    return "sticker.webm"
|
||
|
||
def __init__(self):
    """Declare the three settings of this module: font size, stroke size, API use."""
    font_size = loader.ConfigValue(
        "font_size",
        18,
        "Font size of text to apply to image",
        validator=loader.validators.Integer(minimum=0),
    )
    stroke_size = loader.ConfigValue(
        "stroke_size",
        5,
        "Stroke size to apply to image",
        validator=loader.validators.Integer(minimum=5),
    )
    use_api = loader.ConfigValue(
        "use_api",
        False,
        "Use API to remove background, not bot",
        validator=loader.validators.Boolean(),
    )
    self.config = loader.ModuleConfig(font_size, stroke_size, use_api)
|
||
|
||
async def client_ready(self):
    """Load stored state, build the emoji pool and start the font download."""
    self.stickersets = self.pointer("stickersets", {})
    self.default = self.get("default", None)

    # Without a stored default, fall back to the first tracked pack.
    if self.stickersets and not self.default:
        self.default = next(iter(list(self.stickersets.keys())))

    # Split the literal into user-perceived characters, one entry per emoji.
    pool = "🌌🌃🏙🌇🌆🌁🌉🎑🏞🎆🌅🌄🌠🎇🗾🐭🐱🐶🐹🐰🦊🐻🐼🐻❄️🐨🐯🦁🐮🐷🐸🐵🙉🐥🦆🦄🐴🐗🐺🦇🦉🦅"
    self.emojies = list(grapheme.graphemes(pool))

    # Set by ``_dl_font`` once the font is available.
    self._font_ready = asyncio.Event()

    asyncio.ensure_future(self._dl_font())
|
||
|
||
async def _dl_font(self):
    """Download the caption font into ``self._raw_font`` and set ``_font_ready``.

    The event is set even when the download fails, so that coroutines waiting
    on it do not block forever. In that case ``_raw_font`` is an empty stream
    and the failure is logged.
    """
    try:
        response = await utils.run_sync(
            requests.get,
            "https://0x0.st/oLI7.ttf",
        )
        # An HTTP error body is not a font; treat it as a failed download.
        response.raise_for_status()
        self._raw_font = io.BytesIO(response.content)
    except Exception:
        # Runs as a fire-and-forget task: log here, nobody awaits the result.
        logger.exception("Unable to download the caption font")
        self._raw_font = io.BytesIO()
    finally:
        self._font_ready.set()
|
||
|
||
@loader.command(
    ru_doc="<short_name> <название> [-a <алиас>] - Создать новый стикерпак",
    de_doc="<short_name> <name> [-a <alias>] - Erstelle einen neuen Stickerpack",
    tr_doc="<kısa_ad> <isim> [-a <takma_ad>] - Yeni bir sticker paketi oluştur",
    hi_doc="<छोटा_नाम> <नाम> [-a <उपनाम>] - एक नया स्टिकर सेट बनाएं",
)
async def newpack(self, message: Message):
    """<short_name> <name> [-a <alias>] - Create new pack"""
    args = utils.get_args_raw(message)
    # NOTE(review): "-a" is matched anywhere in the arguments, so a pack name
    # that itself contains "-a" is cut at that position.
    if "-a" in args:
        alias = args[args.find("-a") + 3 :]
        args = args[: args.find("-a")]
    else:
        alias = None

    args = args.split(maxsplit=1)
    reply = await message.get_reply_message()

    if len(args) < 2:
        await utils.answer(
            message,
            self.strings("error").format(
                "Usage: .newpack <short_name> <name>"
            ),
        )
        return

    if not reply or not reply.media:
        await utils.answer(
            message, self.strings("error").format("Reply to a photo required")
        )
        return

    message = await utils.answer(message, self.strings("processing"))

    shortname, name = args
    shortname, name = shortname.strip().lower(), name.strip()

    stick = await self.prepare(message, reply)
    assert stick

    async with self._client.conversation("@stickers") as conv:

        async def ask(payload, as_file=False):
            """Send one step to the bot and return ``(sent, response)``."""
            if as_file:
                sent = await conv.send_file(payload, force_document=True)
            else:
                sent = await conv.send_message(payload)
            return sent, await conv.get_response()

        async def discard(sent, response):
            """Delete both sides of a finished step from the chat."""
            await sent.delete()
            await response.delete()

        try:
            # Reset whatever dialogue the bot might still be in.
            m, r = await ask("/cancel")
            await discard(m, r)

            m, r = await ask("/newpack")
            if (
                "Now choose a name" not in r.raw_text
                and "выберите название для нового" not in r.raw_text
            ):
                raise HikariException(
                    "UNEXPECTED_ANSWER - Error while creating pack"
                )
            await discard(m, r)

            m, r = await ask(name)
            if (
                "Now send me" not in r.raw_text
                and "будущий стикер" not in r.raw_text
            ):
                raise HikariException("UNEXPECTED_ANSWER - Error when typing name")
            await discard(m, r)

            m, r = await ask(stick, as_file=True)
            if not (
                "Now send me an emoji" in r.raw_text
                or ("Пожалуйста" in r.raw_text and "смайл" in r.raw_text)
            ):
                raise HikariException("UNEXPECTED_ANSWER - Error when sending file")
            await discard(m, r)

            m, r = await ask("🔆")
            if "/publish" not in r.raw_text:
                raise HikariException("UNEXPECTED_ANSWER - No publish option")
            await discard(m, r)

            m, r = await ask("/publish")
            if "/skip" not in r.raw_text:
                raise HikariException("UNEXPECTED_ANSWER - No skip option")
            await discard(m, r)

            # The reply to /skip is not inspected.
            m, r = await ask("/skip")
            await discard(m, r)

            m, r = await ask(shortname)
            if (
                "this short name is already taken" in r.raw_text
                or "Увы, такой адрес уже занят." in r.raw_text
            ):
                raise HikariException("UNEXPECTED_ANSWER - Occupied shortname")
            await discard(m, r)
        except HikariException as e:
            await utils.answer(message, f"🚫 <code>{e}</code>")
            return

    await self._client(
        InstallStickerSetRequest(
            stickerset=InputStickerSetShortName(shortname), archived=False
        )
    )

    # The emoji at index ``len + 1`` is used, so switch to a random pick as
    # soon as that index would run past the end of the pool. The previous
    # bound check allowed an out-of-range index on the last slot.
    next_index = len(self.stickersets) + 1
    if next_index >= len(self.emojies):
        pack_emoji = random.choice(self.emojies)
    else:
        pack_emoji = self.emojies[next_index]

    self.stickersets[shortname] = {
        "title": name,
        "emoji": pack_emoji,
        "alias": alias,
    }

    await utils.answer(
        message,
        self.strings("created").format(
            name,
            shortname,
        ),
    )
|
||
|
||
@loader.command(
    ru_doc="<short_name> <имя> [-a <алиас>] - Создать новый видео стикерпак",
    de_doc=(
        "<short_name> <name> [-a <alias>] - Erstelle einen neuen Video Stickerpack"
    ),
    tr_doc=(
        "<kısa_ad> <isim> [-a <takma_ad>] - Yeni bir video sticker paketi oluştur"
    ),
    hi_doc="<छोटा_नाम> <नाम> [-a <उपनाम>] - एक नया वीडियो स्टिकर सेट बनाएं",
)
async def newvidpack(self, message: Message):
    """<short_name> <name> [-a <alias>] - Create new video stickers pack"""
    # Flow: parse "<short_name> <name> [-a <alias>]", convert the replied media
    # with self.prepare_vid, walk the @stickers bot through
    # /newvideo -> title -> file -> emoji -> /publish -> /skip -> short name,
    # install the resulting set and remember it in self.stickersets.
    args = utils.get_args_raw(message)
    if "-a" in args:
        # Text after "-a " is the alias, text before it is "<short_name> <name>".
        alias = args[args.find("-a") + 3 :]
        args = args[: args.find("-a")]
    else:
        alias = None

    args = args.split(maxsplit=1)
    reply = await message.get_reply_message()

    if len(args) < 2:
        await utils.answer(
            message,
            self.strings("error").format("Usage: .newvidpack <short_name> <name>"),
        )
        return

    if not reply or not reply.media:
        await utils.answer(
            message, self.strings("error").format("Reply to a gif is required")
        )
        return

    shortname, name = args

    message = await utils.answer(message, self.strings("processing"))

    stick = await self.prepare_vid(reply)
    # NOTE(review): assert is stripped under `python -O`; kept so the raised
    # exception type stays the same for the surrounding framework.
    assert stick

    async with self._client.conversation("@stickers") as conv:

        async def step(request, markers, error):
            # One round with the bot: await the outgoing request, read the
            # reply, require at least one of `markers` in its text (no check
            # when `markers` is empty), then delete both messages.
            sent = await request
            answer = await conv.get_response()
            if markers and not any(marker in answer.raw_text for marker in markers):
                raise HikariException(error)
            await sent.delete()
            await answer.delete()

        try:
            await step(conv.send_message("/cancel"), (), None)
            await step(
                conv.send_message("/newvideo"),
                ("A new set", "Создается новый набор видеостикеров."),
                "UNEXPECTED_ANSWER - Error while creating pack",
            )
            await step(
                conv.send_message(name),
                (
                    "Now send me the vide",
                    "Теперь пришлите, пожалуйста, будущий стикер — файл в формате .WEBM",
                ),
                "UNEXPECTED_ANSWER - Error while entering name",
            )
            await step(
                conv.send_file(stick, force_document=True),
                (
                    "Now send me an emoji",
                    "Пожалуйста, отправьте мне новый смайл, который соответствует этому видеостикеру.",
                ),
                "UNEXPECTED_ANSWER - Error while sending file",
            )
            await step(
                conv.send_message("🔆"),
                ("/publish",),
                "UNEXPECTED_ANSWER - No publish option",
            )
            await step(
                conv.send_message("/publish"),
                ("/skip",),
                "UNEXPECTED_ANSWER - Broke after /publish",
            )
            await step(
                conv.send_message("/skip"),
                ("provide a short name", "короткое название"),
                "UNEXPECTED_ANSWER - Broke after /skip",
            )

            # The final round has two distinct failure modes, so it is spelled
            # out instead of going through step().
            m = await conv.send_message(shortname)
            r = await conv.get_response()

            if (
                "this short name is already taken" in r.raw_text
                or "Увы, такой адрес уже занят." in r.raw_text
            ):
                raise HikariException("UNEXPECTED_ANSWER - Occupied shortname")

            if (
                "Kaboom" not in r.raw_text
                and "успешно опубликован" not in r.raw_text
            ):
                raise HikariException(
                    "UNEXPECTED_ANSWER - Unknown confirmation error (Kaboom)"
                )

            await m.delete()
            await r.delete()
        except HikariException as e:
            await utils.answer(message, f"🚫 <code>{e}</code>")
            return

    await self._client(
        InstallStickerSetRequest(
            stickerset=InputStickerSetShortName(shortname), archived=False
        )
    )

    # Pick the emoji at position len(stickersets) + 1, falling back to a random
    # one when that position is past the end of the list. The previous guard
    # compared len(stickersets) itself, so the "+ 1" index overflowed with
    # IndexError when exactly one slot was left.
    emoji_index = len(self.stickersets) + 1
    if emoji_index >= len(self.emojies):
        pack_emoji = random.choice(self.emojies)
    else:
        pack_emoji = self.emojies[emoji_index]

    self.stickersets[shortname] = {
        "title": name,
        "emoji": pack_emoji,
        "alias": alias,
        "video": True,  # marks the set as a video pack; `stick` checks this key
    }

    await utils.answer(message, self.strings("created").format(name, shortname))
|
||
|
||
@loader.command(
    ru_doc="Синхронизировать стикерпаки с @stickers",
    de_doc="Synchronisiere Stickerpacks mit @stickers",
    tr_doc="Sticker paketlerini @stickers ile senkronize et",
    hi_doc="स्टिकर सेट्स को @stickers से सिंक्रनाइज़ करें",
)
async def syncpacks(self, message: Message):
    """Sync existing stickersets with @stickers"""
    # Reads the button labels of the bot's /packstats reply as pack short
    # names, adds the ones missing from self.stickersets and drops local
    # entries the bot no longer lists. Reports (added, removed) counts.
    added = 0

    message = await utils.answer(message, self.strings("processing"))

    async with self._client.conversation("@stickers") as conv:
        m = await conv.send_message("/cancel")
        r = await conv.get_response()

        await m.delete()
        await r.delete()

        m = await conv.send_message("/packstats")
        r = await conv.get_response()

        # Every button label is used as a sticker set short name.
        packs = [
            button.text for row in r.reply_markup.rows for button in row.buttons
        ]

        for shortname in packs:
            if shortname in self.stickersets:
                continue

            try:
                stickerset = await self._client(
                    GetStickerSetRequest(
                        stickerset=InputStickerSetShortName(shortname),
                        hash=round(time.time()),
                    )
                )
            except Exception:
                # Best effort: a pack that cannot be fetched is skipped, but
                # the reason is no longer swallowed silently.
                logging.getLogger(__name__).debug(
                    "Skipping sticker set %s: lookup failed",
                    shortname,
                    exc_info=True,
                )
                continue

            # Same selection rule as pack creation: position
            # len(stickersets) + 1, random when that is out of range. The old
            # guard ignored the "+ 1" and raised IndexError on the last slot.
            emoji_index = len(self.stickersets) + 1
            if emoji_index >= len(self.emojies):
                pack_emoji = random.choice(self.emojies)
            else:
                pack_emoji = self.emojies[emoji_index]

            self.stickersets[shortname] = {
                "title": stickerset.set.title,
                "emoji": pack_emoji,
                "alias": None,
            }

            added += 1

        await m.delete()
        await r.delete()

        m = await conv.send_message("/cancel")
        r = await conv.get_response()

        await m.delete()
        await r.delete()

    # Collect first, then remove, so the dict is not resized while iterating.
    listed = set(packs)
    stale = [shortname for shortname in self.stickersets if shortname not in listed]
    for shortname in stale:
        self.stickersets.pop(shortname)

    await utils.answer(
        message, self.strings("stickersets_added").format(added, len(stale))
    )
|
||
|
||
@loader.command(
    ru_doc="Показать доступные стикерпаки",
    de_doc="Zeige verfügbare Stickerpacks",
    tr_doc="Mevcut sticker paketlerini göster",
    hi_doc="उपलब्ध स्टिकर सेट्स दिखाएं",
)
async def packs(self, message: Message):
    """Show available stickersets"""
    # Answers with the "packs_header" string followed by one line per known
    # pack: emoji, escaped title, an "add" link and the alias (or the short
    # name when no alias is set).
    if not self.stickersets:
        await utils.answer(message, self.strings("no_stickersets"))
        return

    lines = []
    for shortname, info in self.stickersets.items():
        label = info["alias"] if info["alias"] else shortname
        lines.append(
            f"{info['emoji']} <b>{utils.escape_html(info['title'])}</b> <a"
            f' href="https://t.me/addstickers/{shortname}">add</a>'
            f" (<code>{utils.escape_html(label)}</code>)\n"
        )

    await utils.answer(message, self.strings("packs_header") + "".join(lines))
|
||
|
||
@loader.command(
    ru_doc="<алиас> [short_name] - Добавить или удалить алиас",
    de_doc="<alias> [short_name] - Füge oder entferne einen Alias hinzu",
    tr_doc="<alias> [short_name] - Alias ekle veya kaldır",
    hi_doc="<alias> [short_name] - एलियास जोड़ें या हटाएं",
)
async def stickalias(self, message: Message):
    """<alias> [short_name] - Add or remove alias"""
    # One argument: clear that alias from whichever pack carries it.
    # Two arguments: attach the alias to the pack with the given short name.
    raw = utils.get_args_raw(message)
    if not raw:
        await utils.answer(message, self.strings("no_args"))
        return

    parts = raw.split(maxsplit=1)

    if len(parts) == 1:
        wanted = parts[0]
        for shortname, info in self.stickersets.items():
            if info["alias"] != wanted:
                continue

            self.stickersets[shortname]["alias"] = None
            await utils.answer(
                message, self.strings("alias_removed").format(wanted)
            )
            return

        await utils.answer(message, self.strings("remove_alias_404").format(wanted))
        return

    alias, pack = parts

    if pack not in self.stickersets:
        await utils.answer(message, self.strings("pack404").format(pack))
        return

    # Aliases must be unique across all packs.
    taken = any(alias == info["alias"] for info in self.stickersets.values())
    if taken:
        await utils.answer(message, self.strings("alias_exists").format(alias))
        return

    self.stickersets[pack]["alias"] = alias
    await utils.answer(
        message,
        self.strings("created_alias").format(
            self.stickersets[pack]["emoji"],
            utils.escape_html(self.stickersets[pack]["title"]),
            alias,
        ),
    )
|
||
|
||
@loader.command(
    ru_doc="<short_name|алиас> - Установить стандартный стикерпак",
    de_doc="<short_name|alias> - Setze das Standard-Stickerpack",
    tr_doc="<short_name|alias> - Standart sticker paketini ayarla",
    hi_doc="<short_name|alias> - डिफ़ॉल्ट स्टिकर सेट को सेट करें",
)
async def stickdef(self, message: Message):
    """<short_name|alias> - Set default stickerpack"""
    # Resolves the argument through self.find, stores the pack's short name in
    # self.default and persists it under the "default" key via self.set.
    query = utils.get_args_raw(message)
    found = self.find(query)

    if not found:
        await utils.answer(message, self.strings("pack404").format(query))
        return

    self.default = found["shortname"]
    self.set("default", self.default)

    confirmation = self.strings("default").format(
        found["emoji"],
        utils.escape_html(found["title"]),
    )
    await utils.answer(message, confirmation)
|
||
|
||
@loader.command(
    ru_doc="<short_name|алиас> - Удалить стикерпак",
    de_doc="<short_name|alias> - Entferne das Stickerpack",
    tr_doc="<short_name|alias> - Sticker paketini kaldır",
    hi_doc="<short_name|alias> - स्टिकर सेट हटाएं",
)
async def rmpack(self, message: Message):
    """<short_name|alias> - Remove stickerpack"""
    # Resolves the pack, uninstalls it from the account and then shows an
    # inline confirmation form; the actual deletion happens in self._remove.
    args = utils.get_args_raw(message)
    pack = self.find(args)
    if not pack:
        await utils.answer(message, self.strings("pack404").format(args))
        return

    # NOTE(review): the set is uninstalled before the user confirms below, and
    # an RPCError here is reported as "pack not found" — confirm this ordering
    # is intended.
    try:
        await self._client(
            UninstallStickerSetRequest(
                stickerset=InputStickerSetShortName(pack["shortname"])
            )
        )
    except RPCError:
        await utils.answer(
            message, self.strings("pack404").format(pack["shortname"])
        )
        return

    await self.inline.form(
        message=message,
        # Escape the title like every other place that renders it, so titles
        # containing "<", ">" or "&" cannot break the markup.
        text=self.strings("confirm_remove").format(
            utils.escape_html(pack["title"])
        ),
        reply_markup=[
            {
                "text": self.strings("remove"),
                "callback": self._remove,
                "args": (pack,),
            },
            {"text": self.strings("cancel"), "action": "close"},
        ],
    )
|
||
|
||
async def _remove(self, call: InlineCall, pack: dict):
    """Delete ``pack`` through the @stickers bot and forget it locally.

    Inline-button callback registered by ``rmpack``. ``pack`` must provide the
    "shortname", "emoji" and "title" keys. Any unexpected bot reply aborts the
    flow and is shown to the user; the local entry is only removed after the
    bot confirmed the deletion.
    """
    call = await utils.answer(call, self.strings("deleting_pack"))

    async with self._client.conversation("@stickers") as conv:

        async def talk(text, markers, error):
            # Send `text`, read the reply, require one of `markers` in it
            # (skipped when `markers` is empty), delete both messages and hand
            # the reply back to the caller.
            sent = await conv.send_message(text)
            answer = await conv.get_response()
            if markers and all(marker not in answer.raw_text for marker in markers):
                raise HikariException(error)
            await sent.delete()
            await answer.delete()
            return answer

        try:
            await talk("/cancel", (), None)
            await talk(
                "/delpack",
                ("Choose the sticker set", "который хотите удалить"),
                "UNEXPECTED_ANSWER - After /delpack",
            )
            selected = await talk(
                pack["shortname"],
                ("you selected the set", "Вы выбрали набор"),
                "UNEXPECTED_ANSWER - After sending shortname",
            )

            # The confirmation phrase follows the language the bot answered in.
            if "Вы выбрали набор" in selected.raw_text:
                confirmation = "Да, удалите этот набор."
            else:
                confirmation = "Yes, I am totally sure."

            await talk(
                confirmation,
                ("The sticker set is gon", "Набор стикеров был удален."),
                "UNEXPECTED_ANSWER - Confirmation did not work",
            )
        except HikariException as e:
            await utils.answer(call, f"🚫 <code>{e}</code>")
            return

    self.stickersets.pop(pack["shortname"])

    done = self.strings("packremoved").format(
        pack["emoji"],
        utils.escape_html(pack["title"]),
    )
    await utils.answer(call, done)
|
||
|
||
@loader.command(
    ru_doc="<реплай> - Удалить стикер из пака",
    de_doc="<reply> - Entferne den Sticker aus dem Pack",
    tr_doc="<reply> - Sticker paketinden çıkar",
    hi_doc="<reply> - स्टिकर को स्टिकर सेट से हटाएं",
)
async def unstick(self, message: Message):
    """<reply> - Remove sticker from pack"""
    # Starts /delsticker with the @stickers bot and forwards the replied
    # message to it; the bot's answer decides success.
    target = await message.get_reply_message()
    if not target:
        await utils.answer(message, self.strings("error").format("Reply required"))
        return

    async with self._client.conversation("@stickers") as conv:
        try:
            sent = await conv.send_message("/cancel")
            answer = await conv.get_response()

            await sent.delete()
            await answer.delete()

            sent = await conv.send_message("/delsticker")
            answer = await conv.get_response()

            if (
                "Choose a sticker" not in answer.raw_text
                and "Выберите нужный набор" not in answer.raw_text
            ):
                raise HikariException(
                    "UNEXPECTED_ANSWER - Error while starting action"
                )

            await sent.delete()
            await answer.delete()

            # forward_messages is given a list of ids; the result is indexed
            # with [0] below to reach the forwarded message.
            forwarded = await self._client.forward_messages(
                "@stickers", [target.id], message.peer_id
            )
            answer = await conv.get_response()

            if (
                "I have deleted that sticker" not in answer.raw_text
                and "Стикер успешно удален" not in answer.raw_text
            ):
                raise HikariException("UNEXPECTED_ANSWER - Sticker not deleted")

            await forwarded[0].delete()
            await answer.delete()
        except HikariException as e:
            await utils.answer(message, f"🚫 <code>{e}</code>")
            return

    decoration = random.choice(self.emojies)
    await utils.answer(message, self.strings("stickrm").format(decoration))
|
||
|
||
async def remove_bg(self, image: io.BytesIO) -> io.BytesIO:
    """Remove the background of ``image`` via the @removefundobot bot.

    Sends the file to the bot and waits for a reply carrying a document; one
    reply without a document is tolerated and the next one is awaited.

    :param image: Image file to process.
    :return: The downloaded document wrapped in ``io.BytesIO``.
    :raises RuntimeError: If the second reply has no document either.
    """
    cleanup = []
    async with self._client.conversation("@removefundobot") as conv:
        upload = await conv.send_file(image)
        result = await conv.get_response()

        if not result.document:
            # First reply carried no file: remember it for deletion and wait
            # for the next one.
            cleanup.append(result)
            result = await conv.get_response()

        cleanup.extend((upload, result))

        if not result.document:
            raise RuntimeError("Unable to remove background from image")

        processed = io.BytesIO(await self._client.download_media(result, bytes))

        # Remove the exchange from the chat once the result is downloaded.
        for leftover in cleanup:
            await leftover.delete()

    return processed
|
||
|
||
@staticmethod
async def remove_bg_api(photo: bytes) -> io.BytesIO:
    """Remove the background of ``photo`` through the icons8 bgremover site.

    Steps: load the landing page to obtain the ``i8remover`` cookie (used as a
    bearer token), create a batch, upload the photo into it as multipart form
    data, then poll the landing page (up to 3 times, 2 seconds apart) until
    the scraped result URL no longer ends with ``.jpg``.

    :param photo: Raw image bytes, uploaded as ``image/jpeg``.
    :return: ``io.BytesIO`` with the content downloaded from the scraped URL.
    :raises AttributeError: If the result ``src`` attribute is not found in
        the page (``re.search`` returns ``None``).
    """
    HEADERS = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        "accept-encoding": "gzip, deflate, br",
        "accept-language": "en-US,en;q=0.9",
        "cache-control": "no-cache",
        "user-agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            " (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36"
        ),
    }

    response1 = await utils.run_sync(
        requests.get,
        "https://icons8.com/bgremover",
        headers=HEADERS,
    )
    _jwt_token = response1.cookies.get("i8remover")
    response2 = await utils.run_sync(
        requests.post,
        "https://api-bgremover-origin.icons8.com/api/frontend/v1/batches",
        headers={"Authorization": f"Bearer {_jwt_token}"},
        cookies=response1.cookies,
    )
    _id = response2.json()["id"]

    fields = {
        "image": ("hikka.jpg", photo, "image/jpeg"),
    }

    boundary = "----WebKitFormBoundary" + utils.rand(16)
    m = MultipartEncoder(fields=fields, boundary=boundary)

    await utils.run_sync(
        requests.post,
        f"https://api-bgremover-origin.icons8.com/api/frontend/v1/batches/{_id}",
        headers={
            "Authorization": f"Bearer {_jwt_token}",
            "content-type": m.content_type,
            "connection": "keep-alive",
        },
        data=m,
        cookies=response1.cookies,
    )

    # Sentinel ending in ".jpg" so the first loop iteration always polls.
    url = ".jpg"

    for _ in range(3):
        if not url.endswith(".jpg"):
            break

        await asyncio.sleep(2)

        response4 = await utils.run_sync(
            requests.get,
            "https://icons8.com/bgremover",
            headers=HEADERS,
            cookies=response1.cookies,
        )

        url = re.search(
            r'Processed transparent png file" src="(.*?)"',
            response4.text,
        ).group(1)

    # The URL comes out of an HTML attribute, so ampersands are entity-encoded;
    # decode them before requesting it (the previous replace was a no-op).
    url = url.replace("&amp;", "&")

    return io.BytesIO(
        (
            await utils.run_sync(
                requests.get,
                url,
                cookies=response1.cookies,
                headers=HEADERS,
            )
        ).content
    )
|
||
|
||
@loader.command(
    ru_doc=(
        "[эмодзи] [short_name|алиам] [-o - добавить окантовку] [-r - убрать фон]"
        " [-q - Не добавлять в пак, а просто отправить стикер] [-t <текст> -"
        " наложить текст] - Добавить стикер \ картинку в пак. Если не указано в"
        " какой, будет использован стандартный\nПример:\n.stick mypack -o -r -q -t"
        " Привет, мир!"
    ),
    de_doc=(
        "[emoji] [short_name|alias] [-o - Rahmen hinzufügen] [-r - Hintergrund"
        " entfernen] [-q - Fügen Sie kein Paket hinzu, sondern senden Sie nur"
        " Sticker] [-t <text> - Text hinzufügen] - Fügen Sie einen Sticker oder"
        " ein Bild zu einem Paket hinzu. Wenn nicht angegeben, wird der Standard"
        " verwendet\nBeispiel:\n.stick mypack -o -r -q -t Hallo Welt!"
    ),
    tr_doc=(
        "[emoji] [short_name|alias] [-o - çerçeve ekle] [-r - arka planı kaldır]"
        " [-q - Paket eklemeyin, sadece çubuk gönderin] [-t <metin> - metin"
        " ekle] - Bir pakete bir çubuk veya resim ekleyin. Belirtilmediyse"
        " varsayılan olarak kullanılır\nÖrnek:\n.stick mypack -o -r -q -t Merhaba"
        " dünya!"
    ),
    hi_doc=(
        "[emoji] [short_name|alias] [-o - फ़्रेम जोड़ें] [-r - पृष्ठभूमि निकालें]"
        " [-q - पैकेज न जोड़ें, केवल स्टिकर भेजें] [-t <पाठ> - पाठ जोड़ें] -"
        " एक पैकेज में एक स्टिकर या छवि जोड़ें। निर्दिष्ट नहीं किया गया तो"
        " डिफ़ॉल्ट उपयोग किया जाएगा\nउदाहरण:\n.stick mypack -o -r -q -t नमस्ते"
        " दुनिया!"
    ),
)
async def stick(self, message: Message):
    """[emoji] [short_name|alias] [-o - add outline] [-r - remove background] [-q - Do not add sticker to pack, just send it] [-t <text> - add text] - Add sticker to pack. If not specified - default
    Example:
    .stick mypack -o -r -q -t Hello world!"""
    if not self.stickersets:
        await utils.answer(message, self.strings("no_stickersets"))
        return

    # Pad with spaces so each flag can be matched as " -x" wherever it stands.
    args = f" {utils.get_args_raw(message)} "

    outline = " -o" in args
    if outline:
        args = args.replace(" -o", "")

    remove_bg = " -r" in args
    if remove_bg:
        args = args.replace(" -r", "")

    quiet = " -q" in args
    if quiet:
        args = args.replace(" -q", "")

    # "-t" takes everything after it as the text; what precedes it stays as
    # the positional arguments.
    args, text_flag, trailing = args.partition(" -t")
    text = trailing if text_flag else None

    args = args.strip()

    reply = await message.get_reply_message()
    if not reply or not reply.media:
        await utils.answer(
            message,
            self.strings("error").format("Reply to sticker required"),
        )
        return

    if not quiet:
        # Resolve the target pack and the sticker emoji from "[emoji] [pack]".
        pack, sticker_emoji = None, None
        pieces = args.split(maxsplit=1)

        if len(pieces) > 1:
            # Two or more words: try "<emoji> <pack>", then the whole string
            # as a pack name.
            pack = self.find(pieces[1])
            if pack:
                sticker_emoji = pieces[0]
            else:
                pack = self.find(args)
                if not pack:
                    await utils.answer(
                        message, self.strings("pack404").format(args)
                    )
                    return
        elif args:
            # Single word: a pack name if one matches, otherwise it is taken
            # as the emoji for the default pack.
            pack = self.find(args)
            if not pack:
                pack = self.find(self.default)
                sticker_emoji = args

        if not pack:
            pack = self.find(self.default)

        if not pack:
            await utils.answer(
                message, self.strings("error").format("Default pack doesn't exist")
            )
            return

        if not sticker_emoji or not "".join(distinct_emoji_list(sticker_emoji)):
            sticker_emoji = pack["emoji"]

        sticker_emoji = "".join(distinct_emoji_list(sticker_emoji))

    mime_type = getattr(getattr(reply.media, "document", None), "mime_type", "")

    if mime_type.startswith("video"):
        if quiet or outline or remove_bg or text:
            await utils.answer(
                message,
                self.strings("error").format(
                    "Arguments for video stickers are not supported"
                ),
            )
            return

        if "video" not in pack:
            # The chosen pack is not a video pack: switch to the first pack
            # that carries the "video" key.
            video_packs = [
                self.find(shortname)
                for shortname, info in self.stickersets.items()
                if "video" in info
            ]
            if not video_packs:
                await utils.answer(
                    message,
                    self.strings("error").format(
                        "Can't add video sticker - no video stickersets"
                    ),
                )
                return

            pack = video_packs[0]

        message = await utils.answer(message, self.strings("processing_gif"))
        stick = await self.prepare_vid(reply)
    else:
        message = await utils.answer(message, self.strings("processing"))
        stick = await self.prepare(message, reply, outline, remove_bg, text, quiet)

        if quiet:
            # Quiet mode: re-encode as webp and send it to the chat instead of
            # talking to @stickers.
            file = io.BytesIO()
            Image.open(io.BytesIO(stick)).save(file, "webp")
            file.name = "sticker.webp"
            file.seek(0)
            await self._client.send_file(
                message.peer_id,
                file,
                reply_to=message.reply_to_msg_id,
            )
            if message.out:
                await message.delete()

            return

    async with self._client.conversation("@stickers") as conv:

        async def step(request, markers, error):
            # One round with the bot: await the outgoing request, read the
            # reply, require one of `markers` in it (no check when empty),
            # then delete both messages.
            sent = await request
            answer = await conv.get_response()
            if markers and not any(marker in answer.raw_text for marker in markers):
                raise HikariException(error)
            await sent.delete()
            await answer.delete()

        try:
            await step(conv.send_message("/cancel"), (), None)
            await step(
                conv.send_message("/addsticker"),
                ("Choose a sticker set", "Выберите набор стикеров."),
                "UNEXPECTED_ANSWER - Error while starting action",
            )
            await step(
                conv.send_message(pack["shortname"]),
                ("Alright!", "будущий стикер"),
                "UNEXPECTED_ANSWER - Error while choosing pack",
            )
            await step(
                conv.send_file(stick, force_document=True),
                ("Now send me an emoji", "Пожалуйста, пришлите смайл"),
                "UNEXPECTED_ANSWER - Error while sending file",
            )
            await step(
                conv.send_message(sticker_emoji),
                ("There we go", "Стикер был добавлен в набор"),
                "UNEXPECTED_ANSWER - Error while sending emoji",
            )
            await step(
                conv.send_message("/done"),
                ("OK", "Готово!"),
                "UNEXPECTED_ANSWER - Error on confirmation",
            )
        except HikariException as e:
            await utils.answer(message, f"🚫 <code>{e}</code>")
            return

    await utils.answer(
        message,
        self.strings("kang").format(pack["shortname"]),
    )
|
||
|
||
@loader.command(
    ru_doc="Очистить недавно использованные стикеры",
    de_doc="Löscht kürzlich verwendete Sticker",
    tr_doc="Son kullanılan stickerları temizler",
    hi_doc="हाल ही में उपयोग किए गए स्टिकर्स को साफ़ करें",
)
async def rmrecent(self, message: Message):
    """Clear recently used stickers"""
    # Issues ClearRecentStickersRequest(attached=False), then confirms with
    # the "cleaned" string.
    request = ClearRecentStickersRequest(attached=False)
    await self._client(request)

    confirmation = self.strings("cleaned")
    await utils.answer(message, confirmation)
|