From f430594b4897937bf1c26f59a20ceee2ce7f8f6d Mon Sep 17 00:00:00 2001 From: evgeny Date: Wed, 11 Mar 2026 09:14:10 +0000 Subject: [PATCH 1/2] [AIT-208] feat: partial sync implementation - rename `syncObjectsDataPool` to `syncObjectsPool` - Standardized naming across `ObjectsManager` to improve code clarity. - Updated references in implementation and tests. --- .../lib/objects/DefaultRealtimeObjects.kt | 4 +- .../io/ably/lib/objects/ObjectsManager.kt | 67 ++++++--- .../lib/objects/type/BaseRealtimeObject.kt | 2 +- .../io/ably/lib/objects/unit/TestHelpers.kt | 4 +- .../objects/DefaultRealtimeObjectsTest.kt | 18 +-- .../unit/objects/ObjectsManagerTest.kt | 135 ++++++++++++++++++ 6 files changed, 199 insertions(+), 31 deletions(-) diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt index 3d3b85535..44bd7f1f4 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt @@ -315,7 +315,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal // if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel. // reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes. objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2 - objectsManager.clearSyncObjectsDataPool() // RTO4b3 + objectsManager.clearSyncObjectsPool() // RTO4b3 // RTO4b5 removed — buffer already cleared by RTO4d above // defer the state change event until the next tick if we started a new sequence just now due to being in initialized state. // this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop. @@ -340,7 +340,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal if (state != ChannelState.suspended) { // do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states objectsPool.clearObjectsData(false) - objectsManager.clearSyncObjectsDataPool() + objectsManager.clearSyncObjectsPool() } } else -> { diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt index 93051b427..bc28e3173 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt @@ -17,7 +17,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject /** * @spec RTO5 - Sync objects data pool for collecting sync messages */ - private val syncObjectsDataPool = mutableMapOf() + private val syncObjectsPool = mutableMapOf() private var currentSyncId: String? = null /** * @spec RTO7 - Buffered object operations during sync @@ -59,7 +59,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject } // RTO5a3 - continue current sync sequence - applyObjectSyncMessages(objectMessages) // RTO5b + applyObjectSyncMessages(objectMessages) // RTO5f // RTO5a4 - if this is the last (or only) message in a sequence of sync updates, end the sync if (syncTracker.hasSyncEnded()) { @@ -77,7 +77,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject internal fun startNewSync(syncId: String?) { Log.v(tag, "Starting new sync sequence: syncId=$syncId") - syncObjectsDataPool.clear() // RTO5a2a + syncObjectsPool.clear() // RTO5a2a currentSyncId = syncId syncCompletionWaiter = CompletableDeferred() stateChange(ObjectsState.Syncing) @@ -93,7 +93,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject applySync() // RTO5c1/2/7 applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6 bufferedObjectOperations.clear() // RTO5c5 - syncObjectsDataPool.clear() // RTO5c4 + syncObjectsPool.clear() // RTO5c4 currentSyncId = null // RTO5c3 realtimeObjects.appliedOnAckSerials.clear() // RTO5c9 stateChange(ObjectsState.Synced) // RTO5c8 @@ -127,8 +127,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject * Clears the sync objects data pool. * Used by DefaultRealtimeObjects.handleStateChange. */ - internal fun clearSyncObjectsDataPool() { - syncObjectsDataPool.clear() + internal fun clearSyncObjectsPool() { + syncObjectsPool.clear() } /** @@ -145,7 +145,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject * @spec RTO5c - Processes sync data and updates objects pool */ private fun applySync() { - if (syncObjectsDataPool.isEmpty()) { + if (syncObjectsPool.isEmpty()) { return } @@ -154,8 +154,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject val existingObjectUpdates = mutableListOf>() // RTO5c1 - for ((objectId, objectMessage) in syncObjectsDataPool) { - val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b + for ((objectId, objectMessage) in syncObjectsPool) { + val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5f receivedObjectIds.add(objectId) val existingObject = realtimeObjects.objectsPool.get(objectId) @@ -232,9 +232,9 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject } /** - * Applies sync messages to sync data pool. + * Applies sync messages to sync data pool, merging partial sync messages for the same objectId. * - * @spec RTO5b - Collects object states during sync sequence + * @spec RTO5f - Collects and merges object states during sync sequence */ private fun applyObjectSyncMessages(objectMessages: List) { for (objectMessage in objectMessages) { @@ -244,11 +244,44 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject } val objectState: ObjectState = objectMessage.objectState - if (objectState.counter != null || objectState.map != null) { - syncObjectsDataPool[objectState.objectId] = objectMessage - } else { - // RTO5c1b1c - object state must contain either counter or map data - Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}") + val objectId = objectState.objectId + val existingEntry = syncObjectsPool[objectId] + + if (existingEntry == null) { + // RTO5f1 - objectId not in pool, store directly + if (objectState.counter != null || objectState.map != null) { + syncObjectsPool[objectId] = objectMessage + } else { + // RTO5c1b1c - object state must contain either counter or map data + Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}") + } + continue + } + + // RTO5f2 - objectId already in pool; this is a partial sync message, merge based on type + when { + objectState.map != null -> { + // RTO5f2a - map object: merge entries + if (objectState.tombstone) { + // RTO5f2a1 - tombstone: replace pool entry entirely + syncObjectsPool[objectId] = objectMessage + } else { + // RTO5f2a2 - merge map entries; server guarantees no duplicate keys across partials + val existingState = existingEntry.objectState!! // non-null for existing entry + val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty() + val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries) + val mergedState = existingState.copy(map = mergedMap) + syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState) + } + } + objectState.counter != null -> { + // RTO5f2b - counter objects must never be split across messages + Log.e(tag, "Received partial sync message for a counter object, skipping: ${objectMessage.id}") + } + else -> { + // RTO5f2c - unsupported type, log warning and skip + Log.w(tag, "Received partial sync message for an unsupported object type, skipping: ${objectMessage.id}") + } } } } @@ -284,7 +317,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject internal fun dispose() { syncCompletionWaiter?.cancel() - syncObjectsDataPool.clear() + syncObjectsPool.clear() bufferedObjectOperations.clear() disposeObjectsStateListeners() } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt index 91bfeb011..c6f602b5c 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt @@ -48,7 +48,7 @@ internal abstract class BaseRealtimeObject( * @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter */ internal fun applyObjectSync(objectMessage: ObjectMessage): ObjectUpdate { - val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b + val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5f validate(objectState) // object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation. // should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object. diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt index 3d10f22a9..56e4b2d20 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt @@ -68,8 +68,8 @@ internal val BaseRealtimeObject.TombstonedAt: Long? * START - DefaultRealtimeObjects dep mocks * ====================================== */ -internal val ObjectsManager.SyncObjectsDataPool: Map - get() = this.getPrivateField("syncObjectsDataPool") +internal val ObjectsManager.SyncObjectsPool: Map + get() = this.getPrivateField("syncObjectsPool") internal val ObjectsManager.BufferedObjectOperations: List get() = this.getPrivateField("bufferedObjectOperations") diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt index 7fdba7c8a..0a0ae9907 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt @@ -13,7 +13,7 @@ import io.ably.lib.objects.type.livemap.DefaultLiveMap import io.ably.lib.objects.type.livemap.LiveMapEntry import io.ably.lib.objects.unit.BufferedObjectOperations import io.ably.lib.objects.unit.ObjectsManager -import io.ably.lib.objects.unit.SyncObjectsDataPool +import io.ably.lib.objects.unit.SyncObjectsPool import io.ably.lib.objects.unit.getMockObjectsAdapter import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps import io.ably.lib.objects.unit.getMockRealtimeChannel @@ -83,7 +83,7 @@ class DefaultRealtimeObjectsTest { defaultRealtimeObjects.ObjectsManager.endSync() } - assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsDataPool.size) // RTO4b3 + assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsPool.size) // RTO4b3 assertEquals(0, defaultRealtimeObjects.ObjectsManager.BufferedObjectOperations.size) // RTO4d assertEquals(1, defaultRealtimeObjects.objectsPool.size()) // RTO4b1 - Only root remains assertEquals(rootObject, defaultRealtimeObjects.objectsPool.get(ROOT_OBJECT_ID)) // points to previously created root object @@ -246,16 +246,16 @@ class DefaultRealtimeObjectsTest { failCalled.await() verify(exactly = 0) { defaultRealtimeObjects.objectsPool.clearObjectsData(any()) } - verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() } + verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() } } @Test fun `(RTO4) handleStateChange(DETACHED) clears objects data and sync pool`() = runTest { val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps() - // Use clearSyncObjectsDataPool (the last operation in the coroutine) as the completion signal + // Use clearSyncObjectsPool (the last operation in the coroutine) as the completion signal val syncPoolCleared = CompletableDeferred() - every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() } answers { + every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() } answers { callOriginal() syncPoolCleared.complete(Unit) } @@ -265,7 +265,7 @@ class DefaultRealtimeObjectsTest { syncPoolCleared.await() verify(exactly = 1) { defaultRealtimeObjects.objectsPool.clearObjectsData(false) } - verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() } + verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() } } @Test @@ -281,7 +281,7 @@ class DefaultRealtimeObjectsTest { operation = ObjectOperation( action = ObjectOperationAction.CounterInc, objectId = "counter:test@1", - counterOp = ObjectsCounterOp(amount = 5.0) + counterInc = CounterInc(number = 5.0) ) ) ) @@ -313,7 +313,7 @@ class DefaultRealtimeObjectsTest { operation = ObjectOperation( action = ObjectOperationAction.CounterInc, objectId = "counter:test@1", - counterOp = ObjectsCounterOp(amount = 5.0) + counterInc = CounterInc(number = 5.0) ) ) ) @@ -370,7 +370,7 @@ class DefaultRealtimeObjectsTest { operation = ObjectOperation( action = ObjectOperationAction.CounterInc, objectId = "counter:test@1", - counterOp = ObjectsCounterOp(amount = 3.0) + counterInc = CounterInc(number = 3.0) ), serial = "serial-op-1", siteCode = "site1" diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt index 1890e584a..e92ca12b8 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt @@ -12,6 +12,7 @@ import io.ably.lib.objects.unit.* import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps import io.ably.lib.types.AblyException import io.ably.lib.types.ErrorInfo +import io.ably.lib.util.Log import io.mockk.* import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest @@ -655,6 +656,140 @@ class ObjectsManagerTest { "appliedOnAckSerials should be empty when serials length mismatches") } + @Test + fun `(RTO5f2a2) partial sync map entries are merged across two messages with the same objectId`() { + val defaultRealtimeObjects = makeRealtimeObjects() + val objectsManager = defaultRealtimeObjects.ObjectsManager + + val msg1 = ObjectMessage( + id = "msg1", + objectState = ObjectState( + objectId = "map:test@1", + tombstone = false, + siteTimeserials = mapOf("site1" to "serial1"), + map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1")))) + ) + ) + val msg2 = ObjectMessage( + id = "msg2", + objectState = ObjectState( + objectId = "map:test@1", + tombstone = false, + siteTimeserials = mapOf("site1" to "serial2"), + map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key2" to ObjectsMapEntry(data = ObjectData(string = "value2")))) + ) + ) + + objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:") + + val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap + assertNotNull(liveMap.data["key1"], "key1 should be present after merge") + assertNotNull(liveMap.data["key2"], "key2 should be present after merge") + assertEquals("value1", liveMap.data["key1"]?.data?.string) + assertEquals("value2", liveMap.data["key2"]?.data?.string) + } + + @Test + fun `(RTO5f2a1) tombstone on second partial message replaces pool entry entirely`() { + val defaultRealtimeObjects = makeRealtimeObjects() + val objectsManager = defaultRealtimeObjects.ObjectsManager + + val msg1 = ObjectMessage( + id = "msg1", + objectState = ObjectState( + objectId = "map:test@1", + tombstone = false, + siteTimeserials = mapOf("site1" to "serial1"), + map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1")))) + ) + ) + val msg2 = ObjectMessage( + id = "msg2", + objectState = ObjectState( + objectId = "map:test@1", + tombstone = true, + siteTimeserials = mapOf("site1" to "serial2"), + map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = emptyMap()) + ) + ) + + objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:") + + val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap + // After tombstone replaces the entry, the map should have no key1 + assertNull(liveMap.data["key1"], "key1 should not be present after tombstone replaced the pool entry") + } + + @Test + fun `(RTO5f2b) partial sync counter message logs error and is skipped`() { + val defaultRealtimeObjects = makeRealtimeObjects() + val objectsManager = defaultRealtimeObjects.ObjectsManager + + mockkStatic(Log::class) + every { Log.e(any(), any()) } returns 0 + + val msg1 = ObjectMessage( + id = "msg1", + objectState = ObjectState( + objectId = "counter:test@1", + tombstone = false, + siteTimeserials = mapOf("site1" to "serial1"), + counter = ObjectsCounter(count = 10.0) + ) + ) + val msg2 = ObjectMessage( + id = "msg2", + objectState = ObjectState( + objectId = "counter:test@1", + tombstone = false, + siteTimeserials = mapOf("site1" to "serial2"), + counter = ObjectsCounter(count = 5.0) + ) + ) + + objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:") + + // Pool should contain only msg1 (msg2 skipped) + val counter = defaultRealtimeObjects.objectsPool.get("counter:test@1") as DefaultLiveCounter + assertEquals(10.0, counter.data.get(), "counter value should be from msg1 only (msg2 skipped)") + verify { Log.e(any(), match { it.contains("partial sync message for a counter") }) } + } + + @Test + fun `(RTO5f2c) partial sync message with unsupported type logs warning and is skipped`() { + val defaultRealtimeObjects = makeRealtimeObjects() + val objectsManager = defaultRealtimeObjects.ObjectsManager + + mockkStatic(Log::class) + every { Log.w(any(), any()) } returns 0 + + val msg1 = ObjectMessage( + id = "msg1", + objectState = ObjectState( + objectId = "map:test@1", + tombstone = false, + siteTimeserials = mapOf("site1" to "serial1"), + map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1")))) + ) + ) + // msg2 has neither map nor counter — hits the else branch (RTO5f2c) + val msg2 = ObjectMessage( + id = "msg2", + objectState = ObjectState( + objectId = "map:test@1", + tombstone = false, + siteTimeserials = mapOf("site1" to "serial2"), + ) + ) + + objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:") + + // Pool entry should still be msg1 (msg2 was skipped) + val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap + assertNotNull(liveMap.data["key1"], "key1 should still be present (msg2 skipped)") + verify { Log.w(any(), match { it.contains("unsupported object type") }) } + } + private fun mockZeroValuedObjects() { mockkObject(DefaultLiveMap.Companion) every { From f8555a9e7a28e5d53973372dda4f8fdbc911f781 Mon Sep 17 00:00:00 2001 From: evgeny Date: Thu, 12 Mar 2026 18:31:03 +0000 Subject: [PATCH 2/2] [AIT-208] chore: handle unsupported object types gracefully during sync - Updated `createObjectFromState` to skip unsupported object types instead of throwing errors. - Improved logging to warn about unsupported object types. - Enhanced test coverage for handling unsupported object sync scenarios. - Merged partial sync map entries seamlessly across multiple protocol messages. --- .../io/ably/lib/objects/ObjectsManager.kt | 16 ++- .../unit/objects/ObjectsManagerTest.kt | 129 ++++++++++++++++++ 2 files changed, 139 insertions(+), 6 deletions(-) diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt index bc28e3173..9c669f033 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt @@ -15,7 +15,7 @@ import kotlinx.coroutines.CompletableDeferred internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObjects): ObjectsStateCoordinator() { private val tag = "ObjectsManager" /** - * @spec RTO5 - Sync objects data pool for collecting sync messages + * @spec RTO5 - Sync objects pool for collecting sync messages */ private val syncObjectsPool = mutableMapOf() private var currentSyncId: String? = null @@ -93,7 +93,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject applySync() // RTO5c1/2/7 applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6 bufferedObjectOperations.clear() // RTO5c5 - syncObjectsPool.clear() // RTO5c4 + syncObjectsPool.clear() // RTO5c4 currentSyncId = null // RTO5c3 realtimeObjects.appliedOnAckSerials.clear() // RTO5c9 stateChange(ObjectsState.Synced) // RTO5c8 @@ -124,7 +124,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject } /** - * Clears the sync objects data pool. + * Clears the sync objects pool. * Used by DefaultRealtimeObjects.handleStateChange. */ internal fun clearSyncObjectsPool() { @@ -166,7 +166,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject existingObjectUpdates.add(Pair(existingObject, update)) } else { // RTO5c1b // RTO5c1b1, RTO5c1b1a, RTO5c1b1b - Create new object and add it to the pool - val newObject = createObjectFromState(objectState) + val newObject = createObjectFromState(objectState) ?: continue // RTO5c1b1c - skip unsupported newObject.applyObjectSync(objectMessage) realtimeObjects.objectsPool.set(objectId, newObject) } @@ -291,11 +291,15 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject * * @spec RTO5c1b - Creates objects from object state based on type */ - private fun createObjectFromState(objectState: ObjectState): BaseRealtimeObject { + private fun createObjectFromState(objectState: ObjectState): BaseRealtimeObject? { return when { objectState.counter != null -> DefaultLiveCounter.zeroValue(objectState.objectId, realtimeObjects) // RTO5c1b1a objectState.map != null -> DefaultLiveMap.zeroValue(objectState.objectId, realtimeObjects) // RTO5c1b1b - else -> throw clientError("Object state must contain either counter or map data") // RTO5c1b1c + else -> { + // RTO5c1b1c - unsupported object type, skip gracefully + Log.w(tag, "Received unsupported object state during OBJECT_SYNC (no counter or map), skipping objectId: ${objectState.objectId}") + null + } } } diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt index e92ca12b8..2a9ac5b13 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt @@ -689,6 +689,135 @@ class ObjectsManagerTest { assertEquals("value2", liveMap.data["key2"]?.data?.string) } + @Test + fun `(RTO5f2a2) partial sync map entries merged across separate protocol messages`() { + val defaultRealtimeObjects = makeRealtimeObjects() + val objectsManager = defaultRealtimeObjects.ObjectsManager + + val objectId = "map:test@1" + val siteTimeserials = mapOf("site1" to "serial1") + + // Protocol message 1: first partial (has cursor → not ending) + objectsManager.handleObjectSyncMessages( + listOf( + ObjectMessage( + id = "msg1", + objectState = ObjectState( + objectId = objectId, + tombstone = false, + siteTimeserials = siteTimeserials, + map = ObjectsMap( + semantics = ObjectsMapSemantics.LWW, + entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1"))) + ) + ) + ) + ), + "sync-1:cursor1" + ) + + // Protocol message 2: second partial for same objectId (has cursor → not ending) + objectsManager.handleObjectSyncMessages( + listOf( + ObjectMessage( + id = "msg2", + objectState = ObjectState( + objectId = objectId, + tombstone = false, + siteTimeserials = siteTimeserials, + map = ObjectsMap( + semantics = ObjectsMapSemantics.LWW, + entries = mapOf("key2" to ObjectsMapEntry(data = ObjectData(string = "value2"))) + ) + ) + ) + ), + "sync-1:cursor2" + ) + + // Protocol message 3: third partial for same objectId (empty cursor → ends sync) + objectsManager.handleObjectSyncMessages( + listOf( + ObjectMessage( + id = "msg3", + objectState = ObjectState( + objectId = objectId, + tombstone = false, + siteTimeserials = siteTimeserials, + map = ObjectsMap( + semantics = ObjectsMapSemantics.LWW, + entries = mapOf("key3" to ObjectsMapEntry(data = ObjectData(string = "value3"))) + ) + ) + ) + ), + "sync-1:" // empty cursor → sync ends, applySync() runs + ) + + // Verify all 3 keys from 3 separate protocol messages are merged into the live map + val liveMap = defaultRealtimeObjects.objectsPool.get(objectId) as DefaultLiveMap + assertNotNull(liveMap.data["key1"], "key1 from first protocol message should be present") + assertNotNull(liveMap.data["key2"], "key2 from second protocol message should be present") + assertNotNull(liveMap.data["key3"], "key3 from third protocol message should be present") + assertEquals("value1", liveMap.data["key1"]?.data?.string) + assertEquals("value2", liveMap.data["key2"]?.data?.string) + assertEquals("value3", liveMap.data["key3"]?.data?.string) + } + + @Test + fun `(RTO5c1b1c) unsupported object type during sync is skipped without breaking other objects`() { + val defaultRealtimeObjects = makeRealtimeObjects() + val objectsManager = defaultRealtimeObjects.ObjectsManager + + mockkStatic(Log::class) + every { Log.w(any(), any()) } returns 0 + + // msg1: valid map object + val msg1 = ObjectMessage( + id = "msg1", + objectState = ObjectState( + objectId = "map:test@1", + tombstone = false, + siteTimeserials = mapOf("site1" to "serial1"), + map = ObjectsMap( + semantics = ObjectsMapSemantics.LWW, + entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1"))) + ) + ) + ) + // msg2: unsupported type (neither counter nor map) + val msg2 = ObjectMessage( + id = "msg2", + objectState = ObjectState( + objectId = "unknown:test@2", + tombstone = false, + siteTimeserials = mapOf("site1" to "serial1"), + ) + ) + // msg3: valid counter object + val msg3 = ObjectMessage( + id = "msg3", + objectState = ObjectState( + objectId = "counter:test@3", + tombstone = false, + siteTimeserials = mapOf("site1" to "serial1"), + counter = ObjectsCounter(count = 42.0) + ) + ) + + // Send all three in one sync — msg2 should be skipped, msg1 and msg3 should be applied + objectsManager.handleObjectSyncMessages(listOf(msg1, msg2, msg3), "sync-1:") + + val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap + assertNotNull(liveMap.data["key1"], "valid map object should be created despite unsupported object in same sync") + + val counter = defaultRealtimeObjects.objectsPool.get("counter:test@3") as DefaultLiveCounter + assertEquals(42.0, counter.data.get(), "valid counter should be created despite unsupported object in same sync") + + // Unsupported object should NOT be in the pool + assertNull(defaultRealtimeObjects.objectsPool.get("unknown:test@2"), "unsupported object type should not be in pool") + } + @Test fun `(RTO5f2a1) tombstone on second partial message replaces pool entry entirely`() { val defaultRealtimeObjects = makeRealtimeObjects()