Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
// if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel.
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2
objectsManager.clearSyncObjectsDataPool() // RTO4b3
objectsManager.clearSyncObjectsPool() // RTO4b3
// RTO4b5 removed — buffer already cleared by RTO4d above
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
Expand All @@ -340,7 +340,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
if (state != ChannelState.suspended) {
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
objectsPool.clearObjectsData(false)
objectsManager.clearSyncObjectsDataPool()
objectsManager.clearSyncObjectsPool()
}
}
else -> {
Expand Down
67 changes: 50 additions & 17 deletions liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
/**
* @spec RTO5 - Sync objects data pool for collecting sync messages
*/
private val syncObjectsDataPool = mutableMapOf<String, ObjectMessage>()
private val syncObjectsPool = mutableMapOf<String, ObjectMessage>()
private var currentSyncId: String? = null
/**
* @spec RTO7 - Buffered object operations during sync
Expand Down Expand Up @@ -59,7 +59,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}

// RTO5a3 - continue current sync sequence
applyObjectSyncMessages(objectMessages) // RTO5b
applyObjectSyncMessages(objectMessages) // RTO5f

// RTO5a4 - if this is the last (or only) message in a sequence of sync updates, end the sync
if (syncTracker.hasSyncEnded()) {
Expand All @@ -77,7 +77,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
internal fun startNewSync(syncId: String?) {
Log.v(tag, "Starting new sync sequence: syncId=$syncId")

syncObjectsDataPool.clear() // RTO5a2a
syncObjectsPool.clear() // RTO5a2a
currentSyncId = syncId
syncCompletionWaiter = CompletableDeferred()
stateChange(ObjectsState.Syncing)
Expand All @@ -93,7 +93,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
applySync() // RTO5c1/2/7
applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6
bufferedObjectOperations.clear() // RTO5c5
syncObjectsDataPool.clear() // RTO5c4
syncObjectsPool.clear() // RTO5c4
currentSyncId = null // RTO5c3
realtimeObjects.appliedOnAckSerials.clear() // RTO5c9
stateChange(ObjectsState.Synced) // RTO5c8
Expand Down Expand Up @@ -127,8 +127,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
* Clears the sync objects data pool.
* Used by DefaultRealtimeObjects.handleStateChange.
*/
internal fun clearSyncObjectsDataPool() {
syncObjectsDataPool.clear()
internal fun clearSyncObjectsPool() {
syncObjectsPool.clear()
}

/**
Expand All @@ -145,7 +145,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
* @spec RTO5c - Processes sync data and updates objects pool
*/
private fun applySync() {
if (syncObjectsDataPool.isEmpty()) {
if (syncObjectsPool.isEmpty()) {
return
}

Expand All @@ -154,8 +154,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
val existingObjectUpdates = mutableListOf<Pair<BaseRealtimeObject, ObjectUpdate>>()

// RTO5c1
for ((objectId, objectMessage) in syncObjectsDataPool) {
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
for ((objectId, objectMessage) in syncObjectsPool) {
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5f
receivedObjectIds.add(objectId)
val existingObject = realtimeObjects.objectsPool.get(objectId)

Expand Down Expand Up @@ -232,9 +232,9 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}

/**
* Applies sync messages to sync data pool.
* Applies sync messages to sync data pool, merging partial sync messages for the same objectId.
*
* @spec RTO5b - Collects object states during sync sequence
* @spec RTO5f - Collects and merges object states during sync sequence
*/
private fun applyObjectSyncMessages(objectMessages: List<ObjectMessage>) {
for (objectMessage in objectMessages) {
Expand All @@ -244,11 +244,44 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}

val objectState: ObjectState = objectMessage.objectState
if (objectState.counter != null || objectState.map != null) {
syncObjectsDataPool[objectState.objectId] = objectMessage
} else {
// RTO5c1b1c - object state must contain either counter or map data
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
val objectId = objectState.objectId
val existingEntry = syncObjectsPool[objectId]

if (existingEntry == null) {
// RTO5f1 - objectId not in pool, store directly
if (objectState.counter != null || objectState.map != null) {
syncObjectsPool[objectId] = objectMessage
} else {
// RTO5c1b1c - object state must contain either counter or map data
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
}
continue
}

// RTO5f2 - objectId already in pool; this is a partial sync message, merge based on type
when {
objectState.map != null -> {
// RTO5f2a - map object: merge entries
if (objectState.tombstone) {
// RTO5f2a1 - tombstone: replace pool entry entirely
syncObjectsPool[objectId] = objectMessage
} else {
// RTO5f2a2 - merge map entries; server guarantees no duplicate keys across partials
val existingState = existingEntry.objectState!! // non-null for existing entry
val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty()
val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries)
val mergedState = existingState.copy(map = mergedMap)
syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)
}
}
objectState.counter != null -> {
// RTO5f2b - counter objects must never be split across messages
Log.e(tag, "Received partial sync message for a counter object, skipping: ${objectMessage.id}")
}
else -> {
// RTO5f2c - unsupported type, log warning and skip
Log.w(tag, "Received partial sync message for an unsupported object type, skipping: ${objectMessage.id}")
}
}
}
}
Expand Down Expand Up @@ -284,7 +317,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject

internal fun dispose() {
syncCompletionWaiter?.cancel()
syncObjectsDataPool.clear()
syncObjectsPool.clear()
bufferedObjectOperations.clear()
disposeObjectsStateListeners()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ internal abstract class BaseRealtimeObject(
* @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter
*/
internal fun applyObjectSync(objectMessage: ObjectMessage): ObjectUpdate {
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5f
validate(objectState)
// object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation.
// should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ internal val BaseRealtimeObject.TombstonedAt: Long?
* START - DefaultRealtimeObjects dep mocks
* ======================================
*/
internal val ObjectsManager.SyncObjectsDataPool: Map<String, ObjectState>
get() = this.getPrivateField("syncObjectsDataPool")
internal val ObjectsManager.SyncObjectsPool: Map<String, ObjectMessage>
get() = this.getPrivateField("syncObjectsPool")

internal val ObjectsManager.BufferedObjectOperations: List<ObjectMessage>
get() = this.getPrivateField("bufferedObjectOperations")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import io.ably.lib.objects.type.livemap.DefaultLiveMap
import io.ably.lib.objects.type.livemap.LiveMapEntry
import io.ably.lib.objects.unit.BufferedObjectOperations
import io.ably.lib.objects.unit.ObjectsManager
import io.ably.lib.objects.unit.SyncObjectsDataPool
import io.ably.lib.objects.unit.SyncObjectsPool
import io.ably.lib.objects.unit.getMockObjectsAdapter
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
import io.ably.lib.objects.unit.getMockRealtimeChannel
Expand Down Expand Up @@ -83,7 +83,7 @@ class DefaultRealtimeObjectsTest {
defaultRealtimeObjects.ObjectsManager.endSync()
}

assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsDataPool.size) // RTO4b3
assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsPool.size) // RTO4b3
assertEquals(0, defaultRealtimeObjects.ObjectsManager.BufferedObjectOperations.size) // RTO4d
assertEquals(1, defaultRealtimeObjects.objectsPool.size()) // RTO4b1 - Only root remains
assertEquals(rootObject, defaultRealtimeObjects.objectsPool.get(ROOT_OBJECT_ID)) // points to previously created root object
Expand Down Expand Up @@ -246,16 +246,16 @@ class DefaultRealtimeObjectsTest {
failCalled.await()

verify(exactly = 0) { defaultRealtimeObjects.objectsPool.clearObjectsData(any()) }
verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() }
}

@Test
fun `(RTO4) handleStateChange(DETACHED) clears objects data and sync pool`() = runTest {
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()

// Use clearSyncObjectsDataPool (the last operation in the coroutine) as the completion signal
// Use clearSyncObjectsPool (the last operation in the coroutine) as the completion signal
val syncPoolCleared = CompletableDeferred<Unit>()
every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() } answers {
every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() } answers {
callOriginal()
syncPoolCleared.complete(Unit)
}
Expand All @@ -265,7 +265,7 @@ class DefaultRealtimeObjectsTest {
syncPoolCleared.await()

verify(exactly = 1) { defaultRealtimeObjects.objectsPool.clearObjectsData(false) }
verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() }
}

@Test
Expand All @@ -281,7 +281,7 @@ class DefaultRealtimeObjectsTest {
operation = ObjectOperation(
action = ObjectOperationAction.CounterInc,
objectId = "counter:test@1",
counterOp = ObjectsCounterOp(amount = 5.0)
counterInc = CounterInc(number = 5.0)
)
)
)
Expand Down Expand Up @@ -313,7 +313,7 @@ class DefaultRealtimeObjectsTest {
operation = ObjectOperation(
action = ObjectOperationAction.CounterInc,
objectId = "counter:test@1",
counterOp = ObjectsCounterOp(amount = 5.0)
counterInc = CounterInc(number = 5.0)
)
)
)
Expand Down Expand Up @@ -370,7 +370,7 @@ class DefaultRealtimeObjectsTest {
operation = ObjectOperation(
action = ObjectOperationAction.CounterInc,
objectId = "counter:test@1",
counterOp = ObjectsCounterOp(amount = 3.0)
counterInc = CounterInc(number = 3.0)
),
serial = "serial-op-1",
siteCode = "site1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.ably.lib.objects.unit.*
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import io.ably.lib.util.Log
import io.mockk.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
Expand Down Expand Up @@ -655,6 +656,140 @@ class ObjectsManagerTest {
"appliedOnAckSerials should be empty when serials length mismatches")
}

