diff --git a/example_apps/echo.py b/example_apps/echo.py
index 2cf709f..5185c92 100644
--- a/example_apps/echo.py
+++ b/example_apps/echo.py
@@ -3,7 +3,7 @@
 from logging import StreamHandler, getLogger
 
 from stackchan_server.app import StackChanApp
-from stackchan_server.ws_proxy import WsProxy
+from stackchan_server.ws_proxy import EmptyTranscriptError, WsProxy
 
 logger = getLogger(__name__)
 
@@ -22,7 +22,10 @@ async def setup(proxy: WsProxy):
 @app.talk_session
 async def talk_session(proxy: WsProxy):
     while True:
-        text = await proxy.listen()
+        try:
+            text = await proxy.listen()
+        except EmptyTranscriptError:
+            return
         if not text:
             return
         logger.info("Heard: %s", text)
diff --git a/stackchan_server/app.py b/stackchan_server/app.py
index 2239fbb..c192556 100644
--- a/stackchan_server/app.py
+++ b/stackchan_server/app.py
@@ -5,16 +5,17 @@
 from typing import Awaitable, Callable, Optional
 
 from fastapi import FastAPI, WebSocket, WebSocketDisconnect
 
-from google.cloud import speech
+from .speech_recognition import create_speech_recognizer
+from .types import SpeechRecognizer
 from .ws_proxy import WsProxy
 
 logger = getLogger(__name__)
 
 
 class StackChanApp:
-    def __init__(self) -> None:
-        self.speech_client = speech.SpeechClient()
+    def __init__(self, speech_recognizer: SpeechRecognizer | None = None) -> None:
+        self.speech_recognizer = speech_recognizer or create_speech_recognizer()
         self.fastapi = FastAPI(title="StackChan WebSocket Server")
         self._setup_fn: Optional[Callable[[WsProxy], Awaitable[None]]] = None
         self._talk_session_fn: Optional[Callable[[WsProxy], Awaitable[None]]] = None
@@ -37,7 +38,7 @@ def talk_session(self, fn: Callable[["WsProxy"], Awaitable[None]]):
 
     async def _handle_ws(self, websocket: WebSocket) -> None:
         await websocket.accept()
-        proxy = WsProxy(websocket, speech_client=self.speech_client)
+        proxy = WsProxy(websocket, speech_recognizer=self.speech_recognizer)
         await proxy.start()
         try:
             if self._setup_fn:
