Skip to content

Commit cb427d8

Browse files
Implement the OBJECT_SYNC specification
Based on [1] at dd25dca. The development approach here was roughly the following: I decided the internal types and interfaces that I wanted to exist, and some of the implementation details (e.g. the mutations that DefaultRealtimeObjects performs during a sync sequence). I then asked Cursor to fill in the most of the implementation and tests (providing it with the relevant spec text). I have done some tweaking of the code that Cursor generated, but that doesn't mean that all of the code that's here is exactly the same as code that I would write, and I'm going to leave it that way. I have looked through all (and changed some) of the tests that Cursor generated, and in doing so also fixed the spec conformance tags (because it got those very wrong), so I am _fairly_ confident that the tests are testing what they claim to test. But I have not looked at them in minute detail (in particular, the exact data created by the canned data factories that it created). I also think that tests are one of those things where there's not always one right way to write them, so I'm not going to get hung up on "could it have done this test differently?" or "are the tests consistent with each other?" at the moment. There are some outstanding questions on the spec PR at the moment; have implemented to best of my understanding, and will update later once those are answered. Not yet implemented: - Checking of channel modes before performing operations (RTO2 and the points that refer to it); there's an outstanding question about this at [2] A note re the MutableState pattern that I've used here — done this so that the class can essentially have mutating instance methods which can call each other without having to worry about whose responsibility it is to acquire the mutex. [1] ably/specification#333 [2] https://github.com/ably/specification/pull/333/files#r2152297442
1 parent 3f6de86 commit cb427d8

16 files changed

Lines changed: 3484 additions & 12 deletions

Sources/AblyLiveObjects/DefaultRealtimeObjects.swift

Lines changed: 231 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ import Ably
22
internal import AblyPlugin
33

