From c37165bb67e8332fe27b44d07ecfd4a0e178035b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 26 Mar 2026 12:34:43 +0000 Subject: [PATCH 1/2] Add comprehensive tests and docstrings for counters module, fix CountersManager.update stub type Co-authored-by: s3rius <18153319+s3rius@users.noreply.github.com> Agent-Logs-Url: https://github.com/taskiq-python/natsrpy/sessions/2733a833-233b-4952-805d-165167131405 --- python/natsrpy/_natsrpy_rs/js/counters.pyi | 66 +++++- python/natsrpy/_natsrpy_rs/js/managers.pyi | 2 +- python/tests/test_counters.py | 227 +++++++++++++++++++++ 3 files changed, 290 insertions(+), 5 deletions(-) create mode 100644 python/tests/test_counters.py diff --git a/python/natsrpy/_natsrpy_rs/js/counters.pyi b/python/natsrpy/_natsrpy_rs/js/counters.pyi index 86e13c9..44f960d 100644 --- a/python/natsrpy/_natsrpy_rs/js/counters.pyi +++ b/python/natsrpy/_natsrpy_rs/js/counters.pyi @@ -155,6 +155,20 @@ class CountersConfig: @final class CounterEntry: + """A single counter entry retrieved from a counters stream. + + Holds the current aggregated value for a counter subject along + with metadata about cross-stream sources and the last increment. + + Attributes: + subject: the subject this counter entry belongs to. + value: the current aggregated counter value. + sources: mapping of source stream names to their per-subject + counter contributions. + increment: the value of the last increment applied, or ``None`` + when the entry was retrieved via ``Counters.get``. + """ + subject: str value: int sources: dict[str, dict[str, int]] @@ -162,24 +176,68 @@ class CounterEntry: @final class Counters: + """Handle for a JetStream counters stream. + + Provides atomic increment, decrement, and retrieval operations + on CRDT counters backed by a JetStream stream with + ``allow_message_counter`` enabled. + """ + async def add( self, key: str, value: int, timeout: float | timedelta | None = None, - ) -> int: ... + ) -> int: + """Add an arbitrary value to a counter. + + :param key: subject key identifying the counter. + :param value: integer amount to add (may be negative). + :param timeout: optional operation timeout in seconds or as + a timedelta. + :return: the new counter value after the addition. + """ + async def incr( self, key: str, timeout: float | timedelta | None = None, - ) -> int: ... + ) -> int: + """Increment a counter by one. + + Shorthand for ``add(key, 1)``. + + :param key: subject key identifying the counter. + :param timeout: optional operation timeout in seconds or as + a timedelta. + :return: the new counter value after the increment. + """ + async def decr( self, key: str, timeout: float | timedelta | None = None, - ) -> int: ... + ) -> int: + """Decrement a counter by one. + + Shorthand for ``add(key, -1)``. + + :param key: subject key identifying the counter. + :param timeout: optional operation timeout in seconds or as + a timedelta. + :return: the new counter value after the decrement. + """ + async def get( self, key: str, timeout: float | timedelta | None = None, - ) -> CounterEntry: ... + ) -> CounterEntry: + """Retrieve the current value of a counter. + + :param key: subject key identifying the counter. + :param timeout: optional operation timeout in seconds or as + a timedelta. + :return: counter entry with the current value and metadata. + :raises Exception: if no counter entry exists for the key. + """ diff --git a/python/natsrpy/_natsrpy_rs/js/managers.pyi b/python/natsrpy/_natsrpy_rs/js/managers.pyi index 1eca8e1..3e5af25 100644 --- a/python/natsrpy/_natsrpy_rs/js/managers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/managers.pyi @@ -91,7 +91,7 @@ class CountersManager: :return: True if the stream was deleted. """ - async def update(self, config: CountersConfig) -> Counters: + async def update(self, config: StreamConfig) -> Counters: """Update an existing counters stream configuration. :param config: new stream configuration. diff --git a/python/tests/test_counters.py b/python/tests/test_counters.py new file mode 100644 index 0000000..6036a5d --- /dev/null +++ b/python/tests/test_counters.py @@ -0,0 +1,227 @@ +import uuid + +import pytest +from natsrpy.js import CounterEntry, Counters, CountersConfig, JetStream, StreamConfig + + +async def test_counters_create(js: JetStream) -> None: + name = f"test-cnt-create-{uuid.uuid4().hex[:8]}" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + counters = await js.counters.create(config) + try: + assert isinstance(counters, Counters) + finally: + await js.counters.delete(name) + + +async def test_counters_create_or_update(js: JetStream) -> None: + name = f"test-cnt-cou-{uuid.uuid4().hex[:8]}" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + counters = await js.counters.create_or_update(config) + try: + assert isinstance(counters, Counters) + config.description = "updated" + counters2 = await js.counters.create_or_update(config) + assert isinstance(counters2, Counters) + finally: + await js.counters.delete(name) + + +async def test_counters_get(js: JetStream) -> None: + name = f"test-cnt-get-{uuid.uuid4().hex[:8]}" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + await js.counters.create(config) + try: + counters = await js.counters.get(name) + assert isinstance(counters, Counters) + finally: + await js.counters.delete(name) + + +async def test_counters_delete(js: JetStream) -> None: + name = f"test-cnt-del-{uuid.uuid4().hex[:8]}" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + await js.counters.create(config) + result = await js.counters.delete(name) + assert result is True + + +async def test_counters_update(js: JetStream) -> None: + name = f"test-cnt-upd-{uuid.uuid4().hex[:8]}" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + await js.counters.create(config) + try: + update_cfg = StreamConfig( + name=name, + subjects=[f"{name}.>"], + allow_direct=True, + allow_message_counter=True, + description="updated description", + ) + counters = await js.counters.update(update_cfg) + assert isinstance(counters, Counters) + finally: + await js.counters.delete(name) + + +async def test_counters_incr(js: JetStream) -> None: + name = f"test-cnt-incr-{uuid.uuid4().hex[:8]}" + subj = f"{name}.hits" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + counters = await js.counters.create(config) + try: + value = await counters.incr(subj) + assert value == 1 + finally: + await js.counters.delete(name) + + +async def test_counters_decr(js: JetStream) -> None: + name = f"test-cnt-decr-{uuid.uuid4().hex[:8]}" + subj = f"{name}.hits" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + counters = await js.counters.create(config) + try: + value = await counters.decr(subj) + assert value == -1 + finally: + await js.counters.delete(name) + + +async def test_counters_add(js: JetStream) -> None: + name = f"test-cnt-add-{uuid.uuid4().hex[:8]}" + subj = f"{name}.hits" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + counters = await js.counters.create(config) + try: + value = await counters.add(subj, 10) + assert value == 10 + finally: + await js.counters.delete(name) + + +async def test_counters_add_negative(js: JetStream) -> None: + name = f"test-cnt-addneg-{uuid.uuid4().hex[:8]}" + subj = f"{name}.hits" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + counters = await js.counters.create(config) + try: + value = await counters.add(subj, -5) + assert value == -5 + finally: + await js.counters.delete(name) + + +async def test_counters_get_entry(js: JetStream) -> None: + name = f"test-cnt-gete-{uuid.uuid4().hex[:8]}" + subj = f"{name}.hits" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + counters = await js.counters.create(config) + try: + await counters.incr(subj) + entry = await counters.get(subj) + assert isinstance(entry, CounterEntry) + assert entry.subject == subj + assert entry.value == 1 + finally: + await js.counters.delete(name) + + +async def test_counter_entry_attributes(js: JetStream) -> None: + name = f"test-cnt-attr-{uuid.uuid4().hex[:8]}" + subj = f"{name}.hits" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + counters = await js.counters.create(config) + try: + await counters.add(subj, 5) + entry = await counters.get(subj) + assert isinstance(entry.subject, str) + assert isinstance(entry.value, int) + assert isinstance(entry.sources, dict) + assert entry.increment is None or isinstance(entry.increment, int) + finally: + await js.counters.delete(name) + + +async def test_counters_multiple_increments(js: JetStream) -> None: + name = f"test-cnt-multi-{uuid.uuid4().hex[:8]}" + subj = f"{name}.hits" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + counters = await js.counters.create(config) + try: + val1 = await counters.incr(subj) + val2 = await counters.incr(subj) + val3 = await counters.incr(subj) + assert val1 == 1 + assert val2 == 2 + assert val3 == 3 + entry = await counters.get(subj) + assert entry.value == 3 + finally: + await js.counters.delete(name) + + +async def test_counters_incr_then_decr(js: JetStream) -> None: + name = f"test-cnt-incdec-{uuid.uuid4().hex[:8]}" + subj = f"{name}.hits" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + counters = await js.counters.create(config) + try: + await counters.incr(subj) + await counters.incr(subj) + await counters.decr(subj) + entry = await counters.get(subj) + assert entry.value == 1 + finally: + await js.counters.delete(name) + + +async def test_counters_separate_subjects(js: JetStream) -> None: + name = f"test-cnt-sep-{uuid.uuid4().hex[:8]}" + subj_a = f"{name}.a" + subj_b = f"{name}.b" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + counters = await js.counters.create(config) + try: + await counters.add(subj_a, 10) + await counters.add(subj_b, 20) + entry_a = await counters.get(subj_a) + entry_b = await counters.get(subj_b) + assert entry_a.value == 10 + assert entry_b.value == 20 + finally: + await js.counters.delete(name) + + +async def test_counters_get_nonexistent_key(js: JetStream) -> None: + name = f"test-cnt-nokey-{uuid.uuid4().hex[:8]}" + config = CountersConfig(name=name, subjects=[f"{name}.>"]) + counters = await js.counters.create(config) + try: + with pytest.raises(Exception): + await counters.get(f"{name}.nonexistent") + finally: + await js.counters.delete(name) + + +async def test_counters_config_description(js: JetStream) -> None: + name = f"test-cnt-desc-{uuid.uuid4().hex[:8]}" + config = CountersConfig( + name=name, + subjects=[f"{name}.>"], + description="A test counters stream", + ) + counters = await js.counters.create(config) + try: + assert isinstance(counters, Counters) + finally: + await js.counters.delete(name) + + +async def test_counters_config_defaults() -> None: + config = CountersConfig(name="test", subjects=["test.>"]) + assert config.name == "test" + assert config.subjects == ["test.>"] + assert config.description is None + assert config.max_bytes is not None + assert config.max_messages is not None From d4bc5a4f5f0ddb8b97b5a73374eefb472e42224d Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Thu, 26 Mar 2026 13:45:21 +0100 Subject: [PATCH 2/2] Fixed interfaces and tests. --- python/natsrpy/_natsrpy_rs/js/managers.pyi | 2 +- python/tests/test_counters.py | 6 ++---- src/js/managers/counters.rs | 4 ++-- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/python/natsrpy/_natsrpy_rs/js/managers.pyi b/python/natsrpy/_natsrpy_rs/js/managers.pyi index 3e5af25..1eca8e1 100644 --- a/python/natsrpy/_natsrpy_rs/js/managers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/managers.pyi @@ -91,7 +91,7 @@ class CountersManager: :return: True if the stream was deleted. """ - async def update(self, config: StreamConfig) -> Counters: + async def update(self, config: CountersConfig) -> Counters: """Update an existing counters stream configuration. :param config: new stream configuration. diff --git a/python/tests/test_counters.py b/python/tests/test_counters.py index 6036a5d..5c27fbe 100644 --- a/python/tests/test_counters.py +++ b/python/tests/test_counters.py @@ -1,7 +1,7 @@ import uuid import pytest -from natsrpy.js import CounterEntry, Counters, CountersConfig, JetStream, StreamConfig +from natsrpy.js import CounterEntry, Counters, CountersConfig, JetStream async def test_counters_create(js: JetStream) -> None: @@ -51,11 +51,9 @@ async def test_counters_update(js: JetStream) -> None: config = CountersConfig(name=name, subjects=[f"{name}.>"]) await js.counters.create(config) try: - update_cfg = StreamConfig( + update_cfg = CountersConfig( name=name, subjects=[f"{name}.>"], - allow_direct=True, - allow_message_counter=True, description="updated description", ) counters = await js.counters.update(update_cfg) diff --git a/src/js/managers/counters.rs b/src/js/managers/counters.rs index 498c46f..c54fa83 100644 --- a/src/js/managers/counters.rs +++ b/src/js/managers/counters.rs @@ -7,7 +7,7 @@ use crate::{ use pyo3::{Bound, PyAny, Python}; use tokio::sync::RwLock; -use crate::{exceptions::rust_err::NatsrpyResult, js::stream::StreamConfig, utils::natsrpy_future}; +use crate::{exceptions::rust_err::NatsrpyResult, utils::natsrpy_future}; #[pyo3::pyclass] pub struct CountersManager { @@ -87,7 +87,7 @@ impl CountersManager { pub fn update<'py>( &self, py: Python<'py>, - config: StreamConfig, + config: CountersConfig, ) -> NatsrpyResult> { let ctx = self.ctx.clone(); natsrpy_future(py, async move {