Skip to content

Commit 11de235

Browse files
Implement new rules for discarding buffered object operations
That is, do it when we get a discontinuity, not when a new sync sequence changes, per spec changes in [1]. [1] ably/specification#416
1 parent d1047bc commit 11de235

6 files changed

Lines changed: 137 additions & 36 deletions

File tree

AblyLiveObjects.xcworkspace/xcshareddata/swiftpm/Package.resolved

Lines changed: 3 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.resolved

Lines changed: 3 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ let package = Package(
1818
),
1919
],
2020
dependencies: [
21+
// TODO: Unpin before release
2122
.package(
2223
url: "https://github.com/ably/ably-cocoa",
23-
from: "1.2.55",
24+
revision: "ee0b1fabf2e30377d6a1d90f3d2b6614d16951d8",
2425
),
26+
// TODO: Unpin before release
2527
.package(
2628
url: "https://github.com/ably/ably-cocoa-plugin-support",
27-
from: "1.0.0",
29+
revision: "2e640ce2ef422cd5ef693a9e1111400e9985e088",
2830
),
2931
.package(
3032
url: "https://github.com/apple/swift-argument-parser",

Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,12 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
114114
return wireObjectMessage.toWireObject.toPluginSupportDataDictionary
115115
}
116116

117-
internal func nosync_onChannelAttached(_ channel: _AblyPluginSupportPrivate.RealtimeChannel, hasObjects: Bool) {
118-
nosync_realtimeObjects(for: channel).nosync_onChannelAttached(hasObjects: hasObjects)
117+
internal func nosync_onChannelAttached(_: _AblyPluginSupportPrivate.RealtimeChannel, hasObjects _: Bool) {
118+
preconditionFailure("Expected a version of ably-cocoa that calls the variant that accepts a `resumed` argument")
119+
}
120+
121+
internal func nosync_onChannelAttached(_ channel: _AblyPluginSupportPrivate.RealtimeChannel, hasObjects: Bool, resumed: Bool) {
122+
nosync_realtimeObjects(for: channel).nosync_onChannelAttached(hasObjects: hasObjects, resumed: resumed)
119123
}
120124

121125
internal func nosync_handleObjectProtocolMessage(withObjectMessages publicObjectMessages: [any _AblyPluginSupportPrivate.ObjectMessageProtocol], channel: _AblyPluginSupportPrivate.RealtimeChannel) {

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,10 +302,11 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
302302
}
303303
}
304304

305-
internal func nosync_onChannelAttached(hasObjects: Bool) {
305+
internal func nosync_onChannelAttached(hasObjects: Bool, resumed: Bool) {
306306
mutableStateMutex.withoutSync { mutableState in
307307
mutableState.nosync_onChannelAttached(
308308
hasObjects: hasObjects,
309+
resumed: resumed,
309310
logger: logger,
310311
userCallbackQueue: userCallbackQueue,
311312
)
@@ -500,13 +501,20 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
500501

501502
internal mutating func nosync_onChannelAttached(
502503
hasObjects: Bool,
504+
resumed: Bool,
503505
logger: Logger,
504506
userCallbackQueue: DispatchQueue,
505507
) {
506-
logger.log("onChannelAttached(hasObjects: \(hasObjects)", level: .debug)
508+
logger.log("onChannelAttached(hasObjects: \(hasObjects), resumed: \(resumed)", level: .debug)
507509

508510
onChannelAttachedHasObjects = hasObjects
509511

512+
// RTO4d: If the RESUMED flag is not set, clear any buffered operations
513+
if !resumed, case let .syncing(syncingData) = state {
514+
logger.log("Clearing buffered operations due to ATTACHED without RESUMED flag", level: .debug)
515+
syncingData.bufferedObjectOperations = []
516+
}
517+
510518
// We will subsequently transition to .synced either by the completion of the RTO4a OBJECT_SYNC, or by the RTO4b no-HAS_OBJECTS case below
511519
if state.toObjectsSyncState != .syncing {
512520
// RTO4c
@@ -558,9 +566,8 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
558566
// Figure out whether to continue any existing sync sequence or start a new one
559567
let isNewSyncSequence = syncCursor == nil || syncingData.syncSequence?.id != syncCursor?.sequenceID
560568
if isNewSyncSequence {
561-
// RTO5a2a, RTO5a2b: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3
569+
// RTO5a2a: new sequence started, discard previous SyncObjectsPool. Else we continue the existing sequence per RTO5a3
562570
syncingData.syncSequence = nil
563-
syncingData.bufferedObjectOperations = []
564571
}
565572
}
566573

Tests/AblyLiveObjectsTests/InternalDefaultRealtimeObjectsTests.swift

Lines changed: 110 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,8 @@ struct InternalDefaultRealtimeObjectsTests {
126126

127127
// @spec RTO5a2
128128
// @spec RTO5a2a
129-
// @spec RTO5a2b
130129
@Test
131-
func newSequenceIdDiscardsInFlightSync() async throws {
130+
func newSequenceIdDiscardsInFlightSyncButKeepsBufferedOperations() async throws {
132131
let internalQueue = TestFactories.createInternalQueue()
133132
let realtimeObjects = InternalDefaultRealtimeObjectsTests.createDefaultRealtimeObjects(internalQueue: internalQueue)
134133
let firstSequenceId = "seq1"
@@ -145,7 +144,8 @@ struct InternalDefaultRealtimeObjectsTests {
145144

146145
#expect(realtimeObjects.testsOnly_hasSyncSequence)
147146

148-
// Inject an OBJECT; it will get buffered per RTO8a and subsequently discarded per RTO5a2b
147+
// Inject an OBJECT; it will get buffered per RTO8a and applied after sync completion per RTO5c6
148+
// (Note: RTO5a2b was deleted, so buffered operations are NOT cleared when a new sequence ID arrives)
149149
internalQueue.ably_syncNoDeadlock {
150150
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
151151
TestFactories.mapCreateOperationMessage(objectId: "map:3@789"),
@@ -172,10 +172,11 @@ struct InternalDefaultRealtimeObjectsTests {
172172
)
173173
}
174174

175-
// Verify only the second sequence's objects were applied (RTO5a2a - previous cleared)
175+
// Verify the second sequence's objects were applied (RTO5a2a - SyncObjectsPool was cleared)
176+
// but the buffered OBJECT operation was also applied (RTO5a2b deleted - BufferedObjectOperations NOT cleared)
176177
let pool = realtimeObjects.testsOnly_objectsPool
177-
#expect(pool.entries["map:1@123"] == nil) // From discarded first sequence
178-
#expect(pool.entries["map:3@789"] == nil) // Check we discarded the OBJECT that was buffered during discarded first sequence (RTO5a2b)
178+
#expect(pool.entries["map:1@123"] == nil) // From discarded first sequence's SyncObjectsPool
179+
#expect(pool.entries["map:3@789"] != nil) // Buffered OBJECT was applied (RTO5a2b deleted)
179180
#expect(pool.entries["map:2@456"] != nil) // From completed second sequence
180181
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
181182
}
@@ -373,7 +374,7 @@ struct InternalDefaultRealtimeObjectsTests {
373374

374375
// When: onChannelAttached is called with hasObjects = true
375376
internalQueue.ably_syncNoDeadlock {
376-
realtimeObjects.nosync_onChannelAttached(hasObjects: true)
377+
realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: false)
377378
}
378379

379380
// Then: Nothing should be modified
@@ -389,6 +390,96 @@ struct InternalDefaultRealtimeObjectsTests {
389390
#expect(realtimeObjects.testsOnly_hasSyncSequence)
390391
}
391392

393+
// MARK: - RTO4d Tests
394+
395+
// @spec RTO4d - Checks that buffered operations are cleared when ATTACHED is received without RESUMED flag
396+
@Test
397+
func clearsBufferedOperationsWhenResumedIsFalse() {
398+
let internalQueue = TestFactories.createInternalQueue()
399+
let realtimeObjects = InternalDefaultRealtimeObjectsTests.createDefaultRealtimeObjects(internalQueue: internalQueue)
400+
401+
// Start a sync sequence
402+
internalQueue.ably_syncNoDeadlock {
403+
realtimeObjects.nosync_handleObjectSyncProtocolMessage(
404+
objectMessages: [
405+
TestFactories.mapObjectMessage(objectId: "map:sync@123"),
406+
],
407+
protocolMessageChannelSerial: "seq1:cursor1",
408+
)
409+
}
410+
411+
#expect(realtimeObjects.testsOnly_hasSyncSequence)
412+
413+
// Buffer some OBJECT operations during the sync
414+
internalQueue.ably_syncNoDeadlock {
415+
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
416+
TestFactories.mapCreateOperationMessage(objectId: "map:buffered@456"),
417+
])
418+
}
419+
420+
// When: onChannelAttached is called with resumed = false (RTO4d)
421+
internalQueue.ably_syncNoDeadlock {
422+
realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: false)
423+
}
424+
425+
// Continue and complete the sync sequence
426+
internalQueue.ably_syncNoDeadlock {
427+
realtimeObjects.nosync_handleObjectSyncProtocolMessage(
428+
objectMessages: [],
429+
protocolMessageChannelSerial: "seq1:",
430+
)
431+
}
432+
433+
// Then: The buffered OBJECT operation should NOT have been applied because it was cleared per RTO4d
434+
let pool = realtimeObjects.testsOnly_objectsPool
435+
#expect(pool.entries["map:sync@123"] != nil) // From the sync sequence
436+
#expect(pool.entries["map:buffered@456"] == nil) // Buffered operation was cleared (RTO4d)
437+
}
438+
439+
// @spec RTO4d - Checks that buffered operations are NOT cleared when ATTACHED is received WITH RESUMED flag
440+
@Test
441+
func preservesBufferedOperationsWhenResumedIsTrue() {
442+
let internalQueue = TestFactories.createInternalQueue()
443+
let realtimeObjects = InternalDefaultRealtimeObjectsTests.createDefaultRealtimeObjects(internalQueue: internalQueue)
444+
445+
// Start a sync sequence
446+
internalQueue.ably_syncNoDeadlock {
447+
realtimeObjects.nosync_handleObjectSyncProtocolMessage(
448+
objectMessages: [
449+
TestFactories.mapObjectMessage(objectId: "map:sync@123"),
450+
],
451+
protocolMessageChannelSerial: "seq1:cursor1",
452+
)
453+
}
454+
455+
#expect(realtimeObjects.testsOnly_hasSyncSequence)
456+
457+
// Buffer some OBJECT operations during the sync
458+
internalQueue.ably_syncNoDeadlock {
459+
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
460+
TestFactories.mapCreateOperationMessage(objectId: "map:buffered@456"),
461+
])
462+
}
463+
464+
// When: onChannelAttached is called with resumed = true (state was preserved)
465+
internalQueue.ably_syncNoDeadlock {
466+
realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: true)
467+
}
468+
469+
// Continue and complete the sync sequence
470+
internalQueue.ably_syncNoDeadlock {
471+
realtimeObjects.nosync_handleObjectSyncProtocolMessage(
472+
objectMessages: [],
473+
protocolMessageChannelSerial: "seq1:",
474+
)
475+
}
476+
477+
// Then: The buffered OBJECT operation should have been applied because it was NOT cleared
478+
let pool = realtimeObjects.testsOnly_objectsPool
479+
#expect(pool.entries["map:sync@123"] != nil) // From the sync sequence
480+
#expect(pool.entries["map:buffered@456"] != nil) // Buffered operation was preserved and applied
481+
}
482+
392483
// MARK: - RTO4b Tests
393484

394485
// @spec RTO4b1
@@ -440,7 +531,7 @@ struct InternalDefaultRealtimeObjectsTests {
440531

441532
// When: onChannelAttached is called with hasObjects = false
442533
internalQueue.ably_syncNoDeadlock {
443-
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
534+
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false)
444535
}
445536

446537
// Then: Verify the expected behavior per RTO4b
@@ -476,15 +567,15 @@ struct InternalDefaultRealtimeObjectsTests {
476567

477568
// First call with hasObjects = true (should do nothing)
478569
internalQueue.ably_syncNoDeadlock {
479-
realtimeObjects.nosync_onChannelAttached(hasObjects: true)
570+
realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: false)
480571
}
481572
#expect(realtimeObjects.testsOnly_onChannelAttachedHasObjects == true)
482573
let originalPool = realtimeObjects.testsOnly_objectsPool
483574
let originalRoot = originalPool.root
484575

485576
// Second call with hasObjects = false (should reset)
486577
internalQueue.ably_syncNoDeadlock {
487-
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
578+
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false)
488579
}
489580
#expect(realtimeObjects.testsOnly_onChannelAttachedHasObjects == false)
490581
let newPool = realtimeObjects.testsOnly_objectsPool
@@ -493,7 +584,7 @@ struct InternalDefaultRealtimeObjectsTests {
493584

494585
// Third call with hasObjects = true again (should do nothing)
495586
internalQueue.ably_syncNoDeadlock {
496-
realtimeObjects.nosync_onChannelAttached(hasObjects: true)
587+
realtimeObjects.nosync_onChannelAttached(hasObjects: true, resumed: false)
497588
}
498589
#expect(realtimeObjects.testsOnly_onChannelAttachedHasObjects == true)
499590
let finalPool = realtimeObjects.testsOnly_objectsPool
@@ -530,7 +621,7 @@ struct InternalDefaultRealtimeObjectsTests {
530621

531622
// When: onChannelAttached is called with hasObjects = false
532623
internalQueue.ably_syncNoDeadlock {
533-
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
624+
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false)
534625
}
535626

536627
// Then: All sync data should be discarded
@@ -565,7 +656,7 @@ struct InternalDefaultRealtimeObjectsTests {
565656

566657
// When: onChannelAttached is called with hasObjects = false
567658
internalQueue.ably_syncNoDeadlock {
568-
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
659+
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false)
569660
}
570661

571662
// Then: Should still reset the pool correctly
@@ -583,7 +674,7 @@ struct InternalDefaultRealtimeObjectsTests {
583674

584675
// When: onChannelAttached is called with hasObjects = false
585676
internalQueue.ably_syncNoDeadlock {
586-
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
677+
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false)
587678
}
588679

589680
// Then: The new root should be properly initialized
@@ -611,7 +702,7 @@ struct InternalDefaultRealtimeObjectsTests {
611702

612703
// Complete sync via ATTACHED with HAS_OBJECTS false (RTO4b)
613704
internalQueue.ably_syncNoDeadlock {
614-
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
705+
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false)
615706
}
616707

617708
// getRoot should now complete
@@ -736,7 +827,7 @@ struct InternalDefaultRealtimeObjectsTests {
736827

737828
// Complete sync first
738829
internalQueue.ably_syncNoDeadlock {
739-
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
830+
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false)
740831
}
741832

742833
// getRoot should return
@@ -761,7 +852,7 @@ struct InternalDefaultRealtimeObjectsTests {
761852

762853
// Complete sync first
763854
internalQueue.ably_syncNoDeadlock {
764-
realtimeObjects.nosync_onChannelAttached(hasObjects: false)
855+
realtimeObjects.nosync_onChannelAttached(hasObjects: false, resumed: false)
765856
}
766857

767858
// Call getRoot
@@ -1602,7 +1693,8 @@ struct InternalDefaultRealtimeObjectsTests {
16021693
for channelEvent in scenario.channelEvents {
16031694
switch channelEvent {
16041695
case let .attached(hasObjects):
1605-
realtimeObjects.nosync_onChannelAttached(hasObjects: hasObjects)
1696+
// The value of `resumed` is arbitrary for these tests; they're focused on sync state transitions
1697+
realtimeObjects.nosync_onChannelAttached(hasObjects: hasObjects, resumed: true)
16061698
case let .objectSync(channelSerial):
16071699
realtimeObjects.nosync_handleObjectSyncProtocolMessage(
16081700
objectMessages: [],

0 commit comments

Comments
 (0)