diff --git a/stackchan_server/listen.py b/stackchan_server/listen.py
new file mode 100644
index 0000000..c7e5b84
--- /dev/null
+++ b/stackchan_server/listen.py
@@ -0,0 +1,246 @@
+from __future__ import annotations
+
+import asyncio
+import wave
+from datetime import UTC, datetime
+from logging import getLogger
+from pathlib import Path
+from typing import Awaitable, Callable, Optional
+
+from fastapi import WebSocket, WebSocketDisconnect
+
+from .types import SpeechRecognizer, StreamingSpeechRecognizer, StreamingSpeechSession
+
+logger = getLogger(__name__)
+
+
+class TimeoutError(Exception):
+    pass
+
+
+class EmptyTranscriptError(Exception):
+    pass
+
+
+class ListenHandler:
+    def __init__(
+        self,
+        *,
+        speech_recognizer: SpeechRecognizer,
+        recordings_dir: Path,
+        debug_recording: bool,
+        sample_rate_hz: int,
+        channels: int,
+        sample_width: int,
+        listen_audio_timeout_seconds: float,
+        language_code: str = "ja-JP",
+    ) -> None:
+        self.speech_recognizer = speech_recognizer
+        self.recordings_dir = recordings_dir
+        self.debug_recording = debug_recording
+        self.sample_rate_hz = sample_rate_hz
+        self.channels = channels
+        self.sample_width = sample_width
+        self.listen_audio_timeout_seconds = listen_audio_timeout_seconds
+        self.language_code = language_code
+
+        self._pcm_buffer = bytearray()
+        self._streaming = False
+        self._pcm_data_counter = 0
+        self._message_ready = asyncio.Event()
+        self._message_error: Optional[Exception] = None
+        self._transcript: Optional[str] = None
+        self._speech_stream: Optional[StreamingSpeechSession] = None
+
+    async def close(self) -> None:
+        await self._abort_speech_stream()
+
+    async def listen(
+        self,
+        *,
+        send_state_command: Callable[[int], Awaitable[None]],
+        is_closed: Callable[[], bool],
+        idle_state: int,
+        listening_state: int,
+    ) -> str:
+        await send_state_command(listening_state)
+        loop = asyncio.get_running_loop()
+        last_counter = self._pcm_data_counter
+        last_data_time = loop.time()
+        while True:
+            if self._message_error is not None:
+                err = self._message_error
+                self._message_error = None
+                raise err
+            if self._message_ready.is_set():
+                text = self._transcript or ""
+                self._transcript = None
+                self._message_ready.clear()
+                return text
+            if is_closed():
+                raise WebSocketDisconnect()
+            if self._pcm_data_counter != last_counter:
+                last_counter = self._pcm_data_counter
+                last_data_time = loop.time()
+            if (loop.time() - last_data_time) >= self.listen_audio_timeout_seconds:
+                if not is_closed():
+                    await send_state_command(idle_state)
+                raise TimeoutError("Timed out after audio data inactivity from firmware")
+            await asyncio.sleep(0.05)
+
+    async def handle_start(self, websocket: WebSocket) -> bool:
+        logger.info("Received START")
+        await self._abort_speech_stream()
+        self._pcm_buffer = bytearray()
+        self._streaming = True
+        self._message_error = None
+        if isinstance(self.speech_recognizer, StreamingSpeechRecognizer):
+            try:
+                self._speech_stream = await self.speech_recognizer.start_stream(
+                    sample_rate_hz=self.sample_rate_hz,
+                    channels=self.channels,
+                    sample_width=self.sample_width,
+                    language_code=self.language_code,
+                )
+            except Exception:
+                asyncio.create_task(websocket.close(code=1011, reason="speech streaming failed"))
+                return False
+        return True
+
+    async def handle_data(self, websocket: WebSocket, payload_bytes: int, payload: bytes) -> bool:
+        logger.info("Received DATA payload_bytes=%d", payload_bytes)
+        if not self._streaming:
+            await self._abort_speech_stream()
+            asyncio.create_task(websocket.close(code=1003, reason="data received before start"))
+            return False
+        if payload_bytes % (self.sample_width * self.channels) != 0:
+            await self._abort_speech_stream()
+            asyncio.create_task(websocket.close(code=1003, reason="invalid pcm chunk length"))
+            return False
+        self._pcm_buffer.extend(payload)
+        if payload_bytes > 0:
+            try:
+                await self._push_speech_stream(payload)
+            except Exception:
+                await self._abort_speech_stream()
+                asyncio.create_task(websocket.close(code=1011, reason="speech streaming failed"))
+                return False
+            # Only non-empty payloads reset the inactivity timer, matching the
+            # behavior of the code this was extracted from.
+            self._pcm_data_counter += 1
+        return True
+
+    async def handle_end(
+        self,
+        websocket: WebSocket,
+        *,
+        payload_bytes: int,
+        payload: bytes,
+        send_state_command: Callable[[int], Awaitable[None]],
+        thinking_state: int,
+    ) -> None:
+        logger.info("Received END payload_bytes=%d", payload_bytes)
+        if not self._streaming:
+            await self._abort_speech_stream()
+            await websocket.close(code=1003, reason="end received before start")
+            return
+        if payload_bytes % (self.sample_width * self.channels) != 0:
+            await self._abort_speech_stream()
+            await websocket.close(code=1003, reason="invalid pcm tail length")
+            return
+        self._pcm_buffer.extend(payload)
+        if payload_bytes > 0:
+            try:
+                await self._push_speech_stream(payload)
+            except Exception:
+                await self._abort_speech_stream()
+                await websocket.close(code=1011, reason="speech streaming failed")
+                return
+
+        if len(self._pcm_buffer) == 0 or len(self._pcm_buffer) % (self.sample_width * self.channels) != 0:
+            await self._abort_speech_stream()
+            await websocket.close(code=1003, reason="invalid accumulated pcm length")
+            return
+
+        await send_state_command(thinking_state)
+
+        frames = len(self._pcm_buffer) // (self.sample_width * self.channels)
+        duration_seconds = frames / float(self.sample_rate_hz)
+        ws_meta = {
+            "sample_rate": self.sample_rate_hz,
+            "frames": frames,
+            "channels": self.channels,
+            "duration_seconds": round(duration_seconds, 3),
+        }
+        if self.debug_recording:
+            _filepath, filename = self._save_wav(bytes(self._pcm_buffer))
+            ws_meta["text"] = f"Saved as {filename}"
+            ws_meta["path"] = f"recordings/{filename}"
+        else:
+            ws_meta["text"] = "Recording skipped (DEBUG_RECODING!=1)"
+
+        await websocket.send_json(ws_meta)
+
+        transcript = await self._transcribe_async(bytes(self._pcm_buffer))
+
+        self._streaming = False
+        self._pcm_buffer = bytearray()
+
+        if transcript.strip() == "":
+            self._message_error = EmptyTranscriptError("Speech recognition result is empty")
+            return
+
+        self._transcript = transcript
+        self._message_ready.set()
+
+    def _save_wav(self, pcm_bytes: bytes) -> tuple[Path, str]:
+        timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f")
+        filename = f"rec_ws_{timestamp}.wav"
+        filepath = self.recordings_dir / filename
+
+        with wave.open(str(filepath), "wb") as wav_fp:
+            wav_fp.setnchannels(self.channels)
+            wav_fp.setsampwidth(self.sample_width)
+            wav_fp.setframerate(self.sample_rate_hz)
+            wav_fp.writeframes(pcm_bytes)
+
+        logger.info("Saved WAV: %s", filename)
+        return filepath, filename
+
+    async def _transcribe_async(self, pcm_bytes: bytes) -> str:
+        if self._speech_stream is not None:
+            return await self._finish_speech_stream()
+        return await self._transcribe(pcm_bytes)
+
+    async def _transcribe(self, pcm_bytes: bytes) -> str:
+        transcript = await self.speech_recognizer.transcribe(
+            pcm_bytes,
+            sample_rate_hz=self.sample_rate_hz,
+            channels=self.channels,
+            sample_width=self.sample_width,
+            language_code=self.language_code,
+        )
+        if transcript:
+            logger.info("Transcript: %s", transcript)
+        return transcript
+
+    async def _push_speech_stream(self, pcm_bytes: bytes) -> None:
+        if self._speech_stream is not None:
+            await self._speech_stream.push_audio(pcm_bytes)
+
+    async def _finish_speech_stream(self) -> str:
+        speech_stream = self._speech_stream
+        self._speech_stream = None
+        if speech_stream is None:
+            return ""
+        transcript = await speech_stream.finish()
+        if transcript:
+            logger.info("Transcript: %s", transcript)
+        return transcript
+
+    async def _abort_speech_stream(self) -> None:
+        speech_stream = self._speech_stream
+        self._speech_stream = None
+        if speech_stream is not None:
+            await speech_stream.abort()
+
+
+__all__ = ["ListenHandler", "TimeoutError", "EmptyTranscriptError"]
diff --git a/stackchan_server/speech_recognition/__init__.py b/stackchan_server/speech_recognition/__init__.py
new file mode 100644
index 0000000..660cf6d
--- /dev/null
+++ b/stackchan_server/speech_recognition/__init__.py
@@ -0,0 +1,11 @@
+from __future__ import annotations
+
+from ..types import SpeechRecognizer
+from .google_cloud import GoogleCloudSpeechToText
+
+
+def create_speech_recognizer() -> SpeechRecognizer:
+    return GoogleCloudSpeechToText()
+
+
+__all__ = ["GoogleCloudSpeechToText", "create_speech_recognizer"]
diff --git a/stackchan_server/speech_recognition/google_cloud.py b/stackchan_server/speech_recognition/google_cloud.py
new file mode 100644
index 0000000..3780ea6
--- /dev/null
+++ b/stackchan_server/speech_recognition/google_cloud.py
@@ -0,0 +1,155 @@
+from __future__ import annotations
+
+import asyncio
+from logging import getLogger
+
+from google.cloud import speech
+
+from ..types import StreamingSpeechRecognizer, StreamingSpeechSession
+
+logger = getLogger(__name__)
+_STREAM_END = object()
+
+
+class _GoogleCloudStreamingSession(StreamingSpeechSession):
+    def __init__(
+        self,
+        client: speech.SpeechAsyncClient,
+        *,
+        sample_rate_hz: int,
+        channels: int,
+        sample_width: int,
+        language_code: str,
+    ) -> None:
+        if channels != 1:
+            raise ValueError(f"Google Cloud Speech only supports mono input here: channels={channels}")
+        if sample_width != 2:
+            raise ValueError(
+                f"Google Cloud Speech LINEAR16 requires 16-bit samples here: sample_width={sample_width}"
+            )
+
+        self._client = client
+        self._config = speech.StreamingRecognitionConfig(
+            config=speech.RecognitionConfig(
+                encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
+                sample_rate_hertz=sample_rate_hz,
+                language_code=language_code,
+            ),
+            interim_results=False,
+            single_utterance=False,
+        )
+        self._audio_queue: asyncio.Queue[bytes | object] = asyncio.Queue()
+        self._done = asyncio.Event()
+        self._closed = False
+        self._error: Exception | None = None
+        self._final_transcripts: list[str] = []
+        self._latest_transcript = ""
+        self._task = asyncio.create_task(self._run())
+
+    async def push_audio(self, pcm_bytes: bytes) -> None:
+        if self._closed:
+            raise RuntimeError("streaming speech session is already closed")
+        if pcm_bytes:
+            await self._audio_queue.put(bytes(pcm_bytes))
+
+    async def finish(self) -> str:
+        await self._close_stream()
+        await self._task
+        if self._error is not None:
+            raise self._error
+        transcript = "".join(self._final_transcripts)
+        return transcript or self._latest_transcript
+
+    async def abort(self) -> None:
+        await self._close_stream()
+        self._task.cancel()
+        try:
+            await self._task
+        except asyncio.CancelledError:
+            pass
+
+    async def _close_stream(self) -> None:
+        if self._closed:
+            return
+        self._closed = True
+        await self._audio_queue.put(_STREAM_END)
+
+    async def _request_iter(self):
+        yield speech.StreamingRecognizeRequest(streaming_config=self._config)
+        while True:
+            chunk = await self._audio_queue.get()
+            if chunk is _STREAM_END:
+                break
+            yield speech.StreamingRecognizeRequest(audio_content=chunk)
+
+    async def _run(self) -> None:
+        try:
+            responses = await self._client.streaming_recognize(requests=self._request_iter())
+            async for response in responses:
+                for result in response.results:
+                    if not result.alternatives:
+                        continue
+                    transcript = result.alternatives[0].transcript
+                    if result.is_final:
+                        logger.info("Streaming transcript(final): %s", transcript)
+                        self._final_transcripts.append(transcript)
+                        self._latest_transcript = ""
+                    else:
+                        logger.info("Streaming transcript(interim): %s", transcript)
+                        self._latest_transcript = transcript
+        except asyncio.CancelledError:
+            raise
+        except Exception as exc:
+            self._error = exc
+        finally:
+            self._done.set()
+
+
+class GoogleCloudSpeechToText(StreamingSpeechRecognizer):
+    def __init__(self, client: speech.SpeechAsyncClient | None = None) -> None:
+        self._client = client or speech.SpeechAsyncClient()
+
+    async def transcribe(
+        self,
+        pcm_bytes: bytes,
+        *,
+        sample_rate_hz: int,
+        channels: int,
+        sample_width: int,
+        language_code: str = "ja-JP",
+    ) -> str:
+        if channels != 1:
+            raise ValueError(f"Google Cloud Speech only supports mono input here: channels={channels}")
+        if sample_width != 2:
+            raise ValueError(
+                f"Google Cloud Speech LINEAR16 requires 16-bit samples here: sample_width={sample_width}"
+            )
+
+        audio = speech.RecognitionAudio(content=pcm_bytes)
+        config = speech.RecognitionConfig(
+            encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
+            sample_rate_hertz=sample_rate_hz,
+            language_code=language_code,
+        )
+        response = await self._client.recognize(config=config, audio=audio)
+
+        return "".join(result.alternatives[0].transcript for result in response.results)
+
+    async def start_stream(
+        self,
+        *,
+        sample_rate_hz: int,
+        channels: int,
+        sample_width: int,
+        language_code: str = "ja-JP",
+    ) -> StreamingSpeechSession:
+        return _GoogleCloudStreamingSession(
+            self._client,
+            sample_rate_hz=sample_rate_hz,
+            channels=channels,
+            sample_width=sample_width,
+            language_code=language_code,
+        )
+
+
+__all__ = ["GoogleCloudSpeechToText"]
diff --git a/stackchan_server/types.py b/stackchan_server/types.py
new file mode 100644
index 0000000..041f7d1
--- /dev/null
+++ b/stackchan_server/types.py
@@ -0,0 +1,40 @@
+from __future__ import annotations
+
+from typing import Protocol, runtime_checkable
+
+
+@runtime_checkable
+class SpeechRecognizer(Protocol):
+    async def transcribe(
+        self,
+        pcm_bytes: bytes,
+        *,
+        sample_rate_hz: int,
+        channels: int,
+        sample_width: int,
+        language_code: str = "ja-JP",
+    ) -> str: ...
+
+
+@runtime_checkable
+class StreamingSpeechSession(Protocol):
+    async def push_audio(self, pcm_bytes: bytes) -> None: ...
+
+    async def finish(self) -> str: ...
+
+    async def abort(self) -> None: ...
+
+
+@runtime_checkable
+class StreamingSpeechRecognizer(SpeechRecognizer, Protocol):
+    async def start_stream(
+        self,
+        *,
+        sample_rate_hz: int,
+        channels: int,
+        sample_width: int,
+        language_code: str = "ja-JP",
+    ) -> StreamingSpeechSession: ...
+
+
+__all__ = ["SpeechRecognizer", "StreamingSpeechRecognizer", "StreamingSpeechSession"]
diff --git a/stackchan_server/ws_proxy.py b/stackchan_server/ws_proxy.py
index 8bda2f0..596284b 100644
--- a/stackchan_server/ws_proxy.py
+++ b/stackchan_server/ws_proxy.py
@@ -6,16 +6,17 @@
 import struct
 import wave
 
 from contextlib import suppress
