__version__ = (2, 0, 6) #░░░███░███░███░███░███ #░░░░░█░█░░░░█░░█░░░█░█ #░░░░█░░███░░█░░█░█░█░█ #░░░█░░░█░░░░█░░█░█░█░█ #░░░███░███░░█░░███░███ #H:Mods Team [💎] # meta developer: @nullmod # requires: python-chess gdown # packurl: https://github.com/ZetGoHack/TestingModules/raw/main/chess.yml import asyncio import chess import chess.engine import chess.pgn import copy import logging import os import random as r import time from telethon.tl.types import PeerUser, User, Message from datetime import datetime, timezone from typing import TypedDict from .. import loader, utils from ..inline.types import BotInlineCall, InlineCall, InlineMessage class Timer: def __init__(self, scnds): self.starttime = scnds self.timers = {"white": scnds, "black": scnds} self.running = {"white": False, "black": False} self.last_time = time.monotonic() self.t = None def minutes(self) -> int: return self.starttime // 60 async def _count(self): while True: await asyncio.sleep(0.1) now = time.monotonic() elapsed = now - self.last_time self.last_time = now for color in ("white", "black"): if self.running[color]: self.timers[color] = max(0, self.timers[color] - elapsed) async def start(self, from_color: str = "white"): self.last_time = time.monotonic() if from_color == "restore": if self.running["white"]: from_color = "white" else: from_color = "black" await self._turn(from_color) self.t = asyncio.create_task(self._count()) async def switch(self): self.running["white"] = not self.running["white"] self.running["black"] = not self.running["black"] async def _turn(self, color): now = time.monotonic() e = now - self.last_time self.last_time = now for clr in ("white", "black"): if self.running[clr]: self.timers[clr] = max(0, self.timers[clr] - e) self.running = {"white": color == "white", "black": color == "black"} async def white_time(self): return round(self.timers["white"], 0) async def black_time(self): return round(self.timers["black"], 0) def restore(self, white_time: float, black_time: float, running: dict): self.timers["white"] = white_time self.timers["black"] = black_time self.running = running def backup(self) -> dict: return { "white_time": self.timers["white"], "black_time": self.timers["black"], "running": self.running } async def stop(self): if self.t: self.t.cancel() self.running = {"white": False, "black": False} class Player(TypedDict): id: int name: str color: bool | None class TimerDict(TypedDict): timer: Timer timer_loop: bool timer_is_set: bool message: InlineCall class GameParams(TypedDict): chosen_figure_coord: str reason_of_ending: str promotion_move: str winner_color: bool | None resigner_color: bool | None draw_offerer: bool | None class Game(TypedDict): board: chess.Board message: InlineCall root_node: chess.pgn.Game curr_node: chess.pgn.Game state: str reason: str add_params: GameParams bot: chess.engine.SimpleEngine | None class GameObj(TypedDict): game_id: str vs_bot: bool bot_elo: int game: Game sender: Player opponent: Player Timer: TimerDict time: int host_plays: bool # True - white, False - black style: dict[str, str] GamesDict = dict[str, GameObj] logger = logging.getLogger(__name__) def install_stockfish() -> str | None: import platform import gdown import zipfile system = platform.system() if system == "Windows": url = "https://github.com/official-stockfish/Stockfish/releases/latest/download/stockfish-windows-x86-64.zip" elif system == "Linux": if platform.machine().lower() == "aarch64": url = "https://github.com/official-stockfish/Stockfish/releases/latest/download/stockfish-android-armv8.tar" else: url = "https://github.com/official-stockfish/Stockfish/releases/latest/download/stockfish-ubuntu-x86-64.tar" else: return None file_name = url.split("/")[-1] try: gdown.download(url, file_name, quiet=True) if file_name.endswith(".zip"): with zipfile.ZipFile(file_name, 'r') as file: file.extractall() elif file_name.endswith(".tar"): import tarfile with tarfile.open(file_name, 'r') as file: file.extractall() # noqa: S202 os.remove(file_name) return find_stfsh_exe() except Exception: logger.exception("Failed to install Stockfish") return None def find_stfsh_exe() -> str | None: for root, _, files in os.walk("./stockfish"): for file in files: if "stockfish" in file.lower(): return os.path.join(root, file) def check_path(path: str) -> bool: return os.path.isfile(path) @loader.tds class Chess(loader.Module): """A reworked version of the Chess module""" strings = { "": "", "name": "Chess", } def __init__(self): self.config = loader.ModuleConfig( loader.ConfigValue( "play_self", False, "Allows you to make moves without turn checks (also, you can play with yourself)", validator=loader.validators.Boolean(), ), loader.ConfigValue( "stockfish_path", None, "Path to stockfish engine", ), ) async def client_ready(self): self._last_backup = 0 self.styles = { "figures-with-circles": { "symbol": "[♔⚪] ", "r": "♖⚫", "n": "♘⚫", "b": "♗⚫", "q": "♕⚫", "k": "♔⚫", "p": "♙⚫", "R": "♖⚪", "N": "♘⚪", "B": "♗⚪", "Q": "♕⚪", "K": "♔⚪", "P": "♙⚪", "move": "●", "capture": "×", "promotion": "↻", "capture_promotion": "×↻", }, "figures": { "symbol": "[♔] ", "r": "♜", "n": "♞", "b": "♝", "q": "𝗾", "k": "♚", "p": "♟", "R": "♖", "N": "♘", "B": "♗", "Q": "𝗤", "K": "♔", "P": "♙", "move": "●", "capture": "×", "promotion": "↻", "capture_promotion": "×↻", }, "letters": { "symbol": "[𝗞] ", "r": "𝗿", "n": "𝗻", "b": "𝗯", "q": "𝗾", "k": "𝗸", "p": "𝗽", "R": "𝗥", "N": "𝗡", "B": "𝗕", "Q": "𝗤", "K": "𝗞", "P": "𝗣", "move": "●", "capture": "×", "promotion": "↻", "capture_promotion": "×↻", }, "figures-with-cyr-letters": { "symbol": "[♔Б] ", "r": "♖Ч", "n": "♘Ч", "b": "♗Ч", "q": "♕Ч", "k": "♔Ч", "p": "♙Ч", "R": "♖Б", "N": "♘Б", "B": "♗Б", "Q": "♕Б", "K": "♔Б", "P": "♙Б", "move": "●", "capture": "×", "promotion": "↻", "capture_promotion": "×↻", }, "figures-with-latin-letters": { "symbol": "[♔W] ", "r": "♖B", "n": "♘B", "b": "♗B", "q": "♕B", "k": "♔B", "p": "♙B", "R": "♖W", "N": "♘W", "B": "♗W", "Q": "♕W", "K": "♔W", "P": "♙W", "move": "●", "capture": "×", "promotion": "↻", "capture_promotion": "×↻", }, "figures-with-comb-letters": { "symbol": "[♔ⷱ] ", "r": "♖ⷱ", "n": "♘ⷱ", "b": "♗ⷱ", "q": "♕ⷱ", "k": "♔ⷱ", "p": "♙ⷱ", "R": "♖ⷠ", "N": "♘ⷠ", "B": "♗ⷠ", "Q": "♕ⷠ", "K": "♔ⷠ", "P": "♙ⷠ", "move": "●", "capture": "×", "promotion": "↻", "capture_promotion": "×↻", }, } self.coords = { f"{col}{row}": "" for row in range(1, 9) for col in "hgfedcba" } self.games: GamesDict = self.get("games_backup", {}) self.pgn = { 'Event': "Chess Play In Module", 'Site': "https://t.me/nullmod/", 'Date': "{date}", 'Round': "{game_id}", 'White': "{player}", 'Black': "{player}", } if (path := find_stfsh_exe()): self.config["stockfish_path"] = path async def _check_player(self, call: InlineCall | Message | None, game_id: str, only_opponent: bool = False, skip_turn_check: bool = False) -> bool: if isinstance(call, (BotInlineCall, InlineCall, InlineMessage)): game = self.games[game_id] _from_id = call.from_user.id if game.get("game", None) and game["game"]["state"] == "the_end": await call.answer(self.strings["game_ended"], show_alert=True) return if _from_id != game["sender"]["id"] and _from_id != game["opponent"]["id"]: await call.answer(self.strings["not_available"]) return False if _from_id == game["sender"]["id"] and only_opponent and not self.config["play_self"]: await call.answer(self.strings["not_you"]) return False elif not self.config["play_self"] and game.get("game", None) and not skip_turn_check: if game["sender"]["color"] == game["game"]["board"].turn and game["sender"]["id"] != _from_id: await call.answer(self.strings["opp_move"]) return False elif game["opponent"]["color"] == game["game"]["board"].turn and game["opponent"]["id"] != _from_id: await call.answer(self.strings["opp_move"]) return False return True async def install_stockfish(self, call: InlineCall): await utils.answer(call, self.strings["installing"]) path = install_stockfish() if path: self.config["stockfish_path"] = path await utils.answer(call, self.strings["stockfish_installed"].format(path=path)) else: await utils.answer(call, self.strings["stockfish_install_failed"]) async def get_players(self, message: Message | InlineCall, sender: dict = None, sender_only: bool = False, opponent_only: bool = False): if not sender: sender = { "id": message.sender_id, "name": (await self.client.get_entity(message.sender_id)).first_name } if sender_only: return sender if isinstance(message, InlineCall): opp_id = message.from_user.id opp_name = message.from_user.first_name elif message.is_reply: r = await message.get_reply_message() opponent = r.sender if not isinstance(opponent, User): await utils.answer(message, self.strings["not_a_user"]) return (sender, None) opp_id = opponent.id opp_name = opponent.first_name else: args = utils.get_args(message) if len(args)==0: return (sender, None) opponent = args[0] try: if opponent.isdigit(): opp_id = int(opponent) opponent = await self.client.get_entity(opp_id) if not isinstance(opponent, User): await utils.answer(message, self.strings["not_a_user"]) return (sender, None) opp_name = opponent.first_name else: opponent = await self.client.get_entity(opponent) if not isinstance(opponent, User): await utils.answer(message, self.strings["not_a_user"]) return (sender, None) opp_name = opponent.first_name opp_id = opponent.id except ValueError: await utils.answer(message, self.strings["whosthat"]) return (sender, None) opponent = { "id": opp_id, "name": opp_name } if opponent_only: return opponent return (sender, opponent) async def _invite(self, call: InlineCall, game_id: str): if not await self._check_player(call, game_id): return game = self.games[game_id] await utils.answer( call, self.strings["invite_bot" if game["vs_bot"] else "invite" if not game.get("alr_accepted", False) else "not_invite"].format( opponent=utils.escape_html(self.games[game_id]["opponent"]["name"]) ) + self._get_settings_text(game_id), reply_markup = [ [ { "text": self.strings["bot_yes" if game["vs_bot"] else "yes"], "callback": self._init_game, "args": (game_id,) }, { "text": self.strings["no"], "callback": self._init_game, "args": (game_id, "no") } ], [ { "text": self.strings["settings"], "callback": self.settings, "args": (game_id,) } ] ], disable_security=True ) async def settings(self, call: InlineCall, game_id: str): if not await self._check_player(call, game_id): return game = self.games[game_id] reply_markup = [] if game["vs_bot"]: reply_markup.append([ {"text": self.strings["bot_elo_btn"], "callback": self._settings, "args": (game_id, "e")} ]) if game["Timer"]["available"]: reply_markup.append([ {"text": self.strings["time_btn"], "callback": self._settings, "args": (game_id, "t",)} ]) reply_markup.extend([ [ {"text": self.strings["color_btn"], "callback": self._settings, "args": (game_id, "c",)} ], [ {"text": self.strings["style_btn"], "callback": self._settings, "args": (game_id, "s",)} ], [ {"text": self.strings['back'], "callback": self._invite, "args": (game_id,)} ] ]) await utils.answer( call, self._get_settings_text(game_id), reply_markup=reply_markup, disable_security=True ) async def _elo_validator(self, call: InlineCall, data, game_id: str): reply_markup = {"text": self.strings['back'], "callback": self._settings, "args": (game_id, "e")} if not str(data).isdigit(): return await utils.answer(call, self.strings["not_int_err"], reply_markup=reply_markup) if not 1400 <= int(data) <= 3200: return await utils.answer(call, self.strings["out_of_range_err"], reply_markup=reply_markup) self.games[game_id]["bot_elo"] = int(data) self.set("bot_elo", int(data)) await self._settings(call, game_id, "MARKASSUCCESS") async def _settings(self, call: InlineCall, game_id: str, page: str = "", param: str = "", value = None): reply_markup = [] text = "✅" if page: if page == "t": text = "⏳" reply_markup.extend([ [ {"text": self.strings['blitz_text'], "action": "answer", "message": self.strings['blitz_message']} ], [ {"text": self.strings['timer'].format(3), "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": 3}}, {"text": self.strings['timer'].format(5), "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": 5}}, ], [ {"text": self.strings['rapid_text'], "action": "answer", "message": self.strings['rapid_message']} ], [ {"text": self.strings['timer'].format(10), "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": 10}}, {"text": self.strings['timer'].format(15), "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": 15}}, {"text": self.strings['timer'].format(30), "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": 30}}, {"text": self.strings['timer'].format(60), "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": 60}} ], [ {"text": self.strings['no_clock_text'], "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": True}} ] ]) elif page == "c": text = "♟️" reply_markup.extend([ [ {"text": self.strings['white'], "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'host_plays', "value": True}}, {"text": self.strings['black'], "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'host_plays', "value": False}} ], [ {"text": self.strings['random'], "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'host_plays', "value": 'r'}} ] ]) elif page == "s": text = "✏️" reply_markup.extend([ [{"text": st["symbol"] + self.strings[name], "callback":self._settings, "args": (game_id,), "kwargs": {"param": "style", "value": name}}] for name, st in self.styles.items() ]) elif page == "e": text = "🧠" reply_markup.extend([ [{"text": self.strings["set_btn"], "input": self.strings["bot_elo_btn"], "handler": self._elo_validator, "args": (game_id,)}] ]) reply_markup.append( [ {"text": self.strings['back'], "callback": self.settings, "args": (game_id,)} ] ) await utils.answer(call, text, reply_markup=reply_markup, disable_security=True) else: await call.answer("✅") if param == "style": self.set("style", value) if param == "Timer" and isinstance(value, int): self.games[game_id]['Timer']['timer'] = Timer(value*60) else: self.games[game_id][param] = value await self.settings(call, game_id) def _get_settings_text(self, game_id: str): game = self.games[game_id] timer = game['Timer'] return ( self.strings['settings_text'].format( style=game['style'], timer=self.strings['available'] if timer['available'] and not timer['timer'] else self.strings['timer'].format(timer['timer'].minutes()) if timer['timer'] else self.strings['not_available'], color=self.strings['random'] if game['host_plays'] == 'r' else self.strings['white'] if game['host_plays'] else self.strings['black'] ) + ("\n " + self.strings["bot_elo"].format(elo=game["bot_elo"]) if game["vs_bot"] else "") ) def _get_new_game_id(self): if self.games: past_game = next(reversed(self.games.values())) if not past_game.get("game", None): self.games.pop(past_game['game_id'], None) if not self.games: game_id = str(1) else: game_id = str(max(map(int, self.games.keys())) + 1) return game_id def _create_game(self, game_id: str, _params: dict = None): params = { "game_id": game_id, "vs_bot": False, "bot_elo": self.get("bot_elo", 3400), "sender": None, "opponent": None, "Timer": { "available": False, "timer": None, "timer_loop": False }, "time": int(time.time()), "host_plays": "r", "style": self.get("style", "figures-with-circles") } if _params: params.update(_params) self.games[game_id] = GameObj(**params) @loader.command(ru_doc="[reply/username/id] - предложить человеку сыграть партию") async def chess(self, message: Message | InlineCall, _sender: dict = None): """[nothing/reply/username/id] - propose a person to play a game""" if _sender is None: _sender = {} sender, opponent = await self.get_players(message, sender=_sender) if not opponent: r_m = {"text": self.strings["i_wanna"], "callback": self.chess, "args": (sender,)} await utils.answer( message, self.strings["is_someone_wanna_play"], reply_markup=r_m, disable_security=True, ) return if sender['id'] == opponent['id'] and not self.config["play_self"]: await utils.answer(message, self.strings["playing_with_yourself?"]) return game_id = self._get_new_game_id() mod_params = { "sender": sender, "opponent": opponent, "Timer": { "available": isinstance(message, Message) and isinstance(message.peer_id, PeerUser), "timer": None, "timer_loop": False } } self._create_game(game_id, mod_params) if _sender: self.games[game_id]["alr_accepted"] = True await self._invite(message, game_id) @loader.command(ru_doc="[reply/username/id] - предложить человеку сыграть партию против 🐟 Stockfish") async def stockfish(self, message: Message): """[reply/username/id] - propose a person to play a game against a 🐟 Stockfish""" if not self.config["stockfish_path"] or not check_path(self.config["stockfish_path"]): return await utils.answer( message, self.strings["stockfish_not_found"], reply_markup={ "text": self.strings["install_stockfish"], "callback": self.install_stockfish, } ) if message.is_reply: player = await self.get_players(message, opponent_only=True) else: player = await self.get_players(message, sender_only=True) stockfish = { "name": "Stockfish", "id": -42, } game_id = self._get_new_game_id() mod_params = { "vs_bot": True, "sender": stockfish, "opponent": player, "Timer": { "available": isinstance(message, Message) and isinstance(message.peer_id, PeerUser), "timer": None, "timer_loop": False } } self._create_game(game_id, mod_params) await self._invite(message, game_id) ############## Preparing all for game start... ############## async def _init_game(self, call: InlineCall, game_id: str, ans="yes"): if not await self._check_player(call, game_id=game_id, only_opponent=True): return if ans == "no": self.games.pop(game_id, None) await utils.answer(call, self.strings["declined"]) return await utils.answer(call, "🌒") game = self.games[game_id] game["style"] = self.styles[game["style"]] if (turn := game.pop("host_plays")) == "r": turn = r.choice([True, False]) game["sender"]["color"] = turn game["opponent"]["color"] = not turn game["Timer"].pop("available", None) if game.get("alr_accepted", None): game.pop("alr_accepted") await asyncio.sleep(0.8) if isinstance(self.games[game_id]["Timer"]["timer"], Timer): await utils.answer(call, self.strings["step4.T"]) await self._set_timer(call, game_id, call._units[call.unit_id]['chat']) await asyncio.sleep(0.8) return await utils.answer(call, self.strings["waiting_for_start"]) await self._start_game(call, game_id) async def _set_timer(self, board_call: InlineCall, game_id: str, chat_id): timer = self.games[game_id]["Timer"]["timer"] self.games[game_id]["Timer"]["message"] = ( await self.inline.form(self.strings["timer_text"].format( int(await timer.white_time()), int(await timer.black_time()), "" ), chat_id, reply_markup = { "text": self.strings["start_timer"], "callback": self._start_timer, "args": (board_call, game_id,) }, disable_security = True, ) ) @loader.loop(interval=1, autostart=True) async def main_loop(self): for game_id in self.games: if not self.games[game_id].get("backup", False) and self.games[game_id]["Timer"]["timer_loop"] and not self.games[game_id]["Timer"]["timer_is_set"]: async def timer_loop(game_id): game = self.games[game_id]["game"] timer = self.games[game_id]["Timer"] timer_c = self.games[game_id]["Timer"]["timer"] await timer_c.start() timer["timer_is_set"] = True while timer["timer_loop"]: if not all([await timer_c.white_time(), await timer_c.black_time()]): timer["timer_loop"] = False self.the_end(game_id, "time_is_up") elif game["state"] == "the_end": timer["timer_loop"] = False loser, winner = self._get_loser_and_winner(game_id) await timer["message"].edit(self.strings["timer_text"].format( int(await timer_c.white_time()), int(await timer_c.black_time()), "" if game["state"] != "the_end" else "⏹️ " + self.strings[game["add_params"]["reason_of_ending"]].format( loser, winner ) ), ) await asyncio.sleep(1) await timer.stop() asyncio.create_task(timer_loop(game_id)) if self.games[game_id].get("game", None): if not self.games[game_id].get("backup", False): self.games[game_id]["game"]["message"].inline_manager._units[ self.games[game_id]["game"]["message"].unit_id ]["always_allow"] = True # для ругающегося на эту строку гпт - по неизвестно какой причине фреймворк в какое-то время попросту # забывает про отключение его проверки. мне это нужно, чтобы сам модуль брал на себя ответсвенность # проверки, кто может управлять доской, а до кого очередь ещё не дошла # FIXME: оно, похоже, всё ещё забывает про always_allow, патч не помогает... нужно выйти на эту ошибку и посмотреть, прочему права пропадают games_backup = {} if time.time() - self._last_backup >= 10: for game_id, game in self.games.items(): if game.get("game", None): game_copy = game if not game.get("backup", None): game_copy = {} game_copy["backup"] = True game_copy["game"] = { k: v for k, v in game["game"].items() if k not in ("message", "root_node", "curr_node", "board", "bot") } game_copy["game"]["node"] = str(game["game"]["root_node"]) if game.get("Timer", None) and game["Timer"].get("timer", None): game_copy["Timer"] = game["Timer"]["timer"].backup() for key, value in game.items(): if key not in ("game", "Timer"): game_copy[key] = value games_backup[game_id] = game_copy self.set("games_backup", games_backup) self._last_backup = time.time() ############## Starting game... ############## async def _start_timer(self, call: InlineCall, board_call: InlineCall, game_id: str): if not await self._check_player(call, game_id): return timer = self.games[game_id]["Timer"] timer["timer_loop"] = True await self._start_game(board_call, game_id) async def _init_bot(self, game_id: str, params: dict): if not self.games[game_id]["vs_bot"]: return engine = chess.engine.SimpleEngine.popen_uci(self.config["stockfish_path"]) engine.configure({"UCI_LimitStrength": True, "UCI_Elo": params["elo"]}) self.games[game_id]["game"]["bot"] = engine async def _start_game(self, call: InlineCall, game_id: str): if not await self._check_player(call, game_id): return game = self.games[game_id] node = chess.pgn.Game() pgn = copy.deepcopy(self.pgn) pgn["Date"] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") pgn["Round"] = str(game_id) pgn["White"] = game["sender"]["name"] if game["sender"]["color"] else game["opponent"]["name"] pgn["Black"] = game["opponent"]["name"] if game["sender"]["color"] else game["sender"]["name"] pgn["Result"] = "*" node.headers.update(pgn) game["game"] = Game( board = chess.Board(), message = call, root_node = node, curr_node = node, state = "idle", add_params = GameParams( chosen_figure_coord = "", reason_of_ending = "", promotion_move = "", winner_color = None, resigner_color = None, draw_offerer = None, ), bot = None, ) await self._init_bot(game_id, {"elo": game["bot_elo"]}) await self.update_board(game_id) def idle(self, game_id: str): game = self.games[game_id]["game"] game["state"] = "idle" game["add_params"]["chosen_figure_coord"] = "" game["add_params"]["promotion_move"] = "" game["add_params"]["draw_offerer"] = None def choose(self, game_id: str, coord: str): game = self.games[game_id]["game"] game["state"] = "in_choose" game["add_params"]["chosen_figure_coord"] = coord game["add_params"]["promotion_move"] = "" def promotion(self, game_id: str, move: str): game = self.games[game_id]["game"] game["state"] = "in_promotion" game["add_params"]["chosen_figure_coord"] = "" game["add_params"]["promotion_move"] = move def the_end(self, game_id: str, reason: str, winner: bool = None): game = self.games[game_id]["game"] game["state"] = "the_end" game["add_params"]["reason_of_ending"] = reason game["add_params"]["winner_color"] = winner game["add_params"]["chosen_figure_coord"] = "" game["add_params"]["promotion_move"] = "" game["root_node"].headers["Result"] = ( "1-0" if winner is True else "0-1" if winner is False else "1/2-1/2" ) if self.games[game_id]["vs_bot"]: game["bot"].quit() def _get_loser_and_winner(self, game_id: str) -> tuple[str, str]: game = self.games[game_id] if game["sender"]["color"] == self.games[game_id]["game"]["add_params"]["winner_color"]: return (game["opponent"]["name"], game["sender"]["name"]) else: return (game["sender"]["name"], game["opponent"]["name"]) def _get_piece_symbol(self, game_id: str, coord: str) -> str: game = self.games[game_id] piece = game["game"]["board"].piece_at(chess.parse_square(coord)) return game["style"][piece.symbol()] if piece else " " def _get_move_symbol(self, game_id: str, move: str) -> str: game = self.games[game_id] if len(move) == 5: return game["style"][ "capture_promotion" if (move := chess.Move.from_uci(move)) and game["game"]["board"].is_capture(move) else "promotion" ] else: return game["style"][ "capture" if (move := chess.Move.from_uci(move)) and game["game"]["board"].is_capture(move) else "move" ] def _get_available_moves(self, game_id: str, coord: str) -> list[str]: if not coord: return [] game = self.games[game_id] coord = chess.parse_square(coord) moves = [move.uci() for move in game["game"]["board"].legal_moves if move.from_square == coord] return moves def _get_board_dict(self, game_id: str) -> dict[str, str]: game = self.games[game_id] coords = copy.deepcopy(self.coords) for coord in self.coords: coords[coord] = self._get_piece_symbol(game_id, coord) if game["game"]["state"] == "in_choose": choosen_coord = game["game"]["add_params"]["chosen_figure_coord"] for move in self._get_available_moves(game_id, choosen_coord): coord = move[2:4] coords[coord] = self._get_move_symbol(game_id, move) return coords def _get_reply_markup(self, game_id: str, promotion: bool = False, resign_confirm: bool = False, draw_confirm: bool = False) -> list[list[dict]]: game = self.games[game_id] is_end = game["game"]["state"] == "the_end" reply_markup = utils.chunks( [ { "text": figure, "callback": self.choose_coord, "args": (game_id, coord), } for coord, figure in self._get_board_dict(game_id).items() ][::-1], 8 ) if promotion: reply_markup.append( [{"text": "⬇️↻⬇️", "action": "answer", "message": self.strings["choose_promotion"]}] ) reply_markup.append( [ { "text": game["style"].get(piece, piece), "callback": self.pawn_promotion, "args": (game_id, piece), } for piece in "qrnb" ] ) elif resign_confirm: reply_markup.extend( [ [ { "text": self.strings["resign_check"], "data": "_there_is_nothing", } ], [ { "text": self.strings["resign_yes"], "callback": self.resign, "args": (game_id, True), }, { "text": self.strings["resign_no"], "callback": self._back_to_game, "args": (game_id,), }, ] ] ) elif draw_confirm: reply_markup.extend( [ [ { "text": self.strings["draw_offer"].format( self.strings["white"] if game["game"]["add_params"]["draw_offerer"] else self.strings["black"] ), "data": "_there_is_nothing", } ], [ { "text": self.strings["draw_yes"], "callback": self.draw, "args": (game_id, True), }, { "text": self.strings["resign_no"], "callback": self._back_to_game, "args": (game_id,), }, ] ] ) elif not is_end: resign = [ { "text": "🏳️", "callback": self.resign, "args": (game_id,), }, ] if not game["vs_bot"]: resign.append( { "text": "🤝", "callback": self.draw, "args": (game_id,), } ) reply_markup.append(resign) return reply_markup async def update_board(self, game_id: str, promotion: bool = False, resign_confirm: bool = False, draw_confirm: bool = False): game = self.games[game_id] is_end = game["game"]["state"] == "the_end" reason_of_ending = game["game"]["add_params"]["reason_of_ending"] status = ( self.strings["check"] if game["game"]["board"].is_check() and not is_end else self.strings[reason_of_ending] + "\n" ) loser, winner = self._get_loser_and_winner(game_id) reply_markup = self._get_reply_markup(game_id, promotion, resign_confirm, draw_confirm) pgn = game["game"]["root_node"].accept(chess.pgn.StringExporter(columns=None, headers=False)).replace("*", "").rsplit(maxsplit=1) if pgn: pgn[-1] = f"{pgn[-1]}" else: pgn = ["|"] last_moves = " ".join(pgn) await utils.answer( game["game"]["message"], self.strings["board"].format( game_id, utils.escape_html(game["sender"]["name"] if game["sender"]["color"] else game["opponent"]["name"]), utils.escape_html(game["opponent"]["name"] if game["sender"]["color"] else game["sender"]["name"]), self.strings["white"] if game["game"]["board"].turn else self.strings["black"], status.format(loser=loser, winner=winner), last_moves[-32:], ), reply_markup=reply_markup, ) if game["vs_bot"] and game["game"]["board"].turn == game["sender"]["color"] and game["game"]["state"] == "idle": await self._bot_process_board(game_id) async def _bot_process_board(self, game_id: str): if not (game := self.games[game_id])["vs_bot"]: return board = game["game"]["board"] bot = game["game"]["bot"] _d = game["bot_elo"] // 100 - 10 depth = r.randint(_d, _d + 15) result = bot.play(board, limit=chess.engine.Limit(time=0.1, depth=depth)) move = result.move from_coord = chess.square_name(result.move.from_square) to_coord = chess.square_name(result.move.to_square) await asyncio.sleep(r.randint(1, 3)) await self.choose_coord(None, game_id, from_coord) await asyncio.sleep(0.7) await self.choose_coord(None, game_id, to_coord) if move.promotion: await asyncio.sleep(0.7) await self.pawn_promotion(None, game_id, chess.piece_symbol(move.promotion)) def make_move(self, game_id: str, move: str): game = self.games[game_id]["game"] move = chess.Move.from_uci(move) game["board"].push(move) game["curr_node"] = game["curr_node"].add_variation(move) async def pawn_promotion(self, call: InlineCall, game_id: str, piece: str): if not await self._check_player(call, game_id): return game = self.games[game_id]["game"] move = game["add_params"]["promotion_move"] + piece self.make_move(game_id, move) self.set_game_state(game_id) return await self.update_board(game_id) async def _back_to_game(self, _, game_id: str): self.set_game_state(game_id) await self.update_board(game_id) async def resign(self, call: InlineCall, game_id: str, confirm: bool = False): if not await self._check_player(call, game_id, skip_turn_check=True): return game = self.games[game_id] if not confirm: game["game"]["add_params"]["resigner_color"] = self._get_color_by_player( game_id, call.from_user.id ) return await self.update_board(game_id, resign_confirm=True) resigner = self._get_player_by_color( game_id, game["game"]["add_params"]["resigner_color"] ) if call.from_user.id != resigner["id"]: return await call.answer(self.strings["resign_not_you"]) self.the_end(game_id, "resign", winner=not resigner["color"]) await self.update_board(game_id) async def draw(self, call: InlineCall, game_id: str, accept: bool = False): if not await self._check_player(call, game_id, skip_turn_check=True): return game = self.games[game_id] if accept: offerer = self._get_player_by_color( game_id, game["game"]["add_params"]["draw_offerer"] ) if call.from_user.id == offerer["id"]: await call.answer(self.strings["draw_not_you"]) return self.the_end(game_id, "draw") return await self.update_board(game_id) game["game"]["add_params"]["draw_offerer"] = self._get_color_by_player( game_id, call.from_user.id ) return await self.update_board(game_id, draw_confirm=True) def set_game_state(self, game_id: str): game = self.games[game_id]["game"] board = game["board"] self.idle(game_id) if board.is_checkmate(): self.the_end(game_id, "checkmate", winner=not board.turn) elif board.is_stalemate(): self.the_end(game_id, "stalemate") elif board.is_insufficient_material(): self.the_end(game_id, "insufficient_material") elif board.is_seventyfive_moves(): self.the_end(game_id, "seventyfive_moves") elif board.is_fivefold_repetition(): self.the_end(game_id, "fivefold_repetition") async def choose_coord(self, call: BotInlineCall, game_id: str, coord: str): if not await self._check_player(call, game_id): return game = self.games[game_id]["game"] state = game["state"] if state == "idle": if self._get_available_moves(game_id, coord): self.choose(game_id, coord) else: await call.answer(self.strings["no_moves"]) return await self.update_board(game_id) elif state == "in_choose": if coord == game["add_params"]["chosen_figure_coord"]: self.idle(game_id) return await self.update_board(game_id) av_moves = self._get_available_moves(game_id, game["add_params"]["chosen_figure_coord"]) coord_matches = [move for move in av_moves if coord in move] if len(coord_matches) == 1: self.make_move(game_id, coord_matches[0]) self.set_game_state(game_id) return await self.update_board(game_id) elif len(coord_matches) > 1: move = coord_matches[0][:4] self.promotion(game_id, move) return await self.update_board(game_id, promotion=True) elif game["board"].piece_at(chess.parse_square(coord)): self.choose(game_id, coord) return await self.update_board(game_id) else: self.idle(game_id) return await self.update_board(game_id) elif state == "in_promotion": return await call.answer(self.strings["can_not_move"]) elif state == "the_end": return await call.answer(self.strings["game_ended"]) else: await call.answer("ты игру сломал?") self.idle(game_id) return await self.update_board(game_id) def _get_player_by_color(self, game_id: str, color: bool): game = self.games[game_id] return game["sender"] if game["sender"]["color"] == color else game["opponent"] def _get_color_by_player(self, game_id: str, player_id: int): game = self.games[game_id] if game["sender"]["id"] == player_id: return game["sender"]["color"] elif game["opponent"]["id"] == player_id: return game["opponent"]["color"] return None