|
3 | 3 | import logging |
4 | 4 | import os |
5 | 5 | from collections import OrderedDict |
6 | | -from typing import Iterator |
| 6 | +from typing import Iterator, Optional |
7 | 7 | from urllib import parse |
8 | 8 |
|
9 | 9 | import msgpack |
10 | 10 |
|
11 | 11 | from ably.http.paginatedresult import PaginatedResult, format_params |
12 | 12 | from ably.types.channeldetails import ChannelDetails |
13 | | -from ably.types.message import Message, make_message_response_handler |
| 13 | +from ably.types.message import ( |
| 14 | + Message, |
| 15 | + MessageAction, |
| 16 | + MessageVersion, |
| 17 | + make_message_response_handler, |
| 18 | + make_single_message_response_handler, |
| 19 | +) |
| 20 | +from ably.types.operations import MessageOperation, PublishResult, UpdateDeleteResult |
14 | 21 | from ably.types.presence import Presence |
15 | 22 | from ably.util.crypto import get_cipher |
16 | | -from ably.util.exceptions import IncompatibleClientIdException, catch_all |
| 23 | +from ably.util.exceptions import ( |
| 24 | + AblyException, |
| 25 | + IncompatibleClientIdException, |
| 26 | + catch_all, |
| 27 | +) |
17 | 28 |
|
18 | 29 | log = logging.getLogger(__name__) |
19 | 30 |
|
@@ -99,7 +110,13 @@ async def publish_messages(self, messages, params=None, timeout=None): |
99 | 110 | if params: |
100 | 111 | params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()} |
101 | 112 | path += '?' + parse.urlencode(params) |
102 | | - return await self.ably.http.post(path, body=request_body, timeout=timeout) |
| 113 | + response = await self.ably.http.post(path, body=request_body, timeout=timeout) |
| 114 | + |
| 115 | + # Parse response to extract serials |
| 116 | + result_data = response.to_native() |
| 117 | + if result_data and isinstance(result_data, dict): |
| 118 | + return PublishResult.from_dict(result_data) |
| 119 | + return PublishResult() |
103 | 120 |
|
104 | 121 | async def publish_name_data(self, name, data, timeout=None): |
105 | 122 | messages = [Message(name, data)] |
@@ -141,6 +158,187 @@ async def status(self): |
141 | 158 | obj = response.to_native() |
142 | 159 | return ChannelDetails.from_dict(obj) |
143 | 160 |
|
| 161 | + async def _send_update( |
| 162 | + self, |
| 163 | + message: Message, |
| 164 | + action: MessageAction, |
| 165 | + operation: Optional[MessageOperation] = None, |
| 166 | + params: Optional[dict] = None, |
| 167 | + ): |
| 168 | + """Internal method to send update/delete/append operations.""" |
| 169 | + if not message.serial: |
| 170 | + raise AblyException( |
| 171 | + "Message serial is required for update/delete/append operations", |
| 172 | + 400, |
| 173 | + 40003 |
| 174 | + ) |
| 175 | + |
| 176 | + if not operation: |
| 177 | + version = None |
| 178 | + else: |
| 179 | + version = MessageVersion( |
| 180 | + client_id=operation.client_id, |
| 181 | + description=operation.description, |
| 182 | + metadata=operation.metadata |
| 183 | + ) |
| 184 | + |
| 185 | + # Create a new message with the operation fields |
| 186 | + update_message = Message( |
| 187 | + name=message.name, |
| 188 | + data=message.data, |
| 189 | + client_id=message.client_id, |
| 190 | + serial=message.serial, |
| 191 | + action=action, |
| 192 | + version=version, |
| 193 | + ) |
| 194 | + |
| 195 | + # Encrypt if needed |
| 196 | + if self.cipher: |
| 197 | + update_message.encrypt(self.__cipher) |
| 198 | + |
| 199 | + # Serialize the message |
| 200 | + request_body = update_message.as_dict(binary=self.ably.options.use_binary_protocol) |
| 201 | + |
| 202 | + if not self.ably.options.use_binary_protocol: |
| 203 | + request_body = json.dumps(request_body, separators=(',', ':')) |
| 204 | + else: |
| 205 | + request_body = msgpack.packb(request_body, use_bin_type=True) |
| 206 | + |
| 207 | + # Build path with params |
| 208 | + path = self.__base_path + 'messages/{}'.format(parse.quote_plus(message.serial, safe=':')) |
| 209 | + if params: |
| 210 | + params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()} |
| 211 | + path += '?' + parse.urlencode(params) |
| 212 | + |
| 213 | + # Send request |
| 214 | + response = await self.ably.http.patch(path, body=request_body) |
| 215 | + |
| 216 | + # Parse response |
| 217 | + result_data = response.to_native() |
| 218 | + if result_data and isinstance(result_data, dict): |
| 219 | + return UpdateDeleteResult.from_dict(result_data) |
| 220 | + return UpdateDeleteResult() |
| 221 | + |
| 222 | + async def update_message(self, message: Message, operation: MessageOperation = None, params: dict = None): |
| 223 | + """Updates an existing message on this channel. |
| 224 | +
|
| 225 | + Parameters: |
| 226 | + - message: Message object to update. Must have a serial field. |
| 227 | + - operation: Optional MessageOperation containing description and metadata for the update. |
| 228 | + - params: Optional dict of query parameters. |
| 229 | +
|
| 230 | + Returns: |
| 231 | + - UpdateDeleteResult containing the version serial of the updated message. |
| 232 | + """ |
| 233 | + return await self._send_update(message, MessageAction.MESSAGE_UPDATE, operation, params) |
| 234 | + |
| 235 | + async def delete_message(self, message: Message, operation: MessageOperation = None, params: dict = None): |
| 236 | + """Deletes a message on this channel. |
| 237 | +
|
| 238 | + Parameters: |
| 239 | + - message: Message object to delete. Must have a serial field. |
| 240 | + - operation: Optional MessageOperation containing description and metadata for the delete. |
| 241 | + - params: Optional dict of query parameters. |
| 242 | +
|
| 243 | + Returns: |
| 244 | + - UpdateDeleteResult containing the version serial of the deleted message. |
| 245 | + """ |
| 246 | + return await self._send_update(message, MessageAction.MESSAGE_DELETE, operation, params) |
| 247 | + |
| 248 | + async def append_message(self, message: Message, operation: MessageOperation = None, params: dict = None): |
| 249 | + """Appends data to an existing message on this channel. |
| 250 | +
|
| 251 | + Parameters: |
| 252 | + - message: Message object with data to append. Must have a serial field. |
| 253 | + - operation: Optional MessageOperation containing description and metadata for the append. |
| 254 | + - params: Optional dict of query parameters. |
| 255 | +
|
| 256 | + Returns: |
| 257 | + - UpdateDeleteResult containing the version serial of the appended message. |
| 258 | + """ |
| 259 | + return await self._send_update(message, MessageAction.MESSAGE_APPEND, operation, params) |
| 260 | + |
| 261 | + async def get_message(self, serial_or_message, timeout=None): |
| 262 | + """Retrieves a single message by its serial. |
| 263 | +
|
| 264 | + Parameters: |
| 265 | + - serial_or_message: Either a string serial or a Message object with a serial field. |
| 266 | +
|
| 267 | + Returns: |
| 268 | + - Message object for the requested serial. |
| 269 | +
|
| 270 | + Raises: |
| 271 | + - AblyException: If the serial is missing or the message cannot be retrieved. |
| 272 | + """ |
| 273 | + # Extract serial from string or Message object |
| 274 | + if isinstance(serial_or_message, str): |
| 275 | + serial = serial_or_message |
| 276 | + elif isinstance(serial_or_message, Message): |
| 277 | + serial = serial_or_message.serial |
| 278 | + else: |
| 279 | + serial = None |
| 280 | + |
| 281 | + if not serial: |
| 282 | + raise AblyException( |
| 283 | + 'This message lacks a serial. Make sure you have enabled "Message annotations, ' |
| 284 | + 'updates, and deletes" in channel settings on your dashboard.', |
| 285 | + 400, |
| 286 | + 40003 |
| 287 | + ) |
| 288 | + |
| 289 | + # Build the path |
| 290 | + path = self.__base_path + 'messages/' + parse.quote_plus(serial, safe=':') |
| 291 | + |
| 292 | + # Make the request |
| 293 | + response = await self.ably.http.get(path, timeout=timeout) |
| 294 | + |
| 295 | + # Create Message from the response |
| 296 | + message_handler = make_single_message_response_handler(self.__cipher) |
| 297 | + return message_handler(response) |
| 298 | + |
| 299 | + async def get_message_versions(self, serial_or_message, params=None): |
| 300 | + """Retrieves version history for a message. |
| 301 | +
|
| 302 | + Parameters: |
| 303 | + - serial_or_message: Either a string serial or a Message object with a serial field. |
| 304 | + - params: Optional dict of query parameters for pagination (e.g., limit, start, end, direction). |
| 305 | +
|
| 306 | + Returns: |
| 307 | + - PaginatedResult containing Message objects representing each version. |
| 308 | +
|
| 309 | + Raises: |
| 310 | + - AblyException: If the serial is missing or versions cannot be retrieved. |
| 311 | + """ |
| 312 | + # Extract serial from string or Message object |
| 313 | + if isinstance(serial_or_message, str): |
| 314 | + serial = serial_or_message |
| 315 | + elif isinstance(serial_or_message, Message): |
| 316 | + serial = serial_or_message.serial |
| 317 | + else: |
| 318 | + serial = None |
| 319 | + |
| 320 | + if not serial: |
| 321 | + raise AblyException( |
| 322 | + 'This message lacks a serial. Make sure you have enabled "Message annotations, ' |
| 323 | + 'updates, and deletes" in channel settings on your dashboard.', |
| 324 | + 400, |
| 325 | + 40003 |
| 326 | + ) |
| 327 | + |
| 328 | + # Build the path |
| 329 | + params_str = format_params({}, **params) if params else '' |
| 330 | + path = self.__base_path + 'messages/' + parse.quote_plus(serial, safe=':') + '/versions' + params_str |
| 331 | + |
| 332 | + # Create message handler for decoding |
| 333 | + message_handler = make_message_response_handler(self.__cipher) |
| 334 | + |
| 335 | + # Return paginated result |
| 336 | + return await PaginatedResult.paginated_query( |
| 337 | + self.ably.http, |
| 338 | + url=path, |
| 339 | + response_processor=message_handler |
| 340 | + ) |
| 341 | + |
144 | 342 | @property |
145 | 343 | def ably(self): |
146 | 344 | return self.__ably |
|
0 commit comments