Skip to content
Merged
88 changes: 88 additions & 0 deletions firebase-dataconnect/RealtimeTodo.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Realtime Query Subscription TODO List

### TODO 1: Lack of Connection Health Monitoring / Reconnection

* **File:** RealtimeQueryManager.kt
* **Severity:** `CRITICAL`

#### Description

Once `RealtimeQueryManager` successfully transitions to `State.Connected(stream)`, it remains in
this state permanently. If the underlying bidirectional gRPC connection is lost or the stream is
closed (due to network issues, server-side termination, or client-side close), the manager does not
detect this. Any future calls to `subscribe()` will read `State.Connected` and try to use the dead
stream, causing new subscriptions to silently hang or fail indefinitely without any reconnection
attempts.

#### Recommendation

Add connection health monitoring or detect when the stream has completed/failed, and transition
the manager's state back to `State.Disconnected` (or clean up resources) to allow subsequent
subscription calls to trigger a new connection.

---

### TODO 2: Permanent Lock-out / Stuck in `Connecting` State on Connection Failure

* **File:** RealtimeQueryManager.kt
* **Severity:** `CRITICAL`

#### Description

In `ensureConnected`, `currentState.job.await()` is called to wait for the lazy `Deferred`
connection job. If the connection attempt fails (e.g., due to a temporary network issue) and
throws an exception, the exception propagates out of the method. Because the exception is thrown
before the state is updated, the manager's state remains permanently stuck in
`State.Connecting(job)`. Any future connection attempts will call `await()` on the same failed
`Deferred` job, which immediately and permanently re-throws the same cached exception, making the
manager completely unusable until the app or SDK instance is restarted.

### TODO 3: Memory and Resource Leak of Subscription Flows in `flowByQueryId`

* **File:** RealtimeQueryManager.kt
* **Severity:** `HIGH`

#### Description

Active subscription flows are stored in the `flowByQueryId` map to deduplicate identical queries.
However, there is no mechanism to remove flows from this map when a subscription is cancelled,
completed, or when there are no active collectors left. This leads to an unbounded memory/resource
leak as the client subscribes to different queries over time. Furthermore, because the subscription
is never cleaned up, the backend stream may continue sending updates for cancelled subscriptions,
wasting bandwidth and server resources.

#### Recommendation

Implement a reference-counting mechanism or a cleanup callback upon flow completion to remove the
query from `flowByQueryId` once the active collector count drops to zero.

### TODO 3: Memory and Resource Leak of Subscription Flows in `flowByQueryId`

* **File:** DataConnectBidiConnectStream.kt
* **Severity:** `HIGH`

#### Description

Late subscribers to a stream that has already completed (server-side) will hang indefinitely.
This occurs because `incomingResponses` is a `SharedFlow` with `replay = 0`. If the stream
completes, the `IncomingResponse.Completed` signal is emitted and lost for future subscribers.
These subscribers will then wait in `transformWhile` for new emissions that will never come.

The current implementation of `onCompletion` does not prevent the hang for late subscribers.
Because `streams.incomingResponses` is a `SharedFlow` with `replay = 0`, if the stream has already
completed, the `IncomingResponse.Completed` signal is lost. A new subscriber will begin collecting
from `incomingResponses` and wait indefinitely for emissions that will never arrive. The
`onCompletion` block is only executed *after* the flow collection completes, so it cannot resolve a
hang that occurs *during* the collection process.

Discovered by gemini code assist:
https://github.com/firebase/firebase-android-sdk/pull/8158#discussion_r3240286966

#### Recommendation

You should check `streams.completedResponse` before starting the flow collection to ensure the
stream is still active.