-from datetime import UTC, datetime
 from enum import IntEnum
 from logging import getLogger
 from pathlib import Path
 from typing import Optional
 
 from fastapi import WebSocket, WebSocketDisconnect
-from google.cloud import speech
 from vvclient import Client as VVClient
 
+from .listen import EmptyTranscriptError, ListenHandler, TimeoutError
+from .types import SpeechRecognizer
+
 logger = getLogger(__name__)
 
@@ -35,14 +36,6 @@
 _DEBUG_RECORDING_ENABLED = os.getenv("DEBUG_RECODING") == "1"
 
 
-class TimeoutError(Exception):
-    pass
-
-
-class EmptyTranscriptError(Exception):
-    pass
-
-
 class FirmwareState(IntEnum):
     IDLE = 0
     LISTENING = 1
@@ -71,22 +64,24 @@ def create_voicevox_client() -> VVClient:
 
 
 class WsProxy:
-    def __init__(self, websocket: WebSocket, speech_client: speech.SpeechClient):
+    def __init__(self, websocket: WebSocket, speech_recognizer: SpeechRecognizer):
         self.ws = websocket
-        self.speech_client = speech_client
+        self.speech_recognizer = speech_recognizer
         self.recordings_dir = _RECORDINGS_DIR
         self._debug_recording = _DEBUG_RECORDING_ENABLED
         if self._debug_recording:
             _RECORDINGS_DIR.mkdir(parents=True, exist_ok=True)
             self.recordings_dir.mkdir(parents=True, exist_ok=True)
