Skip to content

Commit 12aa415

Browse files
Implement subscriptions spec
Based on [1] at 2963300. pretty much all done by me, only a bit of dependency injection test updates were done by cursor for now don't test replaceData return value since not specified Have not implemented RTL04b1's channel mode checking for same reason as mentioned in 8d881e2. [1] ably/specification#346
1 parent 2c3bd97 commit 12aa415

17 files changed

Lines changed: 849 additions & 132 deletions

CONTRIBUTING.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ To check formatting and code quality, run `swift run BuildTool lint`. Run with `
4444
### Throwing errors
4545

4646
- The public API of the SDK should use typed throws, and the thrown errors should be of type `ARTErrorInfo`.
47-
- `Dictionary.mapValues` does not support typed throws. We have our own extension `ablyLiveObjects_mapValuesWithTypedThrow` which does; use this.
47+
- Some platform methods do not support typed throws. In these cases, we have our own extension which does; use this instead. They are:
48+
- `Dictionary.mapValues`; use `ablyLiveObjects_mapValuesWithTypedThrow`.
49+
- `NSLock.withLock`; use `ablyLiveObjects_mapValuesWithTypedThrow`.
4850

4951
### Testing guidelines
5052

Sources/AblyLiveObjects/DefaultRealtimeObjects.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ internal final class DefaultRealtimeObjects: Sendable, LiveMapObjectPoolDelegate
138138
notYetImplemented()
139139
}
140140

141+
@discardableResult
141142
internal func on(event _: ObjectsEvent, callback _: ObjectsEventCallback) -> any OnObjectsEventResponse {
142143
notYetImplemented()
143144
}

Sources/AblyLiveObjects/Internal/DefaultLiveCounter.swift

Lines changed: 69 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,29 @@ internal final class DefaultLiveCounter: Sendable {
9696
notYetImplemented()
9797
}
9898

99-
internal func subscribe(listener _: @escaping LiveObjectUpdateCallback<LiveCounterUpdate>) -> any SubscribeResponse {
100-
notYetImplemented()
99+
@discardableResult
100+
internal func subscribe(listener: @escaping LiveObjectUpdateCallback<DefaultLiveCounterUpdate>, coreSDK: CoreSDK) throws(ARTErrorInfo) -> any SubscribeResponse {
101+
try mutex.ablyLiveObjects_withLockWithTypedThrow { () throws(ARTErrorInfo) in
102+
// swiftlint:disable:next trailing_closure
103+
try mutableState.liveObject.subscribe(listener: listener, coreSDK: coreSDK, updateSelfLater: { [weak self] updater in
104+
guard let self else {
105+
return
106+
}
107+
108+
mutex.withLock {
109+
updater(&mutableState.liveObject)
110+
}
111+
})
112+
}
101113
}
102114

103115
internal func unsubscribeAll() {
104-
notYetImplemented()
116+
mutex.withLock {
117+
mutableState.liveObject.unsubscribeAll()
118+
}
105119
}
106120

121+
@discardableResult
107122
internal func on(event _: LiveObjectLifecycleEvent, callback _: @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
108123
notYetImplemented()
109124
}
@@ -112,31 +127,42 @@ internal final class DefaultLiveCounter: Sendable {
112127
notYetImplemented()
113128
}
114129

130+
// MARK: - Emitting update from external sources
131+
132+
/// Emit an event from this `LiveCounter`.
133+
///
134+
/// This is used to instruct this counter to emit updates during an `OBJECT_SYNC`.
135+
internal func emit(_ update: LiveObjectUpdate<DefaultLiveCounterUpdate>) {
136+
mutex.withLock {
137+
mutableState.liveObject.emit(update, on: userCallbackQueue)
138+
}
139+
}
140+
115141
// MARK: - Data manipulation
116142

117143
/// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
118-
internal func replaceData(using state: ObjectState) {
144+
internal func replaceData(using state: ObjectState) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
119145
mutex.withLock {
120146
mutableState.replaceData(using: state)
121147
}
122148
}
123149

124150
/// Test-only method to merge initial value from an ObjectOperation, per RTLC10.
125-
internal func testsOnly_mergeInitialValue(from operation: ObjectOperation) {
151+
internal func testsOnly_mergeInitialValue(from operation: ObjectOperation) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
126152
mutex.withLock {
127153
mutableState.mergeInitialValue(from: operation)
128154
}
129155
}
130156

131157
/// Test-only method to apply a COUNTER_CREATE operation, per RTLC8.
132-
internal func testsOnly_applyCounterCreateOperation(_ operation: ObjectOperation) {
158+
internal func testsOnly_applyCounterCreateOperation(_ operation: ObjectOperation) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
133159
mutex.withLock {
134160
mutableState.applyCounterCreateOperation(operation, logger: logger)
135161
}
136162
}
137163

138164
/// Test-only method to apply a COUNTER_INC operation, per RTLC9.
139-
internal func testsOnly_applyCounterIncOperation(_ operation: WireObjectsCounterOp?) {
165+
internal func testsOnly_applyCounterIncOperation(_ operation: WireObjectsCounterOp?) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
140166
mutex.withLock {
141167
mutableState.applyCounterIncOperation(operation)
142168
}
@@ -156,6 +182,7 @@ internal final class DefaultLiveCounter: Sendable {
156182
objectMessageSiteCode: objectMessageSiteCode,
157183
objectsPool: &objectsPool,
158184
logger: logger,
185+
userCallbackQueue: userCallbackQueue,
159186
)
160187
}
161188
}
@@ -164,13 +191,13 @@ internal final class DefaultLiveCounter: Sendable {
164191

165192
private struct MutableState {
166193
/// The mutable state common to all LiveObjects.
167-
internal var liveObject: LiveObjectMutableState
194+
internal var liveObject: LiveObjectMutableState<DefaultLiveCounterUpdate>
168195

169196
/// The internal data that this map holds, per RTLC3.
170197
internal var data: Double
171198

172199
/// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
173-
internal mutating func replaceData(using state: ObjectState) {
200+
internal mutating func replaceData(using state: ObjectState) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
174201
// RTLC6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials
175202
liveObject.siteTimeserials = state.siteTimeserials
176203

@@ -181,19 +208,32 @@ internal final class DefaultLiveCounter: Sendable {
181208
data = state.counter?.count?.doubleValue ?? 0
182209

183210
// RTLC6d: If ObjectState.createOp is present, merge the initial value into the LiveCounter as described in RTLC10
184-
if let createOp = state.createOp {
211+
return if let createOp = state.createOp {
185212
mergeInitialValue(from: createOp)
213+
} else {
214+
// TODO: I assume this is what to do, clarify in https://github.com/ably/specification/pull/346/files#r2201363446
215+
.noop
186216
}
187217
}
188218

189219
/// Merges the initial value from an ObjectOperation into this LiveCounter, per RTLC10.
190-
internal mutating func mergeInitialValue(from operation: ObjectOperation) {
220+
internal mutating func mergeInitialValue(from operation: ObjectOperation) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
221+
let update: LiveObjectUpdate<DefaultLiveCounterUpdate>
222+
191223
// RTLC10a: Add ObjectOperation.counter.count to data, if it exists
192224
if let operationCount = operation.counter?.count?.doubleValue {
193225
data += operationCount
226+
// RTLC10c
227+
update = .update(DefaultLiveCounterUpdate(amount: operationCount))
228+
} else {
229+
// RTLC10d
230+
update = .noop
194231
}
232+
195233
// RTLC10b: Set the private flag createOperationIsMerged to true
196234
liveObject.createOperationIsMerged = true
235+
236+
return update
197237
}
198238

199239
/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
@@ -203,6 +243,7 @@ internal final class DefaultLiveCounter: Sendable {
203243
objectMessageSiteCode: String?,
204244
objectsPool: inout ObjectsPool,
205245
logger: Logger,
246+
userCallbackQueue: DispatchQueue,
206247
) {
207248
guard let applicableOperation = liveObject.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
208249
// RTLC7b
@@ -216,13 +257,17 @@ internal final class DefaultLiveCounter: Sendable {
216257
switch operation.action {
217258
case .known(.counterCreate):
218259
// RTLC7d1
219-
applyCounterCreateOperation(
260+
let update = applyCounterCreateOperation(
220261
operation,
221262
logger: logger,
222263
)
264+
// RTLC7d1a
265+
liveObject.emit(update, on: userCallbackQueue)
223266
case .known(.counterInc):
224267
// RTLC7d2
225-
applyCounterIncOperation(operation.counterOp)
268+
let update = applyCounterIncOperation(operation.counterOp)
269+
// RTLC7d2a
270+
liveObject.emit(update, on: userCallbackQueue)
226271
default:
227272
// RTLC7d3
228273
logger.log("Operation \(operation) has unsupported action for LiveCounter; discarding", level: .warn)
@@ -233,25 +278,28 @@ internal final class DefaultLiveCounter: Sendable {
233278
internal mutating func applyCounterCreateOperation(
234279
_ operation: ObjectOperation,
235280
logger: Logger,
236-
) {
281+
) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
237282
if liveObject.createOperationIsMerged {
238283
// RTLC8b
239284
logger.log("Not applying COUNTER_CREATE because a COUNTER_CREATE has already been applied", level: .warn)
240-
return
285+
return .noop
241286
}
242287

243-
// RTLC8c
244-
mergeInitialValue(from: operation)
288+
// RTLC8c, RTLC8e
289+
return mergeInitialValue(from: operation)
245290
}
246291

247292
/// Applies a `COUNTER_INC` operation, per RTLC9.
248-
internal mutating func applyCounterIncOperation(_ operation: WireObjectsCounterOp?) {
293+
internal mutating func applyCounterIncOperation(_ operation: WireObjectsCounterOp?) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
249294
guard let operation else {
250-
return
295+
// RTL9e
296+
return .noop
251297
}
252298

253-
// RTLC9b
254-
data += operation.amount.doubleValue
299+
// RTLC9b, RTLC9d
300+
let amount = operation.amount.doubleValue
301+
data += amount
302+
return .update(DefaultLiveCounterUpdate(amount: amount))
255303
}
256304
}
257305
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
internal struct DefaultLiveCounterUpdate: LiveCounterUpdate, Equatable {
2+
internal var amount: Double
3+
}

0 commit comments

Comments
 (0)