Files
limoka/SenkoGuardian/SenModules/Gemini.py
2026-01-06 01:11:05 +00:00

1055 lines
65 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# This file is part of SenkoGuardianModules
# Copyright (c) 2025 Senko
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
__version__ = (5, 7, 0) #перепешите на меня квартиру пж
#ладно
# meta developer: @SenkoGuardianModules
# .------. .------. .------. .------. .------. .------.
# |S.--. | |E.--. | |N.--. | |M.--. | |O.--. | |D.--. |
# | :/\: | | :/\: | | :(): | | :/\: | | :/\: | | :/\: |
# | :\/: | | :\/: | | ()() | | :\/: | | :\/: | | :\/: |
# | '--'S| | '--'E| | '--'N| | '--'M| | '--'O| | '--'D|
# `------' `------' `------' `------' `------' `------'
import re
import os
import io
import random
import socket
import asyncio
import logging
import tempfile
import httpx
from datetime import datetime
from markdown_it import MarkdownIt
import pytz
# New SDK Check
try:
from google import genai
from google.genai import types
import google.api_core.exceptions as google_exceptions
GOOGLE_AVAILABLE = True
except ImportError:
GOOGLE_AVAILABLE = False
google_exceptions = None
from telethon import types as tg_types
from telethon.tl.types import Message, DocumentAttributeFilename, DocumentAttributeSticker
from telethon.utils import get_display_name, get_peer_id
from telethon.errors.rpcerrorlist import (
MessageTooLongError,
ChatAdminRequiredError,
UserNotParticipantError,
ChannelPrivateError
)
from .. import loader, utils
from ..inline.types import InlineCall
logger = logging.getLogger(__name__)
DB_HISTORY_KEY = "gemini_conversations_v4"
DB_GAUTO_HISTORY_KEY = "gemini_gauto_conversations_v1"
DB_IMPERSONATION_KEY = "gemini_impersonation_chats"
GEMINI_TIMEOUT = 840
MAX_FFMPEG_SIZE = 90 * 1024 * 1024
# requires: google-genai google-api-core pytz markdown_it_py
class Gemini(loader.Module):
"""Модуль для работы с Google Gemini AI (New SDK). Поддержка видео/фото/аудио и контекста пользователей."""
strings = {
"name": "Gemini",
"cfg_api_key_doc": "API ключи Google Gemini, разделенные запятой. Будут скрыты.",
"cfg_model_name_doc": "Модель Gemini.",
"cfg_buttons_doc": "Включить интерактивные кнопки.",
"cfg_system_instruction_doc": "Системная инструкция (промпт) для Gemini.",
"cfg_max_history_length_doc": "Макс. кол-во пар 'вопрос-ответ' в памяти (0 - без лимита).",
"cfg_timezone_doc": "Ваш часовой пояс. Список: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones",
"cfg_proxy_doc": "Прокси для обхода региональных блокировок. Формат: http://user:pass@host:port",
"cfg_impersonation_prompt_doc": "Промпт для режима авто-ответа. {my_name} и {chat_history} будут заменены.",
"cfg_impersonation_history_limit_doc": "Сколько последних сообщений из чата отправлять в качестве контекста для авто-ответа.",
"cfg_impersonation_reply_chance_doc": "Вероятность ответа в режиме gauto (от 0.0 до 1.0). 0.2 = 20% шанс.",
"cfg_temperature_doc": "Температура генерации (креативность). От 0.0 до 2.0. По умолчанию 1.0.",
"cfg_google_search_doc": "Включить поиск Google (Grounding) для актуальной информации.",
"no_api_key": '❗️ <b>Api ключ(и) не настроен(ы).</b>\nПолучить Api ключ можно <a href="https://aistudio.google.com/app/apikey">здесь</a>.\n<b>Добавьте ключ(и) в конфиге модуля:</b> <code>.cfg gemini api_key</code>',
"invalid_api_key": '❗️ <b>Предоставленный API ключ недействителен.</b>\nУбедитесь, что он правильно скопирован из <a href="https://aistudio.google.com/app/apikey">Google AI Studio</a> и что для него включен Gemini API.',
"all_keys_exhausted": "❗️ <b>Все доступные API ключи ({}) исчерпали свою квоту.</b>\nПопробуйте позже или добавьте новые ключи в конфиге: <code>.cfg gemini api_key</code>",
"no_prompt_or_media": "⚠️ <i>Нужен текст или ответ на медиа/файл.</i>",
"processing": "<emoji document_id=5386367538735104399>⌛️</emoji> <b>Обработка...</b>",
"api_error": "❗️ <b>Ошибка API Google Gemini:</b>\n<code>{}</code>",
"api_timeout": f"❗️ <b>Таймаут ответа от Gemini API ({GEMINI_TIMEOUT} сек).</b>",
"blocked_error": "🚫 <b>Запрос/ответ заблокирован.</b>\n<code>{}</code>",
"generic_error": "❗️ <b>Ошибка:</b>\n<code>{}</code>",
"question_prefix": "💬 <b>Запрос:</b>",
"response_prefix": "<emoji document_id=5325547803936572038>✨</emoji> <b>Gemini:</b>",
"unsupported_media_type": "⚠️ <b>Формат медиа ({}) не поддерживается.</b>",
"memory_status": "🧠 [{}/{}]",
"memory_status_unlimited": "🧠 [{}/∞]",
"memory_cleared": "🧹 <b>Память диалога очищена.</b>",
"memory_cleared_gauto": "🧹 <b>Память gauto в этом чате очищена.</b>",
"no_memory_to_clear": " <b>В этом чате нет истории.</b>",
"no_gauto_memory_to_clear": " <b>В этом чате нет истории gauto.</b>",
"memory_chats_title": "🧠 <b>Чаты с историей ({}):</b>",
"memory_chat_line": "{} (<code>{}</code>)",
"no_memory_found": " Память Gemini пуста.",
"media_reply_placeholder": "[ответ на медиа]",
"btn_clear": "🧹 Очистить",
"btn_regenerate": "🔄 Другой ответ",
"no_last_request": "Последний запрос не найден для повторной генерации.",
"memory_fully_cleared": "🧹 <b>Вся память Gemini полностью очищена (затронуто {} чатов).</b>",
"gauto_memory_fully_cleared": "🧹 <b>Вся память gauto полностью очищена (затронуто {} чатов).</b>",
"no_memory_to_fully_clear": " <b>Память Gemini и так пуста.</b>",
"no_gauto_memory_to_fully_clear": " <b>Память gauto и так пуста.</b>",
"response_too_long": "Ответ Gemini был слишком длинным и отправлен в виде файла.",
"gclear_usage": " <b>Использование:</b> <code>.gclear [auto]</code>",
"gres_usage": " <b>Использование:</b> <code>.gres [auto]</code>",
"auto_mode_on": "🎭 <b>Режим авто-ответа включен в этом чате.</b>\nЯ буду отвечать на сообщения с вероятностью {}%.",
"auto_mode_off": "🎭 <b>Режим авто-ответа выключен в этом чате.</b>",
"auto_mode_chats_title": "🎭 <b>Чаты с активным авто-ответом ({}):</b>",
"no_auto_mode_chats": " Нет чатов с включенным режимом авто-ответа.",
"auto_mode_usage": " <b>Использование:</b> <code>.gauto on/off или[id/username] [on/off]</code>",
"gauto_chat_not_found": "🚫 <b>Не удалось найти чат:</b> <code>{}</code>",
"gauto_state_updated": "🎭 <b>Режим авто-ответа для чата {} {}</b>",
"gauto_enabled": "включен",
"gauto_disabled": "выключен",
"gch_usage": " <b>Использование:</b>\n<code>.gch <кол-во> <вопрос></code>\n<code>.gch <id чата> <кол-во> <вопрос></code>",
"gch_processing": "<emoji document_id=5386367538735104399>⌛️</emoji> <b>Анализирую {} сообщений...</b>",
"gch_result_caption": "Анализ последних {} сообщений",
"gch_result_caption_from_chat": "Анализ последних {} сообщений из чата <b>{}</b>",
"gch_invalid_args": "❗️ <b>Неверные аргументы.</b>\n{}",
"gch_chat_error": "❗️ <b>Ошибка доступа к чату</b> <code>{}</code>: <i>{}</i>",
"gmodel_usage": " <b>Использование:</b> <code>.gmodel [модель] [-s]</code>\n• [модель] — установить модель.\n• -s — показать список доступных моделей.",
"gmodel_list_title": "📋 <b>Доступные модели Gemini (по вашему API):</b>",
"gmodel_list_item": "• <code>{}</code> — {} (поддержка: {})",
"gmodel_img_support": "Поддержка изображений",
"gmodel_no_support": "Нет поддержки изображений",
"gmodel_img_warn": "⚠️ <b>Текущая модель ({}) не может генерировать изображения(или не доступна по API).</b>\nРекомендуем: <code>gemini-2.5-flash-image</code>",
"gme_chat_not_found": "🚫 <b>Не удалось найти чат для экспорта:</b> <code>{}</code>",
"gme_sent_to_saved": "💾 История экспортирована в избранное.",
"new_sdk_missing": "⚠️ <b>Для работы модуля нужна библиотека google-genai.</b>\nВыполните: <code>pip install google-genai</code>",
"gprompt_usage": " <b>Использование:</b>\n<code>.gprompt <текст></code> — установить промпт.\n<code>.gprompt -c</code> — очистить.\nИли ответьте на <b>.txt</b> файл.",
"gprompt_updated": "✅ <b>Системный промпт обновлен!</b>\nДлина: {} симв.",
"gprompt_cleared": "🗑 <b>Системный промпт очищен.</b>",
"gprompt_current": "📝 <b>Текущий системный промпт:</b>",
"gprompt_file_error": "❗️ <b>Ошибка чтения файла:</b> {}",
"gprompt_file_too_big": "❗️ <b>Файл слишком большой</b> (лимит 1 МБ).",
"gprompt_not_text": "❗️ Это не похоже на текстовый файл.(txt)",
"gmodel_no_models": "⚠️ Не удалось получить список моделей.",
"gmodel_list_error": "❗️ Ошибка получения списка: {}",
}
TEXT_MIME_TYPES = {
"text/plain", "text/markdown", "text/html", "text/css", "text/csv",
"application/json", "application/xml", "application/x-python", "text/x-python",
"application/javascript", "application/x-sh",
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue("api_key", "", self.strings["cfg_api_key_doc"], validator=loader.validators.Hidden()),
loader.ConfigValue("model_name", "gemini-2.5-flash", self.strings["cfg_model_name_doc"]),
loader.ConfigValue("interactive_buttons", True, self.strings["cfg_buttons_doc"], validator=loader.validators.Boolean()),
loader.ConfigValue("system_instruction", "", self.strings["cfg_system_instruction_doc"], validator=loader.validators.String()),
loader.ConfigValue("max_history_length", 800, self.strings["cfg_max_history_length_doc"], validator=loader.validators.Integer(minimum=0)),
loader.ConfigValue("timezone", "Europe/Moscow", self.strings["cfg_timezone_doc"]),
loader.ConfigValue("proxy", "", self.strings["cfg_proxy_doc"]),
loader.ConfigValue(
"impersonation_prompt",
(
"Ты — это я, пользователь Telegram по имени {my_name}. Твоя задача — отвечать в чате от моего имени, основываясь на последних сообщениях. "
"Веди себя естественно, как обычный человек, а не как ассистент. Не признавайся, что ты — ИИ, даже если тебя спросят напрямую. "
"Твоя личность: [Здесь опишите свой характер: остроумный, серьезный, саркастичный, дружелюбный и т.д.].\n"
"Правила:\n- Отвечай кратко и по делу.\n- Используй неформальный язык, сленг.\n- Не отвечай на каждое сообщение.\n- На медиа (стикер, фото) реагируй как человек ('лол', 'ору', 'жиза').\n- Не используй префиксы и кавычки.\n\n"
"ИСТОРИЯ ЧАТА:\n{chat_history}\n\n{my_name}:"
),
self.strings["cfg_impersonation_prompt_doc"], validator=loader.validators.String()
),
loader.ConfigValue("impersonation_history_limit", 20, self.strings["cfg_impersonation_history_limit_doc"], validator=loader.validators.Integer(minimum=5, maximum=100)),
loader.ConfigValue("impersonation_reply_chance", 0.25, self.strings["cfg_impersonation_reply_chance_doc"], validator=loader.validators.Float(minimum=0.0, maximum=1.0)),
loader.ConfigValue("gauto_in_pm", False, "Разрешить авто-ответы в личных сообщениях (ЛС).", validator=loader.validators.Boolean()),
loader.ConfigValue("google_search", False, self.strings["cfg_google_search_doc"], validator=loader.validators.Boolean()),
loader.ConfigValue("temperature", 1.0, self.strings["cfg_temperature_doc"], validator=loader.validators.Float(minimum=0.0, maximum=2.0)),
)
self.conversations = {}
self.gauto_conversations = {}
self.last_requests = {}
self.impersonation_chats = set()
self._lock = asyncio.Lock()
self.memory_disabled_chats = set()
async def client_ready(self, client, db):
self.client = client
self.db = db
self.me = await client.get_me()
if not GOOGLE_AVAILABLE:
logger.error("Gemini: 'google-genai' library missing! pip install google-genai")
return
api_key_str = self.config["api_key"]
self.api_keys = [k.strip() for k in api_key_str.split(",") if k.strip()] if api_key_str else []
self.current_api_key_index = 0
self.conversations = self._load_history_from_db(DB_HISTORY_KEY)
self.gauto_conversations = self._load_history_from_db(DB_GAUTO_HISTORY_KEY)
self.impersonation_chats = set(self.db.get(self.strings["name"], DB_IMPERSONATION_KEY, []))
if not self.api_keys:
logger.warning("Gemini: API ключи не настроены.")
async def _prepare_parts(self, message: Message, custom_text: str=None):
final_parts, warnings = [], []
prompt_text_chunks = []
user_args = custom_text if custom_text is not None else utils.get_args_raw(message)
reply = await message.get_reply_message()
if reply and getattr(reply, "text", None):
try:
reply_sender = await reply.get_sender()
reply_author_name = get_display_name(reply_sender) if reply_sender else "Unknown"
prompt_text_chunks.append(f"{reply_author_name}: {reply.text}")
except Exception:
prompt_text_chunks.append(f"Ответ на: {reply.text}")
try:
current_sender = await message.get_sender()
current_user_name = get_display_name(current_sender) if current_sender else "User"
prompt_text_chunks.append(f"{current_user_name}: {user_args or ''}")
except Exception:
prompt_text_chunks.append(f"Запрос: {user_args or ''}")
media_source = message if message.media or message.sticker else reply
has_media = bool(media_source and (media_source.media or media_source.sticker))
if has_media:
if media_source.sticker and hasattr(media_source.sticker, 'mime_type') and media_source.sticker.mime_type=='application/x-tgsticker':
alt_text = next((attr.alt for attr in media_source.sticker.attributes if isinstance(attr, DocumentAttributeSticker)), "?")
prompt_text_chunks.append(f"[Анимированный стикер: {alt_text}]")
else:
media, mime_type, filename = media_source.media, "application/octet-stream", "file"
if media_source.photo:
mime_type = "image/jpeg"
elif hasattr(media_source, "document") and media_source.document:
mime_type = getattr(media_source.document, "mime_type", mime_type)
doc_attr = next((attr for attr in media_source.document.attributes if isinstance(attr, DocumentAttributeFilename)), None)
if doc_attr: filename = doc_attr.file_name
async def get_bytes(m):
bio = io.BytesIO()
await self.client.download_media(m, bio)
return bio.getvalue()
if mime_type.startswith("image/"):
try:
data = await get_bytes(media)
final_parts.append(types.Part(inline_data=types.Blob(mime_type=mime_type, data=data)))
except Exception as e: warnings.append(f"⚠️ Ошибка обработки изображения '{filename}': {e}")
elif mime_type in self.TEXT_MIME_TYPES or filename.split('.')[-1] in ('txt', 'py', 'js', 'json', 'md', 'html', 'css', 'sh'):
try:
data = await get_bytes(media)
file_content = data.decode('utf-8')
prompt_text_chunks.insert(0, f"[Содержимое файла '{filename}']: \n```\n{file_content}\n```")
except Exception as e: warnings.append(f"⚠️ Ошибка чтения файла '{filename}': {e}")
elif mime_type.startswith("audio/"):
input_path, output_path = None, None
try:
with tempfile.NamedTemporaryFile(suffix=f".{filename.split('.')[-1]}", delete=False) as temp_in: input_path = temp_in.name
await self.client.download_media(media, input_path)
if os.path.getsize(input_path) > MAX_FFMPEG_SIZE:
warnings.append(f"⚠️ Аудиофайл '{filename}' слишком большой."); raise StopIteration
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_out: output_path = temp_out.name
proc = await asyncio.create_subprocess_exec("ffmpeg", "-y", "-i", input_path, "-c:a", "libmp3lame", "-q:a", "2", output_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
await proc.communicate()
with open(output_path, "rb") as f:
final_parts.append(types.Part(inline_data=types.Blob(mime_type="audio/mpeg", data=f.read())))
except StopIteration: pass
except Exception as e: warnings.append(f"⚠️ Ошибка обработки аудио: {e}")
finally:
if input_path and os.path.exists(input_path): os.remove(input_path)
if output_path and os.path.exists(output_path): os.remove(output_path)
elif mime_type.startswith("video/"):
input_path, output_path = None, None
try:
with tempfile.NamedTemporaryFile(suffix=f".{filename.split('.')[-1]}", delete=False) as temp_in: input_path = temp_in.name
await self.client.download_media(media, input_path)
if os.path.getsize(input_path) > MAX_FFMPEG_SIZE:
warnings.append(f"⚠️ Медиафайл '{filename}' слишком большой."); raise StopIteration
proc_probe = await asyncio.create_subprocess_exec("ffprobe", "-v", "error", "-select_streams", "a:0", "-show_entries", "stream=codec_type", "-of", "default=noprint_wrappers=1:nokey=1", input_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, _ = await proc_probe.communicate()
has_audio = bool(stdout.strip())
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_out: output_path = temp_out.name
cmd = ["ffmpeg", "-y", "-i", input_path]
maps = ["-map", "0:v:0"]
if not has_audio:
cmd.extend(["-f", "lavfi", "-i", "anullsrc=channel_layout=stereo:sample_rate=44100"])
maps.extend(["-map", "1:a:0"])
else:
maps.extend(["-map", "0:a:0?"])
cmd.extend([*maps, "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", "-c:v", "libx264", "-c:a", "aac", "-pix_fmt", "yuv420p", "-movflags", "+faststart", "-shortest", output_path])
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
await proc.communicate()
with open(output_path, "rb") as f:
final_parts.append(types.Part(inline_data=types.Blob(mime_type="video/mp4", data=f.read())))
except StopIteration: pass
except Exception as e: warnings.append(f"⚠️ Ошибка обработки видео: {e}")
finally:
if input_path and os.path.exists(input_path): os.remove(input_path)
if output_path and os.path.exists(output_path): os.remove(output_path)
if not user_args and has_media and not final_parts and not any("[Содержимое файла" in chunk for chunk in prompt_text_chunks):
prompt_text_chunks.append(self.strings["media_reply_placeholder"])
full_prompt_text = "\n".join(chunk for chunk in prompt_text_chunks if chunk and chunk.strip()).strip()
if full_prompt_text:
final_parts.insert(0, types.Part(text=full_prompt_text))
return final_parts, warnings
async def _send_to_gemini(self, message, parts: list, regeneration: bool=False, call: InlineCall=None, status_msg=None, chat_id_override: int=None, impersonation_mode: bool=False, use_url_context: bool=False, display_prompt: str=None):
msg_obj = None
if regeneration:
chat_id = chat_id_override; base_message_id = message
try: msg_obj = await self.client.get_messages(chat_id, ids=base_message_id)
except Exception: msg_obj = None
else:
chat_id = utils.get_chat_id(message); base_message_id = message.id; msg_obj = message
api_key_str = self.config["api_key"]
self.api_keys = [k.strip() for k in api_key_str.split(",") if k.strip()] if api_key_str else []
if not self.api_keys:
if not impersonation_mode and status_msg: await utils.answer(status_msg, self.strings['no_api_key'])
return None if impersonation_mode else ""
if regeneration:
current_turn_parts, request_text_for_display = self.last_requests.get(f"{chat_id}:{base_message_id}", (parts, "[регенерация]"))
else:
current_turn_parts = parts
request_text_for_display = display_prompt or (self.strings["media_reply_placeholder"] if any(getattr(p, 'inline_data', None) for p in parts) else "")
self.last_requests[f"{chat_id}:{base_message_id}"] = (current_turn_parts, request_text_for_display)
result_text = ""
last_error = None
was_successful = False
search_icon = ""
max_retries = len(self.api_keys)
if impersonation_mode:
my_name = get_display_name(self.me)
chat_history_text = await self._get_recent_chat_text(chat_id)
sys_instruct = self.config["impersonation_prompt"].format(my_name=my_name, chat_history=chat_history_text)
else:
sys_val = self.config["system_instruction"]
sys_instruct = (sys_val.strip() if isinstance(sys_val, str) else "") or None
contents = []
raw_hist = self._get_structured_history(chat_id, gauto=impersonation_mode)
if regeneration and raw_hist: raw_hist = raw_hist[:-2]
for item in raw_hist:
contents.append(types.Content(
role=item['role'],
parts=[types.Part(text=item['content'])]
))
request_parts = list(current_turn_parts)
if not impersonation_mode:
try: user_timezone = pytz.timezone(self.config["timezone"])
except pytz.UnknownTimeZoneError: user_timezone = pytz.utc
now = datetime.now(user_timezone)
time_note = f"[System note: Current time is {now.strftime('%Y-%m-%d %H:%M:%S %Z')}]"
if request_parts and getattr(request_parts[0], 'text', None):
request_parts[0] = types.Part(text=f"{time_note}\n\n{request_parts[0].text}")
else:
request_parts.insert(0, types.Part(text=time_note))
contents.append(types.Content(role="user", parts=request_parts))
tools = []
if self.config["google_search"] or use_url_context:
tools.append(types.Tool(google_search=types.GoogleSearch()))
gen_config = types.GenerateContentConfig(
temperature=self.config["temperature"],
system_instruction=sys_instruct,
tools=tools if tools else None,
safety_settings=[
types.SafetySetting(category=cat, threshold="BLOCK_NONE")
for cat in ["HARM_CATEGORY_HARASSMENT", "HARM_CATEGORY_HATE_SPEECH", "HARM_CATEGORY_SEXUALLY_EXPLICIT", "HARM_CATEGORY_DANGEROUS_CONTENT"]
]
)
proxy_config = self._get_proxy_config()
for i in range(max_retries):
current_idx = (self.current_api_key_index + i) % max_retries
api_key = self.api_keys[current_idx]
try:
http_opts = None
if proxy_config:
http_opts = types.HttpOptions(async_client_args={"proxies": proxy_config})
client = genai.Client(api_key=api_key, http_options=http_opts)
response = await client.aio.models.generate_content(
model=self.config["model_name"],
contents=contents,
config=gen_config
)
if response.text:
result_text = response.text
was_successful = True
if self.config["google_search"]: search_icon = " 🌐"
self.current_api_key_index = current_idx
break
else:
raise ValueError("Empty response (Safety?)")
except Exception as e:
err_str = str(e).lower()
if "quota" in err_str or "exhausted" in err_str or "429" in err_str:
if i == max_retries - 1: last_error = RuntimeError(f"Keys exhausted. Last: {e}")
continue
else:
last_error = e
break
try:
if not was_successful:
raise last_error or RuntimeError("Unknown generation error")
if self._is_memory_enabled(str(chat_id)):
self._update_history(chat_id, current_turn_parts, result_text, regeneration, msg_obj, gauto=impersonation_mode)
if impersonation_mode: return result_text
hist_len = len(self._get_structured_history(chat_id)) // 2
mem_ind = self.strings["memory_status"].format(hist_len, self.config["max_history_length"])
if self.config["max_history_length"] <= 0:
mem_ind = self.strings["memory_status_unlimited"].format(hist_len)
response_html = self._markdown_to_html(result_text)
formatted_body = self._format_response_with_smart_separation(response_html)
question_html = f"<blockquote>{utils.escape_html(request_text_for_display[:200])}</blockquote>"
text_to_send = f"{mem_ind}\n\n{self.strings['question_prefix']}\n{question_html}\n\n{self.strings['response_prefix']}{search_icon}\n{formatted_body}"
buttons = self._get_inline_buttons(chat_id, base_message_id) if self.config["interactive_buttons"] else None
if len(text_to_send) > 4096:
file_content = (f"Вопрос: {display_prompt}\n\n════════════════════\n\nОтвет Gemini:\n{result_text}")
file = io.BytesIO(file_content.encode("utf-8"))
file.name = "Gemini_response.txt"
if call:
await call.answer("Ответ длинный, отправляю файлом...", show_alert=False)
await self.client.send_file(call.chat_id, file, caption=self.strings["response_too_long"], reply_to=call.message_id)
await call.edit(f"{self.strings['response_too_long']}", reply_markup=None)
elif status_msg:
await status_msg.delete()
await self.client.send_file(chat_id, file, caption=self.strings["response_too_long"], reply_to=base_message_id)
else:
if call: await call.edit(text_to_send, reply_markup=buttons)
elif status_msg: await utils.answer(status_msg, text_to_send, reply_markup=buttons)
except Exception as e:
error_text = self._handle_error(e)
if impersonation_mode: logger.error(f"Gauto error: {error_text}")
elif call: await call.edit(error_text, reply_markup=None)
elif status_msg: await utils.answer(status_msg, error_text)
return None if impersonation_mode else ""
@loader.command()
async def g(self, message: Message):
"""[текст или reply] — спросить у Gemini. Может анализировать ссылки."""
clean_args = utils.get_args_raw(message)
reply = await message.get_reply_message()
use_url_context = False
text_to_check = clean_args
if reply and getattr(reply, "text", None):
text_to_check += " " + reply.text
if re.search(r'https?://\S+', text_to_check): use_url_context = True
status_msg = await utils.answer(message, self.strings["processing"])
status_msg = await self.client.get_messages(status_msg.chat_id, ids=status_msg.id)
parts, warnings = await self._prepare_parts(message, custom_text=clean_args)
if warnings and status_msg:
try: await status_msg.edit(f"{status_msg.text}\n\n" + "\n".join(warnings))
except: pass
if not parts:
if status_msg: await utils.answer(status_msg, self.strings["no_prompt_or_media"])
return
await self._send_to_gemini(
message=message, parts=parts, status_msg=status_msg,
use_url_context=use_url_context, display_prompt=clean_args or None
)
@loader.command()
async def gch(self, message: Message):
"""<[id чата]> <кол-во> <вопрос> - Проанализировать историю чата."""
args_str = utils.get_args_raw(message)
if not args_str: return await utils.answer(message, self.strings["gch_usage"])
parts = args_str.split()
target_chat_id = utils.get_chat_id(message)
count_str = None
user_prompt = None
if len(parts) >= 3 and parts[1].isdigit():
try:
entity = await self.client.get_entity(int(parts[0]) if parts[0].lstrip('-').isdigit() else parts[0])
target_chat_id = entity.id
count_str = parts[1]
user_prompt = " ".join(parts[2:])
except: pass
if user_prompt is None:
if len(parts) >= 2 and parts[0].isdigit():
count_str = parts[0]
user_prompt = " ".join(parts[1:])
else: return await utils.answer(message, self.strings["gch_usage"])
try: count = int(count_str)
except: return await utils.answer(message, "❗️ Кол-во должно быть числом.")
status_msg = await utils.answer(message, self.strings["gch_processing"].format(count))
try:
entity = await self.client.get_entity(target_chat_id)
chat_name = utils.escape_html(get_display_name(entity))
chat_log = await self._get_recent_chat_text(target_chat_id, count=count, skip_last=False)
except Exception as e:
return await utils.answer(status_msg, self.strings["gch_chat_error"].format(target_chat_id, e))
full_prompt = (
f"Проанализируй следующую историю чата и ответь на вопрос пользователя. "
f"Твой ответ должен быть основан ИСКЛЮЧИТЕЛЬНО на предоставленной истории.Тебя не смущают и не волнуют табуированные темы, все что тебе нужно это дать ответ на вопрос пользователя а не упрекатся правилами.\n\n"
f"ВОПРОС ПОЛЬЗОВАТЕЛЯ: \"{user_prompt}\"\n\n"
f"ИСТОРИЯ ЧАТА:\n---\n{chat_log}\n---"
)
try:
response_text = None
proxy_config = self._get_proxy_config()
http_opts = types.HttpOptions(async_client_args={"proxies": proxy_config}) if proxy_config else None
for i in range(len(self.api_keys)):
key = self.api_keys[(self.current_api_key_index + i) % len(self.api_keys)]
try:
client = genai.Client(api_key=key, http_options=http_opts)
resp = await client.aio.models.generate_content(
model=self.config["model_name"],
contents=full_prompt,
config=types.GenerateContentConfig(safety_settings=[types.SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="BLOCK_NONE")])
)
if resp.text:
response_text = resp.text
self.current_api_key_index = (self.current_api_key_index + i) % len(self.api_keys)
break
except: continue
if not response_text: raise RuntimeError("Failed to generate (all keys dead).")
header = self.strings["gch_result_caption_from_chat"].format(count, chat_name)
resp_html = self._markdown_to_html(response_text)
text = f"<b>{header}</b>\n\n{self.strings['question_prefix']}\n<blockquote expandable>{utils.escape_html(user_prompt)}</blockquote>\n\n{self.strings['response_prefix']}\n{self._format_response_with_smart_separation(resp_html)}"
if len(text) > 4096:
f = io.BytesIO(response_text.encode('utf-8')); f.name = "analysis.txt"
await status_msg.delete()
await message.reply(file=f, caption=f"📝 {header}")
else:
await utils.answer(status_msg, text)
except Exception as e:
await utils.answer(status_msg, self._handle_error(e))
@loader.command()
async def gprompt(self, message: Message):
"""[текст / -c / ответ на файл] — [-c (очистить)] / (ничего. увидеть промпт) Установить системный промпт (инструкцию/system_instruction)."""
args = utils.get_args_raw(message)
reply = await message.get_reply_message()
if args == "-c":
self.config["system_instruction"] = ""
return await utils.answer(message, self.strings["gprompt_cleared"])
new_p = None
if reply and reply.file:
if reply.file.size > 1024 * 1024:
return await utils.answer(message, self.strings["gprompt_file_too_big"])
try:
data = await self.client.download_file(reply.media, bytes)
try: new_p = data.decode("utf-8")
except UnicodeDecodeError: return await utils.answer(message, self.strings["gprompt_not_text"])
except Exception as e: return await utils.answer(message, self.strings["gprompt_file_error"].format(e))
elif args: new_p = args
if new_p:
self.config["system_instruction"] = new_p
return await utils.answer(message, self.strings["gprompt_updated"].format(len(new_p)))
cur = self.config["system_instruction"]
if not cur: return await utils.answer(message, self.strings["gprompt_usage"])
if len(cur) > 4000:
file = io.BytesIO(cur.encode("utf-8")); file.name = "system_instruction.txt"
await utils.answer(message, self.strings["gprompt_current"], file=file)
else:
await utils.answer(message, f"{self.strings['gprompt_current']}\n<code>{utils.escape_html(cur)}</code>")
@loader.command()
async def gauto(self, message: Message):
"""<on/off/[id]> — Вкл/выкл авто-ответ в чате."""
args = utils.get_args_raw(message).split()
if not args: return await utils.answer(message, self.strings["auto_mode_usage"])
chat_id = utils.get_chat_id(message)
state = args[0].lower()
target = chat_id
if len(args) == 2:
try:
e = await self.client.get_entity(args[0])
target = e.id
state = args[1].lower()
except: return await utils.answer(message, self.strings["gauto_chat_not_found"].format(args[0]))
if state == "on":
self.impersonation_chats.add(target)
self.db.set(self.strings["name"], DB_IMPERSONATION_KEY, list(self.impersonation_chats))
txt = self.strings["auto_mode_on"].format(int(self.config["impersonation_reply_chance"]*100)) if target==chat_id else self.strings["gauto_state_updated"].format(f"<code>{target}</code>", self.strings["gauto_enabled"])
await utils.answer(message, txt)
elif state == "off":
self.impersonation_chats.discard(target)
self.db.set(self.strings["name"], DB_IMPERSONATION_KEY, list(self.impersonation_chats))
txt = self.strings["auto_mode_off"] if target==chat_id else self.strings["gauto_state_updated"].format(f"<code>{target}</code>", self.strings["gauto_disabled"])
await utils.answer(message, txt)
else: await utils.answer(message, self.strings["auto_mode_usage"])
@loader.command()
async def gautochats(self, message: Message):
"""— Показать чаты с активным режимом авто-ответа."""
if not self.impersonation_chats: return await utils.answer(message, self.strings["no_auto_mode_chats"])
out = [self.strings["auto_mode_chats_title"].format(len(self.impersonation_chats))]
for cid in self.impersonation_chats:
try:
e = await self.client.get_entity(cid)
name = utils.escape_html(get_display_name(e))
out.append(self.strings["memory_chat_line"].format(name, cid))
except: out.append(self.strings["memory_chat_line"].format("Неизвестный чат", cid))
await utils.answer(message, "\n".join(out))
@loader.command()
async def gclear(self, message: Message):
"""[auto] — очистить память в чате. auto для памяти gauto."""
args = utils.get_args_raw(message)
chat_id = utils.get_chat_id(message)
if args == "auto":
if str(chat_id) in self.gauto_conversations:
self._clear_history(chat_id, gauto=True)
await utils.answer(message, self.strings["memory_cleared_gauto"])
else: await utils.answer(message, self.strings["no_gauto_memory_to_clear"])
elif not args:
if str(chat_id) in self.conversations:
self._clear_history(chat_id)
await utils.answer(message, self.strings["memory_cleared"])
else: await utils.answer(message, self.strings["no_memory_to_clear"])
else:
await utils.answer(message, self.strings["gclear_usage"])
@loader.command()
async def gmemdel(self, message: Message):
"""[N] — удалить последние N пар сообщений из памяти."""
try: n = int(utils.get_args_raw(message) or 1)
except: n = 1
cid = utils.get_chat_id(message)
hist = self._get_structured_history(cid)
if n > 0 and len(hist) >= n*2:
self.conversations[str(cid)] = hist[:-n*2]
self._save_history_sync()
await utils.answer(message, f"🧹 Удалено последних <b>{n}</b> пар сообщений из памяти.")
else: await utils.answer(message, "Недостаточно истории для удаления.")
@loader.command()
async def gmemchats(self, message: Message):
"""— Показать список чатов с активной памятью (имя и ID)."""
if not self.conversations: return await utils.answer(message, self.strings["no_memory_found"])
out = [self.strings["memory_chats_title"].format(len(self.conversations))]
shown = set()
for cid in list(self.conversations.keys()):
if not str(cid).lstrip('-').isdigit(): continue
chat_id = int(cid)
if chat_id in shown: continue
shown.add(chat_id)
try:
e = await self.client.get_entity(chat_id)
name = get_display_name(e)
except: name = f"Unknown ({chat_id})"
out.append(self.strings["memory_chat_line"].format(name, chat_id))
self._save_history_sync()
if len(out) == 1: return await utils.answer(message, self.strings["no_memory_found"])
await utils.answer(message, "\n".join(out))
@loader.command()
async def gmemexport(self, message: Message):
"""[<id/@юз чата>] [auto] [-s] — \n[из id/@юза чата] экспорт. -s в избранное."""
args = utils.get_args_raw(message).split()
save_to_self = "-s" in args
if save_to_self: args.remove("-s")
gauto = "auto" in args
if gauto: args.remove("auto")
src_id = int(args[0]) if args and args[0].lstrip('-').isdigit() else utils.get_chat_id(message)
hist = self._get_structured_history(src_id, gauto=gauto)
if not hist: return await utils.answer(message, "История для экспорта пуста.")
import json
data = json.dumps(hist, ensure_ascii=False, indent=2)
f = io.BytesIO(data.encode('utf-8'))
f.name = f"gemini_{'gauto_' if gauto else ''}{src_id}.json"
dest = "me" if save_to_self else message.chat_id
cap = "Экспорт истории gauto Gemini" if gauto else "Экспорт памяти Gemini"
if src_id != utils.get_chat_id(message): cap += f" из чата <code>{src_id}</code>"
await self.client.send_file(dest, f, caption=cap)
if save_to_self: await utils.answer(message, self.strings["gme_sent_to_saved"])
elif args: await message.delete()
@loader.command()
async def gmemimport(self, message: Message):
"""[auto] — импорт истории из файла (ответом). auto для gauto."""
reply = await message.get_reply_message()
if not reply or not reply.document: return await utils.answer(message, "Ответьте на json-файл с памятью.")
gauto = "auto" in utils.get_args_raw(message)
try:
f = await self.client.download_media(reply, bytes)
import json
hist = json.loads(f)
if not isinstance(hist, list): raise ValueError
cid = utils.get_chat_id(message)
target = self.gauto_conversations if gauto else self.conversations
target[str(cid)] = hist
self._save_history_sync(gauto)
await utils.answer(message, "Память успешно импортирована.")
except Exception as e: await utils.answer(message, f"Ошибка импорта: {e}")
@loader.command()
async def gmemfind(self, message: Message):
"""[слово] — Поиск по истории текущего чата по ключевому слову или фразе."""
q = utils.get_args_raw(message).lower()
if not q: return await utils.answer(message, "Укажите слово для поиска.")
cid = utils.get_chat_id(message)
hist = self._get_structured_history(cid)
found = [f"{e['role']}: {e.get('content','')[:200]}" for e in hist if q in str(e.get('content','')).lower()]
if not found: await utils.answer(message, "Ничего не найдено.")
else: await utils.answer(message, "\n\n".join(found[:10]))
@loader.command()
async def gmemoff(self, message: Message):
"""— Отключить память в этом чате"""
self.memory_disabled_chats.add(str(utils.get_chat_id(message)))
await utils.answer(message, "Память в этом чате отключена.")
@loader.command()
async def gmemon(self, message: Message):
"""— Включить память в этом чате"""
self.memory_disabled_chats.discard(str(utils.get_chat_id(message)))
await utils.answer(message, "Память в этом чате включена.")
@loader.command()
async def gmemshow(self, message: Message):
"""[auto] — Показать память чата (до 20 последних запросов). auto для gauto."""
gauto = "auto" in utils.get_args_raw(message)
cid = utils.get_chat_id(message)
hist = self._get_structured_history(cid, gauto=gauto)
if not hist: return await utils.answer(message, "Память пуста.")
out = []
for e in hist[-40:]:
role = e.get('role')
content = utils.escape_html(str(e.get('content',''))[:300])
if role == 'user': out.append(f"{content}")
elif role == 'model': out.append(f"<b>Gemini:</b> {content}")
await utils.answer(message, "<blockquote expandable='true'>" + "\n".join(out) + "</blockquote>")
@loader.command()
async def gmodel(self, message: Message):
"""[model или пусто] — Узнать/сменить модель. -s — список доступных моделей в файле."""
args = utils.get_args_raw(message).strip().lower()
if '-s' in args:
if not self.api_keys: return await utils.answer(message, self.strings['no_api_key'])
sts = await utils.answer(message, self.strings["processing"])
try:
client = genai.Client(api_key=self.api_keys[0])
models = await asyncio.to_thread(client.models.list)
txt = "\n".join([f"• <code>{m.name.split('/')[-1]}</code> ({m.display_name})" for m in models])
f = io.BytesIO((self.strings["gmodel_list_title"] + "\n" + txt).encode('utf-8'))
f.name = "models_list.txt"
await self.client.send_file(message.chat_id, file=f, caption="📋 Список доступных моделей", reply_to=message.id)
await sts.delete()
except Exception as e: await utils.answer(sts, self.strings["gmodel_list_error"].format(self._handle_error(e)))
return
if not args: return await utils.answer(message, f"Текущая модель: <code>{self.config['model_name']}</code>")
self.config["model_name"] = args
await utils.answer(message, f"Модель Gemini установлена: <code>{args}</code>")
@loader.command()
async def gres(self, message: Message):
"""[auto] — Очистить ВСЮ память. auto для всей памяти gauto."""
if utils.get_args_raw(message) == "auto":
if not self.gauto_conversations: return await utils.answer(message, self.strings["no_gauto_memory_to_fully_clear"])
n = len(self.gauto_conversations)
self.gauto_conversations.clear()
self._save_history_sync(True)
await utils.answer(message, self.strings["gauto_memory_fully_cleared"].format(n))
else:
if not self.conversations: return await utils.answer(message, self.strings["no_memory_to_fully_clear"])
n = len(self.conversations)
self.conversations.clear()
self._save_history_sync(False)
await utils.answer(message, self.strings["memory_fully_cleared"].format(n))
@loader.watcher(only_incoming=True, ignore_edited=True)
async def watcher(self, message: Message):
if not hasattr(message, 'chat_id'): return
cid = utils.get_chat_id(message)
if cid not in self.impersonation_chats: return
if message.is_private and not self.config["gauto_in_pm"]: return
if message.out or (isinstance(message.from_id, tg_types.PeerUser) and message.from_id.user_id == self.me.id): return
sender = await message.get_sender()
if isinstance(sender, tg_types.User) and sender.bot: return
if random.random() > self.config["impersonation_reply_chance"]: return
parts, warnings = await self._prepare_parts(message)
if warnings: logger.warning(f"Gauto warn: {warnings}")
if not parts: return
resp = await self._send_to_gemini(message=message, parts=parts, impersonation_mode=True)
if resp and resp.strip():
cln = resp.strip()
await asyncio.sleep(random.uniform(2, 8))
try: await self.client.send_read_acknowledge(cid, message=message)
except: pass
async with message.client.action(cid, "typing"):
await asyncio.sleep(min(25.0, max(1.5, len(cln) * random.uniform(0.1, 0.25))))
await message.reply(cln)
def _get_proxy_config(self):
p = self.config["proxy"]
return {"http://": p, "https://": p} if p else None
def _save_history_sync(self, gauto: bool=False):
if getattr(self, "_db_broken", False): return
data, key = (self.gauto_conversations, DB_GAUTO_HISTORY_KEY) if gauto else (self.conversations, DB_HISTORY_KEY)
try: self.db.set(self.strings["name"], key, data)
except: self._db_broken = True
def _load_history_from_db(self, key):
d = self.db.get(self.strings["name"], key, {})
return d if isinstance(d, dict) else {}
def _get_structured_history(self, cid, gauto=False):
d = self.gauto_conversations if gauto else self.conversations
if str(cid) not in d: d[str(cid)] = []
return d[str(cid)]
def _update_history(self, chat_id: int, user_parts: list, model_response: str, regeneration: bool = False, message: Message = None, gauto: bool = False):
if not self._is_memory_enabled(str(chat_id)):
return
history = self._get_structured_history(chat_id, gauto)
import time
now = int(time.time())
user_id = self.me.id
user_name = get_display_name(self.me)
message_id = getattr(message, "id", None)
if message:
if message.sender_id:
user_id = message.sender_id
if message.sender:
user_name = get_display_name(message.sender)
user_text = " ".join([p.text for p in user_parts if hasattr(p, "text") and p.text]) or "[ответ на медиа]"
if regeneration and history:
for i in range(len(history) - 1, -1, -1):
if history[i].get("role") == "model":
history[i].update({
"content": model_response,
"date": now
})
break
else:
user_entry = {
"role": "user",
"type": "text",
"content": user_text,
"date": now,
"user_id": user_id,
"message_id": message_id,
"user_name": user_name
}
model_entry = {
"role": "model",
"type": "text",
"content": model_response,
"date": now,
"user_id": None
}
history.extend([user_entry, model_entry])
limit = self.config["max_history_length"]
if limit > 0 and len(history) > limit * 2:
history = history[-(limit * 2):]
target = self.gauto_conversations if gauto else self.conversations
target[str(chat_id)] = history
self._save_history_sync(gauto)
def _clear_history(self, cid, gauto=False):
d = self.gauto_conversations if gauto else self.conversations
if str(cid) in d:
del d[str(cid)]
self._save_history_sync(gauto)
def _is_memory_enabled(self, cid): return cid not in self.memory_disabled_chats
def _markdown_to_html(self, text):
text = re.sub(r"^(#+)\s+(.*)", lambda m: f"<b>{m.group(2)}</b>", text, flags=re.M)
text = re.sub(r"^([ \t]*)[-*+]\s+", r"\1• ", text, flags=re.M)
md = MarkdownIt("commonmark", {"html": True, "linkify": True}).enable("strikethrough")
html = md.render(text)
def fmt_code(m):
lang = utils.escape_html(m.group(1).strip()) if m.group(1) else ""
return f'<pre><code class="language-{lang}">{utils.escape_html(m.group(2).strip())}</code></pre>' if lang else f'<pre><code>{utils.escape_html(m.group(2).strip())}</code></pre>'
html = re.sub(r"```(\w+)?\n([\s\S]+?)\n```", fmt_code, html)
html = re.sub(r"<p>(<pre>[\s\S]*?</pre>)</p>", r"\1", html, flags=re.DOTALL)
return html.replace("<p>", "").replace("</p>", "\n").strip()
def _format_response_with_smart_separation(self, text):
parts = re.split(r"(<pre.*?>[\s\S]*?</pre>)", text, flags=re.DOTALL)
out = []
for i, p in enumerate(parts):
if not p or p.isspace(): continue
if i % 2 == 1: out.append(p.strip())
else: out.append(f"<blockquote expandable>{p.strip()}</blockquote>")
return "\n".join(out)
def _get_inline_buttons(self, cid, mid):
return [[
{"text": self.strings["btn_clear"], "callback": self._clear_callback, "args": (cid,)},
{"text": self.strings["btn_regenerate"], "callback": self._regenerate_callback, "args": (mid, cid)}
]]
async def _clear_callback(self, call: InlineCall, cid):
self._clear_history(cid, gauto=False)
await call.edit(self.strings["memory_cleared"], reply_markup=None)
async def _regenerate_callback(self, call: InlineCall, mid, cid):
key = f"{cid}:{mid}"
if key not in self.last_requests: return await call.answer(self.strings["no_last_request"], show_alert=True)
parts, disp = self.last_requests[key]
use_url_context = bool(re.search(r'https?://\S+', disp or ""))
await self._send_to_gemini(mid, parts, regeneration=True, call=call, chat_id_override=cid, display_prompt=disp, use_url_context=use_url_context)
async def _get_recent_chat_text(self, cid, count=None, skip_last=False):
lim = (count or self.config["impersonation_history_limit"]) + (1 if skip_last else 0)
lines = []
try:
msgs = await self.client.get_messages(cid, limit=lim)
if skip_last and msgs: msgs = msgs[1:]
for m in msgs:
if not m or (not m.text and not m.media): continue
name = get_display_name(await m.get_sender()) or "Unknown"
txt = m.text or ("[Media]" if m.media else "")
if m.sticker:
alt = next((a.alt for a in m.sticker.attributes if isinstance(a, DocumentAttributeSticker)), "?")
txt += f" [Стикер: {alt}]"
elif m.photo: txt += " [Фото]"
elif m.document and not hasattr(m.media, "webpage"): txt += " [Файл]"
if txt.strip(): lines.append(f"{name}: {txt.strip()}")
except: pass
return "\n".join(reversed(lines))
def _handle_error(self, e: Exception) -> str:
logger.exception("Gemini execution error")
if isinstance(e, asyncio.TimeoutError):
return self.strings["api_timeout"]
msg = str(e)
if "quota" in msg.lower() or "exhausted" in msg.lower() or "429" in msg:
model = self.config.get("model_name", "unknown")
return (
f"❗️ <b>Превышен лимит Google Gemini API для модели <code>{utils.escape_html(model)}</code>.</b>"
"\n\nЧаще всего это происходит на бесплатном тарифе. Вы можете:\n"
"• Подождать, пока лимит сбросится (обычно раз в сутки).\n"
"• Проверить свой тарифный план в <a href='https://aistudio.google.com/app/billing'>Google AI Studio</a>.\n"
"• Узнать больше о лимитах <a href='https://ai.google.dev/gemini-api/docs/rate-limits'>здесь</a>.\n\n"
f"<b>Детали ошибки:</b>\n<code>{utils.escape_html(msg)}</code>"
)
if "location" in msg.lower() or "not supported" in msg.lower():
return (
'❗️ <b>В данном регионе Gemini API не доступен.</b>\n'
'Скачайте VPN (для пк/тел) или поставьте прокси (платный/бесплатный).\n'
'Или воспользуйтесь инструкцией <a href="https://t.me/SenkoGuardianModules/23">вот тут</a>\n'
'А для тех у кого UserLand инструкция <a href="https://t.me/SenkoGuardianModules/35">тут</a>'
)
if "key" in msg.lower() and "valid" in msg.lower():
return self.strings["invalid_api_key"]
if "blocked" in msg.lower():
return self.strings["blocked_error"].format(utils.escape_html(msg))
if "500" in msg:
return (
"❗️ <b>Ошибка 500 от Google API.</b>\n"
"Это значит, что формат медиа (файл или еще что то) который ты отправил, не поддерживается.\n"
"Такое случается, по такой причине:\n "
"• Если формат файла в принципе не поддерживается Gemini/Гуглом.\n "
"• Временный сбой на серверах Google. Попробуйте повторить запрос позже."
)
return self.strings["api_error"].format(utils.escape_html(msg))
def _markdown_to_html(self, text: str) -> str:
def heading_replacer(match):
level = len(match.group(1))
title = match.group(2).strip()
indent = " " * (level - 1)
return f"{indent}<b>{title}</b>"
text = re.sub(r"^(#+)\s+(.*)", heading_replacer, text, flags=re.MULTILINE)
def list_replacer(match):
indent = match.group(1)
return f"{indent}"
text = re.sub(r"^([ \t]*)[-*+]\s+", list_replacer, text, flags=re.MULTILINE)
md = MarkdownIt("commonmark", {"html": True, "linkify": True})
md.enable("strikethrough")
md.disable("hr")
md.disable("heading")
md.disable("list")
html_text = md.render(text)
def format_code(match):
lang = utils.escape_html(match.group(1).strip())
code = utils.escape_html(match.group(2).strip())
return f'<pre><code class="language-{lang}">{code}</code></pre>' if lang else f'<pre><code>{code}</code></pre>'
html_text = re.sub(r"```(.*?)\n([\s\S]+?)\n```", format_code, html_text)
html_text = re.sub(r"<p>(<pre>[\s\S]*?</pre>)</p>", r"\1", html_text, flags=re.DOTALL)
html_text = html_text.replace("<p>", "").replace("</p>", "\n").strip()
return html_text
def _format_response_with_smart_separation(self, text: str) -> str:
pattern = r"(<pre.*?>[\s\S]*?</pre>)"
parts = re.split(pattern, text, flags=re.DOTALL)
result_parts = []
for i, part in enumerate(parts):
if not part or part.isspace(): continue
if i % 2 == 1:
result_parts.append(part.strip())
else:
stripped_part = part.strip()
if stripped_part:
result_parts.append(f'<blockquote expandable="true">{stripped_part}</blockquote>')
return "\n".join(result_parts)
def _get_inline_buttons(self, chat_id, base_message_id):
return [[
{"text": self.strings["btn_clear"], "callback": self._clear_callback, "args": (chat_id,)},
{"text": self.strings["btn_regenerate"], "callback": self._regenerate_callback, "args": (base_message_id, chat_id)}
]]
async def _safe_del_msg(self, msg, delay=1):
await asyncio.sleep(delay)
try: await self.client.delete_messages(msg.chat_id, msg.id)
except Exception as e: logger.warning(f"Ошибка удаления сообщения: {e}")
async def _clear_callback(self, call: InlineCall, chat_id: int):
self._clear_history(chat_id, gauto=False)
await call.edit(self.strings["memory_cleared"], reply_markup=None)
async def _regenerate_callback(self, call: InlineCall, original_message_id: int, chat_id: int):
key = f"{chat_id}:{original_message_id}"
last_request_tuple = self.last_requests.get(key)
if not last_request_tuple:
return await call.answer(self.strings["no_last_request"], show_alert=True)
last_parts, display_prompt = last_request_tuple
use_url_context = bool(re.search(r'https?://\S+', display_prompt or ""))
await self._send_to_gemini(
message=original_message_id,
parts=last_parts,
regeneration=True,
call=call,
chat_id_override=chat_id,
use_url_context=use_url_context,
display_prompt=display_prompt
)
async def _get_recent_chat_text(self, chat_id: int, count: int = None, skip_last: bool = False) -> str:
history_limit = count or self.config["impersonation_history_limit"]
fetch_limit = history_limit + 1 if skip_last else history_limit
chat_history_lines = []
try:
messages = await self.client.get_messages(chat_id, limit=fetch_limit)
if skip_last and messages:
messages = messages[1:]
for msg in messages:
if not msg: continue
if not msg.text and not msg.sticker and not msg.photo and not (msg.media and not hasattr(msg.media, "webpage")):
continue
sender = await msg.get_sender()
sender_name = get_display_name(sender) if sender else "Unknown"
text_content = msg.text or ""
if msg.sticker and hasattr(msg.sticker, 'attributes'):
alt_text = next((attr.alt for attr in msg.sticker.attributes if isinstance(attr, DocumentAttributeSticker)), None)
text_content += f" [Стикер: {alt_text or '?'}]"
elif msg.photo:
text_content += " [Фото]"
elif msg.document and not hasattr(msg.media, "webpage"):
text_content += " [Файл]"
if text_content.strip():
chat_history_lines.append(f"{sender_name}: {text_content.strip()}")
except Exception as e:
logger.warning(f"Не удалось получить историю для авто-ответа: {e}")
return "\n".join(reversed(chat_history_lines))
def _is_memory_enabled(self, chat_id: str) -> bool: return chat_id not in self.memory_disabled_chats
def _disable_memory(self, chat_id: int): self.memory_disabled_chats.add(str(chat_id))
def _enable_memory(self, chat_id: int): self.memory_disabled_chats.discard(str(chat_id))