Skip to content

Commit 20c2c55

Browse files
committed
[AIT-316] feat: introduce support for message annotations
- Added `RealtimeAnnotations` class to manage annotation creation, deletion, and subscription on realtime channels. - Introduced `Annotation` and `AnnotationAction` types to encapsulate annotation details and actions. - Extended flags to include `ANNOTATION_PUBLISH` and `ANNOTATION_SUBSCRIBE`. - Refactored data encoding logic into `ably.util.encoding`. - Integrated annotation handling into `RealtimeChannel` and `RestChannel`.
1 parent b0499a9 commit 20c2c55

13 files changed

Lines changed: 1366 additions & 93 deletions

File tree

ably/realtime/annotations.py

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from typing import TYPE_CHECKING
5+
6+
from ably.rest.annotations import RestAnnotations, construct_validate_annotation
7+
from ably.transport.websockettransport import ProtocolMessageAction
8+
from ably.types.annotation import Annotation, AnnotationAction
9+
from ably.types.channelstate import ChannelState
10+
from ably.types.flags import Flag
11+
from ably.util.eventemitter import EventEmitter
12+
from ably.util.exceptions import AblyException
13+
from ably.util.helper import is_callable_or_coroutine
14+
15+
if TYPE_CHECKING:
16+
from ably.realtime.channel import RealtimeChannel
17+
from ably.realtime.connectionmanager import ConnectionManager
18+
19+
log = logging.getLogger(__name__)
20+
21+
22+
class RealtimeAnnotations:
23+
"""
24+
Provides realtime methods for managing annotations on messages,
25+
including publishing annotations and subscribing to annotation events.
26+
"""
27+
28+
__connection_manager: ConnectionManager
29+
__channel: RealtimeChannel
30+
31+
def __init__(self, channel: RealtimeChannel, connection_manager: ConnectionManager):
32+
"""
33+
Initialize RealtimeAnnotations.
34+
35+
Args:
36+
channel: The Realtime Channel this annotations instance belongs to
37+
"""
38+
self.__channel = channel
39+
self.__connection_manager = connection_manager
40+
self.__subscriptions = EventEmitter()
41+
self.__rest_annotations = RestAnnotations(channel)
42+
43+
async def publish(self, msg_or_serial, annotation: dict | Annotation, params: dict=None):
44+
"""
45+
Publish an annotation on a message via the realtime connection.
46+
47+
Args:
48+
msg_or_serial: Either a message serial (string) or a Message object
49+
annotation: Dict containing annotation properties (type, name, data, etc.) or Annotation object
50+
params: Optional dict of query parameters
51+
52+
Returns:
53+
None
54+
55+
Raises:
56+
AblyException: If the request fails, inputs are invalid, or channel is in unpublishable state
57+
"""
58+
annotation = construct_validate_annotation(msg_or_serial, annotation)
59+
60+
# Check if channel and connection are in publishable state
61+
self.__channel._throw_if_unpublishable_state()
62+
63+
log.info(
64+
f'RealtimeAnnotations.publish(), channelName = {self.__channel.name}, '
65+
f'sending annotation with messageSerial = {annotation.message_serial}, '
66+
f'type = {annotation.type}'
67+
)
68+
69+
# Convert to wire format (array of annotations)
70+
wire_annotation = annotation.as_dict(binary=self.__channel.ably.options.use_binary_protocol)
71+
72+
# Build protocol message
73+
protocol_message = {
74+
"action": ProtocolMessageAction.ANNOTATION,
75+
"channel": self.__channel.name,
76+
"annotations": [wire_annotation],
77+
}
78+
79+
if params:
80+
# Stringify boolean params
81+
stringified_params = {k: str(v).lower() if isinstance(v, bool) else v for k, v in params.items()}
82+
protocol_message["params"] = stringified_params
83+
84+
# Send via WebSocket
85+
await self.__connection_manager.send_protocol_message(protocol_message)
86+
87+
async def delete(self, msg_or_serial, annotation: dict | Annotation, params=None, timeout=None):
88+
"""
89+
Delete an annotation on a message.
90+
91+
This is a convenience method that sets the action to 'annotation.delete'
92+
and calls publish().
93+
94+
Args:
95+
msg_or_serial: Either a message serial (string) or a Message object
96+
annotation: Dict containing annotation properties or Annotation object
97+
params: Optional dict of query parameters
98+
timeout: Optional timeout (not used for realtime, kept for compatibility)
99+
100+
Returns:
101+
None
102+
103+
Raises:
104+
AblyException: If the request fails or inputs are invalid
105+
"""
106+
if isinstance(annotation, Annotation):
107+
annotation_values = annotation.as_dict()
108+
else:
109+
annotation_values = annotation.copy()
110+
annotation_values['action'] = AnnotationAction.ANNOTATION_DELETE
111+
return await self.publish(msg_or_serial, annotation_values, params)
112+
113+
async def subscribe(self, *args):
114+
"""
115+
Subscribe to annotation events on this channel.
116+
117+
Parameters
118+
----------
119+
*args: type, listener
120+
Subscribe type and listener
121+
122+
arg1(type): str, optional
123+
Subscribe to annotations of the given type
124+
125+
arg2(listener): callable
126+
Subscribe to all annotations on the channel
127+
128+
When no type is provided, arg1 is used as the listener.
129+
130+
Raises
131+
------
132+
AblyException
133+
If unable to subscribe due to invalid channel state or missing ANNOTATION_SUBSCRIBE mode
134+
ValueError
135+
If no valid subscribe arguments are passed
136+
"""
137+
# Parse arguments similar to channel.subscribe
138+
if len(args) == 0:
139+
raise ValueError("annotations.subscribe called without arguments")
140+
141+
if len(args) >= 2 and isinstance(args[0], str):
142+
annotation_type = args[0]
143+
if not args[1]:
144+
raise ValueError("annotations.subscribe called without listener")
145+
if not is_callable_or_coroutine(args[1]):
146+
raise ValueError("subscribe listener must be function or coroutine function")
147+
listener = args[1]
148+
elif is_callable_or_coroutine(args[0]):
149+
listener = args[0]
150+
annotation_type = None
151+
else:
152+
raise ValueError('invalid subscribe arguments')
153+
154+
# Register subscription
155+
if annotation_type is not None:
156+
self.__subscriptions.on(annotation_type, listener)
157+
else:
158+
self.__subscriptions.on(listener)
159+
160+
await self.__channel.attach()
161+
162+
# Check if ANNOTATION_SUBSCRIBE mode is enabled
163+
if self.__channel.state == ChannelState.ATTACHED:
164+
if not self.__channel._has_flag(Flag.ANNOTATION_SUBSCRIBE):
165+
raise AblyException(
166+
"You are trying to add an annotation listener, but you haven't requested the "
167+
"annotation_subscribe channel mode in ChannelOptions, so this won't do anything "
168+
"(we only deliver annotations to clients who have explicitly requested them)",
169+
93001,
170+
400
171+
)
172+
173+
def unsubscribe(self, *args):
174+
"""
175+
Unsubscribe from annotation events on this channel.
176+
177+
Parameters
178+
----------
179+
*args: type, listener
180+
Unsubscribe type and listener
181+
182+
arg1(type): str, optional
183+
Unsubscribe from annotations of the given type
184+
185+
arg2(listener): callable
186+
Unsubscribe from all annotations on the channel
187+
188+
When no type is provided, arg1 is used as the listener.
189+
190+
Raises
191+
------
192+
ValueError
193+
If no valid unsubscribe arguments are passed
194+
"""
195+
if len(args) == 0:
196+
raise ValueError("annotations.unsubscribe called without arguments")
197+
198+
if len(args) >= 2 and isinstance(args[0], str):
199+
annotation_type = args[0]
200+
listener = args[1]
201+
self.__subscriptions.off(annotation_type, listener)
202+
elif is_callable_or_coroutine(args[0]):
203+
listener = args[0]
204+
self.__subscriptions.off(listener)
205+
else:
206+
raise ValueError('invalid unsubscribe arguments')
207+
208+
def _process_incoming(self, incoming_annotations):
209+
"""
210+
Process incoming annotations from the server.
211+
212+
This is called internally when ANNOTATION protocol messages are received.
213+
214+
Args:
215+
incoming_annotations: List of Annotation objects received from the server
216+
"""
217+
for annotation in incoming_annotations:
218+
# Emit to type-specific listeners and catch-all listeners
219+
annotation_type = annotation.type or ''
220+
self.__subscriptions._emit(annotation_type, annotation)
221+
222+
async def get(self, msg_or_serial, params=None):
223+
"""
224+
Retrieve annotations for a message with pagination support.
225+
226+
This delegates to the REST implementation.
227+
228+
Args:
229+
msg_or_serial: Either a message serial (string) or a Message object
230+
params: Optional dict of query parameters (limit, start, end, direction)
231+
232+
Returns:
233+
PaginatedResult: A paginated result containing Annotation objects
234+
235+
Raises:
236+
AblyException: If the request fails or serial is invalid
237+
"""
238+
# Delegate to REST implementation
239+
return await self.__rest_annotations.get(msg_or_serial, params)

