Skip to content
148 changes: 115 additions & 33 deletions livekit-plugins/livekit-plugins-google/livekit/plugins/google/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
NUM_CHANNELS = 1
DEFAULT_LANGUAGE = "en-US"
DEFAULT_GENDER = "neutral"
# Google Cloud TTS API limit: input.text or input.ssml must not exceed 5000 bytes
GOOGLE_TTS_MAX_INPUT_BYTES = 5000


@dataclass
Expand Down Expand Up @@ -293,52 +295,98 @@ def __init__(self, *, tts: TTS, input_text: str, conn_options: APIConnectOptions
self._tts: TTS = tts
self._opts = replace(tts._opts)

def _build_ssml(self) -> str:
ssml = "<speak>"
ssml += self._input_text
ssml += "</speak>"
return ssml
@staticmethod
def _build_ssml(text: str) -> str:
return f"<speak>{text}</speak>"

def _get_text_chunks(self) -> list[str]:
    """Split the input text into pieces small enough for Google TTS.

    Google Cloud TTS rejects input.text or input.ssml longer than 5000
    bytes. Structured inputs (SSML or markup) are passed through whole,
    because sentence-level splitting could cut through tag structure.
    Plain text already within the byte limit is also returned as a single
    chunk; otherwise it is tokenized into sentences (blingfire by default)
    and re-merged into byte-bounded chunks.
    """
    opts = self._opts
    text = self._input_text

    # SSML / markup content must stay intact.
    if opts.enable_ssml or opts.use_markup:
        return [text]

    limit = GOOGLE_TTS_MAX_INPUT_BYTES
    if len(text.encode("utf-8")) <= limit:
        return [text]

    return _split_sentences_by_bytes(opts.tokenizer.tokenize(text=text), limit)

def _build_synthesis_input(self, text: str) -> texttospeech.SynthesisInput:
    """Build a ``SynthesisInput`` for *text*, honoring the configured input mode.

    Exactly one of ``markup``, ``ssml`` or ``text`` is populated, in that
    precedence order; custom pronunciations are always attached, and the
    optional prompt is set afterwards when configured.
    """
    if self._opts.use_markup:
        content = {"markup": text}
    elif self._opts.enable_ssml:
        content = {"ssml": self._build_ssml(text)}
    else:
        content = {"text": text}

    tts_input = texttospeech.SynthesisInput(
        custom_pronunciations=self._opts.custom_pronunciations, **content
    )

    if self._opts.prompt is not None:
        tts_input.prompt = self._opts.prompt

    return tts_input

async def _run(self, output_emitter: tts.AudioEmitter) -> None:
try:
if self._opts.use_markup:
tts_input = texttospeech.SynthesisInput(
markup=self._input_text, custom_pronunciations=self._opts.custom_pronunciations
)
elif self._opts.enable_ssml:
tts_input = texttospeech.SynthesisInput(
ssml=self._build_ssml(), custom_pronunciations=self._opts.custom_pronunciations
)
else:
tts_input = texttospeech.SynthesisInput(
text=self._input_text, custom_pronunciations=self._opts.custom_pronunciations
text_chunks = self._get_text_chunks()
encoding = self._opts.encoding

# Container-based formats (MP3, OGG_OPUS) cannot be naively
# concatenated when text is split into multiple chunks because each
# API response is an independently encoded file with its own
# headers. Fall back to PCM which is raw sample data and safe to
# concatenate.
if len(text_chunks) > 1 and encoding not in (
texttospeech.AudioEncoding.PCM,
texttospeech.AudioEncoding.LINEAR16,
):
logger.debug(
"text was split into %d chunks, forcing PCM encoding "
"to avoid corrupted audio from concatenated container files",
len(text_chunks),
)
encoding = texttospeech.AudioEncoding.PCM # type: ignore
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

if self._opts.prompt is not None:
tts_input.prompt = self._opts.prompt

response: SynthesizeSpeechResponse = await self._tts._ensure_client().synthesize_speech(
input=tts_input,
voice=self._opts.voice,
audio_config=texttospeech.AudioConfig(
audio_encoding=self._opts.encoding,
sample_rate_hertz=self._opts.sample_rate,
pitch=self._opts.pitch,
effects_profile_id=self._opts.effects_profile_id,
speaking_rate=self._opts.speaking_rate,
volume_gain_db=self._opts.volume_gain_db,
),
timeout=self._conn_options.timeout,
audio_config = texttospeech.AudioConfig(
audio_encoding=encoding,
sample_rate_hertz=self._opts.sample_rate,
pitch=self._opts.pitch,
effects_profile_id=self._opts.effects_profile_id,
speaking_rate=self._opts.speaking_rate,
volume_gain_db=self._opts.volume_gain_db,
)

output_emitter.initialize(
request_id=utils.shortuuid(),
sample_rate=self._opts.sample_rate,
num_channels=1,
mime_type=_encoding_to_mimetype(self._opts.encoding),
mime_type=_encoding_to_mimetype(encoding),
)

output_emitter.push(response.audio_content)
for chunk in text_chunks:
tts_input = self._build_synthesis_input(chunk)
response: SynthesizeSpeechResponse = (
await self._tts._ensure_client().synthesize_speech(
input=tts_input,
voice=self._opts.voice,
audio_config=audio_config,
timeout=self._conn_options.timeout,
)
)
output_emitter.push(response.audio_content)
except DeadlineExceeded:
raise APITimeoutError() from None
except GoogleAPICallError as e:
Expand Down Expand Up @@ -456,6 +504,40 @@ async def input_generator() -> AsyncGenerator[
await input_gen.aclose()


def _split_sentences_by_bytes(sentences: list[str], max_bytes: int) -> list[str]:
"""Merge tokenized sentences into chunks that fit within a byte limit.

If a single sentence exceeds *max_bytes* it is included as-is and a warning
is logged. Splitting mid-sentence would generally produce incorrect
intonation from the TTS engine.
"""
chunks: list[str] = []
current = ""

for sentence in sentences:
if not sentence:
continue
candidate = (current + " " + sentence) if current else sentence
if len(candidate.encode("utf-8")) <= max_bytes:
current = candidate
else:
if current:
chunks.append(current)
if len(sentence.encode("utf-8")) > max_bytes:
logger.warning(
"single sentence exceeds Google TTS byte limit (%d bytes), "
"synthesis may fail: %.100s...",
len(sentence.encode("utf-8")),
sentence,
)
current = sentence

if current:
chunks.append(current)

return chunks


def _gender_from_str(gender: str) -> SsmlVoiceGender:
ssml_gender = SsmlVoiceGender.NEUTRAL
if gender == "male":
Expand Down
Loading