Coverage for src / beaverbunch / network / session.py: 100.0%

198 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 00:24 +0000

1from __future__ import annotations 

2 

3import secrets 

4import threading 

5from dataclasses import dataclass, field 

6from enum import Enum, auto 

7from random import SystemRandom 

8 

9from beaverbunch.core.game import Game, GameState 

10from beaverbunch.core.game_settings import GameSettings 

11from beaverbunch.core.hand import Hand 

12from beaverbunch.core.player import Player 

13 

14JOIN_CODE_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" 

15TOKEN_BYTES = 16 # 128-bit opaque token, hex-encoded → 32 chars 

16 

17 

18class LobbyState(Enum): 

19 """Lifecycle state for a multiplayer lobby before and after game start.""" 

20 

21 OPEN = auto() 

22 FULL = auto() 

23 STARTED = auto() 

24 CLOSED = auto() 

25 

26 

27class SessionError(Exception): 

28 """Base exception for lobby and session management errors.""" 

29 

30 

31class SessionNotFoundError(SessionError): 

32 """Raised when a join code does not map to an active session.""" 

33 

34 

35class SessionCapacityError(SessionError): 

36 """Raised when a session or manager has reached its capacity.""" 

37 

38 

39class SessionStateError(SessionError): 

40 """Raised when an operation is not allowed in the current lobby state.""" 

41 

42 

43class DuplicatePlayerError(SessionError): 

44 """Raised when a player name already exists in a session.""" 

45 

46 

47class NotInSessionError(SessionError): 

48 """Raised when a player is not found in the session they are trying to leave.""" 

49 

50 

51class InvalidTokenError(SessionError): 

52 """Raised when an action is attempted with an invalid or unknown player token.""" 

53 

54 

55@dataclass 

56class Session: 

57 """Represents one in-memory lobby that can later transition into a running game.""" 

58 

59 join_code: str 

60 settings: GameSettings = field(default_factory=GameSettings) 

61 host_name: str | None = None 

62 players: list[Player] = field(default_factory=list) 

63 lobby_state: LobbyState = LobbyState.OPEN 

64 game: Game | None = None 

65 # Maps opaque player_token → player name for authentication 

66 token_map: dict[str, str] = field(default_factory=dict) 

67 

68 @property 

69 def player_count(self) -> int: 

70 return len(self.players) 

71 

72 @property 

73 def is_joinable(self) -> bool: 

74 return self.lobby_state == LobbyState.OPEN 

75 

76 def add_player(self, player_name: str) -> tuple[Player, str]: 

77 """Join a player to the lobby and assign host rights to the first player. 

78 

79 Returns a ``(Player, token)`` tuple; the token must be stored by the 

80 caller and presented for all future authenticated actions. 

81 """ 

82 self._refresh_lobby_state() 

83 if self.lobby_state == LobbyState.STARTED: 

84 raise SessionStateError("Cannot join a session that has already started") 

85 if self.lobby_state == LobbyState.CLOSED: 

86 raise SessionStateError("Cannot join a closed session") 

87 if self.player_count >= self.settings.max_players: 

88 self.lobby_state = LobbyState.FULL 

89 raise SessionCapacityError( 

90 f"Session {self.join_code} is full ({self.settings.max_players} players)", 

91 ) 

92 

93 normalized_name = _normalize_player_name(player_name) 

94 if any(existing.name == normalized_name for existing in self.players): 

95 raise DuplicatePlayerError( 

96 f"Player name '{normalized_name}' is already taken in session {self.join_code}", 

97 ) 

98 

99 player = Player(name=normalized_name, hand=Hand()) 

100 token = secrets.token_hex(TOKEN_BYTES) 

101 self.token_map[token] = normalized_name 

102 self.players.append(player) 

103 if self.host_name is None: 

104 self.host_name = player.name 

105 self._refresh_lobby_state() 

106 return player, token 

107 

108 def resolve_token(self, token: str) -> str: 

109 """Return the player name for the given token. 

110 

111 Raises :class:`InvalidTokenError` if the token is unknown. 

112 """ 

113 try: 

114 return self.token_map[token] 

115 except KeyError as exc: 

