# audio_stream.py -- forked from livekit/python-sdks (321 lines, 280 loc, 12.8 KB)
# Copyright 2023 LiveKit, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import asyncio
import json
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional
from ._ffi_client import FfiClient, FfiHandle
from ._proto import audio_frame_pb2 as proto_audio_frame
from ._proto import ffi_pb2 as proto_ffi
from ._proto.track_pb2 import TrackSource
from ._utils import RingQueue, task_done_logger
from .audio_frame import AudioFrame
from .log import logger
from .participant import Participant
from .track import Track
from .frame_processor import FrameProcessor
@dataclass
class AudioFrameEvent:
    """Event wrapper carrying a single received audio frame.

    Attributes:
        frame (AudioFrame): The audio frame delivered by the stream.
    """

    frame: AudioFrame
@dataclass
class NoiseCancellationOptions:
    """Settings describing a noise-cancellation audio filter.

    Attributes:
        module_id (str): Identifier of the filter module; `AudioStream` copies it
            into the stream request's `audio_filter_module_id` field.
        options (dict[str, Any]): Filter options; `AudioStream` serializes them
            with `json.dumps` into the request's `audio_filter_options` field.
    """

    module_id: str
    options: dict[str, Any]
class AudioStream:
    """An asynchronous audio stream for receiving audio frames from a participant or track.

    The `AudioStream` class provides an asynchronous iterator over audio frames received from
    a specific track or participant. It allows you to receive audio frames in real-time with
    customizable sample rates and channel configurations.
    """

    def __init__(
        self,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
        frame_size_ms: int | None = None,
        noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
        **kwargs,
    ) -> None:
        """Initialize an `AudioStream` instance.

        Args:
            track (Track): The audio track from which to receive audio. It may be
                `None` only when `participant` and `track_source` are supplied in
                `kwargs` (this is what `from_participant` does).
            loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use.
                Defaults to the current event loop.
            capacity (int, optional): The capacity of the internal frame queue.
                Defaults to 0 (unbounded).
            sample_rate (int, optional): The sample rate for the audio stream in Hz.
                Defaults to 48000.
            num_channels (int, optional): The number of audio channels. Defaults to 1.
            frame_size_ms (int | None, optional): When set to a non-zero value it is
                forwarded as `frame_size_ms` in the stream request; otherwise the
                field is left unset. Defaults to None.
            noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional):
                If noise cancellation is used, pass a `NoiseCancellationOptions` or
                `FrameProcessor[AudioFrame]` instance created by the noise cancellation module.
            **kwargs: `participant` and `track_source`. When `participant` is present the
                stream is created from that participant instead of from `track`.

        Raises:
            KeyError: If `participant` is passed in `kwargs` without `track_source`.
            AssertionError: If no `participant` is given and `track` is `None`.

        Example:
            ```python
            audio_stream = AudioStream(
                track=audio_track,
                sample_rate=44100,
                num_channels=2,
            )

            audio_stream = AudioStream.from_track(
                track=audio_track,
                sample_rate=44100,
                num_channels=2,
            )
            ```
        """
        self._track: Track | None = track
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._frame_size_ms = frame_size_ms
        self._loop = loop or asyncio.get_event_loop()

        # Only subscribe to audio_stream_event to avoid unnecessary memory allocations
        # from other event types (room_event, track_event, etc.)
        self._ffi_queue = FfiClient.instance.queue.subscribe(
            self._loop,
            filter_fn=lambda e: e.WhichOneof("message") == "audio_stream_event",
        )
        self._queue: RingQueue[AudioFrameEvent | None] = RingQueue(capacity)

        self._audio_filter_module: str | None = None
        self._audio_filter_options: dict[str, Any] | None = None
        self._processor: FrameProcessor[AudioFrame] | None = None
        if isinstance(noise_cancellation, NoiseCancellationOptions):
            self._audio_filter_module = noise_cancellation.module_id
            self._audio_filter_options = noise_cancellation.options
        elif isinstance(noise_cancellation, FrameProcessor):
            self._processor = noise_cancellation

        # Create the native stream BEFORE starting the reader task. The queue
        # subscription above already exists, so events emitted in between are
        # buffered rather than lost. If the request fails, no task has been
        # scheduled yet, so nothing is left pending on the loop.
        stream: Any = None
        try:
            if "participant" in kwargs:
                stream = self._create_owned_stream_from_participant(
                    participant=kwargs["participant"], track_source=kwargs["track_source"]
                )
            else:
                stream = self._create_owned_stream()
            self._ffi_handle = FfiHandle(stream.handle.id)
            self._info = stream.info
        except BaseException:
            # Release the subscription right away; the error is re-raised unchanged.
            FfiClient.instance.queue.unsubscribe(self._ffi_queue)
            raise

        self._task = self._loop.create_task(self._run())
        self._task.add_done_callback(task_done_logger)

    @classmethod
    def from_participant(
        cls,
        *,
        participant: Participant,
        track_source: TrackSource.ValueType,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
        frame_size_ms: int | None = None,
        noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
    ) -> AudioStream:
        """Create an `AudioStream` from a participant's audio track.

        Args:
            participant (Participant): The participant from whom to receive audio.
            track_source (TrackSource.ValueType): The source of the audio track
                (e.g., microphone, screen share).
            loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use.
                Defaults to the current event loop.
            capacity (int, optional): The capacity of the internal frame queue.
                Defaults to 0 (unbounded).
            sample_rate (int, optional): The sample rate for the audio stream in Hz.
                Defaults to 48000.
            num_channels (int, optional): The number of audio channels. Defaults to 1.
            frame_size_ms (int | None, optional): Forwarded unchanged to the
                `AudioStream` constructor. Defaults to None.
            noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional):
                If noise cancellation is used, pass a `NoiseCancellationOptions` or
                `FrameProcessor[AudioFrame]` instance created by the noise cancellation module.

        Returns:
            AudioStream: An instance of `AudioStream` that can be used to receive audio frames.

        Example:
            ```python
            audio_stream = AudioStream.from_participant(
                participant=participant,
                track_source=TrackSource.MICROPHONE,
                sample_rate=24000,
                num_channels=1,
            )
            ```
        """
        return AudioStream(
            participant=participant,
            track_source=track_source,
            loop=loop,
            capacity=capacity,
            track=None,  # type: ignore
            sample_rate=sample_rate,
            num_channels=num_channels,
            noise_cancellation=noise_cancellation,
            frame_size_ms=frame_size_ms,
        )

    @classmethod
    def from_track(
        cls,
        *,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
        frame_size_ms: int | None = None,
        noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
    ) -> AudioStream:
        """Create an `AudioStream` from an existing audio track.

        Args:
            track (Track): The audio track from which to receive audio.
            loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use.
                Defaults to the current event loop.
            capacity (int, optional): The capacity of the internal frame queue.
                Defaults to 0 (unbounded).
            sample_rate (int, optional): The sample rate for the audio stream in Hz.
                Defaults to 48000.
            num_channels (int, optional): The number of audio channels. Defaults to 1.
            frame_size_ms (int | None, optional): Forwarded unchanged to the
                `AudioStream` constructor. Defaults to None.
            noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional):
                If noise cancellation is used, pass a `NoiseCancellationOptions` or
                `FrameProcessor[AudioFrame]` instance created by the noise cancellation module.

        Returns:
            AudioStream: An instance of `AudioStream` that can be used to receive audio frames.

        Example:
            ```python
            audio_stream = AudioStream.from_track(
                track=audio_track,
                sample_rate=44100,
                num_channels=2,
            )
            ```
        """
        return AudioStream(
            track=track,
            loop=loop,
            capacity=capacity,
            sample_rate=sample_rate,
            num_channels=num_channels,
            noise_cancellation=noise_cancellation,
            frame_size_ms=frame_size_ms,
        )

    def __del__(self) -> None:
        """Drop the FFI queue subscription when the stream is garbage-collected."""
        # `__init__` may have raised before `_ffi_queue` was assigned; a
        # finalizer must not fail with AttributeError in that case.
        ffi_queue = getattr(self, "_ffi_queue", None)
        if ffi_queue is not None:
            FfiClient.instance.queue.unsubscribe(ffi_queue)

    def _create_owned_stream(self) -> Any:
        """Send a `new_audio_stream` FFI request for `self._track`.

        Returns:
            Any: The `stream` message taken from the `new_audio_stream` response.
        """
        assert self._track is not None
        req = proto_ffi.FfiRequest()
        new_audio_stream = req.new_audio_stream
        new_audio_stream.track_handle = self._track._ffi_handle.handle
        new_audio_stream.sample_rate = self._sample_rate
        new_audio_stream.num_channels = self._num_channels
        # Optional fields are only written when configured (None/0 leave them unset).
        if self._frame_size_ms:
            new_audio_stream.frame_size_ms = self._frame_size_ms
        new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
        if self._audio_filter_module is not None:
            new_audio_stream.audio_filter_module_id = self._audio_filter_module
        if self._audio_filter_options is not None:
            new_audio_stream.audio_filter_options = json.dumps(self._audio_filter_options)
        resp = FfiClient.instance.request(req)
        return resp.new_audio_stream.stream

    def _create_owned_stream_from_participant(
        self, participant: Participant, track_source: TrackSource.ValueType
    ) -> Any:
        """Send an `audio_stream_from_participant` FFI request.

        Args:
            participant (Participant): Participant whose FFI handle is put in the request.
            track_source (TrackSource.ValueType): Track source put in the request.

        Returns:
            Any: The `stream` message taken from the `audio_stream_from_participant` response.
        """
        req = proto_ffi.FfiRequest()
        audio_stream_from_participant = req.audio_stream_from_participant
        audio_stream_from_participant.participant_handle = participant._ffi_handle.handle
        audio_stream_from_participant.sample_rate = self._sample_rate
        audio_stream_from_participant.num_channels = self._num_channels
        audio_stream_from_participant.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
        audio_stream_from_participant.track_source = track_source
        # Optional fields are only written when configured (None/0 leave them unset).
        if self._frame_size_ms:
            audio_stream_from_participant.frame_size_ms = self._frame_size_ms
        if self._audio_filter_module is not None:
            audio_stream_from_participant.audio_filter_module_id = self._audio_filter_module
        if self._audio_filter_options is not None:
            audio_stream_from_participant.audio_filter_options = json.dumps(
                self._audio_filter_options
            )
        resp = FfiClient.instance.request(req)
        return resp.audio_stream_from_participant.stream

    async def _run(self) -> None:
        """Reader task: turn FFI audio stream events into queued `AudioFrameEvent`s.

        Loops until an `eos` event arrives, at which point `None` is queued as the
        end-of-stream sentinel. The FFI queue subscription is released on every
        exit path (normal end, cancellation, or error).
        """
        try:
            while True:
                event = await self._ffi_queue.wait_for(self._is_event)
                audio_event: proto_audio_frame.AudioStreamEvent = event.audio_stream_event

                if audio_event.HasField("frame_received"):
                    owned_buffer_info = audio_event.frame_received.frame
                    frame = AudioFrame._from_owned_info(owned_buffer_info)
                    if self._processor is not None and self._processor.enabled:
                        try:
                            frame = self._processor._process(frame)
                        except Exception:
                            # Best effort: a failing processor must not stall the
                            # stream, so the unprocessed frame is delivered instead.
                            logger.warning(
                                "Frame processing failed, passing through original frame",
                                exc_info=True,
                            )
                    self._queue.put(AudioFrameEvent(frame))
                elif audio_event.HasField("eos"):
                    self._queue.put(None)
                    break
        finally:
            FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    async def aclose(self) -> None:
        """Asynchronously close the audio stream.

        Disposes the stream's FFI handle and then waits for the reader task
        to finish.
        """
        self._ffi_handle.dispose()
        await self._task

    def _is_event(self, e: proto_ffi.FfiEvent) -> bool:
        """Return True when `e` is an audio stream event addressed to this stream's handle."""
        return e.audio_stream_event.stream_handle == self._ffi_handle.handle

    def __aiter__(self) -> AsyncIterator[AudioFrameEvent]:
        """Return `self`; the stream is its own asynchronous iterator."""
        return self

    async def __anext__(self) -> AudioFrameEvent:
        """Return the next queued `AudioFrameEvent`.

        Raises:
            StopAsyncIteration: If the reader task has finished, or the
                end-of-stream sentinel (`None`) is dequeued.
        """
        # NOTE(review): once the reader task is done this stops immediately, even
        # if frames are still buffered in `_queue` -- confirm that is intended.
        if self._task.done():
            raise StopAsyncIteration

        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration

        return item