-
Notifications
You must be signed in to change notification settings - Fork 168
Expand file tree
/
Copy pathsubscriber.rs
More file actions
51 lines (44 loc) · 1.47 KB
/
subscriber.rs
File metadata and controls
51 lines (44 loc) · 1.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
use anyhow::Result;
use futures_util::StreamExt;
use livekit::prelude::*;
use std::env;
use tokio::{signal, sync::mpsc::UnboundedReceiver};
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
let url = env::var("LIVEKIT_URL").expect("LIVEKIT_URL is not set");
let token = env::var("LIVEKIT_TOKEN").expect("LIVEKIT_TOKEN is not set");
let (_, rx) = Room::connect(&url, &token, RoomOptions::default()).await?;
tokio::select! {
_ = handle_publications(rx) => {}
_ = signal::ctrl_c() => {}
}
Ok(())
}
/// Watches the room event stream and subscribes to each published data track.
///
/// Events other than `RoomEvent::DataTrackPublished` are ignored. Each
/// subscription is awaited inline, so the next event is only read once the
/// current call to `subscribe` has returned.
///
/// # Errors
///
/// Returns the first error produced by `subscribe`. Returns `Ok(())` once the
/// receiver yields `None`.
async fn handle_publications(mut rx: UnboundedReceiver<RoomEvent>) -> Result<()> {
    loop {
        match rx.recv().await {
            Some(RoomEvent::DataTrackPublished(track)) => subscribe(track).await?,
            // Not a data track publication; wait for the next event.
            Some(_) => continue,
            // No more events will arrive.
            None => return Ok(()),
        }
    }
}
/// Subscribes to `track` and logs every frame received on it.
///
/// For each frame the payload size is logged, followed by the value of
/// `duration_since_timestamp()` when it is present. Once the subscription
/// stream yields no further frames, "Unsubscribed" is logged.
///
/// # Errors
///
/// Returns an error if `track.subscribe()` fails.
async fn subscribe(track: RemoteDataTrack) -> Result<()> {
    log::info!(
        "Subscribing to '{}' published by '{}'",
        track.info().name(),
        track.publisher_identity()
    );

    let mut frames = track.subscribe().await?;
    loop {
        // Stop once the stream has no more frames to deliver.
        let Some(frame) = frames.next().await else {
            break;
        };

        log::info!("Received frame ({} bytes)", frame.payload().len());
        if let Some(latency) = frame.duration_since_timestamp() {
            log::info!("Latency: {:?}", latency);
        }
    }

    log::info!("Unsubscribed");
    Ok(())
}