Conversation
WalkthroughRenamed internal sync pool and API ( Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
Implements partial OBJECT_SYNC handling for Live Objects by merging multiple sync messages for the same objectId (instead of overwriting), while also renaming the internal sync pool for clearer, standardized naming.
Changes:
- Rename
syncObjectsDataPooltosyncObjectsPool(and corresponding accessor/clear methods) across implementation and tests. - Update sync processing to merge partial sync map messages for the same
objectId, with special handling for tombstones and unsupported partial types. - Add unit tests covering partial sync merging/tombstone/skip paths and update test helpers accordingly.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt | Renames sync pool and adds partial-sync merge logic for repeated objectIds during OBJECT_SYNC. |
| liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt | Updates state-change cleanup to call the renamed sync-pool clearing method. |
| liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt | Adds new tests for partial sync merge/tombstone/skip logging behavior. |
| liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt | Updates tests to use the renamed sync-pool APIs/accessors. |
| liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt | Updates the test-only private-field accessor to the renamed sync pool. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
Show resolved
Hide resolved
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt (1)
660-690: Cover mergedsiteTimeserials, not just merged entries.This test still passes if the second fragment's site-serial metadata is dropped during the merge, even though that metadata is what the runtime uses for post-sync ordering.
🧪 Suggested coverage
- siteTimeserials = mapOf("site1" to "serial2"), + siteTimeserials = mapOf("site2" to "serial2"), map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key2" to ObjectsMapEntry(data = ObjectData(string = "value2")))) ) ) objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:") val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap assertNotNull(liveMap.data["key1"], "key1 should be present after merge") assertNotNull(liveMap.data["key2"], "key2 should be present after merge") assertEquals("value1", liveMap.data["key1"]?.data?.string) assertEquals("value2", liveMap.data["key2"]?.data?.string) + assertEquals("serial1", liveMap.siteTimeserials["site1"]) + assertEquals("serial2", liveMap.siteTimeserials["site2"])
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 263-276: When merging partial map syncs in ObjectsManager (the
when branch handling objectState.map), also merge the siteTimeserials from the
existing fragment and the incoming fragment instead of keeping the first
fragment's value; update the mergedMap creation to combine
existingState.map?.siteTimeserials and objectState.map.siteTimeserials (e.g.,
concatenate then deduplicate/preserve order as appropriate) and set that on
mergedMap before building mergedState and writing syncObjectsPool[objectId] =
existingEntry.copy(objectState = mergedState).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 254bfbfc-c643-471d-9f08-aedfae574373
📒 Files selected for processing (5)
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
95135d5 to
872a6f6
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (1)
270-274:⚠️ Potential issue | 🟠 MajorMerge
siteTimeserialswhen coalescing partial map sync fragments.
existingState.copy(map = mergedMap)keeps only the first fragment'ssiteTimeserials. If later partials carry newer per-site serials,applyObjectSync()seeds the live object with stale ordering metadata and can misorder or incorrectly dedupe subsequent ops.💡 Proposed fix
val existingState = existingEntry.objectState!! // non-null for existing entry val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty() val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries) - val mergedState = existingState.copy(map = mergedMap) + val mergedState = existingState.copy( + siteTimeserials = existingState.siteTimeserials + objectState.siteTimeserials, + map = mergedMap, + ) syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 270 - 274, When coalescing partial map sync fragments we currently only take entries from both fragments but keep the first fragment's per-site serials, causing stale siteTimeserials to seed live objects; update the merge to combine siteTimeserials from existingState.map and objectState.map by merging their maps and selecting the newer/higher serial per site (e.g., max value per key), then build mergedMap using ObjectsMap().copy(entries = mergedEntries, siteTimeserials = mergedSiteTimeserials) and assign mergedState = existingState.copy(map = mergedMap) before writing back to syncObjectsPool[objectId]; ensure you reference existingState, objectState, mergedEntries, mergedMap and applyObjectSync remains compatible with the merged siteTimeserials.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 77-83: The problem is that startNewSync reassigns
syncCompletionWaiter unconditionally, which leaves any applyAckResult() awaiting
the previous CompletableDeferred stranded when a server-initiated OBJECT_SYNC
begins; to fix, in ObjectsManager.startNewSync do not replace an existing active
syncCompletionWaiter—only create a new CompletableDeferred when there is no
current waiter or the existing one is already completed/consumed; keep using the
same syncCompletionWaiter across resyncs so applyAckResult() can resume/fail
correctly and continue to clear syncObjectsPool and set currentSyncId/state as
before.
---
Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 270-274: When coalescing partial map sync fragments we currently
only take entries from both fragments but keep the first fragment's per-site
serials, causing stale siteTimeserials to seed live objects; update the merge to
combine siteTimeserials from existingState.map and objectState.map by merging
their maps and selecting the newer/higher serial per site (e.g., max value per
key), then build mergedMap using ObjectsMap().copy(entries = mergedEntries,
siteTimeserials = mergedSiteTimeserials) and assign mergedState =
existingState.copy(map = mergedMap) before writing back to
syncObjectsPool[objectId]; ensure you reference existingState, objectState,
mergedEntries, mergedMap and applyObjectSync remains compatible with the merged
siteTimeserials.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: cdab0f9e-90f7-424e-97bc-6a7c001ea7f8
📒 Files selected for processing (5)
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
🚧 Files skipped from review as they are similar to previous changes (1)
- liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
872a6f6 to
aa8f205
Compare
aa8f205 to
92ed8eb
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (2)
270-274:⚠️ Potential issue | 🟠 MajorMerge
siteTimeserialswhen coalescing partial map syncs.
ObjectStateinliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt:264-304treatssiteTimeserialsas part of the snapshot, but Lines 270-274 only merge map entries. Any site serials carried by later fragments are lost, so the synced object is seeded with stale ordering/dedup metadata.💡 Suggested fix
- val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries) - val mergedState = existingState.copy(map = mergedMap) + val mergedMap = (existingState.map ?: ObjectsMap()).copy( + semantics = objectState.map.semantics ?: existingState.map?.semantics, + entries = mergedEntries, + ) + val mergedState = existingState.copy( + siteTimeserials = existingState.siteTimeserials + objectState.siteTimeserials, + map = mergedMap, + )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 270 - 274, The merge currently only combines map.entries and discards siteTimeserials; update the coalescing logic around existingEntry/existingState/objectState so you also merge siteTimeserials (e.g., combine existingState.siteTimeserials with objectState.siteTimeserials, preferring the newer fragment's values where keys collide), then include that merged siteTimeserials in mergedState (the copy used to update syncObjectsPool[objectId]) so ordering/dedup metadata is preserved.
80-82:⚠️ Potential issue | 🔴 CriticalDon't replace an active
syncCompletionWaiter.If a server-initiated resync starts while
applyAckResult()is already awaiting the current deferred on Lines 109-113, this assignment strands those coroutines on the old instance and their publishes never resume or fail. Reuse the existing active waiter and only allocate a new one when there isn't one.💡 Suggested fix
- syncCompletionWaiter = CompletableDeferred() + if (syncCompletionWaiter?.isActive != true) { + syncCompletionWaiter = CompletableDeferred() + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 80 - 82, When starting a resync, do not unconditionally replace the active CompletableDeferred; instead check the current syncCompletionWaiter used by applyAckResult() and only allocate a new CompletableDeferred when there is no existing waiter or the existing one is completed (e.g. if (syncCompletionWaiter == null || syncCompletionWaiter!!.isCompleted) { syncCompletionWaiter = CompletableDeferred() }), so coroutines awaiting the old waiter in applyAckResult() are not stranded; keep the existing syncCompletionWaiter otherwise while still clearing syncObjectsPool and updating currentSyncId.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt`:
- Around line 371-373: The buffered increment in the test is currently a no-op
because CounterInc(number = 0.0) is queued; update the queued CounterInc to use
the expected delta (CounterInc(number = 3.0)) so the post-sync value moves from
5.0 to 8.0 and the replay is +3.0. Locate the queued operation using the symbol
CounterInc (and the surrounding ObjectOperationAction.CounterInc / objectId
"counter:test@1") and change its number to 3.0; apply the same fix to the second
occurrence referenced around the later expectations so both buffered increments
match the assertions.
---
Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 270-274: The merge currently only combines map.entries and
discards siteTimeserials; update the coalescing logic around
existingEntry/existingState/objectState so you also merge siteTimeserials (e.g.,
combine existingState.siteTimeserials with objectState.siteTimeserials,
preferring the newer fragment's values where keys collide), then include that
merged siteTimeserials in mergedState (the copy used to update
syncObjectsPool[objectId]) so ordering/dedup metadata is preserved.
- Around line 80-82: When starting a resync, do not unconditionally replace the
active CompletableDeferred; instead check the current syncCompletionWaiter used
by applyAckResult() and only allocate a new CompletableDeferred when there is no
existing waiter or the existing one is completed (e.g. if (syncCompletionWaiter
== null || syncCompletionWaiter!!.isCompleted) { syncCompletionWaiter =
CompletableDeferred() }), so coroutines awaiting the old waiter in
applyAckResult() are not stranded; keep the existing syncCompletionWaiter
otherwise while still clearing syncObjectsPool and updating currentSyncId.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 52afa433-a7cb-4bd8-84b9-aa8a063da076
📒 Files selected for processing (5)
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
🚧 Files skipped from review as they are similar to previous changes (2)
- liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
- liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt
Outdated
Show resolved
Hide resolved
- rename `syncObjectsDataPool` to `syncObjectsPool` - Standardized naming across `ObjectsManager` to improve code clarity. - Updated references in implementation and tests.
92ed8eb to
f430594
Compare
There was a problem hiding this comment.
♻️ Duplicate comments (2)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (2)
270-274:⚠️ Potential issue | 🟠 MajorMerge
siteTimeserialswhen combining partial map fragments.This merge keeps the first fragment's
siteTimeserialsand drops later fragments' metadata.applyObjectSync()then seeds the live map from stale per-site serials, which can break ordering/dedup after sync.🔧 Proposed fix
val existingState = existingEntry.objectState!! // non-null for existing entry val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty() - val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries) - val mergedState = existingState.copy(map = mergedMap) + val mergedMap = (existingState.map ?: ObjectsMap()).copy( + semantics = objectState.map.semantics ?: existingState.map?.semantics, + entries = mergedEntries, + ) + val mergedState = existingState.copy( + siteTimeserials = existingState.siteTimeserials + objectState.siteTimeserials, + map = mergedMap, + ) syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 270 - 274, When merging partial map fragments in ObjectsManager, also merge per-site serial metadata instead of keeping only the first fragment's siteTimeserials: in the block using existingState, objectState and building mergedMap (referencing existingState.map, objectState.map, ObjectsMap, mergedEntries and syncObjectsPool[objectId] = existingEntry.copy(...)), combine siteTimeserials from both maps into mergedMap.siteTimeserials by merging keys and taking the highest serial per site (or otherwise resolving duplicates deterministically) so applyObjectSync seeds the live map with the correct per-site serials and preserves ordering/dedup semantics.
80-82:⚠️ Potential issue | 🟠 MajorPreserve the active sync waiter across resyncs.
Line 82 replaces any in-flight
syncCompletionWaiter. If a newOBJECT_SYNCstarts whileapplyAckResult()is already awaiting the old deferred, that caller never resumes or fails.🔧 Proposed fix
internal fun startNewSync(syncId: String?) { Log.v(tag, "Starting new sync sequence: syncId=$syncId") syncObjectsPool.clear() // RTO5a2a currentSyncId = syncId - syncCompletionWaiter = CompletableDeferred() + if (syncCompletionWaiter?.isActive != true) { + syncCompletionWaiter = CompletableDeferred() + } stateChange(ObjectsState.Syncing) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 80 - 82, The resync logic in ObjectsManager (the lines calling syncObjectsPool.clear(), setting currentSyncId, and assigning syncCompletionWaiter) overwrites any existing in-flight syncCompletionWaiter and can leave callers of applyAckResult() hanging; change the assignment so you only create/replace syncCompletionWaiter when there is no active unfinished deferred (i.e., if syncCompletionWaiter is null or already completed), otherwise preserve the existing Deferred so applyAckResult() can resume; reference the symbols syncObjectsPool.clear(), currentSyncId, syncCompletionWaiter, and applyAckResult() when making the conditional creation/assignment.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 270-274: When merging partial map fragments in ObjectsManager,
also merge per-site serial metadata instead of keeping only the first fragment's
siteTimeserials: in the block using existingState, objectState and building
mergedMap (referencing existingState.map, objectState.map, ObjectsMap,
mergedEntries and syncObjectsPool[objectId] = existingEntry.copy(...)), combine
siteTimeserials from both maps into mergedMap.siteTimeserials by merging keys
and taking the highest serial per site (or otherwise resolving duplicates
deterministically) so applyObjectSync seeds the live map with the correct
per-site serials and preserves ordering/dedup semantics.
- Around line 80-82: The resync logic in ObjectsManager (the lines calling
syncObjectsPool.clear(), setting currentSyncId, and assigning
syncCompletionWaiter) overwrites any existing in-flight syncCompletionWaiter and
can leave callers of applyAckResult() hanging; change the assignment so you only
create/replace syncCompletionWaiter when there is no active unfinished deferred
(i.e., if syncCompletionWaiter is null or already completed), otherwise preserve
the existing Deferred so applyAckResult() can resume; reference the symbols
syncObjectsPool.clear(), currentSyncId, syncCompletionWaiter, and
applyAckResult() when making the conditional creation/assignment.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a8f91d7b-0bbb-49b4-a27e-f6a77b23d37f
📒 Files selected for processing (6)
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
✅ Files skipped from review due to trivial changes (1)
- liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
syncObjectsDataPooltosyncObjectsPoolObjectsManagerto improve code clarity.Spec: ably/specification#413
JS: ably/ably-js#2152
Summary by CodeRabbit