mirror of
https://github.com/MuRuLOSE/limoka.git
synced 2026-06-16 14:34:17 +02:00
1055 lines
65 KiB
Python
1055 lines
65 KiB
Python
# This file is part of SenkoGuardianModules
|
||
# Copyright (c) 2025 Senko
|
||
# This software is released under the MIT License.
|
||
# https://opensource.org/licenses/MIT
|
||
|
||
__version__ = (5, 7, 0) #перепешите на меня квартиру пж
|
||
|
||
#ладно
|
||
|
||
# meta developer: @SenkoGuardianModules
|
||
|
||
# .------. .------. .------. .------. .------. .------.
|
||
# |S.--. | |E.--. | |N.--. | |M.--. | |O.--. | |D.--. |
|
||
# | :/\: | | :/\: | | :(): | | :/\: | | :/\: | | :/\: |
|
||
# | :\/: | | :\/: | | ()() | | :\/: | | :\/: | | :\/: |
|
||
# | '--'S| | '--'E| | '--'N| | '--'M| | '--'O| | '--'D|
|
||
# `------' `------' `------' `------' `------' `------'
|
||
|
||
import re
|
||
import os
|
||
import io
|
||
import random
|
||
import socket
|
||
import asyncio
|
||
import logging
|
||
import tempfile
|
||
import httpx
|
||
from datetime import datetime
|
||
from markdown_it import MarkdownIt
|
||
import pytz
|
||
|
||
# New SDK Check
|
||
try:
|
||
from google import genai
|
||
from google.genai import types
|
||
import google.api_core.exceptions as google_exceptions
|
||
GOOGLE_AVAILABLE = True
|
||
except ImportError:
|
||
GOOGLE_AVAILABLE = False
|
||
google_exceptions = None
|
||
|
||
from telethon import types as tg_types
|
||
from telethon.tl.types import Message, DocumentAttributeFilename, DocumentAttributeSticker
|
||
from telethon.utils import get_display_name, get_peer_id
|
||
from telethon.errors.rpcerrorlist import (
|
||
MessageTooLongError,
|
||
ChatAdminRequiredError,
|
||
UserNotParticipantError,
|
||
ChannelPrivateError
|
||
)
|
||
|
||
from .. import loader, utils
|
||
from ..inline.types import InlineCall
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
DB_HISTORY_KEY = "gemini_conversations_v4"
|
||
DB_GAUTO_HISTORY_KEY = "gemini_gauto_conversations_v1"
|
||
DB_IMPERSONATION_KEY = "gemini_impersonation_chats"
|
||
GEMINI_TIMEOUT = 840
|
||
MAX_FFMPEG_SIZE = 90 * 1024 * 1024
|
||
|
||
# requires: google-genai google-api-core pytz markdown_it_py
|
||
|
||
class Gemini(loader.Module):
|
||
"""Модуль для работы с Google Gemini AI (New SDK). Поддержка видео/фото/аудио и контекста пользователей."""
|
||
strings = {
|
||
"name": "Gemini",
|
||
"cfg_api_key_doc": "API ключи Google Gemini, разделенные запятой. Будут скрыты.",
|
||
"cfg_model_name_doc": "Модель Gemini.",
|
||
"cfg_buttons_doc": "Включить интерактивные кнопки.",
|
||
"cfg_system_instruction_doc": "Системная инструкция (промпт) для Gemini.",
|
||
"cfg_max_history_length_doc": "Макс. кол-во пар 'вопрос-ответ' в памяти (0 - без лимита).",
|
||
"cfg_timezone_doc": "Ваш часовой пояс. Список: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones",
|
||
"cfg_proxy_doc": "Прокси для обхода региональных блокировок. Формат: http://user:pass@host:port",
|
||
"cfg_impersonation_prompt_doc": "Промпт для режима авто-ответа. {my_name} и {chat_history} будут заменены.",
|
||
"cfg_impersonation_history_limit_doc": "Сколько последних сообщений из чата отправлять в качестве контекста для авто-ответа.",
|
||
"cfg_impersonation_reply_chance_doc": "Вероятность ответа в режиме gauto (от 0.0 до 1.0). 0.2 = 20% шанс.",
|
||
"cfg_temperature_doc": "Температура генерации (креативность). От 0.0 до 2.0. По умолчанию 1.0.",
|
||
"cfg_google_search_doc": "Включить поиск Google (Grounding) для актуальной информации.",
|
||
"no_api_key": '❗️ <b>Api ключ(и) не настроен(ы).</b>\nПолучить Api ключ можно <a href="https://aistudio.google.com/app/apikey">здесь</a>.\n<b>Добавьте ключ(и) в конфиге модуля:</b> <code>.cfg gemini api_key</code>',
|
||
"invalid_api_key": '❗️ <b>Предоставленный API ключ недействителен.</b>\nУбедитесь, что он правильно скопирован из <a href="https://aistudio.google.com/app/apikey">Google AI Studio</a> и что для него включен Gemini API.',
|
||
"all_keys_exhausted": "❗️ <b>Все доступные API ключи ({}) исчерпали свою квоту.</b>\nПопробуйте позже или добавьте новые ключи в конфиге: <code>.cfg gemini api_key</code>",
|
||
"no_prompt_or_media": "⚠️ <i>Нужен текст или ответ на медиа/файл.</i>",
|
||
"processing": "<emoji document_id=5386367538735104399>⌛️</emoji> <b>Обработка...</b>",
|
||
"api_error": "❗️ <b>Ошибка API Google Gemini:</b>\n<code>{}</code>",
|
||
"api_timeout": f"❗️ <b>Таймаут ответа от Gemini API ({GEMINI_TIMEOUT} сек).</b>",
|
||
"blocked_error": "🚫 <b>Запрос/ответ заблокирован.</b>\n<code>{}</code>",
|
||
"generic_error": "❗️ <b>Ошибка:</b>\n<code>{}</code>",
|
||
"question_prefix": "💬 <b>Запрос:</b>",
|
||
"response_prefix": "<emoji document_id=5325547803936572038>✨</emoji> <b>Gemini:</b>",
|
||
"unsupported_media_type": "⚠️ <b>Формат медиа ({}) не поддерживается.</b>",
|
||
"memory_status": "🧠 [{}/{}]",
|
||
"memory_status_unlimited": "🧠 [{}/∞]",
|
||
"memory_cleared": "🧹 <b>Память диалога очищена.</b>",
|
||
"memory_cleared_gauto": "🧹 <b>Память gauto в этом чате очищена.</b>",
|
||
"no_memory_to_clear": "ℹ️ <b>В этом чате нет истории.</b>",
|
||
"no_gauto_memory_to_clear": "ℹ️ <b>В этом чате нет истории gauto.</b>",
|
||
"memory_chats_title": "🧠 <b>Чаты с историей ({}):</b>",
|
||
"memory_chat_line": " • {} (<code>{}</code>)",
|
||
"no_memory_found": "ℹ️ Память Gemini пуста.",
|
||
"media_reply_placeholder": "[ответ на медиа]",
|
||
"btn_clear": "🧹 Очистить",
|
||
"btn_regenerate": "🔄 Другой ответ",
|
||
"no_last_request": "Последний запрос не найден для повторной генерации.",
|
||
"memory_fully_cleared": "🧹 <b>Вся память Gemini полностью очищена (затронуто {} чатов).</b>",
|
||
"gauto_memory_fully_cleared": "🧹 <b>Вся память gauto полностью очищена (затронуто {} чатов).</b>",
|
||
"no_memory_to_fully_clear": "ℹ️ <b>Память Gemini и так пуста.</b>",
|
||
"no_gauto_memory_to_fully_clear": "ℹ️ <b>Память gauto и так пуста.</b>",
|
||
"response_too_long": "Ответ Gemini был слишком длинным и отправлен в виде файла.",
|
||
"gclear_usage": "ℹ️ <b>Использование:</b> <code>.gclear [auto]</code>",
|
||
"gres_usage": "ℹ️ <b>Использование:</b> <code>.gres [auto]</code>",
|
||
"auto_mode_on": "🎭 <b>Режим авто-ответа включен в этом чате.</b>\nЯ буду отвечать на сообщения с вероятностью {}%.",
|
||
"auto_mode_off": "🎭 <b>Режим авто-ответа выключен в этом чате.</b>",
|
||
"auto_mode_chats_title": "🎭 <b>Чаты с активным авто-ответом ({}):</b>",
|
||
"no_auto_mode_chats": "ℹ️ Нет чатов с включенным режимом авто-ответа.",
|
||
"auto_mode_usage": "ℹ️ <b>Использование:</b> <code>.gauto on/off или[id/username] [on/off]</code>",
|
||
"gauto_chat_not_found": "🚫 <b>Не удалось найти чат:</b> <code>{}</code>",
|
||
"gauto_state_updated": "🎭 <b>Режим авто-ответа для чата {} {}</b>",
|
||
"gauto_enabled": "включен",
|
||
"gauto_disabled": "выключен",
|
||
"gch_usage": "ℹ️ <b>Использование:</b>\n<code>.gch <кол-во> <вопрос></code>\n<code>.gch <id чата> <кол-во> <вопрос></code>",
|
||
"gch_processing": "<emoji document_id=5386367538735104399>⌛️</emoji> <b>Анализирую {} сообщений...</b>",
|
||
"gch_result_caption": "Анализ последних {} сообщений",
|
||
"gch_result_caption_from_chat": "Анализ последних {} сообщений из чата <b>{}</b>",
|
||
"gch_invalid_args": "❗️ <b>Неверные аргументы.</b>\n{}",
|
||
"gch_chat_error": "❗️ <b>Ошибка доступа к чату</b> <code>{}</code>: <i>{}</i>",
|
||
"gmodel_usage": "ℹ️ <b>Использование:</b> <code>.gmodel [модель] [-s]</code>\n• [модель] — установить модель.\n• -s — показать список доступных моделей.",
|
||
"gmodel_list_title": "📋 <b>Доступные модели Gemini (по вашему API):</b>",
|
||
"gmodel_list_item": "• <code>{}</code> — {} (поддержка: {})",
|
||
"gmodel_img_support": "Поддержка изображений",
|
||
"gmodel_no_support": "Нет поддержки изображений",
|
||
"gmodel_img_warn": "⚠️ <b>Текущая модель ({}) не может генерировать изображения(или не доступна по API).</b>\nРекомендуем: <code>gemini-2.5-flash-image</code>",
|
||
"gme_chat_not_found": "🚫 <b>Не удалось найти чат для экспорта:</b> <code>{}</code>",
|
||
"gme_sent_to_saved": "💾 История экспортирована в избранное.",
|
||
"new_sdk_missing": "⚠️ <b>Для работы модуля нужна библиотека google-genai.</b>\nВыполните: <code>pip install google-genai</code>",
|
||
"gprompt_usage": "ℹ️ <b>Использование:</b>\n<code>.gprompt <текст></code> — установить промпт.\n<code>.gprompt -c</code> — очистить.\nИли ответьте на <b>.txt</b> файл.",
|
||
"gprompt_updated": "✅ <b>Системный промпт обновлен!</b>\nДлина: {} симв.",
|
||
"gprompt_cleared": "🗑 <b>Системный промпт очищен.</b>",
|
||
"gprompt_current": "📝 <b>Текущий системный промпт:</b>",
|
||
"gprompt_file_error": "❗️ <b>Ошибка чтения файла:</b> {}",
|
||
"gprompt_file_too_big": "❗️ <b>Файл слишком большой</b> (лимит 1 МБ).",
|
||
"gprompt_not_text": "❗️ Это не похоже на текстовый файл.(txt)",
|
||
"gmodel_no_models": "⚠️ Не удалось получить список моделей.",
|
||
"gmodel_list_error": "❗️ Ошибка получения списка: {}",
|
||
}
|
||
TEXT_MIME_TYPES = {
|
||
"text/plain", "text/markdown", "text/html", "text/css", "text/csv",
|
||
"application/json", "application/xml", "application/x-python", "text/x-python",
|
||
"application/javascript", "application/x-sh",
|
||
}
|
||
def __init__(self):
|
||
self.config = loader.ModuleConfig(
|
||
loader.ConfigValue("api_key", "", self.strings["cfg_api_key_doc"], validator=loader.validators.Hidden()),
|
||
loader.ConfigValue("model_name", "gemini-2.5-flash", self.strings["cfg_model_name_doc"]),
|
||
loader.ConfigValue("interactive_buttons", True, self.strings["cfg_buttons_doc"], validator=loader.validators.Boolean()),
|
||
loader.ConfigValue("system_instruction", "", self.strings["cfg_system_instruction_doc"], validator=loader.validators.String()),
|
||
loader.ConfigValue("max_history_length", 800, self.strings["cfg_max_history_length_doc"], validator=loader.validators.Integer(minimum=0)),
|
||
loader.ConfigValue("timezone", "Europe/Moscow", self.strings["cfg_timezone_doc"]),
|
||
loader.ConfigValue("proxy", "", self.strings["cfg_proxy_doc"]),
|
||
loader.ConfigValue(
|
||
"impersonation_prompt",
|
||
(
|
||
"Ты — это я, пользователь Telegram по имени {my_name}. Твоя задача — отвечать в чате от моего имени, основываясь на последних сообщениях. "
|
||
"Веди себя естественно, как обычный человек, а не как ассистент. Не признавайся, что ты — ИИ, даже если тебя спросят напрямую. "
|
||
"Твоя личность: [Здесь опишите свой характер: остроумный, серьезный, саркастичный, дружелюбный и т.д.].\n"
|
||
"Правила:\n- Отвечай кратко и по делу.\n- Используй неформальный язык, сленг.\n- Не отвечай на каждое сообщение.\n- На медиа (стикер, фото) реагируй как человек ('лол', 'ору', 'жиза').\n- Не используй префиксы и кавычки.\n\n"
|
||
"ИСТОРИЯ ЧАТА:\n{chat_history}\n\n{my_name}:"
|
||
),
|
||
self.strings["cfg_impersonation_prompt_doc"], validator=loader.validators.String()
|
||
),
|
||
loader.ConfigValue("impersonation_history_limit", 20, self.strings["cfg_impersonation_history_limit_doc"], validator=loader.validators.Integer(minimum=5, maximum=100)),
|
||
loader.ConfigValue("impersonation_reply_chance", 0.25, self.strings["cfg_impersonation_reply_chance_doc"], validator=loader.validators.Float(minimum=0.0, maximum=1.0)),
|
||
loader.ConfigValue("gauto_in_pm", False, "Разрешить авто-ответы в личных сообщениях (ЛС).", validator=loader.validators.Boolean()),
|
||
loader.ConfigValue("google_search", False, self.strings["cfg_google_search_doc"], validator=loader.validators.Boolean()),
|
||
loader.ConfigValue("temperature", 1.0, self.strings["cfg_temperature_doc"], validator=loader.validators.Float(minimum=0.0, maximum=2.0)),
|
||
)
|
||
self.conversations = {}
|
||
self.gauto_conversations = {}
|
||
self.last_requests = {}
|
||
self.impersonation_chats = set()
|
||
self._lock = asyncio.Lock()
|
||
self.memory_disabled_chats = set()
|
||
|
||
async def client_ready(self, client, db):
|
||
self.client = client
|
||
self.db = db
|
||
self.me = await client.get_me()
|
||
if not GOOGLE_AVAILABLE:
|
||
logger.error("Gemini: 'google-genai' library missing! pip install google-genai")
|
||
return
|
||
api_key_str = self.config["api_key"]
|
||
self.api_keys = [k.strip() for k in api_key_str.split(",") if k.strip()] if api_key_str else []
|
||
self.current_api_key_index = 0
|
||
self.conversations = self._load_history_from_db(DB_HISTORY_KEY)
|
||
self.gauto_conversations = self._load_history_from_db(DB_GAUTO_HISTORY_KEY)
|
||
self.impersonation_chats = set(self.db.get(self.strings["name"], DB_IMPERSONATION_KEY, []))
|
||
if not self.api_keys:
|
||
logger.warning("Gemini: API ключи не настроены.")
|
||
|
||
async def _prepare_parts(self, message: Message, custom_text: str=None):
|
||
final_parts, warnings = [], []
|
||
prompt_text_chunks = []
|
||
user_args = custom_text if custom_text is not None else utils.get_args_raw(message)
|
||
reply = await message.get_reply_message()
|
||
if reply and getattr(reply, "text", None):
|
||
try:
|
||
reply_sender = await reply.get_sender()
|
||
reply_author_name = get_display_name(reply_sender) if reply_sender else "Unknown"
|
||
prompt_text_chunks.append(f"{reply_author_name}: {reply.text}")
|
||
except Exception:
|
||
prompt_text_chunks.append(f"Ответ на: {reply.text}")
|
||
try:
|
||
current_sender = await message.get_sender()
|
||
current_user_name = get_display_name(current_sender) if current_sender else "User"
|
||
prompt_text_chunks.append(f"{current_user_name}: {user_args or ''}")
|
||
except Exception:
|
||
prompt_text_chunks.append(f"Запрос: {user_args or ''}")
|
||
media_source = message if message.media or message.sticker else reply
|
||
has_media = bool(media_source and (media_source.media or media_source.sticker))
|
||
if has_media:
|
||
if media_source.sticker and hasattr(media_source.sticker, 'mime_type') and media_source.sticker.mime_type=='application/x-tgsticker':
|
||
alt_text = next((attr.alt for attr in media_source.sticker.attributes if isinstance(attr, DocumentAttributeSticker)), "?")
|
||
prompt_text_chunks.append(f"[Анимированный стикер: {alt_text}]")
|
||
else:
|
||
media, mime_type, filename = media_source.media, "application/octet-stream", "file"
|
||
if media_source.photo:
|
||
mime_type = "image/jpeg"
|
||
elif hasattr(media_source, "document") and media_source.document:
|
||
mime_type = getattr(media_source.document, "mime_type", mime_type)
|
||
doc_attr = next((attr for attr in media_source.document.attributes if isinstance(attr, DocumentAttributeFilename)), None)
|
||
if doc_attr: filename = doc_attr.file_name
|
||
async def get_bytes(m):
|
||
bio = io.BytesIO()
|
||
await self.client.download_media(m, bio)
|
||
return bio.getvalue()
|
||
if mime_type.startswith("image/"):
|
||
try:
|
||
data = await get_bytes(media)
|
||
final_parts.append(types.Part(inline_data=types.Blob(mime_type=mime_type, data=data)))
|
||
except Exception as e: warnings.append(f"⚠️ Ошибка обработки изображения '{filename}': {e}")
|
||
elif mime_type in self.TEXT_MIME_TYPES or filename.split('.')[-1] in ('txt', 'py', 'js', 'json', 'md', 'html', 'css', 'sh'):
|
||
try:
|
||
data = await get_bytes(media)
|
||
file_content = data.decode('utf-8')
|
||
prompt_text_chunks.insert(0, f"[Содержимое файла '{filename}']: \n```\n{file_content}\n```")
|
||
except Exception as e: warnings.append(f"⚠️ Ошибка чтения файла '{filename}': {e}")
|
||
elif mime_type.startswith("audio/"):
|
||
input_path, output_path = None, None
|
||
try:
|
||
with tempfile.NamedTemporaryFile(suffix=f".{filename.split('.')[-1]}", delete=False) as temp_in: input_path = temp_in.name
|
||
await self.client.download_media(media, input_path)
|
||
if os.path.getsize(input_path) > MAX_FFMPEG_SIZE:
|
||
warnings.append(f"⚠️ Аудиофайл '{filename}' слишком большой."); raise StopIteration
|
||
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_out: output_path = temp_out.name
|
||
proc = await asyncio.create_subprocess_exec("ffmpeg", "-y", "-i", input_path, "-c:a", "libmp3lame", "-q:a", "2", output_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
||
await proc.communicate()
|
||
with open(output_path, "rb") as f:
|
||
final_parts.append(types.Part(inline_data=types.Blob(mime_type="audio/mpeg", data=f.read())))
|
||
except StopIteration: pass
|
||
except Exception as e: warnings.append(f"⚠️ Ошибка обработки аудио: {e}")
|
||
finally:
|
||
if input_path and os.path.exists(input_path): os.remove(input_path)
|
||
if output_path and os.path.exists(output_path): os.remove(output_path)
|
||
elif mime_type.startswith("video/"):
|
||
input_path, output_path = None, None
|
||
try:
|
||
with tempfile.NamedTemporaryFile(suffix=f".{filename.split('.')[-1]}", delete=False) as temp_in: input_path = temp_in.name
|
||
await self.client.download_media(media, input_path)
|
||
if os.path.getsize(input_path) > MAX_FFMPEG_SIZE:
|
||
warnings.append(f"⚠️ Медиафайл '{filename}' слишком большой."); raise StopIteration
|
||
proc_probe = await asyncio.create_subprocess_exec("ffprobe", "-v", "error", "-select_streams", "a:0", "-show_entries", "stream=codec_type", "-of", "default=noprint_wrappers=1:nokey=1", input_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
||
stdout, _ = await proc_probe.communicate()
|
||
has_audio = bool(stdout.strip())
|
||
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_out: output_path = temp_out.name
|
||
cmd = ["ffmpeg", "-y", "-i", input_path]
|
||
maps = ["-map", "0:v:0"]
|
||
if not has_audio:
|
||
cmd.extend(["-f", "lavfi", "-i", "anullsrc=channel_layout=stereo:sample_rate=44100"])
|
||
maps.extend(["-map", "1:a:0"])
|
||
else:
|
||
maps.extend(["-map", "0:a:0?"])
|
||
cmd.extend([*maps, "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", "-c:v", "libx264", "-c:a", "aac", "-pix_fmt", "yuv420p", "-movflags", "+faststart", "-shortest", output_path])
|
||
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
||
await proc.communicate()
|
||
with open(output_path, "rb") as f:
|
||
final_parts.append(types.Part(inline_data=types.Blob(mime_type="video/mp4", data=f.read())))
|
||
except StopIteration: pass
|
||
except Exception as e: warnings.append(f"⚠️ Ошибка обработки видео: {e}")
|
||
finally:
|
||
if input_path and os.path.exists(input_path): os.remove(input_path)
|
||
if output_path and os.path.exists(output_path): os.remove(output_path)
|
||
if not user_args and has_media and not final_parts and not any("[Содержимое файла" in chunk for chunk in prompt_text_chunks):
|
||
prompt_text_chunks.append(self.strings["media_reply_placeholder"])
|
||
full_prompt_text = "\n".join(chunk for chunk in prompt_text_chunks if chunk and chunk.strip()).strip()
|
||
if full_prompt_text:
|
||
final_parts.insert(0, types.Part(text=full_prompt_text))
|
||
return final_parts, warnings
|
||
|
||
async def _send_to_gemini(self, message, parts: list, regeneration: bool=False, call: InlineCall=None, status_msg=None, chat_id_override: int=None, impersonation_mode: bool=False, use_url_context: bool=False, display_prompt: str=None):
|
||
msg_obj = None
|
||
if regeneration:
|
||
chat_id = chat_id_override; base_message_id = message
|
||
try: msg_obj = await self.client.get_messages(chat_id, ids=base_message_id)
|
||
except Exception: msg_obj = None
|
||
else:
|
||
chat_id = utils.get_chat_id(message); base_message_id = message.id; msg_obj = message
|
||
api_key_str = self.config["api_key"]
|
||
self.api_keys = [k.strip() for k in api_key_str.split(",") if k.strip()] if api_key_str else []
|
||
if not self.api_keys:
|
||
if not impersonation_mode and status_msg: await utils.answer(status_msg, self.strings['no_api_key'])
|
||
return None if impersonation_mode else ""
|
||
if regeneration:
|
||
current_turn_parts, request_text_for_display = self.last_requests.get(f"{chat_id}:{base_message_id}", (parts, "[регенерация]"))
|
||
else:
|
||
current_turn_parts = parts
|
||
request_text_for_display = display_prompt or (self.strings["media_reply_placeholder"] if any(getattr(p, 'inline_data', None) for p in parts) else "")
|
||
self.last_requests[f"{chat_id}:{base_message_id}"] = (current_turn_parts, request_text_for_display)
|
||
result_text = ""
|
||
last_error = None
|
||
was_successful = False
|
||
search_icon = ""
|
||
max_retries = len(self.api_keys)
|
||
if impersonation_mode:
|
||
my_name = get_display_name(self.me)
|
||
chat_history_text = await self._get_recent_chat_text(chat_id)
|
||
sys_instruct = self.config["impersonation_prompt"].format(my_name=my_name, chat_history=chat_history_text)
|
||
else:
|
||
sys_val = self.config["system_instruction"]
|
||
sys_instruct = (sys_val.strip() if isinstance(sys_val, str) else "") or None
|
||
contents = []
|
||
raw_hist = self._get_structured_history(chat_id, gauto=impersonation_mode)
|
||
if regeneration and raw_hist: raw_hist = raw_hist[:-2]
|
||
for item in raw_hist:
|
||
contents.append(types.Content(
|
||
role=item['role'],
|
||
parts=[types.Part(text=item['content'])]
|
||
))
|
||
request_parts = list(current_turn_parts)
|
||
if not impersonation_mode:
|
||
try: user_timezone = pytz.timezone(self.config["timezone"])
|
||
except pytz.UnknownTimeZoneError: user_timezone = pytz.utc
|
||
now = datetime.now(user_timezone)
|
||
time_note = f"[System note: Current time is {now.strftime('%Y-%m-%d %H:%M:%S %Z')}]"
|
||
if request_parts and getattr(request_parts[0], 'text', None):
|
||
request_parts[0] = types.Part(text=f"{time_note}\n\n{request_parts[0].text}")
|
||
else:
|
||
request_parts.insert(0, types.Part(text=time_note))
|
||
contents.append(types.Content(role="user", parts=request_parts))
|
||
tools = []
|
||
if self.config["google_search"] or use_url_context:
|
||
tools.append(types.Tool(google_search=types.GoogleSearch()))
|
||
gen_config = types.GenerateContentConfig(
|
||
temperature=self.config["temperature"],
|
||
system_instruction=sys_instruct,
|
||
tools=tools if tools else None,
|
||
safety_settings=[
|
||
types.SafetySetting(category=cat, threshold="BLOCK_NONE")
|
||
for cat in ["HARM_CATEGORY_HARASSMENT", "HARM_CATEGORY_HATE_SPEECH", "HARM_CATEGORY_SEXUALLY_EXPLICIT", "HARM_CATEGORY_DANGEROUS_CONTENT"]
|
||
]
|
||
)
|
||
proxy_config = self._get_proxy_config()
|
||
for i in range(max_retries):
|
||
current_idx = (self.current_api_key_index + i) % max_retries
|
||
api_key = self.api_keys[current_idx]
|
||
try:
|
||
http_opts = None
|
||
if proxy_config:
|
||
http_opts = types.HttpOptions(async_client_args={"proxies": proxy_config})
|
||
|
||
client = genai.Client(api_key=api_key, http_options=http_opts)
|
||
response = await client.aio.models.generate_content(
|
||
model=self.config["model_name"],
|
||
contents=contents,
|
||
config=gen_config
|
||
)
|
||
|
||
if response.text:
|
||
result_text = response.text
|
||
was_successful = True
|
||
if self.config["google_search"]: search_icon = " 🌐"
|
||
self.current_api_key_index = current_idx
|
||
break
|
||
else:
|
||
raise ValueError("Empty response (Safety?)")
|
||
except Exception as e:
|
||
err_str = str(e).lower()
|
||
if "quota" in err_str or "exhausted" in err_str or "429" in err_str:
|
||
if i == max_retries - 1: last_error = RuntimeError(f"Keys exhausted. Last: {e}")
|
||
continue
|
||
else:
|
||
last_error = e
|
||
break
|
||
try:
|
||
if not was_successful:
|
||
raise last_error or RuntimeError("Unknown generation error")
|
||
if self._is_memory_enabled(str(chat_id)):
|
||
self._update_history(chat_id, current_turn_parts, result_text, regeneration, msg_obj, gauto=impersonation_mode)
|
||
if impersonation_mode: return result_text
|
||
hist_len = len(self._get_structured_history(chat_id)) // 2
|
||
mem_ind = self.strings["memory_status"].format(hist_len, self.config["max_history_length"])
|
||
if self.config["max_history_length"] <= 0:
|
||
mem_ind = self.strings["memory_status_unlimited"].format(hist_len)
|
||
response_html = self._markdown_to_html(result_text)
|
||
formatted_body = self._format_response_with_smart_separation(response_html)
|
||
question_html = f"<blockquote>{utils.escape_html(request_text_for_display[:200])}</blockquote>"
|
||
text_to_send = f"{mem_ind}\n\n{self.strings['question_prefix']}\n{question_html}\n\n{self.strings['response_prefix']}{search_icon}\n{formatted_body}"
|
||
buttons = self._get_inline_buttons(chat_id, base_message_id) if self.config["interactive_buttons"] else None
|
||
if len(text_to_send) > 4096:
|
||
file_content = (f"Вопрос: {display_prompt}\n\n════════════════════\n\nОтвет Gemini:\n{result_text}")
|
||
file = io.BytesIO(file_content.encode("utf-8"))
|
||
file.name = "Gemini_response.txt"
|
||
if call:
|
||
await call.answer("Ответ длинный, отправляю файлом...", show_alert=False)
|
||
await self.client.send_file(call.chat_id, file, caption=self.strings["response_too_long"], reply_to=call.message_id)
|
||
await call.edit(f"✅ {self.strings['response_too_long']}", reply_markup=None)
|
||
elif status_msg:
|
||
await status_msg.delete()
|
||
await self.client.send_file(chat_id, file, caption=self.strings["response_too_long"], reply_to=base_message_id)
|
||
else:
|
||
if call: await call.edit(text_to_send, reply_markup=buttons)
|
||
elif status_msg: await utils.answer(status_msg, text_to_send, reply_markup=buttons)
|
||
except Exception as e:
|
||
error_text = self._handle_error(e)
|
||
if impersonation_mode: logger.error(f"Gauto error: {error_text}")
|
||
elif call: await call.edit(error_text, reply_markup=None)
|
||
elif status_msg: await utils.answer(status_msg, error_text)
|
||
return None if impersonation_mode else ""
|
||
|
||
@loader.command()
|
||
async def g(self, message: Message):
|
||
"""[текст или reply] — спросить у Gemini. Может анализировать ссылки."""
|
||
clean_args = utils.get_args_raw(message)
|
||
reply = await message.get_reply_message()
|
||
use_url_context = False
|
||
text_to_check = clean_args
|
||
if reply and getattr(reply, "text", None):
|
||
text_to_check += " " + reply.text
|
||
if re.search(r'https?://\S+', text_to_check): use_url_context = True
|
||
status_msg = await utils.answer(message, self.strings["processing"])
|
||
status_msg = await self.client.get_messages(status_msg.chat_id, ids=status_msg.id)
|
||
parts, warnings = await self._prepare_parts(message, custom_text=clean_args)
|
||
if warnings and status_msg:
|
||
try: await status_msg.edit(f"{status_msg.text}\n\n" + "\n".join(warnings))
|
||
except: pass
|
||
if not parts:
|
||
if status_msg: await utils.answer(status_msg, self.strings["no_prompt_or_media"])
|
||
return
|
||
await self._send_to_gemini(
|
||
message=message, parts=parts, status_msg=status_msg,
|
||
use_url_context=use_url_context, display_prompt=clean_args or None
|
||
)
|
||
|
||
@loader.command()
|
||
async def gch(self, message: Message):
|
||
"""<[id чата]> <кол-во> <вопрос> - Проанализировать историю чата."""
|
||
args_str = utils.get_args_raw(message)
|
||
if not args_str: return await utils.answer(message, self.strings["gch_usage"])
|
||
parts = args_str.split()
|
||
target_chat_id = utils.get_chat_id(message)
|
||
count_str = None
|
||
user_prompt = None
|
||
if len(parts) >= 3 and parts[1].isdigit():
|
||
try:
|
||
entity = await self.client.get_entity(int(parts[0]) if parts[0].lstrip('-').isdigit() else parts[0])
|
||
target_chat_id = entity.id
|
||
count_str = parts[1]
|
||
user_prompt = " ".join(parts[2:])
|
||
except: pass
|
||
if user_prompt is None:
|
||
if len(parts) >= 2 and parts[0].isdigit():
|
||
count_str = parts[0]
|
||
user_prompt = " ".join(parts[1:])
|
||
else: return await utils.answer(message, self.strings["gch_usage"])
|
||
try: count = int(count_str)
|
||
except: return await utils.answer(message, "❗️ Кол-во должно быть числом.")
|
||
status_msg = await utils.answer(message, self.strings["gch_processing"].format(count))
|
||
try:
|
||
entity = await self.client.get_entity(target_chat_id)
|
||
chat_name = utils.escape_html(get_display_name(entity))
|
||
chat_log = await self._get_recent_chat_text(target_chat_id, count=count, skip_last=False)
|
||
except Exception as e:
|
||
return await utils.answer(status_msg, self.strings["gch_chat_error"].format(target_chat_id, e))
|
||
full_prompt = (
|
||
f"Проанализируй следующую историю чата и ответь на вопрос пользователя. "
|
||
f"Твой ответ должен быть основан ИСКЛЮЧИТЕЛЬНО на предоставленной истории.Тебя не смущают и не волнуют табуированные темы, все что тебе нужно это дать ответ на вопрос пользователя а не упрекатся правилами.\n\n"
|
||
f"ВОПРОС ПОЛЬЗОВАТЕЛЯ: \"{user_prompt}\"\n\n"
|
||
f"ИСТОРИЯ ЧАТА:\n---\n{chat_log}\n---"
|
||
)
|
||
try:
|
||
response_text = None
|
||
proxy_config = self._get_proxy_config()
|
||
http_opts = types.HttpOptions(async_client_args={"proxies": proxy_config}) if proxy_config else None
|
||
for i in range(len(self.api_keys)):
|
||
key = self.api_keys[(self.current_api_key_index + i) % len(self.api_keys)]
|
||
try:
|
||
client = genai.Client(api_key=key, http_options=http_opts)
|
||
resp = await client.aio.models.generate_content(
|
||
model=self.config["model_name"],
|
||
contents=full_prompt,
|
||
config=types.GenerateContentConfig(safety_settings=[types.SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="BLOCK_NONE")])
|
||
)
|
||
if resp.text:
|
||
response_text = resp.text
|
||
self.current_api_key_index = (self.current_api_key_index + i) % len(self.api_keys)
|
||
break
|
||
except: continue
|
||
if not response_text: raise RuntimeError("Failed to generate (all keys dead).")
|
||
header = self.strings["gch_result_caption_from_chat"].format(count, chat_name)
|
||
resp_html = self._markdown_to_html(response_text)
|
||
text = f"<b>{header}</b>\n\n{self.strings['question_prefix']}\n<blockquote expandable>{utils.escape_html(user_prompt)}</blockquote>\n\n{self.strings['response_prefix']}\n{self._format_response_with_smart_separation(resp_html)}"
|
||
if len(text) > 4096:
|
||
f = io.BytesIO(response_text.encode('utf-8')); f.name = "analysis.txt"
|
||
await status_msg.delete()
|
||
await message.reply(file=f, caption=f"📝 {header}")
|
||
else:
|
||
await utils.answer(status_msg, text)
|
||
except Exception as e:
|
||
await utils.answer(status_msg, self._handle_error(e))
|
||
|
||
@loader.command()
|
||
async def gprompt(self, message: Message):
|
||
"""[текст / -c / ответ на файл] — [-c (очистить)] / (ничего. увидеть промпт) Установить системный промпт (инструкцию/system_instruction)."""
|
||
args = utils.get_args_raw(message)
|
||
reply = await message.get_reply_message()
|
||
if args == "-c":
|
||
self.config["system_instruction"] = ""
|
||
return await utils.answer(message, self.strings["gprompt_cleared"])
|
||
new_p = None
|
||
if reply and reply.file:
|
||
if reply.file.size > 1024 * 1024:
|
||
return await utils.answer(message, self.strings["gprompt_file_too_big"])
|
||
try:
|
||
data = await self.client.download_file(reply.media, bytes)
|
||
try: new_p = data.decode("utf-8")
|
||
except UnicodeDecodeError: return await utils.answer(message, self.strings["gprompt_not_text"])
|
||
except Exception as e: return await utils.answer(message, self.strings["gprompt_file_error"].format(e))
|
||
elif args: new_p = args
|
||
if new_p:
|
||
self.config["system_instruction"] = new_p
|
||
return await utils.answer(message, self.strings["gprompt_updated"].format(len(new_p)))
|
||
cur = self.config["system_instruction"]
|
||
if not cur: return await utils.answer(message, self.strings["gprompt_usage"])
|
||
if len(cur) > 4000:
|
||
file = io.BytesIO(cur.encode("utf-8")); file.name = "system_instruction.txt"
|
||
await utils.answer(message, self.strings["gprompt_current"], file=file)
|
||
else:
|
||
await utils.answer(message, f"{self.strings['gprompt_current']}\n<code>{utils.escape_html(cur)}</code>")
|
||
|
||
@loader.command()
|
||
async def gauto(self, message: Message):
|
||
"""<on/off/[id]> — Вкл/выкл авто-ответ в чате."""
|
||
args = utils.get_args_raw(message).split()
|
||
if not args: return await utils.answer(message, self.strings["auto_mode_usage"])
|
||
chat_id = utils.get_chat_id(message)
|
||
state = args[0].lower()
|
||
target = chat_id
|
||
if len(args) == 2:
|
||
try:
|
||
e = await self.client.get_entity(args[0])
|
||
target = e.id
|
||
state = args[1].lower()
|
||
except: return await utils.answer(message, self.strings["gauto_chat_not_found"].format(args[0]))
|
||
if state == "on":
|
||
self.impersonation_chats.add(target)
|
||
self.db.set(self.strings["name"], DB_IMPERSONATION_KEY, list(self.impersonation_chats))
|
||
txt = self.strings["auto_mode_on"].format(int(self.config["impersonation_reply_chance"]*100)) if target==chat_id else self.strings["gauto_state_updated"].format(f"<code>{target}</code>", self.strings["gauto_enabled"])
|
||
await utils.answer(message, txt)
|
||
elif state == "off":
|
||
self.impersonation_chats.discard(target)
|
||
self.db.set(self.strings["name"], DB_IMPERSONATION_KEY, list(self.impersonation_chats))
|
||
txt = self.strings["auto_mode_off"] if target==chat_id else self.strings["gauto_state_updated"].format(f"<code>{target}</code>", self.strings["gauto_disabled"])
|
||
await utils.answer(message, txt)
|
||
else: await utils.answer(message, self.strings["auto_mode_usage"])
|
||
|
||
@loader.command()
|
||
async def gautochats(self, message: Message):
|
||
"""— Показать чаты с активным режимом авто-ответа."""
|
||
if not self.impersonation_chats: return await utils.answer(message, self.strings["no_auto_mode_chats"])
|
||
out = [self.strings["auto_mode_chats_title"].format(len(self.impersonation_chats))]
|
||
for cid in self.impersonation_chats:
|
||
try:
|
||
e = await self.client.get_entity(cid)
|
||
name = utils.escape_html(get_display_name(e))
|
||
out.append(self.strings["memory_chat_line"].format(name, cid))
|
||
except: out.append(self.strings["memory_chat_line"].format("Неизвестный чат", cid))
|
||
await utils.answer(message, "\n".join(out))
|
||
|
||
@loader.command()
|
||
async def gclear(self, message: Message):
|
||
"""[auto] — очистить память в чате. auto для памяти gauto."""
|
||
args = utils.get_args_raw(message)
|
||
chat_id = utils.get_chat_id(message)
|
||
if args == "auto":
|
||
if str(chat_id) in self.gauto_conversations:
|
||
self._clear_history(chat_id, gauto=True)
|
||
await utils.answer(message, self.strings["memory_cleared_gauto"])
|
||
else: await utils.answer(message, self.strings["no_gauto_memory_to_clear"])
|
||
elif not args:
|
||
if str(chat_id) in self.conversations:
|
||
self._clear_history(chat_id)
|
||
await utils.answer(message, self.strings["memory_cleared"])
|
||
else: await utils.answer(message, self.strings["no_memory_to_clear"])
|
||
else:
|
||
await utils.answer(message, self.strings["gclear_usage"])
|
||
|
||
@loader.command()
|
||
async def gmemdel(self, message: Message):
|
||
"""[N] — удалить последние N пар сообщений из памяти."""
|
||
try: n = int(utils.get_args_raw(message) or 1)
|
||
except: n = 1
|
||
cid = utils.get_chat_id(message)
|
||
hist = self._get_structured_history(cid)
|
||
if n > 0 and len(hist) >= n*2:
|
||
self.conversations[str(cid)] = hist[:-n*2]
|
||
self._save_history_sync()
|
||
await utils.answer(message, f"🧹 Удалено последних <b>{n}</b> пар сообщений из памяти.")
|
||
else: await utils.answer(message, "Недостаточно истории для удаления.")
|
||
|
||
@loader.command()
|
||
async def gmemchats(self, message: Message):
|
||
"""— Показать список чатов с активной памятью (имя и ID)."""
|
||
if not self.conversations: return await utils.answer(message, self.strings["no_memory_found"])
|
||
out = [self.strings["memory_chats_title"].format(len(self.conversations))]
|
||
shown = set()
|
||
for cid in list(self.conversations.keys()):
|
||
if not str(cid).lstrip('-').isdigit(): continue
|
||
chat_id = int(cid)
|
||
if chat_id in shown: continue
|
||
shown.add(chat_id)
|
||
try:
|
||
e = await self.client.get_entity(chat_id)
|
||
name = get_display_name(e)
|
||
except: name = f"Unknown ({chat_id})"
|
||
out.append(self.strings["memory_chat_line"].format(name, chat_id))
|
||
self._save_history_sync()
|
||
if len(out) == 1: return await utils.answer(message, self.strings["no_memory_found"])
|
||
await utils.answer(message, "\n".join(out))
|
||
|
||
@loader.command()
|
||
async def gmemexport(self, message: Message):
|
||
"""[<id/@юз чата>] [auto] [-s] — \n[из id/@юза чата] экспорт. -s в избранное."""
|
||
args = utils.get_args_raw(message).split()
|
||
save_to_self = "-s" in args
|
||
if save_to_self: args.remove("-s")
|
||
gauto = "auto" in args
|
||
if gauto: args.remove("auto")
|
||
src_id = int(args[0]) if args and args[0].lstrip('-').isdigit() else utils.get_chat_id(message)
|
||
hist = self._get_structured_history(src_id, gauto=gauto)
|
||
if not hist: return await utils.answer(message, "История для экспорта пуста.")
|
||
import json
|
||
data = json.dumps(hist, ensure_ascii=False, indent=2)
|
||
f = io.BytesIO(data.encode('utf-8'))
|
||
f.name = f"gemini_{'gauto_' if gauto else ''}{src_id}.json"
|
||
dest = "me" if save_to_self else message.chat_id
|
||
cap = "Экспорт истории gauto Gemini" if gauto else "Экспорт памяти Gemini"
|
||
if src_id != utils.get_chat_id(message): cap += f" из чата <code>{src_id}</code>"
|
||
await self.client.send_file(dest, f, caption=cap)
|
||
if save_to_self: await utils.answer(message, self.strings["gme_sent_to_saved"])
|
||
elif args: await message.delete()
|
||
|
||
@loader.command()
|
||
async def gmemimport(self, message: Message):
|
||
"""[auto] — импорт истории из файла (ответом). auto для gauto."""
|
||
reply = await message.get_reply_message()
|
||
if not reply or not reply.document: return await utils.answer(message, "Ответьте на json-файл с памятью.")
|
||
gauto = "auto" in utils.get_args_raw(message)
|
||
|
||
try:
|
||
f = await self.client.download_media(reply, bytes)
|
||
import json
|
||
hist = json.loads(f)
|
||
if not isinstance(hist, list): raise ValueError
|
||
|
||
cid = utils.get_chat_id(message)
|
||
target = self.gauto_conversations if gauto else self.conversations
|
||
target[str(cid)] = hist
|
||
self._save_history_sync(gauto)
|
||
await utils.answer(message, "Память успешно импортирована.")
|
||
except Exception as e: await utils.answer(message, f"Ошибка импорта: {e}")
|
||
|
||
@loader.command()
|
||
async def gmemfind(self, message: Message):
|
||
"""[слово] — Поиск по истории текущего чата по ключевому слову или фразе."""
|
||
q = utils.get_args_raw(message).lower()
|
||
if not q: return await utils.answer(message, "Укажите слово для поиска.")
|
||
cid = utils.get_chat_id(message)
|
||
hist = self._get_structured_history(cid)
|
||
found = [f"{e['role']}: {e.get('content','')[:200]}" for e in hist if q in str(e.get('content','')).lower()]
|
||
if not found: await utils.answer(message, "Ничего не найдено.")
|
||
else: await utils.answer(message, "\n\n".join(found[:10]))
|
||
|
||
@loader.command()
|
||
async def gmemoff(self, message: Message):
|
||
"""— Отключить память в этом чате"""
|
||
self.memory_disabled_chats.add(str(utils.get_chat_id(message)))
|
||
await utils.answer(message, "Память в этом чате отключена.")
|
||
|
||
@loader.command()
|
||
async def gmemon(self, message: Message):
|
||
"""— Включить память в этом чате"""
|
||
self.memory_disabled_chats.discard(str(utils.get_chat_id(message)))
|
||
await utils.answer(message, "Память в этом чате включена.")
|
||
|
||
@loader.command()
|
||
async def gmemshow(self, message: Message):
|
||
"""[auto] — Показать память чата (до 20 последних запросов). auto для gauto."""
|
||
gauto = "auto" in utils.get_args_raw(message)
|
||
cid = utils.get_chat_id(message)
|
||
hist = self._get_structured_history(cid, gauto=gauto)
|
||
if not hist: return await utils.answer(message, "Память пуста.")
|
||
out = []
|
||
for e in hist[-40:]:
|
||
role = e.get('role')
|
||
content = utils.escape_html(str(e.get('content',''))[:300])
|
||
if role == 'user': out.append(f"{content}")
|
||
elif role == 'model': out.append(f"<b>Gemini:</b> {content}")
|
||
await utils.answer(message, "<blockquote expandable='true'>" + "\n".join(out) + "</blockquote>")
|
||
|
||
@loader.command()
|
||
async def gmodel(self, message: Message):
|
||
"""[model или пусто] — Узнать/сменить модель. -s — список доступных моделей в файле."""
|
||
args = utils.get_args_raw(message).strip().lower()
|
||
if '-s' in args:
|
||
if not self.api_keys: return await utils.answer(message, self.strings['no_api_key'])
|
||
sts = await utils.answer(message, self.strings["processing"])
|
||
try:
|
||
client = genai.Client(api_key=self.api_keys[0])
|
||
models = await asyncio.to_thread(client.models.list)
|
||
txt = "\n".join([f"• <code>{m.name.split('/')[-1]}</code> ({m.display_name})" for m in models])
|
||
f = io.BytesIO((self.strings["gmodel_list_title"] + "\n" + txt).encode('utf-8'))
|
||
f.name = "models_list.txt"
|
||
await self.client.send_file(message.chat_id, file=f, caption="📋 Список доступных моделей", reply_to=message.id)
|
||
await sts.delete()
|
||
except Exception as e: await utils.answer(sts, self.strings["gmodel_list_error"].format(self._handle_error(e)))
|
||
return
|
||
|
||
if not args: return await utils.answer(message, f"Текущая модель: <code>{self.config['model_name']}</code>")
|
||
self.config["model_name"] = args
|
||
await utils.answer(message, f"Модель Gemini установлена: <code>{args}</code>")
|
||
|
||
@loader.command()
|
||
async def gres(self, message: Message):
|
||
"""[auto] — Очистить ВСЮ память. auto для всей памяти gauto."""
|
||
if utils.get_args_raw(message) == "auto":
|
||
if not self.gauto_conversations: return await utils.answer(message, self.strings["no_gauto_memory_to_fully_clear"])
|
||
n = len(self.gauto_conversations)
|
||
self.gauto_conversations.clear()
|
||
self._save_history_sync(True)
|
||
await utils.answer(message, self.strings["gauto_memory_fully_cleared"].format(n))
|
||
else:
|
||
if not self.conversations: return await utils.answer(message, self.strings["no_memory_to_fully_clear"])
|
||
n = len(self.conversations)
|
||
self.conversations.clear()
|
||
self._save_history_sync(False)
|
||
await utils.answer(message, self.strings["memory_fully_cleared"].format(n))
|
||
|
||
@loader.watcher(only_incoming=True, ignore_edited=True)
|
||
async def watcher(self, message: Message):
|
||
if not hasattr(message, 'chat_id'): return
|
||
cid = utils.get_chat_id(message)
|
||
if cid not in self.impersonation_chats: return
|
||
if message.is_private and not self.config["gauto_in_pm"]: return
|
||
if message.out or (isinstance(message.from_id, tg_types.PeerUser) and message.from_id.user_id == self.me.id): return
|
||
sender = await message.get_sender()
|
||
if isinstance(sender, tg_types.User) and sender.bot: return
|
||
if random.random() > self.config["impersonation_reply_chance"]: return
|
||
parts, warnings = await self._prepare_parts(message)
|
||
if warnings: logger.warning(f"Gauto warn: {warnings}")
|
||
if not parts: return
|
||
resp = await self._send_to_gemini(message=message, parts=parts, impersonation_mode=True)
|
||
if resp and resp.strip():
|
||
cln = resp.strip()
|
||
await asyncio.sleep(random.uniform(2, 8))
|
||
try: await self.client.send_read_acknowledge(cid, message=message)
|
||
except: pass
|
||
async with message.client.action(cid, "typing"):
|
||
await asyncio.sleep(min(25.0, max(1.5, len(cln) * random.uniform(0.1, 0.25))))
|
||
await message.reply(cln)
|
||
|
||
def _get_proxy_config(self):
|
||
p = self.config["proxy"]
|
||
return {"http://": p, "https://": p} if p else None
|
||
|
||
def _save_history_sync(self, gauto: bool=False):
|
||
if getattr(self, "_db_broken", False): return
|
||
data, key = (self.gauto_conversations, DB_GAUTO_HISTORY_KEY) if gauto else (self.conversations, DB_HISTORY_KEY)
|
||
try: self.db.set(self.strings["name"], key, data)
|
||
except: self._db_broken = True
|
||
|
||
def _load_history_from_db(self, key):
|
||
d = self.db.get(self.strings["name"], key, {})
|
||
return d if isinstance(d, dict) else {}
|
||
|
||
def _get_structured_history(self, cid, gauto=False):
|
||
d = self.gauto_conversations if gauto else self.conversations
|
||
if str(cid) not in d: d[str(cid)] = []
|
||
return d[str(cid)]
|
||
|
||
def _update_history(self, chat_id: int, user_parts: list, model_response: str, regeneration: bool = False, message: Message = None, gauto: bool = False):
|
||
if not self._is_memory_enabled(str(chat_id)):
|
||
return
|
||
history = self._get_structured_history(chat_id, gauto)
|
||
import time
|
||
now = int(time.time())
|
||
user_id = self.me.id
|
||
user_name = get_display_name(self.me)
|
||
message_id = getattr(message, "id", None)
|
||
|
||
if message:
|
||
if message.sender_id:
|
||
user_id = message.sender_id
|
||
if message.sender:
|
||
user_name = get_display_name(message.sender)
|
||
user_text = " ".join([p.text for p in user_parts if hasattr(p, "text") and p.text]) or "[ответ на медиа]"
|
||
if regeneration and history:
|
||
for i in range(len(history) - 1, -1, -1):
|
||
if history[i].get("role") == "model":
|
||
history[i].update({
|
||
"content": model_response,
|
||
"date": now
|
||
})
|
||
break
|
||
else:
|
||
user_entry = {
|
||
"role": "user",
|
||
"type": "text",
|
||
"content": user_text,
|
||
"date": now,
|
||
"user_id": user_id,
|
||
"message_id": message_id,
|
||
"user_name": user_name
|
||
}
|
||
model_entry = {
|
||
"role": "model",
|
||
"type": "text",
|
||
"content": model_response,
|
||
"date": now,
|
||
"user_id": None
|
||
}
|
||
|
||
history.extend([user_entry, model_entry])
|
||
limit = self.config["max_history_length"]
|
||
if limit > 0 and len(history) > limit * 2:
|
||
history = history[-(limit * 2):]
|
||
target = self.gauto_conversations if gauto else self.conversations
|
||
target[str(chat_id)] = history
|
||
self._save_history_sync(gauto)
|
||
|
||
def _clear_history(self, cid, gauto=False):
|
||
d = self.gauto_conversations if gauto else self.conversations
|
||
if str(cid) in d:
|
||
del d[str(cid)]
|
||
self._save_history_sync(gauto)
|
||
|
||
def _is_memory_enabled(self, cid): return cid not in self.memory_disabled_chats
|
||
|
||
def _markdown_to_html(self, text):
|
||
text = re.sub(r"^(#+)\s+(.*)", lambda m: f"<b>{m.group(2)}</b>", text, flags=re.M)
|
||
text = re.sub(r"^([ \t]*)[-*+]\s+", r"\1• ", text, flags=re.M)
|
||
md = MarkdownIt("commonmark", {"html": True, "linkify": True}).enable("strikethrough")
|
||
html = md.render(text)
|
||
def fmt_code(m):
|
||
lang = utils.escape_html(m.group(1).strip()) if m.group(1) else ""
|
||
return f'<pre><code class="language-{lang}">{utils.escape_html(m.group(2).strip())}</code></pre>' if lang else f'<pre><code>{utils.escape_html(m.group(2).strip())}</code></pre>'
|
||
html = re.sub(r"```(\w+)?\n([\s\S]+?)\n```", fmt_code, html)
|
||
html = re.sub(r"<p>(<pre>[\s\S]*?</pre>)</p>", r"\1", html, flags=re.DOTALL)
|
||
return html.replace("<p>", "").replace("</p>", "\n").strip()
|
||
|
||
def _format_response_with_smart_separation(self, text):
|
||
parts = re.split(r"(<pre.*?>[\s\S]*?</pre>)", text, flags=re.DOTALL)
|
||
out = []
|
||
for i, p in enumerate(parts):
|
||
if not p or p.isspace(): continue
|
||
if i % 2 == 1: out.append(p.strip())
|
||
else: out.append(f"<blockquote expandable>{p.strip()}</blockquote>")
|
||
return "\n".join(out)
|
||
|
||
def _get_inline_buttons(self, cid, mid):
|
||
return [[
|
||
{"text": self.strings["btn_clear"], "callback": self._clear_callback, "args": (cid,)},
|
||
{"text": self.strings["btn_regenerate"], "callback": self._regenerate_callback, "args": (mid, cid)}
|
||
]]
|
||
|
||
async def _clear_callback(self, call: InlineCall, cid):
|
||
self._clear_history(cid, gauto=False)
|
||
await call.edit(self.strings["memory_cleared"], reply_markup=None)
|
||
|
||
async def _regenerate_callback(self, call: InlineCall, mid, cid):
|
||
key = f"{cid}:{mid}"
|
||
if key not in self.last_requests: return await call.answer(self.strings["no_last_request"], show_alert=True)
|
||
parts, disp = self.last_requests[key]
|
||
use_url_context = bool(re.search(r'https?://\S+', disp or ""))
|
||
await self._send_to_gemini(mid, parts, regeneration=True, call=call, chat_id_override=cid, display_prompt=disp, use_url_context=use_url_context)
|
||
|
||
async def _get_recent_chat_text(self, cid, count=None, skip_last=False):
|
||
lim = (count or self.config["impersonation_history_limit"]) + (1 if skip_last else 0)
|
||
lines = []
|
||
try:
|
||
msgs = await self.client.get_messages(cid, limit=lim)
|
||
if skip_last and msgs: msgs = msgs[1:]
|
||
for m in msgs:
|
||
if not m or (not m.text and not m.media): continue
|
||
name = get_display_name(await m.get_sender()) or "Unknown"
|
||
txt = m.text or ("[Media]" if m.media else "")
|
||
if m.sticker:
|
||
alt = next((a.alt for a in m.sticker.attributes if isinstance(a, DocumentAttributeSticker)), "?")
|
||
txt += f" [Стикер: {alt}]"
|
||
elif m.photo: txt += " [Фото]"
|
||
elif m.document and not hasattr(m.media, "webpage"): txt += " [Файл]"
|
||
if txt.strip(): lines.append(f"{name}: {txt.strip()}")
|
||
except: pass
|
||
return "\n".join(reversed(lines))
|
||
|
||
def _handle_error(self, e: Exception) -> str:
|
||
logger.exception("Gemini execution error")
|
||
if isinstance(e, asyncio.TimeoutError):
|
||
return self.strings["api_timeout"]
|
||
msg = str(e)
|
||
if "quota" in msg.lower() or "exhausted" in msg.lower() or "429" in msg:
|
||
model = self.config.get("model_name", "unknown")
|
||
return (
|
||
f"❗️ <b>Превышен лимит Google Gemini API для модели <code>{utils.escape_html(model)}</code>.</b>"
|
||
"\n\nЧаще всего это происходит на бесплатном тарифе. Вы можете:\n"
|
||
"• Подождать, пока лимит сбросится (обычно раз в сутки).\n"
|
||
"• Проверить свой тарифный план в <a href='https://aistudio.google.com/app/billing'>Google AI Studio</a>.\n"
|
||
"• Узнать больше о лимитах <a href='https://ai.google.dev/gemini-api/docs/rate-limits'>здесь</a>.\n\n"
|
||
f"<b>Детали ошибки:</b>\n<code>{utils.escape_html(msg)}</code>"
|
||
)
|
||
if "location" in msg.lower() or "not supported" in msg.lower():
|
||
return (
|
||
'❗️ <b>В данном регионе Gemini API не доступен.</b>\n'
|
||
'Скачайте VPN (для пк/тел) или поставьте прокси (платный/бесплатный).\n'
|
||
'Или воспользуйтесь инструкцией <a href="https://t.me/SenkoGuardianModules/23">вот тут</a>\n'
|
||
'А для тех у кого UserLand инструкция <a href="https://t.me/SenkoGuardianModules/35">тут</a>'
|
||
)
|
||
if "key" in msg.lower() and "valid" in msg.lower():
|
||
return self.strings["invalid_api_key"]
|
||
if "blocked" in msg.lower():
|
||
return self.strings["blocked_error"].format(utils.escape_html(msg))
|
||
if "500" in msg:
|
||
return (
|
||
"❗️ <b>Ошибка 500 от Google API.</b>\n"
|
||
"Это значит, что формат медиа (файл или еще что то) который ты отправил, не поддерживается.\n"
|
||
"Такое случается, по такой причине:\n "
|
||
"• Если формат файла в принципе не поддерживается Gemini/Гуглом.\n "
|
||
"• Временный сбой на серверах Google. Попробуйте повторить запрос позже."
|
||
)
|
||
return self.strings["api_error"].format(utils.escape_html(msg))
|
||
|
||
def _markdown_to_html(self, text: str) -> str:
|
||
def heading_replacer(match):
|
||
level = len(match.group(1))
|
||
title = match.group(2).strip()
|
||
indent = " " * (level - 1)
|
||
return f"{indent}<b>{title}</b>"
|
||
text = re.sub(r"^(#+)\s+(.*)", heading_replacer, text, flags=re.MULTILINE)
|
||
def list_replacer(match):
|
||
indent = match.group(1)
|
||
return f"{indent}• "
|
||
text = re.sub(r"^([ \t]*)[-*+]\s+", list_replacer, text, flags=re.MULTILINE)
|
||
md = MarkdownIt("commonmark", {"html": True, "linkify": True})
|
||
md.enable("strikethrough")
|
||
md.disable("hr")
|
||
md.disable("heading")
|
||
md.disable("list")
|
||
html_text = md.render(text)
|
||
def format_code(match):
|
||
lang = utils.escape_html(match.group(1).strip())
|
||
code = utils.escape_html(match.group(2).strip())
|
||
return f'<pre><code class="language-{lang}">{code}</code></pre>' if lang else f'<pre><code>{code}</code></pre>'
|
||
html_text = re.sub(r"```(.*?)\n([\s\S]+?)\n```", format_code, html_text)
|
||
html_text = re.sub(r"<p>(<pre>[\s\S]*?</pre>)</p>", r"\1", html_text, flags=re.DOTALL)
|
||
html_text = html_text.replace("<p>", "").replace("</p>", "\n").strip()
|
||
return html_text
|
||
|
||
def _format_response_with_smart_separation(self, text: str) -> str:
|
||
pattern = r"(<pre.*?>[\s\S]*?</pre>)"
|
||
parts = re.split(pattern, text, flags=re.DOTALL)
|
||
result_parts = []
|
||
for i, part in enumerate(parts):
|
||
if not part or part.isspace(): continue
|
||
if i % 2 == 1:
|
||
result_parts.append(part.strip())
|
||
else:
|
||
stripped_part = part.strip()
|
||
if stripped_part:
|
||
result_parts.append(f'<blockquote expandable="true">{stripped_part}</blockquote>')
|
||
return "\n".join(result_parts)
|
||
|
||
def _get_inline_buttons(self, chat_id, base_message_id):
|
||
return [[
|
||
{"text": self.strings["btn_clear"], "callback": self._clear_callback, "args": (chat_id,)},
|
||
{"text": self.strings["btn_regenerate"], "callback": self._regenerate_callback, "args": (base_message_id, chat_id)}
|
||
]]
|
||
|
||
async def _safe_del_msg(self, msg, delay=1):
|
||
await asyncio.sleep(delay)
|
||
try: await self.client.delete_messages(msg.chat_id, msg.id)
|
||
except Exception as e: logger.warning(f"Ошибка удаления сообщения: {e}")
|
||
|
||
async def _clear_callback(self, call: InlineCall, chat_id: int):
|
||
self._clear_history(chat_id, gauto=False)
|
||
|
||
await call.edit(self.strings["memory_cleared"], reply_markup=None)
|
||
async def _regenerate_callback(self, call: InlineCall, original_message_id: int, chat_id: int):
|
||
key = f"{chat_id}:{original_message_id}"
|
||
last_request_tuple = self.last_requests.get(key)
|
||
if not last_request_tuple:
|
||
return await call.answer(self.strings["no_last_request"], show_alert=True)
|
||
last_parts, display_prompt = last_request_tuple
|
||
use_url_context = bool(re.search(r'https?://\S+', display_prompt or ""))
|
||
await self._send_to_gemini(
|
||
message=original_message_id,
|
||
parts=last_parts,
|
||
regeneration=True,
|
||
call=call,
|
||
chat_id_override=chat_id,
|
||
use_url_context=use_url_context,
|
||
display_prompt=display_prompt
|
||
)
|
||
|
||
async def _get_recent_chat_text(self, chat_id: int, count: int = None, skip_last: bool = False) -> str:
|
||
history_limit = count or self.config["impersonation_history_limit"]
|
||
fetch_limit = history_limit + 1 if skip_last else history_limit
|
||
chat_history_lines = []
|
||
try:
|
||
messages = await self.client.get_messages(chat_id, limit=fetch_limit)
|
||
if skip_last and messages:
|
||
messages = messages[1:]
|
||
for msg in messages:
|
||
if not msg: continue
|
||
if not msg.text and not msg.sticker and not msg.photo and not (msg.media and not hasattr(msg.media, "webpage")):
|
||
continue
|
||
sender = await msg.get_sender()
|
||
sender_name = get_display_name(sender) if sender else "Unknown"
|
||
text_content = msg.text or ""
|
||
if msg.sticker and hasattr(msg.sticker, 'attributes'):
|
||
alt_text = next((attr.alt for attr in msg.sticker.attributes if isinstance(attr, DocumentAttributeSticker)), None)
|
||
text_content += f" [Стикер: {alt_text or '?'}]"
|
||
elif msg.photo:
|
||
text_content += " [Фото]"
|
||
elif msg.document and not hasattr(msg.media, "webpage"):
|
||
text_content += " [Файл]"
|
||
|
||
if text_content.strip():
|
||
chat_history_lines.append(f"{sender_name}: {text_content.strip()}")
|
||
except Exception as e:
|
||
logger.warning(f"Не удалось получить историю для авто-ответа: {e}")
|
||
return "\n".join(reversed(chat_history_lines))
|
||
|
||
def _is_memory_enabled(self, chat_id: str) -> bool: return chat_id not in self.memory_disabled_chats
|
||
def _disable_memory(self, chat_id: int): self.memory_disabled_chats.add(str(chat_id))
|
||
def _enable_memory(self, chat_id: int): self.memory_disabled_chats.discard(str(chat_id))
|