diff --git a/.env.example b/.env.example index d11adaaa..d820e37e 100644 --- a/.env.example +++ b/.env.example @@ -211,3 +211,43 @@ EVA_RECORD_IDS= # Logging level (DEBUG | INFO | WARNING | ERROR | CRITICAL) EVA_LOG_LEVEL=INFO + +# ============================================== +# Optional: Turn Detection & VAD Configuration +# ============================================== +# Fine-tune user turn detection and voice activity detection. +# Leave commented out to use smart defaults. + +# User turn start strategy: vad | transcription | external +# - vad: Start turn when VAD detects speech (default) +# - transcription: Start turn when STT produces transcription +# - external: Delegate to external service (e.g., Deepgram Flux) +# EVA_MODEL__TURN_START_STRATEGY=vad + +# User turn start strategy parameters (JSON) +# EVA_MODEL__TURN_START_STRATEGY_PARAMS='{}' + +# User turn stop strategy: turn_analyzer | speech_timeout | external +# - turn_analyzer: Use smart turn analyzer to detect natural turn end (default) +# - speech_timeout: Stop after fixed silence duration +# - external: Delegate to external service +# EVA_MODEL__TURN_STOP_STRATEGY=turn_analyzer + +# User turn stop strategy parameters (JSON) +# For speech_timeout: {"user_speech_timeout": 0.8} +# For turn_analyzer: automatically uses smart turn detection +# EVA_MODEL__TURN_STOP_STRATEGY_PARAMS='{}' + +# Note: For services with built-in turn detection (e.g., Deepgram Flux), set both to 'external': +# EVA_MODEL__TURN_START_STRATEGY=external +# EVA_MODEL__TURN_STOP_STRATEGY=external + +# VAD (Voice Activity Detection) analyzer: silero +# EVA_MODEL__VAD=silero + +# VAD parameters (JSON) +# - confidence: Minimum confidence threshold (0.0-1.0, default: 0.7) +# - start_secs: Duration to wait before confirming voice start (default: 0.2) +# - stop_secs: Duration to wait before confirming voice stop (default: the pipeline's vad_stop_secs setting) +# - min_volume: Minimum audio volume threshold (0.0-1.0, default: 0.6) +# EVA_MODEL__VAD_PARAMS='{"start_secs": 0.1,
"stop_secs": 0.8, "min_volume": 0.6, "confidence": 0.7}' diff --git a/src/eva/assistant/pipeline/audio_llm_processor.py b/src/eva/assistant/pipeline/audio_llm_processor.py index 25d16a7a..19d5e1ab 100644 --- a/src/eva/assistant/pipeline/audio_llm_processor.py +++ b/src/eva/assistant/pipeline/audio_llm_processor.py @@ -417,7 +417,7 @@ def __init__( super().__init__(**kwargs) self._audio_collector = audio_collector params = params or {} - self._api_key = params.get["api_key"] + self._api_key = params["api_key"] self._model = model self._system_prompt = system_prompt or self.DEFAULT_SYSTEM_PROMPT self._sample_rate = sample_rate diff --git a/src/eva/assistant/pipeline/turn_config.py b/src/eva/assistant/pipeline/turn_config.py new file mode 100644 index 00000000..ff346979 --- /dev/null +++ b/src/eva/assistant/pipeline/turn_config.py @@ -0,0 +1,123 @@ +"""Factory functions for creating turn strategies and VAD analyzers from configuration. + +This module provides functions to instantiate Pipecat turn strategies and VAD analyzers +based on configuration settings from environment variables or config files. +""" + +from typing import Any + +from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams +from pipecat.turns.user_start import ( + BaseUserTurnStartStrategy, + ExternalUserTurnStartStrategy, + TranscriptionUserTurnStartStrategy, + VADUserTurnStartStrategy, +) +from pipecat.turns.user_stop import ( + BaseUserTurnStopStrategy, + ExternalUserTurnStopStrategy, + SpeechTimeoutUserTurnStopStrategy, + TurnAnalyzerUserTurnStopStrategy, +) + +from eva.utils.logging import get_logger + +logger = get_logger(__name__) + + +def create_vad_analyzer(vad_type: str, vad_params: dict[str, Any]) -> VADAnalyzer: + """Create a VAD analyzer from configuration. 
+ + Args: + vad_type: VAD analyzer type ('silero') + vad_params: VAD parameters (confidence, start_secs, stop_secs, min_volume) + + Returns: + VAD analyzer instance + + Raises: + ValueError: If vad_type is not supported + """ + vad_type_lower = vad_type.lower() + + if vad_type_lower == "silero": + # Create VADParams, respecting existing defaults if no params specified + params = VADParams(**vad_params) if vad_params else None + return SileroVADAnalyzer(params=params) + else: + raise ValueError(f"Unsupported VAD type: {vad_type}. Supported types: 'silero'") + + +def create_turn_start_strategy( + strategy_type: str, + strategy_params: dict[str, Any], +) -> BaseUserTurnStartStrategy: + """Create a user turn start strategy from configuration. + + Args: + strategy_type: Strategy type ('vad', 'transcription', 'external') + strategy_params: Strategy-specific parameters + + Returns: + Turn start strategy instance + + Raises: + ValueError: If strategy_type is not supported + """ + strategy_type_lower = strategy_type.lower() + + if strategy_type_lower == "vad": + # VADUserTurnStartStrategy has no required parameters + return VADUserTurnStartStrategy(**strategy_params) + elif strategy_type_lower == "transcription": + # TranscriptionUserTurnStartStrategy has no required parameters + return TranscriptionUserTurnStartStrategy(**strategy_params) + elif strategy_type_lower == "external": + # ExternalUserTurnStartStrategy has no required parameters + return ExternalUserTurnStartStrategy(**strategy_params) + else: + raise ValueError( + f"Unsupported turn start strategy: {strategy_type}. Supported types: 'vad', 'transcription', 'external'" + ) + + +def create_turn_stop_strategy( + strategy_type: str, + strategy_params: dict[str, Any], + smart_turn_stop_secs: float | None = None, +) -> BaseUserTurnStopStrategy: + """Create a user turn stop strategy from configuration. 
+ + Args: + strategy_type: Strategy type ('speech_timeout', 'turn_analyzer', 'external') + strategy_params: Strategy-specific parameters + smart_turn_stop_secs: stop_secs for SmartTurnParams (used with turn_analyzer strategy) + + Returns: + Turn stop strategy instance + + Raises: + ValueError: If strategy_type is not supported + """ + strategy_type_lower = strategy_type.lower() + + if strategy_type_lower == "speech_timeout": + # SpeechTimeoutUserTurnStopStrategy accepts user_speech_timeout parameter + return SpeechTimeoutUserTurnStopStrategy(**strategy_params) + elif strategy_type_lower == "turn_analyzer": + # TurnAnalyzerUserTurnStopStrategy requires a turn_analyzer instance + # If smart_turn_stop_secs is provided, use it; otherwise let SmartTurnParams use its default + smart_params = SmartTurnParams(stop_secs=smart_turn_stop_secs) if smart_turn_stop_secs is not None else None + turn_analyzer = LocalSmartTurnAnalyzerV3(params=smart_params) + return TurnAnalyzerUserTurnStopStrategy(turn_analyzer=turn_analyzer, **strategy_params) + elif strategy_type_lower == "external": + # ExternalUserTurnStopStrategy has no required parameters + return ExternalUserTurnStopStrategy(**strategy_params) + else: + raise ValueError( + f"Unsupported turn stop strategy: {strategy_type}. 
" + f"Supported types: 'speech_timeout', 'turn_analyzer', 'external'" + ) diff --git a/src/eva/assistant/server.py b/src/eva/assistant/server.py index 2de9661a..193a0670 100644 --- a/src/eva/assistant/server.py +++ b/src/eva/assistant/server.py @@ -12,10 +12,6 @@ import uvicorn from fastapi import FastAPI, WebSocket -from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams -from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 -from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.frames.frames import ( CancelFrame, LLMRunFrame, @@ -40,9 +36,8 @@ FastAPIWebsocketParams, FastAPIWebsocketTransport, ) -from pipecat.turns.user_start import VADUserTurnStartStrategy from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy -from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies +from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 from eva.assistant.agentic.audit_log import AuditLog, convert_to_epoch_ms, current_timestamp_ms @@ -61,6 +56,11 @@ create_stt_service, create_tts_service, ) +from eva.assistant.pipeline.turn_config import ( + create_turn_start_strategy, + create_turn_stop_strategy, + create_vad_analyzer, +) from eva.assistant.services.llm import LiteLLMClient from eva.assistant.tools.tool_executor import ToolExecutor from eva.models.agents import AgentConfig @@ -326,26 +326,42 @@ async def _realtime_tool_handler(params) -> None: "smart_turn_stop_secs", 0.8 ) # Shorter silence so we don't have to wait 3s if smart turn marks audio as incomplete - if ( - isinstance(self.pipeline_config, (PipelineConfig, SpeechToSpeechConfig)) - and self.pipeline_config.turn_strategy == "external" - ): - logger.info("Using external user turn strategies") - user_turn_strategies = ExternalUserTurnStrategies() - vad_analyzer = None + # Use configurable turn 
strategies if specified, otherwise fall back to defaults + if isinstance(self.pipeline_config, (PipelineConfig, AudioLLMConfig, SpeechToSpeechConfig)): + turn_start_cfg = self.pipeline_config.turn_start_strategy + turn_start_params = self.pipeline_config.turn_start_strategy_params + turn_stop_cfg = self.pipeline_config.turn_stop_strategy + turn_stop_params = self.pipeline_config.turn_stop_strategy_params + vad_cfg = self.pipeline_config.vad + vad_cfg_params = self.pipeline_config.vad_params else: - logger.info("Using local smart turn analyzer") - user_turn_strategies = UserTurnStrategies( - start=[VADUserTurnStartStrategy()], - stop=[ - TurnAnalyzerUserTurnStopStrategy( - turn_analyzer=LocalSmartTurnAnalyzerV3( - params=SmartTurnParams(stop_secs=smart_turn_stop_secs) - ) - ) - ], - ) - vad_analyzer = SileroVADAnalyzer(params=VADParams(stop_secs=vad_stop_secs)) + turn_start_cfg = "vad" # same defaults as the config models; None would fail on .lower() in the factories + turn_start_params = {} + turn_stop_cfg = "turn_analyzer" + turn_stop_params = {} + vad_cfg = "silero" + vad_cfg_params = {} + + # Create turn start strategy using factory function + turn_start_strategy = create_turn_start_strategy(turn_start_cfg, turn_start_params) + logger.info(f"Using turn start strategy: {turn_start_cfg}") + + # Create turn stop strategy using factory function + turn_stop_strategy = create_turn_stop_strategy(turn_stop_cfg, turn_stop_params, smart_turn_stop_secs) + logger.info(f"Using turn stop strategy: {turn_stop_cfg}") + + user_turn_strategies = UserTurnStrategies( + start=[turn_start_strategy], + stop=[turn_stop_strategy], + ) + + # Create VAD analyzer using factory function + # Merge user params with pipeline-specific stop_secs + vad_params_dict = {"stop_secs": vad_stop_secs} + if vad_cfg_params: + vad_params_dict.update(vad_cfg_params) + vad_analyzer = create_vad_analyzer(vad_cfg, vad_params_dict) + logger.info(f"Using VAD analyzer: {vad_cfg}") user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( diff --git
a/src/eva/models/config.py b/src/eva/models/config.py index e08783bd..a696059b 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -41,7 +41,7 @@ def _param_alias(params: dict[str, Any]) -> str: """Return the display alias from a params dict.""" - return params.get("alias") or params["model"] + return params.get("alias") or params.get("model", "") class PipelineConfig(BaseModel): @@ -67,14 +67,42 @@ class PipelineConfig(BaseModel): stt_params: dict[str, Any] = Field({}, description="Additional STT model parameters (JSON)") tts_params: dict[str, Any] = Field({}, description="Additional TTS model parameters (JSON)") - turn_strategy: Literal["smart", "external"] = Field( - "smart", + # Configurable turn start/stop strategies + turn_start_strategy: str = Field( + "vad", description=( - "User turn detection strategy. " - "'smart' uses LocalSmartTurnAnalyzerV3 + SileroVAD (default). " - "'external' uses ExternalUserTurnStrategies for services with built-in turn detection " - "(e.g., deepgram-flux, Speechmatics). " - "Set via EVA_MODEL__TURN_STRATEGY=external." + "User turn start strategy: 'vad', 'transcription', or 'external'. " + "Defaults to 'vad' (VADUserTurnStartStrategy). " + "Set via EVA_MODEL__TURN_START_STRATEGY." + ), + ) + turn_start_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn start strategy (JSON). Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", + ) + + turn_stop_strategy: str = Field( + "turn_analyzer", + description=( + "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " + "Defaults to 'turn_analyzer' (TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3). " + "Set via EVA_MODEL__TURN_STOP_STRATEGY." + ), + ) + turn_stop_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn stop strategy (JSON). 
Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", + ) + + # VAD configuration + vad: str = Field( + "silero", + description=("VAD analyzer type: 'silero'. Defaults to 'silero' (SileroVADAnalyzer). Set via EVA_MODEL__VAD."), + ) + vad_params: dict[str, Any] = Field( + {}, + description=( + "VAD parameters (JSON): confidence, start_secs, stop_secs, min_volume. Set via EVA_MODEL__VAD_PARAMS." ), ) @@ -102,6 +130,15 @@ def _migrate_legacy_fields(cls, data: Any) -> Any: data.pop(key, None) return data + @field_serializer("stt_params", "tts_params") + @classmethod + def _redact_api_keys(cls, params: dict[str, Any]) -> dict[str, Any]: + """Redact API keys when serializing.""" + redacted = params.copy() + if "api_key" in redacted: + redacted["api_key"] = "***" + return redacted + class SpeechToSpeechConfig(BaseModel): """Configuration for a speech-to-speech model.""" @@ -111,14 +148,42 @@ class SpeechToSpeechConfig(BaseModel): s2s: str = Field(description="Speech-to-speech model name", examples=["gpt-realtime-mini", "gemini_live"]) s2s_params: dict[str, Any] = Field({}, description="Additional speech-to-speech model parameters (JSON)") - turn_strategy: Literal["smart", "external"] = Field( - "smart", + # Configurable turn start/stop strategies (same as PipelineConfig) + turn_start_strategy: str = Field( + "vad", + description=( + "User turn start strategy: 'vad', 'transcription', or 'external'. " + "Defaults to 'vad' (VADUserTurnStartStrategy). " + "Set via EVA_MODEL__TURN_START_STRATEGY." + ), + ) + turn_start_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn start strategy (JSON). Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", + ) + + turn_stop_strategy: str = Field( + "turn_analyzer", + description=( + "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " + "Defaults to 'turn_analyzer' (TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3). " + "Set via EVA_MODEL__TURN_STOP_STRATEGY." 
+ ), + ) + turn_stop_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn stop strategy (JSON). Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", + ) + + # VAD configuration + vad: str = Field( + "silero", + description=("VAD analyzer type: 'silero'. Defaults to 'silero' (SileroVADAnalyzer). Set via EVA_MODEL__VAD."), + ) + vad_params: dict[str, Any] = Field( + {}, description=( - "User turn detection strategy. " - "'smart' uses LocalSmartTurnAnalyzerV3 + SileroVAD (default). " - "'external' uses ExternalUserTurnStrategies for services with built-in turn detection " - "(e.g., deepgram-flux, Speechmatics). " - "Set via EVA_MODEL__TURN_STRATEGY=external." + "VAD parameters (JSON): confidence, start_secs, stop_secs, min_volume. Set via EVA_MODEL__VAD_PARAMS." ), ) @@ -127,6 +192,15 @@ def pipeline_parts(self) -> dict[str, str]: """Component names for this pipeline.""" return {"s2s": _param_alias(self.s2s_params) or self.s2s} + @field_serializer("s2s_params") + @classmethod + def _redact_api_keys(cls, params: dict[str, Any]) -> dict[str, Any]: + """Redact API keys when serializing.""" + redacted = params.copy() + if "api_key" in redacted: + redacted["api_key"] = "***" + return redacted + class AudioLLMConfig(BaseModel): """Configuration for an Audio-LLM pipeline (audio in, text out, separate TTS). 
@@ -143,11 +217,53 @@ class AudioLLMConfig(BaseModel): ) audio_llm_params: dict[str, Any] = Field( {}, - description="Audio-LLM parameters (JSON): base_url (required), api_key, model, temperature, max_tokens", + description=( + "Audio-LLM parameters (JSON): base_url (required), api_key, model, temperature, max_tokens, " + "vad_stop_secs (default: 0.4), smart_turn_stop_secs (default: 0.8)" + ), ) tts: str = Field(description="TTS model", examples=["cartesia", "elevenlabs"]) tts_params: dict[str, Any] = Field({}, description="Additional TTS model parameters (JSON)") + # Configurable turn start/stop strategies (same as PipelineConfig) + turn_start_strategy: str = Field( + "vad", + description=( + "User turn start strategy: 'vad', 'transcription', or 'external'. " + "Defaults to 'vad' (VADUserTurnStartStrategy). " + "Set via EVA_MODEL__TURN_START_STRATEGY." + ), + ) + turn_start_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn start strategy (JSON). Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", + ) + + turn_stop_strategy: str = Field( + "turn_analyzer", + description=( + "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " + "Defaults to 'turn_analyzer' (TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3). " + "Set via EVA_MODEL__TURN_STOP_STRATEGY." + ), + ) + turn_stop_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn stop strategy (JSON). Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", + ) + + # VAD configuration + vad: str = Field( + "silero", + description=("VAD analyzer type: 'silero'. Defaults to 'silero' (SileroVADAnalyzer). Set via EVA_MODEL__VAD."), + ) + vad_params: dict[str, Any] = Field( + {}, + description=( + "VAD parameters (JSON): confidence, start_secs, stop_secs, min_volume. Set via EVA_MODEL__VAD_PARAMS." 
+ ), + ) + @property def pipeline_parts(self) -> dict[str, str]: """Component names for this pipeline.""" @@ -156,6 +272,15 @@ def pipeline_parts(self) -> dict[str, str]: "tts": _param_alias(self.tts_params) or self.tts, } + @field_serializer("audio_llm_params", "tts_params") + @classmethod + def _redact_api_keys(cls, params: dict[str, Any]) -> dict[str, Any]: + """Redact API keys when serializing.""" + redacted = params.copy() + if "api_key" in redacted: + redacted["api_key"] = "***" + return redacted + _PIPELINE_FIELDS = { "llm", @@ -163,12 +288,37 @@ def pipeline_parts(self) -> dict[str, str]: "tts", "stt_params", "tts_params", - "turn_strategy", + "turn_start_strategy", + "turn_start_strategy_params", + "turn_stop_strategy", + "turn_stop_strategy_params", + "vad", + "vad_params", *PipelineConfig._LEGACY_RENAMES, *PipelineConfig._LEGACY_DROP, } -_S2S_FIELDS = {"s2s", "s2s_params", "turn_strategy"} -_AUDIO_LLM_FIELDS = {"audio_llm", "audio_llm_params", "tts", "tts_params"} +_S2S_FIELDS = { + "s2s", + "s2s_params", + "turn_start_strategy", + "turn_start_strategy_params", + "turn_stop_strategy", + "turn_stop_strategy_params", + "vad", + "vad_params", +} +_AUDIO_LLM_FIELDS = { + "audio_llm", + "audio_llm_params", + "tts", + "tts_params", + "turn_start_strategy", + "turn_start_strategy_params", + "turn_stop_strategy", + "turn_stop_strategy_params", + "vad", + "vad_params", +} def _model_config_discriminator(data: Any) -> str: