Skip to content

Commit 792683d

Browse files
Implement the tombstonesGCScenarios
As in 8be3aed. Only noteworthy thing here is my increasing of the overridden GC interval compared to its value in the JS tests (see code comment).
1 parent b69a40d commit 792683d

3 files changed

Lines changed: 238 additions & 1 deletion

File tree

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
9898
(receivedObjectProtocolMessages, receivedObjectProtocolMessagesContinuation) = AsyncStream.makeStream()
9999
(receivedObjectSyncProtocolMessages, receivedObjectSyncProtocolMessagesContinuation) = AsyncStream.makeStream()
100100
(waitingForSyncEvents, waitingForSyncEventsContinuation) = AsyncStream.makeStream()
101+
(completedGarbageCollectionEvents, completedGarbageCollectionsEventsContinuation) = AsyncStream.makeStream()
101102
mutableState = .init(objectsPool: .init(logger: logger, userCallbackQueue: userCallbackQueue, clock: clock))
102103
garbageCollectionInterval = garbageCollectionOptions.interval
103104
garbageCollectionGracePeriod = garbageCollectionOptions.gracePeriod
@@ -331,21 +332,32 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
331332
gracePeriod: garbageCollectionGracePeriod,
332333
clock: clock,
333334
logger: logger,
335+
eventsContinuation: completedGarbageCollectionsEventsContinuation,
334336
)
335337
}
336338
}
337339

340+
// These drive the testsOnly_completedGarbageCollectionEvents property that informs the test suite when a garbage collection cycle has completed.
341+
private let completedGarbageCollectionEvents: AsyncStream<Void>
342+
private let completedGarbageCollectionsEventsContinuation: AsyncStream<Void>.Continuation
343+
/// Emits an element whenever a garbage collection cycle has completed.
344+
internal var testsOnly_completedGarbageCollectionEvents: AsyncStream<Void> {
345+
completedGarbageCollectionEvents
346+
}
347+
338348
// MARK: - Testing
339349

340350
/// Finishes the following streams, to allow a test to perform assertions about which elements the streams have emitted to this moment:
341351
///
342352
/// - testsOnly_receivedObjectProtocolMessages
343353
/// - testsOnly_receivedObjectStateProtocolMessages
344354
/// - testsOnly_waitingForSyncEvents
355+
/// - testsOnly_completedGarbageCollectionEvents
345356
internal func testsOnly_finishAllTestHelperStreams() {
346357
receivedObjectProtocolMessagesContinuation.finish()
347358
receivedObjectSyncProtocolMessagesContinuation.finish()
348359
waitingForSyncEventsContinuation.finish()
360+
completedGarbageCollectionsEventsContinuation.finish()
349361
}
350362

351363
// MARK: - Mutable state and the operations that affect it

Sources/AblyLiveObjects/Internal/ObjectsPool.swift

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,12 @@ internal struct ObjectsPool {
408408
}
409409

410410
/// Performs garbage collection of tombstoned objects and map entries, per RTO10c.
411-
internal mutating func performGarbageCollection(gracePeriod: TimeInterval, clock: SimpleClock, logger: Logger) {
411+
internal mutating func performGarbageCollection(
412+
gracePeriod: TimeInterval,
413+
clock: SimpleClock,
414+
logger: Logger,
415+
eventsContinuation: AsyncStream<Void>.Continuation,
416+
) {
412417
logger.log("Performing garbage collection, grace period \(gracePeriod)s", level: .debug)
413418

414419
let now = clock.now
@@ -433,5 +438,7 @@ internal struct ObjectsPool {
433438
}
434439
return !shouldRelease
435440
}
441+
442+
eventsContinuation.yield()
436443
}
437444
}

Tests/AblyLiveObjectsTests/JS Integration Tests/ObjectsIntegrationTests.swift

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,20 @@ func waitForCounterUpdate(_ updates: AsyncStream<LiveCounterUpdate>) async {
9696
_ = await updates.first { _ in true }
9797
}
9898

99+
// Note that Cursor decided to implement this in a different way to the waitForObjectSync that I'd already implemented; TODO pick one of the two approaches (this one might be cleaner).
100+
func waitForObjectOperation(_ objects: any RealtimeObjects, _ action: ObjectOperationAction) async throws {
101+
// Cast to access internal API for testing
102+
let internallyTypedObjects = try #require(objects as? PublicDefaultRealtimeObjects)
103+
let objectMessages = internallyTypedObjects.testsOnly_receivedObjectProtocolMessages
104+
105+
// Wait for an object protocol message containing the specified action
106+
_ = await objectMessages.first { messages in
107+
messages.contains { message in
108+
message.operation?.action == .known(action)
109+
}
110+
}
111+
}
112+
99113
// I added this @MainActor as an "I don't understand what's going on there; let's try this" when observing that for some reason the setter of setListenerAfterProcessingIncomingMessage was hanging inside `-[ARTSRDelegateController dispatchQueue]`. This seems to avoid it and I have not investigated more deeply 🤷
100114
@MainActor
101115
func waitForObjectSync(_ realtime: ARTRealtime) async throws {
@@ -3703,6 +3717,210 @@ private struct ObjectsIntegrationTests {
37033717
}
37043718

37053719
// TODO: Implement the remaining scenarios
3720+
3721+
// MARK: - Tombstones GC Scenarios
3722+
3723+
enum TombstonesGCScenarios: Scenarios {
3724+
struct Context {
3725+
var root: any LiveMap
3726+
var objectsHelper: ObjectsHelper
3727+
var channelName: String
3728+
var channel: ARTRealtimeChannel
3729+
var objects: any RealtimeObjects
3730+
var client: ARTRealtime
3731+
var waitForGCCycles: @Sendable (Int) async -> Void
3732+
}
3733+
3734+
static let scenarios: [TestScenario<Context>] = [
3735+
.init(
3736+
disabled: false,
3737+
allTransportsAndProtocols: false,
3738+
description: "tombstoned object is removed from the pool after the GC grace period",
3739+
action: { ctx in
3740+
let objectsHelper = ctx.objectsHelper
3741+
let channelName = ctx.channelName
3742+
let channel = ctx.channel
3743+
let objects = ctx.objects
3744+
let waitForGCCycles = ctx.waitForGCCycles
3745+
3746+
// Wait for counter creation
3747+
async let counterCreatedPromise: Void = waitForObjectOperation(ctx.objects, .counterCreate)
3748+
3749+
// Send a CREATE op, this adds an object to the pool
3750+
let createResult = try await objectsHelper.operationRequest(
3751+
channelName: channelName,
3752+
opBody: objectsHelper.counterCreateRestOp(number: 1),
3753+
)
3754+
let objectId = createResult.objectId
3755+
_ = try await counterCreatedPromise
3756+
3757+
// Cast to access internal API for testing
3758+
let internallyTypedObjects = try #require(objects as? PublicDefaultRealtimeObjects)
3759+
3760+
#expect(internallyTypedObjects.testsOnly_proxied.testsOnly_objectsPool.entries[objectId] != nil, "Check object exists in the pool after creation")
3761+
3762+
// Inject OBJECT_DELETE for the object. This should tombstone the object and make it
3763+
// inaccessible to the end user, but still keep it in memory in the local pool
3764+
await objectsHelper.processObjectOperationMessageOnChannel(
3765+
channel: channel,
3766+
serial: lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0),
3767+
siteCode: "aaa",
3768+
state: [objectsHelper.objectDeleteOp(objectId: objectId)],
3769+
)
3770+
3771+
#expect(
3772+
internallyTypedObjects.testsOnly_proxied.testsOnly_objectsPool.entries[objectId] != nil,
3773+
"Check object exists in the pool immediately after OBJECT_DELETE",
3774+
)
3775+
3776+
let poolEntry = try #require(internallyTypedObjects.testsOnly_proxied.testsOnly_objectsPool.entries[objectId])
3777+
#expect(
3778+
poolEntry.isTombstone == true,
3779+
"Check object's \"tombstone\" flag is set to \"true\" after OBJECT_DELETE",
3780+
)
3781+
3782+
// We expect 2 cycles to guarantee that grace period has expired, which will always be
3783+
// true based on the test config used
3784+
await waitForGCCycles(2)
3785+
3786+
// Object should be removed from the local pool entirely now, as the GC grace period has passed
3787+
#expect(
3788+
internallyTypedObjects.testsOnly_proxied.testsOnly_objectsPool.entries[objectId] == nil,
3789+
"Check object does not exist in the pool after the GC grace period expiration",
3790+
)
3791+
},
3792+
),
3793+
.init(
3794+
disabled: false,
3795+
allTransportsAndProtocols: true,
3796+
description: "tombstoned map entry is removed from the LiveMap after the GC grace period",
3797+
action: { ctx in
3798+
let root = ctx.root
3799+
let objectsHelper = ctx.objectsHelper
3800+
let channelName = ctx.channelName
3801+
let waitForGCCycles = ctx.waitForGCCycles
3802+
3803+
let keyUpdatedPromise = try root.updates()
3804+
async let keyUpdatedWait: Void = {
3805+
await waitForMapKeyUpdate(keyUpdatedPromise, "foo")
3806+
}()
3807+
3808+
// Set a key on root
3809+
_ = try await objectsHelper.operationRequest(
3810+
channelName: channelName,
3811+
opBody: objectsHelper.mapSetRestOp(
3812+
objectId: "root",
3813+
key: "foo",
3814+
value: ["string": .string("bar")],
3815+
),
3816+
)
3817+
await keyUpdatedWait
3818+
3819+
#expect(
3820+
try #require(root.get(key: "foo")?.stringValue) == "bar",
3821+
"Check key \"foo\" exists on root after MAP_SET",
3822+
)
3823+
3824+
let keyUpdatedPromise2 = try root.updates()
3825+
async let keyUpdatedWait2: Void = {
3826+
await waitForMapKeyUpdate(keyUpdatedPromise2, "foo")
3827+
}()
3828+
3829+
// Remove the key from the root. This should tombstone the map entry and make it
3830+
// inaccessible to the end user, but still keep it in memory in the underlying map
3831+
_ = try await objectsHelper.operationRequest(
3832+
channelName: channelName,
3833+
opBody: objectsHelper.mapRemoveRestOp(objectId: "root", key: "foo"),
3834+
)
3835+
await keyUpdatedWait2
3836+
3837+
#expect(
3838+
try root.get(key: "foo") == nil,
3839+
"Check key \"foo\" is inaccessible via public API on root after MAP_REMOVE",
3840+
)
3841+
3842+
// Cast to access internal API for testing
3843+
let internallyTypedRoot = try #require(root as? PublicDefaultLiveMap)
3844+
let internalRoot = internallyTypedRoot.proxied
3845+
let underlyingData = internalRoot.testsOnly_data
3846+
3847+
#expect(
3848+
underlyingData["foo"] != nil,
3849+
"Check map entry for \"foo\" exists on root in the underlying data immediately after MAP_REMOVE",
3850+
)
3851+
#expect(
3852+
underlyingData["foo"]?.tombstone == true,
3853+
"Check map entry for \"foo\" on root has \"tombstone\" flag set to \"true\" after MAP_REMOVE",
3854+
)
3855+
3856+
// We expect 2 cycles to guarantee that grace period has expired, which will always be
3857+
// true based on the test config used
3858+
await waitForGCCycles(2)
3859+
3860+
// The entry should be removed from the underlying map now
3861+
let underlyingDataAfterGC = internalRoot.testsOnly_data
3862+
#expect(
3863+
underlyingDataAfterGC["foo"] == nil,
3864+
"Check map entry for \"foo\" does not exist on root in the underlying data after the GC grace period expiration",
3865+
)
3866+
},
3867+
),
3868+
]
3869+
}
3870+
3871+
@Test(arguments: TombstonesGCScenarios.testCases)
3872+
func tombstonesGCScenarios(testCase: TestCase<TombstonesGCScenarios.Context>) async throws {
3873+
guard !testCase.disabled else {
3874+
withKnownIssue {
3875+
Issue.record("Test case is disabled")
3876+
}
3877+
return
3878+
}
3879+
3880+
// Configure GC options with shorter intervals for testing
3881+
var options = testCase.options
3882+
options.garbageCollectionOptions = .init(
3883+
interval: 2.0, // JS uses 0.5s but I've found that, at least testing locally, this was not enough to compensate for the clock skew between my local clock and whatever was used to generate the tombstonedAt timestamps server-side.
3884+
gracePeriod: 0.25,
3885+
)
3886+
3887+
let objectsHelper = try await ObjectsHelper()
3888+
let client = try await realtimeWithObjects(options: options)
3889+
3890+
try await monitorConnectionThenCloseAndFinishAsync(client) {
3891+
let channel = client.channels.get(testCase.channelName, options: channelOptionsWithObjects())
3892+
let objects = channel.objects
3893+
3894+
try await channel.attachAsync()
3895+
let root = try await objects.getRoot()
3896+
3897+
// Helper function to wait for a specific number of GC cycles
3898+
let internallyTypedObjects = try #require(objects as? PublicDefaultRealtimeObjects)
3899+
let waitForGCCycles: @Sendable (Int) async -> Void = { cycles in
3900+
let gcEvents = internallyTypedObjects.testsOnly_proxied.testsOnly_completedGarbageCollectionEvents
3901+
3902+
var gcCalledTimes = 0
3903+
for await _ in gcEvents {
3904+
gcCalledTimes += 1
3905+
if gcCalledTimes >= cycles {
3906+
break
3907+
}
3908+
}
3909+
}
3910+
3911+
try await testCase.scenario.action(
3912+
.init(
3913+
root: root,
3914+
objectsHelper: objectsHelper,
3915+
channelName: testCase.channelName,
3916+
channel: channel,
3917+
objects: objects,
3918+
client: client,
3919+
waitForGCCycles: waitForGCCycles,
3920+
),
3921+
)
3922+
}
3923+
}
37063924
}
37073925

37083926
// swiftlint:enable trailing_closure

0 commit comments

Comments
 (0)