diff --git a/python/natsrpy/_natsrpy_rs/js/kv.pyi b/python/natsrpy/_natsrpy_rs/js/kv.pyi index 0c38e2b..187a255 100644 --- a/python/natsrpy/_natsrpy_rs/js/kv.pyi +++ b/python/natsrpy/_natsrpy_rs/js/kv.pyi @@ -122,10 +122,10 @@ class KVConfig: """ bucket: str - description: str + description: str | None max_value_size: int | None history: int | None - max_age: float | None + max_age: timedelta | None max_bytes: int | None storage: StorageType | None num_replicas: int | None @@ -135,7 +135,7 @@ class KVConfig: mirror_direct: bool | None compression: bool | None placement: Placement | None - limit_markers: float | None + limit_markers: timedelta | None def __new__( cls, diff --git a/python/natsrpy/_natsrpy_rs/js/stream.pyi b/python/natsrpy/_natsrpy_rs/js/stream.pyi index ca9c236..de0fbe1 100644 --- a/python/natsrpy/_natsrpy_rs/js/stream.pyi +++ b/python/natsrpy/_natsrpy_rs/js/stream.pyi @@ -132,8 +132,6 @@ class SubjectTransform: source: str destination: str - def __new__(cls, source: str, destination: str) -> Self: ... - @final class Source: """Configuration for a stream source or mirror origin. @@ -454,7 +452,7 @@ class StreamInfo: """ config: StreamConfig - created: float + created: int state: StreamState cluster: ClusterInfo | None mirror: SourceInfo | None diff --git a/python/tests/test_consumers.py b/python/tests/test_consumers.py index e0b20df..0f84f1d 100644 --- a/python/tests/test_consumers.py +++ b/python/tests/test_consumers.py @@ -304,3 +304,43 @@ async def test_push_consumer_config_properties() -> None: assert config.description == "push test" assert config.ack_policy == AckPolicy.EXPLICIT assert config.deliver_policy == DeliverPolicy.NEW + + +async def test_pull_consumer_consume_context_manager(js: JetStream) -> None: + stream_name = f"test-pullctx-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"consume-msg", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + async with consumer.consume() as fetcher: + msg = await anext(fetcher) + assert msg.payload == b"consume-msg" + await msg.ack() + finally: + await js.streams.delete(stream_name) + + +async def test_push_consumer_consume_context_manager(js: JetStream) -> None: + stream_name = f"test-pushctx-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"push-consume-msg", wait=True) + deliver_subj = uuid.uuid4().hex + consumer = await stream.consumers.create( + PushConsumerConfig( + deliver_subject=deliver_subj, + name=f"push-{uuid.uuid4().hex[:8]}", + ), + ) + async with consumer.consume() as msgs: + msg = await anext(msgs) + assert msg.payload == b"push-consume-msg" + await msg.ack() + finally: + await js.streams.delete(stream_name) diff --git a/python/tests/test_jetstream.py b/python/tests/test_jetstream.py index 4fac22a..d18ef7e 100644 --- a/python/tests/test_jetstream.py +++ b/python/tests/test_jetstream.py @@ -48,3 +48,7 @@ async def test_jetstream_publish_with_headers(js: JetStream) -> None: await js.publish(subj, b"with-headers", headers={"x-test": "value"}, wait=True) finally: await js.streams.delete(stream_name) + + +async def test_jetstream_has_counters_manager(js: JetStream) -> None: + assert js.counters is not None diff --git a/python/tests/test_js_message_acks.py b/python/tests/test_js_message_acks.py index bc4b3bc..16722a6 100644 --- a/python/tests/test_js_message_acks.py +++ b/python/tests/test_js_message_acks.py @@ -1,4 +1,5 @@ import uuid +from datetime import datetime from natsrpy.js import ( AckPolicy, @@ -166,3 +167,107 @@ async def test_message_headers_empty(js: JetStream) -> None: assert isinstance(messages[0].headers, dict) finally: await js.streams.delete(stream_name) + + +async def test_message_stream_sequence_and_consumer_sequence(js: JetStream) -> None: + stream_name = f"test-seqs-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"seq-msg", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + msg = messages[0] + assert isinstance(msg.stream_sequence, int) + assert msg.stream_sequence >= 1 + assert isinstance(msg.consumer_sequence, int) + assert msg.consumer_sequence >= 1 + finally: + await js.streams.delete(stream_name) + + +async def test_message_consumer_and_stream_names(js: JetStream) -> None: + stream_name = f"test-names-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"names-msg", wait=True) + consumer_name = f"consumer-{uuid.uuid4().hex[:8]}" + consumer = await stream.consumers.create( + PullConsumerConfig(name=consumer_name), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + msg = messages[0] + assert msg.stream == stream_name + assert msg.consumer == consumer_name + finally: + await js.streams.delete(stream_name) + + +async def test_message_delivered_and_pending(js: JetStream) -> None: + stream_name = f"test-delpend-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"msg-1", wait=True) + await js.publish(subj, b"msg-2", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + msg = messages[0] + assert isinstance(msg.delivered, int) + assert msg.delivered >= 1 + assert isinstance(msg.pending, int) + assert msg.pending >= 0 + await msg.ack() + finally: + await js.streams.delete(stream_name) + + +async def test_message_published_timestamp(js: JetStream) -> None: + stream_name = f"test-pub-ts-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"ts-msg", wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + msg = messages[0] + assert isinstance(msg.published, datetime) + finally: + await js.streams.delete(stream_name) + + +async def test_message_length_and_dunder_len(js: JetStream) -> None: + stream_name = f"test-msglen-{uuid.uuid4().hex[:8]}" + subj = f"{stream_name}.data" + config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"]) + stream = await js.streams.create(config) + try: + payload = b"length-test-payload" + await js.publish(subj, payload, wait=True) + consumer = await stream.consumers.create( + PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"), + ) + messages = await consumer.fetch(max_messages=1, timeout=5.0) + assert len(messages) == 1 + msg = messages[0] + assert isinstance(msg.length, int) + assert msg.length >= len(payload) + assert len(msg) == msg.length + await msg.ack() + finally: + await js.streams.delete(stream_name) diff --git a/python/tests/test_kv.py b/python/tests/test_kv.py index 7b02b52..554d5be 100644 --- a/python/tests/test_kv.py +++ b/python/tests/test_kv.py @@ -831,3 +831,41 @@ async def test_kv_operation_equality() -> None: assert KVOperation.Put != KVOperation.Delete assert KVOperation.Put != KVOperation.Purge assert KVOperation.Delete != KVOperation.Purge + + +async def test_kv_entry_seen_current(js: JetStream) -> None: + bucket = f"test-kv-seen-{uuid.uuid4().hex[:8]}" + config = KVConfig(bucket=bucket) + kv = await js.kv.create(config) + try: + # Create watcher on empty bucket, then put — the received entry is a live update + watcher = await kv.watch_all() + await kv.put("mykey", b"value1") + entry = await asyncio.wait_for(anext(watcher), timeout=5.0) + assert isinstance(entry.seen_current, bool) + # First live update on an empty bucket is marked as seen_current + assert entry.seen_current is True + finally: + await js.kv.delete(bucket) + + +async def test_kv_config_max_age_timedelta(js: JetStream) -> None: + bucket = f"test-kv-maxage-{uuid.uuid4().hex[:8]}" + max_age = timedelta(hours=1) + config = KVConfig(bucket=bucket, max_age=max_age) + assert config.max_age == max_age + kv = await js.kv.create(config) + try: + assert kv is not None + finally: + await js.kv.delete(bucket) + + +async def test_kv_config_description_none() -> None: + config = KVConfig(bucket="test-desc-none") + assert config.description is None + + +async def test_kv_config_description_set() -> None: + config = KVConfig(bucket="test-desc-set", description="my description") + assert config.description == "my description" diff --git a/python/tests/test_nats_client.py b/python/tests/test_nats_client.py index 47bacc4..06782f6 100644 --- a/python/tests/test_nats_client.py +++ b/python/tests/test_nats_client.py @@ -92,3 +92,78 @@ async def test_nats_connection_failure() -> None: nats = Nats(addrs=["localhost:19999"]) with pytest.raises(Exception): await nats.startup() + + +async def test_nats_addr_property(nats_url: str) -> None: + nats = Nats(addrs=[nats_url]) + assert nats.addr == [nats_url] + + +async def test_nats_addr_default() -> None: + nats = Nats() + assert nats.addr == ["nats://localhost:4222"] + + +async def test_nats_token_property() -> None: + nats = Nats(token="secret-token") # noqa: S106 + assert nats.token == "secret-token" # noqa: S105 + + +async def test_nats_token_default() -> None: + nats = Nats() + assert nats.token is None + + +async def test_nats_nkey_property() -> None: + nats = Nats() + assert nats.nkey is None + + +async def test_nats_user_and_pass_property() -> None: + nats = Nats(user_and_pass=("user", "pass")) + assert nats.user_and_pass == ("user", "pass") + + +async def test_nats_user_and_pass_default() -> None: + nats = Nats() + assert nats.user_and_pass is None + + +async def test_nats_custom_inbox_prefix_property() -> None: + nats = Nats(custom_inbox_prefix="_custom") + assert nats.custom_inbox_prefix == "_custom" + + +async def test_nats_custom_inbox_prefix_default() -> None: + nats = Nats() + assert nats.custom_inbox_prefix is None + + +async def test_nats_read_buffer_capacity_property() -> None: + nats = Nats(read_buffer_capacity=1024) + assert nats.read_buffer_capacity == 1024 + + +async def test_nats_read_buffer_capacity_default() -> None: + nats = Nats() + assert nats.read_buffer_capacity == 65535 + + +async def test_nats_sender_capacity_property() -> None: + nats = Nats(sender_capacity=64) + assert nats.sender_capacity == 64 + + +async def test_nats_sender_capacity_default() -> None: + nats = Nats() + assert nats.sender_capacity == 128 + + +async def test_nats_max_reconnects_property() -> None: + nats = Nats(max_reconnects=5) + assert nats.max_reconnects == 5 + + +async def test_nats_max_reconnects_default() -> None: + nats = Nats() + assert nats.max_reconnects is None diff --git a/python/tests/test_streams.py b/python/tests/test_streams.py index 2dfef3b..e1b3db6 100644 --- a/python/tests/test_streams.py +++ b/python/tests/test_streams.py @@ -252,3 +252,63 @@ async def test_stream_state_after_publish(js: JetStream) -> None: assert info.state.bytes > 0 finally: await js.streams.delete(name) + + +async def test_stream_name_property(js: JetStream) -> None: + name = f"test-sname-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + assert stream.name == name + finally: + await js.streams.delete(name) + + +async def test_stream_info_created_field(js: JetStream) -> None: + name = f"test-screated-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + info = await stream.get_info() + assert isinstance(info.created, (int, float)) + assert info.created > 0 + finally: + await js.streams.delete(name) + + +async def test_stream_state_subjects_count(js: JetStream) -> None: + name = f"test-ssubj-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(f"{name}.a", b"msg-a", wait=True) + await js.publish(f"{name}.b", b"msg-b", wait=True) + info = await stream.get_info() + assert info.state.subjects_count == 2 + finally: + await js.streams.delete(name) + + +async def test_stream_state_timestamps(js: JetStream) -> None: + name = f"test-sts-{uuid.uuid4().hex[:8]}" + subj = f"{name}.data" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + await js.publish(subj, b"ts-msg", wait=True) + info = await stream.get_info() + assert info.state.first_timestamp >= 0 + assert info.state.last_timestamp >= 0 + finally: + await js.streams.delete(name) + + +async def test_stream_state_consumer_count(js: JetStream) -> None: + name = f"test-scnt-{uuid.uuid4().hex[:8]}" + config = StreamConfig(name=name, subjects=[f"{name}.>"]) + stream = await js.streams.create(config) + try: + info = await stream.get_info() + assert info.state.consumer_count == 0 + finally: + await js.streams.delete(name) diff --git a/python/tests/test_subscriptions.py b/python/tests/test_subscriptions.py index 7aa37bd..7ba064d 100644 --- a/python/tests/test_subscriptions.py +++ b/python/tests/test_subscriptions.py @@ -123,3 +123,14 @@ async def test_fullwild_subscription(nats: Nats) -> None: msg = await anext(sub) assert msg.payload == b"full-wild" assert msg.subject == f"{prefix}.a.b.c" + + +async def test_subscription_ctx_manager_detatch(nats: Nats) -> None: + subj = uuid.uuid4().hex + ctx = nats.subscribe(subject=subj) + sub = await ctx.detatch() + assert isinstance(sub, IteratorSubscription) + await nats.publish(subj, b"detatch-test") + msg = await asyncio.wait_for(anext(sub), timeout=5.0) + assert msg.payload == b"detatch-test" + await sub.unsubscribe()