Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions stream-video-android-core/api/stream-video-android-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -3890,24 +3890,26 @@ public final class io/getstream/android/video/generated/models/IngressVideoLayer
}

public final class io/getstream/android/video/generated/models/JoinCallRequest {
public fun <init> (Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Integer;Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/Boolean;Lio/getstream/android/video/generated/models/CallRequest;)V
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Integer;Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/Boolean;Lio/getstream/android/video/generated/models/CallRequest;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Integer;Ljava/lang/String;Ljava/util/List;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/Boolean;Lio/getstream/android/video/generated/models/CallRequest;)V
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Integer;Ljava/lang/String;Ljava/util/List;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/Boolean;Lio/getstream/android/video/generated/models/CallRequest;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Ljava/lang/String;
public final fun component2 ()Ljava/lang/Boolean;
public final fun component3 ()Ljava/lang/Integer;
public final fun component4 ()Ljava/lang/String;
public final fun component5 ()Ljava/lang/Boolean;
public final fun component5 ()Ljava/util/List;
public final fun component6 ()Ljava/lang/Boolean;
public final fun component7 ()Ljava/lang/Boolean;
public final fun component8 ()Lio/getstream/android/video/generated/models/CallRequest;
public final fun copy (Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Integer;Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/Boolean;Lio/getstream/android/video/generated/models/CallRequest;)Lio/getstream/android/video/generated/models/JoinCallRequest;
public static synthetic fun copy$default (Lio/getstream/android/video/generated/models/JoinCallRequest;Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Integer;Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/Boolean;Lio/getstream/android/video/generated/models/CallRequest;ILjava/lang/Object;)Lio/getstream/android/video/generated/models/JoinCallRequest;
public final fun component8 ()Ljava/lang/Boolean;
public final fun component9 ()Lio/getstream/android/video/generated/models/CallRequest;
public final fun copy (Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Integer;Ljava/lang/String;Ljava/util/List;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/Boolean;Lio/getstream/android/video/generated/models/CallRequest;)Lio/getstream/android/video/generated/models/JoinCallRequest;
public static synthetic fun copy$default (Lio/getstream/android/video/generated/models/JoinCallRequest;Ljava/lang/String;Ljava/lang/Boolean;Ljava/lang/Integer;Ljava/lang/String;Ljava/util/List;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/Boolean;Lio/getstream/android/video/generated/models/CallRequest;ILjava/lang/Object;)Lio/getstream/android/video/generated/models/JoinCallRequest;
public fun equals (Ljava/lang/Object;)Z
public final fun getCreate ()Ljava/lang/Boolean;
public final fun getData ()Lio/getstream/android/video/generated/models/CallRequest;
public final fun getLocation ()Ljava/lang/String;
public final fun getMembersLimit ()Ljava/lang/Integer;
public final fun getMigratingFrom ()Ljava/lang/String;
public final fun getMigratingFromList ()Ljava/util/List;
public final fun getNotify ()Ljava/lang/Boolean;
public final fun getRing ()Ljava/lang/Boolean;
public final fun getVideo ()Ljava/lang/Boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ data class JoinCallRequest (
@Json(name = "migrating_from")
val migratingFrom: kotlin.String? = null,

@Json(name = "migrating_from_list")
val migratingFromList: kotlin.collections.List<kotlin.String>? = null,
Comment thread
PratimMallick marked this conversation as resolved.

@Json(name = "notify")
val notify: kotlin.Boolean? = null,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ public class Call(
var sessionId = UUID.randomUUID().toString()
internal val unifiedSessionId = UUID.randomUUID().toString()

/**
* SFU IDs (edge names) we failed to connect to (e.g. SFU_FULL). Sent in migrating_from_list
* when requesting new credentials so the coordinator can exclude them.
*/
private val failedSfuIds = Collections.synchronizedList(mutableListOf<String>())
Comment thread
PratimMallick marked this conversation as resolved.
Outdated

internal var connectStartTime = 0L
internal var reconnectStartTime = 0L

Expand Down Expand Up @@ -638,6 +644,7 @@ public class Call(
val sfuToken = result.value.credentials.token
val sfuUrl = result.value.credentials.server.url
val sfuWsUrl = result.value.credentials.server.wsEndpoint
val sfuName = result.value.credentials.server.edgeName
val iceServers = result.value.credentials.iceServers.map { it.toIceServer() }
try {
session = if (testInstanceProvider.rtcSessionCreator != null) {
Expand All @@ -652,6 +659,7 @@ public class Call(
sfuUrl = sfuUrl,
sfuWsUrl = sfuWsUrl,
sfuToken = sfuToken,
sfuName = sfuName,
remoteIceServers = iceServers,
powerManager = powerManager,
)
Expand Down Expand Up @@ -828,6 +836,7 @@ public class Call(
cred.server.url,
cred.server.wsEndpoint,
cred.token,
cred.server.edgeName,
cred.iceServers.map { ice ->
ice.toIceServer()
},
Expand Down Expand Up @@ -861,15 +870,16 @@ public class Call(
state._connection.value = RealtimeConnection.Migrating
location?.let {
reconnectStartTime = System.currentTimeMillis()
session?.sfuName?.let { addFailedSfuId(it) }

val joinResponse = joinRequest(location = it)
val joinResponse = joinRequest(location = it, migratingFrom = session?.sfuName)
if (joinResponse is Success) {
// switch to the new SFU
val cred = joinResponse.value.credentials
val session = this.session!!
val currentOptions = this.session?.publisher?.currentOptions()
val oldSfuUrl = session.sfuUrl
logger.i { "Rejoin SFU $oldSfuUrl to ${cred.server.url}" }
val oldSfuName = session.sfuName
logger.i { "Migrate SFU $oldSfuName to ${cred.server.edgeName}" }

this.sessionId = UUID.randomUUID().toString()
val (prevSessionId, subscriptionsInfo, publishingInfo) = session.currentSfuInfo()
Expand All @@ -878,7 +888,7 @@ public class Call(
strategy = WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_MIGRATE,
announced_tracks = publishingInfo,
subscriptions = subscriptionsInfo,
from_sfu_id = oldSfuUrl,
from_sfu_id = oldSfuName,
reconnect_attempt = reconnectAttepmts,
)
session.prepareRejoin()
Expand All @@ -894,6 +904,7 @@ public class Call(
cred.server.url,
cred.server.wsEndpoint,
cred.token,
cred.server.edgeName,
cred.iceServers.map { ice ->
ice.toIceServer()
},
Expand Down Expand Up @@ -1482,14 +1493,43 @@ public class Call(
return clientImpl.muteUsers(type, id, request)
}

/** Adds the given SFU ID (edge name) to the failed list (for migrating_from_list). */
private fun addFailedSfuId(sfuId: String) {
if (sfuId.isBlank()) return
synchronized(failedSfuIds) {
if (!failedSfuIds.contains(sfuId)) {
failedSfuIds.add(sfuId)
}
Comment thread
PratimMallick marked this conversation as resolved.
Outdated
}
}

/** Returns a snapshot of failed SFU IDs to send as migrating_from_list. */
private fun getFailedSfuIdsSnapshot(): List<String> =
synchronized(failedSfuIds) { failedSfuIds.toList() }

/** Clears the failed SFU list (e.g. after a successful join). */
private fun clearFailedSfuIds() {
failedSfuIds.clear()
}

/**
* Called by [RtcSession] when connection to the SFU is established successfully.
* Clears the failed SFU list so we don't exclude this SFU on future requests.
*/
internal fun onSfuConnectionEstablished() {
clearFailedSfuIds()
}

@VisibleForTesting
internal suspend fun joinRequest(
create: CreateCallOptions? = null,
location: String,
migratingFrom: String? = null,
migratingFromList: List<String>? = null,
ring: Boolean = false,
notify: Boolean = false,
): Result<JoinCallResponse> {
val migratingFromList = migratingFromList ?: getFailedSfuIdsSnapshot().takeIf { it.isNotEmpty() }
val result = clientImpl.joinCall(
type, id,
create = create != null,
Expand All @@ -1502,6 +1542,7 @@ public class Call(
notify = notify,
location = location,
migratingFrom = migratingFrom,
migratingFromList = migratingFromList,
)
result.onSuccess {
state.updateFromResponse(it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,8 @@ internal class StreamVideoClient internal constructor(
ring: Boolean = false,
notify: Boolean = false,
location: String,
migratingFrom: String?,
migratingFrom: String? = null,
migratingFromList: List<String>? = null,
): Result<JoinCallResponse> {
val joinCallRequest = JoinCallRequest(
create = create,
Expand All @@ -812,6 +813,7 @@ internal class StreamVideoClient internal constructor(
notify = notify,
location = location,
migratingFrom = migratingFrom,
migratingFromList = migratingFromList,
)

val result = apiCall {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ data class TrackDimensions(
*
* For developers: RtcSession throws [IllegalStateException] because its [coroutineScope] & [rtcSessionScope] throws it
*/
private const val MAX_SFU_CONNECTION_RETRIES = 2

public class RtcSession internal constructor(
client: StreamVideo,
private val sessionCounter: Int = 0,
Expand All @@ -218,6 +220,7 @@ public class RtcSession internal constructor(
internal var sfuUrl: String,
internal var sfuWsUrl: String,
internal var sfuToken: String,
internal var sfuName: String,
internal var remoteIceServers: List<IceServer>,
internal val clientImpl: StreamVideoClient = client as StreamVideoClient,
private val supervisorJob: CompletableJob = SupervisorJob(),
Expand Down Expand Up @@ -256,6 +259,13 @@ public class RtcSession internal constructor(
private var muteStateSyncJob: Job? = null
private val oneBasedSessionCounter = sessionCounter + 1

/**
* Tracks consecutive SFU connection failures for the current SFU. After
* [MAX_SFU_CONNECTION_RETRIES] the session triggers a migration to request a new SFU
* from the coordinator instead of retrying the same one indefinitely.
*/
private var sfuConnectionRetryCount = 0

private var stateJob: Job? = null
private var errorJob: Job? = null
private var eventJob: Job? = null
Expand Down Expand Up @@ -623,8 +633,10 @@ public class RtcSession internal constructor(
_sfuSfuSocketState.value = sfuSocketState
when (sfuSocketState) {
is SfuSocketState.Connected -> {
sfuConnectionRetryCount = 0
call.state._connection.value =
RealtimeConnection.Connected
call.onSfuConnectionEstablished()

val pendingTrickleEvents = iceTricklePendingEvents.toList()
iceTricklePendingEvents.clear()
Expand All @@ -637,8 +649,28 @@ public class RtcSession internal constructor(
call.state._connection.value =
RealtimeConnection.InProgress

is SfuSocketState.Disconnected.DisconnectedTemporarily -> {
sfuConnectionRetryCount++
logger.w {
"[stateJob] SFU connection failure $sfuConnectionRetryCount/$MAX_SFU_CONNECTION_RETRIES for $sfuName"
}
if (sfuConnectionRetryCount > MAX_SFU_CONNECTION_RETRIES) {
logger.w {
"[stateJob] Max retries reached for $sfuName, requesting new SFU via migrate()"
}
sfuConnectionRetryCount = 0
sfuConnectionModule.socketConnection.disconnect()
call.migrate()
}
}
Comment thread
rahul-lohra marked this conversation as resolved.
Outdated

is SfuSocketState.Disconnected.WebSocketEventLost -> {
// Intermediate state in HealthMonitor retry cycle — not a new
// connection failure, so don't increment the retry counter.
}

else -> {
// Ignore it
// Ignore other states
}
}
}
Expand Down
Loading