@@ -90,12 +90,25 @@ internal final class DefaultLiveCounter: Sendable {
9090 notYetImplemented ( )
9191 }
9292
93- internal func subscribe( listener _: @escaping ( sending any LiveCounterUpdate ) -> Void ) -> any SubscribeResponse {
94- notYetImplemented ( )
93+ internal func subscribe( listener: @escaping ( sending any LiveCounterUpdate ) -> Void ) -> any SubscribeResponse {
94+ mutex. withLock {
95+ // swiftlint:disable:next trailing_closure
96+ mutableState. liveObject. subscribe ( listener: listener, updateSelfLater: { [ weak self] updater in
97+ guard let self else {
98+ return
99+ }
100+
101+ mutex. withLock {
102+ updater ( & mutableState. liveObject)
103+ }
104+ } )
105+ }
95106 }
96107
97108 internal func unsubscribeAll( ) {
98- notYetImplemented ( )
109+ mutex. withLock {
110+ mutableState. liveObject. unsubscribeAll ( )
111+ }
99112 }
100113
101114 internal func on( event _: LiveObjectLifecycleEvent , callback _: @escaping ( ) -> Void ) -> any OnLiveObjectLifecycleEventResponse {
@@ -106,31 +119,42 @@ internal final class DefaultLiveCounter: Sendable {
106119 notYetImplemented ( )
107120 }
108121
122+ // MARK: - Emitting update from external sources
123+
124+ /// Emit an event from this `LiveCounter`.
125+ ///
126+ /// This is used to instruct this counter to emit updates during an `OBJECT_SYNC`.
127+ internal func emit( _ update: LiveObjectUpdate < LiveCounterUpdate > ) {
128+ mutex. withLock {
129+ mutableState. liveObject. emit ( update)
130+ }
131+ }
132+
109133 // MARK: - Data manipulation
110134
111135 /// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
112- internal func replaceData( using state: ObjectState ) {
136+ internal func replaceData( using state: ObjectState ) -> LiveObjectUpdate < LiveCounterUpdate > {
113137 mutex. withLock {
114138 mutableState. replaceData ( using: state)
115139 }
116140 }
117141
118142 /// Test-only method to merge initial value from an ObjectOperation, per RTLC10.
119- internal func testsOnly_mergeInitialValue( from operation: ObjectOperation ) {
143+ internal func testsOnly_mergeInitialValue( from operation: ObjectOperation ) -> LiveObjectUpdate < LiveCounterUpdate > {
120144 mutex. withLock {
121145 mutableState. mergeInitialValue ( from: operation)
122146 }
123147 }
124148
125149 /// Test-only method to apply a COUNTER_CREATE operation, per RTLC8.
126- internal func testsOnly_applyCounterCreateOperation( _ operation: ObjectOperation ) {
150+ internal func testsOnly_applyCounterCreateOperation( _ operation: ObjectOperation ) -> LiveObjectUpdate < LiveCounterUpdate > {
127151 mutex. withLock {
128152 mutableState. applyCounterCreateOperation ( operation, logger: logger)
129153 }
130154 }
131155
132156 /// Test-only method to apply a COUNTER_INC operation, per RTLC9.
133- internal func testsOnly_applyCounterIncOperation( _ operation: WireObjectsCounterOp ? ) {
157+ internal func testsOnly_applyCounterIncOperation( _ operation: WireObjectsCounterOp ? ) -> LiveObjectUpdate < LiveCounterUpdate > {
134158 mutex. withLock {
135159 mutableState. applyCounterIncOperation ( operation)
136160 }
@@ -158,13 +182,13 @@ internal final class DefaultLiveCounter: Sendable {
158182
159183 private struct MutableState {
160184 /// The mutable state common to all LiveObjects.
161- internal var liveObject : LiveObjectMutableState
185+ internal var liveObject : LiveObjectMutableState < LiveCounterUpdate >
162186
163187 /// The internal data that this map holds, per RTLC3.
164188 internal var data : Double
165189
166190 /// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
167- internal mutating func replaceData( using state: ObjectState ) {
191+ internal mutating func replaceData( using state: ObjectState ) -> LiveObjectUpdate < LiveCounterUpdate > {
168192 // RTLC6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials
169193 liveObject. siteTimeserials = state. siteTimeserials
170194
@@ -175,19 +199,32 @@ internal final class DefaultLiveCounter: Sendable {
175199 data = state. counter? . count? . doubleValue ?? 0
176200
177201 // RTLC6d: If ObjectState.createOp is present, merge the initial value into the LiveCounter as described in RTLC10
178- if let createOp = state. createOp {
202+ return if let createOp = state. createOp {
179203 mergeInitialValue ( from: createOp)
204+ } else {
205+ // TODO: I assume this is what to do, clarify in https://github.com/ably/specification/pull/346/files#r2201363446
206+ . noop
180207 }
181208 }
182209
183210 /// Merges the initial value from an ObjectOperation into this LiveCounter, per RTLC10.
184- internal mutating func mergeInitialValue( from operation: ObjectOperation ) {
211+ internal mutating func mergeInitialValue( from operation: ObjectOperation ) -> LiveObjectUpdate < LiveCounterUpdate > {
212+ let update : LiveObjectUpdate < LiveCounterUpdate >
213+
185214 // RTLC10a: Add ObjectOperation.counter.count to data, if it exists
186215 if let operationCount = operation. counter? . count? . doubleValue {
187216 data += operationCount
217+ // RTLC10c
218+ update = . update( DefaultLiveCounterUpdate ( amount: operationCount) )
219+ } else {
220+ // RTLC10d
221+ update = . noop
188222 }
223+
189224 // RTLC10b: Set the private flag createOperationIsMerged to true
190225 liveObject. createOperationIsMerged = true
226+
227+ return update
191228 }
192229
193230 /// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
@@ -210,13 +247,17 @@ internal final class DefaultLiveCounter: Sendable {
210247 switch operation. action {
211248 case . known( . counterCreate) :
212249 // RTLC7d1
213- applyCounterCreateOperation (
250+ let update = applyCounterCreateOperation (
214251 operation,
215252 logger: logger,
216253 )
254+ // RTLC7d1a
255+ liveObject. emit ( update)
217256 case . known( . counterInc) :
218257 // RTLC7d2
219- applyCounterIncOperation ( operation. counterOp)
258+ let update = applyCounterIncOperation ( operation. counterOp)
259+ // RTLC7d2a
260+ liveObject. emit ( update)
220261 default :
221262 // RTLC7d3
222263 logger. log ( " Operation \( operation) has unsupported action for LiveCounter; discarding " , level: . warn)
@@ -227,25 +268,28 @@ internal final class DefaultLiveCounter: Sendable {
227268 internal mutating func applyCounterCreateOperation(
228269 _ operation: ObjectOperation ,
229270 logger: Logger ,
230- ) {
271+ ) -> LiveObjectUpdate < LiveCounterUpdate > {
231272 if liveObject. createOperationIsMerged {
232273 // RTLC8b
233274 logger. log ( " Not applying COUNTER_CREATE because a COUNTER_CREATE has already been applied " , level: . warn)
234- return
275+ return . noop
235276 }
236277
237- // RTLC8c
238- mergeInitialValue ( from: operation)
278+ // RTLC8c, RTLC8e
279+ return mergeInitialValue ( from: operation)
239280 }
240281
241282 /// Applies a `COUNTER_INC` operation, per RTLC9.
242- internal mutating func applyCounterIncOperation( _ operation: WireObjectsCounterOp ? ) {
283+ internal mutating func applyCounterIncOperation( _ operation: WireObjectsCounterOp ? ) -> LiveObjectUpdate < LiveCounterUpdate > {
243284 guard let operation else {
244- return
285+ // RTL9e
286+ return . noop
245287 }
246288
247- // RTLC9b
248- data += operation. amount. doubleValue
289+ // RTLC9b, RTLC9d
290+ let amount = operation. amount. doubleValue
291+ data += amount
292+ return . update( DefaultLiveCounterUpdate ( amount: amount) )
249293 }
250294 }
251295}
0 commit comments