44
/// The class that provides the public API for interacting with LiveObjects, via the ``ARTRealtimeChannel/objects`` property.
5-
internal final class DefaultRealtimeObjects: RealtimeObjects {
5+
internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolDelegate {
66
// Used for synchronizing access to all of this instance's mutable state. This is a temporary solution just to allow us to implement `Sendable`, and we'll revisit it in https://github.com/ably/ably-cocoa-liveobjects-plugin/issues/3.
77
private let mutex = NSLock()
88

9+
private nonisolated(unsafe) var mutableState: MutableState!
10+
911
private let coreSDK: CoreSDK
1012
private let logger: AblyPlugin.Logger
1113

@@ -15,17 +17,98 @@ internal final class DefaultRealtimeObjects: RealtimeObjects {
1517
private let receivedObjectSyncProtocolMessages: AsyncStream<[InboundObjectMessage]>
1618
private let receivedObjectSyncProtocolMessagesContinuation: AsyncStream<[InboundObjectMessage]>.Continuation
1719

20+
internal var testsOnly_objectsPool: ObjectsPool {
21+
mutex.withLock {
22+
mutableState.objectsPool
23+
}
24+
}
25+
26+
/// If this returns false, it means that there is currently no stored sync sequence ID or SyncObjectsPool
27+
internal var testsOnly_hasSyncSequence: Bool {
28+
mutex.withLock {
29+
mutableState.syncSequence != nil
30+
}
31+
}
32+
33+
// These drive the testsOnly_waitingForSyncEvents property that informs the test suite when `getRoot()` is waiting for the object sync sequence to complete per RTO1c.
34+
private let waitingForSyncEvents: AsyncStream<Void>
35+
private let waitingForSyncEventsContinuation: AsyncStream<Void>.Continuation
36+
/// Emits an element whenever `getRoot()` starts waiting for the object sync sequence to complete per RTO1c.
37+
internal var testsOnly_waitingForSyncEvents: AsyncStream<Void> {
38+
waitingForSyncEvents
39+
}
40+
41+
/// Contains the data gathered during an `OBJECT_SYNC` sequence.
42+
private struct SyncSequence {
43+
/// The sync sequence ID, per RTO5a1.
44+
internal var id: String
45+
46+
/// The `ObjectMessage`s gathered during this sync sequence.
47+
internal var syncObjectsPool: [ObjectState]
48+
}
49+
50+
/// Tracks whether an object sync sequence has happened yet. This allows us to wait for a sync before returning from `getRoot()`, per RTO1c.
51+
private struct SyncStatus {
52+
private(set) var isSyncComplete = false
53+
private let syncCompletionEvents: AsyncStream<Void>
54+
private let syncCompletionContinuation: AsyncStream<Void>.Continuation
55+
56+
internal init() {
57+
(syncCompletionEvents, syncCompletionContinuation) = AsyncStream.makeStream()
58+
}
59+
60+
internal mutating func signalSyncComplete() {
61+
isSyncComplete = true
62+
syncCompletionContinuation.yield()
63+
}
64+
65+
internal func waitForSyncCompletion() async {
66+
await syncCompletionEvents.first { _ in true }
67+
}
68+
}
69+
1870
internal init(coreSDK: CoreSDK, logger: AblyPlugin.Logger) {
1971
self.coreSDK = coreSDK
2072
self.logger = logger
2173
(receivedObjectProtocolMessages, receivedObjectProtocolMessagesContinuation) = AsyncStream.makeStream()
2274
(receivedObjectSyncProtocolMessages, receivedObjectSyncProtocolMessagesContinuation) = AsyncStream.makeStream()
75+
(waitingForSyncEvents, waitingForSyncEventsContinuation) = AsyncStream.makeStream()
76+
mutableState = .init(objectsPool: .init(rootDelegate: self, rootCoreSDK: coreSDK))
77+
}
78+
79+
// MARK: - LiveMapObjectPoolDelegate
80+
81+
internal func getObjectFromPool(id: String) -> ObjectsPool.Entry? {
82+
mutex.withLock {
83+
mutableState.objectsPool.entries[id]
84+
}
2385
}
2486

2587
// MARK: `RealtimeObjects` protocol
2688

2789
internal func getRoot() async throws(ARTErrorInfo) -> any LiveMap {
28-
notYetImplemented()
90+
// RTO1b: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001
91+
let currentChannelState = coreSDK.channelState
92+
if currentChannelState == .detached || currentChannelState == .failed {
93+
throw ARTErrorInfo.create(withCode: Int(ARTErrorCode.channelOperationFailedInvalidState.rawValue), message: "getRoot operation failed (invalid channel state: \(currentChannelState))")
94+
}
95+
96+
let syncStatus = mutex.withLock {
97+
mutableState.syncStatus
98+
}
99+
100+
if !syncStatus.isSyncComplete {
101+
// RTO1c
102+
waitingForSyncEventsContinuation.yield()
103+
logger.log("getRoot started waiting for sync sequence to complete", level: .debug)
104+
await syncStatus.waitForSyncCompletion()
105+
logger.log("getRoot completed waiting for sync sequence to complete", level: .debug)
106+
}
107+
108+
return mutex.withLock {
109+
// RTO1d
110+
mutableState.objectsPool.root
111+
}
29112
}
30113

31114
internal func createMap(entries _: [String: LiveMapValue]) async throws(ARTErrorInfo) -> any LiveMap {
@@ -58,16 +141,20 @@ internal final class DefaultRealtimeObjects: RealtimeObjects {
58141

59142
// MARK: Handling channel events
60143

61-
private nonisolated(unsafe) var onChannelAttachedHasObjects: Bool?
62144
internal var testsOnly_onChannelAttachedHasObjects: Bool? {
63145
mutex.withLock {
64-
onChannelAttachedHasObjects
146+
mutableState.onChannelAttachedHasObjects
65147
}
66148
}
67149

68150
internal func onChannelAttached(hasObjects: Bool) {
69151
mutex.withLock {
70-
onChannelAttachedHasObjects = hasObjects
152+
mutableState.onChannelAttached(
153+
hasObjects: hasObjects,
154+
logger: logger,
155+
mapDelegate: self,
156+
coreSDK: coreSDK,
157+
)
71158
}
72159
}
73160

@@ -83,8 +170,27 @@ internal final class DefaultRealtimeObjects: RealtimeObjects {
83170
receivedObjectSyncProtocolMessages
84171
}
85172

86-
internal func handleObjectSyncProtocolMessage(objectMessages: [InboundObjectMessage], protocolMessageChannelSerial _: String?) {
87-
receivedObjectSyncProtocolMessagesContinuation.yield(objectMessages)
173+
/// Implements the `OBJECT_SYNC` handling of RTO5.
174+
internal func handleObjectSyncProtocolMessage(objectMessages: [InboundObjectMessage], protocolMessageChannelSerial: String?) {
175+
mutex.withLock {
176+
mutableState.handleObjectSyncProtocolMessage(
177+
objectMessages: objectMessages,
178+
protocolMessageChannelSerial: protocolMessageChannelSerial,
179+
logger: logger,
180+
receivedObjectSyncProtocolMessagesContinuation: receivedObjectSyncProtocolMessagesContinuation,
181+
mapDelegate: self,
182+
coreSDK: coreSDK,
183+
)
184+
}
185+
}
186+
187+
/// Creates a zero-value LiveObject in the object pool for this object ID.
188+
///
189+
/// Intended as a way for tests to populate the object pool.
190+
internal func testsOnly_createZeroValueLiveObject(forObjectID objectID: String, coreSDK: CoreSDK) -> ObjectsPool.Entry? {
191+
mutex.withLock {
192+
mutableState.objectsPool.createZeroValueObject(forObjectID: objectID, mapDelegate: self, coreSDK: coreSDK)
193+
}
88194
}
89195

90196
// MARK: - Sending `OBJECT` ProtocolMessage
@@ -93,4 +199,122 @@ internal final class DefaultRealtimeObjects: RealtimeObjects {
93199
internal func testsOnly_sendObject(objectMessages: [OutboundObjectMessage]) async throws(InternalError) {
94200
try await coreSDK.sendObject(objectMessages: objectMessages)
95201
}
202+
203+
// MARK: - Testing
204+
205+
/// Finishes the following streams, to allow a test to perform assertions about which elements the streams have emitted to this moment:
206+
///
207+
/// - testsOnly_receivedObjectProtocolMessages
208+
/// - testsOnly_receivedObjectStateProtocolMessages
209+
/// - testsOnly_waitingForSyncEvents
210+
internal func testsOnly_finishAllTestHelperStreams() {
211+
receivedObjectProtocolMessagesContinuation.finish()
212+
receivedObjectSyncProtocolMessagesContinuation.finish()
213+
waitingForSyncEventsContinuation.finish()
214+
}
215+
216+
// MARK: - Mutable state and the operations that affect it
217+
218+
private struct MutableState {
219+
internal var objectsPool: ObjectsPool
220+
internal var syncSequence: SyncSequence?
221+
internal var syncStatus = SyncStatus()
222+
internal var onChannelAttachedHasObjects: Bool?
223+
224+
internal mutating func onChannelAttached(
225+
hasObjects: Bool,
226+
logger: Logger,
227+
mapDelegate: LiveMapObjectPoolDelegate,
228+
coreSDK: CoreSDK,
229+
) {
230+
logger.log("onChannelAttached(hasObjects: \(hasObjects)", level: .debug)
231+
232+
onChannelAttachedHasObjects = hasObjects
233+
234+
// We only care about the case where HAS_OBJECTS is not set (RTO4b); if it is set then we're going to shortly receive an OBJECT_SYNC instead (RTO4a)
235+
guard !hasObjects else {
236+
return
237+
}
238+
239+
// RTO4b1, RTO4b2: Reset the ObjectsPool to have a single empty root object
240+
// TODO: this one is unclear (are we meant to replace the root or just clear its data?) https://github.com/ably/specification/pull/333/files#r2183493458
241+
objectsPool = .init(rootDelegate: mapDelegate, rootCoreSDK: coreSDK)
242+
243+
// I have, for now, not directly implemented the "perform the actions for object sync completion" of RTO4b4 since my implementation doesn't quite match the model given there; here you only have a SyncObjectsPool if you have an OBJECT_SYNC in progress, which you might not have upon receiving an ATTACHED. Instead I've just implemented what seem like the relevant side effects. Can revisit this if "the actions for object sync completion" get more complex.
244+
245+
// RTO4b3, RTO4b4, RTO5c3, RTO5c4
246+
syncSequence = nil
247+
syncStatus.signalSyncComplete()
248+
}
249+
250+
/// Implements the `OBJECT_SYNC` handling of RTO5.
251+
internal mutating func handleObjectSyncProtocolMessage(
252+
objectMessages: [InboundObjectMessage],
253+
protocolMessageChannelSerial: String?,
254+
logger: Logger,
255+
receivedObjectSyncProtocolMessagesContinuation: AsyncStream<[InboundObjectMessage]>.Continuation,
256+
mapDelegate: LiveMapObjectPoolDelegate,
257+
coreSDK: CoreSDK,
258+
) {
259+
logger.log("handleObjectSyncProtocolMessage(objectMessages: \(objectMessages), protocolMessageChannelSerial: \(String(describing: protocolMessageChannelSerial)))", level: .debug)
260+
261+
receivedObjectSyncProtocolMessagesContinuation.yield(objectMessages)
262+
263+
// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
264+
let completedSyncObjectsPool: [ObjectState]?
265+
266+
if let protocolMessageChannelSerial {
267+
let syncCursor: SyncCursor
268+
do {
269+
// RTO5a
270+
syncCursor = try SyncCursor(channelSerial: protocolMessageChannelSerial)
271+
} catch {
272+
logger.log("Failed to parse sync cursor: \(error)", level: .error)
273+
return
274+
}
275+
276+
// Figure out whether to continue any existing sync sequence or start a new one
277+
var updatedSyncSequence: SyncSequence = if let syncSequence {
278+
if syncCursor.sequenceID == syncSequence.id {
279+
// RTO5a3: Continue existing sync sequence
280+
syncSequence
281+
} else {
282+
// RTO5a2: new sequence started, discard previous
283+
.init(id: syncCursor.sequenceID, syncObjectsPool: [])
284+
}
285+
} else {
286+
// There's no current sync sequence; start one
287+
.init(id: syncCursor.sequenceID, syncObjectsPool: [])
288+
}
289+
290+
// RTO5b
291+
updatedSyncSequence.syncObjectsPool.append(contentsOf: objectMessages.compactMap(\.object))
292+
293+
syncSequence = updatedSyncSequence
294+
295+
completedSyncObjectsPool = if syncCursor.isEndOfSequence {
296+
updatedSyncSequence.syncObjectsPool
297+
} else {
298+
nil
299+
}
300+
} else {
301+
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
302+
completedSyncObjectsPool = objectMessages.compactMap(\.object)
303+
}
304+
305+
if let completedSyncObjectsPool {
306+
// RTO5c
307+
objectsPool.applySyncObjectsPool(
308+
completedSyncObjectsPool,
309+
mapDelegate: mapDelegate,
310+
coreSDK: coreSDK,
311+
logger: logger,
312+
)
313+
// RTO5c3, RTO5c4
314+
syncSequence = nil
315+
316+
syncStatus.signalSyncComplete()
317+
}
318+
}
319+
}
96320
}

Sources/AblyLiveObjects/Internal/CoreSDK.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ internal import AblyPlugin
66
/// This provides us with a mockable interface to ably-cocoa, and it also allows internal components and their tests not to need to worry about some of the boring details of how we bridge Swift types to AblyPlugin's Objective-C API (i.e. boxing).
77
internal protocol CoreSDK: AnyObject, Sendable {
88
func sendObject(objectMessages: [OutboundObjectMessage]) async throws(InternalError)
9+
10+
/// Returns the current state of the Realtime channel that this wraps.
11+
var channelState: ARTRealtimeChannelState { get }
912
}
1013

1114
internal final class DefaultCoreSDK: CoreSDK {
@@ -41,4 +44,8 @@ internal final class DefaultCoreSDK: CoreSDK {
4144
pluginAPI: pluginAPI,
4245
)
4346
}
47+
48+
internal var channelState: ARTRealtimeChannelState {
49+
channel.state
50+
}
4451
}

0 commit comments

Comments
 (0)