116 raise InvalidTokenError("Invalid or unknown player token") from exc 

117 

118 def remove_player(self, player_name: str) -> None: 

119 """Remove a player from the lobby. 

120 

121 If the departing player was the host, the next remaining player becomes host. 

122 Raises NotInSessionError if the player is not in the session. 

123 Raises SessionStateError if the session has already started. 

124 """ 

125 self._refresh_lobby_state() 

126 if self.lobby_state == LobbyState.STARTED: 

127 raise SessionStateError("Cannot leave a session that has already started") 

128 

129 normalized_name = _normalize_player_name(player_name) 

130 matching = [p for p in self.players if p.name == normalized_name] 

131 if not matching: 

132 raise NotInSessionError( 

133 f"Player '{normalized_name}' is not in session {self.join_code}", 

134 ) 

135 self.players.remove(matching[0]) 

136 

137 reverse = {n: t for t, n in self.token_map.items()} 

138 del self.token_map[reverse[normalized_name]] 

139 

140 if self.host_name == normalized_name: 

141 self.host_name = self.players[0].name if self.players else None 

142 

143 self._refresh_lobby_state() 

144 

145 def close(self) -> None: 

146 """Close the session, preventing any further joins or starts.""" 

147 self._refresh_lobby_state() 

148 if self.lobby_state == LobbyState.STARTED: 

149 raise SessionStateError("Cannot close a session that has already started") 

150 self.lobby_state = LobbyState.CLOSED 

151 

152 def can_start(self) -> bool: 

153 self._refresh_lobby_state() 

154 return self.lobby_state in { 

155 LobbyState.OPEN, LobbyState.FULL, 

156 } and self.player_count >= self.settings.min_players 

157 

158 def start_game(self, requester_name: str) -> Game: 

159 """Create and start the underlying Game from the current lobby state.""" 

160 self._refresh_lobby_state() 

161 if self.lobby_state == LobbyState.STARTED or self.game is not None: 

162 raise SessionStateError("Session has already started") 

163 if self.host_name is None: # pragma: no cover 

164 raise SessionStateError("Cannot start a session without a host") 

165 

166 normalized_requester = _normalize_player_name(requester_name) 

167 if normalized_requester != self.host_name: 

168 raise SessionStateError("Only the host can start the session") 

169 

170 state = GameState() 

171 for player in self.players: 

172 state.add_player(player) 

173 

174 game = Game(state=state, settings=self.settings) 

175 game.start() 

176 

177 self.game = game 

178 self.lobby_state = LobbyState.STARTED 

179 return game 

180 

181 def _refresh_lobby_state(self) -> None: 

182 if self.game is not None: 

183 self.lobby_state = LobbyState.STARTED 

184 elif self.lobby_state == LobbyState.CLOSED: 

185 return 

186 elif self.player_count >= self.settings.max_players: 

187 self.lobby_state = LobbyState.FULL 

188 else: 

189 self.lobby_state = LobbyState.OPEN 

190 

191 

192@dataclass 

193class SessionManager: 

194 """In-memory registry for active multiplayer sessions. 

195 

196 Note: locking here only protects access within a single Python process. 

197 Multi-worker deployments still need an external shared store / coordination layer. 

198 """ 

199 

200 max_sessions: int = 25 

201 code_length: int = 6 

202 code_alphabet: str = JOIN_CODE_ALPHABET 

203 max_code_generation_attempts: int = 32 

204 sessions: dict[str, Session] = field(default_factory=dict) 

205 _random: SystemRandom = field(default_factory=SystemRandom, repr=False) 

206 _lock: threading.RLock = field(default_factory=threading.RLock, init=False, repr=False) 

207 

208 def __post_init__(self) -> None: 

209 if self.max_sessions < 1: 

210 raise ValueError("max_sessions must be at least 1") 

211 if self.code_length < 1: 

212 raise ValueError("code_length must be at least 1") 

213 if len(set(self.code_alphabet)) < 2: 

214 raise ValueError("code_alphabet must contain at least two unique characters") 

215 

216 @property 

217 def session_count(self) -> int: 

218 with self._lock: 

219 return len(self.sessions) 

220 

