mirror of
https://github.com/MuRuLOSE/limoka.git
synced 2026-06-16 06:24:18 +02:00
1175 lines
46 KiB
Python
1175 lines
46 KiB
Python
__version__ = (2, 0, 6)
|
||
#░░░███░███░███░███░███
|
||
#░░░░░█░█░░░░█░░█░░░█░█
|
||
#░░░░█░░███░░█░░█░█░█░█
|
||
#░░░█░░░█░░░░█░░█░█░█░█
|
||
#░░░███░███░░█░░███░███
|
||
#H:Mods Team [💎]
|
||
|
||
# meta developer: @nullmod
|
||
# requires: python-chess gdown
|
||
# packurl: https://github.com/ZetGoHack/TestingModules/raw/main/chess.yml
|
||
|
||
import asyncio
|
||
import chess
|
||
import chess.engine
|
||
import chess.pgn
|
||
import copy
|
||
import logging
|
||
import os
|
||
import random as r
|
||
import time
|
||
|
||
from telethon.tl.types import PeerUser, User, Message
|
||
from datetime import datetime, timezone
|
||
from typing import TypedDict
|
||
|
||
from .. import loader, utils
|
||
from ..inline.types import BotInlineCall, InlineCall, InlineMessage
|
||
|
||
class Timer:
|
||
def __init__(self, scnds):
|
||
self.starttime = scnds
|
||
self.timers = {"white": scnds, "black": scnds}
|
||
self.running = {"white": False, "black": False}
|
||
self.last_time = time.monotonic()
|
||
self.t = None
|
||
|
||
def minutes(self) -> int:
|
||
return self.starttime // 60
|
||
|
||
async def _count(self):
|
||
while True:
|
||
await asyncio.sleep(0.1)
|
||
now = time.monotonic()
|
||
elapsed = now - self.last_time
|
||
self.last_time = now
|
||
for color in ("white", "black"):
|
||
if self.running[color]:
|
||
self.timers[color] = max(0, self.timers[color] - elapsed)
|
||
|
||
async def start(self, from_color: str = "white"):
|
||
self.last_time = time.monotonic()
|
||
if from_color == "restore":
|
||
if self.running["white"]:
|
||
from_color = "white"
|
||
else:
|
||
from_color = "black"
|
||
await self._turn(from_color)
|
||
self.t = asyncio.create_task(self._count())
|
||
|
||
async def switch(self):
|
||
self.running["white"] = not self.running["white"]
|
||
self.running["black"] = not self.running["black"]
|
||
|
||
async def _turn(self, color):
|
||
now = time.monotonic()
|
||
e = now - self.last_time
|
||
self.last_time = now
|
||
for clr in ("white", "black"):
|
||
if self.running[clr]:
|
||
self.timers[clr] = max(0, self.timers[clr] - e)
|
||
self.running = {"white": color == "white", "black": color == "black"}
|
||
|
||
async def white_time(self):
|
||
return round(self.timers["white"], 0)
|
||
|
||
async def black_time(self):
|
||
return round(self.timers["black"], 0)
|
||
|
||
def restore(self, white_time: float, black_time: float, running: dict):
|
||
self.timers["white"] = white_time
|
||
self.timers["black"] = black_time
|
||
self.running = running
|
||
|
||
def backup(self) -> dict:
|
||
return {
|
||
"white_time": self.timers["white"],
|
||
"black_time": self.timers["black"],
|
||
"running": self.running
|
||
}
|
||
|
||
async def stop(self):
|
||
if self.t:
|
||
self.t.cancel()
|
||
self.running = {"white": False, "black": False}
|
||
|
||
class Player(TypedDict):
|
||
id: int
|
||
name: str
|
||
color: bool | None
|
||
|
||
class TimerDict(TypedDict):
|
||
timer: Timer
|
||
timer_loop: bool
|
||
timer_is_set: bool
|
||
message: InlineCall
|
||
|
||
class GameParams(TypedDict):
|
||
chosen_figure_coord: str
|
||
reason_of_ending: str
|
||
promotion_move: str
|
||
winner_color: bool | None
|
||
resigner_color: bool | None
|
||
draw_offerer: bool | None
|
||
|
||
class Game(TypedDict):
|
||
board: chess.Board
|
||
message: InlineCall
|
||
root_node: chess.pgn.Game
|
||
curr_node: chess.pgn.Game
|
||
state: str
|
||
reason: str
|
||
add_params: GameParams
|
||
bot: chess.engine.SimpleEngine | None
|
||
|
||
class GameObj(TypedDict):
|
||
game_id: str
|
||
vs_bot: bool
|
||
bot_elo: int
|
||
game: Game
|
||
sender: Player
|
||
opponent: Player
|
||
Timer: TimerDict
|
||
time: int
|
||
host_plays: bool # True - white, False - black
|
||
style: dict[str, str]
|
||
|
||
GamesDict = dict[str, GameObj]
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def install_stockfish() -> str | None:
|
||
import platform
|
||
import gdown
|
||
import zipfile
|
||
system = platform.system()
|
||
if system == "Windows":
|
||
url = "https://github.com/official-stockfish/Stockfish/releases/latest/download/stockfish-windows-x86-64.zip"
|
||
elif system == "Linux":
|
||
if platform.machine().lower() == "aarch64":
|
||
url = "https://github.com/official-stockfish/Stockfish/releases/latest/download/stockfish-android-armv8.tar"
|
||
else:
|
||
url = "https://github.com/official-stockfish/Stockfish/releases/latest/download/stockfish-ubuntu-x86-64.tar"
|
||
else:
|
||
return None
|
||
file_name = url.split("/")[-1]
|
||
try:
|
||
gdown.download(url, file_name, quiet=True)
|
||
if file_name.endswith(".zip"):
|
||
with zipfile.ZipFile(file_name, 'r') as file:
|
||
file.extractall()
|
||
elif file_name.endswith(".tar"):
|
||
import tarfile
|
||
with tarfile.open(file_name, 'r') as file:
|
||
file.extractall() # noqa: S202
|
||
os.remove(file_name)
|
||
|
||
return find_stfsh_exe()
|
||
except Exception:
|
||
logger.exception("Failed to install Stockfish")
|
||
return None
|
||
|
||
def find_stfsh_exe() -> str | None:
|
||
for root, _, files in os.walk("./stockfish"):
|
||
for file in files:
|
||
if "stockfish" in file.lower():
|
||
return os.path.join(root, file)
|
||
|
||
def check_path(path: str) -> bool:
|
||
return os.path.isfile(path)
|
||
|
||
|
||
@loader.tds
|
||
class Chess(loader.Module):
|
||
"""A reworked version of the Chess module"""
|
||
strings = {
|
||
"": "",
|
||
"name": "Chess",
|
||
}
|
||
|
||
def __init__(self):
|
||
self.config = loader.ModuleConfig(
|
||
loader.ConfigValue(
|
||
"play_self",
|
||
False,
|
||
"Allows you to make moves without turn checks (also, you can play with yourself)",
|
||
validator=loader.validators.Boolean(),
|
||
),
|
||
loader.ConfigValue(
|
||
"stockfish_path",
|
||
None,
|
||
"Path to stockfish engine",
|
||
),
|
||
)
|
||
|
||
async def client_ready(self):
|
||
self._last_backup = 0
|
||
self.styles = {
|
||
"figures-with-circles": {
|
||
"symbol": "[♔⚪] ",
|
||
"r": "♖⚫", "n": "♘⚫", "b": "♗⚫", "q": "♕⚫", "k": "♔⚫", "p": "♙⚫",
|
||
"R": "♖⚪", "N": "♘⚪", "B": "♗⚪", "Q": "♕⚪", "K": "♔⚪", "P": "♙⚪",
|
||
"move": "●", "capture": "×", "promotion": "↻", "capture_promotion": "×↻",
|
||
},
|
||
"figures": {
|
||
"symbol": "[♔] ",
|
||
"r": "♜", "n": "♞", "b": "♝", "q": "𝗾", "k": "♚", "p": "♟",
|
||
"R": "♖", "N": "♘", "B": "♗", "Q": "𝗤", "K": "♔", "P": "♙",
|
||
"move": "●", "capture": "×", "promotion": "↻", "capture_promotion": "×↻",
|
||
},
|
||
"letters": {
|
||
"symbol": "[𝗞] ",
|
||
"r": "𝗿", "n": "𝗻", "b": "𝗯", "q": "𝗾", "k": "𝗸", "p": "𝗽",
|
||
"R": "𝗥", "N": "𝗡", "B": "𝗕", "Q": "𝗤", "K": "𝗞", "P": "𝗣",
|
||
"move": "●", "capture": "×", "promotion": "↻", "capture_promotion": "×↻",
|
||
},
|
||
"figures-with-cyr-letters": {
|
||
"symbol": "[♔Б] ",
|
||
"r": "♖Ч", "n": "♘Ч", "b": "♗Ч", "q": "♕Ч", "k": "♔Ч", "p": "♙Ч",
|
||
"R": "♖Б", "N": "♘Б", "B": "♗Б", "Q": "♕Б", "K": "♔Б", "P": "♙Б",
|
||
"move": "●", "capture": "×", "promotion": "↻", "capture_promotion": "×↻",
|
||
},
|
||
"figures-with-latin-letters": {
|
||
"symbol": "[♔W] ",
|
||
"r": "♖B", "n": "♘B", "b": "♗B", "q": "♕B", "k": "♔B", "p": "♙B",
|
||
"R": "♖W", "N": "♘W", "B": "♗W", "Q": "♕W", "K": "♔W", "P": "♙W",
|
||
"move": "●", "capture": "×", "promotion": "↻", "capture_promotion": "×↻",
|
||
},
|
||
"figures-with-comb-letters": {
|
||
"symbol": "[♔ⷱ] ",
|
||
"r": "♖ⷱ", "n": "♘ⷱ", "b": "♗ⷱ", "q": "♕ⷱ", "k": "♔ⷱ", "p": "♙ⷱ",
|
||
"R": "♖ⷠ", "N": "♘ⷠ", "B": "♗ⷠ", "Q": "♕ⷠ", "K": "♔ⷠ", "P": "♙ⷠ",
|
||
"move": "●", "capture": "×", "promotion": "↻", "capture_promotion": "×↻",
|
||
},
|
||
}
|
||
self.coords = {
|
||
f"{col}{row}": "" for row in range(1, 9)
|
||
for col in "hgfedcba"
|
||
}
|
||
self.games: GamesDict = self.get("games_backup", {})
|
||
self.pgn = {
|
||
'Event': "Chess Play In Module",
|
||
'Site': "https://t.me/nullmod/",
|
||
'Date': "{date}",
|
||
'Round': "{game_id}",
|
||
'White': "{player}",
|
||
'Black': "{player}",
|
||
}
|
||
if (path := find_stfsh_exe()):
|
||
self.config["stockfish_path"] = path
|
||
|
||
async def _check_player(self, call: InlineCall | Message | None, game_id: str, only_opponent: bool = False, skip_turn_check: bool = False) -> bool:
|
||
if isinstance(call, (BotInlineCall, InlineCall, InlineMessage)):
|
||
game = self.games[game_id]
|
||
_from_id = call.from_user.id
|
||
|
||
if game.get("game", None) and game["game"]["state"] == "the_end":
|
||
await call.answer(self.strings["game_ended"], show_alert=True)
|
||
return
|
||
if _from_id != game["sender"]["id"] and _from_id != game["opponent"]["id"]:
|
||
await call.answer(self.strings["not_available"])
|
||
return False
|
||
if _from_id == game["sender"]["id"] and only_opponent and not self.config["play_self"]:
|
||
await call.answer(self.strings["not_you"])
|
||
return False
|
||
elif not self.config["play_self"] and game.get("game", None) and not skip_turn_check:
|
||
if game["sender"]["color"] == game["game"]["board"].turn and game["sender"]["id"] != _from_id:
|
||
await call.answer(self.strings["opp_move"])
|
||
return False
|
||
elif game["opponent"]["color"] == game["game"]["board"].turn and game["opponent"]["id"] != _from_id:
|
||
await call.answer(self.strings["opp_move"])
|
||
return False
|
||
return True
|
||
|
||
async def install_stockfish(self, call: InlineCall):
|
||
await utils.answer(call, self.strings["installing"])
|
||
path = install_stockfish()
|
||
if path:
|
||
self.config["stockfish_path"] = path
|
||
await utils.answer(call, self.strings["stockfish_installed"].format(path=path))
|
||
else:
|
||
await utils.answer(call, self.strings["stockfish_install_failed"])
|
||
|
||
async def get_players(self, message: Message | InlineCall, sender: dict = None, sender_only: bool = False, opponent_only: bool = False):
|
||
if not sender:
|
||
sender = {
|
||
"id": message.sender_id,
|
||
"name": (await self.client.get_entity(message.sender_id)).first_name
|
||
}
|
||
if sender_only:
|
||
return sender
|
||
|
||
if isinstance(message, InlineCall):
|
||
opp_id = message.from_user.id
|
||
opp_name = message.from_user.first_name
|
||
|
||
elif message.is_reply:
|
||
r = await message.get_reply_message()
|
||
opponent = r.sender
|
||
if not isinstance(opponent, User):
|
||
await utils.answer(message, self.strings["not_a_user"])
|
||
return (sender, None)
|
||
opp_id = opponent.id
|
||
opp_name = opponent.first_name
|
||
else:
|
||
args = utils.get_args(message)
|
||
|
||
if len(args)==0:
|
||
return (sender, None)
|
||
|
||
opponent = args[0]
|
||
try:
|
||
if opponent.isdigit():
|
||
opp_id = int(opponent)
|
||
opponent = await self.client.get_entity(opp_id)
|
||
|
||
if not isinstance(opponent, User):
|
||
await utils.answer(message, self.strings["not_a_user"])
|
||
return (sender, None)
|
||
opp_name = opponent.first_name
|
||
else:
|
||
opponent = await self.client.get_entity(opponent)
|
||
|
||
if not isinstance(opponent, User):
|
||
await utils.answer(message, self.strings["not_a_user"])
|
||
return (sender, None)
|
||
|
||
opp_name = opponent.first_name
|
||
opp_id = opponent.id
|
||
except ValueError:
|
||
await utils.answer(message, self.strings["whosthat"])
|
||
return (sender, None)
|
||
|
||
opponent = {
|
||
"id": opp_id,
|
||
"name": opp_name
|
||
}
|
||
if opponent_only:
|
||
return opponent
|
||
|
||
return (sender, opponent)
|
||
|
||
async def _invite(self, call: InlineCall, game_id: str):
|
||
if not await self._check_player(call, game_id): return
|
||
game = self.games[game_id]
|
||
|
||
await utils.answer(
|
||
call,
|
||
self.strings["invite_bot" if game["vs_bot"] else "invite" if not game.get("alr_accepted", False) else "not_invite"].format(
|
||
opponent=utils.escape_html(self.games[game_id]["opponent"]["name"])
|
||
) + self._get_settings_text(game_id),
|
||
reply_markup = [
|
||
[
|
||
{
|
||
"text": self.strings["bot_yes" if game["vs_bot"] else "yes"],
|
||
"callback": self._init_game,
|
||
"args": (game_id,)
|
||
},
|
||
{
|
||
"text": self.strings["no"],
|
||
"callback": self._init_game,
|
||
"args": (game_id, "no")
|
||
}
|
||
],
|
||
[
|
||
{
|
||
"text": self.strings["settings"],
|
||
"callback": self.settings,
|
||
"args": (game_id,)
|
||
}
|
||
]
|
||
],
|
||
disable_security=True
|
||
)
|
||
|
||
async def settings(self, call: InlineCall, game_id: str):
|
||
if not await self._check_player(call, game_id): return
|
||
game = self.games[game_id]
|
||
reply_markup = []
|
||
|
||
if game["vs_bot"]:
|
||
reply_markup.append([
|
||
{"text": self.strings["bot_elo_btn"], "callback": self._settings, "args": (game_id, "e")}
|
||
])
|
||
|
||
if game["Timer"]["available"]:
|
||
reply_markup.append([
|
||
{"text": self.strings["time_btn"], "callback": self._settings, "args": (game_id, "t",)}
|
||
])
|
||
|
||
reply_markup.extend([
|
||
[
|
||
{"text": self.strings["color_btn"], "callback": self._settings, "args": (game_id, "c",)}
|
||
],
|
||
[
|
||
{"text": self.strings["style_btn"], "callback": self._settings, "args": (game_id, "s",)}
|
||
],
|
||
[
|
||
{"text": self.strings['back'], "callback": self._invite, "args": (game_id,)}
|
||
]
|
||
])
|
||
await utils.answer(
|
||
call,
|
||
self._get_settings_text(game_id),
|
||
reply_markup=reply_markup,
|
||
disable_security=True
|
||
)
|
||
|
||
async def _elo_validator(self, call: InlineCall, data, game_id: str):
|
||
reply_markup = {"text": self.strings['back'], "callback": self._settings, "args": (game_id, "e")}
|
||
|
||
if not str(data).isdigit():
|
||
return await utils.answer(call, self.strings["not_int_err"], reply_markup=reply_markup)
|
||
if not 1400 <= int(data) <= 3200:
|
||
return await utils.answer(call, self.strings["out_of_range_err"], reply_markup=reply_markup)
|
||
|
||
self.games[game_id]["bot_elo"] = int(data)
|
||
self.set("bot_elo", int(data))
|
||
await self._settings(call, game_id, "MARKASSUCCESS")
|
||
|
||
async def _settings(self, call: InlineCall, game_id: str, page: str = "", param: str = "", value = None):
|
||
reply_markup = []
|
||
text = "✅"
|
||
if page:
|
||
if page == "t":
|
||
text = "⏳"
|
||
reply_markup.extend([
|
||
[
|
||
{"text": self.strings['blitz_text'], "action": "answer", "message": self.strings['blitz_message']}
|
||
],
|
||
[
|
||
{"text": self.strings['timer'].format(3), "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": 3}},
|
||
{"text": self.strings['timer'].format(5), "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": 5}},
|
||
],
|
||
[
|
||
{"text": self.strings['rapid_text'], "action": "answer", "message": self.strings['rapid_message']}
|
||
],
|
||
[
|
||
{"text": self.strings['timer'].format(10), "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": 10}},
|
||
{"text": self.strings['timer'].format(15), "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": 15}},
|
||
{"text": self.strings['timer'].format(30), "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": 30}},
|
||
{"text": self.strings['timer'].format(60), "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": 60}}
|
||
],
|
||
[
|
||
{"text": self.strings['no_clock_text'], "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'Timer', "value": True}}
|
||
]
|
||
])
|
||
elif page == "c":
|
||
text = "♟️"
|
||
reply_markup.extend([
|
||
[
|
||
{"text": self.strings['white'], "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'host_plays', "value": True}},
|
||
{"text": self.strings['black'], "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'host_plays', "value": False}}
|
||
],
|
||
[
|
||
{"text": self.strings['random'], "callback":self._settings, "args": (game_id,), "kwargs": {"param": 'host_plays', "value": 'r'}}
|
||
]
|
||
])
|
||
elif page == "s":
|
||
text = "✏️"
|
||
reply_markup.extend([
|
||
[{"text": st["symbol"] + self.strings[name], "callback":self._settings, "args": (game_id,), "kwargs": {"param": "style", "value": name}}]
|
||
for name, st in self.styles.items()
|
||
])
|
||
elif page == "e":
|
||
text = "🧠"
|
||
reply_markup.extend([
|
||
[{"text": self.strings["set_btn"], "input": self.strings["bot_elo_btn"], "handler": self._elo_validator, "args": (game_id,)}]
|
||
])
|
||
|
||
reply_markup.append(
|
||
[
|
||
{"text": self.strings['back'], "callback": self.settings, "args": (game_id,)}
|
||
]
|
||
)
|
||
|
||
await utils.answer(call, text, reply_markup=reply_markup, disable_security=True)
|
||
else:
|
||
await call.answer("✅")
|
||
if param == "style":
|
||
self.set("style", value)
|
||
|
||
if param == "Timer" and isinstance(value, int):
|
||
self.games[game_id]['Timer']['timer'] = Timer(value*60)
|
||
else:
|
||
self.games[game_id][param] = value
|
||
await self.settings(call, game_id)
|
||
|
||
def _get_settings_text(self, game_id: str):
|
||
game = self.games[game_id]
|
||
timer = game['Timer']
|
||
return (
|
||
self.strings['settings_text'].format(
|
||
style=game['style'],
|
||
|
||
timer=self.strings['available'] if timer['available'] and not timer['timer']
|
||
else self.strings['timer'].format(timer['timer'].minutes()) if timer['timer']
|
||
else self.strings['not_available'],
|
||
|
||
color=self.strings['random'] if game['host_plays'] == 'r'
|
||
else self.strings['white'] if game['host_plays']
|
||
else self.strings['black']
|
||
)
|
||
+ ("\n " + self.strings["bot_elo"].format(elo=game["bot_elo"]) if game["vs_bot"] else "")
|
||
)
|
||
|
||
|
||
def _get_new_game_id(self):
|
||
if self.games:
|
||
past_game = next(reversed(self.games.values()))
|
||
if not past_game.get("game", None):
|
||
self.games.pop(past_game['game_id'], None)
|
||
if not self.games:
|
||
game_id = str(1)
|
||
else:
|
||
game_id = str(max(map(int, self.games.keys())) + 1)
|
||
|
||
return game_id
|
||
|
||
def _create_game(self, game_id: str, _params: dict = None):
|
||
params = {
|
||
"game_id": game_id,
|
||
"vs_bot": False,
|
||
"bot_elo": self.get("bot_elo", 3400),
|
||
"sender": None,
|
||
"opponent": None,
|
||
"Timer": {
|
||
"available": False,
|
||
"timer": None,
|
||
"timer_loop": False
|
||
},
|
||
"time": int(time.time()),
|
||
"host_plays": "r",
|
||
"style": self.get("style", "figures-with-circles")
|
||
}
|
||
|
||
if _params:
|
||
params.update(_params)
|
||
|
||
self.games[game_id] = GameObj(**params)
|
||
|
||
|
||
@loader.command(ru_doc="[reply/username/id] - предложить человеку сыграть партию")
|
||
async def chess(self, message: Message | InlineCall, _sender: dict = None):
|
||
"""[nothing/reply/username/id] - propose a person to play a game"""
|
||
if _sender is None:
|
||
_sender = {}
|
||
sender, opponent = await self.get_players(message, sender=_sender)
|
||
if not opponent:
|
||
r_m = {"text": self.strings["i_wanna"], "callback": self.chess, "args": (sender,)}
|
||
|
||
await utils.answer(
|
||
message,
|
||
self.strings["is_someone_wanna_play"],
|
||
reply_markup=r_m,
|
||
disable_security=True,
|
||
)
|
||
return
|
||
if sender['id'] == opponent['id'] and not self.config["play_self"]:
|
||
await utils.answer(message, self.strings["playing_with_yourself?"])
|
||
return
|
||
|
||
game_id = self._get_new_game_id()
|
||
|
||
mod_params = {
|
||
"sender": sender,
|
||
"opponent": opponent,
|
||
"Timer": {
|
||
"available": isinstance(message, Message) and isinstance(message.peer_id, PeerUser),
|
||
"timer": None,
|
||
"timer_loop": False
|
||
}
|
||
}
|
||
|
||
self._create_game(game_id, mod_params)
|
||
|
||
if _sender:
|
||
self.games[game_id]["alr_accepted"] = True
|
||
|
||
await self._invite(message, game_id)
|
||
|
||
@loader.command(ru_doc="[reply/username/id] - предложить человеку сыграть партию против 🐟 Stockfish")
|
||
async def stockfish(self, message: Message):
|
||
"""[reply/username/id] - propose a person to play a game against a 🐟 Stockfish"""
|
||
if not self.config["stockfish_path"] or not check_path(self.config["stockfish_path"]):
|
||
return await utils.answer(
|
||
message,
|
||
self.strings["stockfish_not_found"],
|
||
reply_markup={
|
||
"text": self.strings["install_stockfish"],
|
||
"callback": self.install_stockfish,
|
||
}
|
||
)
|
||
|
||
if message.is_reply:
|
||
player = await self.get_players(message, opponent_only=True)
|
||
else:
|
||
player = await self.get_players(message, sender_only=True)
|
||
|
||
stockfish = {
|
||
"name": "Stockfish",
|
||
"id": -42,
|
||
}
|
||
|
||
game_id = self._get_new_game_id()
|
||
|
||
mod_params = {
|
||
"vs_bot": True,
|
||
"sender": stockfish,
|
||
"opponent": player,
|
||
"Timer": {
|
||
"available": isinstance(message, Message) and isinstance(message.peer_id, PeerUser),
|
||
"timer": None,
|
||
"timer_loop": False
|
||
}
|
||
}
|
||
|
||
self._create_game(game_id, mod_params)
|
||
|
||
await self._invite(message, game_id)
|
||
|
||
############## Preparing all for game start... ##############
|
||
|
||
async def _init_game(self, call: InlineCall, game_id: str, ans="yes"):
|
||
if not await self._check_player(call, game_id=game_id, only_opponent=True): return
|
||
if ans == "no":
|
||
self.games.pop(game_id, None)
|
||
await utils.answer(call, self.strings["declined"])
|
||
return
|
||
|
||
await utils.answer(call, "🌒")
|
||
|
||
game = self.games[game_id]
|
||
game["style"] = self.styles[game["style"]]
|
||
|
||
if (turn := game.pop("host_plays")) == "r":
|
||
turn = r.choice([True, False])
|
||
|
||
game["sender"]["color"] = turn
|
||
game["opponent"]["color"] = not turn
|
||
game["Timer"].pop("available", None)
|
||
if game.get("alr_accepted", None):
|
||
game.pop("alr_accepted")
|
||
|
||
await asyncio.sleep(0.8)
|
||
|
||
if isinstance(self.games[game_id]["Timer"]["timer"], Timer):
|
||
await utils.answer(call, self.strings["step4.T"])
|
||
await self._set_timer(call, game_id, call._units[call.unit_id]['chat'])
|
||
await asyncio.sleep(0.8)
|
||
return await utils.answer(call, self.strings["waiting_for_start"])
|
||
|
||
await self._start_game(call, game_id)
|
||
|
||
async def _set_timer(self, board_call: InlineCall, game_id: str, chat_id):
|
||
timer = self.games[game_id]["Timer"]["timer"]
|
||
self.games[game_id]["Timer"]["message"] = (
|
||
await self.inline.form(self.strings["timer_text"].format(
|
||
int(await timer.white_time()),
|
||
int(await timer.black_time()),
|
||
""
|
||
),
|
||
chat_id,
|
||
reply_markup = {
|
||
"text": self.strings["start_timer"],
|
||
"callback": self._start_timer,
|
||
"args": (board_call, game_id,)
|
||
},
|
||
disable_security = True,
|
||
)
|
||
)
|
||
|
||
@loader.loop(interval=1, autostart=True)
|
||
async def main_loop(self):
|
||
for game_id in self.games:
|
||
if not self.games[game_id].get("backup", False) and self.games[game_id]["Timer"]["timer_loop"] and not self.games[game_id]["Timer"]["timer_is_set"]:
|
||
async def timer_loop(game_id):
|
||
game = self.games[game_id]["game"]
|
||
timer = self.games[game_id]["Timer"]
|
||
timer_c = self.games[game_id]["Timer"]["timer"]
|
||
|
||
await timer_c.start()
|
||
timer["timer_is_set"] = True
|
||
while timer["timer_loop"]:
|
||
if not all([await timer_c.white_time(), await timer_c.black_time()]):
|
||
timer["timer_loop"] = False
|
||
self.the_end(game_id, "time_is_up")
|
||
elif game["state"] == "the_end":
|
||
timer["timer_loop"] = False
|
||
|
||
loser, winner = self._get_loser_and_winner(game_id)
|
||
|
||
await timer["message"].edit(self.strings["timer_text"].format(
|
||
int(await timer_c.white_time()),
|
||
int(await timer_c.black_time()),
|
||
"" if game["state"] != "the_end"
|
||
else "⏹️ " + self.strings[game["add_params"]["reason_of_ending"]].format(
|
||
loser, winner
|
||
)
|
||
),
|
||
)
|
||
await asyncio.sleep(1)
|
||
await timer.stop()
|
||
asyncio.create_task(timer_loop(game_id))
|
||
|
||
if self.games[game_id].get("game", None):
|
||
if not self.games[game_id].get("backup", False):
|
||
self.games[game_id]["game"]["message"].inline_manager._units[
|
||
self.games[game_id]["game"]["message"].unit_id
|
||
]["always_allow"] = True # для ругающегося на эту строку гпт - по неизвестно какой причине фреймворк в какое-то время попросту
|
||
# забывает про отключение его проверки. мне это нужно, чтобы сам модуль брал на себя ответсвенность
|
||
# проверки, кто может управлять доской, а до кого очередь ещё не дошла
|
||
# FIXME: оно, похоже, всё ещё забывает про always_allow, патч не помогает... нужно выйти на эту ошибку и посмотреть, прочему права пропадают
|
||
|
||
games_backup = {}
|
||
if time.time() - self._last_backup >= 10:
|
||
for game_id, game in self.games.items():
|
||
if game.get("game", None):
|
||
game_copy = game
|
||
if not game.get("backup", None):
|
||
game_copy = {}
|
||
game_copy["backup"] = True
|
||
|
||
game_copy["game"] = {
|
||
k: v for k, v in game["game"].items()
|
||
if k not in ("message", "root_node", "curr_node", "board", "bot")
|
||
}
|
||
game_copy["game"]["node"] = str(game["game"]["root_node"])
|
||
|
||
if game.get("Timer", None) and game["Timer"].get("timer", None):
|
||
game_copy["Timer"] = game["Timer"]["timer"].backup()
|
||
|
||
for key, value in game.items():
|
||
if key not in ("game", "Timer"):
|
||
game_copy[key] = value
|
||
|
||
games_backup[game_id] = game_copy
|
||
|
||
self.set("games_backup", games_backup)
|
||
self._last_backup = time.time()
|
||
|
||
############## Starting game... ##############
|
||
|
||
async def _start_timer(self, call: InlineCall, board_call: InlineCall, game_id: str):
|
||
if not await self._check_player(call, game_id): return
|
||
timer = self.games[game_id]["Timer"]
|
||
timer["timer_loop"] = True
|
||
await self._start_game(board_call, game_id)
|
||
|
||
async def _init_bot(self, game_id: str, params: dict):
|
||
if not self.games[game_id]["vs_bot"]: return
|
||
|
||
engine = chess.engine.SimpleEngine.popen_uci(self.config["stockfish_path"])
|
||
engine.configure({"UCI_LimitStrength": True, "UCI_Elo": params["elo"]})
|
||
|
||
self.games[game_id]["game"]["bot"] = engine
|
||
|
||
async def _start_game(self, call: InlineCall, game_id: str):
|
||
if not await self._check_player(call, game_id): return
|
||
game = self.games[game_id]
|
||
node = chess.pgn.Game()
|
||
pgn = copy.deepcopy(self.pgn)
|
||
pgn["Date"] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
||
pgn["Round"] = str(game_id)
|
||
pgn["White"] = game["sender"]["name"] if game["sender"]["color"] else game["opponent"]["name"]
|
||
pgn["Black"] = game["opponent"]["name"] if game["sender"]["color"] else game["sender"]["name"]
|
||
pgn["Result"] = "*"
|
||
node.headers.update(pgn)
|
||
game["game"] = Game(
|
||
board = chess.Board(),
|
||
message = call,
|
||
root_node = node,
|
||
curr_node = node,
|
||
state = "idle",
|
||
add_params = GameParams(
|
||
chosen_figure_coord = "",
|
||
reason_of_ending = "",
|
||
promotion_move = "",
|
||
winner_color = None,
|
||
resigner_color = None,
|
||
draw_offerer = None,
|
||
),
|
||
bot = None,
|
||
)
|
||
await self._init_bot(game_id, {"elo": game["bot_elo"]})
|
||
await self.update_board(game_id)
|
||
|
||
def idle(self, game_id: str):
|
||
game = self.games[game_id]["game"]
|
||
game["state"] = "idle"
|
||
game["add_params"]["chosen_figure_coord"] = ""
|
||
game["add_params"]["promotion_move"] = ""
|
||
game["add_params"]["draw_offerer"] = None
|
||
|
||
def choose(self, game_id: str, coord: str):
|
||
game = self.games[game_id]["game"]
|
||
game["state"] = "in_choose"
|
||
game["add_params"]["chosen_figure_coord"] = coord
|
||
game["add_params"]["promotion_move"] = ""
|
||
|
||
def promotion(self, game_id: str, move: str):
|
||
game = self.games[game_id]["game"]
|
||
game["state"] = "in_promotion"
|
||
game["add_params"]["chosen_figure_coord"] = ""
|
||
game["add_params"]["promotion_move"] = move
|
||
|
||
def the_end(self, game_id: str, reason: str, winner: bool = None):
|
||
game = self.games[game_id]["game"]
|
||
game["state"] = "the_end"
|
||
game["add_params"]["reason_of_ending"] = reason
|
||
game["add_params"]["winner_color"] = winner
|
||
game["add_params"]["chosen_figure_coord"] = ""
|
||
game["add_params"]["promotion_move"] = ""
|
||
game["root_node"].headers["Result"] = (
|
||
"1-0" if winner is True else
|
||
"0-1" if winner is False else
|
||
"1/2-1/2"
|
||
)
|
||
if self.games[game_id]["vs_bot"]:
|
||
game["bot"].quit()
|
||
|
||
def _get_loser_and_winner(self, game_id: str) -> tuple[str, str]:
|
||
game = self.games[game_id]
|
||
if game["sender"]["color"] == self.games[game_id]["game"]["add_params"]["winner_color"]:
|
||
return (game["opponent"]["name"], game["sender"]["name"])
|
||
else:
|
||
return (game["sender"]["name"], game["opponent"]["name"])
|
||
|
||
def _get_piece_symbol(self, game_id: str, coord: str) -> str:
|
||
game = self.games[game_id]
|
||
piece = game["game"]["board"].piece_at(chess.parse_square(coord))
|
||
return game["style"][piece.symbol()] if piece else " "
|
||
|
||
def _get_move_symbol(self, game_id: str, move: str) -> str:
|
||
game = self.games[game_id]
|
||
if len(move) == 5:
|
||
return game["style"][
|
||
"capture_promotion" if (move := chess.Move.from_uci(move))
|
||
and game["game"]["board"].is_capture(move)
|
||
else "promotion"
|
||
]
|
||
else:
|
||
return game["style"][
|
||
"capture" if (move := chess.Move.from_uci(move))
|
||
and game["game"]["board"].is_capture(move)
|
||
else "move"
|
||
]
|
||
|
||
def _get_available_moves(self, game_id: str, coord: str) -> list[str]:
|
||
if not coord: return []
|
||
game = self.games[game_id]
|
||
coord = chess.parse_square(coord)
|
||
moves = [move.uci() for move in game["game"]["board"].legal_moves if move.from_square == coord]
|
||
return moves
|
||
|
||
def _get_board_dict(self, game_id: str) -> dict[str, str]:
|
||
game = self.games[game_id]
|
||
coords = copy.deepcopy(self.coords)
|
||
for coord in self.coords:
|
||
coords[coord] = self._get_piece_symbol(game_id, coord)
|
||
|
||
if game["game"]["state"] == "in_choose":
|
||
choosen_coord = game["game"]["add_params"]["chosen_figure_coord"]
|
||
for move in self._get_available_moves(game_id, choosen_coord):
|
||
coord = move[2:4]
|
||
coords[coord] = self._get_move_symbol(game_id, move)
|
||
|
||
return coords
|
||
|
||
def _get_reply_markup(self, game_id: str, promotion: bool = False, resign_confirm: bool = False, draw_confirm: bool = False) -> list[list[dict]]:
|
||
game = self.games[game_id]
|
||
is_end = game["game"]["state"] == "the_end"
|
||
|
||
reply_markup = utils.chunks(
|
||
[
|
||
{
|
||
"text": figure,
|
||
"callback": self.choose_coord,
|
||
"args": (game_id, coord),
|
||
}
|
||
for coord, figure in self._get_board_dict(game_id).items()
|
||
][::-1],
|
||
8
|
||
)
|
||
|
||
if promotion:
|
||
reply_markup.append(
|
||
[{"text": "⬇️↻⬇️", "action": "answer", "message": self.strings["choose_promotion"]}]
|
||
)
|
||
reply_markup.append(
|
||
[
|
||
{
|
||
"text": game["style"].get(piece, piece),
|
||
"callback": self.pawn_promotion,
|
||
"args": (game_id, piece),
|
||
} for piece in "qrnb"
|
||
]
|
||
)
|
||
elif resign_confirm:
|
||
reply_markup.extend(
|
||
[
|
||
[
|
||
{
|
||
"text": self.strings["resign_check"],
|
||
"data": "_there_is_nothing",
|
||
}
|
||
],
|
||
[
|
||
{
|
||
"text": self.strings["resign_yes"],
|
||
"callback": self.resign,
|
||
"args": (game_id, True),
|
||
},
|
||
{
|
||
"text": self.strings["resign_no"],
|
||
"callback": self._back_to_game,
|
||
"args": (game_id,),
|
||
},
|
||
]
|
||
]
|
||
)
|
||
elif draw_confirm:
|
||
reply_markup.extend(
|
||
[
|
||
[
|
||
{
|
||
"text": self.strings["draw_offer"].format(
|
||
self.strings["white"] if game["game"]["add_params"]["draw_offerer"]
|
||
else self.strings["black"]
|
||
),
|
||
"data": "_there_is_nothing",
|
||
}
|
||
],
|
||
[
|
||
{
|
||
"text": self.strings["draw_yes"],
|
||
"callback": self.draw,
|
||
"args": (game_id, True),
|
||
},
|
||
{
|
||
"text": self.strings["resign_no"],
|
||
"callback": self._back_to_game,
|
||
"args": (game_id,),
|
||
},
|
||
]
|
||
]
|
||
)
|
||
elif not is_end:
|
||
resign = [
|
||
{
|
||
"text": "🏳️",
|
||
"callback": self.resign,
|
||
"args": (game_id,),
|
||
},
|
||
]
|
||
|
||
if not game["vs_bot"]:
|
||
resign.append(
|
||
{
|
||
"text": "🤝",
|
||
"callback": self.draw,
|
||
"args": (game_id,),
|
||
}
|
||
)
|
||
reply_markup.append(resign)
|
||
return reply_markup
|
||
|
||
async def update_board(self, game_id: str, promotion: bool = False, resign_confirm: bool = False, draw_confirm: bool = False):
|
||
game = self.games[game_id]
|
||
is_end = game["game"]["state"] == "the_end"
|
||
reason_of_ending = game["game"]["add_params"]["reason_of_ending"]
|
||
status = (
|
||
self.strings["check"] if game["game"]["board"].is_check() and not is_end
|
||
else self.strings[reason_of_ending] + "\n"
|
||
)
|
||
loser, winner = self._get_loser_and_winner(game_id)
|
||
|
||
reply_markup = self._get_reply_markup(game_id, promotion, resign_confirm, draw_confirm)
|
||
|
||
pgn = game["game"]["root_node"].accept(chess.pgn.StringExporter(columns=None, headers=False)).replace("*", "").rsplit(maxsplit=1)
|
||
if pgn:
|
||
pgn[-1] = f"<b>{pgn[-1]}</b>"
|
||
else:
|
||
pgn = ["<b>|</b>"]
|
||
last_moves = " ".join(pgn)
|
||
|
||
await utils.answer(
|
||
game["game"]["message"],
|
||
self.strings["board"].format(
|
||
game_id,
|
||
utils.escape_html(game["sender"]["name"] if game["sender"]["color"] else game["opponent"]["name"]),
|
||
utils.escape_html(game["opponent"]["name"] if game["sender"]["color"] else game["sender"]["name"]),
|
||
self.strings["white"] if game["game"]["board"].turn else self.strings["black"],
|
||
status.format(loser=loser, winner=winner),
|
||
last_moves[-32:],
|
||
),
|
||
reply_markup=reply_markup,
|
||
)
|
||
|
||
if game["vs_bot"] and game["game"]["board"].turn == game["sender"]["color"] and game["game"]["state"] == "idle":
|
||
await self._bot_process_board(game_id)
|
||
|
||
async def _bot_process_board(self, game_id: str):
|
||
if not (game := self.games[game_id])["vs_bot"]:
|
||
return
|
||
|
||
board = game["game"]["board"]
|
||
bot = game["game"]["bot"]
|
||
|
||
_d = game["bot_elo"] // 100 - 10
|
||
depth = r.randint(_d, _d + 15)
|
||
|
||
result = bot.play(board, limit=chess.engine.Limit(time=0.1, depth=depth))
|
||
move = result.move
|
||
from_coord = chess.square_name(result.move.from_square)
|
||
to_coord = chess.square_name(result.move.to_square)
|
||
|
||
await asyncio.sleep(r.randint(1, 3))
|
||
await self.choose_coord(None, game_id, from_coord)
|
||
await asyncio.sleep(0.7)
|
||
await self.choose_coord(None, game_id, to_coord)
|
||
|
||
if move.promotion:
|
||
await asyncio.sleep(0.7)
|
||
await self.pawn_promotion(None, game_id, chess.piece_symbol(move.promotion))
|
||
|
||
def make_move(self, game_id: str, move: str):
|
||
game = self.games[game_id]["game"]
|
||
move = chess.Move.from_uci(move)
|
||
game["board"].push(move)
|
||
game["curr_node"] = game["curr_node"].add_variation(move)
|
||
|
||
async def pawn_promotion(self, call: InlineCall, game_id: str, piece: str):
|
||
if not await self._check_player(call, game_id): return
|
||
game = self.games[game_id]["game"]
|
||
move = game["add_params"]["promotion_move"] + piece
|
||
|
||
self.make_move(game_id, move)
|
||
self.set_game_state(game_id)
|
||
|
||
return await self.update_board(game_id)
|
||
|
||
async def _back_to_game(self, _, game_id: str):
|
||
self.set_game_state(game_id)
|
||
await self.update_board(game_id)
|
||
|
||
async def resign(self, call: InlineCall, game_id: str, confirm: bool = False):
|
||
if not await self._check_player(call, game_id, skip_turn_check=True): return
|
||
game = self.games[game_id]
|
||
if not confirm:
|
||
game["game"]["add_params"]["resigner_color"] = self._get_color_by_player(
|
||
game_id,
|
||
call.from_user.id
|
||
)
|
||
return await self.update_board(game_id, resign_confirm=True)
|
||
|
||
resigner = self._get_player_by_color(
|
||
game_id, game["game"]["add_params"]["resigner_color"]
|
||
)
|
||
|
||
if call.from_user.id != resigner["id"]:
|
||
return await call.answer(self.strings["resign_not_you"])
|
||
|
||
self.the_end(game_id, "resign", winner=not resigner["color"])
|
||
await self.update_board(game_id)
|
||
|
||
async def draw(self, call: InlineCall, game_id: str, accept: bool = False):
|
||
if not await self._check_player(call, game_id, skip_turn_check=True): return
|
||
game = self.games[game_id]
|
||
if accept:
|
||
offerer = self._get_player_by_color(
|
||
game_id,
|
||
game["game"]["add_params"]["draw_offerer"]
|
||
)
|
||
|
||
if call.from_user.id == offerer["id"]:
|
||
await call.answer(self.strings["draw_not_you"])
|
||
return
|
||
self.the_end(game_id, "draw")
|
||
return await self.update_board(game_id)
|
||
|
||
game["game"]["add_params"]["draw_offerer"] = self._get_color_by_player(
|
||
game_id,
|
||
call.from_user.id
|
||
)
|
||
return await self.update_board(game_id, draw_confirm=True)
|
||
|
||
def set_game_state(self, game_id: str):
|
||
game = self.games[game_id]["game"]
|
||
board = game["board"]
|
||
self.idle(game_id)
|
||
if board.is_checkmate():
|
||
self.the_end(game_id, "checkmate", winner=not board.turn)
|
||
elif board.is_stalemate():
|
||
self.the_end(game_id, "stalemate")
|
||
elif board.is_insufficient_material():
|
||
self.the_end(game_id, "insufficient_material")
|
||
elif board.is_seventyfive_moves():
|
||
self.the_end(game_id, "seventyfive_moves")
|
||
elif board.is_fivefold_repetition():
|
||
self.the_end(game_id, "fivefold_repetition")
|
||
|
||
async def choose_coord(self, call: BotInlineCall, game_id: str, coord: str):
|
||
if not await self._check_player(call, game_id): return
|
||
game = self.games[game_id]["game"]
|
||
state = game["state"]
|
||
|
||
if state == "idle":
|
||
if self._get_available_moves(game_id, coord):
|
||
self.choose(game_id, coord)
|
||
else:
|
||
await call.answer(self.strings["no_moves"])
|
||
return await self.update_board(game_id)
|
||
|
||
elif state == "in_choose":
|
||
if coord == game["add_params"]["chosen_figure_coord"]:
|
||
self.idle(game_id)
|
||
return await self.update_board(game_id)
|
||
|
||
av_moves = self._get_available_moves(game_id, game["add_params"]["chosen_figure_coord"])
|
||
coord_matches = [move for move in av_moves if coord in move]
|
||
|
||
if len(coord_matches) == 1:
|
||
self.make_move(game_id, coord_matches[0])
|
||
self.set_game_state(game_id)
|
||
return await self.update_board(game_id)
|
||
|
||
elif len(coord_matches) > 1:
|
||
move = coord_matches[0][:4]
|
||
self.promotion(game_id, move)
|
||
return await self.update_board(game_id, promotion=True)
|
||
|
||
elif game["board"].piece_at(chess.parse_square(coord)):
|
||
self.choose(game_id, coord)
|
||
return await self.update_board(game_id)
|
||
|
||
else:
|
||
self.idle(game_id)
|
||
return await self.update_board(game_id)
|
||
|
||
elif state == "in_promotion":
|
||
return await call.answer(self.strings["can_not_move"])
|
||
|
||
elif state == "the_end":
|
||
return await call.answer(self.strings["game_ended"])
|
||
|
||
else:
|
||
await call.answer("ты игру сломал?")
|
||
self.idle(game_id)
|
||
return await self.update_board(game_id)
|
||
|
||
|
||
|
||
def _get_player_by_color(self, game_id: str, color: bool):
|
||
game = self.games[game_id]
|
||
return game["sender"] if game["sender"]["color"] == color else game["opponent"]
|
||
|
||
def _get_color_by_player(self, game_id: str, player_id: int):
|
||
game = self.games[game_id]
|
||
if game["sender"]["id"] == player_id:
|
||
return game["sender"]["color"]
|
||
elif game["opponent"]["id"] == player_id:
|
||
return game["opponent"]["color"]
|
||
return None
|