Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,21 +158,20 @@ async def on_first_participant_joined(transport, payload):

### Available Events

| Event | Arguments | Description |
|-------------------------------|--------------------------|------------------------------------------------------|
| `on_connected` | — | Bot has connected to the call |
| `on_disconnected` | — | Bot has disconnected from the call |
| `on_before_disconnect` | — | Called before the bot disconnects |
| `on_participant_connected` | `participant_id` | A participant has joined the call |
| `on_participant_disconnected` | `participant_id` | A participant has left the call |
| `on_participant_left` | `participant_id` | A participant has left the call |
| `on_first_participant_joined` | `participant_id` | The first participant has joined the call |
| `on_audio_track_subscribed` | `participant_id` | Audio from a participant is now being received |
| `on_audio_track_unsubscribed` | `participant_id` | Audio from a participant is no longer being received |
| `on_video_track_subscribed` | `participant_id` | Video from a participant is now being received |
| `on_video_track_unsubscribed` | `participant_id` | Video from a participant is no longer being received |
| `on_data_received` | `data`, `participant_id` | Custom data/event received from a participant |
| `on_stream_custom_event` | `event` | Custom event from a client watching the call |
| Event | Arguments | Description |
|-------------------------------|------------------|------------------------------------------------------|
| `on_connected` | — | Bot has connected to the call |
| `on_disconnected` | — | Bot has disconnected from the call |
| `on_before_disconnect` | — | Called before the bot disconnects |
| `on_participant_connected` | `participant_id` | A participant has joined the call |
| `on_participant_disconnected` | `participant_id` | A participant has left the call |
| `on_participant_left` | `participant_id` | A participant has left the call |
| `on_first_participant_joined` | `participant_id` | The first participant has joined the call |
| `on_audio_track_subscribed` | `participant_id` | Audio from a participant is now being received |
| `on_audio_track_unsubscribed` | `participant_id` | Audio from a participant is no longer being received |
| `on_video_track_subscribed` | `participant_id` | Video from a participant is now being received |
| `on_video_track_unsubscribed` | `participant_id` | Video from a participant is no longer being received |
| `on_stream_custom_event` | `event` | Custom event from a client watching the call |

## Running the Example

Expand Down
84 changes: 33 additions & 51 deletions src/pipecat_getstream/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
import asyncio
import time
import warnings
from dataclasses import dataclass
from fractions import Fraction
from typing import Any, Callable, Coroutine, Dict, List, Optional

from typing import Callable, Coroutine, Dict, List, Optional
from pipecat.processors.frame_processor import FrameProcessor
import av
import numpy as np
from aiortc import MediaStreamTrack
Expand All @@ -29,12 +28,11 @@
AudioRawFrame,
CancelFrame,
EndFrame,
EndTaskFrame,
Frame,
InterruptionFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
StartFrame,
UserAudioRawFrame,
UserImageRawFrame,
Expand All @@ -59,28 +57,6 @@
)


@dataclass
class GetstreamOutputTransportMessageFrame(OutputTransportMessageFrame):
"""Frame for transport messages in Stream Video calls.

Parameters:
participant_id: Optional ID of the participant this message is for/from.
"""

participant_id: Optional[str] = None


@dataclass
class GetstreamOutputTransportMessageUrgentFrame(OutputTransportMessageUrgentFrame):
"""Frame for urgent transport messages in Stream Video calls.

Parameters:
participant_id: Optional ID of the participant this message is for/from.
"""

participant_id: Optional[str] = None


class GetstreamParams(TransportParams):
"""Configuration parameters for Stream Video transport.

Expand All @@ -103,8 +79,8 @@ class GetstreamCallbacks(BaseModel):
on_audio_track_unsubscribed: Called when an audio track is unsubscribed.
on_video_track_subscribed: Called when a video track is subscribed.
on_video_track_unsubscribed: Called when a video track is unsubscribed.
on_data_received: Called when data is received from a participant.
on_first_participant_joined: Called when the first participant joins.
on_call_ended: Called when Stream reports that the call has ended.
"""

on_connected: Callable[[], Coroutine[None, None, None]]
Expand All @@ -116,9 +92,9 @@ class GetstreamCallbacks(BaseModel):
on_audio_track_unsubscribed: Callable[[str], Coroutine[None, None, None]]
on_video_track_subscribed: Callable[[str], Coroutine[None, None, None]]
on_video_track_unsubscribed: Callable[[str], Coroutine[None, None, None]]
on_data_received: Callable[[bytes, str], Coroutine[None, None, None]]
on_first_participant_joined: Callable[[str], Coroutine[None, None, None]]
on_custom_event: Callable[[dict], Coroutine[None, None, None]]
on_call_ended: Callable[[], Coroutine[None, None, None]]