-
-        self._pcm_buffer = bytearray()
-        self._streaming = False
-        self._pcm_data_counter = 0
-        self._message_ready = asyncio.Event()
-        self._message_error: Optional[Exception] = None
-        self._transcript: Optional[str] = None
         self._wakeword_event = asyncio.Event()
+        self._listener = ListenHandler(
+            speech_recognizer=self.speech_recognizer,
+            recordings_dir=self.recordings_dir,
+            debug_recording=self._debug_recording,
+            sample_rate_hz=_SAMPLE_RATE_HZ,
+            channels=_CHANNELS,
+            sample_width=_SAMPLE_WIDTH,
+            listen_audio_timeout_seconds=_LISTEN_AUDIO_TIMEOUT_SECONDS,
+        )
 
         self._receiving_task: Optional[asyncio.Task] = None
         self._closed = False
@@ -114,30 +109,12 @@ async def wait_for_talk_session(self) -> None:
         await asyncio.sleep(0.05)
 
     async def listen(self) -> str:
-        await self.send_state_command(FirmwareState.LISTENING)
-        loop = asyncio.get_running_loop()
-        last_counter = self._pcm_data_counter
-        last_data_time = loop.time()
-        while True:
-            if self._message_error is not None:
-                err = self._message_error
-                self._message_error = None
-                raise err
-            if self._message_ready.is_set():
-                text = self._transcript or ""
-                self._transcript = None
-                self._message_ready.clear()
-                return text
-            if self._closed:
-                raise WebSocketDisconnect()
-            if self._pcm_data_counter != last_counter:
-                last_counter = self._pcm_data_counter
-                last_data_time = loop.time()
-            if (loop.time() - last_data_time) >= _LISTEN_AUDIO_TIMEOUT_SECONDS:
-                if not self._closed:
-                    await self.send_state_command(FirmwareState.IDLE)
-                raise TimeoutError("Timed out after audio data inactivity from firmware")
-            await asyncio.sleep(0.05)
+        return await self._listener.listen(
+            send_state_command=self.send_state_command,
+            is_closed=lambda: self._closed,
+            idle_state=FirmwareState.IDLE,
+            listening_state=FirmwareState.LISTENING,
+        )
 
     async def speak(self, text: str) -> None:
         start_counter = self._speak_finished_counter
