Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 STTCapabilities.streaming is hardcoded to True even for pulse-pro which doesn't support streaming

When model="pulse-pro" is passed to STT.__init__(), the capabilities are still constructed with streaming=True (line 148). However, stream() raises ValueError for pulse-pro (lines 245-248). The agent framework checks capabilities.streaming at livekit-agents/livekit/agents/voice/agent.py:423 to decide whether to call stream() directly or wrap with a StreamAdapter. Because streaming=True, the framework will skip the StreamAdapter wrapping and call stream() directly at agent.py:433, which crashes with ValueError("pulse-pro does not support streaming..."). The streaming capability should be conditional on the model.

(Refers to line 148)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a legit comment?

Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@

NUM_CHANNELS = 1
# Base URL for the Smallest AI API.
# Streaming: wss://api.smallest.ai/waves/v1/{model}/get_text
# Batch: https://api.smallest.ai/waves/v1/{model}/get_text
# Streaming: wss://api.smallest.ai/waves/v1/stt/live?model={model}
# Batch: https://api.smallest.ai/waves/v1/stt/?model={model}
SMALLEST_STT_BASE_URL = "https://api.smallest.ai/waves/v1"

# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -111,24 +111,33 @@ def __init__(
http_session: aiohttp.ClientSession | None = None,
base_url: str = SMALLEST_STT_BASE_URL,
) -> None:
"""Create a new instance of Smallest AI Pulse STT.
"""Create a new instance of Smallest AI STT.

