Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/modules/meeting/api_docs.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# FluentMeet Meeting API Documentation
# Spoken.ai Meeting API Documentation

> **Base URL:** `/api/v1/meetings`
> **Version:** 1.0 · **Protocol:** REST over HTTPS & WebSockets · **Content-Type:** `application/json`
Expand Down Expand Up @@ -38,7 +38,7 @@

## Overview

The FluentMeet meeting module provides comprehensive meeting management, supporting:
The Spoken.ai meeting module provides comprehensive meeting management, supporting:

- **Room Management:** Creation, scheduling, retrieval, updates, and forced ending.
- **Participant Tracking:** Identifying registered users and dynamic token-based guests.
Expand Down
58 changes: 38 additions & 20 deletions app/modules/meeting/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,32 +510,34 @@ async def leave_room(

async def admit_user(self, host: User, room_code: str, target_user_id: str) -> None:
"""Host admits a specific user from the lobby into the active room."""
try:
target_uuid = uuid.UUID(target_user_id)
normalized_user_id = str(target_uuid)
except ValueError:
target_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, target_user_id)
normalized_user_id = target_user_id

room = self.repo.get_room_by_code(room_code)
if not room or room.host_id != host.id:
raise ForbiddenException(message="Only the host can admit participants.")

# Fetch display_name and languages BEFORE admit_from_lobby
# removes the entry from the lobby hash.
lobby = await self.state.get_lobby(room_code)
lobby_data = lobby.get(target_user_id)
lobby_data = lobby.get(normalized_user_id)
if not lobby_data:
raise BadRequestException(message="User is not in the lobby.")

display_name = lobby_data.get("display_name", "")
listening_language = lobby_data.get("language")
speaking_language = lobby_data.get("speaking_language")

was_in_lobby = await self.state.admit_from_lobby(room_code, target_user_id)
was_in_lobby = await self.state.admit_from_lobby(room_code, normalized_user_id)

if not was_in_lobby:
raise BadRequestException(message="User is not in the lobby.")

# Find or create Participant, add to active room, persist to DB
try:
target_uuid = uuid.UUID(target_user_id)
except ValueError:
target_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, target_user_id)

user = self.repo.db.get(User, target_uuid)

# Check if participant already exists in DB
Expand Down Expand Up @@ -566,7 +568,7 @@ async def admit_user(self, host: User, room_code: str, target_user_id: str) -> N
cm = get_connection_manager()
# Send admission message via lobby connection channel where user is listening
await cm.send_to_lobby_user(
room_code, target_user_id, {"type": "admitted", "room_code": room_code}
room_code, normalized_user_id, {"type": "admitted", "room_code": room_code}
)

# Notify existing participants that the newly admitted user has joined.
Expand All @@ -576,15 +578,15 @@ async def admit_user(self, host: User, room_code: str, target_user_id: str) -> N
room_code,
{
"type": "user_joined",
"user_id": target_user_id,
"user_id": normalized_user_id,
"display_name": display_name,
"role": (
ParticipantRole.PARTICIPANT.value
if user
else ParticipantRole.GUEST.value
),
},
sender_id=target_user_id, # Exclude the admitted user
sender_id=normalized_user_id, # Exclude the admitted user
)

async def admit_all_users(self, host: User, room_code: str) -> int:
Expand All @@ -600,7 +602,14 @@ async def admit_all_users(self, host: User, room_code: str) -> int:
cm = get_connection_manager()
admitted_count = 0

for user_id, lobby_data in lobby.items():
for raw_user_id, lobby_data in lobby.items():
try:
target_uuid = uuid.UUID(raw_user_id)
user_id = str(target_uuid)
except ValueError:
target_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, raw_user_id)
user_id = raw_user_id

was_in_lobby = await self.state.admit_from_lobby(room_code, user_id)
if not was_in_lobby:
continue
Expand All @@ -609,11 +618,6 @@ async def admit_all_users(self, host: User, room_code: str) -> int:
listening_language = lobby_data.get("language")
speaking_language = lobby_data.get("speaking_language")

try:
target_uuid = uuid.UUID(user_id)
except ValueError:
target_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, user_id)

user = self.repo.db.get(User, target_uuid)

user_uuid = target_uuid if user else None
Expand Down Expand Up @@ -665,20 +669,27 @@ async def reject_user(
self, host: User, room_code: str, target_user_id: str
) -> None:
"""Host rejects a specific user from the lobby."""
try:
target_uuid = uuid.UUID(target_user_id)
normalized_user_id = str(target_uuid)
except ValueError:
target_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, target_user_id)
normalized_user_id = target_user_id

