# Proprietary License Agreement
# Copyright (c) 2026-2029 CodWiz
# Permission is hereby granted to any person obtaining a copy of this software and associated documentation files (the "Software"), to use the Software for personal and non-commercial purposes, subject to the following conditions:
# 1. The Software may not be modified, altered, or otherwise changed in any way without the explicit written permission of the author.
# 2. Redistribution of the Software, in original or modified form, is strictly prohibited without the explicit written permission of the author.
# 3. The Software is provided "as is", without warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose, and non-infringement. In no event shall the author or copyright holder be liable for any claim, damages, or other liability, whether in an action of contract, tort, or otherwise, arising from, out of, or in connection with the Software or the use or other dealings in the Software.
# 4. Any use of the Software must include the above copyright notice and this permission notice in all copies or substantial portions of the Software.
# 5. By using the Software, you agree to be bound by the terms and conditions of this license.
# For any inquiries or requests for permissions, please contact codwiz@yandex.ru.
# ---------------------------------------------------------------------------------
# Name: TelegraphComics
# Description: Create comics on Telegraph from ZIP/RAR archives
# Author: @hikka_mods
# ---------------------------------------------------------------------------------
# meta developer: @hikka_mods
# requires: aiohttp, zipfile, telegraph
# ---------------------------------------------------------------------------------
import asyncio
import html
import json
import logging
import os
import re
import tempfile
import zipfile
from typing import List, Optional, Tuple, Union
from urllib.parse import urlsplit

import aiohttp
from telegraph import Telegraph
from telethon.types import MessageMediaDocument, Message

