From 56cec8152a069b181fb30a86ca9b9720a0391659 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Mon, 11 May 2026 18:02:28 -0400 Subject: [PATCH 01/10] Move some test utility classes from `androidTest` to `test`: InProcessDataConnectGrpcStreamingServer.kt and TurbineUtils.kt This enables these classes to be used from both `androidTest` to `test` (thanks to the `@SharedWithAndroidTest` annotation and the associated gradle plugin) instead of just in `androidTest`. A future test will make use of this. --- firebase-dataconnect/androidTestutil/androidTestutil.gradle.kts | 2 -- .../testutil/InProcessDataConnectGrpcStreamingServer.kt | 2 ++ .../com/google/firebase/dataconnect/testutil/TurbineUtils.kt | 0 firebase-dataconnect/testutil/testutil.gradle.kts | 2 ++ 4 files changed, 4 insertions(+), 2 deletions(-) rename firebase-dataconnect/src/{androidTest => test}/kotlin/com/google/firebase/dataconnect/testutil/InProcessDataConnectGrpcStreamingServer.kt (99%) rename firebase-dataconnect/{androidTestutil => testutil}/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt (100%) diff --git a/firebase-dataconnect/androidTestutil/androidTestutil.gradle.kts b/firebase-dataconnect/androidTestutil/androidTestutil.gradle.kts index b4ebea9674f..c6f91df0e66 100644 --- a/firebase-dataconnect/androidTestutil/androidTestutil.gradle.kts +++ b/firebase-dataconnect/androidTestutil/androidTestutil.gradle.kts @@ -67,12 +67,10 @@ dependencies { implementation(libs.androidx.test.core) implementation(libs.androidx.test.junit) implementation(libs.auth0.jwt) - implementation(libs.grpc.api) implementation(libs.kotest.assertions) implementation(libs.kotest.property) implementation(libs.kotlinx.coroutines.core) implementation(libs.kotlinx.serialization.core) implementation(libs.kotlinx.serialization.json) implementation(libs.truth) - implementation(libs.turbine) } diff --git a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/InProcessDataConnectGrpcStreamingServer.kt b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/testutil/InProcessDataConnectGrpcStreamingServer.kt similarity index 99% rename from firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/InProcessDataConnectGrpcStreamingServer.kt rename to firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/testutil/InProcessDataConnectGrpcStreamingServer.kt index f4239c8e4d0..f00f0defc1b 100644 --- a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/InProcessDataConnectGrpcStreamingServer.kt +++ b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/testutil/InProcessDataConnectGrpcStreamingServer.kt @@ -14,6 +14,8 @@ * limitations under the License. */ +@file:SharedWithAndroidTest + package com.google.firebase.dataconnect.testutil import google.firebase.dataconnect.proto.ConnectorStreamServiceGrpc.ConnectorStreamServiceImplBase diff --git a/firebase-dataconnect/androidTestutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt similarity index 100% rename from firebase-dataconnect/androidTestutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt rename to firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/TurbineUtils.kt diff --git a/firebase-dataconnect/testutil/testutil.gradle.kts b/firebase-dataconnect/testutil/testutil.gradle.kts index 1c34c94e221..9372cafec33 100644 --- a/firebase-dataconnect/testutil/testutil.gradle.kts +++ b/firebase-dataconnect/testutil/testutil.gradle.kts @@ -67,6 +67,7 @@ dependencies { compileOnly(libs.commons.statistics.inference) implementation(libs.androidx.test.junit) + implementation(libs.grpc.api) implementation(libs.kotest.assertions) implementation(libs.kotest.property) implementation(libs.kotlin.coroutines.test) @@ -77,6 +78,7 @@ dependencies { implementation(libs.robolectric) implementation(libs.testonly.three.ten.abp) implementation(libs.truth) + implementation(libs.turbine) testImplementation(libs.commons.statistics.inference) } From 9ed08880f5d0d81c4052acc8a08198a1ae06e578 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Mon, 11 May 2026 18:05:12 -0400 Subject: [PATCH 02/10] DataConnectGrpcRPCs.kt: implemented connect() to open a bidirectional streaming connection with the backend for realtime queries. --- .../core/DataConnectBidiConnectStream.kt | 340 ++++++++++++++++++ .../dataconnect/core/DataConnectGrpcRPCs.kt | 150 ++++++++ .../core/FirebaseDataConnectImpl.kt | 1 + .../core/DataConnectGrpcRPCsUnitTest.kt | 1 + 4 files changed, 492 insertions(+) create mode 100644 firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt new file mode 100644 index 00000000000..bbf8deb8dca --- /dev/null +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt @@ -0,0 +1,340 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.firebase.dataconnect.core + +import com.google.firebase.dataconnect.core.LoggerGlobals.debug +import com.google.firebase.dataconnect.util.CoroutineUtils +import com.google.firebase.dataconnect.util.NullableReference +import com.google.firebase.dataconnect.util.ProtoUtil.toCompactString +import com.google.protobuf.Struct +import google.firebase.dataconnect.proto.ExecuteRequest +import google.firebase.dataconnect.proto.GraphqlError as GraphqlErrorProto +import google.firebase.dataconnect.proto.GraphqlResponseExtensions.DataConnectProperties as DataConnectPropertiesProto +import google.firebase.dataconnect.proto.StreamRequest as StreamRequestProto +import google.firebase.dataconnect.proto.StreamResponse as StreamResponseProto +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onSubscription +import kotlinx.coroutines.flow.shareIn +import kotlinx.coroutines.flow.transformWhile +import kotlinx.coroutines.flow.update +import kotlinx.coroutines.job + +/** + * Manages a bidirectional gRPC stream for Data Connect operations. + * + * This class multiplexes multiple incoming requests and outgoing responses over a single underlying + * bidirectional connection. It manages the lifecycle of the connection state (Open, Closing, + * Closed), buffering, and correlation of incoming responses to their respective subscribers based + * on `requestId`. + * + * @param outgoingRequests A [SendChannel] where all multiplexed outgoing [StreamRequestProto] + * messages are sent. + * @param incomingResponses A [Flow] of incoming [StreamResponseProto] messages from the server. + * @param coroutineScope The [CoroutineScope] used to launch background collection and manage stream + * lifecycle. + * @param logger The [Logger] used for debug and error logging. + */ +internal class DataConnectBidiConnectStream( + outgoingRequests: SendChannel, + incomingResponses: Flow, + coroutineScope: CoroutineScope, + private val logger: Logger, +) { + + private val state = + MutableStateFlow( + run { + val collectCoroutineScope = + CoroutineUtils.createSupervisorCoroutineScope( + coroutineScope.coroutineContext, + logger, + parent = coroutineScope.coroutineContext.job, + ) + + val completedResponse = + MutableStateFlow(NullableReference(null)) + fun setCompletedResponse(completed: IncomingResponse.Completed) { + completedResponse.update { currentValue -> + check(currentValue.ref === null) { + "internal error t67ss93fvp: completedResponse=${currentValue.ref}, " + + "but expected it to be null since IncomingResponse.Completed " + + "should only ever be emitted once by incomingResponsesSharedFlow" + } + NullableReference(completed) + } + } + + val incomingResponsesSharedFlow = + incomingResponses + .map<_, IncomingResponse>(IncomingResponse::Message) + .onCompletion { throwable -> + val completed = IncomingResponse.Completed(throwable) + setCompletedResponse(completed) + if (throwable === null) { + emit(completed) + } + } + .catch { emit(IncomingResponse.Completed(throwable = it)) } + .buffer(capacity = Channel.UNLIMITED) + .shareIn(collectCoroutineScope, started = SharingStarted.Eagerly, replay = 0) + + State.Open( + outgoingRequests = outgoingRequests, + incomingResponses = incomingResponsesSharedFlow, + completedResponse = completedResponse.asStateFlow(), + coroutineScope = collectCoroutineScope + ) + } + ) + + /** + * Closes the bidirectional stream gracefully. + * + * This method initiates the closure of the internal coroutine scope used for collecting incoming + * responses and suspends until the closure has completed. Once closed, the stream cannot be + * reopened and subsequent calls to [subscribe] will throw an exception. + * + * This method is safe to call many times. All calls will suspend until the closure has completed, + * just like the first call will. If the closure has already completed then this method will + * return immediately as if successful. + */ + suspend fun close() { + logger.debug { "close()" } + + while (true) { + val currentState = state.value + + val newState = + when (currentState) { + is State.Open -> { + currentState.coroutineScope.cancel( + "DataConnectBidiConnectStream.close() called [fvj7hnfksd]" + ) + State.Closing(currentState.coroutineScope) + } + is State.Closing -> { + currentState.coroutineScope.coroutineContext.job.join() + State.Closed + } + State.Closed -> return + } + + state.compareAndSet(currentState, newState) + } + } + + /** + * Starts a subscription for the query with the given [operationName] and [variables]. + * + * @param requestId A unique identifier for this request, used to correlate incoming responses. + * @param operationName The name of the operation to execute. + * @param variables The variables for the operation. + * @return A [Flow] of [ExecuteResponse] objects for the subscription. + */ + fun subscribe( + requestId: String, + operationName: String, + variables: Struct, + ): Flow { + val streams = + when (val currentState = this.state.value) { + is State.Open -> currentState + is State.Closing, + State.Closed -> error("DataConnectBidiConnectStream.close() has been called [rptkgcfzyz]") + } + + val streamRequest = + StreamRequestProto.newBuilder().let { streamRequest -> + streamRequest.setRequestId(requestId) + streamRequest.setSubscribe( + ExecuteRequest.newBuilder().let { executeRequest -> + executeRequest.setOperationName(operationName) + executeRequest.setVariables(variables) + executeRequest.build() + } + ) + streamRequest.build() + } + + val outgoingRequests = streams.outgoingRequests + val incomingResponses = streams.incomingResponses + val completedResponse = streams.completedResponse + + return incomingResponses + .onSubscription { emit(IncomingResponse.Subscribed) } + .transformWhile { incomingResponse -> + when (incomingResponse) { + is IncomingResponse.Subscribed -> { + outgoingRequests.send(streamRequest) + true + } + is IncomingResponse.Message -> { + if (incomingResponse.streamResponse.requestId != requestId) { + true + } else { + val executeResponse = incomingResponse.streamResponse.toExecuteResponse() + if (executeResponse !== null) { + emit(executeResponse) + } + !incomingResponse.streamResponse.cancelled + } + } + is IncomingResponse.Completed -> { + // NOTE: The downstream onCompletion() callback looks after throwing the exception. + false + } + } + } + .onCompletion { throwable -> + if (throwable === null) { + val completed = completedResponse.mapNotNull { it.ref }.first() + if (completed.throwable !== null) { + throw completed.throwable + } + } + } + } + + /** + * Represents the application-level response to a GraphQL execution request. + * + * @property data The data payload returned by the GraphQL query or mutation. + * @property errors The errors related to the execution of the operation. + * @property extensions Additional metadata or properties related to the execution. + */ + class ExecuteResponse( + val data: Struct, + val errors: List, + val extensions: List, + ) { + operator fun component1() = data + operator fun component2() = errors + operator fun component3() = extensions + } + + /** + * Represents the current operational state of the [DataConnectBidiConnectStream]. + * + * State transitions flow from [Open] -> [Closing] -> [Closed]. + */ + private sealed interface State { + /** + * The stream is fully operational and accepting new subscriptions. This is the initial state of + * a newly-created [DataConnectBidiConnectStream] object. + * + * @property outgoingRequests The channel to which to send requests. + * @property incomingResponses The shared flow containing processed [IncomingResponse] signals. + * @property completedResponse A reference that will set to the [IncomingResponse.Completed] + * message _before_ the message is emitted from [incomingResponses]. + * @property coroutineScope The scope actively managing the collection of incoming responses; + * this scope must be canceled by [DataConnectBidiConnectStream.close]. + */ + class Open( + val outgoingRequests: SendChannel, + val incomingResponses: SharedFlow, + val completedResponse: StateFlow>, + val coroutineScope: CoroutineScope, + ) : State { + override fun toString() = "Open" + } + + /** + * The stream is in the process of shutting down and waiting for active jobs to complete. + * + * @property coroutineScope The scope that is undergoing cancellation. + */ + class Closing(val coroutineScope: CoroutineScope) : State { + override fun toString() = "Closing" + } + + /** The stream is completely shut down and inactive. */ + object Closed : State { + override fun toString() = "Closed" + } + } + + /** + * Represents an internal wrapper around incoming server responses and lifecycle signals. + * + * This sealed interface allows the internal [SharedFlow] to multiplex actual response data + * alongside control signals like completion, subscriber readiness, and buffer flushes. + */ + private sealed interface IncomingResponse { + + /** Represents a standard data response from the server. */ + class Message(val streamResponse: StreamResponseProto) : IncomingResponse { + override fun toString() = "Message(${streamResponse.toCompactString()})" + } + + /** + * Represents the termination of the incoming stream, either naturally or due to an error. + * + * By placing this in the [SharedFlow], new or existing subscribers can be notified immediately + * if the underlying stream is disconnected. + * + * @property throwable The exception that caused termination, or null if the stream completed + * normally. + */ + class Completed(val throwable: Throwable?) : IncomingResponse { + override fun toString() = "Completed(throwable=$throwable)" + } + + /** + * A control signal used to synchronize the start of outgoing requests with the readiness of the + * collector. + * + * Emitted locally inside the [subscribe] method's `onSubscription` block. This guarantees that + * the collector in `transformWhile` is fully registered and actively listening to the + * [SharedFlow] *before* the [StreamRequestProto] is actually sent to the server. Without this + * signal, there is a race condition where the server might respond to the request so fast that + * the resulting [Message] is processed by the [SharedFlow] before the `subscribe` collector has + * started listening, leading to silently lost responses. + */ + object Subscribed : IncomingResponse + } + + private companion object { + + fun StreamResponseProto.toExecuteResponse(): ExecuteResponse? = + if (!hasData() && errorsCount == 0) { + null + } else { + ExecuteResponse( + data = if (hasData()) data else Struct.getDefaultInstance(), + errors = if (errorsCount > 0) errorsList else emptyList(), + extensions = + if (hasExtensions() && extensions.dataConnectCount > 0) extensions.dataConnectList + else emptyList(), + ) + } + } +} diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCs.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCs.kt index cbf30881490..4531ad9f1ff 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCs.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCs.kt @@ -30,6 +30,7 @@ import com.google.firebase.dataconnect.core.LoggerGlobals.debug import com.google.firebase.dataconnect.core.LoggerGlobals.warn import com.google.firebase.dataconnect.sqlite.DataConnectCacheDatabase import com.google.firebase.dataconnect.sqlite.GetEntityIdForPathFunction +import com.google.firebase.dataconnect.util.CoroutineUtils import com.google.firebase.dataconnect.util.ImmutableByteArray import com.google.firebase.dataconnect.util.NullableReference import com.google.firebase.dataconnect.util.ProtoUtil.buildStructProto @@ -42,6 +43,8 @@ import com.google.protobuf.Duration as DurationProto import com.google.protobuf.Struct import google.firebase.dataconnect.proto.ConnectorServiceGrpc import google.firebase.dataconnect.proto.ConnectorServiceGrpcKt +import google.firebase.dataconnect.proto.ConnectorStreamServiceGrpc +import google.firebase.dataconnect.proto.ConnectorStreamServiceGrpcKt import google.firebase.dataconnect.proto.EmulatorInfo import google.firebase.dataconnect.proto.EmulatorIssuesResponse import google.firebase.dataconnect.proto.EmulatorServiceGrpc @@ -53,6 +56,8 @@ import google.firebase.dataconnect.proto.ExecuteQueryResponse import google.firebase.dataconnect.proto.GetEmulatorInfoRequest import google.firebase.dataconnect.proto.GraphqlResponseExtensions.DataConnectProperties import google.firebase.dataconnect.proto.StreamEmulatorIssuesRequest +import google.firebase.dataconnect.proto.StreamRequest +import google.firebase.dataconnect.proto.StreamResponse import io.grpc.ManagedChannel import io.grpc.ManagedChannelBuilder import io.grpc.Metadata @@ -66,10 +71,15 @@ import kotlin.time.Duration import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.asExecutor +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.job import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext @@ -79,6 +89,7 @@ internal class DataConnectGrpcRPCs( host: String, sslEnabled: Boolean, @get:VisibleForTesting val connectorResourceName: String, + nonBlockingCoroutineDispatcher: CoroutineDispatcher, private val blockingCoroutineDispatcher: CoroutineDispatcher, private val grpcMetadata: DataConnectGrpcMetadata, private val cacheSettings: CacheSettings?, @@ -154,12 +165,25 @@ internal class DataConnectGrpcRPCs( ConnectorServiceGrpcKt.ConnectorServiceCoroutineStub(lazyGrpcChannel.getLocked()) } + private val lazyStreamingGrpcStub = + SuspendingLazy(mutex) { + check(!closed) { "DataConnectGrpcRPCs ${logger.nameWithId} instance has been closed" } + ConnectorStreamServiceGrpcKt.ConnectorStreamServiceCoroutineStub(lazyGrpcChannel.getLocked()) + } + private val lazyEmulatorGrpcStub = SuspendingLazy(mutex) { check(!closed) { "DataConnectGrpcRPCs ${logger.nameWithId} instance has been closed" } EmulatorServiceGrpcKt.EmulatorServiceCoroutineStub(lazyGrpcChannel.getLocked()) } + private val connectCoroutineScope = + CoroutineUtils.createSupervisorCoroutineScope( + nonBlockingCoroutineDispatcher, + logger, + coroutineName = "connectCoroutineScope@${logger.nameWithId}" + ) + suspend fun executeMutation( requestId: String, operationName: String, @@ -361,6 +385,107 @@ internal class DataConnectGrpcRPCs( return cachedData?.let(ExecuteQueryResult::FromCache) } + suspend fun connect( + streamId: String, + callerSdkType: FirebaseDataConnect.CallerSdkType, + authToken: DataConnectAuth.GetAuthTokenResult?, + appCheckToken: DataConnectAppCheck.GetAppCheckTokenResult?, + ): DataConnectBidiConnectStream { + val metadata = grpcMetadata.get(authToken, appCheckToken, callerSdkType) + val kotlinMethodName = "connect()" + + val initRequest: StreamRequest = + StreamRequest.newBuilder().run { + setRequestId("init") + setName(connectorResourceName) + build() + } + + val outgoingRequests = Channel(capacity = UNLIMITED) + + outgoingRequests.trySend(initRequest).let { + check(it.isSuccess) { + "internal error b2bs3s6n3c: outgoingRequests.trySend(initRequest) should have succeeded " + + "since the outgoingRequests Channel has capacity=UNLIMITED, but it did not: $it" + } + } + + val outgoingRequestsFlow: Flow = + outgoingRequests.consumeAsFlow().onEach { + if (it === initRequest) { + logger.logGrpcSending( + requestId = streamId, + kotlinMethodName = kotlinMethodName, + grpcMethod = ConnectorStreamServiceGrpc.getConnectMethod(), + metadata = metadata, + request = { initRequest.toStructProto(authUid = authToken?.authUid) }, + requestTypeName = "StreamRequest", + authUid = authToken?.authUid, + ) + } else { + logger.logGrpcSending( + streamId = streamId, + requestId = it.requestId, + kotlinMethodName = kotlinMethodName, + request = { it.toStructProto(authUid = authToken?.authUid) }, + requestTypeName = "StreamRequest", + ) + } + } + + val result = lazyStreamingGrpcStub.get().runCatching { connect(outgoingRequestsFlow, metadata) } + + result.onFailure { + logger.logGrpcFailed( + requestId = streamId, + kotlinMethodName = kotlinMethodName, + it, + ) + } + + val incomingResponses: Flow = + result + .getOrThrow() + .onEach { + logger.logGrpcReceived( + streamId = streamId, + requestId = it.requestId, + kotlinMethodName = kotlinMethodName, + response = { it.toStructProto() }, + responseTypeName = "StreamResponse", + ) + } + .onCompletion { throwable -> + outgoingRequests.close( + when (throwable) { + null -> CancellationException("stream $streamId completed") + is CancellationException -> throwable + else -> CancellationException("stream $streamId failed", throwable) + } + ) + + if (throwable === null) { + logger.logGrpcCompleted( + requestId = streamId, + kotlinMethodName = kotlinMethodName, + ) + } else { + logger.logGrpcFailed( + requestId = streamId, + kotlinMethodName = kotlinMethodName, + throwable = throwable, + ) + } + } + + return DataConnectBidiConnectStream( + outgoingRequests, + incomingResponses, + connectCoroutineScope, + Logger("${logger.nameWithId} $kotlinMethodName [sid=$streamId]"), + ) + } + suspend fun getEmulatorInfo(requestId: String): EmulatorInfo { val request = GetEmulatorInfoRequest.getDefaultInstance() val kotlinMethodName = "getEmulatorInfo()" @@ -433,6 +558,7 @@ internal class DataConnectGrpcRPCs( suspend fun close() { logger.debug { "close()" } mutex.withLock { closed = true } + connectCoroutineScope.cancel("DataConnectGrpcRPCs.close() called [xn8dqn8dzm]") val grpcChannel = lazyGrpcChannel.initializedValueOrNull val cacheDb = lazyCacheDb.initializedValueOrNull?.ref @@ -463,6 +589,8 @@ internal class DataConnectGrpcRPCs( if (exceptions.isNotEmpty()) { throw exceptions.first().apply { exceptions.drop(1).forEach { addSuppressed(it) } } } + + connectCoroutineScope.coroutineContext.job.join() } companion object { @@ -535,6 +663,28 @@ internal class DataConnectGrpcRPCs( "$kotlinMethodName [rid=$requestId] sending: ${struct.toCompactString(keySortSelector)}" } + private inline fun Logger.logGrpcSending( + streamId: String, + requestId: String, + kotlinMethodName: String, + request: () -> Struct, + requestTypeName: String, + ) = debug { + "$kotlinMethodName [sid=$streamId, rid=$requestId] sending $requestTypeName: " + + request().toCompactString() + } + + private inline fun Logger.logGrpcReceived( + streamId: String, + requestId: String, + kotlinMethodName: String, + response: () -> Struct, + responseTypeName: String, + ) = debug { + "$kotlinMethodName [sid=$streamId, rid=$requestId] received $responseTypeName: " + + response().toCompactString() + } + private fun Logger.logGrpcReturningFromCache( requestId: String, kotlinMethodName: String, diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt index 71c46982405..3cf6ff04979 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt @@ -263,6 +263,7 @@ internal class FirebaseDataConnectImpl( host = backendInfo.host, sslEnabled = backendInfo.sslEnabled, connectorResourceName = connectorResourceName, + nonBlockingCoroutineDispatcher = nonBlockingDispatcher, blockingCoroutineDispatcher = blockingDispatcher, grpcMetadata = grpcMetadata, cacheSettings = cacheSettings, diff --git a/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCsUnitTest.kt b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCsUnitTest.kt index eaea7e9d12e..da22df842be 100644 --- a/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCsUnitTest.kt +++ b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCsUnitTest.kt @@ -429,6 +429,7 @@ class DataConnectGrpcRPCsUnitTest { host = "localhost:${server.port}", sslEnabled = false, connectorResourceName = connectorResourceNameArb.next(), + nonBlockingCoroutineDispatcher = Dispatchers.Default, blockingCoroutineDispatcher = Dispatchers.IO, grpcMetadata = grpcMetadataArb.next(), cacheSettings = CacheSettings(newDbFile(), maxAge = 1.hours), From 57d35a4d3bff6b7f281c9746c1dc3c735a9ac076 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Mon, 11 May 2026 18:07:50 -0400 Subject: [PATCH 03/10] DataConnectGrpcRPCsUnitTest.kt: add a basic test for the new connect() method --- .../firebase-dataconnect.gradle.kts | 1 + .../core/DataConnectGrpcRPCsUnitTest.kt | 72 ++++++++++++++++--- .../testutil/property/arbitrary/arbs.kt | 5 ++ 3 files changed, 69 insertions(+), 9 deletions(-) diff --git a/firebase-dataconnect/firebase-dataconnect.gradle.kts b/firebase-dataconnect/firebase-dataconnect.gradle.kts index 6369c0a4b19..986b110f0ed 100644 --- a/firebase-dataconnect/firebase-dataconnect.gradle.kts +++ b/firebase-dataconnect/firebase-dataconnect.gradle.kts @@ -134,6 +134,7 @@ dependencies { testImplementation(libs.mockk) testImplementation(libs.testonly.three.ten.abp) testImplementation(libs.robolectric) + testImplementation(libs.turbine) androidTestImplementation(project(":firebase-dataconnect:androidTestutil")) androidTestImplementation(project(":firebase-dataconnect:connectors")) diff --git a/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCsUnitTest.kt b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCsUnitTest.kt index da22df842be..bb857a0323e 100644 --- a/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCsUnitTest.kt +++ b/firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCsUnitTest.kt @@ -17,6 +17,7 @@ package com.google.firebase.dataconnect.core +import app.cash.turbine.test import com.google.firebase.dataconnect.CachedDataNotFoundException import com.google.firebase.dataconnect.DataConnectPathSegment import com.google.firebase.dataconnect.FirebaseDataConnect.CallerSdkType @@ -26,8 +27,12 @@ import com.google.firebase.dataconnect.core.DataConnectGrpcRPCs.ExecuteQueryResu import com.google.firebase.dataconnect.sqlite.QueryResultArb import com.google.firebase.dataconnect.sqlite.QueryResultArb.EntityRepeatPolicy.INTER_SAMPLE_MUTATED import com.google.firebase.dataconnect.sqlite.hydratedStructWithMutatedEntityValuesFrom +import com.google.firebase.dataconnect.testutil.CleanupsRule import com.google.firebase.dataconnect.testutil.DataConnectLogLevelRule import com.google.firebase.dataconnect.testutil.DataConnectPath +import com.google.firebase.dataconnect.testutil.InProcessDataConnectGrpcStreamingServer +import com.google.firebase.dataconnect.testutil.RandomSeedTestRule +import com.google.firebase.dataconnect.testutil.awaitUntilItemIsInstance import com.google.firebase.dataconnect.testutil.newMockLogger import com.google.firebase.dataconnect.testutil.property.arbitrary.ProtoArb import com.google.firebase.dataconnect.testutil.property.arbitrary.appCheckTokenResult @@ -52,11 +57,13 @@ import google.firebase.dataconnect.proto.ExecuteQueryRequest import google.firebase.dataconnect.proto.ExecuteQueryResponse import google.firebase.dataconnect.proto.GraphqlResponseExtensions import google.firebase.dataconnect.proto.GraphqlResponseExtensions.DataConnectProperties +import google.firebase.dataconnect.proto.StreamRequest import io.grpc.InsecureServerCredentials import io.grpc.Server import io.grpc.okhttp.OkHttpServerBuilder import io.grpc.stub.StreamObserver import io.kotest.assertions.assertSoftly +import io.kotest.assertions.print.print import io.kotest.assertions.throwables.shouldThrow import io.kotest.assertions.withClue import io.kotest.common.DelicateKotest @@ -66,6 +73,7 @@ import io.kotest.matchers.types.shouldBeInstanceOf import io.kotest.property.Arb import io.kotest.property.EdgeConfig import io.kotest.property.PropTestConfig +import io.kotest.property.RandomSource import io.kotest.property.arbitrary.bind import io.kotest.property.arbitrary.distinct import io.kotest.property.arbitrary.enum @@ -95,9 +103,14 @@ class DataConnectGrpcRPCsUnitTest { @get:Rule val dataConnectLogLevelRule = DataConnectLogLevelRule() @get:Rule val temporaryFolder = TemporaryFolder() + @get:Rule val randomSeedTestRule = RandomSeedTestRule() + @get:Rule val cleanups = CleanupsRule() + + private val rs: RandomSource by randomSeedTestRule.rs private val mockLogger = newMockLogger("s3nx74epqj") private val requestIdArb = Arb.dataConnect.requestId() + private val streamIdArb = Arb.dataConnect.streamId() private val connectorResourceNameArb = Arb.dataConnect.connectorResourceName() private val operationNameVariablesPairArb = operationNameVariablesPairArb() private val callerSdkTypeArb = Arb.enum() @@ -220,18 +233,18 @@ class DataConnectGrpcRPCsUnitTest { fun `executeQuery(fetchPolicy=CACHE_ONLY) throws if no cached data`() = runTest { startServer().use { server -> val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server) - val request = operationNameVariablesPairArb.next() + val request = operationNameVariablesPairArb.next(rs) val exception = shouldThrow { dataConnectGrpcRPCs.executeQuery( - requestIdArb.next(), + requestIdArb.next(rs), request.operationName, request.variables, - callerSdkTypeArb.next(), + callerSdkTypeArb.next(rs), FetchPolicy.CACHE_ONLY, - Arb.dataConnect.authTokenResult().orNull(nullProbability = 0.3).next(), - Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3).next(), + Arb.dataConnect.authTokenResult().orNull(nullProbability = 0.3).next(rs), + Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.3).next(rs), ) } @@ -373,6 +386,40 @@ class DataConnectGrpcRPCsUnitTest { } } + @Test + fun `connect() eagerly sends init request`() = runTest { + val server = InProcessDataConnectGrpcStreamingServer() + cleanups.register(server) + server.open() + val dataConnectGrpcRPCs = newDataConnectGrpcRPCs(server) + + server.events.test { + dataConnectGrpcRPCs.connect() + + val streamRequest = + awaitUntilItemIsInstance< + _, InProcessDataConnectGrpcStreamingServer.Event.StreamRequestReceived + >() + .streamRequest + + withClue("streamRequest=${streamRequest.print().value}") { + withClue("requestId") { streamRequest.requestId shouldBe "init" } + withClue("name") { streamRequest.name shouldBe dataConnectGrpcRPCs.connectorResourceName } + withClue("requestKindCase") { + streamRequest.requestKindCase shouldBe StreamRequest.RequestKindCase.REQUESTKIND_NOT_SET + } + } + } + } + + private suspend fun DataConnectGrpcRPCs.connect() = + connect( + streamId = streamIdArb.next(rs), + callerSdkType = Arb.enum().next(rs), + authToken = Arb.dataConnect.authTokenResult().orNull(nullProbability = 0.2).next(rs), + appCheckToken = Arb.dataConnect.appCheckTokenResult().orNull(nullProbability = 0.2).next(rs), + ) + private fun newDbFile() = File(temporaryFolder.newFolder(), "db.sqlite") private class StartServerResult( @@ -423,15 +470,22 @@ class DataConnectGrpcRPCsUnitTest { } } - private fun newDataConnectGrpcRPCs(server: StartServerResult) = + private fun newDataConnectGrpcRPCs(server: StartServerResult): DataConnectGrpcRPCs = + newDataConnectGrpcRPCsForLocalhostServerOnPort(server.port) + + private fun newDataConnectGrpcRPCs( + server: InProcessDataConnectGrpcStreamingServer + ): DataConnectGrpcRPCs = newDataConnectGrpcRPCsForLocalhostServerOnPort(server.port) + + private fun newDataConnectGrpcRPCsForLocalhostServerOnPort(port: Int) = DataConnectGrpcRPCs( context = RuntimeEnvironment.getApplication(), - host = "localhost:${server.port}", + host = "localhost:$port", sslEnabled = false, - connectorResourceName = connectorResourceNameArb.next(), + connectorResourceName = connectorResourceNameArb.next(rs), nonBlockingCoroutineDispatcher = Dispatchers.Default, blockingCoroutineDispatcher = Dispatchers.IO, - grpcMetadata = grpcMetadataArb.next(), + grpcMetadata = grpcMetadataArb.next(rs), cacheSettings = CacheSettings(newDbFile(), maxAge = 1.hours), parentLogger = mockLogger, ) diff --git a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/property/arbitrary/arbs.kt b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/property/arbitrary/arbs.kt index 1f1bad1875c..503bb4f9ab1 100644 --- a/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/property/arbitrary/arbs.kt +++ b/firebase-dataconnect/testutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/property/arbitrary/arbs.kt @@ -161,6 +161,11 @@ object DataConnectArb { "requestId_${string.bind()}" } + fun streamId(string: Arb = Arb.string(size = 8, Codepoint.alphanumeric())): Arb = + arbitrary { + "streamId_${string.bind()}" + } + fun connectorResourceName( string: Arb = Arb.string(size = 8, Codepoint.az()) ): Arb = arbitrary { "connectorResourceName_${string.bind()}" } From dc4a1d319f0baf51e9b8946b20bb72924558a81a Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Mon, 11 May 2026 18:16:39 -0400 Subject: [PATCH 04/10] DataConnectIntegrationTestBase.kt: add function: `fun Arb.sample(): T` (moved from ConnectRPCIntegrationTest.kt) --- .../dataconnect/testutil/DataConnectIntegrationTestBase.kt | 6 ++++++ .../firebase/dataconnect/ConnectRPCIntegrationTest.kt | 6 ------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/firebase-dataconnect/androidTestutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/DataConnectIntegrationTestBase.kt b/firebase-dataconnect/androidTestutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/DataConnectIntegrationTestBase.kt index 38b3768aa42..dfb35f20337 100644 --- a/firebase-dataconnect/androidTestutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/DataConnectIntegrationTestBase.kt +++ b/firebase-dataconnect/androidTestutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/DataConnectIntegrationTestBase.kt @@ -70,6 +70,12 @@ abstract class DataConnectIntegrationTestBase { } } + /** + * Convenience extension function on [Arb] that gets a non-edge-case value using [rs] for the + * randomness source. + */ + fun Arb.sample(): T = sample(rs).value + companion object { val testConnectorConfig: ConnectorConfig get() = diff --git a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/ConnectRPCIntegrationTest.kt b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/ConnectRPCIntegrationTest.kt index 97e19c0479f..94674c75504 100644 --- a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/ConnectRPCIntegrationTest.kt +++ b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/ConnectRPCIntegrationTest.kt @@ -505,12 +505,6 @@ class ConnectRPCIntegrationTest : DataConnectIntegrationTestBase() { val backend = DataConnectBackend.Custom("localhost:${inProcessServer.port}", sslEnabled = false) return connect(backend) } - - /** - * Convenience extension function on [Arb] that gets a non-edge-case value using the [rs] property - * of the [DataConnectIntegrationTestBase] superclass for the randomness source. - */ - private fun Arb.sample(): T = sample(rs).value } @OptIn(ExperimentalKotest::class) From 3a60db2115d2f88c652c0fa96eba8d78e9eb4677 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Mon, 11 May 2026 18:18:08 -0400 Subject: [PATCH 05/10] DataConnectGrpcRPCsConnectIntegrationTest.kt: added with two very basic tests for DataConnectGrpcRPCs.connect() --- ...taConnectGrpcRPCsConnectIntegrationTest.kt | 143 ++++++++++++++++++ .../FirebaseDataConnectInternalExts.kt | 3 + .../testutil/schemas/RealtimeConnector.kt | 9 +- 3 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/DataConnectGrpcRPCsConnectIntegrationTest.kt diff --git a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/DataConnectGrpcRPCsConnectIntegrationTest.kt b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/DataConnectGrpcRPCsConnectIntegrationTest.kt new file mode 100644 index 00000000000..88f7c33ca83 --- /dev/null +++ b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/DataConnectGrpcRPCsConnectIntegrationTest.kt @@ -0,0 +1,143 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.firebase.dataconnect + +import app.cash.turbine.test +import com.google.firebase.dataconnect.FirebaseDataConnect.CallerSdkType +import com.google.firebase.dataconnect.core.DataConnectBidiConnectStream +import com.google.firebase.dataconnect.core.DataConnectBidiConnectStream.ExecuteResponse +import com.google.firebase.dataconnect.core.DataConnectGrpcRPCs +import com.google.firebase.dataconnect.testutil.DataConnectIntegrationTestBase +import com.google.firebase.dataconnect.testutil.property.arbitrary.dataConnect +import com.google.firebase.dataconnect.testutil.property.arbitrary.pair +import com.google.firebase.dataconnect.testutil.registerDataConnectKotestPrinters +import com.google.firebase.dataconnect.testutil.schemas.RealtimeConnector +import com.google.firebase.dataconnect.testutil.schemas.RealtimeConnector.GetStringByKeyQuery +import com.google.firebase.dataconnect.util.ProtoUtil.decodeFromStruct +import com.google.firebase.dataconnect.util.ProtoUtil.encodeToStruct +import com.google.protobuf.Struct +import google.firebase.dataconnect.proto.GraphqlError as GraphqlErrorProto +import io.kotest.assertions.assertSoftly +import io.kotest.assertions.withClue +import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import io.kotest.property.Arb +import io.kotest.property.arbitrary.Codepoint +import io.kotest.property.arbitrary.az +import io.kotest.property.arbitrary.enum +import io.kotest.property.arbitrary.map +import io.kotest.property.arbitrary.string +import io.kotest.property.arbitrary.uuid +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.test.runTest +import kotlinx.serialization.serializer +import org.junit.Before +import org.junit.Test + +/** Integration tests for the [DataConnectGrpcRPCs.connect] method and its return value. */ +class DataConnectGrpcRPCsConnectIntegrationTest : DataConnectIntegrationTestBase() { + + private val requestIdArb = Arb.dataConnect.requestId() + private val streamIdArb = Arb.dataConnect.streamId() + private val callerSdkTypeArb = Arb.enum() + private val keyArb = Arb.uuid().map(RealtimeConnector::Key) + private val nameArb = Arb.string(size = 4, Codepoint.az()).map { "name_$it" } + + @Before + fun registerPrinters() { + registerDataConnectKotestPrinters() + } + + @Test + fun subscribeEmitsInitialResult() = runTest { + val connector = RealtimeConnector.getInstance(dataConnectFactory) + val dataConnectGrpcRPCs = connector.dataConnectGrpcRPCs + val key = keyArb.sample() + + val stream = dataConnectGrpcRPCs.connect() + val executeResponseFlow: Flow = + stream.subscribe( + requestId = requestIdArb.sample(), + operationName = GetStringByKeyQuery.OPERATION_NAME, + variables = key.encodeToGetStringByKeyQueryVariables(), + ) + + executeResponseFlow.test { awaitItem().shouldBeGetStringByKeyQueryResponse(name = null) } + } + + @Test + fun subscribeEmitsUpdatedResult() = runTest { + val connector = RealtimeConnector.getInstance(dataConnectFactory) + val dataConnectGrpcRPCs = connector.dataConnectGrpcRPCs + val (name1, name2) = nameArb.pair().sample() + val key = connector.insertString(name = name1) + + val stream = dataConnectGrpcRPCs.connect() + val executeResponseFlow: Flow = + stream.subscribe( + requestId = requestIdArb.sample(), + operationName = GetStringByKeyQuery.OPERATION_NAME, + variables = key.encodeToGetStringByKeyQueryVariables(), + ) + + executeResponseFlow.test { + awaitItem().shouldBeGetStringByKeyQueryResponse(name = name1) + connector.updateString(key, name = name2) + awaitItem().shouldBeGetStringByKeyQueryResponse(name = name2) + connector.deleteString(key) + awaitItem().shouldBeGetStringByKeyQueryResponse(name = null) + } + } + + private suspend fun DataConnectGrpcRPCs.connect(): DataConnectBidiConnectStream = + connect( + streamId = streamIdArb.sample(), + callerSdkType = callerSdkTypeArb.sample(), + authToken = null, + appCheckToken = null, + ) +} + +private fun GetStringByKeyQuery.Variables.encodeToStruct(): Struct = + encodeToStruct(this, serializer(), null) + +private fun RealtimeConnector.Key.encodeToGetStringByKeyQueryVariables(): Struct = + GetStringByKeyQuery.Variables(this).encodeToStruct() + +private fun Struct.decodeAsGetStringByKeyQueryData(): GetStringByKeyQuery.Data = + decodeFromStruct(this, serializer(), null) + +private fun ExecuteResponse.shouldBeGetStringByKeyQueryResponse( + name: String?, + errors: List = emptyList(), +) { + assertSoftly { + withClue("data") { data.shouldBeGetStringByKeyQueryData(name) } + withClue("errors") { errors shouldContainExactlyInAnyOrder errors } + } +} + +private fun Struct.shouldBeGetStringByKeyQueryData(name: String?) { + val data: GetStringByKeyQuery.Data = decodeAsGetStringByKeyQueryData() + if (name === null) { + data.item.shouldBeNull() + } else { + data.item.shouldNotBeNull() shouldBe GetStringByKeyQuery.Data.Item(name) + } +} diff --git a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/FirebaseDataConnectInternalExts.kt b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/FirebaseDataConnectInternalExts.kt index f73f1d8b50d..9cd11843dfd 100644 --- a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/FirebaseDataConnectInternalExts.kt +++ b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/FirebaseDataConnectInternalExts.kt @@ -24,3 +24,6 @@ suspend fun FirebaseDataConnect.awaitAuthReady() = suspend fun FirebaseDataConnect.awaitAppCheckReady() = (this as FirebaseDataConnectInternal).awaitAppCheckReady() + +internal val FirebaseDataConnect.dataConnectGrpcRPCs + get() = (this as FirebaseDataConnectInternal).grpcRPCs diff --git a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/schemas/RealtimeConnector.kt b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/schemas/RealtimeConnector.kt index a9daa990a90..f89a05bf33d 100644 --- a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/schemas/RealtimeConnector.kt +++ b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/schemas/RealtimeConnector.kt @@ -18,6 +18,7 @@ package com.google.firebase.dataconnect.testutil.schemas import com.google.firebase.dataconnect.ConnectorConfig import com.google.firebase.dataconnect.FirebaseDataConnect +import com.google.firebase.dataconnect.core.DataConnectGrpcRPCs import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal import com.google.firebase.dataconnect.serializers.UUIDSerializer import com.google.firebase.dataconnect.testutil.DataConnectBackend @@ -30,6 +31,9 @@ class RealtimeConnector private constructor(dataConnectInternal: FirebaseDataCon val dataConnect: FirebaseDataConnect = dataConnectInternal + internal val dataConnectGrpcRPCs: DataConnectGrpcRPCs by + lazy(LazyThreadSafetyMode.PUBLICATION) { dataConnectInternal.grpcRPCs } + val resourceName: String = dataConnectInternal.connectorResourceName val getStringByKey = GetStringByKeyQuery(this) @@ -57,7 +61,10 @@ class RealtimeConnector private constructor(dataConnectInternal: FirebaseDataCon fun queryRef(variables: Variables) = connector.dataConnect.query(OPERATION_NAME, variables, serializer(), serializer()) - @Serializable data class Variables(val key: Key) + @Serializable + data class Variables(val key: Key) { + constructor(id: UUID) : this(Key(id)) + } @Serializable data class Data(val item: Item?) { From 6ea21afad59562853ada1d3b661c184ad8b9f58d Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Mon, 11 May 2026 18:20:41 -0400 Subject: [PATCH 06/10] CHANGELOG.md entry added --- firebase-dataconnect/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/firebase-dataconnect/CHANGELOG.md b/firebase-dataconnect/CHANGELOG.md index 3b05ca732d7..14af07a90ca 100644 --- a/firebase-dataconnect/CHANGELOG.md +++ b/firebase-dataconnect/CHANGELOG.md @@ -4,6 +4,8 @@ ([#8081](https://github.com/firebase/firebase-android-sdk/pull/8081)) - [changed] Fixed wasteful computation that is only for debug logging. ([#8126](https://github.com/firebase/firebase-android-sdk/pull/8126)) +- [changed] Internal implementation of backend connection for realtime query results. + ([#NNNN](https://github.com/firebase/firebase-android-sdk/pull/NNNN)) # 17.2.2 From a71072dae98c3cecef04548be235fc5c2d1cfb19 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Mon, 11 May 2026 18:20:55 -0400 Subject: [PATCH 07/10] Empty commit to suppress github actions [skip actions] From 798aff5dc45a7195353e849949a76e1aedbc8e35 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Mon, 11 May 2026 18:22:36 -0400 Subject: [PATCH 08/10] CHANGELOG.md: updated placeholders with the actual PR number --- firebase-dataconnect/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firebase-dataconnect/CHANGELOG.md b/firebase-dataconnect/CHANGELOG.md index 14af07a90ca..89cfce4d5b2 100644 --- a/firebase-dataconnect/CHANGELOG.md +++ b/firebase-dataconnect/CHANGELOG.md @@ -5,7 +5,7 @@ - [changed] Fixed wasteful computation that is only for debug logging. ([#8126](https://github.com/firebase/firebase-android-sdk/pull/8126)) - [changed] Internal implementation of backend connection for realtime query results. - ([#NNNN](https://github.com/firebase/firebase-android-sdk/pull/NNNN)) + ([#8141](https://github.com/firebase/firebase-android-sdk/pull/8141)) # 17.2.2 From 744a49f8a42aeab4f560e095fee4f6393a8529c6 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Mon, 11 May 2026 20:56:11 -0400 Subject: [PATCH 09/10] DataConnectGrpcRPCsConnectIntegrationTest.kt: fix assertion that compared errors with itself instead of with the expected list [skip actions] --- ...taConnectGrpcRPCsConnectIntegrationTest.kt | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/DataConnectGrpcRPCsConnectIntegrationTest.kt b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/DataConnectGrpcRPCsConnectIntegrationTest.kt index 88f7c33ca83..2640053cbad 100644 --- a/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/DataConnectGrpcRPCsConnectIntegrationTest.kt +++ b/firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/DataConnectGrpcRPCsConnectIntegrationTest.kt @@ -78,7 +78,9 @@ class DataConnectGrpcRPCsConnectIntegrationTest : DataConnectIntegrationTestBase variables = key.encodeToGetStringByKeyQueryVariables(), ) - executeResponseFlow.test { awaitItem().shouldBeGetStringByKeyQueryResponse(name = null) } + executeResponseFlow.test { + awaitItem().shouldBeGetStringByKeyQueryResponse(expectedName = null) + } } @Test @@ -97,11 +99,11 @@ class DataConnectGrpcRPCsConnectIntegrationTest : DataConnectIntegrationTestBase ) executeResponseFlow.test { - awaitItem().shouldBeGetStringByKeyQueryResponse(name = name1) + awaitItem().shouldBeGetStringByKeyQueryResponse(expectedName = name1) connector.updateString(key, name = name2) - awaitItem().shouldBeGetStringByKeyQueryResponse(name = name2) + awaitItem().shouldBeGetStringByKeyQueryResponse(expectedName = name2) connector.deleteString(key) - awaitItem().shouldBeGetStringByKeyQueryResponse(name = null) + awaitItem().shouldBeGetStringByKeyQueryResponse(expectedName = null) } } @@ -124,20 +126,20 @@ private fun Struct.decodeAsGetStringByKeyQueryData(): GetStringByKeyQuery.Data = decodeFromStruct(this, serializer(), null) private fun ExecuteResponse.shouldBeGetStringByKeyQueryResponse( - name: String?, - errors: List = emptyList(), + expectedName: String?, + expectedErrors: List = emptyList(), ) { assertSoftly { - withClue("data") { data.shouldBeGetStringByKeyQueryData(name) } - withClue("errors") { errors shouldContainExactlyInAnyOrder errors } + withClue("data") { data.shouldBeGetStringByKeyQueryData(expectedName) } + withClue("errors") { errors shouldContainExactlyInAnyOrder expectedErrors } } } -private fun Struct.shouldBeGetStringByKeyQueryData(name: String?) { +private fun Struct.shouldBeGetStringByKeyQueryData(expectedName: String?) { val data: GetStringByKeyQuery.Data = decodeAsGetStringByKeyQueryData() - if (name === null) { + if (expectedName === null) { data.item.shouldBeNull() } else { - data.item.shouldNotBeNull() shouldBe GetStringByKeyQuery.Data.Item(name) + data.item.shouldNotBeNull() shouldBe GetStringByKeyQuery.Data.Item(expectedName) } } From 295a65b124f6e79964b6ff2b3c412a0c553bc51c Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Mon, 11 May 2026 21:01:32 -0400 Subject: [PATCH 10/10] DataConnectGrpcRPCs.kt: in connect() make sure to close outgoingRequests if ConnectorStreamServiceCoroutineStub.connect() throws an exception --- .../com/google/firebase/dataconnect/core/DataConnectGrpcRPCs.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCs.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCs.kt index 4531ad9f1ff..6c4e29b2826 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCs.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCs.kt @@ -436,6 +436,7 @@ internal class DataConnectGrpcRPCs( val result = lazyStreamingGrpcStub.get().runCatching { connect(outgoingRequestsFlow, metadata) } result.onFailure { + outgoingRequests.close(it) logger.logGrpcFailed( requestId = streamId, kotlinMethodName = kotlinMethodName,