@@ -21,50 +21,29 @@ async def js(nats: Nats) -> JetStream:
2121
2222
2323async def test_pull_consumer_create (js : JetStream ) -> None :
24- stream_name = f"test-pcreate-{ uuid .uuid4 (). hex [: 8 ] } "
24+ stream_name = f"test-pcreate-{ uuid .uuid4 ()} "
2525 config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
2626 stream = await js .streams .create (config )
2727 try :
2828 consumer_config = PullConsumerConfig (
29- name = f"consumer-{ uuid .uuid4 (). hex [: 8 ] } " ,
29+ name = f"consumer-{ uuid .uuid4 ()} " ,
3030 )
3131 consumer = await stream .consumers .create (consumer_config )
3232 assert isinstance (consumer , PullConsumer )
3333 finally :
3434 await js .streams .delete (stream_name )
3535
3636
37- async def test_pull_consumer_fetch (js : JetStream ) -> None :
38- stream_name = f"test-pfetch-{ uuid .uuid4 ().hex [:8 ]} "
39- subj = f"{ stream_name } .data"
40- config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
41- stream = await js .streams .create (config )
42- try :
43- await js .publish (subj , b"fetch-msg-1" )
44- await js .publish (subj , b"fetch-msg-2" )
45-
46- consumer_config = PullConsumerConfig (
47- name = f"consumer-{ uuid .uuid4 ().hex [:8 ]} " ,
48- )
49- consumer = await stream .consumers .create (consumer_config )
50- messages = await consumer .fetch (max_messages = 2 , timeout = 5.0 )
51- assert len (messages ) == 2
52- assert messages [0 ].payload == b"fetch-msg-1"
53- assert messages [1 ].payload == b"fetch-msg-2"
54- finally :
55- await js .streams .delete (stream_name )
56-
57-
5837async def test_pull_consumer_fetch_with_ack (js : JetStream ) -> None :
59- stream_name = f"test-pack-{ uuid .uuid4 (). hex [: 8 ] } "
38+ stream_name = f"test-pack-{ uuid .uuid4 ()} "
6039 subj = f"{ stream_name } .data"
6140 config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
6241 stream = await js .streams .create (config )
6342 try :
6443 await js .publish (subj , b"ack-msg" )
6544
6645 consumer_config = PullConsumerConfig (
67- name = f"consumer-{ uuid .uuid4 (). hex [: 8 ] } " ,
46+ name = f"consumer-{ uuid .uuid4 ()} " ,
6847 ack_policy = AckPolicy .EXPLICIT ,
6948 )
7049 consumer = await stream .consumers .create (consumer_config )
@@ -76,15 +55,15 @@ async def test_pull_consumer_fetch_with_ack(js: JetStream) -> None:
7655
7756
7857async def test_pull_consumer_nack (js : JetStream ) -> None :
79- stream_name = f"test-pnack-{ uuid .uuid4 (). hex [: 8 ] } "
58+ stream_name = f"test-pnack-{ uuid .uuid4 ()} "
8059 subj = f"{ stream_name } .data"
8160 config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
8261 stream = await js .streams .create (config )
8362 try :
8463 await js .publish (subj , b"nack-msg" )
8564
8665 consumer_config = PullConsumerConfig (
87- name = f"consumer-{ uuid .uuid4 (). hex [: 8 ] } " ,
66+ name = f"consumer-{ uuid .uuid4 ()} " ,
8867 ack_policy = AckPolicy .EXPLICIT ,
8968 )
9069 consumer = await stream .consumers .create (consumer_config )
@@ -96,15 +75,15 @@ async def test_pull_consumer_nack(js: JetStream) -> None:
9675
9776
9877async def test_pull_consumer_term (js : JetStream ) -> None :
99- stream_name = f"test-pterm-{ uuid .uuid4 (). hex [: 8 ] } "
78+ stream_name = f"test-pterm-{ uuid .uuid4 ()} "
10079 subj = f"{ stream_name } .data"
10180 config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
10281 stream = await js .streams .create (config )
10382 try :
10483 await js .publish (subj , b"term-msg" )
10584
10685 consumer_config = PullConsumerConfig (
107- name = f"consumer-{ uuid .uuid4 (). hex [: 8 ] } " ,
86+ name = f"consumer-{ uuid .uuid4 ()} " ,
10887 ack_policy = AckPolicy .EXPLICIT ,
10988 )
11089 consumer = await stream .consumers .create (consumer_config )
@@ -116,15 +95,15 @@ async def test_pull_consumer_term(js: JetStream) -> None:
11695
11796
11897async def test_pull_consumer_progress (js : JetStream ) -> None :
119- stream_name = f"test-pprog-{ uuid .uuid4 (). hex [: 8 ] } "
98+ stream_name = f"test-pprog-{ uuid .uuid4 ()} "
12099 subj = f"{ stream_name } .data"
121100 config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
122101 stream = await js .streams .create (config )
123102 try :
124103 await js .publish (subj , b"progress-msg" )
125104
126105 consumer_config = PullConsumerConfig (
127- name = f"consumer-{ uuid .uuid4 (). hex [: 8 ] } " ,
106+ name = f"consumer-{ uuid .uuid4 ()} " ,
128107 ack_policy = AckPolicy .EXPLICIT ,
129108 )
130109 consumer = await stream .consumers .create (consumer_config )
@@ -137,14 +116,14 @@ async def test_pull_consumer_progress(js: JetStream) -> None:
137116
138117
139118async def test_pull_consumer_message_properties (js : JetStream ) -> None :
140- stream_name = f"test-pmsgprop-{ uuid .uuid4 (). hex [: 8 ] } "
119+ stream_name = f"test-pmsgprop-{ uuid .uuid4 ()} "
141120 subj = f"{ stream_name } .data"
142121 config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
143122 stream = await js .streams .create (config )
144123 try :
145124 await js .publish (subj , b"prop-msg" )
146125
147- consumer_name = f"consumer-{ uuid .uuid4 (). hex [: 8 ] } "
126+ consumer_name = f"consumer-{ uuid .uuid4 ()} "
148127 consumer_config = PullConsumerConfig (name = consumer_name )
149128 consumer = await stream .consumers .create (consumer_config )
150129 messages = await consumer .fetch (max_messages = 1 , timeout = 5.0 )
@@ -166,15 +145,15 @@ async def test_pull_consumer_message_properties(js: JetStream) -> None:
166145
167146
168147async def test_pull_consumer_with_filter_subject (js : JetStream ) -> None :
169- stream_name = f"test-pfilter-{ uuid .uuid4 (). hex [: 8 ] } "
148+ stream_name = f"test-pfilter-{ uuid .uuid4 ()} "
170149 config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
171150 stream = await js .streams .create (config )
172151 try :
173152 await js .publish (f"{ stream_name } .a" , b"msg-a" )
174153 await js .publish (f"{ stream_name } .b" , b"msg-b" )
175154
176155 consumer_config = PullConsumerConfig (
177- name = f"consumer-{ uuid .uuid4 (). hex [: 8 ] } " ,
156+ name = f"consumer-{ uuid .uuid4 ()} " ,
178157 filter_subject = f"{ stream_name } .a" ,
179158 )
180159 consumer = await stream .consumers .create (consumer_config )
@@ -187,7 +166,7 @@ async def test_pull_consumer_with_filter_subject(js: JetStream) -> None:
187166
188167
189168async def test_pull_consumer_deliver_policy (js : JetStream ) -> None :
190- stream_name = f"test-pdeliver-{ uuid .uuid4 (). hex [: 8 ] } "
169+ stream_name = f"test-pdeliver-{ uuid .uuid4 ()} "
191170 subj = f"{ stream_name } .data"
192171 config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
193172 stream = await js .streams .create (config )
@@ -196,7 +175,7 @@ async def test_pull_consumer_deliver_policy(js: JetStream) -> None:
196175 await js .publish (subj , b"new-msg" )
197176
198177 consumer_config = PullConsumerConfig (
199- name = f"consumer-{ uuid .uuid4 (). hex [: 8 ] } " ,
178+ name = f"consumer-{ uuid .uuid4 ()} " ,
200179 deliver_policy = DeliverPolicy .LAST ,
201180 )
202181 consumer = await stream .consumers .create (consumer_config )
@@ -208,12 +187,12 @@ async def test_pull_consumer_deliver_policy(js: JetStream) -> None:
208187
209188
210189async def test_pull_consumer_replay_policy (js : JetStream ) -> None :
211- stream_name = f"test-preplay-{ uuid .uuid4 (). hex [: 8 ] } "
190+ stream_name = f"test-preplay-{ uuid .uuid4 ()} "
212191 config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
213192 stream = await js .streams .create (config )
214193 try :
215194 consumer_config = PullConsumerConfig (
216- name = f"consumer-{ uuid .uuid4 (). hex [: 8 ] } " ,
195+ name = f"consumer-{ uuid .uuid4 ()} " ,
217196 replay_policy = ReplayPolicy .INSTANT ,
218197 )
219198 consumer = await stream .consumers .create (consumer_config )
@@ -223,12 +202,11 @@ async def test_pull_consumer_replay_policy(js: JetStream) -> None:
223202
224203
225204async def test_pull_consumer_durable (js : JetStream ) -> None :
226- stream_name = f"test-pdurable-{ uuid .uuid4 ().hex [:8 ]} "
227- subj = f"{ stream_name } .data"
205+ stream_name = f"test-pdurable-{ uuid .uuid4 ()} "
228206 config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
229207 stream = await js .streams .create (config )
230208 try :
231- durable_name = f"durable-{ uuid .uuid4 (). hex [: 8 ] } "
209+ durable_name = f"durable-{ uuid .uuid4 ()} "
232210 consumer_config = PullConsumerConfig (
233211 durable_name = durable_name ,
234212 )
@@ -242,41 +220,59 @@ async def test_pull_consumer_durable(js: JetStream) -> None:
242220
243221
244222async def test_push_consumer_create (js : JetStream ) -> None :
245- stream_name = f"test-pushcreate-{ uuid .uuid4 (). hex [: 8 ] } "
223+ stream_name = f"test-pushcreate-{ uuid .uuid4 ()} "
246224 config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
247225 stream = await js .streams .create (config )
248226 try :
249227 deliver_subj = uuid .uuid4 ().hex
250228 consumer_config = PushConsumerConfig (
251229 deliver_subject = deliver_subj ,
252- name = f"consumer-{ uuid .uuid4 (). hex [: 8 ] } " ,
230+ name = f"consumer-{ uuid .uuid4 ()} " ,
253231 )
254232 consumer = await stream .consumers .create (consumer_config )
255233 assert isinstance (consumer , PushConsumer )
256234 finally :
257235 await js .streams .delete (stream_name )
258236
259237
238+ async def test_pull_consumer_messages (js : JetStream ) -> None :
239+ stream_name = f"test-pushmsg-{ uuid .uuid4 ()} "
240+ subj = f"{ stream_name } .data"
241+ config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
242+ messages = [uuid .uuid4 ().hex .encode (), uuid .uuid4 ().hex .encode ()]
243+ stream = await js .streams .create (config )
244+ try :
245+ for message in messages :
246+ await js .publish (subj , message )
247+ consumer_config = PullConsumerConfig (name = f"consumer-{ uuid .uuid4 ()} " )
248+ consumer = await stream .consumers .create (consumer_config )
249+ msgs_iter = await consumer .fetch (timeout = 0.5 )
250+ for nats_msg , payload in zip (msgs_iter , messages , strict = True ):
251+ assert nats_msg .payload == payload
252+ finally :
253+ await js .streams .delete (stream_name )
254+
255+
260256async def test_push_consumer_messages (js : JetStream ) -> None :
261- stream_name = f"test-pushmsg-{ uuid .uuid4 (). hex [: 8 ] } "
257+ stream_name = f"test-pushmsg-{ uuid .uuid4 ()} "
262258 subj = f"{ stream_name } .data"
263259 config = StreamConfig (name = stream_name , subjects = [f"{ stream_name } .>" ])
260+ messages = [uuid .uuid4 ().hex .encode (), uuid .uuid4 ().hex .encode ()]
264261 stream = await js .streams .create (config )
265262 try :
266- await js . publish ( subj , b"push-msg-1" )
267- await js .publish (subj , b"push-msg-2" )
263+ for message in messages :
264+ await js .publish (subj , message )
268265
269266 deliver_subj = uuid .uuid4 ().hex
270267 consumer_config = PushConsumerConfig (
271268 deliver_subject = deliver_subj ,
272- name = f"consumer-{ uuid .uuid4 (). hex [: 8 ] } " ,
269+ name = f"consumer-{ uuid .uuid4 ()} " ,
273270 )
274271 consumer = await stream .consumers .create (consumer_config )
275272 msgs_iter = await consumer .messages ()
276- msg1 = await msgs_iter .next (timeout = 5.0 )
277- msg2 = await msgs_iter .next (timeout = 5.0 )
278- assert msg1 .payload == b"push-msg-1"
279- assert msg2 .payload == b"push-msg-2"
273+ for message in messages :
274+ nats_msg = await msgs_iter .next (timeout = 0.5 )
275+ assert message == nats_msg .payload
280276 finally :
281277 await js .streams .delete (stream_name )
282278
0 commit comments