Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,20 @@ internal class FetcherController<Key : Any, Network : Any, Output : Any, Local :
flow { emitAll(realFetcher(key)) }.map {
when (it) {
is FetcherResult.Data -> {
StoreReadResponse.Data(
it.value,
origin = StoreReadResponseOrigin.Fetcher(it.origin),
) as StoreReadResponse<Network>
try {
val network = it.value
val local = converter.fromNetworkToLocal(network)
sourceOfTruth?.write(key, local)
StoreReadResponse.Data(
network,
origin = StoreReadResponseOrigin.Fetcher(it.origin),
) as StoreReadResponse<Network>
} catch (exception: Throwable) {
StoreReadResponse.Error.Exception(
exception,
origin = StoreReadResponseOrigin.Fetcher(it.origin),
)
Comment thread
matt-ramotar marked this conversation as resolved.
}
Comment thread
matt-ramotar marked this conversation as resolved.
Comment thread
matt-ramotar marked this conversation as resolved.
}

is FetcherResult.Error.Message ->
Expand All @@ -106,17 +116,15 @@ internal class FetcherController<Key : Any, Network : Any, Output : Any, Local :
StoreReadResponseOrigin.Fetcher()
emit(StoreReadResponse.NoNewData(origin))
},
/**
* When enabled, downstream collectors are never closed, instead, they are kept active to
* receive values dispatched by fetchers created after them. This makes [FetcherController]
* act like a [SourceOfTruth] in the lack of a [SourceOfTruth] provided by the developer.
*/
// When enabled, downstream collectors are never closed.
// Instead, they are kept active to receive values dispatched by fetchers created after them.
// This makes FetcherController act like a SourceOfTruth in the lack of a SourceOfTruth provided by the developer.
piggybackingDownstream = true,
onEach = { response ->
response.dataOrNull()?.let { network: Network ->
val local: Local = converter.fromNetworkToLocal(network)
sourceOfTruth?.write(key, local)
}
onEach = { _ ->
// Exceptions thrown here propagate to the actor and close downstream channels silently.
// This caused store.stream() and store.get() to hang indefinitely (see #660).
// Consequently, we are intentionally performing no work here.
// Conversion and SOT writes now happen in the source flow above.
},
)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,13 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
val responseOrigin = it.value.origin as StoreReadResponseOrigin.Fetcher
requestKeyToFetcherName[request.key] = responseOrigin.name

val fallBackToSourceOfTruth =
it.value is StoreReadResponse.Error && request.fallBackToSourceOfTruth

if (it.value is StoreReadResponse.Data || it.value is StoreReadResponse.NoNewData || fallBackToSourceOfTruth) {
// Unlocking disk only if network sent data or reported no new data
if (it.value is StoreReadResponse.Data ||
it.value is StoreReadResponse.NoNewData ||
it.value is StoreReadResponse.Error
) {
// Unlocking disk only if network sent data, reported no new data, or returned an error
// so that fresh data request never receives new fetcher data after
// cached disk data.
// cached disk data, and so that the flow can properly complete on errors.
Comment thread
matt-ramotar marked this conversation as resolved.
// This means that if the user asked for fresh data but the network returned
// no new data we will still unblock disk.
diskLock.complete(Unit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.mobilenativefoundation.store.store5.util.asSourceOfTruth
import kotlin.test.Test
import kotlin.test.assertContains
import kotlin.test.assertEquals
import kotlin.test.assertIs

@FlowPreview
@ExperimentalCoroutinesApi
Expand Down Expand Up @@ -1094,4 +1095,116 @@ class FlowStoreTests {
)

private fun <Key : Any, Output : Any> StoreBuilder<Key, Output>.buildWithTestScope() = scope(testScope).build()

@Test
fun stream_givenConverterThrows_thenEmitsError() =
testScope.runTest {
// Given
val exception = IllegalStateException("Converter failed")
val persister = InMemoryPersister<Int, String>()

val pipeline =
StoreBuilder.from(
fetcher = Fetcher.of { _: Int -> "network" },
sourceOfTruth = persister.asSourceOfTruth(),
converter =
object : Converter<String, String, String> {
override fun fromNetworkToLocal(network: String): String {
throw exception
}

override fun fromOutputToLocal(output: String): String = output
},
).buildWithTestScope()

// When + Then
pipeline.stream(StoreReadRequest.fresh(1)).test {
assertEquals(
Loading(
origin = StoreReadResponseOrigin.Fetcher(),
),
awaitItem(),
)

val errorResponse = awaitItem()
assertIs<StoreReadResponse.Error.Exception>(errorResponse)
assertEquals(exception.message, errorResponse.error.message)
}
}

@Test
fun stream_givenNamedFetcherAndConverterThrows_thenErrorContainsFetcherName() =
testScope.runTest {
// Given
val fetcherName = "TestFetcher"
val exception = IllegalStateException("Converter failed")
val persister = InMemoryPersister<Int, String>()

val pipeline =
StoreBuilder.from(
fetcher = Fetcher.of(name = fetcherName) { _: Int -> "network" },
sourceOfTruth = persister.asSourceOfTruth(),
converter =
object : Converter<String, String, String> {
override fun fromNetworkToLocal(network: String): String {
throw exception
}

override fun fromOutputToLocal(output: String): String = output
},
).buildWithTestScope()

// When + Then
pipeline.stream(StoreReadRequest.fresh(1)).test {
assertEquals(
Loading(
origin = StoreReadResponseOrigin.Fetcher(),
),
awaitItem(),
)

val errorResponse = awaitItem()
assertIs<StoreReadResponse.Error.Exception>(errorResponse)
val origin = errorResponse.origin
assertIs<StoreReadResponseOrigin.Fetcher>(origin)
assertEquals(fetcherName, origin.name)
}
}

@Test
fun stream_givenConverterThrowsWithFreshRequest_thenFlowCompletes() =
testScope.runTest {
// Given: fresh() request skips disk cache and fallBackToSourceOfTruth defaults to false
val exception = IllegalStateException("Converter failed")
val persister = InMemoryPersister<Int, String>()

val pipeline =
StoreBuilder.from(
fetcher = Fetcher.of { _: Int -> "network" },
sourceOfTruth = persister.asSourceOfTruth(),
converter =
object : Converter<String, String, String> {
override fun fromNetworkToLocal(network: String): String {
throw exception
}

override fun fromOutputToLocal(output: String): String = output
},
).buildWithTestScope()

// When + Then: Flow should complete, not hang indefinitely
pipeline.stream(StoreReadRequest.fresh(1)).test {
assertEquals(
Loading(
origin = StoreReadResponseOrigin.Fetcher(),
),
awaitItem(),
)

val errorResponse = awaitItem()
assertIs<StoreReadResponse.Error.Exception>(errorResponse)
assertEquals(exception.message, errorResponse.error.message)
cancelAndIgnoreRemainingEvents()
}
}
}
Loading