To address this, you should check the state of `streams.completedResponse` *before* or *at the
start* of the flow collection (e.g., inside the `flow { ... }` builder) to verify if the stream is
already finished, and emit the completion signal or throw the cached exception immediately if it is.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import com.google.firebase.dataconnect.core.LoggerGlobals.debug
import com.google.firebase.dataconnect.util.CoroutineUtils.createChildSupervisorScope
import com.google.firebase.dataconnect.util.NullableReference
import com.google.firebase.dataconnect.util.ProtoUtil.toCompactString
import com.google.protobuf.Empty
import com.google.protobuf.Struct
import google.firebase.dataconnect.proto.ExecuteRequest
import google.firebase.dataconnect.proto.GraphqlError as GraphqlErrorProto
import google.firebase.dataconnect.proto.GraphqlResponseExtensions.DataConnectProperties as DataConnectPropertiesProto
import google.firebase.dataconnect.proto.ResumeRequest
import google.firebase.dataconnect.proto.StreamRequest as StreamRequestProto
import google.firebase.dataconnect.proto.StreamResponse as StreamResponseProto
import kotlinx.coroutines.CoroutineScope
Expand All @@ -39,7 +41,9 @@ import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.onCompletion
Expand All @@ -48,6 +52,8 @@ import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.job
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
* Manages a bidirectional gRPC stream for Data Connect operations.
Expand Down Expand Up @@ -169,65 +175,51 @@ internal class DataConnectBidiConnectStream(
State.Closed -> error("DataConnectBidiConnectStream.close() has been called [rptkgcfzyz]")
}

val streamRequest =
StreamRequestProto.newBuilder().let { streamRequest ->
streamRequest.setRequestId(requestId)
streamRequest.setSubscribe(
ExecuteRequest.newBuilder().let { executeRequest ->
executeRequest.setOperationName(operationName)
executeRequest.setVariables(variables)
executeRequest.build()
}
)
streamRequest.build()
}
val subscriptionStateManager =
SubscriptionStateManager(
requestId = requestId,
operationName = operationName,
variables,
streams.outgoingRequests
)

