Skip to content

Commit 1e518c6

Browse files
committed
Subscriptions for PathObject and Instance
1 parent 907ed5a commit 1e518c6

2 files changed

Lines changed: 83 additions & 8 deletions

File tree

textile/features.textile

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2126,6 +2126,12 @@ h4. ReferenceExtras
21262126
*** @(REX2b1)@ Should be written in reverse domain name notation
21272127
*** @(REX2b2)@ Types beginning with @com.ably.@ are reserved
21282128

2129+
h4. Subscription
2130+
2131+
* @(SUB1)@ A @Subscription@ represents a registration for receiving events from a subscribe operation
2132+
* @(SUB2)@ The @Subscription@ object has the following method:
2133+
** @(SUB2a)@ @unsubscribe@ - deregisters the listener that was registered by the corresponding @subscribe@ call. Once @unsubscribe@ called, the listener must not be called for any subsequent events
2134+
21292135
h3(#options). Option types
21302136

21312137
h4. ClientOptions
@@ -3162,6 +3168,9 @@ class MessageVersion:
31623168
clientId: string? // TM2s3
31633169
description: string? // TM2s4
31643170
metadata: Dict<string, string>? //TM2s5
3171+
3172+
interface Subscription: // SUB*
3173+
unsubscribe() // SUB2a
31653174
</pre>
31663175

31673176
h2(#old-specs). Old specs

textile/objects-features.textile

Lines changed: 74 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,14 @@ h3(#realtime-objects). RealtimeObject
290290
* @(RTO22)@ @ObjectsOperationSource@ is an internal enum describing the source of an operation being applied:
291291
** @(RTO22a)@ @LOCAL@ - an operation that originated locally, being applied upon receipt of the @ACK@ from Realtime
292292
** @(RTO22b)@ @CHANNEL@ - an operation received over a Realtime channel
293+
* @(RTO24)@ Internal @PathObjectSubscriptionRegister@ - manages path-based subscriptions for @PathObject#subscribe@ ("RTPO19":#RTPO19)
294+
** @(RTO24a)@ The @RealtimeObject@ instance maintains a single @PathObjectSubscriptionRegister@ that manages all path-based subscriptions for the channel
295+
** @(RTO24b)@ When a @LiveObject@ in the @ObjectsPool@ emits a @LiveObjectUpdate@ (per "RTLO4b4":#RTLO4b4), the @PathObjectSubscriptionRegister@ must determine which subscriptions should be notified:
296+
*** @(RTO24b1)@ Determine the paths in the LiveObjects tree at which the updated @LiveObject@ is located
297+
*** @(RTO24b2)@ For each registered subscription, check whether the event path starts with (or equals) the subscription's path
298+
*** @(RTO24b3)@ If the event path matches, apply depth filtering: the event is dispatched to the subscription if the number of path segments from the subscription path to the event path plus 1 does not exceed the subscription's @depth@ option (or if @depth@ is undefined). Formally, the event is dispatched if @eventPath.length - subscriptionPath.length + 1 <= depth@
299+
*** @(RTO24b4)@ Create a @PathObjectSubscriptionEvent@ with a @PathObject@ pointing to the event path and the @ObjectMessage@ that caused the change, and call the subscription's listener
300+
*** @(RTO24b5)@ If a listener throws an error, the error must be caught and logged without affecting the dispatch to other subscriptions
293301

294302
h3(#liveobject). LiveObject
295303

@@ -317,9 +325,10 @@ h3(#liveobject). LiveObject
317325
**** @(RTLO4b4c)@ When a @LiveObjectUpdate@ is emitted:
318326
***** @(RTLO4b4c1)@ If @LiveObjectUpdate@ is indicated to be a no-op, do nothing
319327
***** @(RTLO4b4c2)@ Otherwise, the registered listener is called with the @LiveObjectUpdate@ object
320-
*** @(RTLO4b5)@ The client library may return a subscription object (or the idiomatic equivalent for the language) as a result of this operation:
321-
**** @(RTLO4b5a)@ The subscription object includes an @unsubscribe@ function
322-
**** @(RTLO4b5b)@ Calling @unsubscribe@ deregisters the listener previously registered by the user via the corresponding @subscribe@ call
328+
*** @(RTLO4b5)@ This clause has been replaced by "RTLO4b7":#RTLO4b7
329+
**** @(RTLO4b5a)@ This clause has been replaced by "RTLO4b7":#RTLO4b7
330+
**** @(RTLO4b5b)@ This clause has been replaced by "RTLO4b7":#RTLO4b7
331+
*** @(RTLO4b7)@ Returns a "@Subscription@":../features#SUB1 object
323332
*** @(RTLO4b6)@ This operation must not have any side effects on @RealtimeObject@, the underlying channel, or their status
324333
** @(RTLO4c)@ public @unsubscribe@ - unsubscribes a previously registered listener
325334
*** @(RTLO4c1)@ This operation does not require any specific channel modes to be granted, nor does it require the channel to be in a specific state
@@ -857,6 +866,31 @@ A @PathObject@ is obtained from @RealtimeObject#get@ ("RTO23":#RTO23), which ret
857866
** @(RTPO18b)@ Resolves the path using the path resolution procedure ("RTPO3":#RTPO3). On failure, throws per "RTPO3c2":#RTPO3c2
858867
** @(RTPO18c)@ If the resolved value is a @LiveCounter@, delegates to @LiveCounter#decrement@ ("RTLC13":#RTLC13) with the provided @amount@
859868
** @(RTPO18d)@ If the resolved value is not a @LiveCounter@, the library must throw an @ErrorInfo@ error with @statusCode@ 400 and @code@ 92007
869+
* @(RTPO19)@ @PathObject#subscribe@ function:
870+
** @(RTPO19a)@ Expects the following arguments:
871+
*** @(RTPO19a1)@ @listener@ - a callback function that receives a @PathObjectSubscriptionEvent@ ("RTPO19d":#RTPO19d) when a change occurs at or below this path
872+
*** @(RTPO19a2)@ @options@ @PathObjectSubscriptionOptions@ (optional) - subscription options
873+
** @(RTPO19b)@ @PathObjectSubscriptionOptions@ has the following properties:
874+
*** @(RTPO19b1)@ @depth@ @Number@ (optional) - controls how many levels deep in the subtree changes trigger the listener:
875+
**** @(RTPO19b1a)@ If undefined (default), the subscription receives events for changes at any depth below the subscribed path
876+
**** @(RTPO19b1b)@ If @depth@ is 1, only changes to the object at the exact subscribed path trigger the listener
877+
**** @(RTPO19b1c)@ If @depth@ is @n@, changes up to @n - 1@ levels of children below the subscribed path trigger the listener
878+
**** @(RTPO19b1d)@ If @depth@ is provided and is not a positive integer, the library must throw an @ErrorInfo@ error with @statusCode@ 400 and @code@ 40003
879+
** @(RTPO19c)@ Returns a "@Subscription@":../features#SUB1 object
880+
** @(RTPO19d)@ The listener receives a @PathObjectSubscriptionEvent@ object with:
881+
*** @(RTPO19d1)@ @object@ - a @PathObject@ pointing to the path where the change occurred
882+
*** @(RTPO19d2)@ @message@ @ObjectMessage@ (optional) - the @ObjectMessage@ that caused the change
883+
** @(RTPO19e)@ The subscription is path-based: it follows the path, not a specific object. If the object at the path changes identity (e.g. via a @MAP_SET@ operation replacing it), the subscription continues to deliver events for the new object at that path
884+
** @(RTPO19f)@ Events at child paths bubble up to the subscription, subject to depth filtering. For example, a subscription at path @a.b@ receives events for changes at @a.b@, @a.b.c@, @a.b.c.d@, etc., depending on the configured depth. The dispatch rules are described in "RTO24b":#RTO24b
885+
** @(RTPO19g)@ This operation must not have any side effects on @RealtimeObject@, the underlying channel, or their status
886+
* @(RTPO20)@ @PathObject#unsubscribe@ function:
887+
** @(RTPO20a)@ Accepts a @listener@ argument and deregisters it from receiving further events for this @PathObject@'s path
888+
** @(RTPO20b)@ This operation must not have any side effects on @RealtimeObject@, the underlying channel, or their status
889+
* @(RTPO21)@ The client library should provide a method that allows consuming subscription events as a stream or iterable, rather than via a callback. A suggested name for this method is @subscribeIterator@:
890+
** @(RTPO21a)@ Expects the following arguments:
891+
*** @(RTPO21a1)@ @options@ @PathObjectSubscriptionOptions@ (optional) - same options as @PathObject#subscribe@ ("RTPO19b":#RTPO19b)
892+
** @(RTPO21b)@ Returns a stream or iterable that yields @PathObjectSubscriptionEvent@ objects, using the idiomatic construct for the language (e.g. async iterators, channels, flows, or async sequences)
893+
** @(RTPO21c)@ Internally wraps @PathObject#subscribe@ ("RTPO19":#RTPO19), converting the callback-based subscription into the appropriate streaming or iterable pattern
860894

861895
h3(#instance). Instance
862896

@@ -913,6 +947,24 @@ An @Instance@ holds a direct reference to a specific resolved @LiveObject@ or pr
913947
*** @(RTINS15a1)@ @amount@ @Number@ (optional) - the amount by which to decrement the counter value. Defaults to 1
914948
** @(RTINS15b)@ If the wrapped value is a @LiveCounter@, delegates to @LiveCounter#decrement@ ("RTLC13":#RTLC13) with the provided @amount@
915949
** @(RTINS15c)@ If the wrapped value is not a @LiveCounter@, the library must throw an @ErrorInfo@ error with @statusCode@ 400 and @code@ 92007
950+
* @(RTINS16)@ @Instance#subscribe@ function:
951+
** @(RTINS16a)@ Expects the following arguments:
952+
*** @(RTINS16a1)@ @listener@ - a callback function that receives an @InstanceSubscriptionEvent@ ("RTINS16d":#RTINS16d) when the wrapped object is updated
953+
** @(RTINS16b)@ If the wrapped value is not a @LiveObject@ (i.e. it is a primitive), the library must throw an @ErrorInfo@ error with @statusCode@ 400 and @code@ 92007, indicating that subscribe is not supported for primitive values
954+
** @(RTINS16c)@ Subscribes to data updates on the underlying @LiveObject@ using @LiveObject#subscribe@ ("RTLO4b":#RTLO4b)
955+
** @(RTINS16d)@ The listener receives an @InstanceSubscriptionEvent@ object with:
956+
*** @(RTINS16d1)@ @object@ - the @Instance@ representing the updated object
957+
*** @(RTINS16d2)@ @message@ @ObjectMessage@ (optional) - the @ObjectMessage@ that caused the change
958+
** @(RTINS16e)@ Returns a "@Subscription@":../features#SUB1 object
959+
** @(RTINS16f)@ The subscription is identity-based: it follows the specific @LiveObject@ instance, regardless of where it sits in the tree
960+
** @(RTINS16g)@ This operation must not have any side effects on @RealtimeObject@, the underlying channel, or their status
961+
* @(RTINS17)@ @Instance#unsubscribe@ function:
962+
** @(RTINS17a)@ Accepts a @listener@ argument and deregisters it from receiving further events using @LiveObject#unsubscribe@ ("RTLO4c":#RTLO4c)
963+
** @(RTINS17b)@ This operation must not have any side effects on @RealtimeObject@, the underlying channel, or their status
964+
* @(RTINS18)@ The client library should provide a method that allows consuming subscription events as a stream or iterable, rather than via a callback. A suggested name for this method is @subscribeIterator@:
965+
** @(RTINS18a)@ If the wrapped value is not a @LiveObject@, the library must throw an @ErrorInfo@ error with @statusCode@ 400 and @code@ 92007
966+
** @(RTINS18b)@ Returns a stream or iterable that yields @InstanceSubscriptionEvent@ objects, using the idiomatic construct for the language (e.g. async iterators, channels, flows, or async sequences)
967+
** @(RTINS18c)@ Internally wraps @Instance#subscribe@ ("RTINS16":#RTINS16), converting the callback-based subscription into the appropriate streaming or iterable pattern
916968

917969
h2(#idl). Interface Definition
918970

@@ -951,13 +1003,10 @@ class LiveObject: // RTLO*, internal
9511003
tombstonedAt: Time? // RTLO3e
9521004
canApplyOperation(ObjectMessage) -> Boolean // RTLO4a
9531005
tombstone(ObjectMessage) // RTLO4e
954-
subscribe((LiveObjectUpdate) ->) -> LiveObjectSubscription // RTLO4b
1006+
subscribe((LiveObjectUpdate) ->) -> Subscription // RTLO4b
9551007
unsubscribe((LiveObjectUpdate) ->) // RTLO4c
9561008

957-
interface LiveObjectSubscription: // RTLO4b5
958-
unsubscribe() // RTLO4b5a
959-
960-
interface LiveObjectUpdate: // RTLO4b4, internal
1009+
interface LiveObjectUpdate: // RTLO4b4
9611010
update: Object // RTLO4b4a
9621011
noop: Boolean // RTLO4b4b
9631012

@@ -989,6 +1038,17 @@ class LiveCounterValueType: // RTLCV*
9891038
class LiveMapValueType: // RTLMV*
9901039
// created via LiveMap.create(), RTLMV3
9911040

1041+
interface PathObjectSubscriptionEvent: // RTPO19d
1042+
object: PathObject // RTPO19d1
1043+
message: ObjectMessage? // RTPO19d2
1044+
1045+
interface PathObjectSubscriptionOptions: // RTPO19b
1046+
depth: Number? // RTPO19b1
1047+
1048+
interface InstanceSubscriptionEvent: // RTINS16d
1049+
object: Instance // RTINS16d1
1050+
message: ObjectMessage? // RTINS16d2
1051+
9921052
class PathObject: // RTPO*
9931053
path() -> String // RTPO4
9941054
get(String key) -> PathObject // RTPO5
@@ -1005,6 +1065,9 @@ class PathObject: // RTPO*
10051065
remove(String key) => io // RTPO16
10061066
increment(Number amount?) => io // RTPO17
10071067
decrement(Number amount?) => io // RTPO18
1068+
subscribe((PathObjectSubscriptionEvent) -> listener, PathObjectSubscriptionOptions? options) -> Subscription // RTPO19
1069+
unsubscribe((PathObjectSubscriptionEvent) -> listener) // RTPO20
1070+
subscribeIterator(PathObjectSubscriptionOptions? options) -> Stream<PathObjectSubscriptionEvent> // RTPO21
10081071

10091072
class Instance: // RTINS*
10101073
id: String? // RTINS3
@@ -1020,4 +1083,7 @@ class Instance: // RTINS*
10201083
remove(String key) => io // RTINS13
10211084
increment(Number amount?) => io // RTINS14
10221085
decrement(Number amount?) => io // RTINS15
1086+
subscribe((InstanceSubscriptionEvent) -> listener) -> Subscription // RTINS16
1087+
unsubscribe((InstanceSubscriptionEvent) -> listener) // RTINS17
1088+
subscribeIterator() -> Stream<InstanceSubscriptionEvent> // RTINS18
10231089
</pre>

0 commit comments

Comments
 (0)