class PipecatVideoStreamTrack(MediaStreamTrack):
Expand Down Expand Up @@ -851,8 +827,8 @@ def _on_call_ended(self, *args):
logger.info("Stream Video call ended")
if self._connected:
self._create_task(
self.disconnect(),
f"{self}::disconnect_on_call_ended",
self._callbacks.on_call_ended(),
f"{self}::on_call_ended",
)

def _create_task(self, coroutine: Coroutine, name: str) -> asyncio.Task:
Expand Down Expand Up @@ -965,18 +941,6 @@ async def cleanup(self):
await super().cleanup()
await self._transport.cleanup()

async def push_app_message(self, message: Any, sender: str):
"""Push an application message as an urgent transport frame.

Args:
message: The message data to send.
sender: ID of the message sender.
"""
frame = GetstreamOutputTransportMessageUrgentFrame(
message=message, participant_id=sender
)
await self.push_frame(frame)

async def _audio_in_task_handler(self):
"""Handle incoming audio frames from participants."""
logger.info("Stream Video audio input task started")
Expand Down Expand Up @@ -1261,9 +1225,9 @@ def __init__(
on_audio_track_unsubscribed=self._on_audio_track_unsubscribed,
on_video_track_subscribed=self._on_video_track_subscribed,
on_video_track_unsubscribed=self._on_video_track_unsubscribed,
on_data_received=self._on_data_received,
on_first_participant_joined=self._on_first_participant_joined,
on_custom_event=self._on_custom_event,
on_call_ended=self._on_call_ended,
)
self._params = params or GetstreamParams()

Expand All @@ -1288,7 +1252,6 @@ def __init__(
self._register_event_handler("on_audio_track_unsubscribed")
self._register_event_handler("on_video_track_subscribed")
self._register_event_handler("on_video_track_unsubscribed")
self._register_event_handler("on_data_received")
self._register_event_handler("on_first_participant_joined")
self._register_event_handler("on_participant_left")
self._register_event_handler("on_before_disconnect", sync=True)
Expand Down Expand Up @@ -1367,6 +1330,31 @@ async def _on_before_disconnect(self):
"""Handle before disconnection events."""
await self._call_event_handler("on_before_disconnect")

async def _on_call_ended(self):
"""Handle call ended events.

If the transport is attached to a running Pipecat pipeline, request a
graceful shutdown via `EndTaskFrame`.
Otherwise, fall back to a direct
disconnect so the Stream connection still gets cleaned up.
"""
shutdown_target: FrameProcessor | None = None
if self._input and self._input.previous:
shutdown_target = self._input
elif self._output and self._output.previous:
shutdown_target = self._output

if shutdown_target:
await shutdown_target.push_frame(
EndTaskFrame(reason="Stream Video call ended"),
FrameDirection.UPSTREAM,
)
else:
logger.debug(
"Stream Video call ended without an attached pipeline, disconnecting directly"
)
await self._client.disconnect()

async def _on_participant_joined(self, participant_id: str):
"""Handle participant joined events."""
await self._call_event_handler("on_participant_connected", participant_id)
Expand All @@ -1392,12 +1380,6 @@ async def _on_video_track_unsubscribed(self, participant_id: str):
"""Handle video track unsubscribed events."""
await self._call_event_handler("on_video_track_unsubscribed", participant_id)

async def _on_data_received(self, data: bytes, participant_id: str):
"""Handle data received events."""
if self._input:
await self._input.push_app_message(data.decode(), participant_id)
await self._call_event_handler("on_data_received", data, participant_id)

async def _on_first_participant_joined(self, participant_id: str):
"""Handle first participant joined events."""
await self._call_event_handler("on_first_participant_joined", participant_id)
Expand Down
2 changes: 1 addition & 1 deletion src/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ def _factory() -> "GetstreamCallbacks":
on_video_track_subscribed=AsyncMock(),
on_video_track_unsubscribed=AsyncMock(),
on_custom_event=AsyncMock(),
on_data_received=AsyncMock(),
on_first_participant_joined=AsyncMock(),
on_call_ended=AsyncMock(),
)

return _factory
Expand Down