Skip to content

Commit 4494033

Browse files
Implement the OBJECT_SYNC specification
Based on [1] at dd25dca. The development approach here was roughly the following: I decided the internal types and interfaces that I wanted to exist, and some of the implementation details (e.g. the mutations that DefaultRealtimeObjects performs during a sync sequence). I then asked Cursor to fill in the most of the implementation and tests (providing it with the relevant spec text). I have done some tweaking of the code that Cursor generated, but that doesn't mean that all of the code that's here is exactly the same as code that I would write, and I'm going to leave it that way. I have looked through all (and changed some) of the tests that Cursor generated, and in doing so also fixed the spec conformance tags (because it got those very wrong), so I am _fairly_ confident that the tests are testing what they claim to test. But I have not looked at them in minute detail (in particular, the exact data created by the canned data factories that it created). I also think that tests are one of those things where there's not always one right way to write them, so I'm not going to get hung up on "could it have done this test differently?" or "are the tests consistent with each other?" at the moment. There are some outstanding questions on the spec PR at the moment; have implemented to best of my understanding, and will update later once those are answered. Not yet implemented: - Checking of channel modes before performing operations (RTO2 and the points that refer to it); there's an outstanding question about this at [2] [1] ably/specification#333 [2] https://github.com/ably/specification/pull/333/files#r2152297442
1 parent 3f6de86 commit 4494033

16 files changed

Lines changed: 3440 additions & 9 deletions

Sources/AblyLiveObjects/DefaultRealtimeObjects.swift

Lines changed: 187 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import Ably
22
internal import AblyPlugin
33