Args:
model: STT model to use. Currently only "pulse" is available.
model: STT model to use. ``"pulse"`` supports streaming and batch
transcription across 38 languages. ``"pulse-pro"`` is a
higher-accuracy English-only model available for batch
(pre-recorded) transcription only — calling ``stream()`` with
``pulse-pro`` raises ``ValueError``.
language: BCP-47 language code (e.g. "en", "hi", "fr"). Use "multi"
for automatic language detection across 39 supported languages.
``pulse-pro`` only supports ``"en"``.
sample_rate: Audio sample rate in Hz. Supported: 8000, 16000, 22050,
24000, 44100, 48000. Defaults to 16000.
encoding: PCM encoding of the audio stream. Use "linear16" for raw
16-bit PCM (the default and most compatible choice for streaming).
word_timestamps: Include per-word start/end timestamps and confidence
scores in transcripts. Defaults to True.
scores in transcripts. Supported by both ``pulse`` and
``pulse-pro``. Defaults to True.
diarize: Enable speaker diarization. When True, each word includes a
speaker ID (integer during streaming). Defaults to False.
speaker ID (integer during streaming, string label in batch).
Defaults to False.
eou_timeout_ms: Milliseconds of silence before the server considers an
utterance complete and emits a final transcript. Set to 0 to disable
server-side end-of-utterance detection, which is recommended when using
LiveKit's built-in turn detection to minimise latency. Defaults to 0.
utterance complete and emits a final transcript. Defaults to 0 (disabled).
The server's native default when this parameter is omitted is 800ms —
the plugin sends 0 explicitly to disable it, since LiveKit's VAD handles
turn detection. Set to a value between 100 and 10000 to enable
server-side end-of-utterance detection alongside LiveKit's own logic.
api_key: Smallest AI API key. Falls back to the SMALLEST_API_KEY
environment variable if not provided.
http_session: An existing aiohttp ClientSession to reuse.
Expand Down Expand Up @@ -186,6 +195,7 @@ async def _recognize_impl(
) -> stt.SpeechEvent:
config = self._sanitize_options(language=language)
params: dict[str, Any] = {
"model": config.model,
"language": config.language,
"encoding": config.encoding,
"sample_rate": config.sample_rate,
Expand All @@ -195,7 +205,7 @@ async def _recognize_impl(

try:
async with self._ensure_session().post(
url=f"{config.base_url}/{config.model}/get_text",
url=f"{config.base_url}/stt/",
headers={
"Authorization": f"Bearer {config.api_key}",
"Content-Type": "application/octet-stream",
Expand Down Expand Up @@ -232,6 +242,10 @@ def stream(
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
if self._opts.model == "pulse-pro":
raise ValueError(
"pulse-pro does not support streaming; use recognize() for batch transcription"
)
config = self._sanitize_options(language=language)
stream = SpeechStream(
stt=self,
Expand Down Expand Up @@ -426,21 +440,20 @@ async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:

async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
params: dict[str, Any] = {
"model": self._opts.model,
"language": self._opts.language,
"encoding": self._opts.encoding,
"sample_rate": self._opts.sample_rate,
"word_timestamps": str(self._opts.word_timestamps).lower(),
"diarize": str(self._opts.diarize).lower(),
}
# Only send eou_timeout_ms when explicitly set (non-zero).
# When 0, omit the parameter and let the server use its default,
# which avoids adding server-side silence latency on top of LiveKit's
# own end-of-turn detection.
if self._opts.eou_timeout_ms > 0:
params["eou_timeout_ms"] = self._opts.eou_timeout_ms
# Always send eou_timeout_ms. Default is 0 (disabled) so LiveKit's own
# VAD controls turn detection. The server's native default when this
# parameter is omitted is 800ms, which would conflict with LiveKit VAD.
params["eou_timeout_ms"] = self._opts.eou_timeout_ms
ws_url = (
self._opts.base_url.replace("https://", "wss://", 1).replace("http://", "ws://", 1)
+ f"/{self._opts.model}/get_text"
+ "/stt/live"
+ f"?{urlencode(params)}"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
)
from livekit.agents.types import DEFAULT_API_CONNECT_OPTIONS, NOT_GIVEN, NotGivenOr
from livekit.agents.utils import is_given
from livekit.agents.voice.io import TimedString

from .models import TTSEncoding, TTSModels
from .version import __version__
Expand All @@ -53,6 +54,7 @@ class _TTSOptions:
speed: float
language: LanguageCode
output_format: TTSEncoding | str
word_timestamps: bool
base_url: str
ws_url: str

Expand All @@ -68,6 +70,7 @@ def __init__(
speed: float = 1.0,
language: str = "en",
output_format: TTSEncoding | str = "pcm",
word_timestamps: bool = False,
base_url: str = SMALLEST_BASE_URL,
ws_url: str = SMALLEST_WS_URL,
http_session: aiohttp.ClientSession | None = None,
Expand All @@ -90,12 +93,19 @@ def __init__(
detection and code-switching. Pro supports "en", "hi", and "auto" only.
output_format: Output format for HTTP synthesize() calls ("pcm", "mp3", "wav",
"ulaw", "alaw"). WebSocket streaming always returns PCM.
word_timestamps: Request per-word timing events from the server and emit them
as timed transcript entries alongside audio. Disabled by default. Supported
on base-queue English + Hindi voices (meher, devansh, kartik, maithili,
liam, avery); other voices silently emit no word events.
base_url: Base URL for the Smallest AI HTTP API.
ws_url: WebSocket URL for low-latency streaming synthesis.
http_session: An existing aiohttp ClientSession to use.
"""
super().__init__(
capabilities=tts.TTSCapabilities(streaming=True),
capabilities=tts.TTSCapabilities(
streaming=True,
aligned_transcript=word_timestamps,
),
sample_rate=sample_rate,
num_channels=NUM_CHANNELS,
)
Expand All @@ -118,6 +128,7 @@ def __init__(
speed=speed,
language=LanguageCode(language),
output_format=output_format,
word_timestamps=word_timestamps,
base_url=base_url,
ws_url=ws_url,
)
Expand Down Expand Up @@ -167,6 +178,7 @@ def update_options(
sample_rate: NotGivenOr[int] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
output_format: NotGivenOr[TTSEncoding | str] = NOT_GIVEN,
word_timestamps: NotGivenOr[bool] = NOT_GIVEN,
) -> None:
"""Update TTS options."""
if is_given(model):
Expand All @@ -181,6 +193,8 @@ def update_options(
self._opts.language = LanguageCode(language)
if is_given(output_format):
self._opts.output_format = output_format
if is_given(word_timestamps):
self._opts.word_timestamps = word_timestamps

def synthesize(
self,
Expand Down Expand Up @@ -307,6 +321,9 @@ async def _run_ws(self, text: str, output_emitter: tts.AudioEmitter) -> None:
else self._opts.language,
}

if self._opts.word_timestamps:
payload["word_timestamps"] = True

async with self._tts._pool.connection(timeout=self._conn_options.timeout) as ws:
self._acquire_time = self._tts._pool.last_acquire_time
self._connection_reused = self._tts._pool.last_connection_reused
Expand Down Expand Up @@ -337,6 +354,15 @@ async def _run_ws(self, text: str, output_emitter: tts.AudioEmitter) -> None:
audio_b64 = event.get("data", {}).get("audio")
if audio_b64:
output_emitter.push(base64.b64decode(audio_b64))
elif status == "word_timestamp":
data = event.get("data", {})
word = data.get("word")
start = data.get("start")
end = data.get("end")
if word is not None and start is not None and end is not None:
output_emitter.push_timed_transcript(
TimedString(text=word, start_time=start, end_time=end)
)
elif status == "complete":
output_emitter.end_segment()
break
Expand Down
Loading