@@ -184,6 +161,7 @@ async def close(self) -> None:
             self._receiving_task.cancel()
             with suppress(asyncio.CancelledError):
                 await self._receiving_task
+        await self._listener.close()
 
     async def start_talking(self, text: str) -> None:
         await self.speak(text)
@@ -237,16 +215,23 @@ async def _receive_loop(self) -> None:
 
             if kind == _WsKind.PCM:
                 if msg_type == _WsMsgType.START:
-                    self._handle_listening_start()
+                    if not await self._listener.handle_start(self.ws):
+                        break
                     continue
 
                 if msg_type == _WsMsgType.DATA:
-                    if not self._handle_listening_data(payload_bytes, payload):
+                    if not await self._listener.handle_data(self.ws, payload_bytes, payload):
                         break
                     continue
 
                 if msg_type == _WsMsgType.END:
-                    await self._handle_listening_end(payload_bytes, payload)
+                    await self._listener.handle_end(
+                        self.ws,
+                        payload_bytes=payload_bytes,
+                        payload=payload,
+                        send_state_command=self.send_state_command,
+                        thinking_state=FirmwareState.THINKING,
+                    )
                     continue
 
                 await self.ws.close(code=1003, reason="unknown PCM msg type")
@@ -272,72 +257,6 @@ async def _receive_loop(self) -> None:
             self._closed = True
             self._speaking = False
 
