Source code for ai.brain

from typing import Optional
from random import choice
from time import sleep, time
from abc import ABC, abstractmethod
from copy import deepcopy
from core.board import Board
from core.game import Move
from core.enums import GameState
from ai.table import TranspositionTable, TranspositionTableEntry, TranspositionTableEntryType, ScoreTable

[docs] class Brain(ABC): """ Base abstract class for AI agents. """ def __init__(self) -> None: self._best_move_cache: Optional[str] = None self._last_max_depth: int = 0 self._last_time_limit: int = 0 self._last_hash: int = 0
[docs] def find_best_move(self, board: Board, max_branching_factor: int, max_depth: int = 0, time_limit: int = 0) -> str: """ Finds the best move for the given board state, following the agent's policy. :param board: Current playing board. :type board: Board :param max_branching_factor: Maximum branching factor. :type max_branching_factor: int :param max_depth: Maximum lookahead depth, defaults to `0`. :type max_depth: int, optional :param time_limit: Maximum time (in seconds) to calculate the best move, defaults to `0`. :type time_limit: int, optional :return: Stringified best move. :rtype: str """ if not self._best_move_cache or self._last_hash != board.hash() or self._last_max_depth != max_depth or self._last_time_limit != time_limit: self._empty_cache() self._last_max_depth = max_depth self._last_time_limit = time_limit self._last_hash = board.hash() self._best_move_cache = self._find_best_move(board, max_branching_factor, max_depth, time_limit) return self._best_move_cache
@abstractmethod def _find_best_move(self, board: Board, max_branching_factor: int, max_depth: int = 0, time_limit: int = 0) -> str: """ Finds the best move according to this agent's strategy. :param board: Current playing board. :type board: Board :param max_branching_factor: Maximum branching factor. :type max_branching_factor: int :param max_depth: Maximum lookahead depth, defaults to `0`. :type max_depth: int, optional :param time_limit: Maximum time (in seconds) to calculate the best move, defaults to `0`. :type time_limit: int, optional :return: Stringified best move. :rtype: str """ def _empty_cache(self) -> None: """ | Empties the current cache for the best move. | Might empty more data depending on the agent. """ self._best_move_cache = None
[docs] class Random(Brain): """ Random acting AI agent. """ def _find_best_move(self, board: Board, max_branching_factor: int, max_depth: int = 0, time_limit: int = 0) -> str: sleep(0.5) return choice(board.valid_moves.split(";"))
[docs] class AlphaBetaPruner(Brain): """ AI agent following an alpha-beta pruning policy. """ def __init__(self) -> None: super().__init__() self._transpos_table: TranspositionTable = TranspositionTable() self._pv_table: dict[int, Move] = {} self._killer_moves: dict[int, list[Move]] = {} self._history_heuristic: dict[Move, int] = {} self._cached_scores: ScoreTable = ScoreTable() self._visited_nodes: int = 0 self._cutoffs: int = 0 def _find_best_move(self, board: Board, max_branching_factor: int, max_depth: int = 0, time_limit: int = 0) -> str: start_time = time() best_move = None scores: list[tuple[Optional[Move], float]] = [] depth = 0 try: while not max_depth or depth < max_depth: depth += 1 best_move, score = self._alpha_beta_search(deepcopy(board), max_branching_factor, depth, float('-inf'), float('inf'), start_time, time_limit) scores.append((best_move, score)) if time_limit and time() - start_time > time_limit: break except TimeoutError: pass self._transpos_table.flush() print(f"Visited nodes: {self._visited_nodes}; Cutoffs: {self._cutoffs}; Scores: {scores}; Time: {time() - start_time}") self._visited_nodes = 0 self._cutoffs = 0 return board.stringify_move(best_move) def _alpha_beta_search(self, board: Board, max_branching_factor: int, depth: int, alpha: float, beta: float, start_time: float, time_limit: int) -> tuple[Optional[Move], float]: if time_limit and time() - start_time > time_limit: raise TimeoutError("Time limit exceeded during alpha-beta pruning search") self._visited_nodes += 1 node_hash = board.hash() cached_entry = self._transpos_table[node_hash] if cached_entry and cached_entry.depth >= depth: match cached_entry.type: case TranspositionTableEntryType.EXACT: self._cutoffs += 1 return cached_entry.move, cached_entry.value case TranspositionTableEntryType.LOWER_BOUND: alpha = max(alpha, cached_entry.value) case TranspositionTableEntryType.UPPER_BOUND: beta = min(beta, cached_entry.value) if alpha >= beta: self._cutoffs += 1 return cached_entry.move, cached_entry.value if depth == 0 or board.gameover: return None, self._evaluate(board, None) best_move = self._pv_table.get(node_hash, None) moves = list(board.calculate_valid_moves()) moves.sort(key=lambda m: self._move_order_heuristic(board, m, best_move, depth), reverse=True) if len(moves) > max_branching_factor: del moves[max_branching_factor:] best_value = float('-inf') for move in moves: board.play_parsed(move) _, value = self._alpha_beta_search(board, max_branching_factor, depth - 1, -beta, -alpha, start_time, time_limit) board.undo() value = -value if value > best_value: best_value = value best_move = move alpha = max(alpha, best_value) if alpha >= beta: self._cutoffs += 1 self._store_killer_move(depth, move) break entry_type = TranspositionTableEntryType.EXACT if best_value < beta else TranspositionTableEntryType.LOWER_BOUND if best_value > alpha else TranspositionTableEntryType.UPPER_BOUND self._transpos_table[node_hash] = TranspositionTableEntry(entry_type, best_value, depth, best_move) if best_move: self._pv_table[node_hash] = best_move self._update_history_heuristic(best_move, depth) return best_move, best_value def _move_order_heuristic(self, board: Board, move: Move, best_move: Optional[Move], depth: int) -> tuple[float, int]: """ | Assigns a heuristic value to moves for ordering. | Higher values indicate better moves. """ if move == best_move: return (float('inf'), 1) # Prioritize PV move. if move in self._killer_moves.get(depth, []): return (float('inf'), 0) # Prioritize killer moves. return (self._history_heuristic.get(move, self._evaluate(board, move)), 0) # Fallback to history heuristic or explicit board evaluation. def _store_killer_move(self, depth: int, move: Move) -> None: """ Stores killer moves for a given depth. """ if depth not in self._killer_moves: self._killer_moves[depth] = [] if move not in self._killer_moves[depth]: if len(self._killer_moves[depth]) >= 3: self._killer_moves[depth].pop(0) self._killer_moves[depth].append(move) def _update_history_heuristic(self, move: Move, depth: int) -> None: """ Updates history heuristic table with a move's depth. :param move: Move. :type move: Move :param depth: Depth. :type depth: int """ self._history_heuristic[move] = self._history_heuristic.get(move, 0) + 2 ** depth def _evaluate(self, board: Board, move: Optional[Move]) -> float: """ Evaluates the given node. :param node: Playing board. :type node: Board :return: Node value. :rtype: float """ score = 0 if move: board.play_parsed(move) if (board.state is GameState.DRAW or board.state is GameState.NOT_STARTED): score = 0 elif board.current_player_has_won: score = float('inf') elif board.current_opponent_has_won: score = float('-inf') else: node_hash = board.hash() score = self._cached_scores[node_hash] if score is None: player = board.current_player_color opponent = player.opposite # Maximize the neighbors of the opponent's queen and minimize our own queen neighbors. score = 10 * (board.queen_neighbors_by_color(opponent) - board.queen_neighbors_by_color(player)) # Maximize our own pieces in play and minimize the opponent's. score += 2 * (board.pieces_in_play(player) - board.pieces_in_play(opponent)) # valid_moves_maximize = node.calculate_valid_moves(current_player, True) # collision_count_max = node.count_moves_near_queen(current_player) # valid_moves_minimize = node.calculate_valid_moves(current_opponent) # collision_count_min = node.count_moves_near_queen(current_opponent) # score = (node.count_queen_neighbors(current_opponent) - node.count_queen_neighbors(current_player)) * 20 + (collision_count_max - collision_count_min) * 20 + (len(valid_moves_maximize) - len(valid_moves_minimize)) // 2 self._cached_scores[node_hash] = score if move: board.undo() return score