ably/realtime/channel.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
import logging
55
from typing import TYPE_CHECKING
66

7+
from ably.realtime.annotations import RealtimeAnnotations
78
from ably.realtime.connection import ConnectionState
9+
from ably.realtime.presence import RealtimePresence
810
from ably.rest.channel import Channel
911
from ably.rest.channel import Channels as RestChannels
1012
from ably.transport.websockettransport import ProtocolMessageAction
13+
from ably.types.annotation import Annotation
1114
from ably.types.channeloptions import ChannelOptions
1215
from ably.types.channelstate import ChannelState, ChannelStateChange
1316
from ably.types.flags import Flag, has_flag
@@ -64,6 +67,7 @@ def __init__(self, realtime: AblyRealtime, name: str, channel_options: ChannelOp
6467
self.__error_reason: AblyException | None = None
6568
self.__channel_options = channel_options or ChannelOptions()
6669
self.__params: dict[str, str] | None = None
70+
self.__flags: int = 0 # Channel mode flags from ATTACHED message
6771

6872
# Delta-specific fields for RTL19/RTL20 compliance
6973
vcdiff_decoder = self.__realtime.options.vcdiff_decoder if self.__realtime.options.vcdiff_decoder else None
@@ -74,12 +78,15 @@ def __init__(self, realtime: AblyRealtime, name: str, channel_options: ChannelOp
7478
# will be disrupted if the user called .off() to remove all listeners
7579
self.__internal_state_emitter = EventEmitter()
7680

81+
# Pass channel options as dictionary to parent Channel class
82+
Channel.__init__(self, realtime, name, self.__channel_options.to_dict())
83+
7784
# Initialize presence for this channel
78-
from ably.realtime.presence import RealtimePresence
85+
7986
self.__presence = RealtimePresence(self)
8087

81-
# Pass channel options as dictionary to parent Channel class
82-
Channel.__init__(self, realtime, name, self.__channel_options.to_dict())
88+
# Initialize realtime annotations for this channel (override REST annotations)
89+
self._Channel__annotations = RealtimeAnnotations(self, realtime.connection.connection_manager)
8390

8491
async def set_options(self, channel_options: ChannelOptions) -> None:
8592
"""Set channel options"""
@@ -459,6 +466,21 @@ def _throw_if_unpublishable_state(self) -> None:
459466
90001,
460467
)
461468

469+
def _has_flag(self, flag: Flag) -> bool:
470+
"""Check if a specific flag is set in the channel's mode flags.
471+
472+
Parameters
473+
----------
474+
flag : Flag
475+
The flag to check
476+
477+
Returns
478+
-------
479+
bool
480+
True if the flag is set, False otherwise
481+
"""
482+
return has_flag(self.__flags, flag)
483+
462484
async def _send_update(
463485
self,
464486
message: Message,
@@ -491,8 +513,8 @@ async def _send_update(
491513
if not message.serial:
492514
raise AblyException(
493515
"Message serial is required for update/delete/append operations",
494-
400,
495-
40003
516+
status_code=400,
517+
code=40003,
496518
)
497519

498520
# Check connection and channel state
@@ -702,6 +724,8 @@ def _on_message(self, proto_msg: dict) -> None:
702724
resumed = has_flag(flags, Flag.RESUMED)
703725
# RTP1: Check for HAS_PRESENCE flag
704726
has_presence = has_flag(flags, Flag.HAS_PRESENCE)
727+
# Store channel attach flags
728+
self.__flags = flags
705729

706730
# RTL12
707731
if self.state == ChannelState.ATTACHED:
@@ -744,6 +768,15 @@ def _on_message(self, proto_msg: dict) -> None:
744768
decoded_presence = PresenceMessage.from_encoded_array(presence_messages, cipher=self.cipher)
745769
sync_channel_serial = proto_msg.get('channelSerial')
746770
self.__presence.set_presence(decoded_presence, is_sync=True, sync_channel_serial=sync_channel_serial)
771+
elif action == ProtocolMessageAction.ANNOTATION:
772+
# Handle ANNOTATION messages
773+
annotation_data = proto_msg.get('annotations', [])
774+
try:
775+
annotations = Annotation.from_encoded_array(annotation_data, cipher=self.cipher)
776+
# Process annotations through the annotations handler
777+
self.annotations._process_incoming(annotations)
778+
except Exception as e:
779+
log.error(f"Annotation processing error {e}. Skip annotations {annotation_data}")
747780
elif action == ProtocolMessageAction.ERROR:
748781
error = AblyException.from_dict(proto_msg.get('error'))
749782
self._notify_state(ChannelState.FAILED, reason=error)

0 commit comments

Comments
 (0)