From 14a7a6476f6f14fb52252549cc5dc03b1cfbf8d2 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Wed, 11 Mar 2026 18:34:41 +0800 Subject: [PATCH] upload the prototype for single peer connnection and tests --- .../connection_latency_test.dart | 478 +++++++++++++++++ .../LiveKit Broadcast Extension.xcscheme | 97 ++++ example/lib/latency_test.dart | 402 ++++++++++++++ example/pubspec.yaml | 2 + lib/src/core/engine.dart | 163 ++++-- lib/src/core/signal_client.dart | 124 ++++- lib/src/core/transport.dart | 104 +++- lib/src/options.dart | 13 + lib/src/participant/local.dart | 21 +- lib/src/utils.dart | 109 ++++ pubspec.lock | 47 +- pubspec.yaml | 2 + test/integration/connection_latency_test.dart | 504 ++++++++++++++++++ 13 files changed, 2002 insertions(+), 64 deletions(-) create mode 100644 example/integration_test/connection_latency_test.dart create mode 100644 example/ios/Runner.xcodeproj/xcshareddata/xcschemes/LiveKit Broadcast Extension.xcscheme create mode 100644 example/lib/latency_test.dart create mode 100644 test/integration/connection_latency_test.dart diff --git a/example/integration_test/connection_latency_test.dart b/example/integration_test/connection_latency_test.dart new file mode 100644 index 000000000..694dfe1f8 --- /dev/null +++ b/example/integration_test/connection_latency_test.dart @@ -0,0 +1,478 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Connection Latency Measurement Integration Test +/// +/// Run from the example directory: +/// cd example +/// flutter test integration_test/connection_latency_test.dart -d + +import 'dart:async'; +import 'dart:math' as math; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:integration_test/integration_test.dart'; +import 'package:livekit_client/livekit_client.dart'; +import 'package:logging/logging.dart'; + +// ============================================================================ +// CONFIGURATION - EDIT THESE VALUES +// ============================================================================ + +class TestConfig { + // TODO: Replace with your LiveKit server URL + static const String url = 'wss://xianstaging-hixkk74p.staging.livekit.cloud'; + + // Caller participant config (used by existing tests) + static const String callerIdentity = 'sxian'; + static const String callerToken = + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3NzYzMjcyMDQsImlkZW50aXR5Ijoic3hpYW4iLCJpc3MiOiJBUElHejJLUXU0UGJ6YkEiLCJuYW1lIjoic3hpYW4iLCJuYmYiOjE3NzAzMjcyMDQsInN1YiI6InN4aWFuIiwidmlkZW8iOnsicm9vbSI6ImNwcCIsInJvb21Kb2luIjp0cnVlfX0.oI0DCLyCMK-y6yxyQ3cFPArGJa03XHeeUrC_t2LUCgU'; + + // Receiver participant config (required by RPC latency test) + // TODO: Replace with a second token whose identity matches receiverIdentity. + static const String receiverIdentity = 'receiver'; + static const String receiverToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3NzYzMjcyMTgsImlkZW50aXR5IjoidGVzdCIsImlzcyI6IkFQSUd6MktRdTRQYnpiQSIsIm5hbWUiOiJ0ZXN0IiwibmJmIjoxNzcwMzI3MjE4LCJzdWIiOiJ0ZXN0IiwidmlkZW8iOnsicm9vbSI6ImNwcCIsInJvb21Kb2luIjp0cnVlfX0.xWNZ4Hz1AmXblr8V1GWPm9fcbhuj_f9C5qa0sJxdTFA'; + + // Backward-compatible alias for existing tests in this file. + static const String token = callerToken; + + // Number of iterations for the test + static const int iterations = 5; + + // Check if configured + static bool get isConfigured => + url.isNotEmpty && + token != 'YOUR_TOKEN_HERE' && + token.isNotEmpty; + + static bool get isRpcConfigured => + isConfigured && + receiverToken != 'RECEIVER_TOKEN_HERE' && + receiverToken.isNotEmpty; +} + +void _enableLiveKitLogs() { + hierarchicalLoggingEnabled = true; + setLoggingLevel(LoggerLevel.kOFF); +} + +Future _waitForRemoteParticipantByIdentity( + Room room, + String identity, { + Duration timeout = const Duration(seconds: 10), +}) async { + final stopwatch = Stopwatch()..start(); + while (stopwatch.elapsed < timeout) { + final participant = room.getParticipantByIdentity(identity); + if (participant is RemoteParticipant) return; + await Future.delayed(const Duration(milliseconds: 50)); + } + throw TimeoutException('Timed out waiting for remote participant identity=$identity'); +} + +Future _safeDisconnectAndDispose(Room room, String label) async { + try { + await room.disconnect(); + } catch (e) { + print(' [$label] disconnect warning: $e'); + } + + try { + await room.dispose(); + } catch (e) { + print(' [$label] dispose warning: $e'); + } +} + +// ============================================================================ +// LATENCY STATISTICS +// ============================================================================ + +class LatencyStats { + final List _measurements = []; + final String name; + + LatencyStats({this.name = 'Latency'}); + + void addMeasurement(double latencyMs) { + _measurements.add(latencyMs); + } + + int get count => _measurements.length; + bool get isEmpty => _measurements.isEmpty; + + double get min => _measurements.isEmpty ? 0 : _measurements.reduce(math.min); + double get max => _measurements.isEmpty ? 0 : _measurements.reduce(math.max); + + double get mean { + if (_measurements.isEmpty) return 0; + return _measurements.reduce((a, b) => a + b) / _measurements.length; + } + + double get p50 { + if (_measurements.isEmpty) return 0; + final sorted = List.from(_measurements)..sort(); + final middle = sorted.length ~/ 2; + if (sorted.length.isOdd) { + return sorted[middle]; + } + return (sorted[middle - 1] + sorted[middle]) / 2; + } + + double get p95 { + if (_measurements.isEmpty) return 0; + final sorted = List.from(_measurements)..sort(); + final index = ((sorted.length - 1) * 0.95).floor(); + return sorted[index]; + } + + double get p99 { + if (_measurements.isEmpty) return 0; + final sorted = List.from(_measurements)..sort(); + final index = ((sorted.length - 1) * 0.99).floor(); + return sorted[index]; + } + + void printStats() { + print('\n$name'); + print('Samples: $count'); + print('Min: ${min.toStringAsFixed(2)} ms'); + print('Avg: ${mean.toStringAsFixed(2)} ms'); + print('P50: ${p50.toStringAsFixed(2)} ms'); + print('P95: ${p95.toStringAsFixed(2)} ms'); + print('P99: ${p99.toStringAsFixed(2)} ms'); + print('Max: ${max.toStringAsFixed(2)} ms'); + } + + void clear() { + _measurements.clear(); + } +} + +// ============================================================================ +// TESTS +// ============================================================================ + +void main() { + IntegrationTestWidgetsFlutterBinding.ensureInitialized(); + _enableLiveKitLogs(); + + group('Connection Latency Tests', () { + testWidgets('Connection Time Measurement', (tester) async { + if (!TestConfig.isConfigured) { + print('\n*** TEST NOT CONFIGURED ***'); + print('Edit TestConfig in this file with your LiveKit credentials:'); + print(' - url: Your LiveKit server URL'); + print(' - token: A valid access token'); + return; + } + + print('\n=== Connection Time Measurement ==='); + print('URL: ${TestConfig.url}'); + print('Iterations: ${TestConfig.iterations}'); + + final stats = LatencyStats(name: 'Connection Time'); + + for (int i = 0; i < TestConfig.iterations; i++) { + final room = Room( + roomOptions: const RoomOptions( + adaptiveStream: false, + dynacast: false, + ), + ); + + try { + final stopwatch = Stopwatch()..start(); + + await room.connect( + TestConfig.url, + TestConfig.token, + connectOptions: const ConnectOptions(autoSubscribe: true), + ); + + stopwatch.stop(); + + if (room.connectionState == ConnectionState.connected) { + final latencyMs = stopwatch.elapsedMicroseconds / 1000.0; + stats.addMeasurement(latencyMs); + + final pathInfo = room.engine.signalClient.singlePcMode ? '[V1]' : '[V0]'; + print(' Iteration ${i + 1}: ${latencyMs.toStringAsFixed(2)} ms $pathInfo'); + } else { + print(' Iteration ${i + 1}: FAILED - state: ${room.connectionState}'); + } + } catch (e) { + print(' Iteration ${i + 1}: FAILED - $e'); + } finally { + await room.disconnect(); + await room.dispose(); + } + + // Delay between iterations + await Future.delayed(const Duration(milliseconds: 500)); + } + + stats.printStats(); + + expect(stats.count, greaterThan(0), reason: 'At least one connection should succeed'); + }); + + testWidgets('Single PC Mode Comparison (V0 vs V1)', (tester) async { + if (!TestConfig.isConfigured) { + print('\n*** TEST NOT CONFIGURED ***'); + return; + } + + print('\n=== Single PC Mode Comparison (V0 vs V1) ==='); + print('URL: ${TestConfig.url}'); + + final statsV0 = LatencyStats(name: 'V0 (Legacy Dual PC)'); + final statsV1 = LatencyStats(name: 'V1 (Single PC)'); + + // Test V0 (legacy dual PeerConnection mode) + print('\nV0 - Legacy Dual PeerConnection (singlePeerConnection: false):'); + for (int i = 0; i < TestConfig.iterations; i++) { + final room = Room( + roomOptions: const RoomOptions( + adaptiveStream: false, + dynacast: false, + ), + ); + + try { + final stopwatch = Stopwatch()..start(); + await room.connect( + TestConfig.url, + TestConfig.token, + connectOptions: const ConnectOptions( + autoSubscribe: true, + singlePeerConnection: false, // Force V0 path + ), + ); + stopwatch.stop(); + + if (room.connectionState == ConnectionState.connected) { + final latencyMs = stopwatch.elapsedMicroseconds / 1000.0; + statsV0.addMeasurement(latencyMs); + final mode = room.engine.signalClient.singlePcMode ? 'V1' : 'V0'; + print(' Iteration ${i + 1}: ${latencyMs.toStringAsFixed(2)} ms [$mode]'); + } + } catch (e) { + print(' Iteration ${i + 1}: FAILED - $e'); + } finally { + await room.disconnect(); + await room.dispose(); + } + + await Future.delayed(const Duration(milliseconds: 500)); + } + + // Test V1 (single PeerConnection mode) + print('\nV1 - Single PeerConnection (singlePeerConnection: true):'); + for (int i = 0; i < TestConfig.iterations; i++) { + final room = Room( + roomOptions: const RoomOptions( + adaptiveStream: false, + dynacast: false, + ), + ); + + try { + final stopwatch = Stopwatch()..start(); + await room.connect( + TestConfig.url, + TestConfig.token, + connectOptions: const ConnectOptions( + autoSubscribe: true, + singlePeerConnection: true, // Enable V1 path (default) + ), + ); + stopwatch.stop(); + + if (room.connectionState == ConnectionState.connected) { + final latencyMs = stopwatch.elapsedMicroseconds / 1000.0; + statsV1.addMeasurement(latencyMs); + final mode = room.engine.signalClient.singlePcMode ? 'V1' : 'V0'; + print(' Iteration ${i + 1}: ${latencyMs.toStringAsFixed(2)} ms [$mode]'); + } + } catch (e) { + print(' Iteration ${i + 1}: FAILED - $e'); + } finally { + await room.disconnect(); + await room.dispose(); + } + + await Future.delayed(const Duration(milliseconds: 500)); + } + + statsV0.printStats(); + statsV1.printStats(); + + // Print comparison + if (statsV0.count > 0 && statsV1.count > 0) { + print('\n=== Comparison ==='); + final diff = statsV0.mean - statsV1.mean; + print('V0 (Legacy): ${statsV0.mean.toStringAsFixed(2)} ms avg'); + print('V1 (Single PC): ${statsV1.mean.toStringAsFixed(2)} ms avg'); + if (diff > 0) { + print('V1 is ${diff.toStringAsFixed(2)} ms faster (${(diff / statsV0.mean * 100).toStringAsFixed(1)}% improvement)'); + } else if (diff < 0) { + print('V0 is ${(-diff).toStringAsFixed(2)} ms faster'); + } else { + print('Both modes have similar latency'); + } + } + + final totalCount = statsV0.count + statsV1.count; + expect(totalCount, greaterThan(0), reason: 'At least one connection should succeed'); + }); + + testWidgets('RPC Latency Right After Connect (V0 vs V1, two participants)', (tester) async { + if (!TestConfig.isRpcConfigured) { + print('\n*** RPC TEST NOT CONFIGURED ***'); + print('Set TestConfig.receiverToken and ensure it matches TestConfig.receiverIdentity.'); + print('callerIdentity: ${TestConfig.callerIdentity}'); + print('receiverIdentity: ${TestConfig.receiverIdentity}'); + return; + } + + print('\n=== RPC Latency Right After Connect (V0 vs V1) ==='); + print('URL: ${TestConfig.url}'); + print('Iterations: ${TestConfig.iterations}'); + print('Caller identity: ${TestConfig.callerIdentity}'); + print('Receiver identity: ${TestConfig.receiverIdentity}'); + + final statsV0 = LatencyStats(name: 'RPC Latency V0 (dual PC)'); + final statsV1 = LatencyStats(name: 'RPC Latency V1 (single PC)'); + final errors = []; + + Future runMode({ + required bool singlePeerConnection, + required LatencyStats stats, + required String label, + }) async { + print('\n$label (singlePeerConnection: $singlePeerConnection):'); + + for (int i = 0; i < TestConfig.iterations; i++) { + final receiverRoom = Room( + roomOptions: const RoomOptions(adaptiveStream: false, dynacast: false), + ); + final callerRoom = Room( + roomOptions: const RoomOptions(adaptiveStream: false, dynacast: false), + ); + + receiverRoom.registerRpcMethod('latency_echo', (data) async => data.payload); + + try { + final connectOptions = ConnectOptions( + autoSubscribe: true, + singlePeerConnection: singlePeerConnection, + ); + + await receiverRoom.connect( + TestConfig.url, + TestConfig.receiverToken, + connectOptions: connectOptions, + ); + final connectedReceiverIdentity = receiverRoom.localParticipant?.identity; + if (connectedReceiverIdentity == null || connectedReceiverIdentity.isEmpty) { + throw Exception('Receiver room connected without a valid local participant identity'); + } + + await callerRoom.connect( + TestConfig.url, + TestConfig.callerToken, + connectOptions: connectOptions, + ); + + await _waitForRemoteParticipantByIdentity(callerRoom, connectedReceiverIdentity); + + final stopwatch = Stopwatch()..start(); + final payload = 'ping-$i-${DateTime.now().microsecondsSinceEpoch}'; + final response = await callerRoom.localParticipant!.performRpc( + PerformRpcParams( + destinationIdentity: connectedReceiverIdentity, + method: 'latency_echo', + payload: payload, + responseTimeoutMs: const Duration(seconds: 10), + ), + ); + stopwatch.stop(); + + if (response != payload) { + throw Exception('Unexpected RPC response payload: $response'); + } + + final latencyMs = stopwatch.elapsedMicroseconds / 1000.0; + stats.addMeasurement(latencyMs); + + final callerMode = callerRoom.engine.signalClient.singlePcMode ? 'V1' : 'V0'; + final receiverMode = receiverRoom.engine.signalClient.singlePcMode ? 'V1' : 'V0'; + print( + ' Iteration ${i + 1}: ${latencyMs.toStringAsFixed(2)} ms ' + '[caller:$callerMode receiver:$receiverMode]' + ' [dest:$connectedReceiverIdentity]', + ); + } catch (e) { + final errorText = e is RpcError ? 'RpcError(code=${e.code}, message=${e.message})' : e.toString(); + errors.add('$label iteration ${i + 1}: $errorText'); + print(' Iteration ${i + 1}: FAILED - $errorText'); + } finally { + await _safeDisconnectAndDispose(callerRoom, 'caller'); + await _safeDisconnectAndDispose(receiverRoom, 'receiver'); + } + + await Future.delayed(const Duration(milliseconds: 500)); + } + } + + await runMode( + singlePeerConnection: false, + stats: statsV0, + label: 'V0 - Legacy Dual PeerConnection', + ); + await runMode( + singlePeerConnection: true, + stats: statsV1, + label: 'V1 - Single PeerConnection', + ); + + statsV0.printStats(); + statsV1.printStats(); + + if (statsV0.count > 0 && statsV1.count > 0) { + print('\n=== RPC Latency Comparison ==='); + final diff = statsV0.mean - statsV1.mean; + print('V0 (Legacy): ${statsV0.mean.toStringAsFixed(2)} ms avg'); + print('V1 (Single PC): ${statsV1.mean.toStringAsFixed(2)} ms avg'); + if (diff > 0) { + print('V1 is ${diff.toStringAsFixed(2)} ms faster (${(diff / statsV0.mean * 100).toStringAsFixed(1)}% improvement)'); + } else if (diff < 0) { + print('V0 is ${(-diff).toStringAsFixed(2)} ms faster'); + } else { + print('Both modes have similar RPC latency'); + } + } + + final totalCount = statsV0.count + statsV1.count; + if (totalCount == 0 && errors.isNotEmpty) { + print('\nRPC failures:'); + for (final err in errors) { + print(' - $err'); + } + } + expect(totalCount, greaterThan(0), reason: 'At least one RPC call should succeed'); + }); + }); +} diff --git a/example/ios/Runner.xcodeproj/xcshareddata/xcschemes/LiveKit Broadcast Extension.xcscheme b/example/ios/Runner.xcodeproj/xcshareddata/xcschemes/LiveKit Broadcast Extension.xcscheme new file mode 100644 index 000000000..360f3c530 --- /dev/null +++ b/example/ios/Runner.xcodeproj/xcshareddata/xcschemes/LiveKit Broadcast Extension.xcscheme @@ -0,0 +1,97 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/example/lib/latency_test.dart b/example/lib/latency_test.dart new file mode 100644 index 000000000..14853f28d --- /dev/null +++ b/example/lib/latency_test.dart @@ -0,0 +1,402 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Connection Latency Test - Device Runnable +/// +/// This file provides a reusable latency measurement utility that can be +/// run on physical devices where environment variables aren't available. +/// +/// Usage: +/// 1. Import this file in your test app +/// 2. Call runLatencyTest() with your credentials +/// 3. Results are printed and returned +/// +/// Example: +/// ```dart +/// import 'package:example/latency_test.dart'; +/// +/// void main() async { +/// await runLatencyTest( +/// url: 'wss://your-server.livekit.cloud', +/// token: 'your_token', +/// iterations: 10, +/// ); +/// } +/// ``` +library latency_test; + +import 'dart:math' as math; + +import 'package:flutter/widgets.dart' hide ConnectionState; +import 'package:livekit_client/livekit_client.dart'; + +/// Statistics calculator for latency measurements +class LatencyStats { + final List _measurements = []; + final String name; + + LatencyStats({this.name = 'Latency'}); + + void addMeasurement(double latencyMs) { + _measurements.add(latencyMs); + } + + int get count => _measurements.length; + bool get isEmpty => _measurements.isEmpty; + + double get min => _measurements.isEmpty ? 0 : _measurements.reduce(math.min); + double get max => _measurements.isEmpty ? 0 : _measurements.reduce(math.max); + + double get mean { + if (_measurements.isEmpty) return 0; + return _measurements.reduce((a, b) => a + b) / _measurements.length; + } + + double get p50 { + if (_measurements.isEmpty) return 0; + final sorted = List.from(_measurements)..sort(); + final middle = sorted.length ~/ 2; + if (sorted.length.isOdd) { + return sorted[middle]; + } + return (sorted[middle - 1] + sorted[middle]) / 2; + } + + double get p95 { + if (_measurements.isEmpty) return 0; + final sorted = List.from(_measurements)..sort(); + final index = ((sorted.length - 1) * 0.95).floor(); + return sorted[index]; + } + + double get p99 { + if (_measurements.isEmpty) return 0; + final sorted = List.from(_measurements)..sort(); + final index = ((sorted.length - 1) * 0.99).floor(); + return sorted[index]; + } + + String getStatsString() { + final buffer = StringBuffer(); + buffer.writeln(name); + buffer.writeln('Samples: $count'); + buffer.writeln('Min: ${min.toStringAsFixed(2)} ms'); + buffer.writeln('Avg: ${mean.toStringAsFixed(2)} ms'); + buffer.writeln('P50: ${p50.toStringAsFixed(2)} ms'); + buffer.writeln('P95: ${p95.toStringAsFixed(2)} ms'); + buffer.writeln('P99: ${p99.toStringAsFixed(2)} ms'); + buffer.writeln('Max: ${max.toStringAsFixed(2)} ms'); + return buffer.toString(); + } + + void printStats() { + print(getStatsString()); + } + + void clear() { + _measurements.clear(); + } + + Map toJson() => { + 'name': name, + 'samples': count, + 'min': min, + 'avg': mean, + 'p50': p50, + 'p95': p95, + 'p99': p99, + 'max': max, + 'measurements': _measurements, + }; +} + +/// Result of a latency test run +class LatencyTestResult { + final LatencyStats stats; + final int successCount; + final int failureCount; + final List errors; + final bool usedV1Path; + + LatencyTestResult({ + required this.stats, + required this.successCount, + required this.failureCount, + required this.errors, + required this.usedV1Path, + }); + + bool get hasErrors => errors.isNotEmpty; + + @override + String toString() { + final buffer = StringBuffer(); + buffer.writeln('=== Latency Test Results ==='); + buffer.writeln('Signaling Path: ${usedV1Path ? 'V1 (Single PC)' : 'V0 (Legacy)'}'); + buffer.writeln('Success: $successCount, Failed: $failureCount'); + buffer.writeln(); + buffer.write(stats.getStatsString()); + if (errors.isNotEmpty) { + buffer.writeln(); + buffer.writeln('Errors:'); + for (final error in errors) { + buffer.writeln(' - $error'); + } + } + return buffer.toString(); + } +} + +/// Callback for progress updates during the test +typedef LatencyTestProgressCallback = void Function( + int iteration, + int total, + double? latencyMs, + String? error, +); + +/// Run a connection latency test +/// +/// [url] - LiveKit server URL (e.g., 'wss://your-server.livekit.cloud') +/// [token] - Valid LiveKit access token +/// [iterations] - Number of test iterations (default: 5) +/// [delayBetweenIterations] - Delay between iterations for cleanup (default: 500ms) +/// [roomOptions] - Custom room options (optional) +/// [connectOptions] - Custom connect options (optional) +/// [onProgress] - Callback for progress updates (optional) +Future runLatencyTest({ + required String url, + required String token, + int iterations = 5, + Duration delayBetweenIterations = const Duration(milliseconds: 500), + RoomOptions roomOptions = const RoomOptions( + adaptiveStream: false, + dynacast: false, + ), + ConnectOptions connectOptions = const ConnectOptions( + autoSubscribe: true, + ), + LatencyTestProgressCallback? onProgress, +}) async { + print('\n=== Connection Latency Test ==='); + print('URL: $url'); + print('Iterations: $iterations'); + print(''); + + final stats = LatencyStats(name: 'Connection Time'); + final errors = []; + int successCount = 0; + int failureCount = 0; + bool? usedV1Path; + + for (int i = 0; i < iterations; i++) { + // Pass roomOptions to Room constructor (not to connect) + final room = Room(roomOptions: roomOptions); + + try { + final stopwatch = Stopwatch()..start(); + + await room.connect( + url, + token, + connectOptions: connectOptions, + ); + + stopwatch.stop(); + + if (room.connectionState == ConnectionState.connected) { + final latencyMs = stopwatch.elapsedMicroseconds / 1000.0; + stats.addMeasurement(latencyMs); + successCount++; + + // Detect which signaling path was used + usedV1Path ??= room.engine.signalClient.singlePcMode; + + final pathInfo = room.engine.signalClient.singlePcMode ? '[V1]' : '[V0]'; + print(' Iteration ${i + 1}: ${latencyMs.toStringAsFixed(2)} ms $pathInfo'); + + onProgress?.call(i + 1, iterations, latencyMs, null); + } else { + final error = 'Unexpected state: ${room.connectionState}'; + print(' Iteration ${i + 1}: FAILED - $error'); + errors.add('Iteration ${i + 1}: $error'); + failureCount++; + + onProgress?.call(i + 1, iterations, null, error); + } + } catch (e) { + final error = e.toString(); + print(' Iteration ${i + 1}: FAILED - $error'); + errors.add('Iteration ${i + 1}: $error'); + failureCount++; + + onProgress?.call(i + 1, iterations, null, error); + } finally { + await room.disconnect(); + await room.dispose(); + } + + // Delay between iterations + if (i < iterations - 1) { + await Future.delayed(delayBetweenIterations); + } + } + + print(''); + stats.printStats(); + + return LatencyTestResult( + stats: stats, + successCount: successCount, + failureCount: failureCount, + errors: errors, + usedV1Path: usedV1Path ?? false, + ); +} + +/// Run a comparison test between different configurations +Future> runLatencyComparison({ + required String url, + required String token, + int iterations = 5, + Duration delayBetweenIterations = const Duration(milliseconds: 500), + LatencyTestProgressCallback? onProgress, +}) async { + final results = {}; + + // Test with fast publish disabled + print('\n=== Testing WITHOUT Fast Publish ==='); + results['no_fast_publish'] = await runLatencyTest( + url: url, + token: token, + iterations: iterations, + delayBetweenIterations: delayBetweenIterations, + roomOptions: const RoomOptions( + adaptiveStream: false, + dynacast: false, + fastPublish: false, + ), + connectOptions: const ConnectOptions(autoSubscribe: true), + onProgress: onProgress, + ); + + // Test with fast publish enabled + print('\n=== Testing WITH Fast Publish ==='); + results['fast_publish'] = await runLatencyTest( + url: url, + token: token, + iterations: iterations, + delayBetweenIterations: delayBetweenIterations, + roomOptions: const RoomOptions( + adaptiveStream: false, + dynacast: false, + fastPublish: true, + ), + connectOptions: const ConnectOptions(autoSubscribe: true), + onProgress: onProgress, + ); + + // Print comparison + print('\n=== Comparison Summary ==='); + final noFp = results['no_fast_publish']!; + final fp = results['fast_publish']!; + + if (noFp.stats.count > 0 && fp.stats.count > 0) { + print('Without Fast Publish: Avg ${noFp.stats.mean.toStringAsFixed(2)} ms'); + print('With Fast Publish: Avg ${fp.stats.mean.toStringAsFixed(2)} ms'); + + final diff = noFp.stats.mean - fp.stats.mean; + if (diff > 0) { + print('Fast Publish is ${diff.toStringAsFixed(2)} ms faster (${(diff / noFp.stats.mean * 100).toStringAsFixed(1)}%)'); + } else if (diff < 0) { + print('No Fast Publish is ${(-diff).toStringAsFixed(2)} ms faster'); + } + } + + return results; +} + +// ============================================================================ +// CONFIGURATION - EDIT THESE VALUES FOR DEVICE TESTING +// ============================================================================ + +/// Default test configuration for device testing. +/// Edit these values or override them when calling the test functions. +class LatencyTestConfig { + // TODO: Replace with your LiveKit server URL + static const String url = 'wss://your-server.livekit.cloud'; + + // TODO: Replace with a valid token + // You can generate a token using: + // - LiveKit CLI: livekit-cli create-token --api-key --api-secret --join --room test --identity user + // - LiveKit Cloud dashboard + // - Your backend server + static const String token = 'your_token_here'; + + // Number of iterations for the test + static const int iterations = 5; + + // Whether the config is set up (check before running) + static bool get isConfigured => + url != 'wss://your-server.livekit.cloud' && token != 'your_token_here' && token.isNotEmpty; +} + +// ============================================================================ +// MAIN - Uncomment to run as standalone script +// ============================================================================ + +/// Standalone entry point for running latency tests on device. +/// +/// To use: +/// 1. Edit LatencyTestConfig above with your credentials +/// 2. Run: flutter run -t lib/latency_test.dart +/// +/// Or import this file and call runLatencyTest() directly. +void main() async { + // Initialize Flutter binding for platform channels + WidgetsFlutterBinding.ensureInitialized(); + + print('LiveKit Connection Latency Test'); + print('================================\n'); + + if (!LatencyTestConfig.isConfigured) { + print('ERROR: Test not configured!'); + print(''); + print('Please edit LatencyTestConfig in this file:'); + print(' - Set url to your LiveKit server URL'); + print(' - Set token to a valid access token'); + print(''); + print('Example:'); + print(' static const String url = \'wss://my-app.livekit.cloud\';'); + print(' static const String token = \'eyJhbGciOiJIUzI1NiIs...\';'); + return; + } + + // Run the latency test + final result = await runLatencyTest( + url: LatencyTestConfig.url, + token: LatencyTestConfig.token, + iterations: LatencyTestConfig.iterations, + ); + + print('\n'); + print(result); + + // Optionally run comparison test + // final comparison = await runLatencyComparison( + // url: LatencyTestConfig.url, + // token: LatencyTestConfig.token, + // iterations: LatencyTestConfig.iterations, + // ); +} diff --git a/example/pubspec.yaml b/example/pubspec.yaml index 38fd761a1..28034b355 100644 --- a/example/pubspec.yaml +++ b/example/pubspec.yaml @@ -28,6 +28,8 @@ dependencies: dev_dependencies: flutter_test: sdk: flutter + integration_test: + sdk: flutter flutter_lints: ^4.0.0 # The following section is specific to Flutter. diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index 2b1794d65..16c511f0c 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -278,11 +278,16 @@ class Engine extends Disposable with EventsEmittable { Future cleanUp() async { logger.fine('[$objectId] cleanUp()'); + // In single PC mode, subscriber == publisher, so only dispose once + final isSinglePcMode = subscriber == publisher; + await publisher?.dispose(); publisher = null; _hasPublished = false; - await subscriber?.dispose(); + if (!isSinglePcMode) { + await subscriber?.dispose(); + } subscriber = null; await signalClient.cleanUp(); @@ -315,13 +320,18 @@ class Engine extends Disposable with EventsEmittable { } @internal - Future negotiate({bool? iceRestart}) async { + Future negotiate({bool? iceRestart, bool immediate = false}) async { if (publisher == null) { return; } _hasPublished = true; try { - publisher!.negotiate(null); + if (immediate || iceRestart == true) { + final options = iceRestart == true ? const RTCOfferOptions(iceRestart: true) : null; + await publisher!.createAndSendOffer(options); + } else { + publisher!.negotiate(null); + } } catch (error) { if (error is NegotiationError) { fullReconnectOnNext = true; @@ -409,7 +419,10 @@ class Engine extends Disposable with EventsEmittable { // construct the data channel message var message = rtc.RTCDataChannelMessage.fromBinary(packet.writeToBuffer()); - if (_subscriberPrimary) { + // In subscriber-primary mode (legacy dual PC) and in single PC mode, data can be + // sent right after connect() returns while publisher/DC are still finishing setup. + // Ensure publisher transport and target data channel are ready before sending. + if (_subscriberPrimary || signalClient.singlePcMode) { // make sure publisher transport is connected await ensurePublisherConnected(); @@ -493,7 +506,7 @@ class Engine extends Disposable with EventsEmittable { // start negotiation if (state != rtc.RTCPeerConnectionState.RTCPeerConnectionStateConnecting) { - await negotiate(); + await negotiate(immediate: true); } if (!lkPlatformIsTest()) { logger.fine('Waiting for publisher to ice-connect...'); @@ -622,10 +635,24 @@ class Engine extends Disposable with EventsEmittable { } Future _createPeerConnections(RTCConfiguration rtcConfiguration) async { + // Always create publisher publisher = await Transport.create(_peerConnectionCreate, rtcConfig: rtcConfiguration, connectOptions: connectOptions); - subscriber = - await Transport.create(_peerConnectionCreate, rtcConfig: rtcConfiguration, connectOptions: connectOptions); + + // Set single PC mode flag on publisher transport + publisher?.singlePcMode = signalClient.singlePcMode; + logger.fine('Creating PeerConnections, singlePcMode: ${signalClient.singlePcMode}'); + + // In single PC mode, we only use the publisher PeerConnection for both pub and sub + // The subscriber reference points to the same transport as publisher + if (signalClient.singlePcMode) { + subscriber = publisher; + logger.fine('Single PC mode: using publisher for both pub and sub'); + } else { + // In dual PC mode (legacy V0), create a separate subscriber PeerConnection + subscriber = + await Transport.create(_peerConnectionCreate, rtcConfig: rtcConfiguration, connectOptions: connectOptions); + } publisher?.pc.onIceCandidate = (rtc.RTCIceCandidate candidate) { logger.fine('publisher onIceCandidate'); @@ -639,17 +666,20 @@ class Engine extends Disposable with EventsEmittable { } }; - subscriber?.pc.onIceCandidate = (rtc.RTCIceCandidate candidate) { - logger.fine('subscriber onIceCandidate'); - signalClient.sendIceCandidate(candidate, lk_rtc.SignalTarget.SUBSCRIBER); - }; + // Only set up subscriber-specific callbacks in dual PC mode + if (!signalClient.singlePcMode) { + subscriber?.pc.onIceCandidate = (rtc.RTCIceCandidate candidate) { + logger.fine('subscriber onIceCandidate'); + signalClient.sendIceCandidate(candidate, lk_rtc.SignalTarget.SUBSCRIBER); + }; - subscriber?.pc.onIceConnectionState = (rtc.RTCIceConnectionState state) async { - logger.fine('subscriber iceConnectionState: $state'); - if (state == rtc.RTCIceConnectionState.RTCIceConnectionStateConnected) { - await _handleGettingConnectedServerAddress(subscriber!.pc); - } - }; + subscriber?.pc.onIceConnectionState = (rtc.RTCIceConnectionState state) async { + logger.fine('subscriber iceConnectionState: $state'); + if (state == rtc.RTCIceConnectionState.RTCIceConnectionStateConnected) { + await _handleGettingConnectedServerAddress(subscriber!.pc); + } + }; + } publisher?.onOffer = (offer) { logger.fine('publisher onOffer'); @@ -661,18 +691,21 @@ class Engine extends Disposable with EventsEmittable { subscriber?.pc.onDataChannel = _onDataChannel; } - subscriber?.pc.onConnectionState = (state) async { - events.emit(EngineSubscriberPeerStateUpdatedEvent( - state: state, - isPrimary: _subscriberPrimary, - )); - logger.fine('subscriber connectionState: $state'); - if (state.isDisconnected() || state.isFailed()) { - await handleReconnect(state.isFailed() - ? ClientDisconnectReason.peerConnectionFailed - : ClientDisconnectReason.peerConnectionClosed); - } - }; + // In single PC mode, connection state events come from publisher only + if (!signalClient.singlePcMode) { + subscriber?.pc.onConnectionState = (state) async { + events.emit(EngineSubscriberPeerStateUpdatedEvent( + state: state, + isPrimary: _subscriberPrimary, + )); + logger.fine('subscriber connectionState: $state'); + if (state.isDisconnected() || state.isFailed()) { + await handleReconnect(state.isFailed() + ? ClientDisconnectReason.peerConnectionFailed + : ClientDisconnectReason.peerConnectionClosed); + } + }; + } publisher?.pc.onConnectionState = (state) async { if ([ @@ -686,6 +719,13 @@ class Engine extends Disposable with EventsEmittable { state: state, isPrimary: !_subscriberPrimary, )); + // In single PC mode, also emit subscriber state event since they're the same PC + if (signalClient.singlePcMode) { + events.emit(EngineSubscriberPeerStateUpdatedEvent( + state: state, + isPrimary: _subscriberPrimary, + )); + } logger.fine('publisher connectionState: $state'); if (state.isDisconnected() || state.isFailed()) { await handleReconnect(state.isFailed() @@ -694,7 +734,10 @@ class Engine extends Disposable with EventsEmittable { } }; - subscriber?.pc.onTrack = (rtc.RTCTrackEvent event) { + // In single PC mode, onTrack is set on publisher (which is the same as subscriber) + // In dual PC mode, onTrack is set on subscriber + final pcForTracks = signalClient.singlePcMode ? publisher : subscriber; + pcForTracks?.pc.onTrack = (rtc.RTCTrackEvent event) { logger.fine('[WebRTC] pc.onTrack'); final stream = event.streams.firstOrNull; @@ -743,7 +786,7 @@ class Engine extends Disposable with EventsEmittable { }; // doesn't get called reliably, doesn't work on mac - subscriber?.pc.onRemoveTrack = (rtc.MediaStream stream, rtc.MediaStreamTrack track) { + pcForTracks?.pc.onRemoveTrack = (rtc.MediaStream stream, rtc.MediaStreamTrack track) { logger.fine('[WebRTC] ${track.id} pc.onRemoveTrack'); }; @@ -1099,12 +1142,14 @@ class Engine extends Disposable with EventsEmittable { events.emit(const EngineResumingEvent()); // wait for socket to connect rtc server + // Use the same URL path mode as the initial connection await signalClient.connect( url!, token!, connectOptions: connectOptions, roomOptions: roomOptions, reconnect: true, + useV1Path: signalClient.singlePcMode, // Preserve the URL path mode from initial connection ); await events.waitFor( @@ -1155,12 +1200,19 @@ class Engine extends Disposable with EventsEmittable { await signalClient.cleanUp(); } + // Reset single PC mode flag on full reconnect + // In single PC mode, subscriber == publisher, so only dispose once + final isSinglePcMode = subscriber == publisher; + signalClient.resetSinglePcMode(); + await publisher?.dispose(); publisher = null; _resetPublisherConnection(); - await subscriber?.dispose(); + if (!isSinglePcMode) { + await subscriber?.dispose(); + } subscriber = null; _reliableDCSub = null; @@ -1235,6 +1287,33 @@ class Engine extends Disposable with EventsEmittable { signalClient.sendQueuedRequests(); }); + Future _addSinglePcRecvonlyTransceivers() async { + final transport = publisher; + if (transport == null) { + return; + } + + const mediaKinds = [ + rtc.RTCRtpMediaType.RTCRtpMediaTypeAudio, + rtc.RTCRtpMediaType.RTCRtpMediaTypeVideo, + ]; + + for (final kind in mediaKinds) { + try { + final transceiverInit = rtc.RTCRtpTransceiverInit( + direction: rtc.TransceiverDirection.RecvOnly, + ); + await transport.pc.addTransceiver( + kind: kind, + init: transceiverInit, + ); + logger.fine('Added recvonly $kind transceiver for single PC mode'); + } catch (e) { + logger.warning('Failed to add recvonly $kind transceiver: $e'); + } + } + } + void _setUpSignalListeners() => _signalListener ..on((event) async { // create peer connections @@ -1261,13 +1340,21 @@ class Engine extends Disposable with EventsEmittable { await _createPeerConnections(rtcConfiguration); } - if (!_subscriberPrimary || event.response.fastPublish) { + // In single PC mode or fastPublish, trigger negotiation immediately + final shouldNegotiateImmediately = !_subscriberPrimary || event.response.fastPublish || signalClient.singlePcMode; + + if (shouldNegotiateImmediately) { _enabledPublishCodecs = event.response.enabledPublishCodecs; - /// for subscriberPrimary, we negotiate when necessary (lazy) - /// and if `response.fastPublish == true`, we need to negotiate - /// immediately - await negotiate(); + // In single PC mode, add recvonly media transceivers before negotiation + // so the initial offer advertises receive capability for remote tracks. + if (signalClient.singlePcMode && publisher != null) { + await _addSinglePcRecvonlyTransceivers(); + } + + /// For single PC / fast publish startup, kick off negotiation immediately + /// without blocking synchronized signal-event handling. + unawaited(negotiate(immediate: true)); } events.emit(EngineJoinResponseEvent(response: event.response)); @@ -1294,7 +1381,7 @@ class Engine extends Disposable with EventsEmittable { await subscriber?.pc.setConfiguration(rtcConfiguration.toMap()); if (!_subscriberPrimary) { - await negotiate(); + await negotiate(immediate: true); } // Handle reliable message resending diff --git a/lib/src/core/signal_client.dart b/lib/src/core/signal_client.dart index 93f0f51d9..33ed4b299 100644 --- a/lib/src/core/signal_client.dart +++ b/lib/src/core/signal_client.dart @@ -57,6 +57,10 @@ class SignalClient extends Disposable with EventsEmittable { int _pingCount = 0; String? participantSid; + /// Whether the connection is in single PC mode (using V1 signaling path). + bool _singlePcMode = false; + bool get singlePcMode => _singlePcMode; + List _connectivityResult = []; StreamSubscription>? _connectivitySubscription; @@ -93,6 +97,7 @@ class SignalClient extends Disposable with EventsEmittable { required ConnectOptions connectOptions, required RoomOptions roomOptions, bool reconnect = false, + bool? useV1Path, }) async { if (!kIsWeb && !lkPlatformIsTest()) { _connectivityResult = await Connectivity().checkConnectivity(); @@ -118,7 +123,15 @@ class SignalClient extends Disposable with EventsEmittable { } } - final rtcUri = await Utils.buildUri( + // Determine whether to try V1 path (single PC mode) + // - Use explicit useV1Path if provided (for reconnection) + // - Otherwise, use singlePeerConnection option from connectOptions + // - V1 is only supported on non-web platforms + // - On web, always use V0 regardless of the option + final tryV1Path = useV1Path ?? (!kIsWeb && connectOptions.singlePeerConnection); + + // Build URLs for both V0 and V1 paths + final rtcUriV0 = await Utils.buildUri( uriString, token: token, connectOptions: connectOptions, @@ -127,7 +140,23 @@ class SignalClient extends Disposable with EventsEmittable { sid: reconnect ? participantSid : null, ); - logger.fine('SignalClient connecting with url: $rtcUri'); + Uri? rtcUriV1; + if (tryV1Path) { + rtcUriV1 = await Utils.buildUriV1( + uriString, + token: token, + connectOptions: connectOptions, + roomOptions: roomOptions, + reconnect: reconnect, + sid: reconnect ? participantSid : null, + ); + } + + // Use V1 path first if trying V1, otherwise use V0 + final rtcUri = tryV1Path && rtcUriV1 != null ? rtcUriV1 : rtcUriV0; + + logger.info('SignalClient connecting with ${tryV1Path ? 'V1-first' : 'V0'} signaling path'); + logger.fine('SignalClient connecting with url: $rtcUri (tryV1Path: $tryV1Path)'); try { if (reconnect == true) { @@ -139,20 +168,74 @@ class SignalClient extends Disposable with EventsEmittable { } // Clean up existing socket await cleanUp(); - // Attempt to connect - var future = _wsConnector( - rtcUri, - options: WebSocketEventHandlers( - onData: _onSocketData, - onDispose: _onSocketDispose, - onError: _onSocketError, - ), - headers: { - 'Authorization': 'Bearer $token', - }, - ); - future = future.timeout(connectOptions.timeouts.connection); - _ws = await future; + + // Attempt to connect (try V1 first, fallback to V0 if not supported) + bool usedV1Path = tryV1Path && rtcUriV1 != null; + try { + if (usedV1Path) { + logger.fine('Attempting V1 path connection...'); + } + var future = _wsConnector( + rtcUri, + options: WebSocketEventHandlers( + onData: _onSocketData, + onDispose: _onSocketDispose, + onError: _onSocketError, + ), + headers: { + 'Authorization': 'Bearer $token', + }, + ); + // Use a shorter timeout for V1 attempt so we can fallback to V0 quickly + // if the server doesn't support V1 (404 errors should be fast, but use + // a 5 second timeout as a safety net) + final v1Timeout = usedV1Path + ? const Duration(seconds: 5) + : connectOptions.timeouts.connection; + future = future.timeout(v1Timeout); + _ws = await future; + } catch (v1Error) { + // If V1 path fails and we tried V1, fallback to V0 + // This handles 404 errors (server doesn't support V1) and other connection failures + if (usedV1Path) { + // Check if this is likely a 404/not-found error (quick failure) + // vs a timeout or network error (slow failure) + final errorStr = v1Error.toString().toLowerCase(); + final isNotFoundError = errorStr.contains('404') || + errorStr.contains('not found') || + errorStr.contains('upgrade') || + errorStr.contains('handshake'); + + if (isNotFoundError) { + logger.info('V1 path not supported (404), falling back to V0 path'); + } else { + logger.warning('V1 path failed with error, falling back to V0 path: $v1Error'); + } + usedV1Path = false; + + var future = _wsConnector( + rtcUriV0, + options: WebSocketEventHandlers( + onData: _onSocketData, + onDispose: _onSocketDispose, + onError: _onSocketError, + ), + headers: { + 'Authorization': 'Bearer $token', + }, + ); + future = future.timeout(connectOptions.timeouts.connection); + _ws = await future; + } else { + rethrow; + } + } + + // Track whether we're in single PC mode + _singlePcMode = usedV1Path; + logger.info('SignalClient connected using ${_singlePcMode ? 'V1' : 'V0'} signaling path'); + logger.fine('SignalClient connected, singlePcMode: $_singlePcMode'); + // Successful connection _connectionState = ConnectionState.connected; events.emit(const SignalConnectedEvent()); @@ -170,7 +253,7 @@ class SignalClient extends Disposable with EventsEmittable { connectOptions: connectOptions, roomOptions: roomOptions, validate: true, - forceSecure: rtcUri.isSecureScheme, + forceSecure: rtcUriV0.isSecureScheme, ); final validateResponse = await http.get( @@ -211,6 +294,13 @@ class SignalClient extends Disposable with EventsEmittable { _ws = null; _queue.clear(); _clearPingInterval(); + // Note: Don't reset _singlePcMode here as it's needed for reconnection + } + + /// Reset single PC mode flag. Called during full reconnect. + @internal + void resetSinglePcMode() { + _singlePcMode = false; } void _sendRequest( diff --git a/lib/src/core/transport.dart b/lib/src/core/transport.dart index 743e9eeb7..67c496854 100644 --- a/lib/src/core/transport.dart +++ b/lib/src/core/transport.dart @@ -29,6 +29,93 @@ import '../utils.dart'; const ddExtensionURI = 'https://aomediacodec.github.io/av1-rtp-spec/#dependency-descriptor-rtp-header-extension'; +/// Munge SDP to change `a=inactive` to `a=recvonly` for all active media m-lines. +/// +/// This is needed because libwebrtc can generate `a=inactive` instead of +/// `a=recvonly` when a transceiver is added with `direction: RecvOnly`. +/// That causes the server to not send media. Browser WebRTC correctly +/// generates `a=recvonly`. +/// +/// This updates all non-rejected RTP media sections and leaves data channels +/// and rejected m-lines unchanged. +String mungeInactiveToRecvonlyForMediaSections(String sdp) { + final parsedSdp = sdp_transform.parse(sdp); + final mediaSections = parsedSdp['media']; + if (mediaSections is! List) { + return sdp; + } + + var changed = false; + for (final media in mediaSections.whereType>()) { + final type = media['type']; + final port = media['port']; + final direction = media['direction']; + + final isRejected = port == 0; + final isDataChannel = type == 'application'; + if (isRejected || isDataChannel || direction != 'inactive') { + continue; + } + + media['direction'] = 'recvonly'; + changed = true; + } + + if (!changed) { + return sdp; + } + + return sdp_transform.write(parsedSdp, null); +} + +/// Munge SDP to add stereo=1 to opus fmtp lines. +/// +/// In single PC mode, the receiver sends the offer and doesn't know if the +/// sender will send stereo. Without stereo=1, audio may not work correctly. +/// This adds stereo=1 to all opus codec fmtp lines. +String mungeStereoForAudio(String sdp) { + final lineEnding = sdp.contains('\r\n') ? '\r\n' : '\n'; + final lines = sdp.split(lineEnding); + + // Find opus payload types from rtpmap lines + final opusPayloadTypes = {}; + final rtpmapRegex = RegExp(r'^a=rtpmap:(\d+)\s+opus/'); + + for (final line in lines) { + final match = rtpmapRegex.firstMatch(line); + if (match != null) { + opusPayloadTypes.add(int.parse(match.group(1)!)); + } + } + + if (opusPayloadTypes.isEmpty) { + return sdp; + } + + // Add stereo=1 to opus fmtp lines + for (int i = 0; i < lines.length; i++) { + final line = lines[i]; + if (!line.startsWith('a=fmtp:')) continue; + + // Extract payload type from fmtp line + final fmtpMatch = RegExp(r'^a=fmtp:(\d+)\s+(.*)$').firstMatch(line); + if (fmtpMatch == null) continue; + + final payloadType = int.parse(fmtpMatch.group(1)!); + if (!opusPayloadTypes.contains(payloadType)) continue; + + final params = fmtpMatch.group(2)!; + + // Check if stereo is already set + if (params.contains('stereo=')) continue; + + // Add stereo=1 to the fmtp line + lines[i] = 'a=fmtp:$payloadType $params;stereo=1'; + } + + return lines.join(lineEnding); +} + /* The svc codec (av1/vp9) would use a very low bitrate at the begining and increase slowly by the bandwidth estimator until it reach the target bitrate. The process commonly cost more than 10 seconds cause subscriber will get blur video at @@ -65,6 +152,10 @@ class Transport extends Disposable { Function? _cancelDebounce; ConnectOptions connectOptions; + /// Whether this transport is in single PC mode. + /// When true, SDP munging is applied for recvonly media sections and stereo support. + bool singlePcMode = false; + // private constructor Transport._(this.pc, this.connectOptions) { // @@ -215,7 +306,18 @@ class Transport extends Disposable { }); try { - await setMungedSDP(sd: offer, munged: sdp_transform.write(sdpParsed, null)); + var mungedSdp = sdp_transform.write(sdpParsed, null); + + // Apply single PC mode SDP munging if enabled + if (singlePcMode) { + // Munge a=inactive to a=recvonly for recvonly media sections. + mungedSdp = mungeInactiveToRecvonlyForMediaSections(mungedSdp); + // Add stereo=1 to opus fmtp lines for proper stereo support + mungedSdp = mungeStereoForAudio(mungedSdp); + logger.fine('Applied single PC mode SDP munging'); + } + + await setMungedSDP(sd: offer, munged: mungedSdp); } catch (e) { throw NegotiationError(e.toString()); } diff --git a/lib/src/options.dart b/lib/src/options.dart index af31ce5fd..098be796b 100644 --- a/lib/src/options.dart +++ b/lib/src/options.dart @@ -61,11 +61,24 @@ class ConnectOptions { final Timeouts timeouts; + /// Whether to use single PeerConnection mode (V1 signaling path). + /// + /// When enabled, the SDK uses a single PeerConnection for both publishing + /// and subscribing, which can reduce connection time and improve reliability. + /// + /// This is only supported on native platforms (iOS, Android, macOS, Windows, Linux). + /// On web platforms, this option is ignored and the legacy dual PeerConnection + /// mode is always used. + /// + /// Defaults to true. + final bool singlePeerConnection; + const ConnectOptions({ this.autoSubscribe = true, this.rtcConfiguration = const RTCConfiguration(), this.protocolVersion = ProtocolVersion.v12, this.timeouts = Timeouts.defaultTimeouts, + this.singlePeerConnection = true, }); } diff --git a/lib/src/participant/local.dart b/lib/src/participant/local.dart index 432c7883f..60a97f5d1 100644 --- a/lib/src/participant/local.dart +++ b/lib/src/participant/local.dart @@ -1095,6 +1095,17 @@ extension RPCMethods on LocalParticipant { Future performRpc(PerformRpcParams params) async { final requestId = Uuid().v4(); final completer = Completer(); + void completeOnceError(Object error) { + if (!completer.isCompleted) { + completer.completeError(error); + } + } + + void completeOnceValue(String value) { + if (!completer.isCompleted) { + completer.complete(value); + } + } final maxRoundTripLatency = Duration(seconds: 7); final minEffectiveTimeout = const Duration(milliseconds: 1000); @@ -1115,7 +1126,8 @@ extension RPCMethods on LocalParticipant { ); final ackTimer = Timer(maxRoundTripLatency, () { - completer.completeError(RpcError.builtIn(RpcError.connectionTimeout)); + completeOnceError(RpcError.builtIn(RpcError.connectionTimeout)); + _pendingAcks.remove(requestId); _pendingResponses.remove(requestId); }); @@ -1124,16 +1136,17 @@ extension RPCMethods on LocalParticipant { }; final responseTimer = Timer(params.responseTimeoutMs, () { - completer.completeError(RpcError.builtIn(RpcError.responseTimeout)); + completeOnceError(RpcError.builtIn(RpcError.responseTimeout)); + _pendingAcks.remove(requestId); _pendingResponses.remove(requestId); }); _pendingResponses[requestId] = (String? response, RpcError? error) { responseTimer.cancel(); if (error != null) { - completer.completeError(error); + completeOnceError(error); } else { - completer.complete(response!); + completeOnceValue(response!); } ackTimer.cancel(); _pendingAcks.remove(requestId); diff --git a/lib/src/utils.dart b/lib/src/utils.dart index aa8259ea3..c4f38ebdf 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -26,6 +26,7 @@ import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; import 'package:meta/meta.dart'; import './proto/livekit_models.pb.dart' as lk_models; +import './proto/livekit_rtc.pb.dart' as lk_rtc; import './support/native.dart'; import 'extensions.dart'; import 'livekit.dart'; @@ -44,6 +45,65 @@ extension UriExt on Uri { bool get isSecureScheme => ['https', 'wss'].contains(scheme); } +/// Creates a base64-encoded join_request parameter for V1 URL path. +/// +/// This wraps a JoinRequest protobuf into a WrappedJoinRequest and encodes it +/// as base64 for use as a URL query parameter in single PC mode. +@internal +Future createJoinRequestParam({ + required ConnectOptions connectOptions, + required RoomOptions roomOptions, + bool reconnect = false, + lk_models.ReconnectReason? reconnectReason, + String? participantSid, +}) async { + final clientInfo = await Utils._clientInfo(); + final networkType = await Utils.getNetworkType(); + + // Build ClientInfo protobuf + final clientInfoProto = lk_models.ClientInfo( + sdk: lk_models.ClientInfo_SDK.FLUTTER, + version: LiveKitClient.version, + protocol: int.parse(connectOptions.protocolVersion.toStringValue()), + os: clientInfo?.os, + osVersion: clientInfo?.osVersion, + deviceModel: clientInfo?.deviceModel, + browser: clientInfo?.browser, + browserVersion: clientInfo?.browserVersion, + network: networkType, + ); + + // Build ConnectionSettings protobuf + final connectionSettings = lk_rtc.ConnectionSettings( + autoSubscribe: connectOptions.autoSubscribe, + adaptiveStream: roomOptions.adaptiveStream, + ); + + // Build JoinRequest protobuf + final joinRequest = lk_rtc.JoinRequest( + clientInfo: clientInfoProto, + connectionSettings: connectionSettings, + reconnect: reconnect, + ); + + if (reconnectReason != null) { + joinRequest.reconnectReason = reconnectReason; + } + + if (participantSid != null && participantSid.isNotEmpty) { + joinRequest.participantSid = participantSid; + } + + // Wrap JoinRequest in WrappedJoinRequest + final wrappedJoinRequest = lk_rtc.WrappedJoinRequest( + compression: lk_rtc.WrappedJoinRequest_Compression.NONE, + joinRequest: joinRequest.writeToBuffer(), + ); + + // Base64 encode + return base64Encode(wrappedJoinRequest.writeToBuffer()); +} + typedef RetryFuture = Future Function( int triesLeft, List errors, @@ -156,6 +216,9 @@ class Utils { return null; } + /// Build URL for V0 path (legacy signaling). + /// + /// Uses query parameters for connection options. @internal static Future buildUri( String uriString, { @@ -208,6 +271,52 @@ class Utils { ); } + /// Build URL for V1 path (single PC mode signaling). + /// + /// Uses a join_request query parameter containing a base64-encoded + /// WrappedJoinRequest protobuf with all connection options. + @internal + static Future buildUriV1( + String uriString, { + required String token, + required ConnectOptions connectOptions, + required RoomOptions roomOptions, + bool reconnect = false, + bool forceSecure = false, + String? sid, + lk_models.ReconnectReason? reconnectReason, + }) async { + final Uri uri = Uri.parse(uriString); + + final useSecure = uri.isSecureScheme || forceSecure; + final wsScheme = useSecure ? 'wss' : 'ws'; + + final pathSegments = List.from(uri.pathSegments); + + // strip path segment used for LiveKit if already exists + pathSegments.removeWhere((e) => e.isEmpty); + // Use V1 path for single PC mode + pathSegments.addAll(['rtc', 'v1']); + + // Create base64-encoded JoinRequest parameter + final joinRequestParam = await createJoinRequestParam( + connectOptions: connectOptions, + roomOptions: roomOptions, + reconnect: reconnect, + reconnectReason: reconnectReason, + participantSid: sid, + ); + + return uri.replace( + scheme: wsScheme, + pathSegments: pathSegments, + queryParameters: { + if (kIsWeb) 'access_token': token, + 'join_request': joinRequestParam, + }, + ); + } + static List _presetsForDimensions({ required bool isScreenShare, required VideoDimensions dimensions, diff --git a/pubspec.lock b/pubspec.lock index d5b313878..2b03d1495 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -278,6 +278,11 @@ packages: description: flutter source: sdk version: "0.0.0" + flutter_driver: + dependency: transitive + description: flutter + source: sdk + version: "0.0.0" flutter_test: dependency: "direct dev" description: flutter @@ -304,6 +309,11 @@ packages: url: "https://pub.dev" source: hosted version: "4.0.0" + fuchsia_remote_debug_protocol: + dependency: transitive + description: flutter + source: sdk + version: "0.0.0" glob: dependency: transitive description: @@ -352,6 +362,11 @@ packages: url: "https://pub.dev" source: hosted version: "4.6.0" + integration_test: + dependency: "direct dev" + description: flutter + source: sdk + version: "0.0.0" io: dependency: transitive description: @@ -452,10 +467,10 @@ packages: dependency: "direct main" description: name: meta - sha256: "23f08335362185a5ea2ad3a4e597f1375e78bce8a040df5c600c8d3552ef2394" + sha256: e3641ec5d63ebf0d9b41bd43201a66e3fc79a65db5f61fc181f04cd27aab950c url: "https://pub.dev" source: hosted - version: "1.17.0" + version: "1.16.0" mime: dependency: transitive description: @@ -592,6 +607,14 @@ packages: url: "https://pub.dev" source: hosted version: "1.5.2" + process: + dependency: transitive + description: + name: process + sha256: c6248e4526673988586e8c00bb22a49210c258dc91df5227d5da9748ecf79744 + url: "https://pub.dev" + source: hosted + version: "5.0.5" protobuf: dependency: "direct main" description: @@ -701,6 +724,14 @@ packages: url: "https://pub.dev" source: hosted version: "1.4.1" + sync_http: + dependency: transitive + description: + name: sync_http + sha256: "7f0cd72eca000d2e026bcd6f990b81d0ca06022ef4e32fb257b30d3d1014a961" + url: "https://pub.dev" + source: hosted + version: "0.3.1" synchronized: dependency: "direct main" description: @@ -721,10 +752,10 @@ packages: dependency: transitive description: name: test_api - sha256: ab2726c1a94d3176a45960b6234466ec367179b87dd74f1611adb1f3b5fb9d55 + sha256: "522f00f556e73044315fa4585ec3270f1808a4b186c936e612cab0b565ff1e00" url: "https://pub.dev" source: hosted - version: "0.7.7" + version: "0.7.6" timing: dependency: transitive description: @@ -805,6 +836,14 @@ packages: url: "https://pub.dev" source: hosted version: "3.0.3" + webdriver: + dependency: transitive + description: + name: webdriver + sha256: "2f3a14ca026957870cfd9c635b83507e0e51d8091568e90129fbf805aba7cade" + url: "https://pub.dev" + source: hosted + version: "3.1.0" webrtc_interface: dependency: transitive description: diff --git a/pubspec.yaml b/pubspec.yaml index e20726593..18912ae9d 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -52,6 +52,8 @@ dependencies: dev_dependencies: flutter_test: sdk: flutter + integration_test: + sdk: flutter lints: ^6.0.0 mockito: ^5.3.2 import_sorter: ^4.6.0 diff --git a/test/integration/connection_latency_test.dart b/test/integration/connection_latency_test.dart new file mode 100644 index 000000000..c47a3e830 --- /dev/null +++ b/test/integration/connection_latency_test.dart @@ -0,0 +1,504 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Connection Latency Measurement Test +/// +/// IMPORTANT: This test requires native WebRTC plugins and CANNOT run with +/// `flutter test` (which runs in a pure Dart VM without native plugins). +/// +/// To run these tests, use one of these methods: +/// +/// 1. On a REAL DEVICE or EMULATOR using integration_test: +/// flutter test integration_test/connection_latency_test.dart +/// +/// 2. On DESKTOP (macOS/Linux/Windows): +/// flutter run -d macos -t test/integration/connection_latency_test.dart +/// +/// 3. Use the EXAMPLE APP version (recommended for devices): +/// Edit example/lib/latency_test.dart with your credentials, then: +/// cd example && flutter run -t lib/latency_test.dart +/// +/// Environment variables: +/// - LIVEKIT_URL: The LiveKit server URL +/// - LIVEKIT_CALLER_TOKEN: A valid token for authentication +/// - LIVEKIT_TEST_ITERATIONS: (optional) Number of test iterations (default: 5) + +@Timeout(Duration(minutes: 5)) +library; + +import 'dart:io'; +import 'dart:math' as math; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:livekit_client/livekit_client.dart'; + +/// Statistics calculator for latency measurements +class LatencyStats { + final List _measurements = []; + + void addMeasurement(double latencyMs) { + _measurements.add(latencyMs); + } + + int get count => _measurements.length; + + double get min => _measurements.isEmpty ? 0 : _measurements.reduce(math.min); + + double get max => _measurements.isEmpty ? 0 : _measurements.reduce(math.max); + + double get mean { + if (_measurements.isEmpty) return 0; + return _measurements.reduce((a, b) => a + b) / _measurements.length; + } + + double get p50 { + if (_measurements.isEmpty) return 0; + final sorted = List.from(_measurements)..sort(); + final middle = sorted.length ~/ 2; + if (sorted.length.isOdd) { + return sorted[middle]; + } + return (sorted[middle - 1] + sorted[middle]) / 2; + } + + double get p95 { + if (_measurements.isEmpty) return 0; + final sorted = List.from(_measurements)..sort(); + final index = ((sorted.length - 1) * 0.95).floor(); + return sorted[index]; + } + + double get p99 { + if (_measurements.isEmpty) return 0; + final sorted = List.from(_measurements)..sort(); + final index = ((sorted.length - 1) * 0.99).floor(); + return sorted[index]; + } + + void printStats(String title) { + print('\n$title'); + print('Samples: $count'); + print('Min: ${min.toStringAsFixed(2)} ms'); + print('Avg: ${mean.toStringAsFixed(2)} ms'); + print('P50: ${p50.toStringAsFixed(2)} ms'); + print('P95: ${p95.toStringAsFixed(2)} ms'); + print('P99: ${p99.toStringAsFixed(2)} ms'); + print('Max: ${max.toStringAsFixed(2)} ms'); + } + + void clear() { + _measurements.clear(); + } +} + +/// Test configuration loaded from environment variables +class TestConfig { + final String url; + final String token; + final int iterations; + + TestConfig({ + required this.url, + required this.token, + required this.iterations, + }); + + static TestConfig? fromEnvironment() { + final url = Platform.environment['LIVEKIT_URL']; + final token = Platform.environment['LIVEKIT_CALLER_TOKEN']; + final iterationsStr = Platform.environment['LIVEKIT_TEST_ITERATIONS']; + + if (url == null || url.isEmpty) { + return null; + } + if (token == null || token.isEmpty) { + return null; + } + + final iterations = int.tryParse(iterationsStr ?? '5') ?? 5; + + return TestConfig( + url: url, + token: token, + iterations: iterations, + ); + } + + bool get isConfigured => url.isNotEmpty && token.isNotEmpty; +} + +void main() { + // Initialize Flutter binding for platform channels + TestWidgetsFlutterBinding.ensureInitialized(); + + final config = TestConfig.fromEnvironment(); + + /// Skip helper for tests when environment is not configured + void skipIfNotConfigured() { + if (config == null) { + print('\n*** SKIPPING TEST ***'); + print('Environment variables not configured.'); + print('Required: LIVEKIT_URL, LIVEKIT_CALLER_TOKEN'); + print('Optional: LIVEKIT_TEST_ITERATIONS (default: 5)'); + print(''); + print('Example:'); + print(' LIVEKIT_URL=wss://your-server.livekit.cloud \\'); + print(' LIVEKIT_CALLER_TOKEN=your_token \\'); + print(' dart test test/integration/connection_latency_test.dart'); + print(''); + } + } + + group('Connection Latency Measurement Tests', () { + test('Connection Time Measurement', () async { + skipIfNotConfigured(); + if (config == null) { + return; + } + + print('\n=== Connection Time Measurement Test ==='); + print('URL: ${config.url}'); + print('Iterations: ${config.iterations}'); + + final stats = LatencyStats(); + const roomOptions = RoomOptions( + adaptiveStream: false, + dynacast: false, + ); + const connectOptions = ConnectOptions( + autoSubscribe: true, + ); + + for (int i = 0; i < config.iterations; i++) { + final room = Room(); + + try { + final stopwatch = Stopwatch()..start(); + + await room.connect( + config.url, + config.token, + roomOptions: roomOptions, + connectOptions: connectOptions, + ); + + stopwatch.stop(); + + if (room.connectionState == ConnectionState.connected) { + final latencyMs = stopwatch.elapsedMicroseconds / 1000.0; + stats.addMeasurement(latencyMs); + print(' Iteration ${i + 1}: ${latencyMs.toStringAsFixed(2)} ms'); + } else { + print(' Iteration ${i + 1}: FAILED - unexpected state: ${room.connectionState}'); + } + } catch (e) { + print(' Iteration ${i + 1}: FAILED to connect - $e'); + } finally { + await room.disconnect(); + await room.dispose(); + } + + // Small delay between iterations to allow cleanup + await Future.delayed(const Duration(milliseconds: 500)); + } + + stats.printStats('Connection Time Statistics'); + + expect(stats.count, greaterThan(0), reason: 'At least one connection should succeed'); + }); + + test('Connection Time Comparison: V0 vs V1 Signaling', () async { + skipIfNotConfigured(); + if (config == null) { + return; + } + + print('\n=== Connection Time Comparison: V0 vs V1 Signaling ==='); + print('URL: ${config.url}'); + print('Iterations per path: ${config.iterations}'); + + final statsV0 = LatencyStats(); + final statsV1 = LatencyStats(); + + const roomOptions = RoomOptions( + adaptiveStream: false, + dynacast: false, + ); + + // Note: The V0/V1 path selection is automatic based on platform and server support. + // For mobile/desktop (non-web), it tries V1 first with fallback to V0. + // For web, it always uses V0. + // + // To test both paths explicitly, you would need to modify the SDK or use + // a test server that supports both paths. + + print('\nTesting default signaling path (auto-selected)...'); + print('Note: On non-web platforms, V1 is tried first with V0 fallback.'); + + for (int i = 0; i < config.iterations; i++) { + final room = Room(); + + try { + final stopwatch = Stopwatch()..start(); + + await room.connect( + config.url, + config.token, + roomOptions: roomOptions, + connectOptions: const ConnectOptions(autoSubscribe: true), + ); + + stopwatch.stop(); + + if (room.connectionState == ConnectionState.connected) { + final latencyMs = stopwatch.elapsedMicroseconds / 1000.0; + // Check which path was used by examining the signal client + final singlePcMode = room.engine.signalClient.singlePcMode; + if (singlePcMode) { + statsV1.addMeasurement(latencyMs); + print(' Iteration ${i + 1} [V1]: ${latencyMs.toStringAsFixed(2)} ms'); + } else { + statsV0.addMeasurement(latencyMs); + print(' Iteration ${i + 1} [V0]: ${latencyMs.toStringAsFixed(2)} ms'); + } + } else { + print(' Iteration ${i + 1}: FAILED - unexpected state: ${room.connectionState}'); + } + } catch (e) { + print(' Iteration ${i + 1}: FAILED to connect - $e'); + } finally { + await room.disconnect(); + await room.dispose(); + } + + await Future.delayed(const Duration(milliseconds: 500)); + } + + if (statsV0.count > 0) { + statsV0.printStats('V0 Signaling Path Statistics'); + } + + if (statsV1.count > 0) { + statsV1.printStats('V1 Signaling Path Statistics'); + } + + if (statsV0.count > 0 && statsV1.count > 0) { + print('\n=== Comparison Summary ==='); + print(' V0 Mean: ${statsV0.mean.toStringAsFixed(2)} ms'); + print(' V1 Mean: ${statsV1.mean.toStringAsFixed(2)} ms'); + final diff = statsV0.mean - statsV1.mean; + final percentDiff = (diff / statsV0.mean * 100).abs(); + if (diff > 0) { + print(' V1 is ${percentDiff.toStringAsFixed(1)}% faster'); + } else if (diff < 0) { + print(' V0 is ${percentDiff.toStringAsFixed(1)}% faster'); + } else { + print(' Both paths have similar latency'); + } + } + + final totalCount = statsV0.count + statsV1.count; + expect(totalCount, greaterThan(0), reason: 'At least one connection should succeed'); + }); + + test('Connection Time With Fast Publish', () async { + skipIfNotConfigured(); + if (config == null) { + return; + } + + print('\n=== Connection Time With Fast Publish ==='); + print('URL: ${config.url}'); + print('Iterations: ${config.iterations}'); + + final statsNormal = LatencyStats(); + final statsFastPublish = LatencyStats(); + + // Test without fast publish + print('\nWithout fast publish:'); + for (int i = 0; i < config.iterations; i++) { + final room = Room(); + + try { + final stopwatch = Stopwatch()..start(); + + await room.connect( + config.url, + config.token, + roomOptions: const RoomOptions( + adaptiveStream: false, + dynacast: false, + fastPublish: false, + ), + connectOptions: const ConnectOptions(autoSubscribe: true), + ); + + stopwatch.stop(); + + if (room.connectionState == ConnectionState.connected) { + final latencyMs = stopwatch.elapsedMicroseconds / 1000.0; + statsNormal.addMeasurement(latencyMs); + print(' Iteration ${i + 1}: ${latencyMs.toStringAsFixed(2)} ms'); + } else { + print(' Iteration ${i + 1}: FAILED - unexpected state: ${room.connectionState}'); + } + } catch (e) { + print(' Iteration ${i + 1}: FAILED to connect - $e'); + } finally { + await room.disconnect(); + await room.dispose(); + } + + await Future.delayed(const Duration(milliseconds: 500)); + } + + // Test with fast publish + print('\nWith fast publish:'); + for (int i = 0; i < config.iterations; i++) { + final room = Room(); + + try { + final stopwatch = Stopwatch()..start(); + + await room.connect( + config.url, + config.token, + roomOptions: const RoomOptions( + adaptiveStream: false, + dynacast: false, + fastPublish: true, + ), + connectOptions: const ConnectOptions(autoSubscribe: true), + ); + + stopwatch.stop(); + + if (room.connectionState == ConnectionState.connected) { + final latencyMs = stopwatch.elapsedMicroseconds / 1000.0; + statsFastPublish.addMeasurement(latencyMs); + print(' Iteration ${i + 1}: ${latencyMs.toStringAsFixed(2)} ms'); + } else { + print(' Iteration ${i + 1}: FAILED - unexpected state: ${room.connectionState}'); + } + } catch (e) { + print(' Iteration ${i + 1}: FAILED to connect - $e'); + } finally { + await room.disconnect(); + await room.dispose(); + } + + await Future.delayed(const Duration(milliseconds: 500)); + } + + if (statsNormal.count > 0) { + statsNormal.printStats('Normal Connection Statistics'); + } + + if (statsFastPublish.count > 0) { + statsFastPublish.printStats('Fast Publish Connection Statistics'); + } + + final totalCount = statsNormal.count + statsFastPublish.count; + expect(totalCount, greaterThan(0), reason: 'At least one connection should succeed'); + }); + + test('Reconnection Time Measurement', () async { + skipIfNotConfigured(); + if (config == null) { + return; + } + + print('\n=== Reconnection Time Measurement ==='); + print('URL: ${config.url}'); + print('Iterations: ${config.iterations}'); + + final statsInitial = LatencyStats(); + final statsReconnect = LatencyStats(); + + for (int i = 0; i < config.iterations; i++) { + final room = Room(); + + try { + // Measure initial connection + final stopwatchInitial = Stopwatch()..start(); + + await room.connect( + config.url, + config.token, + roomOptions: const RoomOptions( + adaptiveStream: false, + dynacast: false, + ), + connectOptions: const ConnectOptions(autoSubscribe: true), + ); + + stopwatchInitial.stop(); + + if (room.connectionState == ConnectionState.connected) { + final latencyMs = stopwatchInitial.elapsedMicroseconds / 1000.0; + statsInitial.addMeasurement(latencyMs); + print(' Iteration ${i + 1} [Initial]: ${latencyMs.toStringAsFixed(2)} ms'); + + // Disconnect and reconnect + await room.disconnect(); + await Future.delayed(const Duration(milliseconds: 200)); + + // Measure reconnection + final stopwatchReconnect = Stopwatch()..start(); + + await room.connect( + config.url, + config.token, + roomOptions: const RoomOptions( + adaptiveStream: false, + dynacast: false, + ), + connectOptions: const ConnectOptions(autoSubscribe: true), + ); + + stopwatchReconnect.stop(); + + if (room.connectionState == ConnectionState.connected) { + final reconnectLatencyMs = stopwatchReconnect.elapsedMicroseconds / 1000.0; + statsReconnect.addMeasurement(reconnectLatencyMs); + print(' Iteration ${i + 1} [Reconnect]: ${reconnectLatencyMs.toStringAsFixed(2)} ms'); + } else { + print(' Iteration ${i + 1} [Reconnect]: FAILED - unexpected state: ${room.connectionState}'); + } + } else { + print(' Iteration ${i + 1} [Initial]: FAILED - unexpected state: ${room.connectionState}'); + } + } catch (e) { + print(' Iteration ${i + 1}: FAILED - $e'); + } finally { + await room.disconnect(); + await room.dispose(); + } + + await Future.delayed(const Duration(milliseconds: 500)); + } + + if (statsInitial.count > 0) { + statsInitial.printStats('Initial Connection Statistics'); + } + + if (statsReconnect.count > 0) { + statsReconnect.printStats('Reconnection Statistics'); + } + + final totalCount = statsInitial.count + statsReconnect.count; + expect(totalCount, greaterThan(0), reason: 'At least one connection should succeed'); + }); + }); +}