from .. import loader, utils
# Module-level logger named after this module; the upload, extraction and
# Telegraph helpers in this file report their failures through it.
logger = logging.getLogger(__name__)
@loader.tds
class TelegraphComicMod(loader.Module):
    """Create comics on Telegraph from ZIP/CBZ/RAR archives"""

    # NOTE(review): the user-facing texts advertise RAR support, but the
    # request handler in this class only accepts ".zip" and ".cbz" files.

    strings = {
        "name": "TelegraphComic",
        # NOTE(review): the placeholders in this text were lost in the source
        # and have been reconstructed. They are written as HTML entities on
        # the assumption that replies are rendered in HTML parse mode, where a
        # literal "<title>" would be swallowed as a tag -- confirm.
        "invalid_args": "❌ Invalid arguments. Usage: .telegraphcomics &lt;title&gt; | &lt;cover_url&gt; (optional)",
        "no_reply": "❌ Reply to a message with ZIP/CBZ/RAR file",
        "unsupported_format": "❌ Unsupported file format. Only ZIP/CBZ/RAR files are supported",
        "processing": "⏳ Processing archive...",
        "uploading": "⏳ Uploading images...",
        "creating_article": "⏳ Creating Telegraph article...",
        "archive_extracted": "📦 Archive successfully extracted: ✅",
        "upload_files": "📦 Upload image files:",
        "creating_telegraph": "📝 Creating Telegraph article:",
        "success": '✅ Telegraph article created!\n\n📦 Archive successfully extracted: ✅\n\n📦 Upload image files:\n{upload_status}\n\n📝 Creating Telegraph article:\n{article_status}\n\n🔗 {url}',
        "error": "❌ Error: {}",
        "_cls_doc": "Create comics on Telegraph from ZIP/CBZ/RAR archives",
    }

    strings_ru = {
        "_cls_doc": "Создание комиксов на Telegraph из ZIP/CBZ/RAR архивов",
        "invalid_args": "❌ Неверные аргументы. Использование: .telegraphcomics <название> | <ссылка_на_обложку>(необязательно)",
        "no_reply": "❌ Ответьте на сообщение с ZIP/CBZ/RAR файлом",
        "unsupported_format": "❌ Неподдерживаемый формат. Только ZIP/CBZ/RAR файлы",
        "processing": "⏳ Обработка архива...",
        "uploading": "⏳ Загрузка изображений...",
        "creating_article": "⏳ Создание Telegraph статьи...",
        "archive_extracted": "📦 Архив успешно распакован: ✅",
        "upload_files": "📦 Загрузка файлов изображений:",
        "creating_telegraph": "📝 Создание Telegraph статьи:",
        "success": '✅ Telegraph статья создана!\n\n📦 Архив успешно распакован: ✅\n\n📦 Загрузка файлов изображений:\n{upload_status}\n\n📝 Создание Telegraph статьи:\n{article_status}\n\n🔗 {url}',
        "error": "❌ Ошибка: {}",
        "available_services": "Доступные сервисы: catbox, bashupload, kappa, x0, tmpfiles, pomf",
        "current_service": "Текущий сервис: {}",
        "invalid_service": "❌ Неизвестный сервис: {}\n\n{}",
    }

    # File extensions (lower-case, with dot) that count as comic pages.
    _IMAGE_EXTENSIONS = frozenset(
        {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".avif"}
    )
    # Upper bound on simultaneous page uploads, so a long comic does not open
    # one HTTP session per page all at once.
    _MAX_PARALLEL_UPLOADS = 4
    # Host names recognised as Telegram message links for the cover argument.
    _TELEGRAM_HOSTS = frozenset({"t.me", "telegram.me"})

    def __init__(self):
        """Declare the module configuration (upload service and author info)."""
        self.config = loader.ModuleConfig(
            loader.ConfigValue(
                "upload_service",
                "catbox",
                "Upload service to use",
                validator=loader.validators.Choice(
                    ["catbox", "bashupload", "kappa", "x0", "tmpfiles", "pomf"]
                ),
            ),
            loader.ConfigValue(
                "short_name",
                "HikkaMods",
                "short name for the article",
                validator=loader.validators.String(),
            ),
            loader.ConfigValue(
                "author_name",
                "HikkaMods",
                "nickname of the author of the article",
                validator=loader.validators.String(),
            ),
            loader.ConfigValue(
                "author_url",
                "https://t.me/hikka_mods",
                "link to author",
                validator=loader.validators.String(),
            ),
        )

    async def client_ready(self, client, db):
        """Store the client/db handles and register a Telegraph account.

        The account is created from the ``short_name``, ``author_name`` and
        ``author_url`` config values. ``create_account`` is a synchronous
        call, so it is pushed to a worker thread instead of being run
        directly inside this coroutine.
        """
        self.client = client
        self.db = db
        self.telegraph = Telegraph()
        await asyncio.to_thread(
            self.telegraph.create_account,
            short_name=self.config["short_name"],
            author_name=self.config["author_name"],
            author_url=self.config["author_url"],
        )

    # ------------------------------------------------------------------
    # Upload helpers
    # ------------------------------------------------------------------

    async def _upload_file_to_service(
        self,
        session: aiohttp.ClientSession,
        url: str,
        file_path: str,
        field_name: str,
        **extra_fields,
    ) -> Optional[str]:
        """POST ``file_path`` as multipart form data and return the reply text.

        Args:
            session: Open aiohttp session used for the request.
            url: Upload endpoint.
            file_path: Local file to send.
            field_name: Name of the multipart field that carries the file.
            **extra_fields: Additional plain form fields sent with the file.

        Returns:
            The stripped response body when the server answers HTTP 200 with
            a non-empty body, otherwise ``None``. Errors are logged, never
            raised: uploading is best-effort and callers treat ``None`` as
            "this page failed".
        """
        try:
            with open(file_path, "rb") as stream:
                form = aiohttp.FormData()
                form.add_field(
                    field_name, stream, filename=os.path.basename(file_path)
                )
                for key, value in extra_fields.items():
                    form.add_field(key, value)
                async with session.post(url, data=form) as response:
                    body = await response.text()
                    if response.status == 200:
                        return body.strip() or None
                    logger.warning(
                        "Upload to %s failed with status %s: %s",
                        url,
                        response.status,
                        body,
                    )
        except Exception as e:  # deliberate best-effort boundary
            logger.warning("Error uploading to %s: %s", url, e)
        return None

    async def _post_file(
        self, url: str, file_path: str, field_name: str, **extra_fields
    ) -> Optional[str]:
        """Upload one file through a short-lived session; return the reply text."""
        async with aiohttp.ClientSession() as session:
            return await self._upload_file_to_service(
                session, url, file_path, field_name, **extra_fields
            )

    async def _post_file_json(self, url: str, file_path: str, field_name: str):
        """Upload one file and decode the reply as JSON (``None`` on failure)."""
        body = await self._post_file(url, file_path, field_name)
        if not body:
            return None
        try:
            return json.loads(body)
        except ValueError:
            logger.warning("Non-JSON response from %s: %.200s", url, body)
            return None

    async def upload_to_catbox(self, file_path: str) -> Optional[str]:
        """Upload file to catbox.moe"""
        link = await self._post_file(
            "https://catbox.moe/user/api.php",
            file_path,
            "fileToUpload",
            reqtype="fileupload",
        )
        # Anything that is not a files.catbox.moe link is treated as a failure.
        if link and link.startswith("https://files.catbox.moe/"):
            return link
        return None

    async def upload_to_bashupload(self, file_path: str) -> Optional[str]:
        """Upload file to bashupload.com"""
        body = await self._post_file("https://bashupload.com", file_path, "file")
        if not body:
            return None
        lines = body.split("\n")
        # Preferred form: a line that is itself the link.
        for line in lines:
            if line.startswith("https://"):
                return line.strip()
        # Fallback: pull the link out of the first "wget https://..." line.
        wget_line = next((line for line in lines if "wget" in line), None)
        if wget_line:
            for token in wget_line.split():
                if token.startswith("https://"):
                    return token
        return None

    async def upload_to_kappa(self, file_path: str) -> Optional[str]:
        """Upload file to kappa.lol"""
        payload = await self._post_file_json(
            "https://kappa.lol/api/upload", file_path, "file"
        )
        if isinstance(payload, dict) and "id" in payload:
            return f"https://kappa.lol/{payload['id']}"
        return None

    async def upload_to_x0(self, file_path: str) -> Optional[str]:
        """Upload file to x0.at"""
        link = await self._post_file("https://x0.at", file_path, "file")
        return link if link and "https://" in link else None

    async def upload_to_tmpfiles(self, file_path: str) -> Optional[str]:
        """Upload file to tmpfiles.org"""
        payload = await self._post_file_json(
            "https://tmpfiles.org/api/v1/upload", file_path, "file"
        )
        data = payload.get("data") if isinstance(payload, dict) else None
        if isinstance(data, dict):
            return data.get("url")
        return None

    async def upload_to_pomf(self, file_path: str) -> Optional[str]:
        """Upload file to pomf.lain.la"""
        payload = await self._post_file_json(
            "https://pomf.lain.la/upload.php", file_path, "files[]"
        )
        files = payload.get("files") if isinstance(payload, dict) else None
        if isinstance(files, list) and files and isinstance(files[0], dict):
            return files[0].get("url")
        return None

    async def upload_file(self, file_path: str) -> Optional[str]:
        """Upload file to selected service

        The service is read from the ``upload_service`` config value; an
        unknown value falls back to catbox. Returns the public URL, or
        ``None`` if the upload failed for any reason.
        """
        service_name = self.config["upload_service"]
        uploaders = {
            "catbox": self.upload_to_catbox,
            "bashupload": self.upload_to_bashupload,
            "kappa": self.upload_to_kappa,
            "x0": self.upload_to_x0,
            "tmpfiles": self.upload_to_tmpfiles,
            "pomf": self.upload_to_pomf,
        }
        uploader = uploaders.get(service_name, self.upload_to_catbox)
        try:
            return await uploader(file_path)
        except Exception as e:
            logger.error("Upload to %s failed: %s", service_name, e)
            return None

    async def _upload_all(self, image_files: List[str]) -> list:
        """Upload every page, at most ``_MAX_PARALLEL_UPLOADS`` at a time.

        Returns one entry per input file, in input order: the URL (or
        ``None``) from :meth:`upload_file`, or the exception it raised.
        """
        limiter = asyncio.Semaphore(self._MAX_PARALLEL_UPLOADS)

        async def guarded(path: str) -> Optional[str]:
            async with limiter:
                return await self.upload_file(path)

        return await asyncio.gather(
            *(guarded(path) for path in image_files), return_exceptions=True
        )

    # ------------------------------------------------------------------
    # Archive handling
    # ------------------------------------------------------------------

    @staticmethod
    def _natural_key(path: str) -> list:
        """Sort key that orders embedded numbers numerically ("2" < "10")."""
        chunks = re.split(r"(\d+)", path.lower())
        # re.split with one capturing group alternates text / digits, so the
        # odd positions are exactly the digit runs.
        return [
            int(chunk) if index % 2 else chunk for index, chunk in enumerate(chunks)
        ]

    @classmethod
    def _extract_images(cls, zip_path: str, extract_dir: str) -> List[str]:
        """Synchronously unpack the archive and collect its image files."""
        with zipfile.ZipFile(zip_path, "r") as archive:
            archive.extractall(extract_dir)
        found = []
        for root, _, names in os.walk(extract_dir):
            for name in names:
                if os.path.splitext(name)[1].lower() in cls._IMAGE_EXTENSIONS:
                    found.append(os.path.join(root, name))
        # Order by path relative to the extraction root so that sub-folders
        # (chapters) stay together, and numerically so page 10 follows page 9.
        found.sort(
            key=lambda path: cls._natural_key(os.path.relpath(path, extract_dir))
        )
        return found

    async def extract_zip_archive(self, zip_path: str, extract_dir: str) -> List[str]:
        """Extract ZIP archive and return sorted list of image files

        Args:
            zip_path: Archive to unpack. The file extension is irrelevant;
                only the content has to be a ZIP container.
            extract_dir: Existing directory that receives the content.

        Returns:
            Absolute/joined paths of all extracted images in reading order
            (natural sort on the path inside the archive). An empty list is
            returned when the archive cannot be read; the error is logged.
        """
        try:
            # Extraction is blocking disk I/O, so keep it off the event loop.
            return await asyncio.to_thread(
                self._extract_images, zip_path, extract_dir
            )
        except Exception as e:  # best-effort: callers treat [] as "no images"
            logger.warning("Error extracting ZIP archive: %s", e)
            return []

    # ------------------------------------------------------------------
    # Telegraph
    # ------------------------------------------------------------------

    async def create_telegraph_article(
        self, title: str, image_urls: List[str], cover_url: Optional[str] = None
    ) -> Optional[str]:
        """Create Telegraph article with images

        Args:
            title: Article title.
            image_urls: Page image URLs in reading order.
            cover_url: Optional cover image placed before the pages.

        Returns:
            URL of the created article, or ``None`` if creation failed (the
            error is logged).
        """
        pages = [cover_url, *image_urls] if cover_url else list(image_urls)
        if not pages:
            return None
        # NOTE(review): the markup in the source was lost; it is rebuilt here
        # as one <img> element per page. URLs are escaped because they come
        # from user input / third-party services and end up in an attribute.
        content = "".join(
            f'<img src="{html.escape(url, quote=True)}"/>' for url in pages
        )
        try:
            response = await asyncio.to_thread(
                self.telegraph.create_page,
                title=title,
                html_content=content,
                author_name=self.config["author_name"],
                author_url=self.config["author_url"],
            )
            return response["url"]
        except Exception as e:
            logger.warning("Error creating Telegraph article: %s", e)
            return None

    # ------------------------------------------------------------------
    # Cover handling
    # ------------------------------------------------------------------

    @classmethod
    def _parse_message_link(
        cls, link: str
    ) -> Optional[Tuple[Union[int, str], int]]:
        """Split a Telegram message link into ``(chat, message_id)``.

        Handles ``t.me/<username>/<id>``, ``t.me/<username>/<topic>/<id>`` and
        ``t.me/c/<channel_id>/<id>`` (the latter is mapped to the ``-100...``
        peer id form). Returns ``None`` for anything else.
        """
        candidate = link if "://" in link else f"https://{link}"
        try:
            parsed = urlsplit(candidate)
        except ValueError:
            return None
        host = (parsed.hostname or "").lower()
        if host.startswith("www."):
            host = host[4:]
        if host not in cls._TELEGRAM_HOSTS:
            return None
        segments = [part for part in parsed.path.split("/") if part]
        if len(segments) < 2 or not segments[-1].isdecimal():
            return None
        message_id = int(segments[-1])
        if segments[0] == "c":
            if len(segments) < 3 or not segments[1].isdecimal():
                return None
            return int(f"-100{segments[1]}"), message_id
        return segments[0], message_id

    async def _process_cover_url(self, cover_url: str) -> Optional[str]:
        """Process cover URL - handle Telegram message links and direct URLs

        A direct URL is returned unchanged. For a Telegram message link the
        media of that message is downloaded, re-uploaded through
        :meth:`upload_file`, and the resulting URL is returned.

        Returns ``None`` when no cover was given, or when a Telegram link
        could not be turned into an image (a ``t.me`` page URL is not a
        usable image source, so it is never passed through as the cover).
        """
        if not cover_url:
            return None
        cover_url = cover_url.strip()
        if not cover_url:
            return None

        target = self._parse_message_link(cover_url)
        if target is None:
            return cover_url

        chat, message_id = target
        media_path = None
        try:
            linked = await self.client.get_messages(chat, ids=message_id)
            if not linked or not linked.media:
                return None
            media_path = await linked.download_media()
            if not media_path:
                return None
            return await self.upload_file(media_path)
        except Exception as e:
            logger.warning("Error processing Telegram cover link: %s", e)
            return None
        finally:
            # Always drop the temporary download, even if the upload failed.
            if media_path and os.path.exists(media_path):
                os.remove(media_path)

    # ------------------------------------------------------------------
    # Command flow
    # ------------------------------------------------------------------

    async def _process_comics_request(self, message, create_func) -> None:
        """Common logic for processing comics requests

        Flow: validate arguments and the replied-to document, resolve the
        optional cover, download and unpack the archive, upload every page,
        then call ``create_func(title, image_urls, cover_url)`` and report the
        outcome by editing ``message``.

        Args:
            message: The command message; its raw arguments have the form
                ``title | cover`` where the cover part is optional.
            create_func: Coroutine function that builds the article and
                returns its URL (or a falsy value on failure).
        """
        args = utils.get_args_raw(message)
        reply = await message.get_reply_message()
        if not args or not reply:
            await utils.answer(message, self.strings["invalid_args"])
            return
        if not isinstance(reply.media, MessageMediaDocument):
            await utils.answer(message, self.strings["no_reply"])
            return

        title, _, raw_cover = args.partition("|")
        title = title.strip()
        if not title:
            # e.g. "| https://..." -- an article cannot be created untitled.
            await utils.answer(message, self.strings["invalid_args"])
            return
        cover_url = await self._process_cover_url(raw_cover)

        await utils.answer(message, self.strings["processing"])
        file_path = await reply.download_media()
        if not file_path:
            await utils.answer(
                message, self.strings["error"].format("Failed to download file")
            )
            return

        try:
            if not file_path.lower().endswith((".zip", ".cbz")):
                await utils.answer(message, self.strings["unsupported_format"])
                return

            with tempfile.TemporaryDirectory() as temp_dir:
                # A .cbz is a plain ZIP container, so it is opened directly.
                image_files = await self.extract_zip_archive(file_path, temp_dir)
                if not image_files:
                    await utils.answer(
                        message,
                        self.strings["error"].format("No images found in archive"),
                    )
                    return

                await utils.answer(message, self.strings["archive_extracted"])
                await utils.answer(message, self.strings["uploading"])
                upload_results = await self._upload_all(image_files)

                uploaded = []  # (filename, url) for every successful page
                upload_errors = []
                upload_status_lines = []
                for img_file, result in zip(image_files, upload_results):
                    filename = os.path.basename(img_file)
                    if isinstance(result, BaseException):
                        error_str = str(result)
                        logger.info("Upload failed: %s", error_str)
                        upload_errors.append(error_str)
                        upload_status_lines.append(f"{filename} - ❌")
                    elif isinstance(result, str) and "https://" in result:
                        uploaded.append((filename, result))
                        upload_status_lines.append(f"{filename} - ✅")
                    else:
                        upload_errors.append("Invalid response from upload service")
                        upload_status_lines.append(f"{filename} - ❌")

                if not uploaded:
                    error_details = [f"Failed uploads: {len(upload_errors)}"]
                    if upload_errors:
                        # De-duplicate while keeping first-seen order.
                        unique_errors = list(dict.fromkeys(upload_errors))[:3]
                        error_details.append("Errors: " + "; ".join(unique_errors))
                    await utils.answer(
                        message,
                        self.strings["error"].format(" | ".join(error_details)),
                    )
                    return

                await utils.answer(
                    message,
                    self.strings["upload_files"]
                    + "\n"
                    + "\n".join(upload_status_lines),
                )
                await utils.answer(message, self.strings["creating_article"])

                image_urls = [url for _, url in uploaded]
                article_url = await create_func(title, image_urls, cover_url)
                if not article_url:
                    await utils.answer(
                        message,
                        self.strings["error"].format("Failed to create article"),
                    )
                    return

                # Only pages that actually uploaded are part of the article.
                article_status = "\n".join(
                    f"{filename} - ✅" for filename, _ in uploaded
                )
                await utils.answer(
                    message,
                    self.strings["success"].format(
                        upload_status="\n".join(upload_status_lines),
                        article_status=article_status,
                        url=article_url,
                    ),
                )
        except Exception as e:
            await utils.answer(
                message,
                self.strings["error"].format(f"Processing error: {e}"),
            )
        finally:
            if os.path.exists(file_path):
                os.remove(file_path)

    # NOTE(review): the placeholders in the usage lines below were lost in the
    # source and have been reconstructed; they are left as raw angle brackets
    # to match the existing Russian "Аргументы" line -- confirm how the help
    # renderer treats them.
    @loader.command(
        ru_doc="Создать комикс на Telegraph из ZIP/CBZ/RAR архива\nАргументы: <название> | <ссылка_на_обложку>(необязательно)\nИспользование: .telegraphcomics <title> | <cover_url> (optional)",
        en_doc="Create Telegraph comic from ZIP/CBZ/RAR archive\nArguments: <title> | <cover_url> (optional)\nUsage: .telegraphcomics <title> | <cover_url> (optional)",
    )
    async def telegraphcomicscmd(self, message):
        # Thin entry point: the shared request flow does all the work.
        await self._process_comics_request(message, self.create_telegraph_article)