From 45c314e1276612dc50dbbed75a81ffbe7a26f66f Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Thu, 23 Apr 2026 14:26:07 +0200 Subject: [PATCH 1/3] Remove unused message frames and "on_data_received" callback The callback wasn't wired properly anyway --- README.md | 29 +++++++++--------- src/pipecat_getstream/transport.py | 47 ------------------------------ 2 files changed, 14 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index 9ea48b2..f7ed764 100644 --- a/README.md +++ b/README.md @@ -158,21 +158,20 @@ async def on_first_participant_joined(transport, payload): ### Available Events -| Event | Arguments | Description | -|-------------------------------|--------------------------|------------------------------------------------------| -| `on_connected` | — | Bot has connected to the call | -| `on_disconnected` | — | Bot has disconnected from the call | -| `on_before_disconnect` | — | Called before the bot disconnects | -| `on_participant_connected` | `participant_id` | A participant has joined the call | -| `on_participant_disconnected` | `participant_id` | A participant has left the call | -| `on_participant_left` | `participant_id` | A participant has left the call | -| `on_first_participant_joined` | `participant_id` | The first participant has joined the call | -| `on_audio_track_subscribed` | `participant_id` | Audio from a participant is now being received | -| `on_audio_track_unsubscribed` | `participant_id` | Audio from a participant is no longer being received | -| `on_video_track_subscribed` | `participant_id` | Video from a participant is now being received | -| `on_video_track_unsubscribed` | `participant_id` | Video from a participant is no longer being received | -| `on_data_received` | `data`, `participant_id` | Custom data/event received from a participant | -| `on_stream_custom_event` | `event` | Custom event from a client watching the call | +| Event | Arguments | Description | +|-------------------------------|------------------|------------------------------------------------------| +| `on_connected` | — | Bot has connected to the call | +| `on_disconnected` | — | Bot has disconnected from the call | +| `on_before_disconnect` | — | Called before the bot disconnects | +| `on_participant_connected` | `participant_id` | A participant has joined the call | +| `on_participant_disconnected` | `participant_id` | A participant has left the call | +| `on_participant_left` | `participant_id` | A participant has left the call | +| `on_first_participant_joined` | `participant_id` | The first participant has joined the call | +| `on_audio_track_subscribed` | `participant_id` | Audio from a participant is now being received | +| `on_audio_track_unsubscribed` | `participant_id` | Audio from a participant is no longer being received | +| `on_video_track_subscribed` | `participant_id` | Video from a participant is now being received | +| `on_video_track_unsubscribed` | `participant_id` | Video from a participant is no longer being received | +| `on_stream_custom_event` | `event` | Custom event from a client watching the call | ## Running the Example diff --git a/src/pipecat_getstream/transport.py b/src/pipecat_getstream/transport.py index f274f2c..34c7a85 100644 --- a/src/pipecat_getstream/transport.py +++ b/src/pipecat_getstream/transport.py @@ -8,7 +8,6 @@ import asyncio import time import warnings -from dataclasses import dataclass from fractions import Fraction from typing import Any, Callable, Coroutine, Dict, List, Optional @@ -33,8 +32,6 @@ InterruptionFrame, OutputAudioRawFrame, OutputImageRawFrame, - OutputTransportMessageFrame, - OutputTransportMessageUrgentFrame, StartFrame, UserAudioRawFrame, UserImageRawFrame, @@ -59,28 +56,6 @@ ) -@dataclass -class GetstreamOutputTransportMessageFrame(OutputTransportMessageFrame): - """Frame for transport messages in Stream Video calls. - - Parameters: - participant_id: Optional ID of the participant this message is for/from. - """ - - participant_id: Optional[str] = None - - -@dataclass -class GetstreamOutputTransportMessageUrgentFrame(OutputTransportMessageUrgentFrame): - """Frame for urgent transport messages in Stream Video calls. - - Parameters: - participant_id: Optional ID of the participant this message is for/from. - """ - - participant_id: Optional[str] = None - - class GetstreamParams(TransportParams): """Configuration parameters for Stream Video transport. @@ -103,7 +78,6 @@ class GetstreamCallbacks(BaseModel): on_audio_track_unsubscribed: Called when an audio track is unsubscribed. on_video_track_subscribed: Called when a video track is subscribed. on_video_track_unsubscribed: Called when a video track is unsubscribed. - on_data_received: Called when data is received from a participant. on_first_participant_joined: Called when the first participant joins. """ @@ -116,7 +90,6 @@ class GetstreamCallbacks(BaseModel): on_audio_track_unsubscribed: Callable[[str], Coroutine[None, None, None]] on_video_track_subscribed: Callable[[str], Coroutine[None, None, None]] on_video_track_unsubscribed: Callable[[str], Coroutine[None, None, None]] - on_data_received: Callable[[bytes, str], Coroutine[None, None, None]] on_first_participant_joined: Callable[[str], Coroutine[None, None, None]] on_custom_event: Callable[[dict], Coroutine[None, None, None]] @@ -965,18 +938,6 @@ async def cleanup(self): await super().cleanup() await self._transport.cleanup() - async def push_app_message(self, message: Any, sender: str): - """Push an application message as an urgent transport frame. - - Args: - message: The message data to send. - sender: ID of the message sender. - """ - frame = GetstreamOutputTransportMessageUrgentFrame( - message=message, participant_id=sender - ) - await self.push_frame(frame) - async def _audio_in_task_handler(self): """Handle incoming audio frames from participants.""" logger.info("Stream Video audio input task started") @@ -1261,7 +1222,6 @@ def __init__( on_audio_track_unsubscribed=self._on_audio_track_unsubscribed, on_video_track_subscribed=self._on_video_track_subscribed, on_video_track_unsubscribed=self._on_video_track_unsubscribed, - on_data_received=self._on_data_received, on_first_participant_joined=self._on_first_participant_joined, on_custom_event=self._on_custom_event, ) @@ -1288,7 +1248,6 @@ def __init__( self._register_event_handler("on_audio_track_unsubscribed") self._register_event_handler("on_video_track_subscribed") self._register_event_handler("on_video_track_unsubscribed") - self._register_event_handler("on_data_received") self._register_event_handler("on_first_participant_joined") self._register_event_handler("on_participant_left") self._register_event_handler("on_before_disconnect", sync=True) @@ -1392,12 +1351,6 @@ async def _on_video_track_unsubscribed(self, participant_id: str): """Handle video track unsubscribed events.""" await self._call_event_handler("on_video_track_unsubscribed", participant_id) - async def _on_data_received(self, data: bytes, participant_id: str): - """Handle data received events.""" - if self._input: - await self._input.push_app_message(data.decode(), participant_id) - await self._call_event_handler("on_data_received", data, participant_id) - async def _on_first_participant_joined(self, participant_id: str): """Handle first participant joined events.""" await self._call_event_handler("on_first_participant_joined", participant_id) From b64054ca2a8d19b0fc8e95e03f2f19139c9cad89 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Thu, 23 Apr 2026 14:28:55 +0200 Subject: [PATCH 2/3] Stop the pipeline gracefully on call_ended event --- src/pipecat_getstream/transport.py | 34 +++++++++++++++++++++++++++--- src/tests/conftest.py | 2 +- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/pipecat_getstream/transport.py b/src/pipecat_getstream/transport.py index 34c7a85..91c4db5 100644 --- a/src/pipecat_getstream/transport.py +++ b/src/pipecat_getstream/transport.py @@ -9,7 +9,7 @@ import time import warnings from fractions import Fraction -from typing import Any, Callable, Coroutine, Dict, List, Optional +from typing import Callable, Coroutine, Dict, List, Optional import av import numpy as np @@ -28,6 +28,7 @@ AudioRawFrame, CancelFrame, EndFrame, + EndTaskFrame, Frame, InterruptionFrame, OutputAudioRawFrame, @@ -79,6 +80,7 @@ class GetstreamCallbacks(BaseModel): on_video_track_subscribed: Called when a video track is subscribed. on_video_track_unsubscribed: Called when a video track is unsubscribed. on_first_participant_joined: Called when the first participant joins. + on_call_ended: Called when Stream reports that the call has ended. """ on_connected: Callable[[], Coroutine[None, None, None]] @@ -92,6 +94,7 @@ class GetstreamCallbacks(BaseModel): on_video_track_unsubscribed: Callable[[str], Coroutine[None, None, None]] on_first_participant_joined: Callable[[str], Coroutine[None, None, None]] on_custom_event: Callable[[dict], Coroutine[None, None, None]] + on_call_ended: Callable[[], Coroutine[None, None, None]] class PipecatVideoStreamTrack(MediaStreamTrack): @@ -824,8 +827,8 @@ def _on_call_ended(self, *args): logger.info("Stream Video call ended") if self._connected: self._create_task( - self.disconnect(), - f"{self}::disconnect_on_call_ended", + self._callbacks.on_call_ended(), + f"{self}::on_call_ended", ) def _create_task(self, coroutine: Coroutine, name: str) -> asyncio.Task: @@ -1224,6 +1227,7 @@ def __init__( on_video_track_unsubscribed=self._on_video_track_unsubscribed, on_first_participant_joined=self._on_first_participant_joined, on_custom_event=self._on_custom_event, + on_call_ended=self._on_call_ended, ) self._params = params or GetstreamParams() @@ -1326,6 +1330,30 @@ async def _on_before_disconnect(self): """Handle before disconnection events.""" await self._call_event_handler("on_before_disconnect") + async def _on_call_ended(self): + """Handle call ended events. + + If the transport is attached to a running Pipecat pipeline, request a + graceful shutdown via `EndTaskFrame`. Otherwise fall back to a direct + disconnect so the Stream connection still gets cleaned up. + """ + shutdown_target = None + if self._input and self._input.previous: + shutdown_target = self._input + elif self._output and self._output.previous: + shutdown_target = self._output + + if shutdown_target: + await shutdown_target.push_frame( + EndTaskFrame(reason="Stream Video call ended"), + FrameDirection.UPSTREAM, + ) + else: + logger.debug( + "Stream Video call ended without an attached pipeline, disconnecting directly" + ) + await self._client.disconnect() + async def _on_participant_joined(self, participant_id: str): """Handle participant joined events.""" await self._call_event_handler("on_participant_connected", participant_id) diff --git a/src/tests/conftest.py b/src/tests/conftest.py index 642a7a0..89a1a22 100644 --- a/src/tests/conftest.py +++ b/src/tests/conftest.py @@ -27,8 +27,8 @@ def _factory() -> "GetstreamCallbacks": on_video_track_subscribed=AsyncMock(), on_video_track_unsubscribed=AsyncMock(), on_custom_event=AsyncMock(), - on_data_received=AsyncMock(), on_first_participant_joined=AsyncMock(), + on_call_ended=AsyncMock(), ) return _factory From a40dac15bb81179adcf94919c6e5e003d911d1dc Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Thu, 23 Apr 2026 14:32:51 +0200 Subject: [PATCH 3/3] Fix ruff --- src/pipecat_getstream/transport.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/pipecat_getstream/transport.py b/src/pipecat_getstream/transport.py index 91c4db5..72d67ec 100644 --- a/src/pipecat_getstream/transport.py +++ b/src/pipecat_getstream/transport.py @@ -10,7 +10,7 @@ import warnings from fractions import Fraction from typing import Callable, Coroutine, Dict, List, Optional - +from pipecat.processors.frame_processor import FrameProcessor import av import numpy as np from aiortc import MediaStreamTrack @@ -1334,10 +1334,11 @@ async def _on_call_ended(self): """Handle call ended events. If the transport is attached to a running Pipecat pipeline, request a - graceful shutdown via `EndTaskFrame`. Otherwise fall back to a direct + graceful shutdown via `EndTaskFrame`. + Otherwise, fall back to a direct disconnect so the Stream connection still gets cleaned up. """ - shutdown_target = None + shutdown_target: FrameProcessor | None = None if self._input and self._input.previous: shutdown_target = self._input elif self._output and self._output.previous: