Skip to content

Commit 2737923

Browse files
committed
fix(websockets): switch socket clients to construct_type for unknown message tolerance
Replace parse_obj_as with construct_type in all socket clients so unknown message types are silently coerced instead of raising ValidationError. Also fix v2 listen example to actually send audio.
1 parent cb6ad8a commit 2737923

5 files changed

Lines changed: 66 additions & 45 deletions

File tree

examples/14-transcription-live-websocket-v2.py

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
with contextual turn detection.
66
"""
77

8+
import os
9+
import threading
10+
import time
11+
from pathlib import Path
812
from typing import Union
913

1014
from dotenv import load_dotenv
@@ -14,42 +18,59 @@
1418
from deepgram import DeepgramClient
1519
from deepgram.core.events import EventType
1620
from deepgram.listen.v2.types import (
21+
ListenV2CloseStream,
1722
ListenV2Connected,
1823
ListenV2FatalError,
1924
ListenV2TurnInfo,
2025
)
2126

2227
ListenV2SocketClientResponse = Union[ListenV2Connected, ListenV2TurnInfo, ListenV2FatalError]
2328

24-
client = DeepgramClient()
29+
client = DeepgramClient(api_key=os.environ.get("DEEPGRAM_API_KEY"))
2530

2631
try:
27-
with client.listen.v2.connect(model="flux-general-en", encoding="linear16", sample_rate=16000) as connection:
32+
with client.listen.v2.connect(
33+
model="flux-general-en",
34+
encoding="linear16",
35+
sample_rate="16000",
36+
) as connection:
2837

2938
def on_message(message: ListenV2SocketClientResponse) -> None:
30-
msg_type = getattr(message, "type", "Unknown")
31-
print(f"Received {msg_type} event")
39+
msg_type = getattr(message, "type", type(message).__name__)
40+
print(f"Received {msg_type} event ({type(message).__name__})")
3241

3342
# Extract transcription from TurnInfo events
3443
if isinstance(message, ListenV2TurnInfo):
35-
print(f"Turn transcript: {message.transcript}")
36-
print(f"Turn event: {message.event}")
37-
print(f"Turn index: {message.turn_index}")
44+
print(f" transcript: {message.transcript}")
45+
print(f" event: {message.event}")
46+
print(f" turn_index: {message.turn_index}")
3847

3948
connection.on(EventType.OPEN, lambda _: print("Connection opened"))
4049
connection.on(EventType.MESSAGE, on_message)
4150
connection.on(EventType.CLOSE, lambda _: print("Connection closed"))
42-
connection.on(EventType.ERROR, lambda error: print(f"Error: {error}"))
51+
connection.on(EventType.ERROR, lambda error: print(f"Error: {type(error).__name__}: {error}"))
4352

44-
# Start listening - this blocks until the connection closes
45-
# In production, you would send audio data here using connection.send_media()
46-
connection.start_listening()
53+
# Send audio in a background thread so start_listening can process responses
54+
def send_audio():
55+
audio_path = Path(__file__).parent / "fixtures" / "audio.wav"
56+
with open(audio_path, "rb") as f:
57+
audio = f.read()
58+
59+
# Send in chunks
60+
chunk_size = 4096
61+
for i in range(0, len(audio), chunk_size):
62+
connection.send_media(audio[i : i + chunk_size])
63+
time.sleep(0.01) # pace the sending
64+
65+
# Signal end of audio
66+
time.sleep(2)
67+
connection.send_close_stream(ListenV2CloseStream(type="CloseStream"))
4768

48-
# For async version:
49-
# from deepgram import AsyncDeepgramClient
50-
# async with client.listen.v2.connect(...) as connection:
51-
# # ... same event handlers ...
52-
# await connection.start_listening()
69+
sender = threading.Thread(target=send_audio, daemon=True)
70+
sender.start()
71+
72+
# This blocks until the connection closes
73+
connection.start_listening()
5374

5475
except Exception as e:
55-
print(f"Error: {e}")
76+
print(f"Error: {type(e).__name__}: {e}")

src/deepgram/agent/v1/socket_client.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import websockets
88
import websockets.sync.connection as websockets_sync_connection
99
from ...core.events import EventEmitterMixin, EventType
10-
from ...core.pydantic_utilities import parse_obj_as
10+
from ...core.unchecked_base_model import construct_type
1111
from .types.agent_v1agent_audio_done import AgentV1AgentAudioDone
1212
from .types.agent_v1agent_started_speaking import AgentV1AgentStartedSpeaking
1313
from .types.agent_v1agent_thinking import AgentV1AgentThinking
@@ -64,7 +64,7 @@ async def __aiter__(self):
6464
if isinstance(message, bytes):
6565
yield message
6666
else:
67-
yield parse_obj_as(V1SocketClientResponse, json.loads(message)) # type: ignore
67+
yield construct_type(type_=V1SocketClientResponse, object_=json.loads(message)) # type: ignore
6868

6969
async def start_listening(self):
7070
"""
@@ -83,7 +83,7 @@ async def start_listening(self):
8383
parsed = raw_message
8484
else:
8585
json_data = json.loads(raw_message)
86-
parsed = parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
86+
parsed = construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore
8787
await self._emit_async(EventType.MESSAGE, parsed)
8888
except Exception as exc:
8989
await self._emit_async(EventType.ERROR, exc)
@@ -154,7 +154,7 @@ async def recv(self) -> V1SocketClientResponse:
154154
if isinstance(data, bytes):
155155
return data # type: ignore
156156
json_data = json.loads(data)
157-
return parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
157+
return construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore
158158

159159
async def _send(self, data: typing.Any) -> None:
160160
"""
@@ -181,7 +181,7 @@ def __iter__(self):
181181
if isinstance(message, bytes):
182182
yield message
183183
else:
184-
yield parse_obj_as(V1SocketClientResponse, json.loads(message)) # type: ignore
184+
yield construct_type(type_=V1SocketClientResponse, object_=json.loads(message)) # type: ignore
185185

186186
def start_listening(self):
187187
"""
@@ -200,7 +200,7 @@ def start_listening(self):
200200
parsed = raw_message
201201
else:
202202
json_data = json.loads(raw_message)
203-
parsed = parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
203+
parsed = construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore
204204
self._emit(EventType.MESSAGE, parsed)
205205
except Exception as exc:
206206
self._emit(EventType.ERROR, exc)
@@ -271,7 +271,7 @@ def recv(self) -> V1SocketClientResponse:
271271
if isinstance(data, bytes):
272272
return data # type: ignore
273273
json_data = json.loads(data)
274-
return parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
274+
return construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore
275275

276276
def _send(self, data: typing.Any) -> None:
277277
"""

src/deepgram/listen/v1/socket_client.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import websockets
88
import websockets.sync.connection as websockets_sync_connection
99
from ...core.events import EventEmitterMixin, EventType
10-
from ...core.pydantic_utilities import parse_obj_as
10+
from ...core.unchecked_base_model import construct_type
1111
from .types.listen_v1close_stream import ListenV1CloseStream
1212
from .types.listen_v1finalize import ListenV1Finalize
1313
from .types.listen_v1keep_alive import ListenV1KeepAlive
@@ -34,7 +34,7 @@ async def __aiter__(self):
3434
if isinstance(message, bytes):
3535
yield message
3636
else:
37-
yield parse_obj_as(V1SocketClientResponse, json.loads(message)) # type: ignore
37+
yield construct_type(type_=V1SocketClientResponse, object_=json.loads(message)) # type: ignore
3838

3939
async def start_listening(self):
4040
"""
@@ -53,7 +53,7 @@ async def start_listening(self):
5353
parsed = raw_message
5454
else:
5555
json_data = json.loads(raw_message)
56-
parsed = parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
56+
parsed = construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore
5757
await self._emit_async(EventType.MESSAGE, parsed)
5858
except Exception as exc:
5959
await self._emit_async(EventType.ERROR, exc)
@@ -96,7 +96,7 @@ async def recv(self) -> V1SocketClientResponse:
9696
if isinstance(data, bytes):
9797
return data # type: ignore
9898
json_data = json.loads(data)
99-
return parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
99+
return construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore
100100

101101
async def _send(self, data: typing.Any) -> None:
102102
"""
@@ -123,7 +123,7 @@ def __iter__(self):
123123
if isinstance(message, bytes):
124124
yield message
125125
else:
126-
yield parse_obj_as(V1SocketClientResponse, json.loads(message)) # type: ignore
126+
yield construct_type(type_=V1SocketClientResponse, object_=json.loads(message)) # type: ignore
127127

128128
def start_listening(self):
129129
"""
@@ -142,7 +142,7 @@ def start_listening(self):
142142
parsed = raw_message
143143
else:
144144
json_data = json.loads(raw_message)
145-
parsed = parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
145+
parsed = construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore
146146
self._emit(EventType.MESSAGE, parsed)
147147
except Exception as exc:
148148
self._emit(EventType.ERROR, exc)
@@ -185,7 +185,7 @@ def recv(self) -> V1SocketClientResponse:
185185
if isinstance(data, bytes):
186186
return data # type: ignore
187187
json_data = json.loads(data)
188-
return parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
188+
return construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore
189189

190190
def _send(self, data: typing.Any) -> None:
191191
"""

src/deepgram/listen/v2/socket_client.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import websockets
88
import websockets.sync.connection as websockets_sync_connection
99
from ...core.events import EventEmitterMixin, EventType
10-
from ...core.pydantic_utilities import parse_obj_as
10+
from ...core.unchecked_base_model import construct_type
1111
from .types.listen_v2close_stream import ListenV2CloseStream
1212
from .types.listen_v2connected import ListenV2Connected
1313
from .types.listen_v2fatal_error import ListenV2FatalError
@@ -31,7 +31,7 @@ async def __aiter__(self):
3131
if isinstance(message, bytes):
3232
yield message
3333
else:
34-
yield parse_obj_as(V2SocketClientResponse, json.loads(message)) # type: ignore
34+
yield construct_type(type_=V2SocketClientResponse, object_=json.loads(message)) # type: ignore
3535

3636
async def start_listening(self):
3737
"""
@@ -50,7 +50,7 @@ async def start_listening(self):
5050
parsed = raw_message
5151
else:
5252
json_data = json.loads(raw_message)
53-
parsed = parse_obj_as(V2SocketClientResponse, json_data) # type: ignore
53+
parsed = construct_type(type_=V2SocketClientResponse, object_=json_data) # type: ignore
5454
await self._emit_async(EventType.MESSAGE, parsed)
5555
except Exception as exc:
5656
await self._emit_async(EventType.ERROR, exc)
@@ -79,7 +79,7 @@ async def recv(self) -> V2SocketClientResponse:
7979
if isinstance(data, bytes):
8080
return data # type: ignore
8181
json_data = json.loads(data)
82-
return parse_obj_as(V2SocketClientResponse, json_data) # type: ignore
82+
return construct_type(type_=V2SocketClientResponse, object_=json_data) # type: ignore
8383

8484
async def _send(self, data: typing.Any) -> None:
8585
"""
@@ -106,7 +106,7 @@ def __iter__(self):
106106
if isinstance(message, bytes):
107107
yield message
108108
else:
109-
yield parse_obj_as(V2SocketClientResponse, json.loads(message)) # type: ignore
109+
yield construct_type(type_=V2SocketClientResponse, object_=json.loads(message)) # type: ignore
110110

111111
def start_listening(self):
112112
"""
@@ -125,7 +125,7 @@ def start_listening(self):
125125
parsed = raw_message
126126
else:
127127
json_data = json.loads(raw_message)
128-
parsed = parse_obj_as(V2SocketClientResponse, json_data) # type: ignore
128+
parsed = construct_type(type_=V2SocketClientResponse, object_=json_data) # type: ignore
129129
self._emit(EventType.MESSAGE, parsed)
130130
except Exception as exc:
131131
self._emit(EventType.ERROR, exc)
@@ -154,7 +154,7 @@ def recv(self) -> V2SocketClientResponse:
154154
if isinstance(data, bytes):
155155
return data # type: ignore
156156
json_data = json.loads(data)
157-
return parse_obj_as(V2SocketClientResponse, json_data) # type: ignore
157+
return construct_type(type_=V2SocketClientResponse, object_=json_data) # type: ignore
158158

159159
def _send(self, data: typing.Any) -> None:
160160
"""

src/deepgram/speak/v1/socket_client.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import websockets
88
import websockets.sync.connection as websockets_sync_connection
99
from ...core.events import EventEmitterMixin, EventType
10-
from ...core.pydantic_utilities import parse_obj_as
10+
from ...core.unchecked_base_model import construct_type
1111
from .types.speak_v1clear import SpeakV1Clear
1212
from .types.speak_v1cleared import SpeakV1Cleared
1313
from .types.speak_v1close import SpeakV1Close
@@ -35,7 +35,7 @@ async def __aiter__(self):
3535
if isinstance(message, bytes):
3636
yield message
3737
else:
38-
yield parse_obj_as(V1SocketClientResponse, json.loads(message)) # type: ignore
38+
yield construct_type(type_=V1SocketClientResponse, object_=json.loads(message)) # type: ignore
3939

4040
async def start_listening(self):
4141
"""
@@ -54,7 +54,7 @@ async def start_listening(self):
5454
parsed = raw_message
5555
else:
5656
json_data = json.loads(raw_message)
57-
parsed = parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
57+
parsed = construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore
5858
await self._emit_async(EventType.MESSAGE, parsed)
5959
except Exception as exc:
6060
await self._emit_async(EventType.ERROR, exc)
@@ -97,7 +97,7 @@ async def recv(self) -> V1SocketClientResponse:
9797
if isinstance(data, bytes):
9898
return data # type: ignore
9999
json_data = json.loads(data)
100-
return parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
100+
return construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore
101101

102102
async def _send(self, data: typing.Any) -> None:
103103
"""
@@ -124,7 +124,7 @@ def __iter__(self):
124124
if isinstance(message, bytes):
125125
yield message
126126
else:
127-
yield parse_obj_as(V1SocketClientResponse, json.loads(message)) # type: ignore
127+
yield construct_type(type_=V1SocketClientResponse, object_=json.loads(message)) # type: ignore
128128

129129
def start_listening(self):
130130
"""
@@ -143,7 +143,7 @@ def start_listening(self):
143143
parsed = raw_message
144144
else:
145145
json_data = json.loads(raw_message)
146-
parsed = parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
146+
parsed = construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore
147147
self._emit(EventType.MESSAGE, parsed)
148148
except Exception as exc:
149149
self._emit(EventType.ERROR, exc)
@@ -186,7 +186,7 @@ def recv(self) -> V1SocketClientResponse:
186186
if isinstance(data, bytes):
187187
return data # type: ignore
188188
json_data = json.loads(data)
189-
return parse_obj_as(V1SocketClientResponse, json_data) # type: ignore
189+
return construct_type(type_=V1SocketClientResponse, object_=json_data) # type: ignore
190190

191191
def _send(self, data: typing.Any) -> None:
192192
"""

0 commit comments

Comments
 (0)