Skip to content

Commit 2d2bd0b

Browse files
committed
stream() Return streamer object instead of receiver object
Signed-off-by: Mathias L. Baumann <mathias.baumann@frequenz.com>
1 parent e78ccc4 commit 2d2bd0b

File tree

2 files changed

+16
-13
lines changed

2 files changed

+16
-13
lines changed

src/frequenz/client/dispatch/_client.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,11 @@
3131
UpdateMicrogridDispatchResponse,
3232
)
3333

34-
from frequenz import channels
3534
from frequenz.client.base.channel import ChannelOptions, SslOptions
3635
from frequenz.client.base.client import BaseApiClient
3736
from frequenz.client.base.conversion import to_timestamp
3837
from frequenz.client.base.exception import ClientNotConnected
39-
from frequenz.client.base.retry import LinearBackoff
38+
from frequenz.client.base.retry import LinearBackoff, Strategy
4039
from frequenz.client.base.streaming import GrpcStreamBroadcaster
4140

4241
from ._internal_types import DispatchCreateRequest
@@ -61,13 +60,16 @@ def __init__(
6160
server_url: str,
6261
key: str,
6362
connect: bool = True,
63+
stream_retry_strategy: Strategy | None = None,
6464
) -> None:
6565
"""Initialize the client.
6666
6767
Args:
6868
server_url: The URL of the server to connect to.
6969
key: API key to use for authentication.
70-
connect: Whether to connect to the service immediately.
70+
connect: Whether to connect to the service immediately (default: True).
71+
stream_retry_strategy: The retry strategy to use for streaming
72+
methods, defaults to `LinearBackoff` with 1 second interval and no limit.
7173
"""
7274
super().__init__(
7375
server_url,
@@ -89,6 +91,11 @@ def __init__(
8991
self._streams: dict[
9092
int, GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]
9193
] = {}
94+
self._stream_retry_strategy = (
95+
LinearBackoff(interval=1, limit=None)
96+
if stream_retry_strategy is None
97+
else stream_retry_strategy.copy()
98+
)
9299
"""A dictionary of streamers, keyed by microgrid_id."""
93100

94101
@property
@@ -197,7 +204,9 @@ def to_interval(
197204
else:
198205
break
199206

200-
def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]:
207+
def stream(
208+
self, microgrid_id: int
209+
) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]:
201210
"""Receive a stream of dispatch events.
202211
203212
This function returns a receiver channel that can be used to receive
@@ -216,14 +225,8 @@ def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]:
216225
microgrid_id: The microgrid_id to receive dispatches for.
217226
218227
Returns:
219-
A receiver channel to receive the stream of dispatch events.
228+
A broadcaster to receive the stream of dispatch events.
220229
"""
221-
return self._get_stream(microgrid_id).new_receiver()
222-
223-
def _get_stream(
224-
self, microgrid_id: int
225-
) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]:
226-
"""Get an instance to the streaming helper."""
227230
broadcaster = self._streams.get(microgrid_id)
228231
# pylint: disable=protected-access
229232
if broadcaster is not None and broadcaster._channel.is_closed:
@@ -241,7 +244,7 @@ def _get_stream(
241244
),
242245
),
243246
transform=DispatchEvent.from_protobuf,
244-
retry_strategy=LinearBackoff(interval=1, limit=None),
247+
retry_strategy=self._stream_retry_strategy.copy(),
245248
)
246249
self._streams[microgrid_id] = broadcaster
247250

tests/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ async def test_dispatch_stream(client: FakeClient, sample: Dispatch) -> None:
307307
microgrid_id = random.randint(1, 100)
308308
dispatches = [sample, sample, sample]
309309

310-
stream = client.stream(microgrid_id)
310+
stream = client.stream(microgrid_id).new_receiver()
311311

312312
async def expect(dispatch: Dispatch, event: Event) -> None:
313313
message = await stream.receive()

0 commit comments

Comments
 (0)