From 8bef60417099129f012d7e2f65f75716b1ae376b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 9 Feb 2026 01:29:03 +0000 Subject: [PATCH] Added and updated repositories 2026-02-09 01:29:03 --- SenkoGuardian/SenModules/Gemini.py | 150 ++++++++++++++++++++++++----- 1 file changed, 126 insertions(+), 24 deletions(-) diff --git a/SenkoGuardian/SenModules/Gemini.py b/SenkoGuardian/SenModules/Gemini.py index 971b02d..e72d071 100644 --- a/SenkoGuardian/SenModules/Gemini.py +++ b/SenkoGuardian/SenModules/Gemini.py @@ -3,7 +3,7 @@ # This software is released under the MIT License. # https://opensource.org/licenses/MIT -__version__ = (6, 0, 0) #фыр +__version__ = (6, 1, 0) #фыр # meta developer: @SenkoGuardianModules @@ -59,6 +59,7 @@ logger = logging.getLogger(__name__) DB_HISTORY_KEY = "gemini_conversations_v4" DB_GAUTO_HISTORY_KEY = "gemini_gauto_conversations_v1" DB_IMPERSONATION_KEY = "gemini_impersonation_chats" +DB_PRESETS_KEY = "gemini_prompt_presets" GEMINI_TIMEOUT = 840 MAX_FFMPEG_SIZE = 90 * 1024 * 1024 DB_KEY_MAP_KEY = "gemini_key_model_map" @@ -158,6 +159,21 @@ class Gemini(loader.Module): "gmodel_no_models": "⚠️ Не удалось получить список моделей.", "gmodel_list_error": "❗️ Ошибка получения списка: {}", "gimg_process": " Генерация...\n🧠 Модель: {model}", + "gprompt_usage": "ℹ️ Использование:\n.gprompt <текст/пресет> — установить.\n.gprompt -c — очистить.\n.gpresets — база пресетов.", + "gpresets_usage": ( + "ℹ️ Управление пресетами:\n" + "• .gpresets save [Имя] текст — сохранить (имя в скобках, если с пробелами).\n" + "• .gpresets load 1 или имя — загрузить по номеру/имени.\n" + "• .gpresets del 1 или имя — удалить.\n" + "• .gpresets list — список." + ), + "gpreset_loaded": "✅ Установлен пресет: [{}]\nДлина: {} симв.", + "gpreset_saved": "💾 Пресет сохранен!\n🏷 Имя: {}\n№ Индекс: {}", + "gpreset_deleted": "🗑 Пресет удален: {}", + "gpreset_not_found": "🚫 Пресет с таким именем или индексом не найден.", + "gpreset_list_head": "📋 Ваши пресеты:\n", + "gpreset_empty": "📂 Список пресетов пуст.", + } TEXT_MIME_TYPES = { "text/plain", "text/markdown", "text/html", "text/css", "text/csv", @@ -194,6 +210,7 @@ class Gemini(loader.Module): loader.ConfigValue("inline_pagination", False, self.strings["cfg_inline_pagination_doc"], validator=loader.validators.Boolean()), loader.ConfigValue("image_model_name", "gemini-2.5-flash-image", self.strings["cfg_image_model_doc"]), ) + self.prompt_presets = [] self.conversations = {} self.gauto_conversations = {} self.last_requests = {} @@ -221,6 +238,9 @@ class Gemini(loader.Module): return self.current_api_key_index = 0 self.conversations = self._load_history_from_db(DB_HISTORY_KEY) + self.prompt_presets = self.db.get(self.strings["name"], DB_PRESETS_KEY, []) + if isinstance(self.prompt_presets, dict): + self.prompt_presets = [{"name": k, "content": v} for k, v in self.prompt_presets.items()] self.gauto_conversations = self._load_history_from_db(DB_GAUTO_HISTORY_KEY) self.impersonation_chats = set(self.db.get(self.strings["name"], DB_IMPERSONATION_KEY, [])) if not self.api_keys: @@ -348,19 +368,37 @@ class Gemini(loader.Module): my_name = get_display_name(self.me) chat_history_text = await self._get_recent_chat_text(chat_id) sys_instruct = self.config["impersonation_prompt"].format(my_name=my_name, chat_history=chat_history_text) - raw_hist = self._get_structured_history(chat_id, gauto=impersonation_mode) + history_key = "global_context" if (self.config.get("global_memory") and not impersonation_mode) else str(chat_id) + raw_hist = self._get_structured_history(history_key, gauto=impersonation_mode) if regeneration and raw_hist: raw_hist = raw_hist[:-2] openai_messages = self._convert_google_history_to_openai(raw_hist, sys_instruct) - user_text_prompt = " ".join([p.text for p in current_turn_parts if hasattr(p, "text") and p.text]) - if not user_text_prompt: user_text_prompt = request_text_for_display - openai_messages.append({"role": "user", "content": user_text_prompt}) + content_list = [] + for p in current_turn_parts: + if hasattr(p, "text") and p.text: + content_list.append({"type": "text", "text": p.text}) + elif hasattr(p, "inline_data") and p.inline_data: + mime = p.inline_data.mime_type + data = p.inline_data.data + if mime.startswith("image/"): + b64_img = base64.b64encode(data).decode("utf-8") + content_list.append({ + "type": "image_url", + "image_url": {"url": f"data:{mime};base64,{b64_img}"} + }) + if not content_list: + content_list = request_text_for_display + openai_messages.append({"role": "user", "content": content_list}) target_model = self.config["model_name"] result_text = await self._send_to_Openrouter_api(target_model, openai_messages, self.config["temperature"]) if self._is_memory_enabled(str(chat_id)): - self._update_history(chat_id, current_turn_parts, result_text, regeneration, msg_obj, gauto=impersonation_mode) + self._update_history(history_key, current_turn_parts, result_text, regeneration, msg_obj, gauto=impersonation_mode) if impersonation_mode: return result_text - hist_len = len(self._get_structured_history(chat_id)) // 2 - mem_ind = self.strings["memory_status"].format(hist_len, self.config["max_history_length"]) + hist_len = len(self._get_structured_history(history_key)) // 2 + mem_ind_fmt = self.strings.get("memory_status_global", self.strings["memory_status"]) + if self.config.get("global_memory"): + mem_ind = mem_ind_fmt.format(hist_len) + else: + mem_ind = self.strings["memory_status"].format(hist_len, self.config["max_history_length"]) model_info = f"OpenRouter: {target_model}" response_html = self._markdown_to_html(result_text) formatted_body = self._format_response_with_smart_separation(response_html) @@ -676,32 +714,39 @@ class Gemini(loader.Module): @loader.command() async def gprompt(self, message: Message): - """[текст / -c / ответ на файл] — [-c (очистить)] / (ничего. увидеть промпт) Установить системный промпт (инструкцию/system_instruction).""" + """<текст/-c/ответ на файл> — Установить промпт.""" args = utils.get_args_raw(message) reply = await message.get_reply_message() if args == "-c": self.config["system_instruction"] = "" return await utils.answer(message, self.strings["gprompt_cleared"]) - new_p = None - if reply and reply.file: + new_prompt = None + preset = self._find_preset(args) + if preset: + new_prompt = preset['content'] + elif reply and reply.file: if reply.file.size > 1024 * 1024: return await utils.answer(message, self.strings["gprompt_file_too_big"]) try: - data = await self.client.download_file(reply.media, bytes) - try: new_p = data.decode("utf-8") + file_data = await self.client.download_file(reply.media, bytes) + try: new_prompt = file_data.decode("utf-8") except UnicodeDecodeError: return await utils.answer(message, self.strings["gprompt_not_text"]) - except Exception as e: return await utils.answer(message, self.strings["gprompt_file_error"].format(e)) - elif args: new_p = args - if new_p: - self.config["system_instruction"] = new_p - return await utils.answer(message, self.strings["gprompt_updated"].format(len(new_p))) - cur = self.config["system_instruction"] - if not cur: return await utils.answer(message, self.strings["gprompt_usage"]) - if len(cur) > 4000: - file = io.BytesIO(cur.encode("utf-8")); file.name = "system_instruction.txt" + except Exception as e: + return await utils.answer(message, self.strings["gprompt_file_error"].format(e)) + elif args: + new_prompt = args + if new_prompt is not None: + self.config["system_instruction"] = new_prompt + return await utils.answer(message, self.strings["gprompt_updated"].format(len(new_prompt))) + current_prompt = self.config["system_instruction"] + if not current_prompt: + return await utils.answer(message, self.strings["gprompt_usage"]) + if len(current_prompt) > 4000: + file = io.BytesIO(current_prompt.encode("utf-8")) + file.name = "system_instruction.txt" await utils.answer(message, self.strings["gprompt_current"], file=file) else: - await utils.answer(message, f"{self.strings['gprompt_current']}\n{utils.escape_html(cur)}") + await utils.answer(message, f"{self.strings['gprompt_current']}\n{utils.escape_html(current_prompt)}") @loader.command() async def gauto(self, message: Message): @@ -760,6 +805,64 @@ class Gemini(loader.Module): else: await utils.answer(message, self.strings["gclear_usage"]) + @loader.command() + async def gpresets(self, message: Message): + """ — Управление пресетами (профилями).""" + args = utils.get_args_raw(message) + if not args: return await utils.answer(message, self.strings["gpresets_usage"]) + match = re.match(r"^(\w+)(?:\s+\[(.+?)\]|\s+(\S+))?(?:\s+(.*))?$", args, re.DOTALL) + if not match: return await utils.answer(message, self.strings["gpresets_usage"]) + action = match.group(1).lower() + name = match.group(2) or match.group(3) + content = match.group(4) + if action == "list": + if not self.prompt_presets: return await utils.answer(message, self.strings["gpreset_empty"]) + text = self.strings["gpreset_list_head"] + for idx, p in enumerate(self.prompt_presets, 1): + text += f"{idx}. {p['name']} ({len(p['content'])} симв.)\n" + return await utils.answer(message, text) + if action == "save": + if not name: return await utils.answer(message, "❌ Укажите имя: .gpresets save [Имя] текст") + reply = await message.get_reply_message() + if not content and reply: + if reply.text: content = reply.text + elif reply.file: + try: content = (await self.client.download_file(reply.media, bytes)).decode("utf-8", errors="ignore") + except: pass + if not content: return await utils.answer(message, "❌ Нет текста для сохранения.") + existing = self._find_preset(name) + if existing: + existing['content'] = content + else: + self.prompt_presets.append({"name": name, "content": content}) + self.db.set(self.strings["name"], DB_PRESETS_KEY, self.prompt_presets) + await utils.answer(message, self.strings["gpreset_saved"].format(name, len(self.prompt_presets))) + elif action == "load": + target = self._find_preset(name) + if not target: return await utils.answer(message, self.strings["gpreset_not_found"]) + self.config["system_instruction"] = target['content'] + await utils.answer(message, self.strings["gpreset_loaded"].format(target['name'], len(target['content']))) + elif action == "del": + target = self._find_preset(name) + if not target: return await utils.answer(message, self.strings["gpreset_not_found"]) + self.prompt_presets.remove(target) + self.db.set(self.strings["name"], DB_PRESETS_KEY, self.prompt_presets) + await utils.answer(message, self.strings["gpreset_deleted"].format(target['name'])) + else: + await utils.answer(message, self.strings["gpresets_usage"]) + + def _find_preset(self, query): + "Ищет пресет по номеру (строка '1') или имени." + if not query: return None + if str(query).isdigit(): + idx = int(query) - 1 + if 0 <= idx < len(self.prompt_presets): + return self.prompt_presets[idx] + for p in self.prompt_presets: + if p['name'].lower() == str(query).lower(): + return p + return None + @loader.command() async def gmemdel(self, message: Message): """[N] — удалить последние N пар сообщений из памяти.""" @@ -1438,7 +1541,6 @@ class Gemini(loader.Module): async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, json=payload, timeout=GEMINI_TIMEOUT) as resp: text = await resp.text() - if resp.status != 200: try: err_json = json.loads(text)