diff --git a/.env.prod.example b/.env.prod.example index b91104e..d51562e 100644 --- a/.env.prod.example +++ b/.env.prod.example @@ -4,8 +4,8 @@ API_V1_STR=/api/v1 # ── Security ───────────────────────────────────────────────── SECRET_KEY= -ADMIN_EMAIL=admin@unraveldocs.xyz -ADMIN_PASSWORD= +# Note: Admin users are created/managed via CLI: +# python -m app.cli create-admin --email admin@unraveldocs.xyz # ── Database (Supabase) ────────────────────────────────────── # Pooled connection (PgBouncer port 6543) — used by the app at runtime diff --git a/app/__main__.py b/app/__main__.py new file mode 100644 index 0000000..61d365d --- /dev/null +++ b/app/__main__.py @@ -0,0 +1,6 @@ +"""Entrypoint for running the package as a module.""" + +from app.cli import app_cli + +if __name__ == "__main__": + app_cli() diff --git a/app/cli.py b/app/cli.py new file mode 100644 index 0000000..3a45b3b --- /dev/null +++ b/app/cli.py @@ -0,0 +1,204 @@ +"""Command Line Interface for system management tasks. + +This module provides CLI commands to manage admin users and other system +tasks using Typer. +""" + +import typer +from email_validator import EmailNotValidError, validate_email + +from app.core.security import security_service +from app.db.session import SessionLocal, get_engine +from app.modules.auth.constants import UserRole +from app.modules.auth.models import User + +app_cli = typer.Typer(help="FluentMeet management commands.") + + +@app_cli.command() +def create_admin( + email: str = typer.Option( + None, + "--email", + "-e", + help="Email address for the new admin user.", + ), + password: str = typer.Option( + None, + "--password", + "-p", + help="Password for the new admin user. (Insecure if passed via shell history)", + ), + full_name: str = typer.Option( + "System Admin", + "--full-name", + "-n", + help="Full name of the new admin user.", + ), + no_input: bool = typer.Option( + False, + "--no-input", + help="Non-interactive mode (no prompts, fails if credentials not supplied).", + ), +) -> None: + """Create a new admin user.""" + # Handle credentials input based on mode + if no_input: + if not email or not password: + typer.echo( + "Error: Both --email and --password must be provided in " + "non-interactive mode.", + err=True, + ) + raise typer.Exit(code=1) + else: + if not email: + email = typer.prompt("Admin email address") + if not password: + password = typer.prompt( + "Admin password", + hide_input=True, + confirmation_prompt=True, + ) + + # Validate email format + try: + validation = validate_email(email, check_deliverability=False) + email = validation.normalized + except EmailNotValidError as exc: + typer.echo(f"Error: Invalid email address: {exc}", err=True) + raise typer.Exit(code=1) from exc + + # Ensure engine is bound and open session + get_engine() + with SessionLocal() as db: + # Check if user already exists + existing_user = db.query(User).filter(User.email == email).first() + if existing_user: + if existing_user.user_role == UserRole.ADMIN.value: + typer.echo( + f"Error: User with email '{email}' is already an admin.", + err=True, + ) + else: + typer.echo( + f"Error: User with email '{email}' " + "already exists but is not an admin. " + "Use the 'promote-admin' command to " + "promote them instead.", + err=True, + ) + raise typer.Exit(code=1) + + # Create new admin user + hashed_pw = security_service.hash_password(password) + admin_user = User( + email=email, + full_name=full_name, + hashed_password=hashed_pw, + user_role=UserRole.ADMIN.value, + is_active=True, + is_verified=True, + ) + db.add(admin_user) + db.commit() + typer.echo(f"Successfully created admin user: {email}") + + +@app_cli.command() +def promote_admin( + email: str = typer.Option( + None, + "--email", + "-e", + help="Email address of the user to promote to admin.", + ), +) -> None: + """Promote an existing user to admin role.""" + if not email: + email = typer.prompt("Email of user to promote") + + email = email.strip().lower() + + get_engine() + with SessionLocal() as db: + user = db.query(User).filter(User.email == email).first() + if not user: + typer.echo(f"Error: User with email '{email}' does not exist.", err=True) + raise typer.Exit(code=1) + + if user.user_role == UserRole.ADMIN.value: + typer.echo(f"User '{email}' is already an admin.") + return + + user.user_role = UserRole.ADMIN.value + db.commit() + typer.echo(f"Successfully promoted user '{email}' to admin role.") + + +@app_cli.command() +def demote_admin( + email: str = typer.Option( + None, + "--email", + "-e", + help="Email address of the admin user to demote.", + ), +) -> None: + """Demote an admin back to regular user role.""" + if not email: + email = typer.prompt("Email of admin to demote") + + email = email.strip().lower() + + get_engine() + with SessionLocal() as db: + user = db.query(User).filter(User.email == email).first() + if not user: + typer.echo(f"Error: User with email '{email}' does not exist.", err=True) + raise typer.Exit(code=1) + + if user.user_role != UserRole.ADMIN.value: + typer.echo(f"Error: User with email '{email}' is not an admin.", err=True) + raise typer.Exit(code=1) + + # Safety Check: Prevent demoting the last remaining admin user + admin_count = ( + db.query(User).filter(User.user_role == UserRole.ADMIN.value).count() + ) + if admin_count <= 1: + typer.echo( + "Error: Cannot demote the last remaining admin.", + err=True, + ) + raise typer.Exit(code=1) + + user.user_role = UserRole.USER.value + db.commit() + typer.echo(f"Successfully demoted admin '{email}' to regular user role.") + + +@app_cli.command() +def list_admins() -> None: + """List all users with the admin role.""" + get_engine() + with SessionLocal() as db: + admins = db.query(User).filter(User.user_role == UserRole.ADMIN.value).all() + if not admins: + typer.echo("No admin users found.") + return + + typer.echo(f"{'Email':<35} | {'Full Name':<25} | {'Created At':<25}") + typer.echo("-" * 90) + for admin in admins: + created_str = ( + admin.created_at.strftime("%Y-%m-%d %H:%M:%S") + if admin.created_at + else "N/A" + ) + name_str = admin.full_name or "N/A" + typer.echo(f"{admin.email:<35} | {name_str:<25} | {created_str:<25}") + + +if __name__ == "__main__": + app_cli() diff --git a/app/core/config.py b/app/core/config.py index b8baf53..671df41 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -30,10 +30,6 @@ class Settings(BaseSettings): VERSION: str = get_version() API_V1_STR: str = "/api/v1" - # Default Admin - ADMIN_EMAIL: str | None = None - ADMIN_PASSWORD: str | None = None - # Security SECRET_KEY: str = "placeholder_secret_key" ALGORITHM: str = "HS256" diff --git a/app/core/init_admin.py b/app/core/init_admin.py deleted file mode 100644 index 3d39905..0000000 --- a/app/core/init_admin.py +++ /dev/null @@ -1,55 +0,0 @@ -"""Initialization module for default system admin user. - -Triggers an automatic account creation using environment variables. -""" - -import logging - -from sqlalchemy import select -from sqlalchemy.orm import Session - -from app.core.config import settings -from app.core.security import security_service -from app.modules.auth.constants import UserRole -from app.modules.auth.models import User - -logger = logging.getLogger(__name__) - - -def init_admin(db: Session) -> None: - """Initialize a default admin account on server startup natively. - - Args: - db (Session): Database transaction session. - """ - if not settings.ADMIN_EMAIL or not settings.ADMIN_PASSWORD: - logger.info( - "Admin credentials not fully set in .env, skipping admin initialization." - ) - return - - admin_email = settings.ADMIN_EMAIL.lower() - - stmt = select(User).where(User.email == admin_email) - existing_admin = db.execute(stmt).scalar_one_or_none() - - if existing_admin: - if existing_admin.user_role != UserRole.ADMIN.value: - existing_admin.user_role = UserRole.ADMIN.value - db.commit() - logger.info("Existing admin user updated with ADMIN role.") - return - - logger.info("Creating default admin user: System Admin") - - admin_user = User( - email=admin_email, - full_name="System Admin", - hashed_password=security_service.hash_password(settings.ADMIN_PASSWORD), - user_role=UserRole.ADMIN.value, - is_active=True, - is_verified=True, - ) - db.add(admin_user) - db.commit() - logger.info("Default admin user created successfully.") diff --git a/app/main.py b/app/main.py index b39f185..8b8343d 100644 --- a/app/main.py +++ b/app/main.py @@ -9,10 +9,8 @@ from app.core.config import settings from app.core.exception_handlers import register_exception_handlers -from app.core.init_admin import init_admin from app.core.rate_limiter import limiter, rate_limit_exception_handler from app.core.sanitize import sanitize_for_log -from app.db.session import SessionLocal, get_engine from app.kafka.manager import get_kafka_manager from app.routers import api_router @@ -36,15 +34,6 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: # Keep API startup alive in environments where Kafka isn't available (e.g. CI). logger.warning("Kafka startup skipped: %s", sanitize_for_log(exc)) - # Initialize Admin - try: - # Ensure the engine is initialized and SessionLocal is bound - get_engine() - with SessionLocal() as db_session: - init_admin(db_session) - except Exception as exc: - logger.warning("Admin initialization failed: %s", sanitize_for_log(exc)) - yield # Shutdown if kafka_started: diff --git a/app/modules/meeting/api_docs.md b/app/modules/meeting/api_docs.md index 6c8a2a7..3e46cf6 100644 --- a/app/modules/meeting/api_docs.md +++ b/app/modules/meeting/api_docs.md @@ -19,10 +19,14 @@ - [POST /{room_code}/join](#post-room_codejoin) - [POST /{room_code}/leave](#post-room_codeleave) - [POST /{room_code}/admit/{user_id}](#post-room_codeadmituser_id) + - [POST /{room_code}/reject/{user_id}](#post-room_coderejectuser_id) + - [POST /{room_code}/admit-all](#post-room_codeadmit-all) + - [POST /{room_code}/reject-all](#post-room_codereject-all) - [POST /{room_code}/end](#post-room_codeend) - [PATCH /{room_code}/config](#patch-room_codeconfig) - [POST /{room_code}/invite](#post-room_codeinvite) - [WebSocket Endpoints](#websocket-endpoints) + - [WS /lobby/{room_code}](#ws-lobbyroom_code) - [WS /signaling/{room_code}](#ws-signalingroom_code) - [WS /audio/{room_code}](#ws-audioroom_code) - [WS /captions/{room_code}](#ws-captionsroom_code) @@ -363,7 +367,7 @@ Admit a waitlisted participant out of the lobby and into the live room. **🔒 Requires Authentication:** Host only. -**Response: `200 OK`** Returns the updated live participant list and lobby list in the same format as `GET /{room_code}/participants` to sync the host's view immediately. +**Response: `200 OK`** ```json { @@ -374,6 +378,63 @@ Admit a waitlisted participant out of the lobby and into the live room. --- +### POST /{room_code}/reject/{user_id} + +Reject a waitlisted participant from the lobby. + +**🔒 Requires Authentication:** Host only. + +**Response: `200 OK`** + +```json +{ + "status": "success", + "message": "User rejected from lobby successfully." +} +``` + +--- + +### POST /{room_code}/admit-all + +Admit all users currently waiting in the lobby into the active room. + +**🔒 Requires Authentication:** Host only. + +**Response: `200 OK`** + +```json +{ + "status": "success", + "message": "All lobby users admitted successfully.", + "data": { + "admitted_count": 3 + } +} +``` + +--- + +### POST /{room_code}/reject-all + +Reject all users currently waiting in the lobby. + +**🔒 Requires Authentication:** Host only. + +**Response: `200 OK`** + +```json +{ + "status": "success", + "message": "All lobby users rejected successfully.", + "data": { + "rejected_count": 3 + } +} +``` + +--- + ### POST /{room_code}/end Forcibly end the meeting. Immediately updates the DB state to `ENDED`, tallies up the `duration_minutes`, and wipes all real-time structures in Redis. @@ -478,6 +539,17 @@ Dispatch email invitations utilizing the async Kafka email producer. Clients connect using a `?token=` query parameter for authentication instead of HTTP headers. The JWT can be a standard Access Token or a Guest Token returned from `POST /{room_code}/join`. +### WS /lobby/{room_code} + +- **Purpose:** WebSocket for users waiting in the lobby. Connected when the POST `/join` call returns `{"status": "waiting"}`. +- **Authentication:** Connects using `?token=` query parameter (Access Token or Guest Token). +- **Events Pushed from Server:** + - `{"type": "admitted", "room_code": "..."}` -> Waitlisted user is admitted to the meeting. The client should close this connection and open connections to the signaling, audio, and captions WebSockets. + - `{"type": "rejected", "reason": "Host denied entry"}` -> Waitlisted user entry was denied. The client should close the connection and show a rejection screen. + - `{"type": "meeting_ended"}` -> Meeting was ended by the host while the user was waiting in the lobby. +- **Events Sent by Client:** + - `{"type": "cancel"}` -> User cancels their wait and exits the lobby. This removes them from the Redis lobby state and sends a `lobby_cancel` notification to the Host over the signaling WebSocket. + ### WS /signaling/{room_code} - **Purpose:** Relay mechanism for WebRTC handshakes (Offer/Answer/ICE candidates). diff --git a/app/modules/meeting/constants.py b/app/modules/meeting/constants.py index c98485e..70623d1 100644 --- a/app/modules/meeting/constants.py +++ b/app/modules/meeting/constants.py @@ -40,6 +40,9 @@ class InvitationStatus(enum.StrEnum): MSG_ROOM_JOINED = "Joined room successfully." MSG_ROOM_LEFT = "Left room successfully." MSG_USER_ADMITTED = "User admitted to room." +MSG_USER_REJECTED = "User rejected from lobby successfully." +MSG_ALL_USERS_ADMITTED = "All lobby users admitted successfully." +MSG_ALL_USERS_REJECTED = "All lobby users rejected successfully." MSG_MEETING_ENDED = "Meeting ended successfully." MSG_ROOM_CONFIG_UPDATED = "Room configuration updated." MSG_MEETING_HISTORY = "Meeting history retrieved successfully." @@ -91,3 +94,7 @@ def key_room_lobby(room_code: str) -> str: def key_room_active_speaker(room_code: str) -> str: return f"room:{room_code}:active_speaker" + + +def key_lobby_channel(room_code: str) -> str: + return f"ws:lobby:{room_code}" diff --git a/app/modules/meeting/router.py b/app/modules/meeting/router.py index bee1927..43b7f98 100644 --- a/app/modules/meeting/router.py +++ b/app/modules/meeting/router.py @@ -14,6 +14,8 @@ from app.core.dependencies import get_current_user, get_current_user_optional from app.modules.auth.models import User from app.modules.meeting.constants import ( + MSG_ALL_USERS_ADMITTED, + MSG_ALL_USERS_REJECTED, MSG_INVITATIONS_SENT, MSG_MEETING_ENDED, MSG_MEETING_HISTORY, @@ -23,6 +25,7 @@ MSG_ROOM_JOINED, MSG_ROOM_LEFT, MSG_USER_ADMITTED, + MSG_USER_REJECTED, ) from app.modules.meeting.dependencies import get_meeting_service from app.modules.meeting.schemas import ( @@ -216,6 +219,68 @@ async def admit_user( ) +@router.post( + "/{room_code}/reject/{user_id}", + status_code=status.HTTP_200_OK, + summary="Host proxy: rejects a user from the waiting room lobby", +) +async def reject_user( + room_code: str, + user_id: str, + current_user: User = Depends(get_current_user), + service: MeetingService = Depends(get_meeting_service), +) -> JSONResponse: + await service.reject_user( + host=current_user, room_code=room_code, target_user_id=user_id + ) + return JSONResponse( + content={"status": "success", "message": MSG_USER_REJECTED}, + status_code=status.HTTP_200_OK, + ) + + +@router.post( + "/{room_code}/admit-all", + status_code=status.HTTP_200_OK, + summary="Host proxy: admits all users from the waiting room lobby", +) +async def admit_all_users( + room_code: str, + current_user: User = Depends(get_current_user), + service: MeetingService = Depends(get_meeting_service), +) -> JSONResponse: + count = await service.admit_all_users(host=current_user, room_code=room_code) + return JSONResponse( + content={ + "status": "success", + "message": MSG_ALL_USERS_ADMITTED, + "data": {"admitted_count": count}, + }, + status_code=status.HTTP_200_OK, + ) + + +@router.post( + "/{room_code}/reject-all", + status_code=status.HTTP_200_OK, + summary="Host proxy: rejects all users from the waiting room lobby", +) +async def reject_all_users( + room_code: str, + current_user: User = Depends(get_current_user), + service: MeetingService = Depends(get_meeting_service), +) -> JSONResponse: + count = await service.reject_all_users(host=current_user, room_code=room_code) + return JSONResponse( + content={ + "status": "success", + "message": MSG_ALL_USERS_REJECTED, + "data": {"rejected_count": count}, + }, + status_code=status.HTTP_200_OK, + ) + + @router.post( "/{room_code}/end", response_model=RoomApiResponse, diff --git a/app/modules/meeting/service.py b/app/modules/meeting/service.py index a754dae..f543a71 100644 --- a/app/modules/meeting/service.py +++ b/app/modules/meeting/service.py @@ -514,20 +514,61 @@ async def admit_user(self, host: User, room_code: str, target_user_id: str) -> N if not room or room.host_id != host.id: raise ForbiddenException(message="Only the host can admit participants.") - # Fetch display_name BEFORE admit_from_lobby + # Fetch display_name and languages BEFORE admit_from_lobby # removes the entry from the lobby hash. lobby = await self.state.get_lobby(room_code) - display_name = lobby.get(target_user_id, {}).get("display_name", "") + lobby_data = lobby.get(target_user_id) + if not lobby_data: + raise BadRequestException(message="User is not in the lobby.") + + display_name = lobby_data.get("display_name", "") + listening_language = lobby_data.get("language") + speaking_language = lobby_data.get("speaking_language") was_in_lobby = await self.state.admit_from_lobby(room_code, target_user_id) if not was_in_lobby: raise BadRequestException(message="User is not in the lobby.") + # Find or create Participant, add to active room, persist to DB + try: + target_uuid = uuid.UUID(target_user_id) + except ValueError: + target_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, target_user_id) + + user = self.repo.db.get(User, target_uuid) + + # Check if participant already exists in DB + user_uuid = target_uuid if user else None + guest_uuid = target_uuid if not user else None + ptc = self.repo.get_participant( + room.id, user_id=user_uuid, guest_session_id=guest_uuid + ) + + # Call _finalize_join to persist to DB and write to Redis active participants + await self._finalize_join( + room, + room_code, + ptc=ptc, + user=user, + tracking_id=str(target_uuid), + display_name=display_name, + listening_language=listening_language, + speaking_language=speaking_language, + new_guest_token=None, + role=( + ParticipantRole.PARTICIPANT.value + if user + else ParticipantRole.GUEST.value + ), + ) + cm = get_connection_manager() - await cm.send_to_user( + # Send admission message via lobby connection channel where user is listening + await cm.send_to_lobby_user( room_code, target_user_id, {"type": "admitted", "room_code": room_code} ) + # Notify existing participants that the newly admitted user has joined. # Without this broadcast, peers already in the room never know the new # participant exists and won't initiate WebRTC offers to them. @@ -537,11 +578,146 @@ async def admit_user(self, host: User, room_code: str, target_user_id: str) -> N "type": "user_joined", "user_id": target_user_id, "display_name": display_name, - "role": "guest", + "role": ( + ParticipantRole.PARTICIPANT.value + if user + else ParticipantRole.GUEST.value + ), }, sender_id=target_user_id, # Exclude the admitted user ) + async def admit_all_users(self, host: User, room_code: str) -> int: + """Host admits all users currently waiting in the lobby.""" + room = self.repo.get_room_by_code(room_code) + if not room or room.host_id != host.id: + raise ForbiddenException(message="Only the host can admit participants.") + + lobby = await self.state.get_lobby(room_code) + if not lobby: + raise BadRequestException(message="No users in the lobby.") + + cm = get_connection_manager() + admitted_count = 0 + + for user_id, lobby_data in lobby.items(): + was_in_lobby = await self.state.admit_from_lobby(room_code, user_id) + if not was_in_lobby: + continue + + display_name = lobby_data.get("display_name", "") + listening_language = lobby_data.get("language") + speaking_language = lobby_data.get("speaking_language") + + try: + target_uuid = uuid.UUID(user_id) + except ValueError: + target_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, user_id) + + user = self.repo.db.get(User, target_uuid) + + user_uuid = target_uuid if user else None + guest_uuid = target_uuid if not user else None + ptc = self.repo.get_participant( + room.id, user_id=user_uuid, guest_session_id=guest_uuid + ) + + await self._finalize_join( + room, + room_code, + ptc=ptc, + user=user, + tracking_id=str(target_uuid), + display_name=display_name, + listening_language=listening_language, + speaking_language=speaking_language, + new_guest_token=None, + role=( + ParticipantRole.PARTICIPANT.value + if user + else ParticipantRole.GUEST.value + ), + ) + + await cm.send_to_lobby_user( + room_code, user_id, {"type": "admitted", "room_code": room_code} + ) + + await cm.broadcast_to_room( + room_code, + { + "type": "user_joined", + "user_id": user_id, + "display_name": display_name, + "role": ( + ParticipantRole.PARTICIPANT.value + if user + else ParticipantRole.GUEST.value + ), + }, + sender_id=user_id, + ) + admitted_count += 1 + + return admitted_count + + async def reject_user( + self, host: User, room_code: str, target_user_id: str + ) -> None: + """Host rejects a specific user from the lobby.""" + room = self.repo.get_room_by_code(room_code) + if not room or room.host_id != host.id: + raise ForbiddenException(message="Only the host can reject participants.") + + lobby = await self.state.get_lobby(room_code) + if target_user_id not in lobby: + raise BadRequestException(message="User is not in the lobby.") + + await self.state.remove_from_lobby(room_code, target_user_id) + + cm = get_connection_manager() + await cm.send_to_lobby_user( + room_code, + target_user_id, + {"type": "rejected", "reason": "Host denied entry"}, + ) + + async def reject_all_users(self, host: User, room_code: str) -> int: + """Host rejects all users currently waiting in the lobby.""" + room = self.repo.get_room_by_code(room_code) + if not room or room.host_id != host.id: + raise ForbiddenException(message="Only the host can reject participants.") + + lobby = await self.state.get_lobby(room_code) + if not lobby: + raise BadRequestException(message="No users in the lobby.") + + cm = get_connection_manager() + rejected_count = 0 + + for user_id in list(lobby.keys()): + await self.state.remove_from_lobby(room_code, user_id) + await cm.send_to_lobby_user( + room_code, + user_id, + {"type": "rejected", "reason": "Host denied entry"}, + ) + rejected_count += 1 + + return rejected_count + + async def cancel_lobby_wait( + self, room_code: str, user_id: str, _user: User | None = None + ) -> None: + """User cancels their own wait in the lobby.""" + await self.state.remove_from_lobby(room_code, user_id) + + cm = get_connection_manager() + await cm.broadcast_to_room( + room_code, + {"type": "lobby_cancel", "user_id": user_id}, + ) + async def end_room(self, host: User, room_code: str) -> Room: """Host forcibly ends the meeting for everyone.""" room = self.repo.get_room_by_code(room_code) @@ -558,6 +734,10 @@ async def end_room(self, host: User, room_code: str) -> Room: room_code, {"type": "meeting_ended"}, ) + await cm.broadcast_to_lobby( + room_code, + {"type": "meeting_ended"}, + ) # Update DB status now = utc_now() diff --git a/app/modules/meeting/ws_dependencies.py b/app/modules/meeting/ws_dependencies.py index 40bfe55..0e4626d 100644 --- a/app/modules/meeting/ws_dependencies.py +++ b/app/modules/meeting/ws_dependencies.py @@ -97,3 +97,28 @@ async def assert_room_participant(room_code: str, user_id: str) -> dict: reason="User is not a participant of this room", ) return participant_state + + +async def assert_lobby_participant(room_code: str, user_id: str) -> dict: + """Ensure the user is in the lobby hash (not participants hash). + + This is the inverse of assert_room_participant — used exclusively + for the lobby WebSocket endpoint. + + Args: + room_code (str): Active room code. + user_id (str): Waitlisted user ID. + + Returns: + dict: The waitlisted lobby state dictionary. + """ + state_service = MeetingStateService() + lobby = await state_service.get_lobby(room_code) + + lobby_state = lobby.get(user_id) + if not lobby_state: + raise WebSocketException( + code=status.WS_1008_POLICY_VIOLATION, + reason="User is not in the lobby", + ) + return lobby_state diff --git a/app/modules/meeting/ws_router.py b/app/modules/meeting/ws_router.py index 698277c..2d8f2ae 100644 --- a/app/modules/meeting/ws_router.py +++ b/app/modules/meeting/ws_router.py @@ -13,7 +13,11 @@ from app.core.sanitize import log_sanitizer, sanitize_for_log from app.modules.meeting.state import MeetingStateService -from app.modules.meeting.ws_dependencies import assert_room_participant, authenticate_ws +from app.modules.meeting.ws_dependencies import ( + assert_lobby_participant, + assert_room_participant, + authenticate_ws, +) from app.schemas.pipeline import ( SynthesizedAudioEvent, ) @@ -334,3 +338,54 @@ async def captions_websocket( pass finally: await pubsub.unsubscribe(channel) + + +@router.websocket("/lobby/{room_code}") +async def lobby_websocket( + websocket: WebSocket, + room_code: str, + user_id: str = Depends(authenticate_ws), +) -> None: + """WebSocket for users waiting in the lobby. + + Lobby users connect here after POST /join returns {"status": "waiting"}. + They receive real-time server-pushed events: + - {"type": "admitted"} -> user should close this WS and connect to signaling + - {"type": "rejected"} -> user should close this WS and show rejection UI + - {"type": "meeting_ended"} -> meeting was ended while user was waiting + + They can also SEND client messages: + - {"type": "cancel"} -> user cancels their wait (removes from lobby) + """ + try: + _ = await assert_lobby_participant(room_code, user_id) + except Exception as e: + await websocket.close(code=1008, reason=str(e)) + return + + await websocket.accept() + + manager = get_connection_manager() + await manager.connect_lobby(room_code, user_id, websocket) + + try: + while True: + data = await websocket.receive_text() + try: + payload = json.loads(data) + msg_type = payload.get("type") + if msg_type == "cancel": + from app.modules.meeting.service import MeetingService + from app.modules.meeting.state import MeetingStateService + + state = MeetingStateService() + service = MeetingService(repo=None, state=state) # type: ignore[arg-type] + await service.cancel_lobby_wait(room_code, user_id) + await websocket.close(code=1000, reason="Canceled wait") + break + except json.JSONDecodeError: + logger.warning("Invalid JSON received on lobby WS") + except WebSocketDisconnect: + pass + finally: + manager.disconnect_lobby(room_code, user_id) diff --git a/app/services/connection_manager.py b/app/services/connection_manager.py index f6ae43d..31e088e 100644 --- a/app/services/connection_manager.py +++ b/app/services/connection_manager.py @@ -35,6 +35,10 @@ def __init__(self, redis_client: Redis) -> None: self.active_connections: dict[str, dict[str, WebSocket]] = {} # Maps room_code -> BackgroundTask (Redis subscriber) self._pubsub_tasks: dict[str, asyncio.Task] = {} + # Maps room_code -> { user_id -> WebSocket } for lobby waiting rooms + self.lobby_connections: dict[str, dict[str, WebSocket]] = {} + # Maps room_code -> BackgroundTask (Redis lobby subscriber) + self._lobby_pubsub_tasks: dict[str, asyncio.Task] = {} self.redis = redis_client async def connect(self, room_code: str, user_id: str, websocket: WebSocket) -> None: @@ -189,6 +193,173 @@ async def _listen_to_redis(self, room_code: str) -> None: # noqa: C901 finally: await pubsub.unsubscribe(channel) + async def connect_lobby( + self, room_code: str, user_id: str, websocket: WebSocket + ) -> None: + """Register an accepted lobby WebSocket connection in the manager. + + Args: + room_code (str): The active room code. + user_id (str): The connecting waitlisted participant's user id. + websocket (WebSocket): The active websocket connection. + """ + if room_code not in self.lobby_connections: + self.lobby_connections[room_code] = {} + # Start lobby pub/sub listener for the room + self._start_lobby_listening(room_code) + + self.lobby_connections[room_code][user_id] = websocket + logger.info( + "User %s connected to lobby WS for room %s", + log_sanitizer.sanitize(user_id), + log_sanitizer.sanitize(room_code), + ) + + def disconnect_lobby(self, room_code: str, user_id: str) -> None: + """Remove a lobby WebSocket connection from the manager. + + Args: + room_code (str): The room the user is disconnecting from. + user_id (str): The disconnecting waitlisted user id. + """ + if room_code in self.lobby_connections: + self.lobby_connections[room_code].pop(user_id, None) + logger.info( + "User %s disconnected from lobby WS for room %s", + log_sanitizer.sanitize(user_id), + log_sanitizer.sanitize(room_code), + ) + + # Clean up empty lobbies + if not self.lobby_connections[room_code]: + del self.lobby_connections[room_code] + self._stop_lobby_listening(room_code) + + async def broadcast_to_lobby(self, room_code: str, message: dict) -> None: + """Publish a message to all users in a lobby across all instances. + + Args: + room_code (str): The room whose lobby to broadcast to. + message (dict): The message payload. + """ + payload = {"type": "broadcast", "data": message} + from app.modules.meeting.constants import key_lobby_channel + + await self.redis.publish(key_lobby_channel(room_code), json.dumps(payload)) + + async def send_to_lobby_user( + self, room_code: str, target_user_id: str, message: dict + ) -> None: + """Publish a message to a specific user in a lobby across all instances. + + Args: + room_code (str): The room containing the target. + target_user_id (str): The specific user to receive the message. + message (dict): The message payload. + """ + payload = { + "type": "unicast", + "target_user_id": target_user_id, + "data": message, + } + from app.modules.meeting.constants import key_lobby_channel + + await self.redis.publish(key_lobby_channel(room_code), json.dumps(payload)) + + def _start_lobby_listening(self, room_code: str) -> None: + """Start a background task to listen for lobby messages on Redis. + + Args: + room_code (str): The room code to subscribe to. + """ + if room_code not in self._lobby_pubsub_tasks: + task = asyncio.create_task(self._listen_to_lobby_redis(room_code)) + self._lobby_pubsub_tasks[room_code] = task + + def _stop_lobby_listening(self, room_code: str) -> None: + """Cancel the background task listening for lobby messages. + + Args: + room_code (str): The room code to unsubscribe from. + """ + task = self._lobby_pubsub_tasks.pop(room_code, None) + if task and not task.done(): + task.cancel() + + async def _listen_to_lobby_redis(self, room_code: str) -> None: + """Listen to a lobby Redis channel and dispatch to local websockets. + + Args: + room_code (str): The room code being monitored. + """ + pubsub = self.redis.pubsub() + from app.modules.meeting.constants import key_lobby_channel + + channel = key_lobby_channel(room_code) + await pubsub.subscribe(channel) + + try: + async for message in pubsub.listen(): + if message["type"] != "message": + continue + + payload = json.loads(message["data"]) + msg_type = payload.get("type") + data = payload.get("data") + + # Check if lobby is still active locally + if room_code not in self.lobby_connections: + break + + if msg_type == "broadcast": + await self._dispatch_lobby_broadcast(room_code, data) + elif msg_type == "unicast": + await self._dispatch_lobby_unicast(room_code, payload, data) + except asyncio.CancelledError: + pass + finally: + await pubsub.unsubscribe(channel) + + async def _dispatch_lobby_broadcast(self, room_code: str, data: dict) -> None: + """Dispatch a broadcast event to all lobby WebSockets in a room.""" + for user_id, ws in list(self.lobby_connections[room_code].items()): + try: + await ws.send_json(data) + if data.get("type") in ( + "meeting_ended", + "rejected", + "admitted", + ): + await ws.close(code=1000) + except Exception: + logger.warning( + "Failed to send lobby message to %s", + log_sanitizer.sanitize(user_id), + ) + + async def _dispatch_lobby_unicast( + self, room_code: str, payload: dict, data: dict + ) -> None: + """Dispatch a unicast event to a target lobby WebSocket in a room.""" + target_id = payload.get("target_user_id") + if not isinstance(target_id, str): + return + target_ws = self.lobby_connections[room_code].get(target_id) + if target_ws: + try: + await target_ws.send_json(data) + if data.get("type") in ( + "meeting_ended", + "rejected", + "admitted", + ): + await target_ws.close(code=1000) + except Exception: + logger.warning( + "Failed to send lobby unicast message to %s", + log_sanitizer.sanitize(target_id), + ) + # ── Module-level Dependency ─────────────────────────────────────────── diff --git a/tests/meeting/test_meeting_service.py b/tests/meeting/test_meeting_service.py index f4ce0da..ddb18be 100644 --- a/tests/meeting/test_meeting_service.py +++ b/tests/meeting/test_meeting_service.py @@ -103,6 +103,8 @@ def mock_cm(): mock_instance = MagicMock() mock_instance.broadcast_to_room = AsyncMock() mock_instance.send_to_user = AsyncMock() + mock_instance.broadcast_to_lobby = AsyncMock() + mock_instance.send_to_lobby_user = AsyncMock() mock_get.return_value = mock_instance yield mock_instance