room = self.repo.get_room_by_code(room_code)
if not room or room.host_id != host.id:
raise ForbiddenException(message="Only the host can reject participants.")

lobby = await self.state.get_lobby(room_code)
if target_user_id not in lobby:
if normalized_user_id not in lobby:
raise BadRequestException(message="User is not in the lobby.")

await self.state.remove_from_lobby(room_code, target_user_id)
await self.state.remove_from_lobby(room_code, normalized_user_id)

cm = get_connection_manager()
await cm.send_to_lobby_user(
room_code,
target_user_id,
normalized_user_id,
{"type": "rejected", "reason": "Host denied entry"},
)

Expand All @@ -695,7 +706,14 @@ async def reject_all_users(self, host: User, room_code: str) -> int:
cm = get_connection_manager()
rejected_count = 0

for user_id in list(lobby.keys()):
for raw_user_id in list(lobby.keys()):
try:
target_uuid = uuid.UUID(raw_user_id)
user_id = str(target_uuid)
except ValueError:
target_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, raw_user_id)
user_id = raw_user_id

await self.state.remove_from_lobby(room_code, user_id)
await cm.send_to_lobby_user(
room_code,
Expand Down
21 changes: 21 additions & 0 deletions app/modules/meeting/ws_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,27 @@ async def lobby_websocket(
try:
_ = await assert_lobby_participant(room_code, user_id)
except Exception as e:
# Check if the user is already admitted to the meeting room
# (present in participants list)
try:
from app.modules.meeting.state import MeetingStateService

state_service = MeetingStateService()
participants = await state_service.get_participants(room_code)
normalized_user_id = user_id.lower().strip()
normalized_participants = {k.lower().strip() for k in participants}
if normalized_user_id in normalized_participants:
await websocket.accept()
await websocket.send_json({"type": "admitted", "room_code": room_code})
await websocket.close(code=1000)
return
except Exception as inner_e:
logger.error(
"Error checking alternative participant admission in "
"lobby WebSocket: %s",
inner_e,
)

await websocket.close(code=1008, reason=str(e))
return

Expand Down
9 changes: 9 additions & 0 deletions app/services/connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async def connect(self, room_code: str, user_id: str, websocket: WebSocket) -> N
user_id (str): The connecting participant's user id.
websocket (WebSocket): The active websocket connection.
"""
user_id = user_id.lower().strip()
if room_code not in self.active_connections:
self.active_connections[room_code] = {}
# Start pub/sub listener for the room
Expand All @@ -68,6 +69,7 @@ def disconnect(self, room_code: str, user_id: str) -> None:
room_code (str): The room the user is disconnecting from.
user_id (str): The disconnecting participant's user id.
"""
user_id = user_id.lower().strip()
if room_code in self.active_connections:
self.active_connections[room_code].pop(user_id, None)
logger.info(
Expand Down Expand Up @@ -166,6 +168,8 @@ async def _listen_to_redis(self, room_code: str) -> None: # noqa: C901

if msg_type == "broadcast":
sender_id = payload.get("sender_id")
if isinstance(sender_id, str):
sender_id = sender_id.lower().strip()
for user_id, ws in list(self.active_connections[room_code].items()):
# Don't echo back to the sender
if user_id != sender_id:
Expand All @@ -179,6 +183,8 @@ async def _listen_to_redis(self, room_code: str) -> None: # noqa: C901

elif msg_type == "unicast":
target_id = payload.get("target_user_id")
if isinstance(target_id, str):
target_id = target_id.lower().strip()
target_ws = self.active_connections[room_code].get(target_id)
if target_ws:
try:
Expand All @@ -203,6 +209,7 @@ async def connect_lobby(
user_id (str): The connecting waitlisted participant's user id.
websocket (WebSocket): The active websocket connection.
"""
user_id = user_id.lower().strip()
if room_code not in self.lobby_connections:
self.lobby_connections[room_code] = {}
# Start lobby pub/sub listener for the room
Expand All @@ -222,6 +229,7 @@ def disconnect_lobby(self, room_code: str, user_id: str) -> None:
room_code (str): The room the user is disconnecting from.
user_id (str): The disconnecting waitlisted user id.
"""
user_id = user_id.lower().strip()
if room_code in self.lobby_connections:
self.lobby_connections[room_code].pop(user_id, None)
logger.info(
Expand Down Expand Up @@ -344,6 +352,7 @@ async def _dispatch_lobby_unicast(
target_id = payload.get("target_user_id")
if not isinstance(target_id, str):
return
target_id = target_id.lower().strip()
target_ws = self.lobby_connections[room_code].get(target_id)
if target_ws:
try:
Expand Down
Loading