-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmedia_stream.rs
More file actions
724 lines (622 loc) · 27.9 KB
/
media_stream.rs
File metadata and controls
724 lines (622 loc) · 27.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
//! Media Streaming Example
//!
//! Live audio streaming over P2P using MP3 files as source.
//! Demonstrates real-time media streaming between peers.
//!
//! This shows high-throughput, low-latency P2P streaming patterns.
//!
//! Usage:
//! media_stream publisher [mp3_file] [key] # Start media publisher
//! media_stream subscriber <id52> # Subscribe to media stream
use std::time::{Duration, Instant};
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio::time::interval;
// Protocol Definition - Pull-based streaming
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
pub enum MediaProtocol {
AudioStream,
}
// Client requests for pull-based streaming
#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub enum StreamRequest {
// Request next chunk of audio data
RequestChunk { chunk_id: u64 },
// Get stream info (duration, format, etc.)
GetStreamInfo,
// Stop streaming
Stop,
}
// Server responses for pull-based streaming
#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub enum StreamResponse {
// Audio chunk data
AudioChunk(AudioChunk),
// Stream information
StreamInfo {
total_chunks: u64,
chunk_size: usize,
sample_rate: u32,
channels: u16,
duration_seconds: f64,
},
// End of stream
EndOfStream,
// Error response
Error(String),
}
// Audio chunk for streaming
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct AudioChunk {
pub sequence: u64,
pub timestamp: u64, // Microseconds since stream start
pub data: Vec<u8>,
pub sample_rate: u32,
pub channels: u16,
}
// Audio stream statistics
#[derive(Debug, Default)]
pub struct StreamStats {
pub chunks_sent: u64,
pub chunks_received: u64,
pub bytes_sent: u64,
pub bytes_received: u64,
pub start_time: Option<Instant>,
pub last_chunk_time: Option<Instant>,
pub chunks_dropped: u64,
pub last_sequence: u64,
// Jitter tracking
pub inter_arrival_times: Vec<u64>, // Microseconds between chunks
pub expected_interval_us: u64, // Expected microseconds between chunks
}
// Custom error type for media operations
#[derive(Debug, thiserror::Error)]
pub enum MediaError {
#[error("Audio file not found: {0}")]
FileNotFound(String),
#[error("Audio decode error: {0}")]
DecodeError(String),
#[error("Audio playback error: {0}")]
PlaybackError(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
}
#[fastn_p2p::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
match examples::parse_cli()? {
examples::Server {
private_key: _,
config,
} => {
// Always use persistent key for consistent server ID
let server_key = examples::get_or_create_persistent_key("media_stream");
let audio_file = config.first().cloned().unwrap_or_else(|| "examples/assets/SerenataGranados.ogg".to_string());
run_publisher(server_key, audio_file).await
}
examples::Client { target, config: _ } => {
run_subscriber(target).await
}
}
}
async fn run_publisher(
private_key: fastn_p2p::SecretKey,
audio_file: String,
) -> Result<(), Box<dyn std::error::Error>> {
println!("🎵 Audio Publisher starting...");
println!("📁 Audio file: {}", audio_file);
println!("🎧 Publisher listening on: {}", private_key.id52());
println!("");
println!("🚀 To connect from another machine, run:");
println!(" cargo run --bin media_stream -- client {}", private_key.id52());
println!("");
// Check if audio file exists, if not create a test tone
if !std::path::Path::new(&audio_file).exists() {
println!("📝 Creating test audio file: {}", audio_file);
create_test_audio(&audio_file).await?;
}
// Show audio info at startup
if let Ok((_, sample_rate, channels)) = load_audio_file_with_format(&audio_file).await {
let file_size = std::fs::metadata(&audio_file).map(|m| m.len()).unwrap_or(0);
println!("📀 Audio file info:");
println!(" 📦 File: {} ({:.1} KB)", audio_file, file_size as f64 / 1024.0);
println!(" 🎵 Format: {}Hz, {} channel(s)", sample_rate, channels);
}
fastn_p2p::listen(private_key)
.handle_requests(MediaProtocol::AudioStream, audio_file, audio_request_handler)
.await?;
Ok(())
}
async fn run_subscriber(
target: fastn_p2p::PublicKey,
) -> Result<(), Box<dyn std::error::Error>> {
let private_key = fastn_p2p::SecretKey::generate();
let start_time = Instant::now();
println!("🎧 Audio Subscriber connecting to: {}", target);
println!("🔍 Attempting P2P connection...");
// Connect to the publisher
let mut session = match fastn_p2p::client::connect(
private_key,
target,
MediaProtocol::AudioStream,
(), // No data needed for subscription
).await {
Ok(session) => {
let connection_time = start_time.elapsed();
println!("✅ P2P connection established! (+{:.3}s)", connection_time.as_secs_f64());
println!("🔊 Starting audio playback...");
session
}
Err(e) => {
eprintln!("❌ Failed to connect to publisher: {}", e);
eprintln!("💡 Check that:");
eprintln!(" - Publisher is running");
eprintln!(" - Publisher ID is correct");
eprintln!(" - Both machines can reach the internet");
eprintln!(" - No firewall blocking P2P traffic");
return Err(Box::new(e));
}
};
// Start audio playback system
let audio_setup_start = Instant::now();
let (_stream, stream_handle) = rodio::OutputStream::try_default()
.map_err(|e| MediaError::PlaybackError(format!("Failed to create audio output: {}", e)))?;
let sink = rodio::Sink::try_new(&stream_handle)
.map_err(|e| MediaError::PlaybackError(format!("Failed to create audio sink: {}", e)))?;
let audio_setup_time = audio_setup_start.elapsed();
println!("🔧 Audio system ready (+{:.3}s)", audio_setup_time.as_secs_f64());
let mut stats = StreamStats::default();
stats.start_time = Some(Instant::now());
// Set expected interval for jitter measurement (will be updated from chunk timing)
stats.expected_interval_us = 50000; // 50ms default
// Audio playback buffer - larger buffer to reduce jitter
let (audio_tx, mut audio_rx) = mpsc::channel::<AudioChunk>(1000);
// Spawn audio player task - let rodio handle buffering naturally
let sink = std::sync::Arc::new(sink);
let sink_clone = sink.clone();
tokio::spawn(async move {
while let Some(chunk) = audio_rx.recv().await {
// Convert raw audio data to playable format
if let Ok(source) = create_audio_source(chunk) {
// Simply append - rodio's internal buffering handles the rest
sink_clone.append(source);
}
}
});
// Receive and process audio chunks
let mut no_data_timeout = tokio::time::interval(Duration::from_secs(5));
no_data_timeout.tick().await; // First tick is immediate
loop {
let mut chunk_size_buf = [0u8; 4];
// Try to read with timeout to detect connection issues
tokio::select! {
result = session.stdout.read_exact(&mut chunk_size_buf) => {
if result.is_err() {
println!("📡 Stream ended or connection closed");
break;
}
no_data_timeout.reset(); // Reset timeout on successful read
}
_ = no_data_timeout.tick() => {
if stats.chunks_received == 0 {
eprintln!("⏰ No audio data received for 5 seconds");
eprintln!("💡 Connection may have failed or publisher not streaming");
break;
} else {
eprintln!("⏰ No new data for 5 seconds, stream may have ended");
break;
}
}
}
let chunk_size = u32::from_le_bytes(chunk_size_buf) as usize;
if chunk_size > 2 * 1024 * 1024 { // 2MB max chunk size
eprintln!("⚠️ Chunk too large: {} bytes", chunk_size);
continue;
}
let mut chunk_data = vec![0u8; chunk_size];
if session.stdout.read_exact(&mut chunk_data).await.is_err() {
break;
}
match bincode::deserialize::<AudioChunk>(&chunk_data) {
Ok(chunk) => {
// Print timing for first chunk
if stats.chunks_received == 0 {
let first_chunk_time = start_time.elapsed();
println!("📦 First chunk received (+{:.3}s)", first_chunk_time.as_secs_f64());
}
// Update statistics and calculate jitter
stats.chunks_received += 1;
stats.bytes_received += chunk.data.len() as u64;
let now = Instant::now();
if let Some(last_time) = stats.last_chunk_time {
let inter_arrival_us = now.duration_since(last_time).as_micros() as u64;
stats.inter_arrival_times.push(inter_arrival_us);
// Update expected interval from chunk timing data
if stats.chunks_received > 1 {
stats.expected_interval_us = chunk.timestamp / (chunk.sequence + 1); // Average expected
}
}
stats.last_chunk_time = Some(now);
// Check for dropped chunks
if chunk.sequence > stats.last_sequence + 1 {
stats.chunks_dropped += chunk.sequence - stats.last_sequence - 1;
eprintln!("📉 Dropped {} chunks (seq {} -> {})",
chunk.sequence - stats.last_sequence - 1,
stats.last_sequence, chunk.sequence);
}
stats.last_sequence = chunk.sequence;
// Send to audio player with buffering
let sequence_for_error = chunk.sequence;
if audio_tx.try_send(chunk.clone()).is_err() {
// Buffer full - wait a bit then try again to avoid dropping
tokio::time::sleep(Duration::from_millis(1)).await;
if audio_tx.try_send(chunk).is_err() {
eprintln!("⚠️ Audio buffer full, dropping chunk {}", sequence_for_error);
}
}
// Print stats every 100 chunks
if stats.chunks_received % 100 == 0 {
let elapsed = stats.start_time.unwrap().elapsed();
let throughput = stats.bytes_received as f64 / elapsed.as_secs_f64() / 1024.0;
println!("📊 Received {} chunks, {:.1} KB/s, {} dropped",
stats.chunks_received, throughput, stats.chunks_dropped);
}
}
Err(e) => {
eprintln!("❌ Failed to deserialize chunk: {}", e);
}
}
}
// Calculate final metrics including jitter
let total_duration = stats.start_time.unwrap().elapsed().as_secs_f64();
let avg_throughput_kbps = (stats.bytes_received as f64 * 8.0) / total_duration / 1000.0;
let packet_loss_rate = if stats.chunks_received > 0 {
(stats.chunks_dropped as f64 / stats.chunks_received as f64) * 100.0
} else {
0.0
};
// Calculate jitter (standard deviation of inter-arrival times)
let (avg_jitter_ms, jitter_stddev_ms) = if stats.inter_arrival_times.len() > 1 {
let avg_us = stats.inter_arrival_times.iter().sum::<u64>() as f64 / stats.inter_arrival_times.len() as f64;
let variance = stats.inter_arrival_times.iter()
.map(|&x| {
let diff = x as f64 - avg_us;
diff * diff
})
.sum::<f64>() / stats.inter_arrival_times.len() as f64;
let stddev_us = variance.sqrt();
(avg_us / 1000.0, stddev_us / 1000.0) // Convert to milliseconds
} else {
(0.0, 0.0)
};
println!("");
println!("📊 Client Streaming Metrics:");
println!(" ⏱️ Total duration: {:.1}s", total_duration);
println!(" 📦 Chunks received: {}", stats.chunks_received);
println!(" 💾 Data received: {:.1} KB", stats.bytes_received as f64 / 1024.0);
println!(" 🚀 Average throughput: {:.0} kbps", avg_throughput_kbps);
println!(" 📊 Jitter: {:.1}ms avg, {:.1}ms stddev", avg_jitter_ms, jitter_stddev_ms);
let quality = if jitter_stddev_ms > 100.0 {
"Poor (High Jitter)"
} else if jitter_stddev_ms > 50.0 {
"Fair (Moderate Jitter)"
} else if jitter_stddev_ms < 20.0 {
"Excellent"
} else {
"Good"
};
println!(" 🔊 Audio quality: {}", quality);
if stats.chunks_dropped > 0 {
println!(" ⚠️ {} chunks dropped - may cause audio gaps", stats.chunks_dropped);
}
Ok(())
}
// Global audio data cache to avoid re-decoding for each request
static AUDIO_CACHE: tokio::sync::OnceCell<(Vec<u8>, u32, u16, f64)> = tokio::sync::OnceCell::const_new();
// Audio request handler - responds to client chunk requests
async fn audio_request_handler(
request: StreamRequest,
audio_file: String,
) -> Result<StreamResponse, MediaError> {
// Read and decode audio file to get actual audio format
let decode_start = Instant::now();
let (audio_data, sample_rate, channels) = load_audio_file_with_format(&audio_file).await?;
let decode_time = decode_start.elapsed();
println!("📁 Audio decoded (+{:.3}s)", decode_time.as_secs_f64());
let mut stats = StreamStats::default();
stats.start_time = Some(Instant::now());
// Stream audio chunks - larger chunks for better efficiency
let chunk_size = 262144; // 256KB chunks for efficient streaming
let mut sequence = 0u64;
let stream_start = Instant::now();
// Calculate proper timing based on ACTUAL audio format
let bytes_per_second = sample_rate as u64 * channels as u64 * 2; // 2 bytes per sample (i16)
let chunk_duration_ms = (chunk_size as f64 / bytes_per_second as f64 * 1000.0) as u64;
println!("🎵 Streaming config:");
println!(" 🔊 Format: {}Hz, {} channel(s), 16-bit", sample_rate, channels);
println!(" 📦 Chunk size: {} bytes = {:.1}ms of audio", chunk_size, chunk_duration_ms);
println!(" ⏱️ Expected stream duration: {:.1}s", audio_data.len() as f64 / bytes_per_second as f64);
// Use slightly faster timing to prevent underruns
let adjusted_timing = (chunk_duration_ms as f64 * 0.95) as u64; // 5% faster
let mut interval = interval(Duration::from_millis(adjusted_timing.max(5))); // At least 5ms
let streaming_start = Instant::now();
println!("🚀 Starting audio stream (+{:.3}s)", streaming_start.duration_since(handler_start).as_secs_f64());
for chunk_data in audio_data.chunks(chunk_size) {
interval.tick().await;
let chunk = AudioChunk {
sequence,
timestamp: stream_start.elapsed().as_micros() as u64,
data: chunk_data.to_vec(),
sample_rate: 44100,
channels: 2,
};
// Serialize and send chunk
match bincode::serialize(&chunk) {
Ok(serialized) => {
let size = serialized.len() as u32;
// Send chunk size first
if session.send.write_all(&size.to_le_bytes()).await.is_err() {
break;
}
// Send chunk data
if session.send.write_all(&serialized).await.is_err() {
break;
}
stats.chunks_sent += 1;
stats.bytes_sent += chunk.data.len() as u64;
sequence += 1;
if sequence % 100 == 0 {
let elapsed = stats.start_time.unwrap().elapsed();
let throughput = stats.bytes_sent as f64 / elapsed.as_secs_f64() / 1024.0;
println!("📡 Sent {} chunks, {:.1} KB/s", stats.chunks_sent, throughput);
}
}
Err(e) => {
eprintln!("❌ Failed to serialize chunk: {}", e);
break;
}
}
}
// Calculate server-side metrics
let total_duration = stats.start_time.unwrap().elapsed().as_secs_f64();
let avg_throughput_kbps = (stats.bytes_sent as f64 * 8.0) / total_duration / 1000.0;
println!("");
println!("📊 Server Streaming Metrics:");
println!(" ⏱️ Stream duration: {:.1}s", total_duration);
println!(" 📦 Chunks sent: {}", stats.chunks_sent);
println!(" 💾 Data sent: {:.1} KB", stats.bytes_sent as f64 / 1024.0);
println!(" 🚀 Average throughput: {:.0} kbps", avg_throughput_kbps);
println!(" 🎵 Stream completed successfully");
Ok(())
}
// Load audio file (MP3/OGG) and decode to PCM audio data with format info
async fn load_audio_file_with_format(filename: &str) -> Result<(Vec<u8>, u32, u16), MediaError> {
println!("📁 Loading and decoding audio file: {}", filename);
let file_data = tokio::fs::read(filename).await
.map_err(|_| MediaError::FileNotFound(filename.to_string()))?;
// Try different decoders based on file extension
let extension = std::path::Path::new(filename)
.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("")
.to_lowercase();
let (pcm_data, sample_rate, channels) = match extension.as_str() {
"mp3" => decode_mp3(&file_data)?,
"ogg" => decode_with_symphonia(&file_data)?,
_ => {
// Try symphonia first (supports many formats), fallback to mp3
decode_with_symphonia(&file_data)
.or_else(|_| decode_mp3(&file_data))?
}
};
// Calculate and display audio info
let file_size_kb = file_data.len() as f64 / 1024.0;
let pcm_size_kb = pcm_data.len() as f64 / 1024.0;
let duration = pcm_data.len() as f64 / (sample_rate as f64 * channels as f64 * 2.0); // 2 bytes per sample
let bitrate = (file_size_kb * 8.0) / duration;
println!("✅ Audio Decoded:");
println!(" 📦 File size: {:.1} KB", file_size_kb);
println!(" 🔊 PCM size: {:.1} KB", pcm_size_kb);
println!(" ⏱️ Duration: {:.1}s", duration);
println!(" 🎵 Sample rate: {} Hz", sample_rate);
println!(" 📻 Channels: {}", channels);
println!(" 📡 Bitrate: {:.0} kbps", bitrate);
Ok((pcm_data, sample_rate, channels))
}
// Decode MP3 using minimp3
fn decode_mp3(file_data: &[u8]) -> Result<(Vec<u8>, u32, u16), MediaError> {
let mut decoder = minimp3::Decoder::new(std::io::Cursor::new(file_data));
let mut pcm_data = Vec::new();
let mut sample_rate = 44100;
let mut channels = 2;
loop {
match decoder.next_frame() {
Ok(frame) => {
sample_rate = frame.sample_rate as u32;
channels = frame.channels as u16;
// Convert i16 samples to bytes
for sample in frame.data {
pcm_data.extend_from_slice(&sample.to_le_bytes());
}
}
Err(minimp3::Error::Eof) => break,
Err(e) => {
return Err(MediaError::DecodeError(format!("MP3 decode error: {:?}", e)));
}
}
}
if pcm_data.is_empty() {
return Err(MediaError::DecodeError("No MP3 audio data decoded".to_string()));
}
Ok((pcm_data, sample_rate, channels))
}
// Decode OGG/Vorbis and other formats using symphonia
fn decode_with_symphonia(file_data: &[u8]) -> Result<(Vec<u8>, u32, u16), MediaError> {
use symphonia::core::audio::{AudioBufferRef, Signal};
use symphonia::core::codecs::DecoderOptions;
use symphonia::core::formats::FormatOptions;
use symphonia::core::io::MediaSourceStream;
use symphonia::core::meta::MetadataOptions;
use symphonia::core::probe::Hint;
let file_data_owned = file_data.to_vec();
let cursor = std::io::Cursor::new(file_data_owned);
let mss = MediaSourceStream::new(Box::new(cursor), Default::default());
let hint = Hint::new();
// Let symphonia probe the format
let meta_opts = MetadataOptions::default();
let fmt_opts = FormatOptions::default();
let probed = symphonia::default::get_probe()
.format(&hint, mss, &fmt_opts, &meta_opts)
.map_err(|e| MediaError::DecodeError(format!("Format probe failed: {:?}", e)))?;
let mut format = probed.format;
let track = format
.tracks()
.iter()
.find(|t| t.codec_params.codec != symphonia::core::codecs::CODEC_TYPE_NULL)
.ok_or_else(|| MediaError::DecodeError("No supported audio tracks found".to_string()))?;
let dec_opts = DecoderOptions::default();
let mut decoder = symphonia::default::get_codecs()
.make(&track.codec_params, &dec_opts)
.map_err(|e| MediaError::DecodeError(format!("Decoder creation failed: {:?}", e)))?;
let track_id = track.id;
let mut pcm_data = Vec::new();
let mut sample_rate = 44100;
let mut channels = 2;
// Decode all packets
loop {
let packet = match format.next_packet() {
Ok(packet) => packet,
Err(symphonia::core::errors::Error::ResetRequired) => {
// The track list has been changed. Re-examine it and create a new set of decoders,
// then restart the decode loop. This is an advanced feature that most programs
// will never encounter.
unimplemented!();
}
Err(symphonia::core::errors::Error::IoError(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
// End of stream
break;
}
Err(e) => {
return Err(MediaError::DecodeError(format!("Packet read error: {:?}", e)));
}
};
// Only decode packets for our track
if packet.track_id() != track_id {
continue;
}
match decoder.decode(&packet) {
Ok(decoded) => {
// Extract sample rate and channel info
sample_rate = decoded.spec().rate;
channels = decoded.spec().channels.count() as u16;
// Convert audio buffer to i16 PCM with proper stereo interleaving
match decoded {
AudioBufferRef::F32(buf) => {
let channels_count = buf.spec().channels.count();
let frames = buf.frames();
for frame_idx in 0..frames {
// For each frame, interleave all channels properly
for ch in 0..channels_count {
let sample = buf.chan(ch)[frame_idx];
let sample_i16 = (sample * 32767.0).clamp(-32767.0, 32767.0) as i16;
pcm_data.extend_from_slice(&sample_i16.to_le_bytes());
}
}
}
AudioBufferRef::U16(buf) => {
let channels_count = buf.spec().channels.count();
let frames = buf.frames();
for frame_idx in 0..frames {
for ch in 0..channels_count {
let sample = buf.chan(ch)[frame_idx];
let sample_i16 = (sample as i32 - 32768) as i16;
pcm_data.extend_from_slice(&sample_i16.to_le_bytes());
}
}
}
AudioBufferRef::S16(buf) => {
let channels_count = buf.spec().channels.count();
let frames = buf.frames();
for frame_idx in 0..frames {
for ch in 0..channels_count {
let sample = buf.chan(ch)[frame_idx];
pcm_data.extend_from_slice(&sample.to_le_bytes());
}
}
}
_ => {
return Err(MediaError::DecodeError("Unsupported audio format".to_string()));
}
}
}
Err(symphonia::core::errors::Error::IoError(_)) => {
// End of stream
break;
}
Err(symphonia::core::errors::Error::DecodeError(_)) => {
// Decode error, try to continue
continue;
}
Err(e) => {
return Err(MediaError::DecodeError(format!("Decode error: {:?}", e)));
}
}
}
if pcm_data.is_empty() {
return Err(MediaError::DecodeError("No audio data decoded".to_string()));
}
Ok((pcm_data, sample_rate, channels))
}
// Create a test audio file (simple sine wave as MP3)
async fn create_test_audio(filename: &str) -> Result<(), MediaError> {
println!("🎼 Generating test audio (musical scale)...");
// Generate a musical scale instead of single tone for better quality testing
let sample_rate = 44100;
let duration = 10; // 10 seconds total
let notes = [261.63, 293.66, 329.63, 349.23, 392.00, 440.00, 493.88, 523.25]; // C major scale
let mut samples = Vec::new();
let note_duration = duration as f32 / notes.len() as f32; // Each note duration
for i in 0..(sample_rate * duration) {
let t = i as f32 / sample_rate as f32;
let note_index = (t / note_duration) as usize % notes.len();
let frequency = notes[note_index];
// Create a more pleasant sound with some harmonics
let fundamental = (2.0 * std::f32::consts::PI * frequency * t).sin();
let harmonic = 0.3 * (2.0 * std::f32::consts::PI * frequency * 2.0 * t).sin();
let sample = (fundamental + harmonic) * 0.7; // Reduce volume slightly
let sample_i16 = (sample * 32767.0) as i16;
samples.extend_from_slice(&sample_i16.to_le_bytes());
}
// For simplicity, save as raw PCM data with .mp3 extension
// In a real implementation, you'd encode this as actual MP3
tokio::fs::write(filename, &samples).await?;
println!("✅ Created test audio: C major scale ({} bytes, {}s)", samples.len(), duration);
Ok(())
}
// Create an audio source from raw chunk data for playback
fn create_audio_source(chunk: AudioChunk) -> Result<rodio::buffer::SamplesBuffer<i16>, MediaError> {
// Convert raw bytes to i16 samples
let mut samples = Vec::with_capacity(chunk.data.len() / 2);
// Handle both mono and stereo properly
for chunk_bytes in chunk.data.chunks_exact(2) {
let sample = i16::from_le_bytes([chunk_bytes[0], chunk_bytes[1]]);
samples.push(sample);
}
if samples.is_empty() {
return Err(MediaError::PlaybackError("No audio samples in chunk".to_string()));
}
// Create the audio source with correct format
let source = rodio::buffer::SamplesBuffer::new(
chunk.channels,
chunk.sample_rate,
samples
);
// Only print first few chunks to avoid spam
if chunk.sequence < 3 {
println!("🎵 Playing chunk {}: {} bytes, {}Hz, {} ch",
chunk.sequence,
chunk.data.len(),
chunk.sample_rate,
chunk.channels);
}
Ok(source)
}