diff --git a/.changes/single-pc b/.changes/single-pc new file mode 100644 index 000000000..e108362f2 --- /dev/null +++ b/.changes/single-pc @@ -0,0 +1 @@ +minor type="added" "Single peer connection support" diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index a88cff287..a73289700 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -80,10 +80,11 @@ class Engine extends Disposable with EventsEmittable { Transport? subscriber; @internal - Transport? get primary => _subscriberPrimary ? subscriber : publisher; + Transport? get primary => _singlePCMode ? publisher : (_subscriberPrimary ? subscriber : publisher); - rtc.RTCDataChannel? get dataChannel => - _subscriberPrimary ? _reliableDCSub ?? _lossyDCSub : _reliableDCPub ?? _lossyDCPub; + rtc.RTCDataChannel? get dataChannel => _singlePCMode + ? _reliableDCPub ?? _lossyDCPub + : (_subscriberPrimary ? _reliableDCSub ?? _lossyDCSub : _reliableDCPub ?? _lossyDCPub); // data channels for packets rtc.RTCDataChannel? _reliableDCPub; @@ -109,6 +110,7 @@ class Engine extends Disposable with EventsEmittable { FastConnectOptions? fastConnectOptions; bool _subscriberPrimary = false; + bool _singlePCMode = false; String? _connectedServerAddress; String? get connectedServerAddress => _connectedServerAddress; @@ -241,12 +243,27 @@ class Engine extends Disposable with EventsEmittable { try { // wait for socket to connect rtc server - await signalClient.connect( - url, - token, - connectOptions: this.connectOptions, - roomOptions: this.roomOptions, - ); + try { + await signalClient.connect( + url, + token, + connectOptions: this.connectOptions, + roomOptions: this.roomOptions, + ); + } on ConnectException catch (e) { + // If v1 path returned ServiceNotFound, fall back to v0 (dual PC) + if (e.reason == ConnectionErrorReason.ServiceNotFound && this.connectOptions.singlePeerConnection) { + logger.warning('v1 signal path not found, falling back to dual PC mode'); + await signalClient.connect( + url, + token, + connectOptions: this.connectOptions.copyWith(singlePeerConnection: false), + roomOptions: this.roomOptions, + ); + } else { + rethrow; + } + } // wait for join response await events.waitFor( @@ -286,6 +303,8 @@ class Engine extends Disposable with EventsEmittable { await subscriber?.dispose(); subscriber = null; + _singlePCMode = false; + await signalClient.cleanUp(); fullReconnectOnNext = false; @@ -413,7 +432,7 @@ class Engine extends Disposable with EventsEmittable { // construct the data channel message var message = rtc.RTCDataChannelMessage.fromBinary(packet.writeToBuffer()); - if (_subscriberPrimary) { + if (_subscriberPrimary || _singlePCMode) { // make sure publisher transport is connected await ensurePublisherConnected(); @@ -625,11 +644,62 @@ class Engine extends Disposable with EventsEmittable { return rtcConfiguration; } + void _handleOnTrack(rtc.RTCTrackEvent event) { + logger.fine('[WebRTC] pc.onTrack'); + + final stream = event.streams.firstOrNull; + if (stream == null) { + // we need the stream to get the track's id + logger.severe('received track without mediastream'); + return; + } + + // doesn't get called reliably + event.track.onEnded = () { + logger.fine('[WebRTC] track.onEnded'); + }; + + // doesn't get called reliably + stream.onRemoveTrack = (_) { + logger.fine('[WebRTC] stream.onRemoveTrack'); + }; + + if (signalClient.connectionState == ConnectionState.reconnecting || + signalClient.connectionState == ConnectionState.connecting) { + final track = event.track; + final receiver = event.receiver; + events.on((event) async { + Timer(const Duration(milliseconds: 10), () { + events.emit(EngineTrackAddedEvent( + track: track, + stream: stream, + receiver: receiver, + )); + }); + }); + return; + } + + if (connectionState == ConnectionState.disconnected) { + logger.warning('skipping incoming track after Room disconnected'); + return; + } + + events.emit(EngineTrackAddedEvent( + track: event.track, + stream: stream, + receiver: event.receiver, + )); + } + Future _createPeerConnections(RTCConfiguration rtcConfiguration) async { - publisher = - await Transport.create(_peerConnectionCreate, rtcConfig: rtcConfiguration, connectOptions: connectOptions); - subscriber = - await Transport.create(_peerConnectionCreate, rtcConfig: rtcConfiguration, connectOptions: connectOptions); + publisher = await Transport.create(_peerConnectionCreate, + rtcConfig: rtcConfiguration, connectOptions: connectOptions, singlePCMode: _singlePCMode); + + if (!_singlePCMode) { + subscriber = + await Transport.create(_peerConnectionCreate, rtcConfig: rtcConfiguration, connectOptions: connectOptions); + } publisher?.pc.onIceCandidate = (rtc.RTCIceCandidate candidate) { logger.fine('publisher onIceCandidate'); @@ -643,41 +713,50 @@ class Engine extends Disposable with EventsEmittable { } }; - subscriber?.pc.onIceCandidate = (rtc.RTCIceCandidate candidate) { - logger.fine('subscriber onIceCandidate'); - signalClient.sendIceCandidate(candidate, lk_rtc.SignalTarget.SUBSCRIBER); - }; + if (!_singlePCMode) { + subscriber?.pc.onIceCandidate = (rtc.RTCIceCandidate candidate) { + logger.fine('subscriber onIceCandidate'); + signalClient.sendIceCandidate(candidate, lk_rtc.SignalTarget.SUBSCRIBER); + }; - subscriber?.pc.onIceConnectionState = (rtc.RTCIceConnectionState state) async { - logger.fine('subscriber iceConnectionState: $state'); - if (state == rtc.RTCIceConnectionState.RTCIceConnectionStateConnected) { - await _handleGettingConnectedServerAddress(subscriber!.pc); - } - }; + subscriber?.pc.onIceConnectionState = (rtc.RTCIceConnectionState state) async { + logger.fine('subscriber iceConnectionState: $state'); + if (state == rtc.RTCIceConnectionState.RTCIceConnectionStateConnected) { + await _handleGettingConnectedServerAddress(subscriber!.pc); + } + }; + } publisher?.onOffer = (offer) { logger.fine('publisher onOffer'); signalClient.sendOffer(offer); }; - // in subscriber primary mode, server side opens sub data channels. - if (_subscriberPrimary) { + // In single PC mode, server opens data channels on publisher. + // In dual PC subscriber-primary mode, server opens data channels on subscriber. + if (_singlePCMode) { + publisher?.pc.onDataChannel = _onDataChannel; + } else if (_subscriberPrimary) { subscriber?.pc.onDataChannel = _onDataChannel; } - subscriber?.pc.onConnectionState = (state) async { - events.emit(EngineSubscriberPeerStateUpdatedEvent( - state: state, - isPrimary: _subscriberPrimary, - )); - logger.fine('subscriber connectionState: $state'); - if (state.isDisconnected() || state.isFailed()) { - await handleReconnect( - state.isFailed() ? ClientDisconnectReason.peerConnectionFailed : ClientDisconnectReason.peerConnectionClosed, - reconnectReason: lk_models.ReconnectReason.RR_SUBSCRIBER_FAILED, - ); - } - }; + if (!_singlePCMode) { + subscriber?.pc.onConnectionState = (state) async { + events.emit(EngineSubscriberPeerStateUpdatedEvent( + state: state, + isPrimary: _subscriberPrimary, + )); + logger.fine('subscriber connectionState: $state'); + if (state.isDisconnected() || state.isFailed()) { + await handleReconnect( + state.isFailed() + ? ClientDisconnectReason.peerConnectionFailed + : ClientDisconnectReason.peerConnectionClosed, + reconnectReason: lk_models.ReconnectReason.RR_SUBSCRIBER_FAILED, + ); + } + }; + } publisher?.pc.onConnectionState = (state) async { if ([ @@ -689,7 +768,7 @@ class Engine extends Disposable with EventsEmittable { } events.emit(EnginePublisherPeerStateUpdatedEvent( state: state, - isPrimary: !_subscriberPrimary, + isPrimary: _singlePCMode || !_subscriberPrimary, )); logger.fine('publisher connectionState: $state'); if (state.isDisconnected() || state.isFailed()) { @@ -700,58 +779,19 @@ class Engine extends Disposable with EventsEmittable { } }; - subscriber?.pc.onTrack = (rtc.RTCTrackEvent event) { - logger.fine('[WebRTC] pc.onTrack'); - - final stream = event.streams.firstOrNull; - if (stream == null) { - // we need the stream to get the track's id - logger.severe('received track without mediastream'); - return; - } - - // doesn't get called reliably - event.track.onEnded = () { - logger.fine('[WebRTC] track.onEnded'); - }; - - // doesn't get called reliably - stream.onRemoveTrack = (_) { - logger.fine('[WebRTC] stream.onRemoveTrack'); - }; - - if (signalClient.connectionState == ConnectionState.reconnecting || - signalClient.connectionState == ConnectionState.connecting) { - final track = event.track; - final receiver = event.receiver; - events.on((event) async { - Timer(const Duration(milliseconds: 10), () { - events.emit(EngineTrackAddedEvent( - track: track, - stream: stream, - receiver: receiver, - )); - }); - }); - return; - } - - if (connectionState == ConnectionState.disconnected) { - logger.warning('skipping incoming track after Room disconnected'); - return; - } - - events.emit(EngineTrackAddedEvent( - track: event.track, - stream: stream, - receiver: event.receiver, - )); - }; + // In single PC mode, tracks arrive via publisher; otherwise via subscriber. + if (_singlePCMode) { + publisher?.pc.onTrack = _handleOnTrack; + } else { + subscriber?.pc.onTrack = _handleOnTrack; + } // doesn't get called reliably, doesn't work on mac - subscriber?.pc.onRemoveTrack = (rtc.MediaStream stream, rtc.MediaStreamTrack track) { - logger.fine('[WebRTC] ${track.id} pc.onRemoveTrack'); - }; + if (!_singlePCMode) { + subscriber?.pc.onRemoveTrack = (rtc.MediaStream stream, rtc.MediaStreamTrack track) { + logger.fine('[WebRTC] ${track.id} pc.onRemoveTrack'); + }; + } // also handle messages over the pub channel, for backwards compatibility try { @@ -802,23 +842,21 @@ class Engine extends Disposable with EventsEmittable { logger.fine('Server opened DC label: ${dc.label}'); _reliableDCSub = dc; _reliableDCSub?.onMessage = _onDCMessage; - _reliableDCSub?.stateChangeStream.listen((state) => - _reliableDCPub?.stateChangeStream.listen((state) => events.emit(SubscriberDataChannelStateUpdatedEvent( - isPrimary: _subscriberPrimary, - state: state, - type: Reliability.reliable, - )))); + _reliableDCSub?.stateChangeStream.listen((state) => events.emit(SubscriberDataChannelStateUpdatedEvent( + isPrimary: _subscriberPrimary, + state: state, + type: Reliability.reliable, + ))); break; case _lossyDCLabel: logger.fine('Server opened DC label: ${dc.label}'); _lossyDCSub = dc; _lossyDCSub?.onMessage = _onDCMessage; - _lossyDCSub?.stateChangeStream.listen((event) => - _reliableDCPub?.stateChangeStream.listen((state) => events.emit(SubscriberDataChannelStateUpdatedEvent( - isPrimary: _subscriberPrimary, - state: state, - type: Reliability.lossy, - )))); + _lossyDCSub?.stateChangeStream.listen((state) => events.emit(SubscriberDataChannelStateUpdatedEvent( + isPrimary: _subscriberPrimary, + state: state, + type: Reliability.lossy, + ))); break; default: logger.warning('Unknown DC label: ${dc.label}'); @@ -1150,7 +1188,9 @@ class Engine extends Disposable with EventsEmittable { logger.fine('resumeConnection: primary is connected: $isConnected'); if (!isConnected) { - subscriber!.restartingIce = true; + if (!_singlePCMode && subscriber != null) { + subscriber!.restartingIce = true; + } logger.fine('resumeConnection: Waiting for primary to connect...'); await events.waitFor( filter: (event) => event.isPrimary && event.state.isConnected(), @@ -1230,7 +1270,17 @@ class Engine extends Disposable with EventsEmittable { required Iterable? publishTracks, required List trackSidsDisabled, }) async { - final previousAnswer = (await subscriber?.pc.getLocalDescription())?.toPBType(); + lk_rtc.SessionDescription? previousAnswer; + lk_rtc.SessionDescription? previousOffer; + + if (_singlePCMode) { + // In single PC mode, answer = publisher's remoteDescription, + // offer = publisher's localDescription. + previousAnswer = (await publisher?.pc.getRemoteDescription())?.toPBType(); + previousOffer = (await publisher?.pc.getLocalDescription())?.toPBType(); + } else { + previousAnswer = (await subscriber?.pc.getLocalDescription())?.toPBType(); + } // Build data channel receive states for reliability final dataChannelReceiveStates = []; @@ -1245,6 +1295,7 @@ class Engine extends Disposable with EventsEmittable { } signalClient.sendSyncState( answer: previousAnswer, + offer: previousOffer, subscription: subscription, publishTracks: publishTracks, dataChannelInfo: dataChannelInfo(), @@ -1262,6 +1313,9 @@ class Engine extends Disposable with EventsEmittable { ..on((event) async { // create peer connections _subscriberPrimary = event.response.subscriberPrimary; + // Derive single PC mode from the actual signal path, not just the option. + // If v1 was unavailable, signalClient falls back to v0 and useV1SignalPath=false. + _singlePCMode = signalClient.useV1SignalPath; _serverInfo = event.response.serverInfo; final iceServersFromServer = event.response.iceServers.map((e) => e.toSDKType()).toList(); @@ -1314,7 +1368,9 @@ class Engine extends Disposable with EventsEmittable { serverProvidedIceServers: _serverProvidedIceServers); await publisher?.pc.setConfiguration(rtcConfiguration.toMap()); - await subscriber?.pc.setConfiguration(rtcConfiguration.toMap()); + if (!_singlePCMode) { + await subscriber?.pc.setConfiguration(rtcConfiguration.toMap()); + } if (!_subscriberPrimary) { await negotiate(); @@ -1352,6 +1408,11 @@ class Engine extends Disposable with EventsEmittable { } }) ..on((event) async { + if (_singlePCMode) { + // In single PC mode, server uses mediaSectionsRequirement instead of offers. + logger.warning('Received SignalOfferEvent in single PC mode, ignoring'); + return; + } if (subscriber == null) { logger.warning('[$objectId] subscriber is null'); return; @@ -1382,6 +1443,15 @@ class Engine extends Disposable with EventsEmittable { await publisher!.setRemoteDescription(event.sd); }) ..on((event) async { + if (_singlePCMode) { + if (publisher == null) { + logger.warning('Received ${SignalTrickleEvent} but publisher was null.'); + return; + } + logger.fine('got ICE candidate from peer (single PC mode)'); + await publisher!.addIceCandidate(event.candidate); + return; + } if (publisher == null || subscriber == null) { logger.warning('Received ${SignalTrickleEvent} but publisher or subscriber was null.'); return; @@ -1435,6 +1505,38 @@ class Engine extends Disposable with EventsEmittable { signalClient.participantSid = event.response.participant.sid; } events.emit(EngineRoomMovedEvent(response: event.response)); + }) + ..on((event) async { + if (!_singlePCMode || publisher == null) { + logger.warning('Received media sections requirement but not in single PC mode or publisher is null'); + return; + } + // The server sends delta counts (how many *new* transceivers to add), + // not absolute totals. This matches the JS SDK behavior — we blindly + // add the requested number of recvonly transceivers each time. + // In single PC mode only the offerer (client) can add new m-lines, + // so the server signals when it needs additional media sections to + // deliver subscribed tracks. + logger.fine('Adding recvonly transceivers: audio=${event.numAudios}, video=${event.numVideos}'); + + final transceiverInit = rtc.RTCRtpTransceiverInit( + direction: rtc.TransceiverDirection.RecvOnly, + ); + + for (int i = 0; i < event.numAudios; i++) { + await publisher!.pc.addTransceiver( + kind: rtc.RTCRtpMediaType.RTCRtpMediaTypeAudio, + init: transceiverInit, + ); + } + for (int i = 0; i < event.numVideos; i++) { + await publisher!.pc.addTransceiver( + kind: rtc.RTCRtpMediaType.RTCRtpMediaTypeVideo, + init: transceiverInit, + ); + } + + await negotiate(); }); Future disconnect({ diff --git a/lib/src/core/signal_client.dart b/lib/src/core/signal_client.dart index 89bdb258f..a067d552d 100644 --- a/lib/src/core/signal_client.dart +++ b/lib/src/core/signal_client.dart @@ -94,6 +94,12 @@ class SignalClient extends Disposable with EventsEmittable { }); } + // Whether the established connection used the v1 signal path (single PC mode). + bool _useV1SignalPath = false; + + @internal + bool get useV1SignalPath => _useV1SignalPath; + @internal Future connect( String uriString, @@ -127,15 +133,36 @@ class SignalClient extends Disposable with EventsEmittable { } } - final rtcUri = await Utils.buildUri( - uriString, - token: token, - connectOptions: connectOptions, - roomOptions: roomOptions, - reconnect: reconnect, - sid: reconnect ? participantSid : null, - reconnectReason: reconnectReason, - ); + // Use v1 path for initial connection when singlePeerConnection is requested. + // For quick reconnect, use the same path version as the established connection. + final useV1 = reconnect ? _useV1SignalPath : connectOptions.singlePeerConnection; + + final Uri rtcUri; + if (useV1) { + rtcUri = await Utils.buildV1Uri( + uriString, + token: token, + connectOptions: connectOptions, + roomOptions: roomOptions, + reconnect: reconnect, + sid: reconnect ? participantSid : null, + reconnectReason: reconnectReason, + ); + } else { + rtcUri = await Utils.buildV0Uri( + uriString, + token: token, + connectOptions: connectOptions, + roomOptions: roomOptions, + reconnect: reconnect, + sid: reconnect ? participantSid : null, + reconnectReason: reconnectReason, + ); + } + + if (!reconnect) { + _useV1SignalPath = useV1; + } logger.fine('SignalClient connecting with url: $rtcUri'); @@ -173,15 +200,23 @@ class SignalClient extends Disposable with EventsEmittable { // Attempt Validation var finalError = socketError; try { - // Re-build same uri for validate mode - final validateUri = await Utils.buildUri( - uriString, - token: token, - connectOptions: connectOptions, - roomOptions: roomOptions, - validate: true, - forceSecure: rtcUri.isSecureScheme, - ); + // Re-build same uri for validate mode, matching the signal path version + final validateUri = useV1 + ? await Utils.buildV1ValidateUri( + uriString, + token: token, + connectOptions: connectOptions, + roomOptions: roomOptions, + forceSecure: rtcUri.isSecureScheme, + ) + : await Utils.buildV0Uri( + uriString, + token: token, + connectOptions: connectOptions, + roomOptions: roomOptions, + validate: true, + forceSecure: rtcUri.isSecureScheme, + ); final validateResponse = await http.get( validateUri, @@ -189,7 +224,10 @@ class SignalClient extends Disposable with EventsEmittable { 'Authorization': 'Bearer $token', }, ); - if (validateResponse.statusCode != 200) { + if (validateResponse.statusCode == 404) { + finalError = + ConnectException(validateResponse.body, reason: ConnectionErrorReason.ServiceNotFound, statusCode: 404); + } else if (validateResponse.statusCode != 200) { finalError = ConnectException(validateResponse.body, reason: validateResponse.statusCode >= 400 ? ConnectionErrorReason.NotAllowed @@ -359,6 +397,15 @@ class SignalClient extends Disposable with EventsEmittable { } events.emit(SignalRoomMovedEvent(response: msg.roomMoved)); break; + case lk_rtc.SignalResponse_Message.mediaSectionsRequirement: + logger.fine('received media sections requirement: ' + 'audios=${msg.mediaSectionsRequirement.numAudios}, ' + 'videos=${msg.mediaSectionsRequirement.numVideos}'); + events.emit(SignalMediaSectionsRequirementEvent( + numAudios: msg.mediaSectionsRequirement.numAudios, + numVideos: msg.mediaSectionsRequirement.numVideos, + )); + break; default: logger.warning('received unknown signal message'); } @@ -490,10 +537,12 @@ extension SignalClientRequests on SignalClient { required Iterable? dataChannelInfo, required List trackSidsDisabled, List? dataChannelReceiveStates, + lk_rtc.SessionDescription? offer, }) => _sendRequest(lk_rtc.SignalRequest( syncState: lk_rtc.SyncState( answer: answer, + offer: offer, subscription: subscription, publishTracks: publishTracks, dataChannels: dataChannelInfo, diff --git a/lib/src/core/transport.dart b/lib/src/core/transport.dart index 743e9eeb7..c7445ab27 100644 --- a/lib/src/core/transport.dart +++ b/lib/src/core/transport.dart @@ -64,9 +64,10 @@ class Transport extends Disposable { TransportOnOffer? onOffer; Function? _cancelDebounce; ConnectOptions connectOptions; + final bool singlePCMode; // private constructor - Transport._(this.pc, this.connectOptions) { + Transport._(this.pc, this.connectOptions, {this.singlePCMode = false}) { // onDispose(() async { _cancelDebounce?.call(); @@ -101,11 +102,11 @@ class Transport extends Disposable { } static Future create(PeerConnectionCreate peerConnectionCreate, - {RTCConfiguration? rtcConfig, required ConnectOptions connectOptions}) async { + {RTCConfiguration? rtcConfig, required ConnectOptions connectOptions, bool singlePCMode = false}) async { rtcConfig ??= const RTCConfiguration(); logger.fine('[PCTransport] creating ${rtcConfig.toMap()}'); final pc = await peerConnectionCreate(rtcConfig.toMap()); - return Transport._(pc, connectOptions); + return Transport._(pc, connectOptions, singlePCMode: singlePCMode); } late final negotiate = Utils.createDebounceFunc( @@ -214,8 +215,16 @@ class Transport extends Disposable { } }); + var mungedSdp = sdp_transform.write(sdpParsed, null); + + // In single PC mode, munge a=inactive to a=recvonly for RTP media sections. + // WebRTC can generate inactive direction even when transceivers were configured as recvonly. + if (singlePCMode) { + mungedSdp = mungeInactiveToRecvOnlyForMedia(mungedSdp); + } + try { - await setMungedSDP(sd: offer, munged: sdp_transform.write(sdpParsed, null)); + await setMungedSDP(sd: offer, munged: mungedSdp); } catch (e) { throw NegotiationError(e.toString()); } @@ -328,4 +337,70 @@ class Transport extends Disposable { rethrow; } } + + /// Munge SDP to change `a=inactive` to `a=recvonly` for RTP media sections + /// that have no SSRC (i.e. receive-only transceivers with no local media). + /// + /// In single PC mode, libWebRTC may incorrectly generate `a=inactive` for + /// transceivers that were configured as recvonly. We only fix sections + /// without SSRC lines to avoid touching sendonly/sendrecv transceivers that + /// have been intentionally set to inactive (e.g. after unpublishing). + static String mungeInactiveToRecvOnlyForMedia(String sdp) { + final usesCRLF = sdp.contains('\r\n'); + final eol = usesCRLF ? '\r\n' : '\n'; + final lines = sdp.split(eol); + + // Two-pass approach: first collect media section ranges and whether they + // contain SSRC lines, then rewrite only the qualifying sections. + final sections = <_MediaSection>[]; + for (int i = 0; i < lines.length; i++) { + final l = lines[i].trim(); + if (l.startsWith('m=')) { + sections.add(_MediaSection( + startIndex: i, + isRTP: l.contains('RTP/'), + )); + } else if (sections.isNotEmpty) { + if (l.startsWith('a=ssrc:')) { + sections.last.hasSSRC = true; + } + } + } + + // Build a set of line indices where a=inactive should be rewritten. + final rewriteIndices = {}; + for (final section in sections) { + if (!section.isRTP || section.hasSSRC) continue; + final end = sections.indexOf(section) + 1 < sections.length + ? sections[sections.indexOf(section) + 1].startIndex + : lines.length; + for (int i = section.startIndex; i < end; i++) { + if (lines[i].trim() == 'a=inactive') { + rewriteIndices.add(i); + } + } + } + + final out = []; + for (int i = 0; i < lines.length; i++) { + if (rewriteIndices.contains(i)) { + out.add('a=recvonly'); + } else { + out.add(lines[i]); + } + } + + var result = out.join(eol); + if (sdp.endsWith(eol) && !result.endsWith(eol)) { + result += eol; + } + return result; + } +} + +class _MediaSection { + final int startIndex; + final bool isRTP; + bool hasSSRC = false; + _MediaSection({required this.startIndex, required this.isRTP}); } diff --git a/lib/src/exceptions.dart b/lib/src/exceptions.dart index f14ba9640..ed5f2d50a 100644 --- a/lib/src/exceptions.dart +++ b/lib/src/exceptions.dart @@ -25,6 +25,7 @@ enum ConnectionErrorReason { NotAllowed, InternalError, Timeout, + ServiceNotFound, } /// An exception occurred while attempting to connect. diff --git a/lib/src/internal/events.dart b/lib/src/internal/events.dart index b4af51238..2e56e99ed 100644 --- a/lib/src/internal/events.dart +++ b/lib/src/internal/events.dart @@ -562,6 +562,19 @@ class SignalRoomMovedEvent with SignalEvent, InternalEvent { String toString() => '${runtimeType}(room: ${response.room.name})'; } +@internal +class SignalMediaSectionsRequirementEvent with SignalEvent, InternalEvent { + final int numAudios; + final int numVideos; + const SignalMediaSectionsRequirementEvent({ + required this.numAudios, + required this.numVideos, + }); + + @override + String toString() => '${runtimeType}(numAudios: $numAudios, numVideos: $numVideos)'; +} + // ---------------------------------------------------------------------- // Engine events // ---------------------------------------------------------------------- diff --git a/lib/src/options.dart b/lib/src/options.dart index f37cf43de..9eeb75dd5 100644 --- a/lib/src/options.dart +++ b/lib/src/options.dart @@ -61,12 +61,34 @@ class ConnectOptions { final Timeouts timeouts; + /// When true, will attempt to connect via single peer connection mode. + /// Automatically falls back to dual peer connection mode if the server + /// does not support the v1 signal path (HTTP 404). + final bool singlePeerConnection; + const ConnectOptions({ this.autoSubscribe = true, this.rtcConfiguration = const RTCConfiguration(), this.protocolVersion = ProtocolVersion.v16, this.timeouts = Timeouts.defaultTimeouts, + this.singlePeerConnection = false, }); + + ConnectOptions copyWith({ + bool? autoSubscribe, + RTCConfiguration? rtcConfiguration, + ProtocolVersion? protocolVersion, + Timeouts? timeouts, + bool? singlePeerConnection, + }) { + return ConnectOptions( + autoSubscribe: autoSubscribe ?? this.autoSubscribe, + rtcConfiguration: rtcConfiguration ?? this.rtcConfiguration, + protocolVersion: protocolVersion ?? this.protocolVersion, + timeouts: timeouts ?? this.timeouts, + singlePeerConnection: singlePeerConnection ?? this.singlePeerConnection, + ); + } } /// Options used to modify the behavior of the [Room]. diff --git a/lib/src/utils.dart b/lib/src/utils.dart index cb0297d05..0c8ff2ef4 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -26,6 +26,7 @@ import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; import 'package:meta/meta.dart'; import './proto/livekit_models.pb.dart' as lk_models; +import './proto/livekit_rtc.pb.dart' as lk_rtc; import './support/native.dart'; import 'extensions.dart'; import 'livekit.dart'; @@ -157,7 +158,7 @@ class Utils { } @internal - static Future buildUri( + static Future buildV0Uri( String uriString, { required String token, required ConnectOptions connectOptions, @@ -210,6 +211,91 @@ class Utils { ); } + @internal + static Future buildV1Uri( + String uriString, { + required String token, + required ConnectOptions connectOptions, + required RoomOptions roomOptions, + bool reconnect = false, + String? sid, + lk_models.ReconnectReason? reconnectReason, + }) async { + final Uri uri = Uri.parse(uriString); + + final useSecure = uri.isSecureScheme; + final wsScheme = useSecure ? 'wss' : 'ws'; + + final pathSegments = List.from(uri.pathSegments); + pathSegments.removeWhere((e) => e.isEmpty); + pathSegments.addAll(['rtc', 'v1']); + + final clientInfo = await _clientInfo(); + final networkType = await getNetworkType(); + + // Populate ClientInfo with the same fields that v0 sends as query params + if (clientInfo != null) { + clientInfo.sdk = lk_models.ClientInfo_SDK.FLUTTER; + clientInfo.version = LiveKitClient.version; + clientInfo.protocol = int.tryParse(connectOptions.protocolVersion.toStringValue()) ?? 0; + clientInfo.network = networkType; + } + + final joinRequest = lk_rtc.JoinRequest( + clientInfo: clientInfo, + connectionSettings: lk_rtc.ConnectionSettings( + autoSubscribe: connectOptions.autoSubscribe, + adaptiveStream: roomOptions.adaptiveStream, + ), + reconnect: reconnect ? true : null, + participantSid: reconnect ? sid : null, + reconnectReason: reconnect ? reconnectReason : null, + ); + + final wrappedJoinRequest = lk_rtc.WrappedJoinRequest( + joinRequest: joinRequest.writeToBuffer(), + ); + + final joinRequestBase64 = base64Encode(wrappedJoinRequest.writeToBuffer()); + + return uri.replace( + scheme: wsScheme, + pathSegments: pathSegments, + queryParameters: { + 'access_token': token, + 'join_request': joinRequestBase64, + }, + ); + } + + /// Converts a v1 WebSocket URL to its HTTP validation counterpart. + /// `wss://host/rtc/v1` → `https://host/rtc/v1/validate` + @internal + static Future buildV1ValidateUri( + String uriString, { + required String token, + required ConnectOptions connectOptions, + required RoomOptions roomOptions, + bool forceSecure = false, + }) async { + final Uri uri = Uri.parse(uriString); + + final useSecure = uri.isSecureScheme || forceSecure; + final httpScheme = useSecure ? 'https' : 'http'; + + final pathSegments = List.from(uri.pathSegments); + pathSegments.removeWhere((e) => e.isEmpty); + pathSegments.addAll(['rtc', 'v1', 'validate']); + + return uri.replace( + scheme: httpScheme, + pathSegments: pathSegments, + queryParameters: { + 'access_token': token, + }, + ); + } + static List _presetsForDimensions({ required bool isScreenShare, required VideoDimensions dimensions,