Files
limoka/fajox1/famods/cocoon.py
2026-03-03 01:27:55 +00:00

554 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# █▀▀ ▄▀█   █▀▄▀█ █▀█ █▀▄ █▀
# █▀░ █▀█   █░▀░█ █▄█ █▄▀ ▄█
# https://t.me/famods
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# ---------------------------------------------------------------------------------
# Name: Cocoon [BETA]
# Description: Взаимодействие с Cocoon от HikkaHost
# meta developer: @FAmods & @vsecoder_m
# meta banner: https://github.com/FajoX1/FAmods/blob/main/assets/banners/cocoon.png?raw=true
# requires: openai httpx aiohttp bs4 markdown
# ---------------------------------------------------------------------------------
import re
import html
import uuid
import httpx
import asyncio
import logging
import markdown
from openai import AsyncOpenAI
from typing import Optional, Any
from dataclasses import dataclass
from bs4 import BeautifulSoup, NavigableString
from datetime import datetime, timezone, timedelta
from telethon.tl.types import User
from telethon.errors import MessageNotModifiedError
from aiogram.exceptions import TelegramBadRequest
from .. import loader, utils
from ..inline.types import InlineCall
logger = logging.getLogger(__name__)
TG_ALLOWED = {
"b",
"strong",
"i",
"em",
"u",
"ins",
"s",
"strike",
"del",
"a",
"code",
"pre",
"blockquote",
"emoji",
"tg-emoji",
}
TAG_MAP = {"strong": "b", "em": "i", "del": "s", "strike": "s", "ins": "u"}
@dataclass(frozen=True)
class Usage:
spent_nano: int
spent_ton: str
tokens_spent: int
free_tokens_remaining: int
free_tokens_reset_at: Optional[int]
updated_at: Optional[int]
def _now_utc() -> datetime:
return datetime.now(timezone.utc)
def _safe_int(v: Any, default: int = 0) -> int:
try:
return int(v)
except Exception:
return default
def _escape_text(s: str) -> str:
return html.escape(s or "", quote=False)
def _days_until_reset(reset_ts: Optional[int], cycle_days: int = 30) -> int:
"""
Cocoon может отдавать free_tokens_reset_at как момент сброса текущего периода,
который уже мог произойти сегодня. Тогда следующий сброс = reset_ts + cycle_days.
"""
if not reset_ts:
return cycle_days
try:
now = _now_utc()
target = datetime.fromtimestamp(int(reset_ts), tz=timezone.utc)
if target <= now:
target = target + timedelta(days=cycle_days)
delta = target - now
if delta.total_seconds() <= 0:
return 0
return max(delta.days + (1 if delta.seconds > 0 else 0), 0)
except Exception:
return cycle_days
def _normalize_reset_ts(reset_at: Optional[int]) -> Optional[int]:
if not reset_at:
return None
reset_at = _safe_int(reset_at, 0)
if reset_at <= 0:
return None
return reset_at
def _format_compact(n: int) -> str:
n = int(n)
if n >= 1_000_000_000:
return f"{n / 1_000_000_000:.1f}".rstrip("0").rstrip(".") + "b"
if n >= 1_000_000:
return f"{n / 1_000_000:.1f}".rstrip("0").rstrip(".") + "m"
if n >= 1_000:
return f"{n / 1_000:.1f}".rstrip("0").rstrip(".") + "k"
return str(n)
def _percent_remaining(spent: int, total: int) -> float:
if total <= 0:
return 0.0
remaining = max(total - spent, 0)
return (remaining / total) * 100.0
def md_to_tg_html(text: str) -> str:
if not text:
return ""
raw_html = markdown.markdown(text, extensions=["fenced_code", "tables", "nl2br"])
soup = BeautifulSoup(raw_html, "html.parser")
def stringify(node, lang=None):
res = ""
for child in node.children:
if isinstance(child, NavigableString):
res += html.escape(str(child))
elif child.name:
tag_name = child.name
if tag_name in ["h1", "h2", "h3", "h4", "h5", "h6"]:
content = stringify(child)
res += f"<b>{content}</b>\n\n"
elif tag_name == "p":
res += stringify(child) + "\n\n"
elif tag_name == "br":
res += "\n"
elif tag_name == "li":
res += f"{stringify(child)}\n"
elif tag_name in ["ul", "ol"]:
res += stringify(child) + "\n"
elif tag_name == "tr":
res += "| " + stringify(child) + "\n"
elif tag_name in ["td", "th"]:
res += stringify(child) + " | "
else:
target_tag = TAG_MAP.get(tag_name, tag_name)
if target_tag in TG_ALLOWED:
inner_html = stringify(child)
if not inner_html.strip() and target_tag not in ["code", "pre"]:
res += inner_html
continue
if target_tag == "a":
href = child.get("href", "")
if href:
res += f'<a href="{html.escape(href)}">{inner_html}</a>'
else:
res += inner_html
elif target_tag == "code":
cls = child.get("class", [])
if cls and cls[0].startswith("language-"):
res += f'<code class="{cls[0]}">{inner_html}</code>'
else:
res += f"<code>{inner_html}</code>"
elif target_tag == "pre":
res += f"<pre>{inner_html}</pre>"
else:
res += f"<{target_tag}>{inner_html}</{target_tag}>"
else:
res += stringify(child)
return res
final_text = stringify(soup)
final_text = re.sub(r"\n{3,}", "\n\n", final_text)
return final_text.strip()
def repair_html_tags(html_chunk: str) -> str:
if not html_chunk:
return ""
newline_placeholder = f"MARKER_{uuid.uuid4().hex}"
protected_html = html_chunk.replace("\n", newline_placeholder)
soup = BeautifulSoup(protected_html, "html.parser")
repaired_html = soup.decode_contents(formatter=None)
final_html = repaired_html.replace(newline_placeholder, "\n")
return final_html
@loader.tds
class Cocoon(loader.Module):
"""Взаимодействие с Cocoon от HikkaHost"""
strings = {
"name": "Cocoon [BETA]",
"try_again": "<b><emoji document_id=5456307331644037599>❌</emoji> <b>Что-то пошло не так. Попробуйте снова.</b>",
"no_args": "<b><emoji document_id=5456307331644037599>❌</emoji> <b>Нужно </b><code>{}{} {}</code></b>",
"no_token": (
"<b><emoji document_id=5456307331644037599>❌</emoji> <b>Нету токена! Вставь его в <code>{}cfg cocoon</code>\n\n"
"<emoji document_id=5456672880605565619>🌘</emoji> Получить токен: @hikkahost_bot → <emoji document_id=5208521532942358129>🥚</emoji> Cocoon</b>"
),
"invalid_token_or_no_sub": (
"<b><emoji document_id=5456307331644037599>❌</emoji>Неверный токен или у вас нет подписки <emoji document_id=5188377234380954537>🌘</emoji> HikkaHost.</b>\n\n"
"<emoji document_id=5456672880605565619>🌘</emoji> Получить токен: @hikkahost_bot → <emoji document_id=5208521532942358129>🥚</emoji> Cocoon</b>"
),
"sending_request_to_cocoon": "<tg-emoji emoji-id=5197252827247841976>🐣</tg-emoji> <b>Обрабатываю запрос в Cocoon...</b>",
"thinking": (
"<tg-emoji emoji-id=5197252827247841976>🐣</tg-emoji> <b>Думаю...</b>\n\n"
"<blockquote>{thoughts}…</blockquote>"
),
"answer": (
"<tg-emoji emoji-id=5456217626957091223>🌘</tg-emoji> <b>Вопрос:</b> {question}\n\n"
"<tg-emoji emoji-id=5197252827247841976>🐣</tg-emoji> <b>Размышления:</b>\n"
"<blockquote expandable>{thoughts}</blockquote>\n\n"
"<tg-emoji emoji-id=5208521532942358129>🥚</tg-emoji> {answer}\n\n"
"<tg-emoji emoji-id=5458567764341985638>🚀</tg-emoji> <b>Модель</b>: <code>{model}</code>"
),
"usage": (
"<b><tg-emoji emoji-id=5208521532942358129>🥚</tg-emoji> Cocoon API\n\n"
"<tg-emoji emoji-id=5458805877328875335>💡</tg-emoji> Использовано:\n"
"</b><i>• {current}/{total} ({percent}% осталось)</i><b>\n\n"
"<tg-emoji emoji-id=5456591761558245861>⏳</tg-emoji> Лимит сбросится через {days} день(-ей).</b>"
),
"again_kb": "🔄 Сгенерировать ещё раз",
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"token",
None,
lambda: "Токен HikkaHost API. Получить токен: @hikkahost_bot -> 🥚 Cocoon",
validator=loader.validators.Hidden(loader.validators.String()),
),
loader.ConfigValue(
"model",
"Qwen/Qwen3-32B",
lambda: "Модель ИИ. Список: https://cocoon.hikka.host/v1/models",
),
loader.ConfigValue(
"role",
"user",
lambda: "Роль user-сообщения (обычно user).",
validator=loader.validators.Choice(
["system", "developer", "user", "assistant", "tool"]
),
),
loader.ConfigValue(
"system_prompt",
"",
lambda: "System prompt (инструкция для модели, role=system).",
),
loader.ConfigValue(
"max_tokens",
3900,
lambda: "Максимальное количество токенов для ответа модели.",
validator=loader.validators.Integer(minimum=1),
),
loader.ConfigValue(
"temperature",
0.2,
lambda: "Температура генерации (0.01.0).",
),
)
async def client_ready(self, client, db):
self.db = db
self._client = client
self.api_url = "https://cocoon.hikka.host/v1"
self._openai: Optional[AsyncOpenAI] = None
def _rebuild_openai_client(self) -> None:
token = self.config.get("token") or ""
self._openai = AsyncOpenAI(
api_key=token, base_url=self.api_url, timeout=60.0, max_retries=2
)
def _ensure_client(self) -> None:
if not self._openai or (
self._openai.api_key != (self.config.get("token") or "")
):
self._rebuild_openai_client()
async def _answer(self, message, text, *args, **kwargs):
try:
if len(text) > 4096:
text = text[:4090] + "..."
return await utils.answer(message, repair_html_tags(text), *args, **kwargs)
except (MessageNotModifiedError, TelegramBadRequest):
return message
async def _fetch_usage(self) -> Optional[Usage]:
token = self.config.get("token")
if not token:
return None
headers = {
"accept": "application/json",
"X-API-Key": token,
}
try:
async with httpx.AsyncClient(timeout=20.0) as client:
r = await client.get(f"{self.api_url}/usage", headers=headers)
r.raise_for_status()
data = r.json()
except Exception as e:
logger.exception("Usage request failed: %s", e)
return None
if isinstance(data, dict) and data.get("detail") == "API key not recognized":
return None
if not isinstance(data, dict):
return None
return Usage(
spent_nano=_safe_int(data.get("spent_nano"), 0),
spent_ton=str(data.get("spent_ton", "0")),
tokens_spent=_safe_int(data.get("tokens_spent"), 0),
free_tokens_remaining=_safe_int(data.get("free_tokens_remaining"), 0),
free_tokens_reset_at=(
_safe_int(data.get("free_tokens_reset_at"), 0) or None
),
updated_at=(_safe_int(data.get("updated_at"), 0) or None),
)
async def _regenerate(self, call: InlineCall, arg1, arg2):
await self.cocoon(arg1, inline_message=arg2)
@loader.command()
async def ccusage(self, message):
"""Статистика использования Cocoon"""
if not self.config.get("token"):
return await self._answer(
message, self.strings["no_token"].format(self.get_prefix())
)
usage = await self._fetch_usage()
if not usage:
return await self._answer(message, self.strings["invalid_token_or_no_sub"])
reset_ts = _normalize_reset_ts(usage.free_tokens_reset_at)
days = _days_until_reset(reset_ts, cycle_days=30)
spent = usage.tokens_spent
total = spent + usage.free_tokens_remaining
if total <= 0:
total = 1_000_000
percent = _percent_remaining(spent, total)
percent_fmt = f"{percent:.1f}".rstrip("0").rstrip(".")
return await self._answer(
message,
self.strings["usage"].format(
current=_format_compact(spent),
total=_format_compact(total),
percent=percent_fmt,
days=days,
),
)
@loader.command()
async def cocoon(self, message, inline_message=None):
"""Задать вопрос к ИИ (поддерживает ответ на сообщение)"""
q = utils.get_args_raw(message)
if not q:
return await utils.answer(
message,
self.strings["no_args"].format(self.get_prefix(), "cocoon", "[вопрос]"),
)
if not self.config["token"]:
return await utils.answer(
message, self.strings["no_token"].format(self.get_prefix())
)
usage = await self._fetch_usage()
if not usage:
return await utils.answer(message, self.strings["invalid_token_or_no_sub"])
user_message = message
if not inline_message:
message = await self.inline.form(
text="...",
message=message,
force_me=False,
)
else:
message = inline_message
await utils.answer(message, self.strings["sending_request_to_cocoon"])
self._ensure_client()
system_prompt = (self.config.get("system_prompt") or "").strip()
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
if user_message.reply_to:
reply = await user_message.get_reply_message()
entity_id = None
if hasattr(reply, "from_id") and reply.from_id:
if hasattr(reply.from_id, "user_id") and reply.from_id.user_id:
entity_id = reply.from_id.user_id
elif hasattr(reply.from_id, "channel_id") and reply.from_id.channel_id:
entity_id = reply.from_id.channel_id
if entity_id is None:
if hasattr(reply.peer_id, "user_id") and reply.peer_id.user_id:
entity_id = reply.peer_id.user_id
else:
entity_id = reply.peer_id.channel_id
entity = await self.client.get_entity(entity_id)
date = reply.date.strftime("%H:%M %d.%m.%Y UTC")
messages.append(
{
"role": "user",
"content": (
(
f"{entity.first_name} {entity.last_name or ''} (user id: {entity.id}) "
if isinstance(entity, User)
else f"Channel {entity.title} (channel id: {entity.id}) "
)
+ f"msg id: {reply.id}, date: {date}: "
+ reply.message
),
}
)
messages.append({"role": self.config.get("role", "user"), "content": q})
try:
response = await self._openai.chat.completions.create(
messages=messages,
stream=True,
max_tokens=self.config.get("max_tokens", 3900),
model=self.config.get("model", "Qwen/Qwen3-32B"),
temperature=self.config.get("temperature", 0.2),
)
response_text = ""
chunk_buffer = ""
async for chunk in response:
if chunk.choices and chunk.choices[0].delta.content:
chunk_buffer += chunk.choices[0].delta.content
if len(chunk_buffer) >= 100:
response_text += chunk_buffer
chunk_buffer = ""
thoughts = response_text.split("</think>", 1)[0].replace(
"<think>", ""
)
if "</think>" in response_text:
after_think = response_text.split("</think>", 1)[1].strip()
await self._answer(
message,
self.strings["answer"].format(
thoughts=thoughts[:500],
question=q,
answer=md_to_tg_html(_escape_text(after_think) + ""),
model=self.config["model"],
),
)
else:
await self._answer(
message,
self.strings["thinking"].format(
thoughts=md_to_tg_html(_escape_text(thoughts) + "")
),
)
await asyncio.sleep(0.2)
if chunk_buffer:
response_text += chunk_buffer
responses_data = response_text.split("</think>", 1)
thoughts = responses_data[0].strip().replace("<think>", "")
after_think = responses_data[1].strip()
await self._answer(
message,
self.strings["answer"].format(
question=q,
thoughts=md_to_tg_html(_escape_text(thoughts[:500])),
answer=md_to_tg_html(_escape_text(after_think)),
model=self.config["model"],
),
reply_markup=[
{
"text": self.strings["again_kb"],
"callback": self._regenerate,
"args": [user_message, message],
}
],
)
except httpx.RemoteProtocolError:
return await self._answer(message, self.strings["try_again"])