return flow {
val subscription = subscriptionStateManager.Subscriber()

val outgoingRequests = streams.outgoingRequests
val incomingResponses = streams.incomingResponses
val completedResponse = streams.completedResponse

return incomingResponses
.onSubscription { emit(IncomingResponse.Subscribed) }
.transformWhile { incomingResponse ->
when (incomingResponse) {
is IncomingResponse.Subscribed -> {
val sendResult = outgoingRequests.trySend(streamRequest)
when {
sendResult.isSuccess -> true
sendResult.isClosed -> false
else ->
error(
"internal error xw3zdzycfq: outgoingRequests.trySend(streamRequest) " +
"was unable to enqueue the streamRequest; this should never happen because " +
"outgoingRequests is created with capacity=UNLIMITED (sendResult=$sendResult)"
)
emitAll(
streams.incomingResponses
.onSubscription { emit(IncomingResponse.Subscribed) }
.transformWhile { incomingResponse ->
Comment thread
dconeybe marked this conversation as resolved.
when (incomingResponse) {
is IncomingResponse.Subscribed -> subscription.onSubscribed()
is IncomingResponse.Message -> {
if (incomingResponse.streamResponse.requestId != requestId) {
true
} else {
val executeResponse = incomingResponse.streamResponse.toExecuteResponse()
if (executeResponse !== null) {
emit(executeResponse)
}
!incomingResponse.streamResponse.cancelled
}
}
is IncomingResponse.Completed -> {
false // NOTE: The downstream onCompletion() looks after throwing the exception.
}
}
}
is IncomingResponse.Message -> {
if (incomingResponse.streamResponse.requestId != requestId) {
true
} else {
val executeResponse = incomingResponse.streamResponse.toExecuteResponse()
if (executeResponse !== null) {
emit(executeResponse)
.onCompletion { throwable ->
subscription.onCompleted()

if (throwable === null) {
val completed = streams.completedResponse.mapNotNull { it.ref }.first()
if (completed.throwable !== null) {
throw completed.throwable
}
!incomingResponse.streamResponse.cancelled
}
}
is IncomingResponse.Completed -> {
// NOTE: The downstream onCompletion() callback looks after throwing the exception.
false
}
}
}
.onCompletion { throwable ->
if (throwable === null) {
val completed = completedResponse.mapNotNull { it.ref }.first()
if (completed.throwable !== null) {
throw completed.throwable
}
}
}
)
}
}

/**
Expand Down Expand Up @@ -328,6 +320,101 @@ internal class DataConnectBidiConnectStream(
object Subscribed : IncomingResponse
}

private class SubscriptionStateManager(
requestId: String,
operationName: String,
variables: Struct,
private val outgoingRequests: SendChannel<StreamRequestProto>,
) {

val mutex = Mutex()
var subscriberCount = 0

inner class Subscriber {
private var subscribed = false

suspend fun onSubscribed(): Boolean =
mutex.withLock {
val streamRequest =
if (subscriberCount == 0) {
subscribeStreamRequest
} else {
resumeStreamRequest
}

val sendResult = outgoingRequests.trySend(streamRequest)

when {
sendResult.isSuccess -> {
subscribed = true
subscriberCount++
true
}
sendResult.isClosed -> false
else ->
error(
"internal error xw3zdzycfq: outgoingRequests.trySend(subscribe or resume) " +
"was unable to enqueue the streamRequest; this should never happen because " +
"outgoingRequests is created with capacity=UNLIMITED (sendResult=$sendResult)"
)
}
}

suspend fun onCompleted() {
mutex.withLock {
if (!subscribed) {
return
}

subscribed = false
subscriberCount--
check(subscriberCount >= 0) {
"internal error hpn3qsj746: subscriberCount should never be less than zero, " +
"but it is: $subscriberCount"
}

if (subscriberCount == 0) {
val sendResult = outgoingRequests.trySend(cancelStreamRequest)
if (sendResult.isFailure && !sendResult.isClosed) {
error(
"internal error mxcsq556tv: outgoingRequests.trySend(cancel) " +
"was unable to enqueue the streamRequest; this should never happen because " +
"outgoingRequests is created with capacity=UNLIMITED (sendResult=$sendResult)"
)
}
}
}
}
}

private val subscribeStreamRequest =
StreamRequestProto.newBuilder().let { streamRequest ->
streamRequest.setRequestId(requestId)
streamRequest.setSubscribe(
ExecuteRequest.newBuilder().let { executeRequest ->
executeRequest.setOperationName(operationName)
executeRequest.setVariables(variables)
executeRequest.build()
}
)
streamRequest.build()
}

private val resumeStreamRequest =
StreamRequestProto.newBuilder().let { streamRequest ->
streamRequest.setRequestId(requestId)
streamRequest.setResume(ResumeRequest.getDefaultInstance())
streamRequest.build()
}

private val cancelStreamRequest =
StreamRequestProto.newBuilder().let { streamRequest ->
streamRequest.setRequestId(requestId)
streamRequest.setCancel(Empty.getDefaultInstance())
streamRequest.build()
}
}

private companion object {

fun StreamResponseProto.toExecuteResponse(): ExecuteResponse? =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ import com.google.protobuf.Struct
import google.firebase.dataconnect.proto.GraphqlError
import io.grpc.Status
import io.grpc.StatusException
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.withContext

internal class DataConnectGrpcClient(
private val grpcRPCs: DataConnectGrpcRPCs,
Expand Down Expand Up @@ -99,43 +95,19 @@ internal class DataConnectGrpcClient(
}

@ExperimentalRealtimeQueries
fun connect(
suspend fun connect(
streamId: String,
requestId: String,
operationName: String,
variables: Struct,
callerSdkType: FirebaseDataConnect.CallerSdkType,
): Flow<OperationResult> = flow {
val connection =
grpcRPCs.retryOnGrpcUnauthenticatedError(requestId, "connect") { authToken, appCheckToken ->
connect(
requestId,
callerSdkType,
authToken,
appCheckToken,
)
}

try {
val flow =
connection.subscribe(
requestId = requestId,
operationName = operationName,
variables = variables,
)

flow.collect { executeResponse: DataConnectBidiConnectStream.ExecuteResponse ->
emit(
OperationResult(
data = executeResponse.data,
errors = executeResponse.errors,
source = DataSource.SERVER,
)
)
}
} finally {
withContext(NonCancellable) { connection.close() }
): DataConnectBidiConnectStream =
grpcRPCs.retryOnGrpcUnauthenticatedError(requestId, "connect") { authToken, appCheckToken ->
connect(
requestId,
callerSdkType,
authToken,
appCheckToken,
)
}
}

private suspend inline fun <T, R> T.retryOnGrpcUnauthenticatedError(
requestId: String,
Expand Down
Loading
Loading