From 1e67c68b94ab1bafe96081c51670a3dbb8c58660 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Thu, 9 Apr 2026 17:59:41 +0200 Subject: [PATCH 1/2] Added otlp for JS consumers and publishing. --- examples/consumers.py | 27 +- examples/opentelemetry.py | 16 ++ python/natsrpy/_natsrpy_rs/__init__.pyi | 2 +- python/natsrpy/_natsrpy_rs/js/consumers.pyi | 52 +++- python/natsrpy/instrumentation/__init__.py | 35 ++- .../instrumentation/ctx_manager_wrapper.py | 42 +++ python/natsrpy/instrumentation/js_consumer.py | 266 ++++++++++++++++++ python/natsrpy/instrumentation/js_publish.py | 83 ++++++ python/natsrpy/instrumentation/nats_core.py | 65 +++-- .../natsrpy/instrumentation/span_builder.py | 60 +++- python/natsrpy/js.py | 6 + python/tests/test_consumers.py | 9 +- src/js/consumers/mod.rs | 6 +- src/js/consumers/pull/consumer.rs | 100 ++++++- src/js/consumers/push/consumer.rs | 45 ++- src/js/consumers/push/mod.rs | 2 +- src/js/managers/consumers.rs | 6 - 17 files changed, 751 insertions(+), 71 deletions(-) create mode 100644 examples/opentelemetry.py create mode 100644 python/natsrpy/instrumentation/ctx_manager_wrapper.py create mode 100644 python/natsrpy/instrumentation/js_consumer.py create mode 100644 python/natsrpy/instrumentation/js_publish.py diff --git a/examples/consumers.py b/examples/consumers.py index 57df1c3..8986625 100644 --- a/examples/consumers.py +++ b/examples/consumers.py @@ -41,18 +41,31 @@ async def main() -> None: # We publish a single message await js.publish("stream.example.test", "message for stream") - # We use messages() to get async iterator which we - # use to get messages for push_consumer. - async for push_message in await push_consumer.messages(): - print(f"[FROM_PUSH] {push_message.payload!r}") # noqa: T201 - await push_message.ack() - break + async with push_consumer.consume() as messages: + async for push_message in messages: + print(f"[FROM_PUSH] {push_message.payload!r}") # noqa: T201 + break - # Pull consumers have to request batches of messages. + # Pull consumers have 2 different APIs. + # 1. You can use fetch directly. + # 2. Use async iterator API. + + # Here's how to call pull-consumer fetch method. + # It returns a batch of messages. + # However, please be careful, this method has worse opentelemetry + # instrumentation. Because essentailly it's the same as just calling a function. + # with no scope. for pull_message in await pull_consumer.fetch(max_messages=10): print(f"[FROM_PULL] {pull_message.payload!r}") # noqa: T201 await pull_message.ack() + # This API is more prefered, because it has better + # Opentelemetry instrumentation. + async with pull_consumer.consume() as messages: + async for message in messages: + print(f"[FROM_PULL] {message.payload!r}") # noqa: T201 + break + # Cleanup await stream.consumers.delete(push_consumer.name) await stream.consumers.delete(pull_consumer.name) diff --git a/examples/opentelemetry.py b/examples/opentelemetry.py new file mode 100644 index 0000000..24443e3 --- /dev/null +++ b/examples/opentelemetry.py @@ -0,0 +1,16 @@ +from natsrpy.instrumentation import NatsrpyInstrumentor + +NatsrpyInstrumentor().instrument( + # If true, then message payload will be attached + # to some spans. + capture_body=False, + # If true, then message headers will be attached + # to some spans. + capture_headers=False, +) + +# We also support zero-code instrumentation. +# In case if you're using it, you can specify those parameters +# by setting the following environment variables: +# * `OTEL_PYTHON_NATSRPY_CAPTURE_BODY=true` +# * `OTEL_PYTHON_NATSRPY_CAPTURE_HEADERS=true` diff --git a/python/natsrpy/_natsrpy_rs/__init__.pyi b/python/natsrpy/_natsrpy_rs/__init__.pyi index 4e18482..1921378 100644 --- a/python/natsrpy/_natsrpy_rs/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/__init__.pyi @@ -102,7 +102,7 @@ class SubscriptionCtxManager(Generic[_T]): """ def __aenter__(self) -> Future[_T]: ... - async def __aexit__( + def __aexit__( self, _exc_type: type[BaseException] | None = None, _exc_val: BaseException | None = None, diff --git a/python/natsrpy/_natsrpy_rs/js/consumers.pyi b/python/natsrpy/_natsrpy_rs/js/consumers.pyi index 9e1c895..65cf000 100644 --- a/python/natsrpy/_natsrpy_rs/js/consumers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/consumers.pyi @@ -1,5 +1,6 @@ from asyncio import Future from datetime import timedelta +from types import TracebackType from typing import final from natsrpy._natsrpy_rs.js import JetStreamMessage @@ -12,8 +13,11 @@ __all__ = [ "PriorityPolicy", "PullConsumer", "PullConsumerConfig", + "PullConsumerContextManager", + "PullConsumerFetcher", "PushConsumer", "PushConsumerConfig", + "PushConsumerContextManager", "ReplayPolicy", ] @@ -283,6 +287,28 @@ class MessagesIterator: def __aiter__(self) -> Self: ... def __anext__(self) -> Future[JetStreamMessage]: ... +@final +class PushConsumerContextManager: + """ + Context manager for consuming messages from push-based consumer. + + This class is used to scope the message consumption. + Mostly used for opentelemetry support. + """ + + def __aenter__(self) -> Future[MessagesIterator]: + """Get an async iterator for consuming messages. + + :return: an async iterator over JetStream messages. + """ + + def __aexit__( + self, + _exc_type: type[BaseException] | None = None, + _exc_val: BaseException | None = None, + _exc_tb: TracebackType | None = None, + ) -> Future[None]: ... + @final class PushConsumer: """A push-based JetStream consumer. @@ -298,11 +324,26 @@ class PushConsumer: def stream_name(self) -> str: """Get stream name that this consumer attached to.""" - def messages(self) -> Future[MessagesIterator]: - """Get an async iterator for consuming messages. + def consume(self) -> PushConsumerContextManager: + """Start consuming messages.""" - :return: an async iterator over JetStream messages. - """ +@final +class PullConsumerFetcher: + def __aiter__(self) -> Self: + """Returns this very object.""" + + def __anext__(self) -> Future[JetStreamMessage]: + """Get a next message from the stream.""" + +@final +class PullConsumerContextManager: + def __aenter__(self) -> Future[PullConsumerFetcher]: ... + def __aexit__( + self, + _exc_type: type[BaseException] | None = None, + _exc_val: BaseException | None = None, + _exc_tb: TracebackType | None = None, + ) -> Future[None]: ... @final class PullConsumer: @@ -319,6 +360,9 @@ class PullConsumer: def stream_name(self) -> str: """Get stream name that this consumer attached to.""" + def consume(self) -> PullConsumerContextManager: + """Start consuming messages.""" + def fetch( self, max_messages: int | None = None, diff --git a/python/natsrpy/instrumentation/__init__.py b/python/natsrpy/instrumentation/__init__.py index c7d5929..96b4117 100644 --- a/python/natsrpy/instrumentation/__init__.py +++ b/python/natsrpy/instrumentation/__init__.py @@ -33,10 +33,13 @@ async def main() -> None: """ import logging +import os from collections.abc import Collection from importlib import metadata from typing import Any +from .js_consumer import JSConsumerInstrumentation +from .js_publish import JSPublishInstrumentation from .nats_core import NatsCoreInstrumentator try: @@ -68,12 +71,42 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs: Any) -> None: tracer_provider = kwargs.get("tracer_provider") + capture_body = ( + os.environ.get( + "OTEL_PYTHON_NATSRPY_CAPTURE_BODY", + str(kwargs.get("capture_body", False)), + ).lower() + == "true" + ) + capture_headers = ( + os.environ.get( + "OTEL_PYTHON_NATSRPY_CAPTURE_HEADERS", + str(kwargs.get("capture_headers", False)), + ).lower() + == "true" + ) tracer = trace.get_tracer( _INSTRUMENTATION_MODULE_NAME, metadata.version("natsrpy"), tracer_provider, ) - NatsCoreInstrumentator(tracer).instrument() + NatsCoreInstrumentator( + tracer, + capture_body=capture_body, + capture_headers=capture_headers, + ).instrument() + JSConsumerInstrumentation( + tracer, + capture_body=capture_body, + capture_headers=capture_headers, + ).instrument() + JSPublishInstrumentation( + tracer, + capture_body=capture_body, + capture_headers=capture_headers, + ).instrument() def _uninstrument(self, **kwargs: Any) -> None: NatsCoreInstrumentator.uninstrument() + JSConsumerInstrumentation.uninstrument() + JSPublishInstrumentation.uninstrument() diff --git a/python/natsrpy/instrumentation/ctx_manager_wrapper.py b/python/natsrpy/instrumentation/ctx_manager_wrapper.py new file mode 100644 index 0000000..5bc0126 --- /dev/null +++ b/python/natsrpy/instrumentation/ctx_manager_wrapper.py @@ -0,0 +1,42 @@ +from collections.abc import Callable +from typing import Any, Concatenate + +from typing_extensions import ParamSpec +from wrapt import ObjectProxy + +_P = ParamSpec("_P") + + +class AsyncCtxManagerProxy(ObjectProxy): # type: ignore + """ + Proxy object for context managers. + + This class wraps a context manager, + wrapping returned values on __aenter__, + and calling __cancel_ctx__ at the exit. + """ + + def __init__( + self, + wrapped: Any, + sub_wrappers: dict[type[Any], Callable[Concatenate[Any, _P], Any]], + *args: _P.args, + **kwargs: _P.kwargs, + ) -> None: + super().__init__(wrapped) + self._self_sub_args = args + self._self_sub_kwargs = kwargs + self._self_sub = None + self._self_subwrappers = sub_wrappers + + async def __aenter__(self) -> Any: + sub: Any = await self.__wrapped__.__aenter__() + sub_wrapper = self._self_subwrappers.get(type(sub)) + if sub_wrapper: + sub = sub_wrapper(sub, *self._self_sub_args, **self._self_sub_kwargs) + self._self_sub = sub + return sub + + async def __aexit__(self, *args: object, **kwargs: dict[Any, Any]) -> Any: + if self._self_sub and hasattr(self._self_sub, "__cancel_ctx__"): + self._self_sub.__cancel_ctx__(*args, **kwargs) diff --git a/python/natsrpy/instrumentation/js_consumer.py b/python/natsrpy/instrumentation/js_consumer.py new file mode 100644 index 0000000..bbbd619 --- /dev/null +++ b/python/natsrpy/instrumentation/js_consumer.py @@ -0,0 +1,266 @@ +from collections.abc import AsyncGenerator, Callable +from types import TracebackType +from typing import Any + +from opentelemetry import context, propagate, trace # type: ignore +from opentelemetry.instrumentation.utils import is_instrumentation_enabled, unwrap +from opentelemetry.trace import Link, SpanKind, Tracer, get_current_span +from typing_extensions import Self +from wrapt import ObjectProxy, wrap_function_wrapper + +from natsrpy.instrumentation.ctx_manager_wrapper import AsyncCtxManagerProxy +from natsrpy.instrumentation.span_builder import SpanAction, SpanBuilder +from natsrpy.js import ( + JetStreamMessage, + MessagesIterator, + PullConsumer, + PullConsumerFetcher, + PushConsumer, +) + + +class MessagesIteratorProxy(ObjectProxy): # type: ignore + """Proxy for iterable subscriptions.""" + + def __init__( + self, + wrapped: MessagesIterator, + tracer: Tracer, + capture_body: bool, + capture_headers: bool, + ) -> None: + super().__init__(wrapped) + # Proxy-local attrs should have _self_ prefix + self._self_tracer = tracer + self._self_cancel_ctx: tuple[Any, Any] | None = None + self._self_capture_body = capture_body + self._self_capture_headers = capture_headers + + def __aiter__(self) -> Self: + return self + + def __cancel_ctx__( + self, + typ: type[BaseException] | None = None, + exc: BaseException | None = None, + tb: TracebackType | None = None, + ) -> None: + if self._self_cancel_ctx is None: + return + token, span_ctx = self._self_cancel_ctx + span_ctx.__exit__(typ, exc, tb) + context.detach(token) + self._self_cancel_ctx = None + + async def __anext__(self) -> Any: + # We cancel previous context if any. + self.__cancel_ctx__() + # Getting next message + next_msg: JetStreamMessage = await anext(self.__wrapped__) + if not is_instrumentation_enabled(): + return next_msg + new_ctx = propagate.extract(next_msg.headers) + token = context.attach(new_ctx) + span = ( + SpanBuilder(self._self_tracer, SpanKind.CONSUMER, SpanAction.RECEIVE) + .with_js_message( + next_msg, + capture_body=self._self_capture_body, + capture_headers=self._self_capture_headers, + ) + .build() + ) + span_ctx = trace.use_span(span, end_on_exit=True) + span_ctx.__enter__() + self._self_cancel_ctx = (token, span_ctx) + + return next_msg + + +class JSConsumerInstrumentation: + """ + Implementation of OpenTelemetry instrumentation for JetStream consumers. + + This instrumentation covers Push and Pull based consumers. + """ + + def __init__( + self, + tracer: Tracer, + capture_headers: bool = False, + capture_body: bool = False, + ) -> None: + self.tracer = tracer + self.capture_headers = capture_headers + self.capture_body = capture_body + + def instrument(self) -> None: + """Setup instrumentation for both consumers.""" + self.instrument_pull() + self.instrument_push() + self.instrument_message() + + @classmethod + def uninstrument(cls) -> None: + """Unwrap Jetstream message methods.""" + # Push-consumer + unwrap(PushConsumer, "consume") + # Pull-consumer + unwrap(PullConsumer, "consume") + unwrap(PullConsumer, "fetch") + # Message + unwrap(JetStreamMessage, "ack") + unwrap(JetStreamMessage, "nack") + unwrap(JetStreamMessage, "progress") + unwrap(JetStreamMessage, "term") + unwrap(JetStreamMessage, "next") + + def instrument_push(self) -> None: + """ + Instrument push consumer related methods. + + Push consumers can only be used as async iterators + with a context manager. This method instruments such approach. + """ + + def wrapper( + wrapper: Any, + _: PushConsumer, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> AsyncGenerator[Any, None]: + return AsyncCtxManagerProxy( + wrapper(*args, **kwargs), + {MessagesIterator: MessagesIteratorProxy}, + self.tracer, + capture_headers=self.capture_headers, + capture_body=self.capture_body, + ) + + wrap_function_wrapper( + "natsrpy._natsrpy_rs.js.consumers", + "PushConsumer.consume", + wrapper, + ) + + def instrument_pull(self) -> None: + """ + Instrument pull consumer related methods. + + Pull consumer can be used directly using + fetch method, or implicitly using an async iterator returned by consume method. + + This method instruments both of those approaches. + """ + + def consume_wrapper( + wrapper: Any, + _: PullConsumer, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> AsyncGenerator[Any, None]: + return AsyncCtxManagerProxy( + wrapper(*args, **kwargs), + {PullConsumerFetcher: MessagesIteratorProxy}, + self.tracer, + capture_headers=self.capture_headers, + capture_body=self.capture_body, + ) + + async def fetch_wrapper( + wrapper: Any, + _: PullConsumer, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> list[JetStreamMessage]: + messages: list[JetStreamMessage] = await wrapper(*args, **kwargs) + if not messages: + return messages + links = [] + for message in messages: + ctx = propagate.extract(message.headers) + sc = get_current_span(ctx).get_span_context() + links.append(Link(sc)) + + span = ( + SpanBuilder( + self.tracer, + SpanKind.CONSUMER, + SpanAction.FETCH, + ) + .with_links(links) + .with_messages_count(len(messages)) + .build() + ) + + with trace.use_span(span, end_on_exit=True): + return messages + + wrap_function_wrapper( + "natsrpy._natsrpy_rs.js.consumers", + "PullConsumer.fetch", + fetch_wrapper, + ) + wrap_function_wrapper( + "natsrpy._natsrpy_rs.js.consumers", + "PullConsumer.consume", + consume_wrapper, + ) + + def instrument_message(self) -> None: + """ + Instrumnet all methods related to a single message. + + Messages in JetStream should be acknowledged or marked as + to be acknowledged later. + + This method wraps all of those methods into instrumented ones, + that actually wrapped. + """ + + def gen_wrapper(action: SpanAction) -> Callable[..., Any]: + def wrapper( + wrapper: Any, + message: JetStreamMessage, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + span = ( + SpanBuilder(self.tracer, SpanKind.INTERNAL, action) + .with_js_message( + message, + capture_body=False, + capture_headers=False, + ) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return wrapper(*args, **kwargs) + + return wrapper + + wrap_function_wrapper( + "natsrpy._natsrpy_rs.js", + "JetStreamMessage.ack", + gen_wrapper(SpanAction.ACK), + ) + wrap_function_wrapper( + "natsrpy._natsrpy_rs.js", + "JetStreamMessage.nack", + gen_wrapper(SpanAction.NACK), + ) + wrap_function_wrapper( + "natsrpy._natsrpy_rs.js", + "JetStreamMessage.progress", + gen_wrapper(SpanAction.PROGRESS), + ) + wrap_function_wrapper( + "natsrpy._natsrpy_rs.js", + "JetStreamMessage.term", + gen_wrapper(SpanAction.TERM), + ) + wrap_function_wrapper( + "natsrpy._natsrpy_rs.js", + "JetStreamMessage.next", + gen_wrapper(SpanAction.NEXT), + ) diff --git a/python/natsrpy/instrumentation/js_publish.py b/python/natsrpy/instrumentation/js_publish.py new file mode 100644 index 0000000..f3aec7b --- /dev/null +++ b/python/natsrpy/instrumentation/js_publish.py @@ -0,0 +1,83 @@ +from collections.abc import Awaitable, Callable +from typing import Any + +from opentelemetry import propagate, trace # type: ignore +from opentelemetry.instrumentation.utils import is_instrumentation_enabled, unwrap +from opentelemetry.trace import SpanKind, Tracer +from wrapt import wrap_function_wrapper + +from natsrpy import Nats +from natsrpy.js import JetStream + +from .span_builder import SpanAction, SpanBuilder + + +class JSPublishInstrumentation: + """Instrument JS publish method.""" + + def __init__( + self, + tracer: Tracer, + capture_headers: bool = False, + capture_body: bool = False, + ) -> None: + self.tracer = tracer + self.capture_headers = capture_headers + self.capture_body = capture_body + + def instrument(self) -> None: + """Setup otel instrumentation for core Nats.""" + self._instrument_publish() + + @staticmethod + def uninstrument() -> None: + """Remove instrumentations from core Nats.""" + unwrap(JetStream, "publish") + + def _instrument_publish(self) -> None: + async def _wrapped_publish( + wrapper: Callable[..., Awaitable[Any]], + subject: str, + payload: str | bytes | bytearray | memoryview, + *, + headers: dict[str, str] | None = None, + **kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper( + subject, + payload, + headers=headers, + **kwargs, + ) + span_builder = ( + SpanBuilder(self.tracer, SpanKind.PRODUCER, SpanAction.PUBLISH) + .with_subject(subject) + .with_payload(payload, capture_body=self.capture_body) + ) + headers = headers or {} + if self.capture_headers: + span_builder.with_headers(headers) + span = span_builder.build() + with trace.use_span(span, end_on_exit=True): + propagate.inject(headers) + return await wrapper( + subject, + payload, + headers=headers, + **kwargs, + ) + + def _publish_decorator( + wrapper: Any, + _: Nats, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped_publish(wrapper, *args, **kwargs) + + wrap_function_wrapper( + "natsrpy._natsrpy_rs.js", + "JetStream.publish", + _publish_decorator, + ) diff --git a/python/natsrpy/instrumentation/nats_core.py b/python/natsrpy/instrumentation/nats_core.py index 994be66..cbb6764 100644 --- a/python/natsrpy/instrumentation/nats_core.py +++ b/python/natsrpy/instrumentation/nats_core.py @@ -3,7 +3,7 @@ from types import TracebackType from typing import Any -from opentelemetry import context, propagate, trace +from opentelemetry import context, propagate, trace # type: ignore from opentelemetry.instrumentation.utils import is_instrumentation_enabled, unwrap from opentelemetry.trace import SpanKind, Tracer from typing_extensions import Self @@ -11,16 +11,26 @@ from natsrpy import IteratorSubscription, Message, Nats +from .ctx_manager_wrapper import AsyncCtxManagerProxy from .span_builder import SpanAction, SpanBuilder class IterableSubscriptionProxy(ObjectProxy): # type: ignore """Proxy for iterable subscriptions.""" - def __init__(self, wrapped: IteratorSubscription, tracer: Tracer) -> None: + def __init__( + self, + wrapped: IteratorSubscription, + tracer: Tracer, + capture_headers: bool, + capture_body: bool, + ) -> None: super().__init__(wrapped) # Proxy-local attrs should have _self_ prefix self._self_tracer = tracer + self._self_capture_body = capture_body + self._self_capture_headers = capture_headers + self._self_cancel_ctx: tuple[Any, Any] | None = None def __aiter__(self) -> Self: @@ -50,7 +60,11 @@ async def __anext__(self) -> Any: token = context.attach(new_ctx) span = ( SpanBuilder(self._self_tracer, SpanKind.CONSUMER, SpanAction.RECEIVE) - .with_message(next_msg) + .with_message( + next_msg, + capture_body=self._self_capture_body, + capture_headers=self._self_capture_headers, + ) .build() ) span_ctx = trace.use_span(span, end_on_exit=True) @@ -60,26 +74,6 @@ async def __anext__(self) -> Any: return next_msg -class SubscriptionCtxProxy(ObjectProxy): # type: ignore - """Proxy object for subscription context manager.""" - - def __init__(self, wrapped: Any, tracer: Tracer) -> None: - super().__init__(wrapped) - self._self_tracer = tracer - self._self_sub = None - - async def __aenter__(self) -> Any: - sub = await self.__wrapped__.__aenter__() - if isinstance(sub, IteratorSubscription): - sub = IterableSubscriptionProxy(sub, self._self_tracer) - self._self_sub = sub - return sub - - def __aexit__(self, *args: Any, **kwargs: dict[Any, Any]) -> Any: - if self._self_sub and isinstance(self._self_sub, IterableSubscriptionProxy): - self._self_sub.__cancel_ctx__(*args, **kwargs) - - class NatsCoreInstrumentator: """Instrument core nats methods.""" @@ -104,7 +98,7 @@ def uninstrument() -> None: unwrap(Nats, "publish") def _instrument_publish(self) -> None: - def _wrapped_publish( + async def _wrapped_publish( wrapper: Callable[..., Any], subject: str, payload: bytes | str | bytearray | memoryview, @@ -113,22 +107,24 @@ def _wrapped_publish( **kwargs: dict[str, Any], ) -> Any: if not is_instrumentation_enabled(): - return wrapper( + return await wrapper( subject, payload, headers=headers, **kwargs, ) - span = ( + span_builder = ( SpanBuilder(self.tracer, SpanKind.PRODUCER, SpanAction.PUBLISH) .with_subject(subject) - .with_payload(payload) - .build() + .with_payload(payload, capture_body=self.capture_body) ) headers = headers or {} + if self.capture_headers: + span_builder = span_builder.with_headers(headers) + span = span_builder.build() with trace.use_span(span, end_on_exit=True): propagate.inject(headers) - return wrapper( + return await wrapper( subject, payload, headers=headers, @@ -167,7 +163,11 @@ async def _fixed_cb(message: Message) -> None: token = context.attach(ctx) span = ( SpanBuilder(self.tracer, SpanKind.CONSUMER, SpanAction.RECEIVE) - .with_message(message) + .with_message( + message, + capture_body=self.capture_body, + capture_headers=self.capture_headers, + ) .build() ) try: @@ -194,9 +194,12 @@ def wrapper( args: tuple[Any, ...], kwargs: dict[str, Any], ) -> AsyncGenerator[Any, None]: - return SubscriptionCtxProxy( + return AsyncCtxManagerProxy( wrapper(*process_args(*args, **kwargs)), + {IteratorSubscription: IterableSubscriptionProxy}, self.tracer, + capture_headers=self.capture_headers, + capture_body=self.capture_body, ) wrap_function_wrapper("natsrpy._natsrpy_rs", "Nats.subscribe", wrapper) diff --git a/python/natsrpy/instrumentation/span_builder.py b/python/natsrpy/instrumentation/span_builder.py index baf69c2..b5f24bf 100644 --- a/python/natsrpy/instrumentation/span_builder.py +++ b/python/natsrpy/instrumentation/span_builder.py @@ -1,13 +1,15 @@ import enum +from collections.abc import Sequence from typing import Any from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_BATCH_MESSAGE_COUNT, MESSAGING_DESTINATION_NAME, MESSAGING_MESSAGE_BODY_SIZE, MESSAGING_MESSAGE_ID, MESSAGING_SYSTEM, ) -from opentelemetry.trace import Span, SpanKind, Tracer +from opentelemetry.trace import Link, Span, SpanKind, Tracer from typing_extensions import Self from natsrpy import Message @@ -22,6 +24,12 @@ class SpanAction(enum.Enum): PUBLISH = "publish" RECEIVE = "receive" + FETCH = "fetch" + ACK = "ack" + NACK = "nack" + PROGRESS = "progress" + TERM = "term" + NEXT = "next" class SpanBuilder: @@ -32,15 +40,23 @@ def __init__(self, tracer: Tracer, kind: SpanKind, action: SpanAction) -> None: self.attributes: dict[str, Any] = DEFAULT_ATTRS.copy() self.kind = kind self.action = action + self.links: Sequence[Link] | None = None def with_subject(self, subject: str) -> Self: """Set message subject.""" self.attributes[MESSAGING_DESTINATION_NAME] = subject return self - def with_payload(self, payload: Any) -> Self: + def with_payload(self, payload: Any, capture_body: bool = False) -> Self: """Set payload-related attributes.""" self.attributes[MESSAGING_MESSAGE_BODY_SIZE] = len(payload) + if capture_body: + self.attributes["messaging.message.payload"] = payload + return self + + def with_headers(self, headers: dict[str, Any]) -> Self: + """Add headers to a list of lables.""" + self.attributes["messaging.message.headers"] = headers return self def with_message_id(self, message_id: int) -> Self: @@ -48,17 +64,46 @@ def with_message_id(self, message_id: int) -> Self: self.attributes[MESSAGING_MESSAGE_ID] = message_id return self - def with_message(self, msg: Message) -> Self: + def with_message( + self, + msg: Message, + capture_body: bool, + capture_headers: bool, + ) -> Self: """Add message-related attributes.""" - return self.with_subject(msg.subject).with_payload(msg.payload) + self.with_subject(msg.subject).with_payload( + msg.payload, + capture_body=capture_body, + ) + if capture_headers: + self.with_headers(msg.headers) + return self - def with_js_message(self, msg: JetStreamMessage) -> Self: + def with_messages_count(self, count: int) -> Self: + """Set number of messages in the batch.""" + self.attributes[MESSAGING_BATCH_MESSAGE_COUNT] = count + return self + + def with_js_message( + self, + msg: JetStreamMessage, + capture_body: bool, + capture_headers: bool, + ) -> Self: """Add message-related attributes in JS context.""" - return ( + ( self.with_subject(msg.subject) - .with_payload(msg.payload) + .with_payload(msg.payload, capture_body=capture_body) .with_message_id(msg.stream_sequence) ) + if capture_headers: + self.with_headers(msg.headers) + return self + + def with_links(self, links: Sequence[Link]) -> Self: + """Attache linked spans.""" + self.links = links + return self def build(self) -> Span: """Build resulting span.""" @@ -66,4 +111,5 @@ def build(self) -> Span: self.action.value.lower(), kind=self.kind, attributes=self.attributes, + links=self.links, ) diff --git a/python/natsrpy/js.py b/python/natsrpy/js.py index f4e8245..d7fe3f3 100644 --- a/python/natsrpy/js.py +++ b/python/natsrpy/js.py @@ -6,8 +6,11 @@ PriorityPolicy, PullConsumer, PullConsumerConfig, + PullConsumerContextManager, + PullConsumerFetcher, PushConsumer, PushConsumerConfig, + PushConsumerContextManager, ReplayPolicy, ) from ._natsrpy_rs.js.counters import CounterEntry, Counters, CountersConfig @@ -85,8 +88,11 @@ "Publication", "PullConsumer", "PullConsumerConfig", + "PullConsumerContextManager", + "PullConsumerFetcher", "PushConsumer", "PushConsumerConfig", + "PushConsumerContextManager", "ReplayPolicy", "Republish", "RetentionPolicy", diff --git a/python/tests/test_consumers.py b/python/tests/test_consumers.py index 1be1c36..e0b20df 100644 --- a/python/tests/test_consumers.py +++ b/python/tests/test_consumers.py @@ -263,10 +263,11 @@ async def test_push_consumer_messages(js: JetStream) -> None: name=f"consumer-{uuid.uuid4()}", ) consumer = await stream.consumers.create(consumer_config) - msgs_iter = await consumer.messages() - for message in messages: - nats_msg = await asyncio.wait_for(anext(msgs_iter), timeout=0.5) - assert message == nats_msg.payload + async with consumer.consume() as consumer_messages: + for message in messages: + nats_msg = await asyncio.wait_for(anext(consumer_messages), timeout=0.5) + assert message == nats_msg.payload + finally: await js.streams.delete(stream_name) diff --git a/src/js/consumers/mod.rs b/src/js/consumers/mod.rs index 721dfbe..018c59d 100644 --- a/src/js/consumers/mod.rs +++ b/src/js/consumers/mod.rs @@ -7,9 +7,13 @@ pub mod pymod { #[pymodule_export] use super::common::{AckPolicy, DeliverPolicy, PriorityPolicy, ReplayPolicy}; #[pymodule_export] - pub use super::pull::{config::PullConsumerConfig, consumer::PullConsumer}; + pub use super::pull::{ + config::PullConsumerConfig, consumer::PullConsumer, consumer::PullConsumerContextManager, + consumer::PullConsumerFetcher, + }; #[pymodule_export] pub use super::push::{ config::PushConsumerConfig, consumer::MessagesIterator, consumer::PushConsumer, + consumer::PushConsumerContextManager, }; } diff --git a/src/js/consumers/pull/consumer.rs b/src/js/consumers/pull/consumer.rs index 3dffbec..ae4a0e1 100644 --- a/src/js/consumers/pull/consumer.rs +++ b/src/js/consumers/pull/consumer.rs @@ -1,11 +1,15 @@ use std::sync::Arc; use futures_util::StreamExt; -use pyo3::{Bound, PyAny, Python}; +use pyo3::{Bound, PyAny, PyRef, Python}; use crate::{ - exceptions::rust_err::NatsrpyResult, - utils::{futures::natsrpy_future_with_timeout, py_types::TimeValue}, + exceptions::rust_err::{NatsrpyError, NatsrpyResult}, + js::pymod::JetStreamMessage, + utils::{ + futures::natsrpy_future_with_timeout, natsrpy_future, py_types::TimeValue, + streamer::Streamer, + }, }; type NatsPullConsumer = @@ -33,8 +37,98 @@ impl PullConsumer { } } +#[pyo3::pyclass] +pub struct PullConsumerFetcher { + pub consumer: Arc, + pub messages: Arc< + tokio::sync::Mutex< + Streamer< + Result< + async_nats::jetstream::Message, + async_nats::jetstream::consumer::pull::MessagesError, + >, + >, + >, + >, +} + +impl PullConsumerFetcher { + #[must_use] + pub fn new( + consumer: Arc, + messages: async_nats::jetstream::consumer::pull::Stream, + ) -> Self { + Self { + consumer, + messages: Arc::new(tokio::sync::Mutex::new(Streamer::new(messages))), + } + } +} + +#[pyo3::pymethods] +impl PullConsumerFetcher { + #[must_use] + pub const fn __aiter__(slf: PyRef) -> PyRef { + slf + } + + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + let ctx = self.messages.clone(); + natsrpy_future(py, async move { + let value = ctx.lock().await.next().await; + match value { + Some(info) => JetStreamMessage::try_from(info?), + None => Err(NatsrpyError::AsyncStopIteration), + } + }) + } +} + +#[pyo3::pyclass] +pub struct PullConsumerContextManager { + consumer: Arc, +} + +impl PullConsumerContextManager { + #[must_use] + pub const fn new(consumer: Arc) -> Self { + Self { consumer } + } +} + +#[pyo3::pymethods] +impl PullConsumerContextManager { + pub fn __aenter__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + let consumer = self.consumer.clone(); + natsrpy_future(py, async move { + let messages = consumer.messages().await?; + Ok(PullConsumerFetcher::new(consumer, messages)) + }) + } + + #[pyo3(signature=( + _exc_type=None, + _exc_val=None, + _exc_tb=None, + ))] + pub fn __aexit__<'py>( + &self, + py: Python<'py>, + _exc_type: Option>, + _exc_val: Option>, + _exc_tb: Option>, + ) -> NatsrpyResult> { + natsrpy_future(py, async move { Ok(()) }) + } +} + #[pyo3::pymethods] impl PullConsumer { + #[must_use] + pub fn consume(&self) -> PullConsumerContextManager { + PullConsumerContextManager::new(self.consumer.clone()) + } + #[pyo3(signature=( max_messages=None, group=None, diff --git a/src/js/consumers/push/consumer.rs b/src/js/consumers/push/consumer.rs index 862a491..1fe386d 100644 --- a/src/js/consumers/push/consumer.rs +++ b/src/js/consumers/push/consumer.rs @@ -34,6 +34,43 @@ impl PushConsumer { } } +#[pyo3::pyclass] +pub struct PushConsumerContextManager { + context: Arc, +} + +impl PushConsumerContextManager { + #[must_use] + pub const fn new(context: Arc) -> Self { + Self { context } + } +} + +#[pyo3::pymethods] +impl PushConsumerContextManager { + pub fn __aenter__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + let consumer = self.context.clone(); + natsrpy_future(py, async move { + Ok(MessagesIterator::from(consumer.messages().await?)) + }) + } + + #[pyo3(signature=( + _exc_type=None, + _exc_val=None, + _exc_tb=None, + ))] + pub fn __aexit__<'py>( + &self, + py: Python<'py>, + _exc_type: Option>, + _exc_val: Option>, + _exc_tb: Option>, + ) -> NatsrpyResult> { + natsrpy_future(py, async move { Ok(()) }) + } +} + #[pyo3::pyclass] pub struct MessagesIterator { messages: Option>>, @@ -49,11 +86,9 @@ impl From for MessagesIterator #[pyo3::pymethods] impl PushConsumer { - pub fn messages<'py>(&self, py: Python<'py>) -> NatsrpyResult> { - let consumer = self.consumer.clone(); - natsrpy_future(py, async move { - Ok(MessagesIterator::from(consumer.messages().await?)) - }) + #[must_use] + pub fn consume(&self) -> PushConsumerContextManager { + PushConsumerContextManager::new(self.consumer.clone()) } #[must_use] diff --git a/src/js/consumers/push/mod.rs b/src/js/consumers/push/mod.rs index f7a77e0..baeffed 100644 --- a/src/js/consumers/push/mod.rs +++ b/src/js/consumers/push/mod.rs @@ -2,4 +2,4 @@ pub mod config; pub mod consumer; pub use config::PushConsumerConfig; -pub use consumer::PushConsumer; +pub use consumer::{PushConsumer, PushConsumerContextManager}; diff --git a/src/js/managers/consumers.rs b/src/js/managers/consumers.rs index f5b16c2..3beac92 100644 --- a/src/js/managers/consumers.rs +++ b/src/js/managers/consumers.rs @@ -153,12 +153,6 @@ impl<'py> FromPyObject<'_, 'py> for ConsumerConfigs { } } -#[pyo3::pyclass] -pub enum Consumers { - Pull(consumers::pull::PullConsumer), - Push(consumers::push::PushConsumer), -} - #[pyo3::pymethods] impl ConsumersManager { pub fn create<'py>( From b1bddb77ca2401a31142f8d84c773821979e4918 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Thu, 9 Apr 2026 19:40:16 +0200 Subject: [PATCH 2/2] Expand test coverage and fix incorrect stub type annotations (#49) Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: s3rius <18153319+s3rius@users.noreply.github.com> --- python/natsrpy/_natsrpy_rs/js/kv.pyi | 6 +- python/natsrpy/_natsrpy_rs/js/stream.pyi | 4 +- python/tests/test_consumers.py | 40 +++++++++ python/tests/test_jetstream.py | 4 + python/tests/test_js_message_acks.py | 105 +++++++++++++++++++++++ python/tests/test_kv.py | 38 ++++++++ python/tests/test_nats_client.py | 75 ++++++++++++++++ python/tests/test_streams.py | 60 +++++++++++++ python/tests/test_subscriptions.py | 11 +++ 9 files changed, 337 insertions(+), 6 deletions(-) diff --git a/python/natsrpy/_natsrpy_rs/js/kv.pyi b/python/natsrpy/_natsrpy_rs/js/kv.pyi index 0c38e2b..187a255 100644 --- a/python/natsrpy/_natsrpy_rs/js/kv.pyi +++ b/python/natsrpy/_natsrpy_rs/js/kv.pyi @@ -122,10 +122,10 @@ class KVConfig: """ bucket: str - description: str + description: str | None max_value_size: int | None history: int | None - max_age: float | None + max_age: timedelta | None max_bytes: int | None storage: StorageType | None num_replicas: int | None @@ -135,7 +135,7 @@ class KVConfig: mirror_direct: bool | None compression: bool | None placement: Placement | None - limit_markers: float | None + limit_markers: timedelta | None def __new__( cls, diff --git a/python/natsrpy/_natsrpy_rs/js/stream.pyi b/python/natsrpy/_natsrpy_rs/js/stream.pyi index ca9c236..de0fbe1 100644 --- a/python/natsrpy/_natsrpy_rs/js/stream.pyi +++ b/python/natsrpy/_natsrpy_rs/js/stream.pyi @@ -132,8 +132,6 @@ class SubjectTransform: source: str destination: str - def __new__(cls, source: str, destination: str) -> Self: ... - @final class Source: """Configuration for a stream source or mirror origin. @@ -454,7 +452,7 @@ class StreamInfo: """ config: StreamConfig - created: float + created: int state: StreamState cluster: ClusterInfo | None mirror: SourceInfo | None diff --git a/python/tests/test_consumers.py b/python/tests/test_consumers.py index e0b20df..0f84f1d 100644 --- a/python/tests/test_consumers.py +++ b/python/tests/test_consumers.py @@ -304,3 +304,43 @@ async def test_push_consumer_config_properties() -> None: assert config.description == "push test" assert config.ack_policy == AckPolicy.EXPLICIT assert config.deliver_policy == DeliverPolicy.NEW + + +async def test_pull_consumer_consume_context_manager(js: JetStream) -> None: + stream_name = f"test-pullctx-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"consume-msg", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + async with consumer.consume() as fetcher: + msg = await anext(fetcher) + assert msg.payload == b"consume-msg" + await msg.ack() + finally: + await js.streams.delete(stream_name) + + +async def test_push_consumer_consume_context_manager(js: JetStream) -> None: + stream_name = f"test-pushctx-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"push-consume-msg", wait=True) + deliver_subj = uuid.uuid4().hex + consumer = await stream.consumers.create( + PushConsumerConfig( + deliver_subject=deliver_subj, + name=f"push-{uuid.uuid4().hex[:8]}", + ), + ) + async with consumer.consume() as msgs: + msg = await anext(msgs) + assert msg.payload == b"push-consume-msg" + await msg.ack() + finally: + await js.streams.delete(stream_name) diff --git a/python/tests/test_jetstream.py b/python/tests/test_jetstream.py index 4fac22a..d18ef7e 100644 --- a/python/tests/test_jetstream.py +++ b/python/tests/test_jetstream.py @@ -48,3 +48,7 @@ async def test_jetstream_publish_with_headers(js: JetStream) -> None: await js.publish(subj, b"with-headers", headers={"x-test": "value"}, wait=True) finally: await js.streams.delete(stream_name) + + +async def test_jetstream_has_counters_manager(js: JetStream) -> None: + assert js.counters is not None diff --git a/python/tests/test_js_message_acks.py b/python/tests/test_js_message_acks.py index bc4b3bc..16722a6 100644 --- a/python/tests/test_js_message_acks.py +++ b/python/tests/test_js_message_acks.py @@ -1,4 +1,5 @@ import uuid +from datetime import datetime from natsrpy.js import ( AckPolicy, @@ -166,3 +167,107 @@ async def test_message_headers_empty(js: JetStream) -> None: assert isinstance(messages[0].headers, dict) finally: await js.streams.delete(stream_name) + + +async def test_message_stream_sequence_and_consumer_sequence(js: JetStream) -> None: + stream_name = f"test-seqs-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"seq-msg", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + msg = messages[0] + assert isinstance(msg.stream_sequence, int) + assert msg.stream_sequence >= 1 + assert isinstance(msg.consumer_sequence, int) + assert msg.consumer_sequence >= 1 + finally: + await js.streams.delete(stream_name) + + +async def test_message_consumer_and_stream_names(js: JetStream) -> None: + stream_name = f"test-names-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"names-msg", wait=True) + consumer_name = f"consumer-{uuid.uuid4().hex[:8]}" + consumer = await stream.consumers.create( + PullConsumerConfig(name=consumer_name), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + msg = messages[0] + assert msg.stream == stream_name + assert msg.consumer == consumer_name + finally: + await js.streams.delete(stream_name) + + +async def test_message_delivered_and_pending(js: JetStream) -> None: + stream_name = f"test-delpend-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"msg-1", wait=True) + await js.publish(subj, b"msg-2", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + msg = messages[0] + assert isinstance(msg.delivered, int) + assert msg.delivered >= 1 + assert isinstance(msg.pending, int) + assert msg.pending >= 0 + await msg.ack() + finally: + await js.streams.delete(stream_name) + + +async def test_message_published_timestamp(js: JetStream) -> None: + stream_name = f"test-pub-ts-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"ts-msg", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + msg = messages[0] + assert isinstance(msg.published, datetime) + finally: + await js.streams.delete(stream_name) + + +async def test_message_length_and_dunder_len(js: JetStream) -> None: + stream_name = f"test-msglen-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + payload = b"length-test-payload" + await js.publish(subj, payload, wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + msg = messages[0] + assert isinstance(msg.length, int) + assert msg.length >= len(payload) + assert len(msg) == msg.length + await msg.ack() + finally: + await js.streams.delete(stream_name) diff --git a/python/tests/test_kv.py b/python/tests/test_kv.py index 7b02b52..554d5be 100644 --- a/python/tests/test_kv.py +++ b/python/tests/test_kv.py @@ -831,3 +831,41 @@ async def test_kv_operation_equality() -> None: assert KVOperation.Put != KVOperation.Delete assert KVOperation.Put != KVOperation.Purge assert KVOperation.Delete != KVOperation.Purge + + +async def test_kv_entry_seen_current(js: JetStream) -> None: + bucket = f"test-kv-seen-{uuid.uuid4().hex[:8]}" + config = KVConfig(bucket=bucket) + kv = await js.kv.create(config) + try: + # Create watcher on empty bucket, then put — the received entry is a live update + watcher = await kv.watch_all() + await kv.put("mykey", b"value1") + entry = await asyncio.wait_for(anext(watcher), timeout=5.0) + assert isinstance(entry.seen_current, bool) + # First live update on an empty bucket is marked as seen_current + assert entry.seen_current is True + finally: + await js.kv.delete(bucket) + + +async def test_kv_config_max_age_timedelta(js: JetStream) -> None: + bucket = f"test-kv-maxage-{uuid.uuid4().hex[:8]}" + max_age = timedelta(hours=1) + config = KVConfig(bucket=bucket, max_age=max_age) + assert config.max_age == max_age + kv = await js.kv.create(config) + try: + assert kv is not None + finally: + await js.kv.delete(bucket) + + +async def test_kv_config_description_none() -> None: + config = KVConfig(bucket="test-desc-none") + assert config.description is None + + +async def test_kv_config_description_set() -> None: + config = KVConfig(bucket="test-desc-set", description="my description") + assert config.description == "my description" diff --git a/python/tests/test_nats_client.py b/python/tests/test_nats_client.py index 47bacc4..06782f6 100644 --- a/python/tests/test_nats_client.py +++ b/python/tests/test_nats_client.py @@ -92,3 +92,78 @@ async def test_nats_connection_failure() -> None: nats = Nats(addrs=["localhost:19999"]) with pytest.raises(Exception): await nats.startup() + + +async def test_nats_addr_property(nats_url: str) -> None: + nats = Nats(addrs=[nats_url]) + assert nats.addr == [nats_url] + + +async def test_nats_addr_default() -> None: + nats = Nats() + assert nats.addr == ["nats://localhost:4222"] + + +async def test_nats_token_property() -> None: + nats = Nats(token="secret-token") # noqa: S106 + assert nats.token == "secret-token" # noqa: S105 + + +async def test_nats_token_default() -> None: + nats = Nats() + assert nats.token is None + + +async def test_nats_nkey_property() -> None: + nats = Nats() + assert nats.nkey is None + + +async def test_nats_user_and_pass_property() -> None: + nats = Nats(user_and_pass=("user", "pass")) + assert nats.user_and_pass == ("user", "pass") + + +async def test_nats_user_and_pass_default() -> None: + nats = Nats() + assert nats.user_and_pass is None + + +async def test_nats_custom_inbox_prefix_property() -> None: + nats = Nats(custom_inbox_prefix="_custom") + assert nats.custom_inbox_prefix == "_custom" + + +async def test_nats_custom_inbox_prefix_default() -> None: + nats = Nats() + assert nats.custom_inbox_prefix is None + + +async def test_nats_read_buffer_capacity_property() -> None: + nats = Nats(read_buffer_capacity=1024) + assert nats.read_buffer_capacity == 1024 + + +async def test_nats_read_buffer_capacity_default() -> None: + nats = Nats() + assert nats.read_buffer_capacity == 65535 + + +async def test_nats_sender_capacity_property() -> None: + nats = Nats(sender_capacity=64) + assert nats.sender_capacity == 64 + + +async def test_nats_sender_capacity_default() -> None: + nats = Nats() + assert nats.sender_capacity == 128 + + +async def test_nats_max_reconnects_property() -> None: + nats = Nats(max_reconnects=5) + assert nats.max_reconnects == 5 + + +async def test_nats_max_reconnects_default() -> None: + nats = Nats() + assert nats.max_reconnects is None diff --git a/python/tests/test_streams.py b/python/tests/test_streams.py index 2dfef3b..e1b3db6 100644 --- a/python/tests/test_streams.py +++ b/python/tests/test_streams.py @@ -252,3 +252,63 @@ async def test_stream_state_after_publish(js: JetStream) -> None: assert info.state.bytes > 0 finally: await js.streams.delete(name) + + +async def test_stream_name_property(js: JetStream) -> None: + name = f"test-sname-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + assert stream.name == name + finally: + await js.streams.delete(name) + + +async def test_stream_info_created_field(js: JetStream) -> None: + name = f"test-screated-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + info = await stream.get_info() + assert isinstance(info.created, (int, float)) + assert info.created > 0 + finally: + await js.streams.delete(name) + + +async def test_stream_state_subjects_count(js: JetStream) -> None: + name = f"test-ssubj-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(f"{name}.a", b"msg-a", wait=True) + await js.publish(f"{name}.b", b"msg-b", wait=True) + info = await stream.get_info() + assert info.state.subjects_count == 2 + finally: + await js.streams.delete(name) + + +async def test_stream_state_timestamps(js: JetStream) -> None: + name = f"test-sts-{uuid.uuid4().hex[:8]}" + subj = f"{name}.data" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"ts-msg", wait=True) + info = await stream.get_info() + assert info.state.first_timestamp >= 0 + assert info.state.last_timestamp >= 0 + finally: + await js.streams.delete(name) + + +async def test_stream_state_consumer_count(js: JetStream) -> None: + name = f"test-scnt-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + info = await stream.get_info() + assert info.state.consumer_count == 0 + finally: + await js.streams.delete(name) diff --git a/python/tests/test_subscriptions.py b/python/tests/test_subscriptions.py index 7aa37bd..7ba064d 100644 --- a/python/tests/test_subscriptions.py +++ b/python/tests/test_subscriptions.py @@ -123,3 +123,14 @@ async def test_fullwild_subscription(nats: Nats) -> None: msg = await anext(sub) assert msg.payload == b"full-wild" assert msg.subject == f"{prefix}.a.b.c" + + +async def test_subscription_ctx_manager_detatch(nats: Nats) -> None: + subj = uuid.uuid4().hex + ctx = nats.subscribe(subject=subj) + sub = await ctx.detatch() + assert isinstance(sub, IteratorSubscription) + await nats.publish(subj, b"detatch-test") + msg = await asyncio.wait_for(anext(sub), timeout=5.0) + assert msg.payload == b"detatch-test" + await sub.unsubscribe()