From 91a493b77adad6a889078dfeccd58823ebe2aa57 Mon Sep 17 00:00:00 2001 From: Atsushi Morimoto <74th.tech@gmail.com> Date: Mon, 9 Mar 2026 19:44:27 +0900 Subject: [PATCH 1/5] feat: implement speech recognition integration with Google Cloud --- stackchan_server/app.py | 9 +++-- .../speech_recognition/__init__.py | 31 +++++++++++++++ .../speech_recognition/google_cloud.py | 39 +++++++++++++++++++ stackchan_server/types.py | 18 +++++++++ stackchan_server/ws_proxy.py | 24 ++++++------ 5 files changed, 104 insertions(+), 17 deletions(-) create mode 100644 stackchan_server/speech_recognition/__init__.py create mode 100644 stackchan_server/speech_recognition/google_cloud.py create mode 100644 stackchan_server/types.py diff --git a/stackchan_server/app.py b/stackchan_server/app.py index 2239fbb..c192556 100644 --- a/stackchan_server/app.py +++ b/stackchan_server/app.py @@ -5,16 +5,17 @@ from typing import Awaitable, Callable, Optional from fastapi import FastAPI, WebSocket, WebSocketDisconnect -from google.cloud import speech +from .speech_recognition import create_speech_recognizer +from .types import SpeechRecognizer from .ws_proxy import WsProxy logger = getLogger(__name__) class StackChanApp: - def __init__(self) -> None: - self.speech_client = speech.SpeechClient() + def __init__(self, speech_recognizer: SpeechRecognizer | None = None) -> None: + self.speech_recognizer = speech_recognizer or create_speech_recognizer() self.fastapi = FastAPI(title="StackChan WebSocket Server") self._setup_fn: Optional[Callable[[WsProxy], Awaitable[None]]] = None self._talk_session_fn: Optional[Callable[[WsProxy], Awaitable[None]]] = None @@ -37,7 +38,7 @@ def talk_session(self, fn: Callable[["WsProxy"], Awaitable[None]]): async def _handle_ws(self, websocket: WebSocket) -> None: await websocket.accept() - proxy = WsProxy(websocket, speech_client=self.speech_client) + proxy = WsProxy(websocket, speech_recognizer=self.speech_recognizer) await proxy.start() try: if self._setup_fn: diff --git a/stackchan_server/speech_recognition/__init__.py b/stackchan_server/speech_recognition/__init__.py new file mode 100644 index 0000000..bbaefba --- /dev/null +++ b/stackchan_server/speech_recognition/__init__.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +import importlib +import os +from typing import cast + +from ..types import SpeechRecognizer +from .google_cloud import GoogleCloudSpeechToText + +_DEFAULT_RECOGNIZER = "google_cloud" + + +def create_speech_recognizer() -> SpeechRecognizer: + recognizer_name = os.getenv("STACKCHAN_SPEECH_RECOGNIZER", _DEFAULT_RECOGNIZER) + + if recognizer_name == "google_cloud": + return GoogleCloudSpeechToText() + + module_name, separator, attr_name = recognizer_name.partition(":") + if not separator: + raise ValueError( + "STACKCHAN_SPEECH_RECOGNIZER must be 'google_cloud' or ':'" + ) + + module = importlib.import_module(module_name) + target = getattr(module, attr_name) + instance = target() if callable(target) else target + return cast(SpeechRecognizer, instance) + + +__all__ = ["GoogleCloudSpeechToText", "create_speech_recognizer"] diff --git a/stackchan_server/speech_recognition/google_cloud.py b/stackchan_server/speech_recognition/google_cloud.py new file mode 100644 index 0000000..f6470d3 --- /dev/null +++ b/stackchan_server/speech_recognition/google_cloud.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from google.cloud import speech + +from ..types import SpeechRecognizer + + +class GoogleCloudSpeechToText(SpeechRecognizer): + def __init__(self, client: speech.SpeechClient | None = None) -> None: + self._client = client or speech.SpeechClient() + + def transcribe( + self, + pcm_bytes: bytes, + *, + sample_rate_hz: int, + channels: int, + sample_width: int, + language_code: str = "ja-JP", + ) -> str: + if channels != 1: + raise ValueError(f"Google Cloud Speech only supports mono input here: channels={channels}") + if sample_width != 2: + raise ValueError( + f"Google Cloud Speech LINEAR16 requires 16-bit samples here: sample_width={sample_width}" + ) + + audio = speech.RecognitionAudio(content=pcm_bytes) + config = speech.RecognitionConfig( + encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16, + sample_rate_hertz=sample_rate_hz, + language_code=language_code, + ) + response = self._client.recognize(config=config, audio=audio) + + return "".join(result.alternatives[0].transcript for result in response.results) + + +__all__ = ["GoogleCloudSpeechToText"] diff --git a/stackchan_server/types.py b/stackchan_server/types.py new file mode 100644 index 0000000..6efd57e --- /dev/null +++ b/stackchan_server/types.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from typing import Protocol + + +class SpeechRecognizer(Protocol): + def transcribe( + self, + pcm_bytes: bytes, + *, + sample_rate_hz: int, + channels: int, + sample_width: int, + language_code: str = "ja-JP", + ) -> str: ... + + +__all__ = ["SpeechRecognizer"] diff --git a/stackchan_server/ws_proxy.py b/stackchan_server/ws_proxy.py index 8bda2f0..6e42aa2 100644 --- a/stackchan_server/ws_proxy.py +++ b/stackchan_server/ws_proxy.py @@ -13,9 +13,10 @@ from typing import Optional from fastapi import WebSocket, WebSocketDisconnect -from google.cloud import speech from vvclient import Client as VVClient +from .types import SpeechRecognizer + logger = getLogger(__name__) _BASE_DIR = Path(__file__).resolve().parent @@ -71,9 +72,9 @@ def create_voicevox_client() -> VVClient: class WsProxy: - def __init__(self, websocket: WebSocket, speech_client: speech.SpeechClient): + def __init__(self, websocket: WebSocket, speech_recognizer: SpeechRecognizer): self.ws = websocket - self.speech_client = speech_client + self.speech_recognizer = speech_recognizer self.recordings_dir = _RECORDINGS_DIR self._debug_recording = _DEBUG_RECORDING_ENABLED if self._debug_recording: @@ -399,18 +400,15 @@ async def _transcribe_async(self, pcm_bytes: bytes) -> str: return await loop.run_in_executor(None, lambda: self._transcribe(pcm_bytes)) def _transcribe(self, pcm_bytes: bytes) -> str: - audio = speech.RecognitionAudio(content=pcm_bytes) - config = speech.RecognitionConfig( - encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16, - sample_rate_hertz=_SAMPLE_RATE_HZ, + transcript = self.speech_recognizer.transcribe( + pcm_bytes, + sample_rate_hz=_SAMPLE_RATE_HZ, + channels=_CHANNELS, + sample_width=_SAMPLE_WIDTH, language_code="ja-JP", ) - response = self.speech_client.recognize(config=config, audio=audio) - - transcript = "" - for result in response.results: - logger.info("Transcript: %s", result.alternatives[0].transcript) - transcript += result.alternatives[0].transcript + if transcript: + logger.info("Transcript: %s", transcript) return transcript def _extract_pcm(self, wav_bytes: bytes) -> tuple[bytes, int, int, int]: From 869f4478f06982af9a1036c5c6cf0d68b24f24de Mon Sep 17 00:00:00 2001 From: Atsushi Morimoto <74th.tech@gmail.com> Date: Mon, 9 Mar 2026 19:50:15 +0900 Subject: [PATCH 2/5] refactor: simplify speech recognizer creation by removing dynamic module loading --- .../speech_recognition/__init__.py | 23 +------------------ 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/stackchan_server/speech_recognition/__init__.py b/stackchan_server/speech_recognition/__init__.py index bbaefba..cd6674e 100644 --- a/stackchan_server/speech_recognition/__init__.py +++ b/stackchan_server/speech_recognition/__init__.py @@ -1,31 +1,10 @@ from __future__ import annotations -import importlib -import os -from typing import cast - from ..types import SpeechRecognizer from .google_cloud import GoogleCloudSpeechToText -_DEFAULT_RECOGNIZER = "google_cloud" - - def create_speech_recognizer() -> SpeechRecognizer: - recognizer_name = os.getenv("STACKCHAN_SPEECH_RECOGNIZER", _DEFAULT_RECOGNIZER) - - if recognizer_name == "google_cloud": - return GoogleCloudSpeechToText() - - module_name, separator, attr_name = recognizer_name.partition(":") - if not separator: - raise ValueError( - "STACKCHAN_SPEECH_RECOGNIZER must be 'google_cloud' or ':'" - ) - - module = importlib.import_module(module_name) - target = getattr(module, attr_name) - instance = target() if callable(target) else target - return cast(SpeechRecognizer, instance) + return GoogleCloudSpeechToText() __all__ = ["GoogleCloudSpeechToText", "create_speech_recognizer"] From 48b670fcbd6fc5f3413a07dec996151e9bb1cacf Mon Sep 17 00:00:00 2001 From: Atsushi Morimoto <74th.tech@gmail.com> Date: Mon, 9 Mar 2026 20:04:14 +0900 Subject: [PATCH 3/5] feat: add streaming speech recognition support with Google Cloud integration --- example_apps/echo.py | 7 +- .../speech_recognition/__init__.py | 1 + .../speech_recognition/google_cloud.py | 117 +++++++++++++++++- stackchan_server/types.py | 26 +++- stackchan_server/ws_proxy.py | 62 +++++++++- 5 files changed, 204 insertions(+), 9 deletions(-) diff --git a/example_apps/echo.py b/example_apps/echo.py index 2cf709f..5185c92 100644 --- a/example_apps/echo.py +++ b/example_apps/echo.py @@ -3,7 +3,7 @@ from logging import StreamHandler, getLogger from stackchan_server.app import StackChanApp -from stackchan_server.ws_proxy import WsProxy +from stackchan_server.ws_proxy import EmptyTranscriptError, WsProxy logger = getLogger(__name__) @@ -22,7 +22,10 @@ async def setup(proxy: WsProxy): @app.talk_session async def talk_session(proxy: WsProxy): while True: - text = await proxy.listen() + try: + text = await proxy.listen() + except EmptyTranscriptError: + return if not text: return logger.info("Heard: %s", text) diff --git a/stackchan_server/speech_recognition/__init__.py b/stackchan_server/speech_recognition/__init__.py index cd6674e..660cf6d 100644 --- a/stackchan_server/speech_recognition/__init__.py +++ b/stackchan_server/speech_recognition/__init__.py @@ -3,6 +3,7 @@ from ..types import SpeechRecognizer from .google_cloud import GoogleCloudSpeechToText + def create_speech_recognizer() -> SpeechRecognizer: return GoogleCloudSpeechToText() diff --git a/stackchan_server/speech_recognition/google_cloud.py b/stackchan_server/speech_recognition/google_cloud.py index f6470d3..55448c9 100644 --- a/stackchan_server/speech_recognition/google_cloud.py +++ b/stackchan_server/speech_recognition/google_cloud.py @@ -1,11 +1,108 @@ from __future__ import annotations +import queue +import threading +from logging import getLogger + from google.cloud import speech -from ..types import SpeechRecognizer +from ..types import SpeechRecognizer, StreamingSpeechRecognizer, StreamingSpeechSession + +logger = getLogger(__name__) +_STREAM_END = object() + + +class _GoogleCloudStreamingSession(StreamingSpeechSession): + def __init__( + self, + client: speech.SpeechClient, + *, + sample_rate_hz: int, + channels: int, + sample_width: int, + language_code: str, + ) -> None: + if channels != 1: + raise ValueError(f"Google Cloud Speech only supports mono input here: channels={channels}") + if sample_width != 2: + raise ValueError( + f"Google Cloud Speech LINEAR16 requires 16-bit samples here: sample_width={sample_width}" + ) + + self._client = client + self._config = speech.StreamingRecognitionConfig( + config=speech.RecognitionConfig( + encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16, + sample_rate_hertz=sample_rate_hz, + language_code=language_code, + ), + interim_results=False, + single_utterance=False, + ) + self._audio_queue: queue.Queue[bytes | object] = queue.Queue() + self._done = threading.Event() + self._closed = False + self._error: Exception | None = None + self._final_transcripts: list[str] = [] + self._latest_transcript = "" + self._thread = threading.Thread(target=self._run, name="gcloud-speech-stream", daemon=True) + self._thread.start() + + def push_audio(self, pcm_bytes: bytes) -> None: + if self._closed: + raise RuntimeError("streaming speech session is already closed") + if pcm_bytes: + self._audio_queue.put(bytes(pcm_bytes)) + + def finish(self) -> str: + self._close_stream() + self._thread.join(timeout=30.0) + if self._thread.is_alive(): + raise TimeoutError("timed out waiting for streaming speech recognition to finish") + if self._error is not None: + raise self._error + transcript = "".join(self._final_transcripts) + return transcript or self._latest_transcript + + def abort(self) -> None: + self._close_stream() + self._done.wait(timeout=1.0) + def _close_stream(self) -> None: + if self._closed: + return + self._closed = True + self._audio_queue.put(_STREAM_END) -class GoogleCloudSpeechToText(SpeechRecognizer): + def _request_iter(self): + while True: + chunk = self._audio_queue.get() + if chunk is _STREAM_END: + return + yield speech.StreamingRecognizeRequest(audio_content=chunk) + + def _run(self) -> None: + try: + responses = self._client.streaming_recognize(self._config, self._request_iter()) + for response in responses: + for result in response.results: + if not result.alternatives: + continue + transcript = result.alternatives[0].transcript + if result.is_final: + logger.info("Streaming transcript(final): %s", transcript) + self._final_transcripts.append(transcript) + self._latest_transcript = "" + else: + logger.info("Streaming transcript(interim): %s", transcript) + self._latest_transcript = transcript + except Exception as exc: + self._error = exc + finally: + self._done.set() + + +class GoogleCloudSpeechToText(StreamingSpeechRecognizer): def __init__(self, client: speech.SpeechClient | None = None) -> None: self._client = client or speech.SpeechClient() @@ -35,5 +132,21 @@ def transcribe( return "".join(result.alternatives[0].transcript for result in response.results) + def start_stream( + self, + *, + sample_rate_hz: int, + channels: int, + sample_width: int, + language_code: str = "ja-JP", + ) -> StreamingSpeechSession: + return _GoogleCloudStreamingSession( + self._client, + sample_rate_hz=sample_rate_hz, + channels=channels, + sample_width=sample_width, + language_code=language_code, + ) + __all__ = ["GoogleCloudSpeechToText"] diff --git a/stackchan_server/types.py b/stackchan_server/types.py index 6efd57e..4e57bb2 100644 --- a/stackchan_server/types.py +++ b/stackchan_server/types.py @@ -1,8 +1,9 @@ from __future__ import annotations -from typing import Protocol +from typing import Protocol, runtime_checkable +@runtime_checkable class SpeechRecognizer(Protocol): def transcribe( self, @@ -15,4 +16,25 @@ def transcribe( ) -> str: ... -__all__ = ["SpeechRecognizer"] +@runtime_checkable +class StreamingSpeechSession(Protocol): + def push_audio(self, pcm_bytes: bytes) -> None: ... + + def finish(self) -> str: ... + + def abort(self) -> None: ... + + +@runtime_checkable +class StreamingSpeechRecognizer(SpeechRecognizer, Protocol): + def start_stream( + self, + *, + sample_rate_hz: int, + channels: int, + sample_width: int, + language_code: str = "ja-JP", + ) -> StreamingSpeechSession: ... + + +__all__ = ["SpeechRecognizer", "StreamingSpeechRecognizer", "StreamingSpeechSession"] diff --git a/stackchan_server/ws_proxy.py b/stackchan_server/ws_proxy.py index 6e42aa2..7727b5c 100644 --- a/stackchan_server/ws_proxy.py +++ b/stackchan_server/ws_proxy.py @@ -15,7 +15,7 @@ from fastapi import WebSocket, WebSocketDisconnect from vvclient import Client as VVClient -from .types import SpeechRecognizer +from .types import SpeechRecognizer, StreamingSpeechRecognizer, StreamingSpeechSession logger = getLogger(__name__) @@ -88,6 +88,7 @@ def __init__(self, websocket: WebSocket, speech_recognizer: SpeechRecognizer): self._message_error: Optional[Exception] = None self._transcript: Optional[str] = None self._wakeword_event = asyncio.Event() + self._speech_stream: Optional[StreamingSpeechSession] = None self._receiving_task: Optional[asyncio.Task] = None self._closed = False @@ -185,6 +186,7 @@ async def close(self) -> None: self._receiving_task.cancel() with suppress(asyncio.CancelledError): await self._receiving_task + self._abort_speech_stream() async def start_talking(self, text: str) -> None: await self.speak(text) @@ -238,7 +240,8 @@ async def _receive_loop(self) -> None: if kind == _WsKind.PCM: if msg_type == _WsMsgType.START: - self._handle_listening_start() + if not self._handle_listening_start(): + break continue if msg_type == _WsMsgType.DATA: @@ -273,36 +276,67 @@ async def _receive_loop(self) -> None: self._closed = True self._speaking = False - def _handle_listening_start(self) -> None: + def _handle_listening_start(self) -> bool: logger.info("Received START") + self._abort_speech_stream() self._pcm_buffer = bytearray() self._streaming = True self._message_error = None + if isinstance(self.speech_recognizer, StreamingSpeechRecognizer): + try: + self._speech_stream = self.speech_recognizer.start_stream( + sample_rate_hz=_SAMPLE_RATE_HZ, + channels=_CHANNELS, + sample_width=_SAMPLE_WIDTH, + language_code="ja-JP", + ) + except Exception: + asyncio.create_task(self.ws.close(code=1011, reason="speech streaming failed")) + return False + return True def _handle_listening_data(self, payload_bytes: int, payload: bytes) -> bool: logger.info("Received DATA payload_bytes=%d", payload_bytes) if not self._streaming: + self._abort_speech_stream() asyncio.create_task(self.ws.close(code=1003, reason="data received before start")) return False if payload_bytes % (_SAMPLE_WIDTH * _CHANNELS) != 0: + self._abort_speech_stream() asyncio.create_task(self.ws.close(code=1003, reason="invalid pcm chunk length")) return False self._pcm_buffer.extend(payload) if payload_bytes > 0: + try: + self._push_speech_stream(payload) + except Exception: + self._abort_speech_stream() + asyncio.create_task(self.ws.close(code=1011, reason="speech streaming failed")) + return False self._pcm_data_counter += 1 return True async def _handle_listening_end(self, payload_bytes: int, payload: bytes) -> None: logger.info("Received END payload_bytes=%d", payload_bytes) if not self._streaming: + self._abort_speech_stream() await self.ws.close(code=1003, reason="end received before start") return if payload_bytes % (_SAMPLE_WIDTH * _CHANNELS) != 0: + self._abort_speech_stream() await self.ws.close(code=1003, reason="invalid pcm tail length") return self._pcm_buffer.extend(payload) + if payload_bytes > 0: + try: + self._push_speech_stream(payload) + except Exception: + self._abort_speech_stream() + await self.ws.close(code=1011, reason="speech streaming failed") + return if len(self._pcm_buffer) == 0 or len(self._pcm_buffer) % (_SAMPLE_WIDTH * _CHANNELS) != 0: + self._abort_speech_stream() await self.ws.close(code=1003, reason="invalid accumulated pcm length") return @@ -396,6 +430,8 @@ def _save_wav(self, pcm_bytes: bytes) -> tuple[Path, str]: return filepath, filename async def _transcribe_async(self, pcm_bytes: bytes) -> str: + if self._speech_stream is not None: + return await asyncio.to_thread(self._finish_speech_stream) loop = asyncio.get_running_loop() return await loop.run_in_executor(None, lambda: self._transcribe(pcm_bytes)) @@ -411,6 +447,26 @@ def _transcribe(self, pcm_bytes: bytes) -> str: logger.info("Transcript: %s", transcript) return transcript + def _push_speech_stream(self, pcm_bytes: bytes) -> None: + if self._speech_stream is not None: + self._speech_stream.push_audio(pcm_bytes) + + def _finish_speech_stream(self) -> str: + speech_stream = self._speech_stream + self._speech_stream = None + if speech_stream is None: + return "" + transcript = speech_stream.finish() + if transcript: + logger.info("Transcript: %s", transcript) + return transcript + + def _abort_speech_stream(self) -> None: + speech_stream = self._speech_stream + self._speech_stream = None + if speech_stream is not None: + speech_stream.abort() + def _extract_pcm(self, wav_bytes: bytes) -> tuple[bytes, int, int, int]: with wave.open(io.BytesIO(wav_bytes), "rb") as wf: pcm_bytes = wf.readframes(wf.getnframes()) From e275e78a4650451bbbab40567742b4ea16c87f0a Mon Sep 17 00:00:00 2001 From: Atsushi Morimoto <74th.tech@gmail.com> Date: Mon, 9 Mar 2026 20:10:57 +0900 Subject: [PATCH 4/5] feat: refactor speech recognition to use asyncio for improved performance --- .../speech_recognition/google_cloud.py | 65 ++++++++++--------- stackchan_server/types.py | 10 +-- stackchan_server/ws_proxy.py | 53 ++++++++------- 3 files changed, 65 insertions(+), 63 deletions(-) diff --git a/stackchan_server/speech_recognition/google_cloud.py b/stackchan_server/speech_recognition/google_cloud.py index 55448c9..3780ea6 100644 --- a/stackchan_server/speech_recognition/google_cloud.py +++ b/stackchan_server/speech_recognition/google_cloud.py @@ -1,12 +1,11 @@ from __future__ import annotations -import queue -import threading +import asyncio from logging import getLogger from google.cloud import speech -from ..types import SpeechRecognizer, StreamingSpeechRecognizer, StreamingSpeechSession +from ..types import StreamingSpeechRecognizer, StreamingSpeechSession logger = getLogger(__name__) _STREAM_END = object() @@ -15,7 +14,7 @@ class _GoogleCloudStreamingSession(StreamingSpeechSession): def __init__( self, - client: speech.SpeechClient, + client: speech.SpeechAsyncClient, *, sample_rate_hz: int, channels: int, @@ -39,52 +38,54 @@ def __init__( interim_results=False, single_utterance=False, ) - self._audio_queue: queue.Queue[bytes | object] = queue.Queue() - self._done = threading.Event() + self._audio_queue: asyncio.Queue[bytes | object] = asyncio.Queue() + self._done = asyncio.Event() self._closed = False self._error: Exception | None = None self._final_transcripts: list[str] = [] self._latest_transcript = "" - self._thread = threading.Thread(target=self._run, name="gcloud-speech-stream", daemon=True) - self._thread.start() + self._task = asyncio.create_task(self._run()) - def push_audio(self, pcm_bytes: bytes) -> None: + async def push_audio(self, pcm_bytes: bytes) -> None: if self._closed: raise RuntimeError("streaming speech session is already closed") if pcm_bytes: - self._audio_queue.put(bytes(pcm_bytes)) + await self._audio_queue.put(bytes(pcm_bytes)) - def finish(self) -> str: - self._close_stream() - self._thread.join(timeout=30.0) - if self._thread.is_alive(): - raise TimeoutError("timed out waiting for streaming speech recognition to finish") + async def finish(self) -> str: + await self._close_stream() + await self._task if self._error is not None: raise self._error transcript = "".join(self._final_transcripts) return transcript or self._latest_transcript - def abort(self) -> None: - self._close_stream() - self._done.wait(timeout=1.0) + async def abort(self) -> None: + await self._close_stream() + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass - def _close_stream(self) -> None: + async def _close_stream(self) -> None: if self._closed: return self._closed = True - self._audio_queue.put(_STREAM_END) + await self._audio_queue.put(_STREAM_END) - def _request_iter(self): + async def _request_iter(self): + yield speech.StreamingRecognizeRequest(streaming_config=self._config) while True: - chunk = self._audio_queue.get() + chunk = await self._audio_queue.get() if chunk is _STREAM_END: - return + break yield speech.StreamingRecognizeRequest(audio_content=chunk) - def _run(self) -> None: + async def _run(self) -> None: try: - responses = self._client.streaming_recognize(self._config, self._request_iter()) - for response in responses: + responses = await self._client.streaming_recognize(requests=self._request_iter()) + async for response in responses: for result in response.results: if not result.alternatives: continue @@ -96,6 +97,8 @@ def _run(self) -> None: else: logger.info("Streaming transcript(interim): %s", transcript) self._latest_transcript = transcript + except asyncio.CancelledError: + raise except Exception as exc: self._error = exc finally: @@ -103,10 +106,10 @@ def _run(self) -> None: class GoogleCloudSpeechToText(StreamingSpeechRecognizer): - def __init__(self, client: speech.SpeechClient | None = None) -> None: - self._client = client or speech.SpeechClient() + def __init__(self, client: speech.SpeechAsyncClient | None = None) -> None: + self._client = client or speech.SpeechAsyncClient() - def transcribe( + async def transcribe( self, pcm_bytes: bytes, *, @@ -128,11 +131,11 @@ def transcribe( sample_rate_hertz=sample_rate_hz, language_code=language_code, ) - response = self._client.recognize(config=config, audio=audio) + response = await self._client.recognize(config=config, audio=audio) return "".join(result.alternatives[0].transcript for result in response.results) - def start_stream( + async def start_stream( self, *, sample_rate_hz: int, diff --git a/stackchan_server/types.py b/stackchan_server/types.py index 4e57bb2..041f7d1 100644 --- a/stackchan_server/types.py +++ b/stackchan_server/types.py @@ -5,7 +5,7 @@ @runtime_checkable class SpeechRecognizer(Protocol): - def transcribe( + async def transcribe( self, pcm_bytes: bytes, *, @@ -18,16 +18,16 @@ def transcribe( @runtime_checkable class StreamingSpeechSession(Protocol): - def push_audio(self, pcm_bytes: bytes) -> None: ... + async def push_audio(self, pcm_bytes: bytes) -> None: ... - def finish(self) -> str: ... + async def finish(self) -> str: ... - def abort(self) -> None: ... + async def abort(self) -> None: ... @runtime_checkable class StreamingSpeechRecognizer(SpeechRecognizer, Protocol): - def start_stream( + async def start_stream( self, *, sample_rate_hz: int, diff --git a/stackchan_server/ws_proxy.py b/stackchan_server/ws_proxy.py index 7727b5c..b84646f 100644 --- a/stackchan_server/ws_proxy.py +++ b/stackchan_server/ws_proxy.py @@ -186,7 +186,7 @@ async def close(self) -> None: self._receiving_task.cancel() with suppress(asyncio.CancelledError): await self._receiving_task - self._abort_speech_stream() + await self._abort_speech_stream() async def start_talking(self, text: str) -> None: await self.speak(text) @@ -240,12 +240,12 @@ async def _receive_loop(self) -> None: if kind == _WsKind.PCM: if msg_type == _WsMsgType.START: - if not self._handle_listening_start(): + if not await self._handle_listening_start(): break continue if msg_type == _WsMsgType.DATA: - if not self._handle_listening_data(payload_bytes, payload): + if not await self._handle_listening_data(payload_bytes, payload): break continue @@ -276,15 +276,15 @@ async def _receive_loop(self) -> None: self._closed = True self._speaking = False - def _handle_listening_start(self) -> bool: + async def _handle_listening_start(self) -> bool: logger.info("Received START") - self._abort_speech_stream() + await self._abort_speech_stream() self._pcm_buffer = bytearray() self._streaming = True self._message_error = None if isinstance(self.speech_recognizer, StreamingSpeechRecognizer): try: - self._speech_stream = self.speech_recognizer.start_stream( + self._speech_stream = await self.speech_recognizer.start_stream( sample_rate_hz=_SAMPLE_RATE_HZ, channels=_CHANNELS, sample_width=_SAMPLE_WIDTH, @@ -295,22 +295,22 @@ def _handle_listening_start(self) -> bool: return False return True - def _handle_listening_data(self, payload_bytes: int, payload: bytes) -> bool: + async def _handle_listening_data(self, payload_bytes: int, payload: bytes) -> bool: logger.info("Received DATA payload_bytes=%d", payload_bytes) if not self._streaming: - self._abort_speech_stream() + await self._abort_speech_stream() asyncio.create_task(self.ws.close(code=1003, reason="data received before start")) return False if payload_bytes % (_SAMPLE_WIDTH * _CHANNELS) != 0: - self._abort_speech_stream() + await self._abort_speech_stream() asyncio.create_task(self.ws.close(code=1003, reason="invalid pcm chunk length")) return False self._pcm_buffer.extend(payload) if payload_bytes > 0: try: - self._push_speech_stream(payload) + await self._push_speech_stream(payload) except Exception: - self._abort_speech_stream() + await self._abort_speech_stream() asyncio.create_task(self.ws.close(code=1011, reason="speech streaming failed")) return False self._pcm_data_counter += 1 @@ -319,24 +319,24 @@ def _handle_listening_data(self, payload_bytes: int, payload: bytes) -> bool: async def _handle_listening_end(self, payload_bytes: int, payload: bytes) -> None: logger.info("Received END payload_bytes=%d", payload_bytes) if not self._streaming: - self._abort_speech_stream() + await self._abort_speech_stream() await self.ws.close(code=1003, reason="end received before start") return if payload_bytes % (_SAMPLE_WIDTH * _CHANNELS) != 0: - self._abort_speech_stream() + await self._abort_speech_stream() await self.ws.close(code=1003, reason="invalid pcm tail length") return self._pcm_buffer.extend(payload) if payload_bytes > 0: try: - self._push_speech_stream(payload) + await self._push_speech_stream(payload) except Exception: - self._abort_speech_stream() + await self._abort_speech_stream() await self.ws.close(code=1011, reason="speech streaming failed") return if len(self._pcm_buffer) == 0 or len(self._pcm_buffer) % (_SAMPLE_WIDTH * _CHANNELS) != 0: - self._abort_speech_stream() + await self._abort_speech_stream() await self.ws.close(code=1003, reason="invalid accumulated pcm length") return @@ -431,12 +431,11 @@ def _save_wav(self, pcm_bytes: bytes) -> tuple[Path, str]: async def _transcribe_async(self, pcm_bytes: bytes) -> str: if self._speech_stream is not None: - return await asyncio.to_thread(self._finish_speech_stream) - loop = asyncio.get_running_loop() - return await loop.run_in_executor(None, lambda: self._transcribe(pcm_bytes)) + return await self._finish_speech_stream() + return await self._transcribe(pcm_bytes) - def _transcribe(self, pcm_bytes: bytes) -> str: - transcript = self.speech_recognizer.transcribe( + async def _transcribe(self, pcm_bytes: bytes) -> str: + transcript = await self.speech_recognizer.transcribe( pcm_bytes, sample_rate_hz=_SAMPLE_RATE_HZ, channels=_CHANNELS, @@ -447,25 +446,25 @@ def _transcribe(self, pcm_bytes: bytes) -> str: logger.info("Transcript: %s", transcript) return transcript - def _push_speech_stream(self, pcm_bytes: bytes) -> None: + async def _push_speech_stream(self, pcm_bytes: bytes) -> None: if self._speech_stream is not None: - self._speech_stream.push_audio(pcm_bytes) + await self._speech_stream.push_audio(pcm_bytes) - def _finish_speech_stream(self) -> str: + async def _finish_speech_stream(self) -> str: speech_stream = self._speech_stream self._speech_stream = None if speech_stream is None: return "" - transcript = speech_stream.finish() + transcript = await speech_stream.finish() if transcript: logger.info("Transcript: %s", transcript) return transcript - def _abort_speech_stream(self) -> None: + async def _abort_speech_stream(self) -> None: speech_stream = self._speech_stream self._speech_stream = None if speech_stream is not None: - speech_stream.abort() + await speech_stream.abort() def _extract_pcm(self, wav_bytes: bytes) -> tuple[bytes, int, int, int]: with wave.open(io.BytesIO(wav_bytes), "rb") as wf: From c644f18685442a0d357cb71fc021d9f816fe0b17 Mon Sep 17 00:00:00 2001 From: Atsushi Morimoto <74th.tech@gmail.com> Date: Mon, 9 Mar 2026 20:19:46 +0900 Subject: [PATCH 5/5] feat: implement ListenHandler for improved audio streaming and recognition --- stackchan_server/listen.py | 246 +++++++++++++++++++++++++++++++++++ stackchan_server/ws_proxy.py | 221 ++++--------------------------- 2 files changed, 273 insertions(+), 194 deletions(-) create mode 100644 stackchan_server/listen.py diff --git a/stackchan_server/listen.py b/stackchan_server/listen.py new file mode 100644 index 0000000..c7e5b84 --- /dev/null +++ b/stackchan_server/listen.py @@ -0,0 +1,246 @@ +from __future__ import annotations + +import asyncio +import wave +from datetime import UTC, datetime +from logging import getLogger +from pathlib import Path +from typing import Awaitable, Callable, Optional + +from fastapi import WebSocket, WebSocketDisconnect + +from .types import SpeechRecognizer, StreamingSpeechRecognizer, StreamingSpeechSession + +logger = getLogger(__name__) + + +class TimeoutError(Exception): + pass + + +class EmptyTranscriptError(Exception): + pass + + +class ListenHandler: + def __init__( + self, + *, + speech_recognizer: SpeechRecognizer, + recordings_dir: Path, + debug_recording: bool, + sample_rate_hz: int, + channels: int, + sample_width: int, + listen_audio_timeout_seconds: float, + language_code: str = "ja-JP", + ) -> None: + self.speech_recognizer = speech_recognizer + self.recordings_dir = recordings_dir + self.debug_recording = debug_recording + self.sample_rate_hz = sample_rate_hz + self.channels = channels + self.sample_width = sample_width + self.listen_audio_timeout_seconds = listen_audio_timeout_seconds + self.language_code = language_code + + self._pcm_buffer = bytearray() + self._streaming = False + self._pcm_data_counter = 0 + self._message_ready = asyncio.Event() + self._message_error: Optional[Exception] = None + self._transcript: Optional[str] = None + self._speech_stream: Optional[StreamingSpeechSession] = None + + async def close(self) -> None: + await self._abort_speech_stream() + + async def listen( + self, + *, + send_state_command: Callable[[int], Awaitable[None]], + is_closed: Callable[[], bool], + idle_state: int, + listening_state: int, + ) -> str: + await send_state_command(listening_state) + loop = asyncio.get_running_loop() + last_counter = self._pcm_data_counter + last_data_time = loop.time() + while True: + if self._message_error is not None: + err = self._message_error + self._message_error = None + raise err + if self._message_ready.is_set(): + text = self._transcript or "" + self._transcript = None + self._message_ready.clear() + return text + if is_closed(): + raise WebSocketDisconnect() + if self._pcm_data_counter != last_counter: + last_counter = self._pcm_data_counter + last_data_time = loop.time() + if (loop.time() - last_data_time) >= self.listen_audio_timeout_seconds: + if not is_closed(): + await send_state_command(idle_state) + raise TimeoutError("Timed out after audio data inactivity from firmware") + await asyncio.sleep(0.05) + + async def handle_start(self, websocket: WebSocket) -> bool: + logger.info("Received START") + await self._abort_speech_stream() + self._pcm_buffer = bytearray() + self._streaming = True + self._message_error = None + if isinstance(self.speech_recognizer, StreamingSpeechRecognizer): + try: + self._speech_stream = await self.speech_recognizer.start_stream( + sample_rate_hz=self.sample_rate_hz, + channels=self.channels, + sample_width=self.sample_width, + language_code=self.language_code, + ) + except Exception: + asyncio.create_task(websocket.close(code=1011, reason="speech streaming failed")) + return False + return True + + async def handle_data(self, websocket: WebSocket, payload_bytes: int, payload: bytes) -> bool: + logger.info("Received DATA payload_bytes=%d", payload_bytes) + if not self._streaming: + await self._abort_speech_stream() + asyncio.create_task(websocket.close(code=1003, reason="data received before start")) + return False + if payload_bytes % (self.sample_width * self.channels) != 0: + await self._abort_speech_stream() + asyncio.create_task(websocket.close(code=1003, reason="invalid pcm chunk length")) + return False + self._pcm_buffer.extend(payload) + if payload_bytes > 0: + try: + await self._push_speech_stream(payload) + except Exception: + await self._abort_speech_stream() + asyncio.create_task(websocket.close(code=1011, reason="speech streaming failed")) + return False + self._pcm_data_counter += 1 + return True + + async def handle_end( + self, + websocket: WebSocket, + *, + payload_bytes: int, + payload: bytes, + send_state_command: Callable[[int], Awaitable[None]], + thinking_state: int, + ) -> None: + logger.info("Received END payload_bytes=%d", payload_bytes) + if not self._streaming: + await self._abort_speech_stream() + await websocket.close(code=1003, reason="end received before start") + return + if payload_bytes % (self.sample_width * self.channels) != 0: + await self._abort_speech_stream() + await websocket.close(code=1003, reason="invalid pcm tail length") + return + self._pcm_buffer.extend(payload) + if payload_bytes > 0: + try: + await self._push_speech_stream(payload) + except Exception: + await self._abort_speech_stream() + await websocket.close(code=1011, reason="speech streaming failed") + return + + if len(self._pcm_buffer) == 0 or len(self._pcm_buffer) % (self.sample_width * self.channels) != 0: + await self._abort_speech_stream() + await websocket.close(code=1003, reason="invalid accumulated pcm length") + return + + await send_state_command(thinking_state) + + frames = len(self._pcm_buffer) // (self.sample_width * self.channels) + duration_seconds = frames / float(self.sample_rate_hz) + ws_meta = { + "sample_rate": self.sample_rate_hz, + "frames": frames, + "channels": self.channels, + "duration_seconds": round(duration_seconds, 3), + } + if self.debug_recording: + _filepath, filename = self._save_wav(bytes(self._pcm_buffer)) + ws_meta["text"] = f"Saved as {filename}" + ws_meta["path"] = f"recordings/{filename}" + else: + ws_meta["text"] = "Recording skipped (DEBUG_RECODING!=1)" + + await websocket.send_json(ws_meta) + + transcript = await self._transcribe_async(bytes(self._pcm_buffer)) + + self._streaming = False + self._pcm_buffer = bytearray() + + if transcript.strip() == "": + self._message_error = EmptyTranscriptError("Speech recognition result is empty") + return + + self._transcript = transcript + self._message_ready.set() + + def _save_wav(self, pcm_bytes: bytes) -> tuple[Path, str]: + timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f") + filename = f"rec_ws_{timestamp}.wav" + filepath = self.recordings_dir / filename + + with wave.open(str(filepath), "wb") as wav_fp: + wav_fp.setnchannels(self.channels) + wav_fp.setsampwidth(self.sample_width) + wav_fp.setframerate(self.sample_rate_hz) + wav_fp.writeframes(pcm_bytes) + + logger.info("Saved WAV: %s", filename) + return filepath, filename + + async def _transcribe_async(self, pcm_bytes: bytes) -> str: + if self._speech_stream is not None: + return await self._finish_speech_stream() + return await self._transcribe(pcm_bytes) + + async def _transcribe(self, pcm_bytes: bytes) -> str: + transcript = await self.speech_recognizer.transcribe( + pcm_bytes, + sample_rate_hz=self.sample_rate_hz, + channels=self.channels, + sample_width=self.sample_width, + language_code=self.language_code, + ) + if transcript: + logger.info("Transcript: %s", transcript) + return transcript + + async def _push_speech_stream(self, pcm_bytes: bytes) -> None: + if self._speech_stream is not None: + await self._speech_stream.push_audio(pcm_bytes) + + async def _finish_speech_stream(self) -> str: + speech_stream = self._speech_stream + self._speech_stream = None + if speech_stream is None: + return "" + transcript = await speech_stream.finish() + if transcript: + logger.info("Transcript: %s", transcript) + return transcript + + async def _abort_speech_stream(self) -> None: + speech_stream = self._speech_stream + self._speech_stream = None + if speech_stream is not None: + await speech_stream.abort() + + +__all__ = ["ListenHandler", "TimeoutError", "EmptyTranscriptError"] diff --git a/stackchan_server/ws_proxy.py b/stackchan_server/ws_proxy.py index b84646f..596284b 100644 --- a/stackchan_server/ws_proxy.py +++ b/stackchan_server/ws_proxy.py @@ -6,7 +6,6 @@ import struct import wave from contextlib import suppress -from datetime import UTC, datetime from enum import IntEnum from logging import getLogger from pathlib import Path @@ -15,7 +14,8 @@ from fastapi import WebSocket, WebSocketDisconnect from vvclient import Client as VVClient -from .types import SpeechRecognizer, StreamingSpeechRecognizer, StreamingSpeechSession +from .listen import EmptyTranscriptError, ListenHandler, TimeoutError +from .types import SpeechRecognizer logger = getLogger(__name__) @@ -36,14 +36,6 @@ _DEBUG_RECORDING_ENABLED = os.getenv("DEBUG_RECODING") == "1" -class TimeoutError(Exception): - pass - - -class EmptyTranscriptError(Exception): - pass - - class FirmwareState(IntEnum): IDLE = 0 LISTENING = 1 @@ -80,15 +72,16 @@ def __init__(self, websocket: WebSocket, speech_recognizer: SpeechRecognizer): if self._debug_recording: _RECORDINGS_DIR.mkdir(parents=True, exist_ok=True) self.recordings_dir.mkdir(parents=True, exist_ok=True) - - self._pcm_buffer = bytearray() - self._streaming = False - self._pcm_data_counter = 0 - self._message_ready = asyncio.Event() - self._message_error: Optional[Exception] = None - self._transcript: Optional[str] = None self._wakeword_event = asyncio.Event() - self._speech_stream: Optional[StreamingSpeechSession] = None + self._listener = ListenHandler( + speech_recognizer=self.speech_recognizer, + recordings_dir=self.recordings_dir, + debug_recording=self._debug_recording, + sample_rate_hz=_SAMPLE_RATE_HZ, + channels=_CHANNELS, + sample_width=_SAMPLE_WIDTH, + listen_audio_timeout_seconds=_LISTEN_AUDIO_TIMEOUT_SECONDS, + ) self._receiving_task: Optional[asyncio.Task] = None self._closed = False @@ -116,30 +109,12 @@ async def wait_for_talk_session(self) -> None: await asyncio.sleep(0.05) async def listen(self) -> str: - await self.send_state_command(FirmwareState.LISTENING) - loop = asyncio.get_running_loop() - last_counter = self._pcm_data_counter - last_data_time = loop.time() - while True: - if self._message_error is not None: - err = self._message_error - self._message_error = None - raise err - if self._message_ready.is_set(): - text = self._transcript or "" - self._transcript = None - self._message_ready.clear() - return text - if self._closed: - raise WebSocketDisconnect() - if self._pcm_data_counter != last_counter: - last_counter = self._pcm_data_counter - last_data_time = loop.time() - if (loop.time() - last_data_time) >= _LISTEN_AUDIO_TIMEOUT_SECONDS: - if not self._closed: - await self.send_state_command(FirmwareState.IDLE) - raise TimeoutError("Timed out after audio data inactivity from firmware") - await asyncio.sleep(0.05) + return await self._listener.listen( + send_state_command=self.send_state_command, + is_closed=lambda: self._closed, + idle_state=FirmwareState.IDLE, + listening_state=FirmwareState.LISTENING, + ) async def speak(self, text: str) -> None: start_counter = self._speak_finished_counter @@ -186,7 +161,7 @@ async def close(self) -> None: self._receiving_task.cancel() with suppress(asyncio.CancelledError): await self._receiving_task - await self._abort_speech_stream() + await self._listener.close() async def start_talking(self, text: str) -> None: await self.speak(text) @@ -240,17 +215,23 @@ async def _receive_loop(self) -> None: if kind == _WsKind.PCM: if msg_type == _WsMsgType.START: - if not await self._handle_listening_start(): + if not await self._listener.handle_start(self.ws): break continue if msg_type == _WsMsgType.DATA: - if not await self._handle_listening_data(payload_bytes, payload): + if not await self._listener.handle_data(self.ws, payload_bytes, payload): break continue if msg_type == _WsMsgType.END: - await self._handle_listening_end(payload_bytes, payload) + await self._listener.handle_end( + self.ws, + payload_bytes=payload_bytes, + payload=payload, + send_state_command=self.send_state_command, + thinking_state=FirmwareState.THINKING, + ) continue await self.ws.close(code=1003, reason="unknown PCM msg type") @@ -276,103 +257,6 @@ async def _receive_loop(self) -> None: self._closed = True self._speaking = False - async def _handle_listening_start(self) -> bool: - logger.info("Received START") - await self._abort_speech_stream() - self._pcm_buffer = bytearray() - self._streaming = True - self._message_error = None - if isinstance(self.speech_recognizer, StreamingSpeechRecognizer): - try: - self._speech_stream = await self.speech_recognizer.start_stream( - sample_rate_hz=_SAMPLE_RATE_HZ, - channels=_CHANNELS, - sample_width=_SAMPLE_WIDTH, - language_code="ja-JP", - ) - except Exception: - asyncio.create_task(self.ws.close(code=1011, reason="speech streaming failed")) - return False - return True - - async def _handle_listening_data(self, payload_bytes: int, payload: bytes) -> bool: - logger.info("Received DATA payload_bytes=%d", payload_bytes) - if not self._streaming: - await self._abort_speech_stream() - asyncio.create_task(self.ws.close(code=1003, reason="data received before start")) - return False - if payload_bytes % (_SAMPLE_WIDTH * _CHANNELS) != 0: - await self._abort_speech_stream() - asyncio.create_task(self.ws.close(code=1003, reason="invalid pcm chunk length")) - return False - self._pcm_buffer.extend(payload) - if payload_bytes > 0: - try: - await self._push_speech_stream(payload) - except Exception: - await self._abort_speech_stream() - asyncio.create_task(self.ws.close(code=1011, reason="speech streaming failed")) - return False - self._pcm_data_counter += 1 - return True - - async def _handle_listening_end(self, payload_bytes: int, payload: bytes) -> None: - logger.info("Received END payload_bytes=%d", payload_bytes) - if not self._streaming: - await self._abort_speech_stream() - await self.ws.close(code=1003, reason="end received before start") - return - if payload_bytes % (_SAMPLE_WIDTH * _CHANNELS) != 0: - await self._abort_speech_stream() - await self.ws.close(code=1003, reason="invalid pcm tail length") - return - self._pcm_buffer.extend(payload) - if payload_bytes > 0: - try: - await self._push_speech_stream(payload) - except Exception: - await self._abort_speech_stream() - await self.ws.close(code=1011, reason="speech streaming failed") - return - - if len(self._pcm_buffer) == 0 or len(self._pcm_buffer) % (_SAMPLE_WIDTH * _CHANNELS) != 0: - await self._abort_speech_stream() - await self.ws.close(code=1003, reason="invalid accumulated pcm length") - return - - # Uplink audio has been fully received: tell firmware to enter Thinking state. - await self._send_state_command(FirmwareState.THINKING) - - frames = len(self._pcm_buffer) // (_SAMPLE_WIDTH * _CHANNELS) - duration_seconds = frames / float(_SAMPLE_RATE_HZ) - - ws_meta = { - "sample_rate": _SAMPLE_RATE_HZ, - "frames": frames, - "channels": _CHANNELS, - "duration_seconds": round(duration_seconds, 3), - } - if self._debug_recording: - _filepath, filename = self._save_wav(bytes(self._pcm_buffer)) - ws_meta["text"] = f"Saved as {filename}" - ws_meta["path"] = f"recordings/{filename}" - else: - ws_meta["text"] = "Recording skipped (DEBUG_RECODING!=1)" - - await self.ws.send_json(ws_meta) - - transcript = await self._transcribe_async(bytes(self._pcm_buffer)) - - self._streaming = False - self._pcm_buffer = bytearray() - - if transcript.strip() == "": - self._message_error = EmptyTranscriptError("Speech recognition result is empty") - return - - self._transcript = transcript - self._message_ready.set() - def _handle_wakeword_event(self, msg_type: int, payload: bytes) -> None: if msg_type != _WsMsgType.DATA: return @@ -415,57 +299,6 @@ async def _send_state_command(self, state_id: int | FirmwareState) -> None: await self.ws.send_bytes(hdr + payload) self._down_seq += 1 - def _save_wav(self, pcm_bytes: bytes) -> tuple[Path, str]: - timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f") - filename = f"rec_ws_{timestamp}.wav" - filepath = self.recordings_dir / filename - - with wave.open(str(filepath), "wb") as wav_fp: - wav_fp.setnchannels(_CHANNELS) - wav_fp.setsampwidth(_SAMPLE_WIDTH) - wav_fp.setframerate(_SAMPLE_RATE_HZ) - wav_fp.writeframes(pcm_bytes) - - logger.info("Saved WAV: %s", filename) - return filepath, filename - - async def _transcribe_async(self, pcm_bytes: bytes) -> str: - if self._speech_stream is not None: - return await self._finish_speech_stream() - return await self._transcribe(pcm_bytes) - - async def _transcribe(self, pcm_bytes: bytes) -> str: - transcript = await self.speech_recognizer.transcribe( - pcm_bytes, - sample_rate_hz=_SAMPLE_RATE_HZ, - channels=_CHANNELS, - sample_width=_SAMPLE_WIDTH, - language_code="ja-JP", - ) - if transcript: - logger.info("Transcript: %s", transcript) - return transcript - - async def _push_speech_stream(self, pcm_bytes: bytes) -> None: - if self._speech_stream is not None: - await self._speech_stream.push_audio(pcm_bytes) - - async def _finish_speech_stream(self) -> str: - speech_stream = self._speech_stream - self._speech_stream = None - if speech_stream is None: - return "" - transcript = await speech_stream.finish() - if transcript: - logger.info("Transcript: %s", transcript) - return transcript - - async def _abort_speech_stream(self) -> None: - speech_stream = self._speech_stream - self._speech_stream = None - if speech_stream is not None: - await speech_stream.abort() - def _extract_pcm(self, wav_bytes: bytes) -> tuple[bytes, int, int, int]: with wave.open(io.BytesIO(wav_bytes), "rb") as wf: pcm_bytes = wf.readframes(wf.getnframes())