From 072bae9c1fedfe0041b7417f2d7d35ffbf963a05 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 10 Mar 2026 17:34:27 +0900 Subject: [PATCH 1/6] impl 1 --- lib/src/core/engine.dart | 255 +++++++++++++++++++++----------- lib/src/core/signal_client.dart | 50 +++++-- lib/src/core/transport.dart | 48 +++++- lib/src/internal/events.dart | 13 ++ lib/src/options.dart | 8 + lib/src/utils.dart | 49 ++++++ 6 files changed, 323 insertions(+), 100 deletions(-) diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index a88cff287..80f7d1bce 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -80,10 +80,11 @@ class Engine extends Disposable with EventsEmittable { Transport? subscriber; @internal - Transport? get primary => _subscriberPrimary ? subscriber : publisher; + Transport? get primary => _singlePCMode ? publisher : (_subscriberPrimary ? subscriber : publisher); - rtc.RTCDataChannel? get dataChannel => - _subscriberPrimary ? _reliableDCSub ?? _lossyDCSub : _reliableDCPub ?? _lossyDCPub; + rtc.RTCDataChannel? get dataChannel => _singlePCMode + ? _reliableDCPub ?? _lossyDCPub + : (_subscriberPrimary ? _reliableDCSub ?? _lossyDCSub : _reliableDCPub ?? _lossyDCPub); // data channels for packets rtc.RTCDataChannel? _reliableDCPub; @@ -109,6 +110,7 @@ class Engine extends Disposable with EventsEmittable { FastConnectOptions? fastConnectOptions; bool _subscriberPrimary = false; + bool _singlePCMode = false; String? _connectedServerAddress; String? get connectedServerAddress => _connectedServerAddress; @@ -286,6 +288,8 @@ class Engine extends Disposable with EventsEmittable { await subscriber?.dispose(); subscriber = null; + _singlePCMode = false; + await signalClient.cleanUp(); fullReconnectOnNext = false; @@ -413,7 +417,7 @@ class Engine extends Disposable with EventsEmittable { // construct the data channel message var message = rtc.RTCDataChannelMessage.fromBinary(packet.writeToBuffer()); - if (_subscriberPrimary) { + if (_subscriberPrimary && !_singlePCMode) { // make sure publisher transport is connected await ensurePublisherConnected(); @@ -625,11 +629,62 @@ class Engine extends Disposable with EventsEmittable { return rtcConfiguration; } + void _handleOnTrack(rtc.RTCTrackEvent event) { + logger.fine('[WebRTC] pc.onTrack'); + + final stream = event.streams.firstOrNull; + if (stream == null) { + // we need the stream to get the track's id + logger.severe('received track without mediastream'); + return; + } + + // doesn't get called reliably + event.track.onEnded = () { + logger.fine('[WebRTC] track.onEnded'); + }; + + // doesn't get called reliably + stream.onRemoveTrack = (_) { + logger.fine('[WebRTC] stream.onRemoveTrack'); + }; + + if (signalClient.connectionState == ConnectionState.reconnecting || + signalClient.connectionState == ConnectionState.connecting) { + final track = event.track; + final receiver = event.receiver; + events.on((event) async { + Timer(const Duration(milliseconds: 10), () { + events.emit(EngineTrackAddedEvent( + track: track, + stream: stream, + receiver: receiver, + )); + }); + }); + return; + } + + if (connectionState == ConnectionState.disconnected) { + logger.warning('skipping incoming track after Room disconnected'); + return; + } + + events.emit(EngineTrackAddedEvent( + track: event.track, + stream: stream, + receiver: event.receiver, + )); + } + Future _createPeerConnections(RTCConfiguration rtcConfiguration) async { - publisher = - await Transport.create(_peerConnectionCreate, rtcConfig: rtcConfiguration, connectOptions: connectOptions); - subscriber = - await Transport.create(_peerConnectionCreate, rtcConfig: rtcConfiguration, connectOptions: connectOptions); + publisher = await Transport.create(_peerConnectionCreate, + rtcConfig: rtcConfiguration, connectOptions: connectOptions, singlePCMode: _singlePCMode); + + if (!_singlePCMode) { + subscriber = + await Transport.create(_peerConnectionCreate, rtcConfig: rtcConfiguration, connectOptions: connectOptions); + } publisher?.pc.onIceCandidate = (rtc.RTCIceCandidate candidate) { logger.fine('publisher onIceCandidate'); @@ -643,41 +698,50 @@ class Engine extends Disposable with EventsEmittable { } }; - subscriber?.pc.onIceCandidate = (rtc.RTCIceCandidate candidate) { - logger.fine('subscriber onIceCandidate'); - signalClient.sendIceCandidate(candidate, lk_rtc.SignalTarget.SUBSCRIBER); - }; + if (!_singlePCMode) { + subscriber?.pc.onIceCandidate = (rtc.RTCIceCandidate candidate) { + logger.fine('subscriber onIceCandidate'); + signalClient.sendIceCandidate(candidate, lk_rtc.SignalTarget.SUBSCRIBER); + }; - subscriber?.pc.onIceConnectionState = (rtc.RTCIceConnectionState state) async { - logger.fine('subscriber iceConnectionState: $state'); - if (state == rtc.RTCIceConnectionState.RTCIceConnectionStateConnected) { - await _handleGettingConnectedServerAddress(subscriber!.pc); - } - }; + subscriber?.pc.onIceConnectionState = (rtc.RTCIceConnectionState state) async { + logger.fine('subscriber iceConnectionState: $state'); + if (state == rtc.RTCIceConnectionState.RTCIceConnectionStateConnected) { + await _handleGettingConnectedServerAddress(subscriber!.pc); + } + }; + } publisher?.onOffer = (offer) { logger.fine('publisher onOffer'); signalClient.sendOffer(offer); }; - // in subscriber primary mode, server side opens sub data channels. - if (_subscriberPrimary) { + // In single PC mode, server opens data channels on publisher. + // In dual PC subscriber-primary mode, server opens data channels on subscriber. + if (_singlePCMode) { + publisher?.pc.onDataChannel = _onDataChannel; + } else if (_subscriberPrimary) { subscriber?.pc.onDataChannel = _onDataChannel; } - subscriber?.pc.onConnectionState = (state) async { - events.emit(EngineSubscriberPeerStateUpdatedEvent( - state: state, - isPrimary: _subscriberPrimary, - )); - logger.fine('subscriber connectionState: $state'); - if (state.isDisconnected() || state.isFailed()) { - await handleReconnect( - state.isFailed() ? ClientDisconnectReason.peerConnectionFailed : ClientDisconnectReason.peerConnectionClosed, - reconnectReason: lk_models.ReconnectReason.RR_SUBSCRIBER_FAILED, - ); - } - }; + if (!_singlePCMode) { + subscriber?.pc.onConnectionState = (state) async { + events.emit(EngineSubscriberPeerStateUpdatedEvent( + state: state, + isPrimary: _subscriberPrimary, + )); + logger.fine('subscriber connectionState: $state'); + if (state.isDisconnected() || state.isFailed()) { + await handleReconnect( + state.isFailed() + ? ClientDisconnectReason.peerConnectionFailed + : ClientDisconnectReason.peerConnectionClosed, + reconnectReason: lk_models.ReconnectReason.RR_SUBSCRIBER_FAILED, + ); + } + }; + } publisher?.pc.onConnectionState = (state) async { if ([ @@ -689,7 +753,7 @@ class Engine extends Disposable with EventsEmittable { } events.emit(EnginePublisherPeerStateUpdatedEvent( state: state, - isPrimary: !_subscriberPrimary, + isPrimary: _singlePCMode || !_subscriberPrimary, )); logger.fine('publisher connectionState: $state'); if (state.isDisconnected() || state.isFailed()) { @@ -700,58 +764,19 @@ class Engine extends Disposable with EventsEmittable { } }; - subscriber?.pc.onTrack = (rtc.RTCTrackEvent event) { - logger.fine('[WebRTC] pc.onTrack'); - - final stream = event.streams.firstOrNull; - if (stream == null) { - // we need the stream to get the track's id - logger.severe('received track without mediastream'); - return; - } - - // doesn't get called reliably - event.track.onEnded = () { - logger.fine('[WebRTC] track.onEnded'); - }; - - // doesn't get called reliably - stream.onRemoveTrack = (_) { - logger.fine('[WebRTC] stream.onRemoveTrack'); - }; - - if (signalClient.connectionState == ConnectionState.reconnecting || - signalClient.connectionState == ConnectionState.connecting) { - final track = event.track; - final receiver = event.receiver; - events.on((event) async { - Timer(const Duration(milliseconds: 10), () { - events.emit(EngineTrackAddedEvent( - track: track, - stream: stream, - receiver: receiver, - )); - }); - }); - return; - } - - if (connectionState == ConnectionState.disconnected) { - logger.warning('skipping incoming track after Room disconnected'); - return; - } - - events.emit(EngineTrackAddedEvent( - track: event.track, - stream: stream, - receiver: event.receiver, - )); - }; + // In single PC mode, tracks arrive via publisher; otherwise via subscriber. + if (_singlePCMode) { + publisher?.pc.onTrack = _handleOnTrack; + } else { + subscriber?.pc.onTrack = _handleOnTrack; + } // doesn't get called reliably, doesn't work on mac - subscriber?.pc.onRemoveTrack = (rtc.MediaStream stream, rtc.MediaStreamTrack track) { - logger.fine('[WebRTC] ${track.id} pc.onRemoveTrack'); - }; + if (!_singlePCMode) { + subscriber?.pc.onRemoveTrack = (rtc.MediaStream stream, rtc.MediaStreamTrack track) { + logger.fine('[WebRTC] ${track.id} pc.onRemoveTrack'); + }; + } // also handle messages over the pub channel, for backwards compatibility try { @@ -1150,7 +1175,9 @@ class Engine extends Disposable with EventsEmittable { logger.fine('resumeConnection: primary is connected: $isConnected'); if (!isConnected) { - subscriber!.restartingIce = true; + if (!_singlePCMode && subscriber != null) { + subscriber!.restartingIce = true; + } logger.fine('resumeConnection: Waiting for primary to connect...'); await events.waitFor( filter: (event) => event.isPrimary && event.state.isConnected(), @@ -1230,7 +1257,17 @@ class Engine extends Disposable with EventsEmittable { required Iterable? publishTracks, required List trackSidsDisabled, }) async { - final previousAnswer = (await subscriber?.pc.getLocalDescription())?.toPBType(); + lk_rtc.SessionDescription? previousAnswer; + lk_rtc.SessionDescription? previousOffer; + + if (_singlePCMode) { + // In single PC mode, answer = publisher's remoteDescription, + // offer = publisher's localDescription. + previousAnswer = (await publisher?.pc.getRemoteDescription())?.toPBType(); + previousOffer = (await publisher?.pc.getLocalDescription())?.toPBType(); + } else { + previousAnswer = (await subscriber?.pc.getLocalDescription())?.toPBType(); + } // Build data channel receive states for reliability final dataChannelReceiveStates = []; @@ -1245,6 +1282,7 @@ class Engine extends Disposable with EventsEmittable { } signalClient.sendSyncState( answer: previousAnswer, + offer: previousOffer, subscription: subscription, publishTracks: publishTracks, dataChannelInfo: dataChannelInfo(), @@ -1262,6 +1300,7 @@ class Engine extends Disposable with EventsEmittable { ..on((event) async { // create peer connections _subscriberPrimary = event.response.subscriberPrimary; + _singlePCMode = roomOptions.singlePeerConnection; _serverInfo = event.response.serverInfo; final iceServersFromServer = event.response.iceServers.map((e) => e.toSDKType()).toList(); @@ -1314,7 +1353,9 @@ class Engine extends Disposable with EventsEmittable { serverProvidedIceServers: _serverProvidedIceServers); await publisher?.pc.setConfiguration(rtcConfiguration.toMap()); - await subscriber?.pc.setConfiguration(rtcConfiguration.toMap()); + if (!_singlePCMode) { + await subscriber?.pc.setConfiguration(rtcConfiguration.toMap()); + } if (!_subscriberPrimary) { await negotiate(); @@ -1352,6 +1393,11 @@ class Engine extends Disposable with EventsEmittable { } }) ..on((event) async { + if (_singlePCMode) { + // In single PC mode, server uses mediaSectionsRequirement instead of offers. + logger.warning('Received SignalOfferEvent in single PC mode, ignoring'); + return; + } if (subscriber == null) { logger.warning('[$objectId] subscriber is null'); return; @@ -1382,6 +1428,15 @@ class Engine extends Disposable with EventsEmittable { await publisher!.setRemoteDescription(event.sd); }) ..on((event) async { + if (_singlePCMode) { + if (publisher == null) { + logger.warning('Received ${SignalTrickleEvent} but publisher was null.'); + return; + } + logger.fine('got ICE candidate from peer (single PC mode)'); + await publisher!.addIceCandidate(event.candidate); + return; + } if (publisher == null || subscriber == null) { logger.warning('Received ${SignalTrickleEvent} but publisher or subscriber was null.'); return; @@ -1435,6 +1490,32 @@ class Engine extends Disposable with EventsEmittable { signalClient.participantSid = event.response.participant.sid; } events.emit(EngineRoomMovedEvent(response: event.response)); + }) + ..on((event) async { + if (!_singlePCMode || publisher == null) { + logger.warning('Received media sections requirement but not in single PC mode or publisher is null'); + return; + } + logger.fine('Adding recvonly transceivers: audio=${event.numAudios}, video=${event.numVideos}'); + + final transceiverInit = rtc.RTCRtpTransceiverInit( + direction: rtc.TransceiverDirection.RecvOnly, + ); + + for (int i = 0; i < event.numAudios; i++) { + await publisher!.pc.addTransceiver( + kind: rtc.RTCRtpMediaType.RTCRtpMediaTypeAudio, + init: transceiverInit, + ); + } + for (int i = 0; i < event.numVideos; i++) { + await publisher!.pc.addTransceiver( + kind: rtc.RTCRtpMediaType.RTCRtpMediaTypeVideo, + init: transceiverInit, + ); + } + + await negotiate(); }); Future disconnect({ diff --git a/lib/src/core/signal_client.dart b/lib/src/core/signal_client.dart index 89bdb258f..4e8eef53b 100644 --- a/lib/src/core/signal_client.dart +++ b/lib/src/core/signal_client.dart @@ -94,6 +94,9 @@ class SignalClient extends Disposable with EventsEmittable { }); } + // Whether the established connection used the v1 signal path (single PC mode). + bool _useV1SignalPath = false; + @internal Future connect( String uriString, @@ -127,15 +130,33 @@ class SignalClient extends Disposable with EventsEmittable { } } - final rtcUri = await Utils.buildUri( - uriString, - token: token, - connectOptions: connectOptions, - roomOptions: roomOptions, - reconnect: reconnect, - sid: reconnect ? participantSid : null, - reconnectReason: reconnectReason, - ); + // Use v1 path for initial connection when singlePeerConnection is requested. + // For reconnect, use the same path version as the established connection. + final useV1 = reconnect ? _useV1SignalPath : roomOptions.singlePeerConnection; + + final Uri rtcUri; + if (useV1 && !reconnect) { + rtcUri = await Utils.buildV1Uri( + uriString, + token: token, + connectOptions: connectOptions, + roomOptions: roomOptions, + ); + } else { + rtcUri = await Utils.buildUri( + uriString, + token: token, + connectOptions: connectOptions, + roomOptions: roomOptions, + reconnect: reconnect, + sid: reconnect ? participantSid : null, + reconnectReason: reconnectReason, + ); + } + + if (!reconnect) { + _useV1SignalPath = useV1; + } logger.fine('SignalClient connecting with url: $rtcUri'); @@ -359,6 +380,15 @@ class SignalClient extends Disposable with EventsEmittable { } events.emit(SignalRoomMovedEvent(response: msg.roomMoved)); break; + case lk_rtc.SignalResponse_Message.mediaSectionsRequirement: + logger.fine('received media sections requirement: ' + 'audios=${msg.mediaSectionsRequirement.numAudios}, ' + 'videos=${msg.mediaSectionsRequirement.numVideos}'); + events.emit(SignalMediaSectionsRequirementEvent( + numAudios: msg.mediaSectionsRequirement.numAudios, + numVideos: msg.mediaSectionsRequirement.numVideos, + )); + break; default: logger.warning('received unknown signal message'); } @@ -490,10 +520,12 @@ extension SignalClientRequests on SignalClient { required Iterable? dataChannelInfo, required List trackSidsDisabled, List? dataChannelReceiveStates, + lk_rtc.SessionDescription? offer, }) => _sendRequest(lk_rtc.SignalRequest( syncState: lk_rtc.SyncState( answer: answer, + offer: offer, subscription: subscription, publishTracks: publishTracks, dataChannels: dataChannelInfo, diff --git a/lib/src/core/transport.dart b/lib/src/core/transport.dart index 743e9eeb7..e483a363d 100644 --- a/lib/src/core/transport.dart +++ b/lib/src/core/transport.dart @@ -64,9 +64,10 @@ class Transport extends Disposable { TransportOnOffer? onOffer; Function? _cancelDebounce; ConnectOptions connectOptions; + final bool singlePCMode; // private constructor - Transport._(this.pc, this.connectOptions) { + Transport._(this.pc, this.connectOptions, {this.singlePCMode = false}) { // onDispose(() async { _cancelDebounce?.call(); @@ -101,11 +102,11 @@ class Transport extends Disposable { } static Future create(PeerConnectionCreate peerConnectionCreate, - {RTCConfiguration? rtcConfig, required ConnectOptions connectOptions}) async { + {RTCConfiguration? rtcConfig, required ConnectOptions connectOptions, bool singlePCMode = false}) async { rtcConfig ??= const RTCConfiguration(); logger.fine('[PCTransport] creating ${rtcConfig.toMap()}'); final pc = await peerConnectionCreate(rtcConfig.toMap()); - return Transport._(pc, connectOptions); + return Transport._(pc, connectOptions, singlePCMode: singlePCMode); } late final negotiate = Utils.createDebounceFunc( @@ -214,8 +215,16 @@ class Transport extends Disposable { } }); + var mungedSdp = sdp_transform.write(sdpParsed, null); + + // In single PC mode, munge a=inactive to a=recvonly for RTP media sections. + // WebRTC can generate inactive direction even when transceivers were configured as recvonly. + if (singlePCMode) { + mungedSdp = mungeInactiveToRecvOnlyForMedia(mungedSdp); + } + try { - await setMungedSDP(sd: offer, munged: sdp_transform.write(sdpParsed, null)); + await setMungedSDP(sd: offer, munged: mungedSdp); } catch (e) { throw NegotiationError(e.toString()); } @@ -328,4 +337,35 @@ class Transport extends Disposable { rethrow; } } + + /// Munge SDP to change `a=inactive` to `a=recvonly` for RTP media m-lines + /// in single PC mode. WebRTC can generate inactive direction even when + /// transceivers were configured as recvonly. Only rewrites RTP m-sections — + /// non-RTP sections (e.g. data channel `m=application`) are preserved. + static String mungeInactiveToRecvOnlyForMedia(String sdp) { + final usesCRLF = sdp.contains('\r\n'); + final eol = usesCRLF ? '\r\n' : '\n'; + final lines = sdp.split(eol); + + final out = []; + var inRTPMediaSection = false; + + for (final line in lines) { + final l = line.trim(); + if (l.startsWith('m=')) { + inRTPMediaSection = l.contains('RTP/'); + } + if (inRTPMediaSection && l == 'a=inactive') { + out.add('a=recvonly'); + } else { + out.add(line); + } + } + + var result = out.join(eol); + if (sdp.endsWith(eol) && !result.endsWith(eol)) { + result += eol; + } + return result; + } } diff --git a/lib/src/internal/events.dart b/lib/src/internal/events.dart index b4af51238..2e56e99ed 100644 --- a/lib/src/internal/events.dart +++ b/lib/src/internal/events.dart @@ -562,6 +562,19 @@ class SignalRoomMovedEvent with SignalEvent, InternalEvent { String toString() => '${runtimeType}(room: ${response.room.name})'; } +@internal +class SignalMediaSectionsRequirementEvent with SignalEvent, InternalEvent { + final int numAudios; + final int numVideos; + const SignalMediaSectionsRequirementEvent({ + required this.numAudios, + required this.numVideos, + }); + + @override + String toString() => '${runtimeType}(numAudios: $numAudios, numVideos: $numVideos)'; +} + // ---------------------------------------------------------------------- // Engine events // ---------------------------------------------------------------------- diff --git a/lib/src/options.dart b/lib/src/options.dart index f37cf43de..4d780f61e 100644 --- a/lib/src/options.dart +++ b/lib/src/options.dart @@ -121,6 +121,11 @@ class RoomOptions { /// fast track publication final bool fastPublish; + /// When true, will attempt to connect via single peer connection mode. + /// Falls back to dual peer connection mode if not available. + /// Requires LiveKit Cloud or LiveKit OSS >= 1.9.2. + final bool singlePeerConnection; + /// deprecated, use [createVisualizer] instead /// please refer to example/lib/widgets/sound_waveform.dart @Deprecated('Use createVisualizer instead') @@ -140,6 +145,7 @@ class RoomOptions { this.encryption, this.enableVisualizer = false, this.fastPublish = true, + this.singlePeerConnection = false, }); RoomOptions copyWith({ @@ -155,6 +161,7 @@ class RoomOptions { E2EEOptions? e2eeOptions, E2EEOptions? encryption, bool? fastPublish, + bool? singlePeerConnection, }) { return RoomOptions( defaultCameraCaptureOptions: defaultCameraCaptureOptions ?? this.defaultCameraCaptureOptions, @@ -170,6 +177,7 @@ class RoomOptions { e2eeOptions: e2eeOptions ?? this.e2eeOptions, encryption: encryption ?? this.encryption, fastPublish: fastPublish ?? this.fastPublish, + singlePeerConnection: singlePeerConnection ?? this.singlePeerConnection, ); } } diff --git a/lib/src/utils.dart b/lib/src/utils.dart index cb0297d05..ad97f5d41 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -26,6 +26,7 @@ import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; import 'package:meta/meta.dart'; import './proto/livekit_models.pb.dart' as lk_models; +import './proto/livekit_rtc.pb.dart' as lk_rtc; import './support/native.dart'; import 'extensions.dart'; import 'livekit.dart'; @@ -210,6 +211,54 @@ class Utils { ); } + @internal + static Future buildV1Uri( + String uriString, { + required String token, + required ConnectOptions connectOptions, + required RoomOptions roomOptions, + bool reconnect = false, + String? sid, + lk_models.ReconnectReason? reconnectReason, + }) async { + final Uri uri = Uri.parse(uriString); + + final useSecure = uri.isSecureScheme; + final wsScheme = useSecure ? 'wss' : 'ws'; + + final pathSegments = List.from(uri.pathSegments); + pathSegments.removeWhere((e) => e.isEmpty); + pathSegments.addAll(['rtc', 'v1']); + + final clientInfo = await _clientInfo(); + + final joinRequest = lk_rtc.JoinRequest( + clientInfo: clientInfo, + connectionSettings: lk_rtc.ConnectionSettings( + autoSubscribe: connectOptions.autoSubscribe, + adaptiveStream: roomOptions.adaptiveStream, + ), + reconnect: reconnect ? true : null, + participantSid: reconnect ? sid : null, + reconnectReason: reconnect ? reconnectReason : null, + ); + + final wrappedJoinRequest = lk_rtc.WrappedJoinRequest( + joinRequest: joinRequest.writeToBuffer(), + ); + + final joinRequestBase64 = base64Encode(wrappedJoinRequest.writeToBuffer()); + + return uri.replace( + scheme: wsScheme, + pathSegments: pathSegments, + queryParameters: { + 'access_token': token, + 'join_request': joinRequestBase64, + }, + ); + } + static List _presetsForDimensions({ required bool isScreenShare, required VideoDimensions dimensions, From 28b3e84042b26207279c50428a68d86f338e9902 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 10 Mar 2026 17:36:43 +0900 Subject: [PATCH 2/6] changes --- .changes/single-pc | 1 + 1 file changed, 1 insertion(+) create mode 100644 .changes/single-pc diff --git a/.changes/single-pc b/.changes/single-pc new file mode 100644 index 000000000..e108362f2 --- /dev/null +++ b/.changes/single-pc @@ -0,0 +1 @@ +minor type="added" "Single peer connection support" From 8fc899a0254ba53fab24006e305c5bdf98e477e3 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 10 Mar 2026 21:05:02 +0900 Subject: [PATCH 3/6] fixes --- lib/src/core/engine.dart | 37 +++++++++++++++++---- lib/src/core/signal_client.dart | 41 ++++++++++++++++------- lib/src/core/transport.dart | 59 ++++++++++++++++++++++++++------- lib/src/exceptions.dart | 1 + lib/src/options.dart | 4 +-- lib/src/utils.dart | 37 +++++++++++++++++++++ 6 files changed, 146 insertions(+), 33 deletions(-) diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index 80f7d1bce..2d667dc7c 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -243,12 +243,27 @@ class Engine extends Disposable with EventsEmittable { try { // wait for socket to connect rtc server - await signalClient.connect( - url, - token, - connectOptions: this.connectOptions, - roomOptions: this.roomOptions, - ); + try { + await signalClient.connect( + url, + token, + connectOptions: this.connectOptions, + roomOptions: this.roomOptions, + ); + } on ConnectException catch (e) { + // If v1 path returned ServiceNotFound, fall back to v0 (dual PC) + if (e.reason == ConnectionErrorReason.ServiceNotFound && this.roomOptions.singlePeerConnection) { + logger.warning('v1 signal path not found, falling back to dual PC mode'); + await signalClient.connect( + url, + token, + connectOptions: this.connectOptions, + roomOptions: this.roomOptions.copyWith(singlePeerConnection: false), + ); + } else { + rethrow; + } + } // wait for join response await events.waitFor( @@ -1300,7 +1315,9 @@ class Engine extends Disposable with EventsEmittable { ..on((event) async { // create peer connections _subscriberPrimary = event.response.subscriberPrimary; - _singlePCMode = roomOptions.singlePeerConnection; + // Derive single PC mode from the actual signal path, not just the option. + // If v1 was unavailable, signalClient falls back to v0 and useV1SignalPath=false. + _singlePCMode = signalClient.useV1SignalPath; _serverInfo = event.response.serverInfo; final iceServersFromServer = event.response.iceServers.map((e) => e.toSDKType()).toList(); @@ -1496,6 +1513,12 @@ class Engine extends Disposable with EventsEmittable { logger.warning('Received media sections requirement but not in single PC mode or publisher is null'); return; } + // The server sends delta counts (how many *new* transceivers to add), + // not absolute totals. This matches the JS SDK behavior — we blindly + // add the requested number of recvonly transceivers each time. + // In single PC mode only the offerer (client) can add new m-lines, + // so the server signals when it needs additional media sections to + // deliver subscribed tracks. logger.fine('Adding recvonly transceivers: audio=${event.numAudios}, video=${event.numVideos}'); final transceiverInit = rtc.RTCRtpTransceiverInit( diff --git a/lib/src/core/signal_client.dart b/lib/src/core/signal_client.dart index 4e8eef53b..196ea2492 100644 --- a/lib/src/core/signal_client.dart +++ b/lib/src/core/signal_client.dart @@ -97,6 +97,9 @@ class SignalClient extends Disposable with EventsEmittable { // Whether the established connection used the v1 signal path (single PC mode). bool _useV1SignalPath = false; + @internal + bool get useV1SignalPath => _useV1SignalPath; + @internal Future connect( String uriString, @@ -131,16 +134,19 @@ class SignalClient extends Disposable with EventsEmittable { } // Use v1 path for initial connection when singlePeerConnection is requested. - // For reconnect, use the same path version as the established connection. + // For quick reconnect, use the same path version as the established connection. final useV1 = reconnect ? _useV1SignalPath : roomOptions.singlePeerConnection; final Uri rtcUri; - if (useV1 && !reconnect) { + if (useV1) { rtcUri = await Utils.buildV1Uri( uriString, token: token, connectOptions: connectOptions, roomOptions: roomOptions, + reconnect: reconnect, + sid: reconnect ? participantSid : null, + reconnectReason: reconnectReason, ); } else { rtcUri = await Utils.buildUri( @@ -194,15 +200,23 @@ class SignalClient extends Disposable with EventsEmittable { // Attempt Validation var finalError = socketError; try { - // Re-build same uri for validate mode - final validateUri = await Utils.buildUri( - uriString, - token: token, - connectOptions: connectOptions, - roomOptions: roomOptions, - validate: true, - forceSecure: rtcUri.isSecureScheme, - ); + // Re-build same uri for validate mode, matching the signal path version + final validateUri = useV1 + ? await Utils.buildV1ValidateUri( + uriString, + token: token, + connectOptions: connectOptions, + roomOptions: roomOptions, + forceSecure: rtcUri.isSecureScheme, + ) + : await Utils.buildUri( + uriString, + token: token, + connectOptions: connectOptions, + roomOptions: roomOptions, + validate: true, + forceSecure: rtcUri.isSecureScheme, + ); final validateResponse = await http.get( validateUri, @@ -210,7 +224,10 @@ class SignalClient extends Disposable with EventsEmittable { 'Authorization': 'Bearer $token', }, ); - if (validateResponse.statusCode != 200) { + if (validateResponse.statusCode == 404) { + finalError = + ConnectException(validateResponse.body, reason: ConnectionErrorReason.ServiceNotFound, statusCode: 404); + } else if (validateResponse.statusCode != 200) { finalError = ConnectException(validateResponse.body, reason: validateResponse.statusCode >= 400 ? ConnectionErrorReason.NotAllowed diff --git a/lib/src/core/transport.dart b/lib/src/core/transport.dart index e483a363d..c7445ab27 100644 --- a/lib/src/core/transport.dart +++ b/lib/src/core/transport.dart @@ -338,27 +338,55 @@ class Transport extends Disposable { } } - /// Munge SDP to change `a=inactive` to `a=recvonly` for RTP media m-lines - /// in single PC mode. WebRTC can generate inactive direction even when - /// transceivers were configured as recvonly. Only rewrites RTP m-sections — - /// non-RTP sections (e.g. data channel `m=application`) are preserved. + /// Munge SDP to change `a=inactive` to `a=recvonly` for RTP media sections + /// that have no SSRC (i.e. receive-only transceivers with no local media). + /// + /// In single PC mode, libWebRTC may incorrectly generate `a=inactive` for + /// transceivers that were configured as recvonly. We only fix sections + /// without SSRC lines to avoid touching sendonly/sendrecv transceivers that + /// have been intentionally set to inactive (e.g. after unpublishing). static String mungeInactiveToRecvOnlyForMedia(String sdp) { final usesCRLF = sdp.contains('\r\n'); final eol = usesCRLF ? '\r\n' : '\n'; final lines = sdp.split(eol); - final out = []; - var inRTPMediaSection = false; - - for (final line in lines) { - final l = line.trim(); + // Two-pass approach: first collect media section ranges and whether they + // contain SSRC lines, then rewrite only the qualifying sections. + final sections = <_MediaSection>[]; + for (int i = 0; i < lines.length; i++) { + final l = lines[i].trim(); if (l.startsWith('m=')) { - inRTPMediaSection = l.contains('RTP/'); + sections.add(_MediaSection( + startIndex: i, + isRTP: l.contains('RTP/'), + )); + } else if (sections.isNotEmpty) { + if (l.startsWith('a=ssrc:')) { + sections.last.hasSSRC = true; + } + } + } + + // Build a set of line indices where a=inactive should be rewritten. + final rewriteIndices = {}; + for (final section in sections) { + if (!section.isRTP || section.hasSSRC) continue; + final end = sections.indexOf(section) + 1 < sections.length + ? sections[sections.indexOf(section) + 1].startIndex + : lines.length; + for (int i = section.startIndex; i < end; i++) { + if (lines[i].trim() == 'a=inactive') { + rewriteIndices.add(i); + } } - if (inRTPMediaSection && l == 'a=inactive') { + } + + final out = []; + for (int i = 0; i < lines.length; i++) { + if (rewriteIndices.contains(i)) { out.add('a=recvonly'); } else { - out.add(line); + out.add(lines[i]); } } @@ -369,3 +397,10 @@ class Transport extends Disposable { return result; } } + +class _MediaSection { + final int startIndex; + final bool isRTP; + bool hasSSRC = false; + _MediaSection({required this.startIndex, required this.isRTP}); +} diff --git a/lib/src/exceptions.dart b/lib/src/exceptions.dart index f14ba9640..ed5f2d50a 100644 --- a/lib/src/exceptions.dart +++ b/lib/src/exceptions.dart @@ -25,6 +25,7 @@ enum ConnectionErrorReason { NotAllowed, InternalError, Timeout, + ServiceNotFound, } /// An exception occurred while attempting to connect. diff --git a/lib/src/options.dart b/lib/src/options.dart index 4d780f61e..9f1af02be 100644 --- a/lib/src/options.dart +++ b/lib/src/options.dart @@ -122,8 +122,8 @@ class RoomOptions { final bool fastPublish; /// When true, will attempt to connect via single peer connection mode. - /// Falls back to dual peer connection mode if not available. - /// Requires LiveKit Cloud or LiveKit OSS >= 1.9.2. + /// Automatically falls back to dual peer connection mode if the server + /// does not support the v1 signal path (HTTP 404). final bool singlePeerConnection; /// deprecated, use [createVisualizer] instead diff --git a/lib/src/utils.dart b/lib/src/utils.dart index ad97f5d41..717b81a03 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -231,6 +231,15 @@ class Utils { pathSegments.addAll(['rtc', 'v1']); final clientInfo = await _clientInfo(); + final networkType = await getNetworkType(); + + // Populate ClientInfo with the same fields that v0 sends as query params + if (clientInfo != null) { + clientInfo.sdk = lk_models.ClientInfo_SDK.FLUTTER; + clientInfo.version = LiveKitClient.version; + clientInfo.protocol = int.tryParse(connectOptions.protocolVersion.toStringValue()) ?? 0; + clientInfo.network = networkType; + } final joinRequest = lk_rtc.JoinRequest( clientInfo: clientInfo, @@ -259,6 +268,34 @@ class Utils { ); } + /// Converts a v1 WebSocket URL to its HTTP validation counterpart. + /// `wss://host/rtc/v1` → `https://host/rtc/v1/validate` + @internal + static Future buildV1ValidateUri( + String uriString, { + required String token, + required ConnectOptions connectOptions, + required RoomOptions roomOptions, + bool forceSecure = false, + }) async { + final Uri uri = Uri.parse(uriString); + + final useSecure = uri.isSecureScheme || forceSecure; + final httpScheme = useSecure ? 'https' : 'http'; + + final pathSegments = List.from(uri.pathSegments); + pathSegments.removeWhere((e) => e.isEmpty); + pathSegments.addAll(['rtc', 'v1', 'validate']); + + return uri.replace( + scheme: httpScheme, + pathSegments: pathSegments, + queryParameters: { + 'access_token': token, + }, + ); + } + static List _presetsForDimensions({ required bool isScreenShare, required VideoDimensions dimensions, From 61450f6274c5b1f46cbf84223c906e2f553299a7 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 10 Mar 2026 21:40:26 +0900 Subject: [PATCH 4/6] ref1 --- lib/src/core/signal_client.dart | 4 ++-- lib/src/utils.dart | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/src/core/signal_client.dart b/lib/src/core/signal_client.dart index 196ea2492..7e9e1088c 100644 --- a/lib/src/core/signal_client.dart +++ b/lib/src/core/signal_client.dart @@ -149,7 +149,7 @@ class SignalClient extends Disposable with EventsEmittable { reconnectReason: reconnectReason, ); } else { - rtcUri = await Utils.buildUri( + rtcUri = await Utils.buildV0Uri( uriString, token: token, connectOptions: connectOptions, @@ -209,7 +209,7 @@ class SignalClient extends Disposable with EventsEmittable { roomOptions: roomOptions, forceSecure: rtcUri.isSecureScheme, ) - : await Utils.buildUri( + : await Utils.buildV0Uri( uriString, token: token, connectOptions: connectOptions, diff --git a/lib/src/utils.dart b/lib/src/utils.dart index 717b81a03..0c8ff2ef4 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -158,7 +158,7 @@ class Utils { } @internal - static Future buildUri( + static Future buildV0Uri( String uriString, { required String token, required ConnectOptions connectOptions, From 4eb4d1d3c3237c38c4267a760a11f8d6c948c25c Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Fri, 13 Mar 2026 14:07:46 +0900 Subject: [PATCH 5/6] move opts --- lib/src/core/engine.dart | 6 +++--- lib/src/core/signal_client.dart | 2 +- lib/src/options.dart | 30 ++++++++++++++++++++++-------- 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index 2d667dc7c..40526bb53 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -252,13 +252,13 @@ class Engine extends Disposable with EventsEmittable { ); } on ConnectException catch (e) { // If v1 path returned ServiceNotFound, fall back to v0 (dual PC) - if (e.reason == ConnectionErrorReason.ServiceNotFound && this.roomOptions.singlePeerConnection) { + if (e.reason == ConnectionErrorReason.ServiceNotFound && this.connectOptions.singlePeerConnection) { logger.warning('v1 signal path not found, falling back to dual PC mode'); await signalClient.connect( url, token, - connectOptions: this.connectOptions, - roomOptions: this.roomOptions.copyWith(singlePeerConnection: false), + connectOptions: this.connectOptions.copyWith(singlePeerConnection: false), + roomOptions: this.roomOptions, ); } else { rethrow; diff --git a/lib/src/core/signal_client.dart b/lib/src/core/signal_client.dart index 7e9e1088c..a067d552d 100644 --- a/lib/src/core/signal_client.dart +++ b/lib/src/core/signal_client.dart @@ -135,7 +135,7 @@ class SignalClient extends Disposable with EventsEmittable { // Use v1 path for initial connection when singlePeerConnection is requested. // For quick reconnect, use the same path version as the established connection. - final useV1 = reconnect ? _useV1SignalPath : roomOptions.singlePeerConnection; + final useV1 = reconnect ? _useV1SignalPath : connectOptions.singlePeerConnection; final Uri rtcUri; if (useV1) { diff --git a/lib/src/options.dart b/lib/src/options.dart index 9f1af02be..9eeb75dd5 100644 --- a/lib/src/options.dart +++ b/lib/src/options.dart @@ -61,12 +61,34 @@ class ConnectOptions { final Timeouts timeouts; + /// When true, will attempt to connect via single peer connection mode. + /// Automatically falls back to dual peer connection mode if the server + /// does not support the v1 signal path (HTTP 404). + final bool singlePeerConnection; + const ConnectOptions({ this.autoSubscribe = true, this.rtcConfiguration = const RTCConfiguration(), this.protocolVersion = ProtocolVersion.v16, this.timeouts = Timeouts.defaultTimeouts, + this.singlePeerConnection = false, }); + + ConnectOptions copyWith({ + bool? autoSubscribe, + RTCConfiguration? rtcConfiguration, + ProtocolVersion? protocolVersion, + Timeouts? timeouts, + bool? singlePeerConnection, + }) { + return ConnectOptions( + autoSubscribe: autoSubscribe ?? this.autoSubscribe, + rtcConfiguration: rtcConfiguration ?? this.rtcConfiguration, + protocolVersion: protocolVersion ?? this.protocolVersion, + timeouts: timeouts ?? this.timeouts, + singlePeerConnection: singlePeerConnection ?? this.singlePeerConnection, + ); + } } /// Options used to modify the behavior of the [Room]. @@ -121,11 +143,6 @@ class RoomOptions { /// fast track publication final bool fastPublish; - /// When true, will attempt to connect via single peer connection mode. - /// Automatically falls back to dual peer connection mode if the server - /// does not support the v1 signal path (HTTP 404). - final bool singlePeerConnection; - /// deprecated, use [createVisualizer] instead /// please refer to example/lib/widgets/sound_waveform.dart @Deprecated('Use createVisualizer instead') @@ -145,7 +162,6 @@ class RoomOptions { this.encryption, this.enableVisualizer = false, this.fastPublish = true, - this.singlePeerConnection = false, }); RoomOptions copyWith({ @@ -161,7 +177,6 @@ class RoomOptions { E2EEOptions? e2eeOptions, E2EEOptions? encryption, bool? fastPublish, - bool? singlePeerConnection, }) { return RoomOptions( defaultCameraCaptureOptions: defaultCameraCaptureOptions ?? this.defaultCameraCaptureOptions, @@ -177,7 +192,6 @@ class RoomOptions { e2eeOptions: e2eeOptions ?? this.e2eeOptions, encryption: encryption ?? this.encryption, fastPublish: fastPublish ?? this.fastPublish, - singlePeerConnection: singlePeerConnection ?? this.singlePeerConnection, ); } } From b23390a31c4a02fb1218e15f3831ec94841d21a0 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Fri, 13 Mar 2026 14:22:13 +0900 Subject: [PATCH 6/6] dc fixes --- lib/src/core/engine.dart | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index 40526bb53..a73289700 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -432,7 +432,7 @@ class Engine extends Disposable with EventsEmittable { // construct the data channel message var message = rtc.RTCDataChannelMessage.fromBinary(packet.writeToBuffer()); - if (_subscriberPrimary && !_singlePCMode) { + if (_subscriberPrimary || _singlePCMode) { // make sure publisher transport is connected await ensurePublisherConnected(); @@ -842,23 +842,21 @@ class Engine extends Disposable with EventsEmittable { logger.fine('Server opened DC label: ${dc.label}'); _reliableDCSub = dc; _reliableDCSub?.onMessage = _onDCMessage; - _reliableDCSub?.stateChangeStream.listen((state) => - _reliableDCPub?.stateChangeStream.listen((state) => events.emit(SubscriberDataChannelStateUpdatedEvent( - isPrimary: _subscriberPrimary, - state: state, - type: Reliability.reliable, - )))); + _reliableDCSub?.stateChangeStream.listen((state) => events.emit(SubscriberDataChannelStateUpdatedEvent( + isPrimary: _subscriberPrimary, + state: state, + type: Reliability.reliable, + ))); break; case _lossyDCLabel: logger.fine('Server opened DC label: ${dc.label}'); _lossyDCSub = dc; _lossyDCSub?.onMessage = _onDCMessage; - _lossyDCSub?.stateChangeStream.listen((event) => - _reliableDCPub?.stateChangeStream.listen((state) => events.emit(SubscriberDataChannelStateUpdatedEvent( - isPrimary: _subscriberPrimary, - state: state, - type: Reliability.lossy, - )))); + _lossyDCSub?.stateChangeStream.listen((state) => events.emit(SubscriberDataChannelStateUpdatedEvent( + isPrimary: _subscriberPrimary, + state: state, + type: Reliability.lossy, + ))); break; default: logger.warning('Unknown DC label: ${dc.label}');