diff --git a/README.md b/README.md index 9ea48b2..f7ed764 100644 --- a/README.md +++ b/README.md @@ -158,21 +158,20 @@ async def on_first_participant_joined(transport, payload): ### Available Events -| Event | Arguments | Description | -|-------------------------------|--------------------------|------------------------------------------------------| -| `on_connected` | — | Bot has connected to the call | -| `on_disconnected` | — | Bot has disconnected from the call | -| `on_before_disconnect` | — | Called before the bot disconnects | -| `on_participant_connected` | `participant_id` | A participant has joined the call | -| `on_participant_disconnected` | `participant_id` | A participant has left the call | -| `on_participant_left` | `participant_id` | A participant has left the call | -| `on_first_participant_joined` | `participant_id` | The first participant has joined the call | -| `on_audio_track_subscribed` | `participant_id` | Audio from a participant is now being received | -| `on_audio_track_unsubscribed` | `participant_id` | Audio from a participant is no longer being received | -| `on_video_track_subscribed` | `participant_id` | Video from a participant is now being received | -| `on_video_track_unsubscribed` | `participant_id` | Video from a participant is no longer being received | -| `on_data_received` | `data`, `participant_id` | Custom data/event received from a participant | -| `on_stream_custom_event` | `event` | Custom event from a client watching the call | +| Event | Arguments | Description | +|-------------------------------|------------------|------------------------------------------------------| +| `on_connected` | — | Bot has connected to the call | +| `on_disconnected` | — | Bot has disconnected from the call | +| `on_before_disconnect` | — | Called before the bot disconnects | +| `on_participant_connected` | `participant_id` | A participant has joined the call | +| `on_participant_disconnected` | `participant_id` | A participant has left the call | +| `on_participant_left` | `participant_id` | A participant has left the call | +| `on_first_participant_joined` | `participant_id` | The first participant has joined the call | +| `on_audio_track_subscribed` | `participant_id` | Audio from a participant is now being received | +| `on_audio_track_unsubscribed` | `participant_id` | Audio from a participant is no longer being received | +| `on_video_track_subscribed` | `participant_id` | Video from a participant is now being received | +| `on_video_track_unsubscribed` | `participant_id` | Video from a participant is no longer being received | +| `on_stream_custom_event` | `event` | Custom event from a client watching the call | ## Running the Example diff --git a/src/pipecat_getstream/transport.py b/src/pipecat_getstream/transport.py index f274f2c..72d67ec 100644 --- a/src/pipecat_getstream/transport.py +++ b/src/pipecat_getstream/transport.py @@ -8,10 +8,9 @@ import asyncio import time import warnings -from dataclasses import dataclass from fractions import Fraction -from typing import Any, Callable, Coroutine, Dict, List, Optional - +from typing import Callable, Coroutine, Dict, List, Optional +from pipecat.processors.frame_processor import FrameProcessor import av import numpy as np from aiortc import MediaStreamTrack @@ -29,12 +28,11 @@ AudioRawFrame, CancelFrame, EndFrame, + EndTaskFrame, Frame, InterruptionFrame, OutputAudioRawFrame, OutputImageRawFrame, - OutputTransportMessageFrame, - OutputTransportMessageUrgentFrame, StartFrame, UserAudioRawFrame, UserImageRawFrame, @@ -59,28 +57,6 @@ ) -@dataclass -class GetstreamOutputTransportMessageFrame(OutputTransportMessageFrame): - """Frame for transport messages in Stream Video calls. - - Parameters: - participant_id: Optional ID of the participant this message is for/from. - """ - - participant_id: Optional[str] = None - - -@dataclass -class GetstreamOutputTransportMessageUrgentFrame(OutputTransportMessageUrgentFrame): - """Frame for urgent transport messages in Stream Video calls. - - Parameters: - participant_id: Optional ID of the participant this message is for/from. - """ - - participant_id: Optional[str] = None - - class GetstreamParams(TransportParams): """Configuration parameters for Stream Video transport. @@ -103,8 +79,8 @@ class GetstreamCallbacks(BaseModel): on_audio_track_unsubscribed: Called when an audio track is unsubscribed. on_video_track_subscribed: Called when a video track is subscribed. on_video_track_unsubscribed: Called when a video track is unsubscribed. - on_data_received: Called when data is received from a participant. on_first_participant_joined: Called when the first participant joins. + on_call_ended: Called when Stream reports that the call has ended. """ on_connected: Callable[[], Coroutine[None, None, None]] @@ -116,9 +92,9 @@ class GetstreamCallbacks(BaseModel): on_audio_track_unsubscribed: Callable[[str], Coroutine[None, None, None]] on_video_track_subscribed: Callable[[str], Coroutine[None, None, None]] on_video_track_unsubscribed: Callable[[str], Coroutine[None, None, None]] - on_data_received: Callable[[bytes, str], Coroutine[None, None, None]] on_first_participant_joined: Callable[[str], Coroutine[None, None, None]] on_custom_event: Callable[[dict], Coroutine[None, None, None]] + on_call_ended: Callable[[], Coroutine[None, None, None]] class PipecatVideoStreamTrack(MediaStreamTrack): @@ -851,8 +827,8 @@ def _on_call_ended(self, *args): logger.info("Stream Video call ended") if self._connected: self._create_task( - self.disconnect(), - f"{self}::disconnect_on_call_ended", + self._callbacks.on_call_ended(), + f"{self}::on_call_ended", ) def _create_task(self, coroutine: Coroutine, name: str) -> asyncio.Task: @@ -965,18 +941,6 @@ async def cleanup(self): await super().cleanup() await self._transport.cleanup() - async def push_app_message(self, message: Any, sender: str): - """Push an application message as an urgent transport frame. - - Args: - message: The message data to send. - sender: ID of the message sender. - """ - frame = GetstreamOutputTransportMessageUrgentFrame( - message=message, participant_id=sender - ) - await self.push_frame(frame) - async def _audio_in_task_handler(self): """Handle incoming audio frames from participants.""" logger.info("Stream Video audio input task started") @@ -1261,9 +1225,9 @@ def __init__( on_audio_track_unsubscribed=self._on_audio_track_unsubscribed, on_video_track_subscribed=self._on_video_track_subscribed, on_video_track_unsubscribed=self._on_video_track_unsubscribed, - on_data_received=self._on_data_received, on_first_participant_joined=self._on_first_participant_joined, on_custom_event=self._on_custom_event, + on_call_ended=self._on_call_ended, ) self._params = params or GetstreamParams() @@ -1288,7 +1252,6 @@ def __init__( self._register_event_handler("on_audio_track_unsubscribed") self._register_event_handler("on_video_track_subscribed") self._register_event_handler("on_video_track_unsubscribed") - self._register_event_handler("on_data_received") self._register_event_handler("on_first_participant_joined") self._register_event_handler("on_participant_left") self._register_event_handler("on_before_disconnect", sync=True) @@ -1367,6 +1330,31 @@ async def _on_before_disconnect(self): """Handle before disconnection events.""" await self._call_event_handler("on_before_disconnect") + async def _on_call_ended(self): + """Handle call ended events. + + If the transport is attached to a running Pipecat pipeline, request a + graceful shutdown via `EndTaskFrame`. + Otherwise, fall back to a direct + disconnect so the Stream connection still gets cleaned up. + """ + shutdown_target: FrameProcessor | None = None + if self._input and self._input.previous: + shutdown_target = self._input + elif self._output and self._output.previous: + shutdown_target = self._output + + if shutdown_target: + await shutdown_target.push_frame( + EndTaskFrame(reason="Stream Video call ended"), + FrameDirection.UPSTREAM, + ) + else: + logger.debug( + "Stream Video call ended without an attached pipeline, disconnecting directly" + ) + await self._client.disconnect() + async def _on_participant_joined(self, participant_id: str): """Handle participant joined events.""" await self._call_event_handler("on_participant_connected", participant_id) @@ -1392,12 +1380,6 @@ async def _on_video_track_unsubscribed(self, participant_id: str): """Handle video track unsubscribed events.""" await self._call_event_handler("on_video_track_unsubscribed", participant_id) - async def _on_data_received(self, data: bytes, participant_id: str): - """Handle data received events.""" - if self._input: - await self._input.push_app_message(data.decode(), participant_id) - await self._call_event_handler("on_data_received", data, participant_id) - async def _on_first_participant_joined(self, participant_id: str): """Handle first participant joined events.""" await self._call_event_handler("on_first_participant_joined", participant_id) diff --git a/src/tests/conftest.py b/src/tests/conftest.py index 642a7a0..89a1a22 100644 --- a/src/tests/conftest.py +++ b/src/tests/conftest.py @@ -27,8 +27,8 @@ def _factory() -> "GetstreamCallbacks": on_video_track_subscribed=AsyncMock(), on_video_track_unsubscribed=AsyncMock(), on_custom_event=AsyncMock(), - on_data_received=AsyncMock(), on_first_participant_joined=AsyncMock(), + on_call_ended=AsyncMock(), ) return _factory