-    def _handle_listening_start(self) -> None:
-        logger.info("Received START")
-        self._pcm_buffer = bytearray()
-        self._streaming = True
-        self._message_error = None
-
-    def _handle_listening_data(self, payload_bytes: int, payload: bytes) -> bool:
-        logger.info("Received DATA payload_bytes=%d", payload_bytes)
-        if not self._streaming:
-            asyncio.create_task(self.ws.close(code=1003, reason="data received before start"))
-            return False
-        if payload_bytes % (_SAMPLE_WIDTH * _CHANNELS) != 0:
-            asyncio.create_task(self.ws.close(code=1003, reason="invalid pcm chunk length"))
-            return False
-        self._pcm_buffer.extend(payload)
-        if payload_bytes > 0:
-            self._pcm_data_counter += 1
-        return True
-
-    async def _handle_listening_end(self, payload_bytes: int, payload: bytes) -> None:
-        logger.info("Received END payload_bytes=%d", payload_bytes)
-        if not self._streaming:
-            await self.ws.close(code=1003, reason="end received before start")
-            return
-        if payload_bytes % (_SAMPLE_WIDTH * _CHANNELS) != 0:
-            await self.ws.close(code=1003, reason="invalid pcm tail length")
-            return
-        self._pcm_buffer.extend(payload)
-
-        if len(self._pcm_buffer) == 0 or len(self._pcm_buffer) % (_SAMPLE_WIDTH * _CHANNELS) != 0:
-            await self.ws.close(code=1003, reason="invalid accumulated pcm length")
-            return
-
-        # Uplink audio has been fully received: tell firmware to enter Thinking state.
-        await self._send_state_command(FirmwareState.THINKING)
-
-        frames = len(self._pcm_buffer) // (_SAMPLE_WIDTH * _CHANNELS)
-        duration_seconds = frames / float(_SAMPLE_RATE_HZ)
-
-        ws_meta = {
-            "sample_rate": _SAMPLE_RATE_HZ,
-            "frames": frames,
-            "channels": _CHANNELS,
-            "duration_seconds": round(duration_seconds, 3),
-        }
-        if self._debug_recording:
-            _filepath, filename = self._save_wav(bytes(self._pcm_buffer))
-            ws_meta["text"] = f"Saved as {filename}"
-            ws_meta["path"] = f"recordings/{filename}"
-        else:
-            ws_meta["text"] = "Recording skipped (DEBUG_RECODING!=1)"
-
-        await self.ws.send_json(ws_meta)
-
-        transcript = await self._transcribe_async(bytes(self._pcm_buffer))
-
-        self._streaming = False
-        self._pcm_buffer = bytearray()
-
-        if transcript.strip() == "":
-            self._message_error = EmptyTranscriptError("Speech recognition result is empty")
-            return
-
-        self._transcript = transcript
-        self._message_ready.set()
-
     def _handle_wakeword_event(self, msg_type: int, payload: bytes) -> None:
         if msg_type != _WsMsgType.DATA:
             return
