From 3ee35515b104ab101b9cbff3d7ffdeee185a360c Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Thu, 14 May 2026 03:06:17 -0400 Subject: [PATCH 1/8] TurbineUtils.kt: various improvements, notably TurbinePredicateResult --- .../dataconnect/testutil/TurbineUtils.kt | 156 ++++++++++++++---- 1 file changed, 121 insertions(+), 35 deletions(-) diff --git a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt index 45039724394..1f72770af90 100644 --- a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt +++ b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt @@ -22,55 +22,132 @@ import io.grpc.Status import io.grpc.StatusException import io.kotest.assertions.fail import io.kotest.assertions.print.print +import java.util.concurrent.atomic.AtomicInteger -suspend inline fun ReceiveTurbine.awaitUntilItem( +/** + * Represents the result of evaluating a predicate on an item emitted from a [ReceiveTurbine]. + * + * Used by [awaitUntilItem] to let the predicate function not only test a value, but also map it to + * another value (if desired). For example, a predicate function could test that a given [String] + * parses successfully as an [Int], and then return the parsed [Int] instead of the original + * unparsed [String]. + * + * @param T The type of the mapped value when the predicate is satisfied, which does _not_ have to + * be the same type as the value that satisfied the predicate. + */ +sealed interface TurbinePredicateResult { + /** + * Represents a predicate being satisfied, holding the value that satisfied the predicate or some + * other value derived from that value. + * + * @property mappedValue The value resulting from the satisfied predicate, which can be the + * original value or a cast/transformed representation of it. + */ + class Satisfied(val mappedValue: T) : TurbinePredicateResult { + override fun toString() = + "TurbinePredicateResult.Satisfied(mappedValue=${mappedValue.print().value})" + } + /** Represents a failed match where the predicate is not satisfied. */ + object Unsatisfied : TurbinePredicateResult { + override fun toString() = "TurbinePredicateResult.Unsatisfied" + } +} + +/** + * Awaits events from the receiver [ReceiveTurbine] until an emitted item satisfies the given + * [predicate], ignoring any items received that did _not_ match the predicate. + * + * @param T The type of items emitted by the turbine. + * @param R The type of the resulting value returned by the satisfied predicate. + * @param predicateDescription An optional description of [predicate] to include in the message of + * [AssertionError], if thrown (for example, "String can be parsed as an Int"). + * @param onIgnoredItem An optional callback invoked for each item that does not satisfy the + * predicate and, therefore, is ignored. + * @param predicate The condition to evaluate on each item. Returns + * [TurbinePredicateResult.Satisfied] with the mapped value if satisfied, or + * [TurbinePredicateResult.Unsatisfied] otherwise. + * @return The mapped value from the satisfied predicate. + * @throws AssertionError if the flow completes, fails, or times out before emitting an item that + * satisfies [predicate]. + */ +suspend inline fun ReceiveTurbine.awaitUntilItem( predicateDescription: String? = null, - onSkippedItem: (T) -> Unit = {}, - predicate: (T) -> Boolean, -): T { - var skippedItemCount = 0 + onIgnoredItem: (T) -> Unit = {}, + predicate: (T) -> TurbinePredicateResult, +): R { + val skippedItemCount = AtomicInteger(0) while (true) { when (val event = awaitEvent()) { Event.Complete -> fail( - "Flow completed normally after skipping $skippedItemCount items produced " + - "that didn't match the given predicate ($predicateDescription) " + - "but expected it to produce an item that matched the predicate" + "Flow completed normally after skipping ${skippedItemCount.get()} items produced " + + "that didn't satisfy the given predicate ($predicateDescription) " + + "but expected it to produce an item that satisfied the predicate" ) is Event.Error -> fail( - "Flow failed with exception ${event.throwable} after skipping $skippedItemCount " + - "items produced that didn't match the given predicate ($predicateDescription) " + - "but expected it to produce an item that matched the predicate" + "Flow failed with exception ${event.throwable} after skipping " + + "${skippedItemCount.get()} items produced " + + "that didn't satisfy the given predicate ($predicateDescription) " + + "but expected it to produce an item that satisfied the predicate" ) is Event.Item -> - if (predicate(event.value)) { - return event.value - } else { - onSkippedItem(event.value) - skippedItemCount++ + when (val predicateResult = predicate(event.value)) { + is TurbinePredicateResult.Satisfied -> return predicateResult.mappedValue + TurbinePredicateResult.Unsatisfied -> { + onIgnoredItem(event.value) + skippedItemCount.incrementAndGet() + } } } } } +/** + * Awaits events from the receiver [ReceiveTurbine] until an emitted item is an instance of type [U] + * , ignoring any items received that were not of type [U]. + * + * @param T The base type of items emitted by the turbine. + * @param U The specific expected type of item to wait for. + * @param onIgnoredItem An optional callback invoked for each item that is not an instance of [U] + * and, therefore, is ignored. + * @return The first observed emitted item of [U]. + * @throws AssertionError if the flow completes, fails, or times out before emitting an item of type + * [U]. + */ suspend inline fun ReceiveTurbine.awaitUntilItemIsInstance( - onSkippedItem: (T) -> Unit = {}, -): U { - val item = awaitUntilItem("is instance of ${U::class.qualifiedName}", onSkippedItem) { it is U } - return item as U -} + onIgnoredItem: (T) -> Unit = {}, +): U = + awaitUntilItem("is instance of ${U::class.qualifiedName}", onIgnoredItem) { + when (it) { + is U -> TurbinePredicateResult.Satisfied(it) + else -> TurbinePredicateResult.Unsatisfied + } + } +/** + * Awaits a terminal error event from the receiver [ReceiveTurbine] and asserts that the thrown + * exception is of type [T]. + * + * @param T The expected type of the thrown exception. + * @param exceptionDescription An optional description of the expected exception to include in the + * message of [AssertionError], if thrown (for example, "status code 42"). + * @param validate An optional validation block to assert additional properties on the exception, + * which is called with the matching exception before it is returned. + * @return The caught exception of type [T]. + * @throws AssertionError if the flow emits an item, completes normally, or fails with an exception + * other than [T]. + */ suspend inline fun ReceiveTurbine<*>.awaitError( - exceptionDescriptionSuffix: String? = null, + exceptionDescription: String? = null, validate: (T) -> Unit = {} ): T { val expectedText = buildString { append(T::class.qualifiedName) - if (exceptionDescriptionSuffix !== null) { - append("with ") - append(exceptionDescriptionSuffix) + if (exceptionDescription !== null) { + append(" with ") + append(exceptionDescription) } } @@ -95,21 +172,30 @@ suspend inline fun ReceiveTurbine<*>.awaitError( return exception } +/** + * Awaits a terminal error event from the receiver [ReceiveTurbine] and asserts that the thrown + * exception is a [StatusException] with the specified gRPC status [code]. + * + * @param code The expected gRPC [Status.Code]. + * @param validate An optional validation block to assert additional properties on the + * [StatusException] before returning it. + * @return The caught [StatusException]. + * @throws AssertionError if the flow emits an item, completes normally, fails with an exception + * other than [StatusException], or fails with a [StatusException] whose code does not match [code]. + */ suspend inline fun ReceiveTurbine<*>.awaitStatusException( - code: Status.Code?, + code: Status.Code, validate: (StatusException) -> Unit = {} ): StatusException { - val exceptionDescriptionSuffix = if (code === null) null else "with code $code" + val exceptionDescriptionSuffix = "with code $code" val statusException = awaitError(exceptionDescriptionSuffix) - if (code !== null) { - val actualCode = statusException.status.code - if (actualCode != code) { - fail( - "Flow failed with StatusException (as expected) but with the wrong code: " + - "expected $code but got $actualCode" - ) - } + val actualCode = statusException.status.code + if (actualCode != code) { + fail( + "Flow failed with StatusException (as expected) but with the wrong code: " + + "got $actualCode but expected $code" + ) } validate(statusException) From ec2fc7dc91cbe5352d21a2aead3e9694aeb19d42 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Thu, 14 May 2026 03:08:19 -0400 Subject: [PATCH 2/8] RealtimeQueryManager.kt added, with support for deduping identical queries --- .../core/DataConnectBidiConnectStream.kt | 26 ++- .../dataconnect/core/DataConnectGrpcClient.kt | 48 ++---- .../core/FirebaseDataConnectImpl.kt | 31 +++- .../core/RealtimeQuerySubscriptionImpl.kt | 2 +- .../querymgr/RealtimeQueryManager.kt | 154 ++++++++++++++++++ 5 files changed, 216 insertions(+), 45 deletions(-) create mode 100644 firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/querymgr/RealtimeQueryManager.kt diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt index 3ad7814dafb..aeea6ba1cfd 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt @@ -25,6 +25,7 @@ import com.google.protobuf.Struct import google.firebase.dataconnect.proto.ExecuteRequest import google.firebase.dataconnect.proto.GraphqlError as GraphqlErrorProto import google.firebase.dataconnect.proto.GraphqlResponseExtensions.DataConnectProperties as DataConnectPropertiesProto +import google.firebase.dataconnect.proto.ResumeRequest import google.firebase.dataconnect.proto.StreamRequest as StreamRequestProto import google.firebase.dataconnect.proto.StreamResponse as StreamResponseProto import kotlinx.coroutines.CoroutineScope @@ -48,6 +49,8 @@ import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.flow.transformWhile import kotlinx.coroutines.flow.update import kotlinx.coroutines.job +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock /** * Manages a bidirectional gRPC stream for Data Connect operations. @@ -169,7 +172,7 @@ internal class DataConnectBidiConnectStream( State.Closed -> error("DataConnectBidiConnectStream.close() has been called [rptkgcfzyz]") } - val streamRequest = + fun subscribeStreamRequest(): StreamRequestProto = StreamRequestProto.newBuilder().let { streamRequest -> streamRequest.setRequestId(requestId) streamRequest.setSubscribe( @@ -182,16 +185,35 @@ internal class DataConnectBidiConnectStream( streamRequest.build() } + fun resumeStreamRequest(): StreamRequestProto = + StreamRequestProto.newBuilder().let { streamRequest -> + streamRequest.setRequestId(requestId) + streamRequest.setResume(ResumeRequest.getDefaultInstance()) + streamRequest.build() + } + val outgoingRequests = streams.outgoingRequests val incomingResponses = streams.incomingResponses val completedResponse = streams.completedResponse + val subscribedMutex = Mutex() + var subscribed = false return incomingResponses .onSubscription { emit(IncomingResponse.Subscribed) } .transformWhile { incomingResponse -> when (incomingResponse) { is IncomingResponse.Subscribed -> { - val sendResult = outgoingRequests.trySend(streamRequest) + val sendResult = + subscribedMutex.withLock { + val streamRequest = + if (subscribed) { + resumeStreamRequest() + } else { + subscribed = true + subscribeStreamRequest() + } + outgoingRequests.trySend(streamRequest) + } when { sendResult.isSuccess -> true sendResult.isClosed -> false diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt index e6c2e5253d2..74d3a0c2ad7 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt @@ -27,10 +27,6 @@ import com.google.protobuf.Struct import google.firebase.dataconnect.proto.GraphqlError import io.grpc.Status import io.grpc.StatusException -import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.withContext internal class DataConnectGrpcClient( private val grpcRPCs: DataConnectGrpcRPCs, @@ -99,43 +95,19 @@ internal class DataConnectGrpcClient( } @ExperimentalRealtimeQueries - fun connect( + suspend fun connect( + streamId: String, requestId: String, - operationName: String, - variables: Struct, callerSdkType: FirebaseDataConnect.CallerSdkType, - ): Flow = flow { - val connection = - grpcRPCs.retryOnGrpcUnauthenticatedError(requestId, "connect") { authToken, appCheckToken -> - connect( - requestId, - callerSdkType, - authToken, - appCheckToken, - ) - } - - try { - val flow = - connection.subscribe( - requestId = requestId, - operationName = operationName, - variables = variables, - ) - - flow.collect { executeResponse: DataConnectBidiConnectStream.ExecuteResponse -> - emit( - OperationResult( - data = executeResponse.data, - errors = executeResponse.errors, - source = DataSource.SERVER, - ) - ) - } - } finally { - withContext(NonCancellable) { connection.close() } + ): DataConnectBidiConnectStream = + grpcRPCs.retryOnGrpcUnauthenticatedError(requestId, "connect") { authToken, appCheckToken -> + connect( + requestId, + callerSdkType, + authToken, + appCheckToken, + ) } - } private suspend inline fun T.retryOnGrpcUnauthenticatedError( requestId: String, diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index 5bd785b4e9b..487a9d3a1ee 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -35,6 +35,7 @@ import com.google.firebase.dataconnect.isDefaultHost import com.google.firebase.dataconnect.querymgr.LiveQueries import com.google.firebase.dataconnect.querymgr.LiveQuery import com.google.firebase.dataconnect.querymgr.QueryManager +import com.google.firebase.dataconnect.querymgr.RealtimeQueryManager import com.google.firebase.dataconnect.querymgr.RegisteredDataDeserializer import com.google.firebase.dataconnect.util.AlphanumericStringUtil.toAlphaNumericString import com.google.firebase.dataconnect.util.CoroutineUtils.createSupervisorCoroutineScope @@ -77,6 +78,7 @@ internal interface FirebaseDataConnectInternal : FirebaseDataConnect { val grpcClient: DataConnectGrpcClient val grpcRPCs: DataConnectGrpcRPCs val queryManager: QueryManager + @OptIn(ExperimentalRealtimeQueries::class) val realtimeQueryManager: RealtimeQueryManager suspend fun awaitAuthReady() suspend fun awaitAppCheckReady() @@ -168,11 +170,15 @@ internal class FirebaseDataConnectImpl( data class New(val emulatorSettings: EmulatedServiceSettings?) : State { constructor() : this(null) } - data class Initialized( + + data class Initialized + constructor( val grpcRPCs: DataConnectGrpcRPCs, val grpcClient: DataConnectGrpcClient, - val queryManager: QueryManager + val queryManager: QueryManager, + @OptIn(ExperimentalRealtimeQueries::class) val realtimeQueryManager: RealtimeQueryManager, ) : State + data class Closing(val grpcRPCs: DataConnectGrpcRPCs, val closeJob: Deferred) : State object Closed : State } @@ -186,6 +192,10 @@ internal class FirebaseDataConnectImpl( override val queryManager: QueryManager get() = initialize().queryManager + @OptIn(ExperimentalRealtimeQueries::class) + override val realtimeQueryManager: RealtimeQueryManager + get() = initialize().realtimeQueryManager + private fun initialize(): State.Initialized { val newState = state.updateAndGet { currentState -> @@ -194,7 +204,11 @@ internal class FirebaseDataConnectImpl( val grpcRPCs = createDataConnectGrpcRPCs(currentState.emulatorSettings) val grpcClient = createDataConnectGrpcClient(grpcRPCs) val queryManager = createQueryManager(grpcClient) - State.Initialized(grpcRPCs, grpcClient, queryManager) + + @OptIn(ExperimentalRealtimeQueries::class) + val realtimeQueryManager = createRealtimeQueryManager(grpcClient) + @OptIn(ExperimentalRealtimeQueries::class) + State.Initialized(grpcRPCs, grpcClient, queryManager, realtimeQueryManager) } is State.Initialized -> currentState is State.Closing -> currentState @@ -353,6 +367,15 @@ internal class FirebaseDataConnectImpl( return QueryManager(liveQueries) } + @ExperimentalRealtimeQueries + private fun createRealtimeQueryManager(grpcClient: DataConnectGrpcClient): RealtimeQueryManager = + RealtimeQueryManager( + grpcClient = grpcClient, + coroutineScope = coroutineScope, + idStringGenerator = idStringGenerator, + logger = Logger("RealtimeQueryManager").apply { "created by ${logger.nameWithId}" }, + ) + override fun useEmulator(host: String, port: Int): Unit = runBlocking { state.update { currentState -> when (currentState) { @@ -383,7 +406,7 @@ internal class FirebaseDataConnectImpl( } emulatorInfo.servicesList.forEachIndexed { index, serviceInfo -> logger.debug { - "[rid=$requestId] service #${index+1}:" + + "[rid=$requestId] service #${index + 1}:" + " serviceId=${serviceInfo.serviceId}" + " connectionString=${serviceInfo.connectionString}" } diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/RealtimeQuerySubscriptionImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/RealtimeQuerySubscriptionImpl.kt index 4e4991adeb6..cd1a6d74187 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/RealtimeQuerySubscriptionImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/RealtimeQuerySubscriptionImpl.kt @@ -47,7 +47,7 @@ internal class RealtimeQuerySubscriptionImpl( val requestId = query.dataConnect.idStringGenerator.next("rid") val connectionFlow = - dataConnect.grpcClient.connect( + dataConnect.realtimeQueryManager.subscribe( requestId = requestId, operationName = operationName, variables = diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/querymgr/RealtimeQueryManager.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/querymgr/RealtimeQueryManager.kt new file mode 100644 index 00000000000..85e0abcb00f --- /dev/null +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/querymgr/RealtimeQueryManager.kt @@ -0,0 +1,154 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.firebase.dataconnect.querymgr + +import com.google.firebase.dataconnect.DataSource +import com.google.firebase.dataconnect.ExperimentalRealtimeQueries +import com.google.firebase.dataconnect.FirebaseDataConnect.CallerSdkType +import com.google.firebase.dataconnect.core.DataConnectBidiConnectStream +import com.google.firebase.dataconnect.core.DataConnectGrpcClient +import com.google.firebase.dataconnect.core.Logger +import com.google.firebase.dataconnect.util.CoroutineUtils.createChildSupervisorScope +import com.google.firebase.dataconnect.util.IdStringGenerator +import com.google.firebase.dataconnect.util.ImmutableByteArray +import com.google.firebase.dataconnect.util.ProtoUtil.calculateSha512 +import com.google.protobuf.Struct +import java.util.concurrent.atomic.AtomicReference +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.async +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.job +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +@ExperimentalRealtimeQueries +internal class RealtimeQueryManager( + private val grpcClient: DataConnectGrpcClient, + coroutineScope: CoroutineScope, + private val idStringGenerator: IdStringGenerator, + private val logger: Logger, +) { + + private var state = AtomicReference(State.Disconnected) + + private val coroutineScope = + coroutineScope.createChildSupervisorScope(logger).also { + it.coroutineContext.job.invokeOnCompletion { state.set(State.Closed) } + } + + suspend fun subscribe( + requestId: String, + operationName: String, + variables: Struct, + callerSdkType: CallerSdkType, + ): Flow { + val connection = ensureConnected(requestId, callerSdkType) + + val coroutineName = "${logger.nameWithId}-subscribe(rid=$requestId)[ecpvdvmzvj]" + val job = + coroutineScope.async(CoroutineName(coroutineName)) { + connection.subscribe(requestId = requestId, operationName = operationName, variables) + } + + return job.await() + } + + // NOTE: This method MUST be called on a coroutine running in this.coroutineScope. + private suspend fun State.Connected.subscribe( + requestId: String, + operationName: String, + variables: Struct, + ): Flow { + // calculateSha512() is a CPU intensive operation that should NOT be performed on the main + // thread. This is the first reason why this method assumes it's running in this.coroutineScope. + val queryId = variables.calculateSha512(preamble = operationName) + + // Acquiring the lock by an arbitrary thread could result in priority inversion. This is the + // second reason why this method assumes it's running in this.coroutineScope: control over the + // thread that acquires the lock. + mutex.withLock { + return flowByQueryId.getOrPut(queryId) { + val executeResponseFlow = stream.subscribe(requestId, operationName, variables) + + executeResponseFlow.map { executeResponse -> + DataConnectGrpcClient.OperationResult( + data = executeResponse.data, + errors = executeResponse.errors, + source = DataSource.SERVER, + ) + } + } + } + } + + private suspend fun ensureConnected( + requestId: String, + callerSdkType: CallerSdkType + ): State.Connected { + while (true) { + val currentState = state.get() + + val newState = + when (currentState) { + State.Disconnected -> + State.Connecting( + coroutineScope.async(start = CoroutineStart.LAZY) { + grpcClient.connect( + streamId = idStringGenerator.next("con"), + requestId = requestId, + callerSdkType = callerSdkType, + ) + } + ) + is State.Connecting -> { + val stream = currentState.job.await() + State.Connected(stream) + } + is State.Connected -> return currentState + State.Closed -> error("${logger.nameWithId} has been closed") + } + + state.compareAndSet(currentState, newState) + } + } + + private sealed interface State { + object Disconnected : State { + override fun toString() = "Disconnected" + } + + class Connecting(val job: Deferred) : State { + override fun toString() = "Connecting" + } + + class Connected(val stream: DataConnectBidiConnectStream) : State { + val mutex = Mutex() + val flowByQueryId: + MutableMap> = + mutableMapOf() + override fun toString() = "Connected" + } + + object Closed : State { + override fun toString() = "Closed" + } + } +} From 3eaedd76abd7427822b0df4dd24deceab9c5e604 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Thu, 14 May 2026 03:09:08 -0400 Subject: [PATCH 3/8] RealtimeQuerySubscriptionImplUnitTest.kt added --- .../core/DataConnectGrpcRPCsUnitTest.kt | 8 +- .../RealtimeQuerySubscriptionImplUnitTest.kt | 456 ++++++++++++++++++ .../testutil/OperationNameVariablesPair.kt | 22 + 3 files changed, 480 insertions(+), 6 deletions(-) create mode 100644 firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/RealtimeQuerySubscriptionImplUnitTest.kt create mode 100644 firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/OperationNameVariablesPair.kt diff --git a/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCsUnitTest.kt b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCsUnitTest.kt index 6bb99626d5c..276812183e0 100644 --- a/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCsUnitTest.kt +++ b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCsUnitTest.kt @@ -32,6 +32,7 @@ import com.google.firebase.dataconnect.testutil.CleanupsRule import com.google.firebase.dataconnect.testutil.DataConnectLogLevelRule import com.google.firebase.dataconnect.testutil.DataConnectPath import com.google.firebase.dataconnect.testutil.InProcessDataConnectGrpcStreamingServer +import com.google.firebase.dataconnect.testutil.OperationNameVariablesPair import com.google.firebase.dataconnect.testutil.RandomSeedTestRule import com.google.firebase.dataconnect.testutil.awaitUntilItemIsInstance import com.google.firebase.dataconnect.testutil.newMockLogger @@ -495,15 +496,10 @@ class DataConnectGrpcRPCsUnitTest { private val propTestConfig = PropTestConfig(iterations = 50, edgeConfig = EdgeConfig(edgecasesGenerationProbability = 0.2)) -data class OperationNameVariablesPair( - val operationName: String, - val variables: StructProto, -) - private fun operationNameVariablesPairArb( operationName: Arb = Arb.dataConnect.operationName(), variables: Arb = Arb.proto.struct(), -): Arb = +): Arb> = Arb.bind(operationName, variables.map { it.struct }, ::OperationNameVariablesPair) private fun StructProto.toExecuteQueryResponse(): ExecuteQueryResponse = diff --git a/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/RealtimeQuerySubscriptionImplUnitTest.kt b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/RealtimeQuerySubscriptionImplUnitTest.kt new file mode 100644 index 00000000000..2703679a947 --- /dev/null +++ b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/RealtimeQuerySubscriptionImplUnitTest.kt @@ -0,0 +1,456 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:OptIn(ExperimentalRealtimeQueries::class) + +package com.google.firebase.dataconnect.core + +import android.content.Context.CONNECTIVITY_SERVICE +import android.net.ConnectivityManager +import androidx.test.ext.junit.runners.AndroidJUnit4 +import app.cash.turbine.ReceiveTurbine +import app.cash.turbine.test +import app.cash.turbine.turbineScope +import com.google.firebase.dataconnect.DataConnectSettings +import com.google.firebase.dataconnect.ExperimentalRealtimeQueries +import com.google.firebase.dataconnect.FirebaseDataConnect.CallerSdkType +import com.google.firebase.dataconnect.QueryRef +import com.google.firebase.dataconnect.testutil.CleanupsRule +import com.google.firebase.dataconnect.testutil.FirebaseAppUnitTestingRule +import com.google.firebase.dataconnect.testutil.InProcessDataConnectGrpcStreamingServer +import com.google.firebase.dataconnect.testutil.InProcessDataConnectGrpcStreamingServer.Event.ConnectRpcStarted +import com.google.firebase.dataconnect.testutil.InProcessDataConnectGrpcStreamingServer.Event.StreamRequestReceived +import com.google.firebase.dataconnect.testutil.OperationNameVariablesPair +import com.google.firebase.dataconnect.testutil.RandomSeedTestRule +import com.google.firebase.dataconnect.testutil.TurbinePredicateResult +import com.google.firebase.dataconnect.testutil.UnavailableDeferred +import com.google.firebase.dataconnect.testutil.awaitError +import com.google.firebase.dataconnect.testutil.awaitUntilItem +import com.google.firebase.dataconnect.testutil.awaitUntilItemIsInstance +import com.google.firebase.dataconnect.testutil.property.arbitrary.dataConnect +import com.google.firebase.dataconnect.testutil.registerDataConnectKotestPrinters +import com.google.firebase.dataconnect.testutil.shouldBe +import com.google.firebase.dataconnect.testutil.shouldContainWithNonAbuttingTextIgnoringCase +import com.google.firebase.dataconnect.util.IdStringGenerator +import com.google.firebase.dataconnect.util.ProtoUtil.encodeToStruct +import google.firebase.dataconnect.proto.StreamRequest +import google.firebase.dataconnect.proto.StreamRequest.RequestKindCase +import google.firebase.dataconnect.proto.StreamResponse +import io.kotest.assertions.assertSoftly +import io.kotest.assertions.print.print +import io.kotest.assertions.withClue +import io.kotest.common.DelicateKotest +import io.kotest.matchers.collections.shouldBeIn +import io.kotest.matchers.result.shouldBeSuccess +import io.kotest.matchers.shouldBe +import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.property.Arb +import io.kotest.property.RandomSource +import io.kotest.property.arbitrary.Codepoint +import io.kotest.property.arbitrary.arbitrary +import io.kotest.property.arbitrary.az +import io.kotest.property.arbitrary.distinct +import io.kotest.property.arbitrary.enum +import io.kotest.property.arbitrary.int +import io.kotest.property.arbitrary.map +import io.kotest.property.arbitrary.string +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import kotlin.random.Random +import kotlinx.coroutines.asExecutor +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.test.runTest +import kotlinx.serialization.Serializable +import kotlinx.serialization.serializer +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TestName +import org.junit.runner.RunWith + +@RunWith(AndroidJUnit4::class) +class RealtimeQuerySubscriptionImplUnitTest { + + @get:Rule val cleanups = CleanupsRule() + @get:Rule val testName = TestName() + + @get:Rule(order = Int.MIN_VALUE) val randomSeedTestRule = RandomSeedTestRule() + + private val rs: RandomSource by randomSeedTestRule.rs + + @get:Rule + val firebaseAppFactory = + FirebaseAppUnitTestingRule( + appNameKey = "ex2bk4bks2", + applicationIdKey = "2f2c3gdydn", + projectIdKey = "kzbqx23hhn" + ) + + @Before + fun registerPrinters() { + registerDataConnectKotestPrinters() + } + + @Test + fun `collecting flow after DataConnect is closed throws`() = runTest { + val subscription = querySubscription() + subscription.query.dataConnect.suspendingClose() + + subscription.flow.test { + awaitError { + it.message shouldContainWithNonAbuttingTextIgnoringCase "closed" + } + } + } + + @Test + fun `collecting flow sends init StreamRequest first`() = runTest { + val server = runningInProcessDataConnectServer() + val dataConnect = dataConnect(server) + val subscription = querySubscription(dataConnect) + + server.events.test { + backgroundScope.launch { subscription.flow.collect() } + + val event: StreamRequestReceived = awaitUntilItemIsInstance() + event.streamRequest.let { request -> + withClue("request=${request.print().value}") { + request.requestId shouldBe "init" + request.requestKindCase shouldBe RequestKindCase.REQUESTKIND_NOT_SET + } + } + + cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `collecting flow sends expected subscribe StreamRequest`() = runTest { + val server = runningInProcessDataConnectServer() + val subscribeRequestId = Arb.dataConnect.requestId().sample() + val idStringGenerator = idStringGeneratorThatGeneratesRequestId(subscribeRequestId) + val dataConnect = dataConnect(server, idStringGenerator) + val subscription = querySubscription(dataConnect) + + server.events.test { + backgroundScope.launch { subscription.flow.collect() } + awaitUntilInitStreamRequest() + + val event: StreamRequestReceived = awaitUntilItemIsInstance() + event.streamRequest.shouldBeSubscribeRequestFor(subscription.query, subscribeRequestId) + + cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `flow correctly decodes StreamResponse messages`() = runTest { + val server = runningInProcessDataConnectServer() + val subscribeRequestId = Arb.dataConnect.requestId().sample() + val idStringGenerator = idStringGeneratorThatGeneratesRequestId(subscribeRequestId) + val dataConnect = dataConnect(server, idStringGenerator) + val subscription = querySubscription(dataConnect) + + turbineScope { + val serverCollector = server.events.testIn(backgroundScope, name = "serverCollector") + val clientCollector = subscription.flow.testIn(backgroundScope, name = "clientCollector") + + val responseSender = + serverCollector.awaitUntilItemIsInstance<_, ConnectRpcStarted>().responseObserver + serverCollector.awaitUntilInitStreamRequest() + serverCollector.awaitUntilStreamRequestWithRequestId(subscribeRequestId) + + val testDataArb = testDataArb() + repeat(5) { + val testData = testDataArb.sample() + + responseSender.onNext( + StreamResponse.newBuilder() + .setRequestId(subscribeRequestId) + .setData(encodeToStruct(testData)) + .build() + ) + + val querySubscriptionResult = clientCollector.awaitItem() + withClue(querySubscriptionResult.print().value) { + val queryResult = querySubscriptionResult.result.shouldBeSuccess() + queryResult.data shouldBe testData + } + } + + serverCollector.cancelAndIgnoreRemainingEvents() + clientCollector.cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `flows for distinct operation name and variables pairs share same connection`() = runTest { + val requestIds = distinctRequestIdArb().sampleList(10) + val subscriptionParameters = + distinctOperationNameVariablesPairWithRepeatedComponentsArb().sampleList(requestIds.size) + val idStringGenerator = idStringGeneratorThatGeneratesRequestIds(requestIds) + val server = runningInProcessDataConnectServer() + val dataConnect = dataConnect(server, idStringGenerator) + val subscriptions = + subscriptionParameters.map { querySubscription(dataConnect, it.operationName, it.variables) } + + turbineScope { + val serverCollector = server.events.testIn(backgroundScope, name = "serverCollector") + val clientCollectors = + subscriptions.mapIndexed { index, subscription -> + subscription.flow.testIn(backgroundScope, name = "clientCollector$index") + } + + val connection: ConnectRpcStarted = serverCollector.awaitUntilItemIsInstance() + serverCollector.awaitUntilInitStreamRequest() + val unacknowledgedRequestIds = requestIds.toMutableSet() + while (unacknowledgedRequestIds.isNotEmpty()) { + serverCollector.awaitItem().shouldBeInstanceOf().let { + it.connectionId shouldBe connection.connectionId + it.streamRequest.requestId shouldBeIn unacknowledgedRequestIds + unacknowledgedRequestIds.remove(it.streamRequest.requestId) + } + } + + serverCollector.cancelAndIgnoreRemainingEvents() + clientCollectors.forEach { it.cancelAndIgnoreRemainingEvents() } + } + } + + @Test + fun `flows for identical operation name and variables pairs share same request`() = runTest { + val requestIds = distinctRequestIdArb().sampleList(10) + val operationName = Arb.dataConnect.operationName().sample() + val variables = testVariablesArb().sample() + val idStringGenerator = idStringGeneratorThatGeneratesRequestIds(requestIds) + val server = runningInProcessDataConnectServer() + val dataConnect = dataConnect(server, idStringGenerator) + val subscriptions = + List(requestIds.size) { querySubscription(dataConnect, operationName, variables) } + + turbineScope { + val serverCollector = server.events.testIn(backgroundScope, name = "serverCollector") + val clientCollectors = + subscriptions.mapIndexed { index, subscription -> + subscription.flow.testIn(backgroundScope, name = "clientCollector$index") + } + + serverCollector.awaitUntilInitStreamRequest() + val subscribeRequest: StreamRequestReceived = serverCollector.awaitUntilItemIsInstance() + subscribeRequest.streamRequest.requestId shouldBeIn requestIds + subscribeRequest.streamRequest.requestKindCase shouldBe RequestKindCase.SUBSCRIBE + repeat(subscriptions.size - 1) { + val resumeRequest: StreamRequestReceived = serverCollector.awaitUntilItemIsInstance() + resumeRequest.connectionId shouldBe subscribeRequest.connectionId + resumeRequest.streamRequest.requestId shouldBe subscribeRequest.streamRequest.requestId + resumeRequest.streamRequest.requestKindCase shouldBe RequestKindCase.RESUME + } + + serverCollector.cancelAndIgnoreRemainingEvents() + clientCollectors.forEach { it.cancelAndIgnoreRemainingEvents() } + } + } + + private fun runningInProcessDataConnectServer(): InProcessDataConnectGrpcStreamingServer { + val server = InProcessDataConnectGrpcStreamingServer() + cleanups.register(server) + server.open() + return server + } + + private fun TestScope.dataConnect( + server: InProcessDataConnectGrpcStreamingServer, + idStringGenerator: IdStringGenerator? = null, + ): FirebaseDataConnectImpl = dataConnect(server.port, idStringGenerator) + + private fun TestScope.dataConnect( + serverLocalBindPort: Int? = null, + idStringGenerator: IdStringGenerator? = null, + ): FirebaseDataConnectImpl { + val executor = StandardTestDispatcher(testScheduler).asExecutor() + + val settings: DataConnectSettings = + if (serverLocalBindPort === null) { + Arb.dataConnect.dataConnectSettings().sample() + } else { + DataConnectSettings("localhost:$serverLocalBindPort", sslEnabled = false) + } + + return FirebaseDataConnectImpl( + context = + mockk(name = "FirebaseDataConnectImpl.context") { + every { getSystemService(CONNECTIVITY_SERVICE) } returns + mockk(relaxed = true) + }, + app = firebaseAppFactory.newInstance(), + projectId = Arb.dataConnect.projectId().sample(), + config = Arb.dataConnect.connectorConfig().sample(), + blockingExecutor = executor, + nonBlockingExecutor = executor, + deferredAuthProvider = UnavailableDeferred(), + deferredAppCheckProvider = UnavailableDeferred(), + creator = mockk(name = "FirebaseDataConnectImpl.creator", relaxed = true), + settings = settings, + idStringGenerator = idStringGenerator ?: IdStringGenerator(Random.Default), + ) + } + + private fun idStringGeneratorThatGeneratesRequestId(requestId: String): IdStringGenerator = + idStringGeneratorThatGeneratesRequestIds(listOf(requestId)) + + private fun idStringGeneratorThatGeneratesRequestIds( + requestIds: List + ): IdStringGenerator = + spyk(IdStringGenerator(Random.Default), name = "IdStringGenerator for ${testName.methodName}") { + every { next("rid") } + .returnsMany(requestIds) + .andThenThrows( + IllegalStateException( + "I only know how to generate ${requestIds.size} requestIds, " + + "and I've already generated all of them [gpap8mjgg5]" + ) + ) + } + + private fun TestScope.querySubscription( + dataConnect: FirebaseDataConnectImpl? = null, + operationName: String? = null, + variables: TestVariables? = null, + ): RealtimeQuerySubscriptionImpl = + queryRef(dataConnect, operationName, variables).subscribe() + + private fun TestScope.queryRef( + dataConnect: FirebaseDataConnectImpl? = null, + operationName: String? = null, + variables: TestVariables? = null, + ): RealtimeQueryRefImpl = + RealtimeQueryRefImpl( + dataConnect = dataConnect ?: dataConnect(), + operationName = operationName ?: "opName_${alphabeticStringArb().sample()}", + variables = variables ?: testVariablesArb().sample(), + dataDeserializer = serializer(), + variablesSerializer = serializer(), + callerSdkType = Arb.enum().sample(), + dataSerializersModule = Arb.dataConnect.serializersModule().sample(), + variablesSerializersModule = Arb.dataConnect.serializersModule().sample(), + ) + + private fun Arb.sample(): T = sample(rs).value + + private fun Arb.sampleList(size: Int): List = List(size) { sample() } +} + +@Serializable private data class TestVariables(val stringValue: String) + +@Serializable private data class TestData(val intValue: Int) + +private fun alphabeticStringArb(): Arb = Arb.string(0..5, Codepoint.az()) + +private fun testVariablesArb(stringValue: Arb = alphabeticStringArb()): Arb = + stringValue.map(::TestVariables) + +private fun testDataArb(intValue: Arb = Arb.int()): Arb = intValue.map(::TestData) + +private suspend fun ReceiveTurbine + .awaitUntilInitStreamRequest(): StreamRequestReceived = + withClue("awaiting 'init' StreamRequest message") { + awaitUntilItemIsInstance<_, StreamRequestReceived>().also { event -> + val streamRequest = event.streamRequest + withClue("request=${streamRequest.print().value}") { + assertSoftly { + streamRequest.requestId shouldBe "init" + streamRequest.requestKindCase shouldBe RequestKindCase.REQUESTKIND_NOT_SET + } + } + } + } + +private suspend fun ReceiveTurbine + .awaitUntilStreamRequestWithRequestId(requestId: String): StreamRequestReceived { + val predicateDescription = "StreamRequest with requestId=$requestId" + return withClue("awaiting $predicateDescription") { + awaitUntilItem(predicateDescription) { + when (it) { + is StreamRequestReceived -> + if (it.streamRequest.requestId == requestId) { + TurbinePredicateResult.Satisfied(it) + } else { + TurbinePredicateResult.Unsatisfied + } + else -> TurbinePredicateResult.Unsatisfied + } + } + } +} + +private fun StreamRequest.shouldBeSubscribeRequestFor( + queryRef: QueryRef, + expectedRequestId: String +): Unit = + withClue("StreamRequest=${print().value}") { + withClue("requestId") { requestId shouldBe expectedRequestId } + withClue("requestKindCase") { requestKindCase shouldBe RequestKindCase.SUBSCRIBE } + withClue("operationName") { subscribe.operationName shouldBe queryRef.operationName } + withClue("variables") { subscribe.variables shouldBe encodeToStruct(queryRef.variables) } + } + +private fun distinctRequestIdArb(): Arb = + @OptIn(DelicateKotest::class) Arb.dataConnect.requestId().distinct() + +/** + * Creates and returns an [Arb] that generates operation name/variables pairs such that both + * operation names and variables are recycled but _never_ in the same combination. + * + * Informally, the returned Arb generates a sequence like this: + * 1. (op1, vars1) + * 2. (op2, vars1) + * 3. (op2, vars2) + * 4. (op3, vars1) + * 5. (op3, vars2) + * 6. (op3, vars3) + * 7. and so on... + * + * Therefore, tests are recommended to generate a bunch and then shuffle their order. + */ +private fun distinctOperationNameVariablesPairWithRepeatedComponentsArb( + operationName: Arb = + @OptIn(DelicateKotest::class) Arb.dataConnect.operationName().distinct(), + variables: Arb = @OptIn(DelicateKotest::class) testVariablesArb().distinct(), +): Arb> { + val producedOperationNames = mutableSetOf() + val producedVariables = mutableListOf() + var currentVariables = producedVariables.iterator() + + return arbitrary { + if (!currentVariables.hasNext()) { + val newOperationName = operationName.bind() + check(newOperationName !in producedOperationNames) + val newVariables = variables.bind() + check(newVariables !in producedVariables) + producedOperationNames.add(newOperationName) + producedVariables.add(newVariables) + currentVariables = producedVariables.iterator() + } + + val operationName = producedOperationNames.last() + val variables = currentVariables.next() + OperationNameVariablesPair(operationName, variables) + } +} diff --git a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/OperationNameVariablesPair.kt b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/OperationNameVariablesPair.kt new file mode 100644 index 00000000000..d173967d9ee --- /dev/null +++ b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/OperationNameVariablesPair.kt @@ -0,0 +1,22 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.firebase.dataconnect.testutil + +data class OperationNameVariablesPair( + val operationName: String, + val variables: T, +) From c1ea16f9cc51c3f9e24f04973917d997f7dead5b Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Thu, 14 May 2026 03:21:28 -0400 Subject: [PATCH 4/8] TurbineUtils.kt: replace AtomicInteger with Int, since there is only one coroutine accessing it --- .../google/firebase/dataconnect/testutil/TurbineUtils.kt | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt index 1f72770af90..2bac95c6538 100644 --- a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt +++ b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt @@ -22,7 +22,6 @@ import io.grpc.Status import io.grpc.StatusException import io.kotest.assertions.fail import io.kotest.assertions.print.print -import java.util.concurrent.atomic.AtomicInteger /** * Represents the result of evaluating a predicate on an item emitted from a [ReceiveTurbine]. @@ -75,20 +74,20 @@ suspend inline fun ReceiveTurbine.awaitUntilItem( onIgnoredItem: (T) -> Unit = {}, predicate: (T) -> TurbinePredicateResult, ): R { - val skippedItemCount = AtomicInteger(0) + var skippedItemCount = 0 while (true) { when (val event = awaitEvent()) { Event.Complete -> fail( - "Flow completed normally after skipping ${skippedItemCount.get()} items produced " + + "Flow completed normally after skipping $skippedItemCount items produced " + "that didn't satisfy the given predicate ($predicateDescription) " + "but expected it to produce an item that satisfied the predicate" ) is Event.Error -> fail( "Flow failed with exception ${event.throwable} after skipping " + - "${skippedItemCount.get()} items produced " + + "$skippedItemCount items produced " + "that didn't satisfy the given predicate ($predicateDescription) " + "but expected it to produce an item that satisfied the predicate" ) @@ -97,7 +96,7 @@ suspend inline fun ReceiveTurbine.awaitUntilItem( is TurbinePredicateResult.Satisfied -> return predicateResult.mappedValue TurbinePredicateResult.Unsatisfied -> { onIgnoredItem(event.value) - skippedItemCount.incrementAndGet() + skippedItemCount++ } } } From 8f1f589534e5f39c4a3ecd5562ced6d83230488e Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Thu, 14 May 2026 04:51:51 -0400 Subject: [PATCH 5/8] DataConnectBidiConnectStream.kt: fix flow structure bug --- .../core/DataConnectBidiConnectStream.kt | 203 ++++++++++++------ 1 file changed, 134 insertions(+), 69 deletions(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt index aeea6ba1cfd..5e3c6fdb0fe 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt @@ -21,6 +21,7 @@ import com.google.firebase.dataconnect.core.LoggerGlobals.debug import com.google.firebase.dataconnect.util.CoroutineUtils.createChildSupervisorScope import com.google.firebase.dataconnect.util.NullableReference import com.google.firebase.dataconnect.util.ProtoUtil.toCompactString +import com.google.protobuf.Empty import com.google.protobuf.Struct import google.firebase.dataconnect.proto.ExecuteRequest import google.firebase.dataconnect.proto.GraphqlError as GraphqlErrorProto @@ -40,7 +41,9 @@ import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.onCompletion @@ -172,84 +175,51 @@ internal class DataConnectBidiConnectStream( State.Closed -> error("DataConnectBidiConnectStream.close() has been called [rptkgcfzyz]") } - fun subscribeStreamRequest(): StreamRequestProto = - StreamRequestProto.newBuilder().let { streamRequest -> - streamRequest.setRequestId(requestId) - streamRequest.setSubscribe( - ExecuteRequest.newBuilder().let { executeRequest -> - executeRequest.setOperationName(operationName) - executeRequest.setVariables(variables) - executeRequest.build() - } - ) - streamRequest.build() - } + val subscriptionStateManager = + SubscriptionStateManager( + requestId = requestId, + operationName = operationName, + variables, + streams.outgoingRequests + ) - fun resumeStreamRequest(): StreamRequestProto = - StreamRequestProto.newBuilder().let { streamRequest -> - streamRequest.setRequestId(requestId) - streamRequest.setResume(ResumeRequest.getDefaultInstance()) - streamRequest.build() - } + return flow { + val subscription = subscriptionStateManager.Subscriber() - val outgoingRequests = streams.outgoingRequests - val incomingResponses = streams.incomingResponses - val completedResponse = streams.completedResponse - val subscribedMutex = Mutex() - var subscribed = false - - return incomingResponses - .onSubscription { emit(IncomingResponse.Subscribed) } - .transformWhile { incomingResponse -> - when (incomingResponse) { - is IncomingResponse.Subscribed -> { - val sendResult = - subscribedMutex.withLock { - val streamRequest = - if (subscribed) { - resumeStreamRequest() - } else { - subscribed = true - subscribeStreamRequest() + emitAll( + streams.incomingResponses + .onSubscription { emit(IncomingResponse.Subscribed) } + .transformWhile { incomingResponse -> + when (incomingResponse) { + is IncomingResponse.Subscribed -> subscription.onSubscribed() + is IncomingResponse.Message -> { + if (incomingResponse.streamResponse.requestId != requestId) { + true + } else { + val executeResponse = incomingResponse.streamResponse.toExecuteResponse() + if (executeResponse !== null) { + emit(executeResponse) } - outgoingRequests.trySend(streamRequest) + !incomingResponse.streamResponse.cancelled + } + } + is IncomingResponse.Completed -> { + false // NOTE: The downstream onCompletion() looks after throwing the exception. } - when { - sendResult.isSuccess -> true - sendResult.isClosed -> false - else -> - error( - "internal error xw3zdzycfq: outgoingRequests.trySend(streamRequest) " + - "was unable to enqueue the streamRequest; this should never happen because " + - "outgoingRequests is created with capacity=UNLIMITED (sendResult=$sendResult)" - ) } } - is IncomingResponse.Message -> { - if (incomingResponse.streamResponse.requestId != requestId) { - true - } else { - val executeResponse = incomingResponse.streamResponse.toExecuteResponse() - if (executeResponse !== null) { - emit(executeResponse) + .onCompletion { throwable -> + subscription.onCompleted() + + if (throwable === null) { + val completed = streams.completedResponse.mapNotNull { it.ref }.first() + if (completed.throwable !== null) { + throw completed.throwable } - !incomingResponse.streamResponse.cancelled } } - is IncomingResponse.Completed -> { - // NOTE: The downstream onCompletion() callback looks after throwing the exception. - false - } - } - } - .onCompletion { throwable -> - if (throwable === null) { - val completed = completedResponse.mapNotNull { it.ref }.first() - if (completed.throwable !== null) { - throw completed.throwable - } - } - } + ) + } } /** @@ -350,6 +320,101 @@ internal class DataConnectBidiConnectStream( object Subscribed : IncomingResponse } + private class SubscriptionStateManager( + requestId: String, + operationName: String, + variables: Struct, + private val outgoingRequests: SendChannel, + ) { + + val mutex = Mutex() + var subscriberCount = 0 + + inner class Subscriber { + private var subscribed = false + + suspend fun onSubscribed(): Boolean = + mutex.withLock { + val streamRequest = + if (subscriberCount == 0) { + subscribeStreamRequest + } else { + resumeStreamRequest + } + + val sendResult = outgoingRequests.trySend(streamRequest) + + when { + sendResult.isSuccess -> { + subscribed = true + subscriberCount++ + true + } + sendResult.isClosed -> false + else -> + error( + "internal error xw3zdzycfq: outgoingRequests.trySend(subscribe or resume) " + + "was unable to enqueue the streamRequest; this should never happen because " + + "outgoingRequests is created with capacity=UNLIMITED (sendResult=$sendResult)" + ) + } + } + + suspend fun onCompleted() { + mutex.withLock { + if (!subscribed) { + return + } + + subscribed = false + subscriberCount-- + check(subscriberCount >= 0) { + "internal error hpn3qsj746: subscriberCount should never be less than zero, " + + "but it is: $subscriberCount" + } + + if (subscriberCount == 0) { + val sendResult = outgoingRequests.trySend(cancelStreamRequest) + if (sendResult.isFailure && !sendResult.isClosed) { + error( + "internal error mxcsq556tv: outgoingRequests.trySend(cancel) " + + "was unable to enqueue the streamRequest; this should never happen because " + + "outgoingRequests is created with capacity=UNLIMITED (sendResult=$sendResult)" + ) + } + } + } + } + } + + private val subscribeStreamRequest = + StreamRequestProto.newBuilder().let { streamRequest -> + streamRequest.setRequestId(requestId) + streamRequest.setSubscribe( + ExecuteRequest.newBuilder().let { executeRequest -> + executeRequest.setOperationName(operationName) + executeRequest.setVariables(variables) + executeRequest.build() + } + ) + streamRequest.build() + } + + private val resumeStreamRequest = + StreamRequestProto.newBuilder().let { streamRequest -> + streamRequest.setRequestId(requestId) + streamRequest.setResume(ResumeRequest.getDefaultInstance()) + streamRequest.build() + } + + private val cancelStreamRequest = + StreamRequestProto.newBuilder().let { streamRequest -> + streamRequest.setRequestId(requestId) + streamRequest.setCancel(Empty.getDefaultInstance()) + streamRequest.build() + } + } + private companion object { fun StreamResponseProto.toExecuteResponse(): ExecuteResponse? = From d70d5b036fe920e6a79b03fb5f2e245342c2fb31 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Thu, 14 May 2026 04:59:20 -0400 Subject: [PATCH 6/8] RealtimeTodo.md aedded --- firebase-dataconnect/RealtimeTodo.md | 57 ++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 firebase-dataconnect/RealtimeTodo.md diff --git a/firebase-dataconnect/RealtimeTodo.md b/firebase-dataconnect/RealtimeTodo.md new file mode 100644 index 00000000000..478b57a5bdc --- /dev/null +++ b/firebase-dataconnect/RealtimeTodo.md @@ -0,0 +1,57 @@ +# Realtime Query Subscription TODO List + +### TODO 1: Lack of Connection Health Monitoring / Reconnection + +* **File:** RealtimeQueryManager.kt +* **Severity:** `CRITICAL` + +#### Description + +Once `RealtimeQueryManager` successfully transitions to `State.Connected(stream)`, it remains in +this state permanently. If the underlying bidirectional gRPC connection is lost or the stream is +closed (due to network issues, server-side termination, or client-side close), the manager does not +detect this. Any future calls to `subscribe()` will read `State.Connected` and try to use the dead +stream, causing new subscriptions to silently hang or fail indefinitely without any reconnection +attempts. + +#### Recommendation + +Add connection health monitoring or detect when the stream has completed/failed, and transition +the manager's state back to `State.Disconnected` (or clean up resources) to allow subsequent +subscription calls to trigger a new connection. + +--- + +### TODO 2: Permanent Lock-out / Stuck in `Connecting` State on Connection Failure + +* **File:** RealtimeQueryManager.kt +* **Severity:** `CRITICAL` + +#### Description + +In `ensureConnected`, `currentState.job.await()` is called to wait for the lazy `Deferred` +connection job. If the connection attempt fails (e.g., due to a temporary network issue) and +throws an exception, the exception propagates out of the method. Because the exception is thrown +before the state is updated, the manager's state remains permanently stuck in +`State.Connecting(job)`. Any future connection attempts will call `await()` on the same failed +`Deferred` job, which immediately and permanently re-throws the same cached exception, making the +manager completely unusable until the app or SDK instance is restarted. + +### TODO 3: Memory and Resource Leak of Subscription Flows in `flowByQueryId` + +* **File:** RealtimeQueryManager.kt +* **Severity:** `HIGH` + +#### Description + +Active subscription flows are stored in the `flowByQueryId` map to deduplicate identical queries. +However, there is no mechanism to remove flows from this map when a subscription is cancelled, +completed, or when there are no active collectors left. This leads to an unbounded memory/resource +leak as the client subscribes to different queries over time. Furthermore, because the subscription +is never cleaned up, the backend stream may continue sending updates for cancelled subscriptions, +wasting bandwidth and server resources. + +#### Recommendation + +Implement a reference-counting mechanism or a cleanup callback upon flow completion to remove the +query from `flowByQueryId` once the active collector count drops to zero. From 4add6b16bbc410f72a087afc36f01a3e42898423 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Thu, 14 May 2026 05:12:09 -0400 Subject: [PATCH 7/8] FirebaseDataConnectImpl.kt: fix log message that was never actually logged --- .../google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index 487a9d3a1ee..63604a5f9e8 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -373,7 +373,7 @@ internal class FirebaseDataConnectImpl( grpcClient = grpcClient, coroutineScope = coroutineScope, idStringGenerator = idStringGenerator, - logger = Logger("RealtimeQueryManager").apply { "created by ${logger.nameWithId}" }, + logger = Logger("RealtimeQueryManager").apply { debug { "created by ${logger.nameWithId}" } }, ) override fun useEmulator(host: String, port: Int): Unit = runBlocking { From d2f80b3527b31567a3a09901f2b8bbce0cd8aace Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Thu, 14 May 2026 05:20:54 -0400 Subject: [PATCH 8/8] RealtimeTodo.md: update with bug discovered by gemini --- firebase-dataconnect/RealtimeTodo.md | 31 ++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/firebase-dataconnect/RealtimeTodo.md b/firebase-dataconnect/RealtimeTodo.md index 478b57a5bdc..759a2b4f652 100644 --- a/firebase-dataconnect/RealtimeTodo.md +++ b/firebase-dataconnect/RealtimeTodo.md @@ -55,3 +55,34 @@ wasting bandwidth and server resources. Implement a reference-counting mechanism or a cleanup callback upon flow completion to remove the query from `flowByQueryId` once the active collector count drops to zero. + +### TODO 3: Memory and Resource Leak of Subscription Flows in `flowByQueryId` + +* **File:** DataConnectBidiConnectStream.kt +* **Severity:** `HIGH` + +#### Description + +Late subscribers to a stream that has already completed (server-side) will hang indefinitely. +This occurs because `incomingResponses` is a `SharedFlow` with `replay = 0`. If the stream +completes, the `IncomingResponse.Completed` signal is emitted and lost for future subscribers. +These subscribers will then wait in `transformWhile` for new emissions that will never come. + +The current implementation of `onCompletion` does not prevent the hang for late subscribers. +Because `streams.incomingResponses` is a `SharedFlow` with `replay = 0`, if the stream has already +completed, the `IncomingResponse.Completed` signal is lost. A new subscriber will begin collecting +from `incomingResponses` and wait indefinitely for emissions that will never arrive. The +`onCompletion` block is only executed *after* the flow collection completes, so it cannot resolve a +hang that occurs *during* the collection process. + +Discovered by gemini code assist: +https://github.com/firebase/firebase-android-sdk/pull/8158#discussion_r3240286966 + +#### Recommendation + +You should check `streams.completedResponse` before starting the flow collection to ensure the +stream is still active. + +To address this, you should check the state of `streams.completedResponse` *before* or *at the +start* of the flow collection (e.g., inside the `flow { ... }` builder) to verify if the stream is +already finished, and emit the completion signal or throw the cached exception immediately if it is.