Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ classifiers = [
]

requires-python = ">=3.10"
dependencies = ["getstream[webrtc,telemetry]>=3.3.0,<4", "pipecat-ai>=0.0.108"]
dependencies = ["getstream[webrtc,telemetry]>=3.4.0,<4", "pipecat-ai>=0.0.108"]

[project.urls]
Homepage = "https://github.com/GetStream/pipecat-getstream"
Expand Down
58 changes: 38 additions & 20 deletions src/pipecat_getstream/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,28 +201,36 @@ class GetstreamTransportClient:
def __init__(
self,
api_key: str,
api_secret: str,
call_type: str,
call_id: str,
user_id: str,
params: GetstreamParams,
callbacks: GetstreamCallbacks,
transport_name: str,
api_secret: Optional[str] = None,
token: Optional[str] = None,
):
"""Initialize the Stream Video transport client.

Args:
api_key: Stream Video API key.
api_secret: Stream Video API secret.
call_type: The Stream call type (e.g. "default").
call_id: Unique call identifier.
user_id: The bot/agent user ID.
params: Configuration parameters for the transport.
callbacks: Event callback handlers.
transport_name: Name identifier for the transport.
api_secret: Stream Video API secret. Mutually exclusive with token.
token: Pre-minted user JWT for the bot. Mutually exclusive with api_secret.
"""
if api_secret and token:
raise ValueError("Pass either api_secret or token, not both")
if not api_secret and not token:
raise ValueError("Either api_secret or token is required")

self._api_key = api_key
self._api_secret = api_secret
self._token = token
self._call_type = call_type
self._call_id = call_id
self._user_id = user_id
Expand Down Expand Up @@ -277,15 +285,22 @@ async def setup(self, setup: FrameProcessorSetup):
return

self._task_manager = setup.task_manager
self._client = AsyncStream(api_key=self._api_key, api_secret=self._api_secret)

# Ensure the bot user exists
try:
await self._client.upsert_users(
UserRequest(id=self._user_id, name=self._user_id)
if self._token:
# The client is created from a scoped token.
# The user associated with this token already exists.
self._client = AsyncStream(api_key=self._api_key, token=self._token)
else:
# The client is created from api secret and typically has more permissions.
# Ensure the agent user exists in Stream.
self._client = AsyncStream(
api_key=self._api_key, api_secret=self._api_secret
)
except Exception as exc:
logger.warning(f"Could not create user {self._user_id}: {exc}")
try:
await self._client.upsert_users(
UserRequest(id=self._user_id, name=self._user_id)
)
except Exception as exc:
logger.warning(f"Could not create user {self._user_id}: {exc}")

async def cleanup(self):
"""Clean up client resources."""
Expand Down Expand Up @@ -1193,25 +1208,27 @@ class GetstreamTransport(BaseTransport):
def __init__(
self,
api_key: str,
api_secret: str,
call_type: str,
call_id: str,
user_id: str,
params: Optional[GetstreamParams] = None,
input_name: Optional[str] = None,
output_name: Optional[str] = None,
api_secret: Optional[str] = None,
token: Optional[str] = None,
):
"""Initialize the Stream Video transport.

Args:
api_key: Stream Video API key.
api_secret: Stream Video API secret.
call_type: The Stream call type (e.g. "default").
call_id: Unique call identifier.
user_id: The bot/agent user ID.
params: Configuration parameters for the transport.
input_name: Optional name for the input transport.
output_name: Optional name for the output transport.
api_secret: Stream Video API secret. Mutually exclusive with token.
token: Pre-minted user JWT for the bot. Mutually exclusive with api_secret.
"""
super().__init__(input_name=input_name, output_name=output_name)

Expand All @@ -1232,14 +1249,15 @@ def __init__(
self._params = params or GetstreamParams()

self._client = GetstreamTransportClient(
api_key,
api_secret,
call_type,
call_id,
user_id,
self._params,
callbacks,
self.name,
api_key=api_key,
call_type=call_type,
call_id=call_id,
user_id=user_id,
params=self._params,
callbacks=callbacks,
transport_name=self.name,
api_secret=api_secret,
token=token,
)
self._input: Optional[GetstreamInputTransport] = None
self._output: Optional[GetstreamOutputTransport] = None
Expand Down
98 changes: 98 additions & 0 deletions src/tests/test_getstream_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import numpy as np
import pytest
from dotenv import load_dotenv
from getstream import AsyncStream
from getstream.models import UserRequest
from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import TrackType
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import CancelFrame, StartFrame
Expand Down Expand Up @@ -99,6 +101,48 @@ async def test_full_participant_session(
assert not client._other_participant_has_joined


class TestGetstreamTransport:
"""Constructor validation for credential modes (api_secret vs token)."""

def test_construct_with_api_secret(self):
GetstreamTransport(
api_key="k",
api_secret="s",
call_type="default",
call_id="c",
user_id="u",
)

def test_construct_with_token(self):
GetstreamTransport(
api_key="k",
token="jwt",
call_type="default",
call_id="c",
user_id="u",
)

def test_rejects_both_api_secret_and_token(self):
with pytest.raises(ValueError, match="Pass either api_secret or token"):
GetstreamTransport(
api_key="k",
api_secret="s",
token="jwt",
call_type="default",
call_id="c",
user_id="u",
)

def test_rejects_neither_api_secret_nor_token(self):
with pytest.raises(ValueError, match="Either api_secret or token is required"):
GetstreamTransport(
api_key="k",
call_type="default",
call_id="c",
user_id="u",
)


@pytest.mark.skipif(
not GETSTREAM_INTEGRATION_AVAILABLE,
reason="Requires STREAM_API_KEY and STREAM_API_SECRET env vars and getstream[webrtc]",
Expand Down Expand Up @@ -347,3 +391,57 @@ async def send_until_received():
finally:
for input_t in inputs:
await input_t.cancel(CancelFrame())

async def test_join_with_preminted_token(self):
"""A transport joins a call using a pre-minted user token (no api_secret).

An admin AsyncStream client (api_key+api_secret) creates the call and
mints a JWT for the bot. GetstreamTransport then joins using only that
token + api_key, exercising the token-only auth path end-to-end.
"""

call_id = f"integration-test-token-{uuid.uuid4().hex[:8]}"
bot_user_id = f"bot-{uuid.uuid4().hex[:6]}"

admin = AsyncStream(api_key=STREAM_API_KEY, api_secret=STREAM_API_SECRET)
await admin.upsert_users(UserRequest(id=bot_user_id, name="Bot"))
await admin.video.call("default", call_id).get_or_create(
data={"created_by_id": bot_user_id}
)
bot_token = admin.create_token(user_id=bot_user_id, expiration=300)

params = GetstreamParams(
audio_in_enabled=False,
audio_out_enabled=False,
video_in_enabled=False,
video_out_enabled=False,
)
transport = GetstreamTransport(
api_key=STREAM_API_KEY,
token=bot_token,
call_type="default",
call_id=call_id,
user_id=bot_user_id,
params=params,
)

connected = asyncio.Event()

@transport.event_handler("on_connected")
async def on_connected(*_):
connected.set()

task_manager = TaskManager()
task_manager.setup(TaskManagerParams(loop=asyncio.get_running_loop()))
frame_setup = FrameProcessorSetup(
clock=SystemClock(), task_manager=task_manager
)

input_t = transport.input()
await input_t.setup(frame_setup)
await input_t.start(StartFrame())

try:
await asyncio.wait_for(connected.wait(), timeout=15)
finally:
await input_t.cancel(CancelFrame())
Loading
Loading