221 def create_session( 

222 self, 

223 host_name: str | None = None, 

224 settings: GameSettings | None = None, 

225 ) -> tuple[Session, str | None]: 

226 """Create a new lobby and optionally seat its first player immediately. 

227 

228 Returns ``(session, player_token)`` where ``player_token`` is ``None`` 

229 when no ``host_name`` was provided. 

230 """ 

231 with self._lock: 

232 if self.session_count >= self.max_sessions: 

233 raise SessionCapacityError( 

234 f"Maximum number of active sessions reached ({self.max_sessions})", 

235 ) 

236 

237 join_code = self._generate_join_code() 

238 session = Session(join_code=join_code, settings=settings or GameSettings()) 

239 self.sessions[join_code] = session 

240 

241 token: str | None = None 

242 if host_name is not None: 

243 _player, token = session.add_player(host_name) 

244 

245 return session, token 

246 

247 def get_session(self, join_code: str) -> Session: 

248 """Look up an active session by its join code.""" 

249 normalized_code = _normalize_join_code(join_code) 

250 with self._lock: 

251 try: 

252 return self.sessions[normalized_code] 

253 except KeyError as exc: 

254 raise SessionNotFoundError(f"Unknown session code: {normalized_code}") from exc 

255 

256 def join_session(self, join_code: str, player_name: str) -> tuple[Session, str]: 

257 """Join a player to the session addressed by the given code. 

258 

259 Returns ``(session, player_token)``. 

260 """ 

261 with self._lock: 

262 session = self.get_session(join_code) 

263 _player, token = session.add_player(player_name) 

264 return session, token 

265 

266 def start_session(self, join_code: str, player_token: str) -> Game: 

267 """Start the game contained in a session (token-authenticated).""" 

268 with self._lock: 

269 session = self.get_session(join_code) 

270 requester_name = session.resolve_token(player_token) 

271 return session.start_game(requester_name) 

272 

273 def leave_session(self, join_code: str, player_token: str) -> Session | None: 

274 """Remove a player from a session (token-authenticated). 

275 

276 If the session becomes empty after the player leaves, it is automatically 

277 removed from the registry and None is returned. 

278 Otherwise the updated session is returned. 

279 """ 

280 with self._lock: 

281 session = self.get_session(join_code) 

282 player_name = session.resolve_token(player_token) 

283 session.remove_player(player_name) 

284 if session.player_count == 0: 

285 self.remove_session(join_code) 

286 return None 

287 return session 

288 

289 def close_session(self, join_code: str, player_token: str) -> Session: 

290 """Close a session so that no new players can join (token-authenticated). 

291 

292 Only the host may close a session. 

293 """ 

294 with self._lock: 

295 session = self.get_session(join_code) 

296 requester_name = session.resolve_token(player_token) 

297 normalized_requester = _normalize_player_name(requester_name) 

298 if session.host_name is None or normalized_requester != session.host_name: 

299 raise SessionStateError("Only the host can close the session") 

300 session.close() 

301 return session 

302 

303 def remove_session(self, join_code: str) -> Session: 

304 """Remove a session from the in-memory registry.""" 

305 normalized_code = _normalize_join_code(join_code) 

306 with self._lock: 

307 try: 

308 return self.sessions.pop(normalized_code) 

309 except KeyError as exc: 

310 raise SessionNotFoundError(f"Unknown session code: {normalized_code}") from exc 

311 

312 def _generate_join_code(self) -> str: 

313 for _ in range(self.max_code_generation_attempts): 

314 candidate = "".join(self._random.choice(self.code_alphabet) for _ in range(self.code_length)) 

315 if candidate not in self.sessions: 

316 return candidate 

317 raise SessionError("Could not generate a unique join code") 

318 

319 

320def _normalize_player_name(player_name: str) -> str: 

321 normalized_name = player_name.strip() 

322 if not normalized_name: 

323 raise ValueError("Player name cannot be empty") 

324 return normalized_name 

325 

326 

327def _normalize_join_code(join_code: str) -> str: 

328 normalized_code = join_code.strip().upper() 

329 if not normalized_code: 

330 raise ValueError("Join code cannot be empty") 

331 return normalized_code