@@ -380,39 +299,6 @@ async def _send_state_command(self, state_id: int | FirmwareState) -> None:
         await self.ws.send_bytes(hdr + payload)
         self._down_seq += 1
 
-    def _save_wav(self, pcm_bytes: bytes) -> tuple[Path, str]:
-        timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f")
-        filename = f"rec_ws_{timestamp}.wav"
-        filepath = self.recordings_dir / filename
-
-        with wave.open(str(filepath), "wb") as wav_fp:
-            wav_fp.setnchannels(_CHANNELS)
-            wav_fp.setsampwidth(_SAMPLE_WIDTH)
-            wav_fp.setframerate(_SAMPLE_RATE_HZ)
-            wav_fp.writeframes(pcm_bytes)
-
-        logger.info("Saved WAV: %s", filename)
-        return filepath, filename
-
-    async def _transcribe_async(self, pcm_bytes: bytes) -> str:
-        loop = asyncio.get_running_loop()
-        return await loop.run_in_executor(None, lambda: self._transcribe(pcm_bytes))
-
-    def _transcribe(self, pcm_bytes: bytes) -> str:
-        audio = speech.RecognitionAudio(content=pcm_bytes)
-        config = speech.RecognitionConfig(
-            encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
-            sample_rate_hertz=_SAMPLE_RATE_HZ,
-            language_code="ja-JP",
-        )
-        response = self.speech_client.recognize(config=config, audio=audio)
-
-        transcript = ""
-        for result in response.results:
-            logger.info("Transcript: %s", result.alternatives[0].transcript)
-            transcript += result.alternatives[0].transcript
-        return transcript
-
     def _extract_pcm(self, wav_bytes: bytes) -> tuple[bytes, int, int, int]:
         with wave.open(io.BytesIO(wav_bytes), "rb") as wf:
             pcm_bytes = wf.readframes(wf.getnframes())