44
/// The class that provides the public API for interacting with LiveObjects, via the ``ARTRealtimeChannel/objects`` property.
5-
internal final class DefaultRealtimeObjects: RealtimeObjects {
5+
internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolDelegate {
66
// Used for synchronizing access to all of this instance's mutable state. This is a temporary solution just to allow us to implement `Sendable`, and we'll revisit it in https://github.com/ably/ably-cocoa-liveobjects-plugin/issues/3.
77
private let mutex = NSLock()
88

@@ -15,17 +15,91 @@ internal final class DefaultRealtimeObjects: RealtimeObjects {
1515
private let receivedObjectSyncProtocolMessages: AsyncStream<[InboundObjectMessage]>
1616
private let receivedObjectSyncProtocolMessagesContinuation: AsyncStream<[InboundObjectMessage]>.Continuation
1717

18+
private nonisolated(unsafe) var objectsPool: ObjectsPool!
19+
internal var testsOnly_objectsPool: ObjectsPool {
20+
mutex.withLock {
21+
objectsPool
22+
}
23+
}
24+
25+
private nonisolated(unsafe) var syncSequence: SyncSequence?
26+
/// If this returns false, it means that there is currently no stored sync sequence ID or SyncObjectsPool
27+
internal var testsOnly_hasSyncSequence: Bool {
28+
mutex.withLock {
29+
syncSequence != nil
30+
}
31+
}
32+
33+
private nonisolated(unsafe) var syncStatus = SyncStatus()
34+
// These drive the testsOnly_waitingForSyncEvents property that informs the test suite when `getRoot()` is waiting for the object sync sequence to complete per RTO1c.
35+
private let waitingForSyncEvents: AsyncStream<Void>
36+
private let waitingForSyncEventsContinuation: AsyncStream<Void>.Continuation
37+
/// Emits an element whenever `getRoot()` starts waiting for the object sync sequence to complete per RTO1c.
38+
internal var testsOnly_waitingForSyncEvents: AsyncStream<Void> {
39+
waitingForSyncEvents
40+
}
41+
42+
/// Tracks whether an object sync sequence has happened yet. This allows us to wait for a sync before returning from `getRoot()`, per RTO1c.
43+
private struct SyncStatus {
44+
private(set) var isSyncComplete = false
45+
private let syncCompletionEvents: AsyncStream<Void>
46+
private let syncCompletionContinuation: AsyncStream<Void>.Continuation
47+
48+
internal init() {
49+
(syncCompletionEvents, syncCompletionContinuation) = AsyncStream.makeStream()
50+
}
51+
52+
internal mutating func signalSyncComplete() {
53+
isSyncComplete = true
54+
syncCompletionContinuation.yield()
55+
}
56+
57+
internal func waitForSyncCompletion() async {
58+
await syncCompletionEvents.first { _ in true }
59+
}
60+
}
61+
1862
internal init(coreSDK: CoreSDK, logger: AblyPlugin.Logger) {
1963
self.coreSDK = coreSDK
2064
self.logger = logger
2165
(receivedObjectProtocolMessages, receivedObjectProtocolMessagesContinuation) = AsyncStream.makeStream()
2266
(receivedObjectSyncProtocolMessages, receivedObjectSyncProtocolMessagesContinuation) = AsyncStream.makeStream()
67+
(waitingForSyncEvents, waitingForSyncEventsContinuation) = AsyncStream.makeStream()
68+
objectsPool = .init(rootDelegate: self, rootCoreSDK: coreSDK)
69+
}
70+
71+
// MARK: - LiveMapObjectPoolDelegate
72+
73+
internal func getObjectFromPool(id: String) -> ObjectsPool.Entry? {
74+
mutex.withLock {
75+
objectsPool.entries[id]
76+
}
2377
}
2478

2579
// MARK: `RealtimeObjects` protocol
2680

2781
internal func getRoot() async throws(ARTErrorInfo) -> any LiveMap {
28-
notYetImplemented()
82+
// RTO1b: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001
83+
let currentChannelState = coreSDK.channelState
84+
if currentChannelState == .detached || currentChannelState == .failed {
85+
throw ARTErrorInfo.create(withCode: Int(ARTErrorCode.channelOperationFailedInvalidState.rawValue), message: "getRoot operation failed (invalid channel state: \(currentChannelState))")
86+
}
87+
88+
let isSyncComplete = mutex.withLock {
89+
syncStatus.isSyncComplete
90+
}
91+
if !isSyncComplete {
92+
// RTO1c
93+
waitingForSyncEventsContinuation.yield()
94+
logger.log("getRoot started waiting for sync sequence to complete", level: .debug)
95+
await syncStatus.waitForSyncCompletion()
96+
logger.log("getRoot completed waiting for sync sequence to complete", level: .debug)
97+
}
98+
99+
return mutex.withLock {
100+
// RTO1d
101+
objectsPool.root
102+
}
29103
}
30104

31105
internal func createMap(entries _: [String: LiveMapValue]) async throws(ARTErrorInfo) -> any LiveMap {
@@ -67,7 +141,24 @@ internal final class DefaultRealtimeObjects: RealtimeObjects {
67141

68142
internal func onChannelAttached(hasObjects: Bool) {
69143
mutex.withLock {
144+
logger.log("onChannelAttached(hasObjects: \(hasObjects)", level: .debug)
145+
70146
onChannelAttachedHasObjects = hasObjects
147+
148+
// We only care about the case where HAS_OBJECTS is not set (RTO4b); if it is set then we're going to shortly receive an OBJECT_SYNC instead (RTO4a)
149+
guard !hasObjects else {
150+
return
151+
}
152+
153+
// RTO4b1, RTO4b2: Reset the ObjectsPool to have a single empty root object
154+
// TODO: this one is unclear (are we meant to replace the root or just clear its data?) https://github.com/ably/specification/pull/333/files#r2183493458
155+
objectsPool = .init(rootDelegate: self, rootCoreSDK: coreSDK)
156+
157+
// I have, for now, not directly implemented the "perform the actions for object sync completion" of RTO4b4 since my implementation doesn't quite match the model given there; here you only have a SyncObjectsPool if you have an OBJECT_SYNC in progress, which you might not have upon receiving an ATTACHED. Instead I've just implemented what seem like the relevant side effects. Can revisit this if "the actions for object sync completion" get more complex.
158+
159+
// RTO4b3, RTO4b4, RTO5c3, RTO5c4
160+
syncSequence = nil
161+
syncStatus.signalSyncComplete()
71162
}
72163
}
73164

@@ -83,8 +174,87 @@ internal final class DefaultRealtimeObjects: RealtimeObjects {
83174
receivedObjectSyncProtocolMessages
84175
}
85176

86-
internal func handleObjectSyncProtocolMessage(objectMessages: [InboundObjectMessage], protocolMessageChannelSerial _: String?) {
87-
receivedObjectSyncProtocolMessagesContinuation.yield(objectMessages)
177+
/// Contains the data gathered during an `OBJECT_SYNC` sequence.
178+
private struct SyncSequence {
179+
/// The sync sequence ID, per RTO5a1.
180+
internal var id: String
181+
182+
/// The `ObjectMessage`s gathered during this sync sequence.
183+
internal var syncObjectsPool: [ObjectState]
184+
}
185+
186+
/// Implements the `OBJECT_SYNC` handling of RTO5.
187+
internal func handleObjectSyncProtocolMessage(objectMessages: [InboundObjectMessage], protocolMessageChannelSerial: String?) {
188+
mutex.withLock {
189+
logger.log("handleObjectSyncProtocolMessage(objectMessages: \(objectMessages), protocolMessageChannelSerial: \(String(describing: protocolMessageChannelSerial)))", level: .debug)
190+
191+
receivedObjectSyncProtocolMessagesContinuation.yield(objectMessages)
192+
193+
// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
194+
let completedSyncObjectsPool: [ObjectState]?
195+
196+
if let protocolMessageChannelSerial {
197+
let syncCursor: SyncCursor
198+
do {
199+
// RTO5a
200+
syncCursor = try SyncCursor(channelSerial: protocolMessageChannelSerial)
201+
} catch {
202+
logger.log("Failed to parse sync cursor: \(error)", level: .error)
203+
return
204+
}
205+
206+
// Figure out whether to continue any existing sync sequence or start a new one
207+
var updatedSyncSequence: SyncSequence = if let syncSequence {
208+
if syncCursor.sequenceID == syncSequence.id {
209+
// RTO5a3: Continue existing sync sequence
210+
syncSequence
211+
} else {
212+
// RTO5a2: new sequence started, discard previous
213+
.init(id: syncCursor.sequenceID, syncObjectsPool: [])
214+
}
215+
} else {
216+
// There's no current sync sequence; start one
217+
.init(id: syncCursor.sequenceID, syncObjectsPool: [])
218+
}
219+
220+
// RTO5b
221+
updatedSyncSequence.syncObjectsPool.append(contentsOf: objectMessages.compactMap(\.object))
222+
223+
syncSequence = updatedSyncSequence
224+
225+
completedSyncObjectsPool = if syncCursor.isEndOfSequence {
226+
updatedSyncSequence.syncObjectsPool
227+
} else {
228+
nil
229+
}
230+
} else {
231+
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
232+
completedSyncObjectsPool = objectMessages.compactMap(\.object)
233+
}
234+
235+
if let completedSyncObjectsPool {
236+
// RTO5c
237+
objectsPool.applySyncObjectsPool(
238+
completedSyncObjectsPool,
239+
mapDelegate: self,
240+
coreSDK: coreSDK,
241+
logger: logger,
242+
)
243+
// RTO5c3, RTO5c4
244+
syncSequence = nil
245+
246+
syncStatus.signalSyncComplete()
247+
}
248+
}
249+
}
250+
251+
/// Creates a zero-value LiveObject in the object pool for this object ID.
252+
///
253+
/// Intended as a way for tests to populate the object pool.
254+
internal func testsOnly_createZeroValueLiveObject(forObjectID objectID: String, coreSDK: CoreSDK) -> ObjectsPool.Entry? {
255+
mutex.withLock {
256+
objectsPool.createZeroValueObject(forObjectID: objectID, mapDelegate: self, coreSDK: coreSDK)
257+
}
88258
}
89259

90260
// MARK: - Sending `OBJECT` ProtocolMessage
@@ -93,4 +263,17 @@ internal final class DefaultRealtimeObjects: RealtimeObjects {
93263
internal func testsOnly_sendObject(objectMessages: [OutboundObjectMessage]) async throws(InternalError) {
94264
try await coreSDK.sendObject(objectMessages: objectMessages)
95265
}
266+
267+
// MARK: - Testing
268+
269+
/// Finishes the following streams, to allow a test to perform assertions about which elements the streams have emitted to this moment:
270+
///
271+
/// - testsOnly_receivedObjectProtocolMessages
272+
/// - testsOnly_receivedObjectStateProtocolMessages
273+
/// - testsOnly_waitingForSyncEvents
274+
internal func testsOnly_finishAllTestHelperStreams() {
275+
receivedObjectProtocolMessagesContinuation.finish()
276+
receivedObjectSyncProtocolMessagesContinuation.finish()
277+
waitingForSyncEventsContinuation.finish()
278+
}
96279
}

Sources/AblyLiveObjects/Internal/CoreSDK.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ internal import AblyPlugin
66
/// This provides us with a mockable interface to ably-cocoa, and it also allows internal components and their tests not to need to worry about some of the boring details of how we bridge Swift types to AblyPlugin's Objective-C API (i.e. boxing).
77
internal protocol CoreSDK: AnyObject, Sendable {
88
func sendObject(objectMessages: [OutboundObjectMessage]) async throws(InternalError)
9+
10+
/// Returns the current state of the Realtime channel that this wraps.
11+
var channelState: ARTRealtimeChannelState { get }
912
}
1013

1114
internal final class DefaultCoreSDK: CoreSDK {
@@ -41,4 +44,8 @@ internal final class DefaultCoreSDK: CoreSDK {
4144
pluginAPI: pluginAPI,
4245
)
4346
}
47+
48+
internal var channelState: ARTRealtimeChannelState {
49+
channel.state
50+
}
4451
}

Sources/AblyLiveObjects/Internal/DefaultLiveCounter.swift

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,80 @@ import Foundation
33

44
/// Our default implementation of ``LiveCounter``.
55
internal final class DefaultLiveCounter: LiveCounter {
6-
internal init() {}
6+
// Used for synchronizing access to all of this instance's mutable state. This is a temporary solution just to allow us to implement `Sendable`, and we'll revisit it in https://github.com/ably/ably-cocoa-liveobjects-plugin/issues/3.
7+
private let mutex = NSLock()
8+
9+
private nonisolated(unsafe) var mutableState: MutableState
10+
11+
internal var testsOnly_siteTimeserials: [String: String]? {
12+
mutex.withLock {
13+
mutableState.siteTimeserials
14+
}
15+
}
16+
17+
internal var testsOnly_createOperationIsMerged: Bool? {
18+
mutex.withLock {
19+
mutableState.createOperationIsMerged
20+
}
21+
}
22+
23+
internal var testsOnly_objectID: String? {
24+
mutex.withLock {
25+
mutableState.objectID
26+
}
27+
}
28+
29+
private let coreSDK: CoreSDK
30+
31+
// MARK: - Initialization
32+
33+
internal convenience init(
34+
testsOnly_data data: Double,
35+
objectID: String?,
36+
coreSDK: CoreSDK
37+
) {
38+
self.init(data: data, objectID: objectID, coreSDK: coreSDK)
39+
}
40+
41+
private init(
42+
data: Double,
43+
objectID: String?,
44+
coreSDK: CoreSDK
45+
) {
46+
mutableState = .init(data: data, objectID: objectID)
47+
self.coreSDK = coreSDK
48+
}
49+
50+
/// Creates a "zero-value LiveCounter", per RTLC4.
51+
///
52+
/// - Parameters:
53+
/// - objectID: The value for the "private objectId field" of RTO5c1b1a.
54+
internal static func createZeroValued(
55+
objectID: String? = nil,
56+
coreSDK: CoreSDK,
57+
) -> Self {
58+
.init(
59+
data: 0,
60+
objectID: objectID,
61+
coreSDK: coreSDK,
62+
)
63+
}
764

865
// MARK: - LiveCounter conformance
966

1067
internal var value: Double {
11-
notYetImplemented()
68+
get throws(ARTErrorInfo) {
69+
// RTLC5b: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001
70+
let currentChannelState = coreSDK.channelState
71+
if currentChannelState == .detached || currentChannelState == .failed {
72+
throw ARTErrorInfo.create(withCode: Int(ARTErrorCode.channelOperationFailedInvalidState.rawValue), message: "LiveCounter.value operation failed (invalid channel state: \(currentChannelState))")
73+
}
74+
75+
return mutex.withLock {
76+
// RTLC5c
77+
mutableState.data
78+
}
79+
}
1280
}
1381

1482
internal func increment(amount _: Double) async throws(ARTErrorInfo) {
@@ -34,4 +102,51 @@ internal final class DefaultLiveCounter: LiveCounter {
34102
internal func offAll() {
35103
notYetImplemented()
36104
}
105+
106+
// MARK: - Data manipulation
107+
108+
/// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
109+
internal func replaceData(using state: ObjectState) {
110+
mutex.withLock {
111+
mutableState.replaceData(using: state)
112+
}
113+
}
114+
115+
// MARK: - Mutable state and the operations that affect it
116+
117+
private struct MutableState {
118+
/// The internal data that this map holds, per RTLC3.
119+
internal var data: Double
120+
121+
/// The site timeserials for this counter, per RTLC6a.
122+
internal var siteTimeserials: [String: String]?
123+
124+
/// Whether the create operation has been merged, per RTLC6b and RTLC6d2.
125+
internal var createOperationIsMerged: Bool?
126+
127+
/// The "private `objectId` field" of RTO5c1b1a.
128+
internal var objectID: String?
129+
130+
/// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
131+
internal mutating func replaceData(using state: ObjectState) {
132+
// RTLC6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials
133+
siteTimeserials = state.siteTimeserials
134+
135+
// RTLC6b: Set the private flag createOperationIsMerged to false
136+
createOperationIsMerged = false
137+
138+
// RTLC6c: Set data to the value of ObjectState.counter.count, or to 0 if it does not exist
139+
data = state.counter?.count?.doubleValue ?? 0
140+
141+
// RTLC6d: If ObjectState.createOp is present
142+
if let createOp = state.createOp {
143+
// RTLC6d1: Add ObjectState.createOp.counter.count to data, if it exists
144+
if let createOpCount = createOp.counter?.count?.doubleValue {
145+
data += createOpCount
146+
}
147+
// RTLC6d2: Set the private flag createOperationIsMerged to true
148+
createOperationIsMerged = true
149+
}
150+
}
151+
}
37152
}

0 commit comments

Comments
 (0)