@Test
fun `(RTO5f2a2) partial sync map entries are merged across two messages with the same objectId`() {
val defaultRealtimeObjects = makeRealtimeObjects()
val objectsManager = defaultRealtimeObjects.ObjectsManager

val msg1 = ObjectMessage(
id = "msg1",
objectState = ObjectState(
objectId = "map:test@1",
tombstone = false,
siteTimeserials = mapOf("site1" to "serial1"),
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1"))))
)
)
val msg2 = ObjectMessage(
id = "msg2",
objectState = ObjectState(
objectId = "map:test@1",
tombstone = false,
siteTimeserials = mapOf("site1" to "serial2"),
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key2" to ObjectsMapEntry(data = ObjectData(string = "value2"))))
)
)

objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")

val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap
assertNotNull(liveMap.data["key1"], "key1 should be present after merge")
assertNotNull(liveMap.data["key2"], "key2 should be present after merge")
assertEquals("value1", liveMap.data["key1"]?.data?.string)
assertEquals("value2", liveMap.data["key2"]?.data?.string)
}

@Test
fun `(RTO5f2a1) tombstone on second partial message replaces pool entry entirely`() {
val defaultRealtimeObjects = makeRealtimeObjects()
val objectsManager = defaultRealtimeObjects.ObjectsManager

val msg1 = ObjectMessage(
id = "msg1",
objectState = ObjectState(
objectId = "map:test@1",
tombstone = false,
siteTimeserials = mapOf("site1" to "serial1"),
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1"))))
)
)
val msg2 = ObjectMessage(
id = "msg2",
objectState = ObjectState(
objectId = "map:test@1",
tombstone = true,
siteTimeserials = mapOf("site1" to "serial2"),
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = emptyMap())
)
)

objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")

val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap
// After tombstone replaces the entry, the map should have no key1
assertNull(liveMap.data["key1"], "key1 should not be present after tombstone replaced the pool entry")
}

@Test
fun `(RTO5f2b) partial sync counter message logs error and is skipped`() {
val defaultRealtimeObjects = makeRealtimeObjects()
val objectsManager = defaultRealtimeObjects.ObjectsManager

mockkStatic(Log::class)
every { Log.e(any(), any<String>()) } returns 0

val msg1 = ObjectMessage(
id = "msg1",
objectState = ObjectState(
objectId = "counter:test@1",
tombstone = false,
siteTimeserials = mapOf("site1" to "serial1"),
counter = ObjectsCounter(count = 10.0)
)
)
val msg2 = ObjectMessage(
id = "msg2",
objectState = ObjectState(
objectId = "counter:test@1",
tombstone = false,
siteTimeserials = mapOf("site1" to "serial2"),
counter = ObjectsCounter(count = 5.0)
)
)

objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")

// Pool should contain only msg1 (msg2 skipped)
val counter = defaultRealtimeObjects.objectsPool.get("counter:test@1") as DefaultLiveCounter
assertEquals(10.0, counter.data.get(), "counter value should be from msg1 only (msg2 skipped)")
verify { Log.e(any(), match<String> { it.contains("partial sync message for a counter") }) }
}

@Test
fun `(RTO5f2c) partial sync message with unsupported type logs warning and is skipped`() {
val defaultRealtimeObjects = makeRealtimeObjects()
val objectsManager = defaultRealtimeObjects.ObjectsManager

mockkStatic(Log::class)
every { Log.w(any(), any<String>()) } returns 0

val msg1 = ObjectMessage(
id = "msg1",
objectState = ObjectState(
objectId = "map:test@1",
tombstone = false,
siteTimeserials = mapOf("site1" to "serial1"),
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1"))))
)
)
// msg2 has neither map nor counter — hits the else branch (RTO5f2c)
val msg2 = ObjectMessage(
id = "msg2",
objectState = ObjectState(
objectId = "map:test@1",
tombstone = false,
siteTimeserials = mapOf("site1" to "serial2"),
)
)

objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")

// Pool entry should still be msg1 (msg2 was skipped)
val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap
assertNotNull(liveMap.data["key1"], "key1 should still be present (msg2 skipped)")
verify { Log.w(any(), match<String> { it.contains("unsupported object type") }) }
}

private fun mockZeroValuedObjects() {
mockkObject(DefaultLiveMap.Companion)
every {
Expand Down
Loading