forked from frequenz-floss/frequenz-client-assets-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_client.py
More file actions
185 lines (157 loc) · 6.79 KB
/
_client.py
File metadata and controls
185 lines (157 loc) · 6.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
"""
Assets API client.
This module provides a client for the Assets API.
"""
from __future__ import annotations
from collections.abc import Iterable
from frequenz.api.assets.v1 import assets_pb2, assets_pb2_grpc
from frequenz.client.base import channel
from frequenz.client.base.client import BaseApiClient, call_stub_method
from frequenz.client.common.microgrid import MicrogridId
from frequenz.client.common.microgrid.electrical_components import ElectricalComponentId
from ._microgrid import Microgrid
from ._microgrid_proto import microgrid_from_proto
from .electrical_component._connection import ComponentConnection
from .electrical_component._connection_proto import component_connection_from_proto
from .electrical_component._electrical_component import ElectricalComponent
from .electrical_component._electrical_component_proto import electrical_component_proto
from .exceptions import ClientNotConnected
# Passed as the `timeout` argument of every gRPC call issued by `AssetsApiClient`.
DEFAULT_GRPC_CALL_TIMEOUT = 60.0
"""The default timeout for gRPC calls made by this client (in seconds)."""
class AssetsApiClient(
    BaseApiClient[assets_pb2_grpc.PlatformAssetsStub]
):  # pylint: disable=too-many-arguments
    """A client for the Assets API.

    Every request is sent through the `PlatformAssets` gRPC stub with a timeout
    of `DEFAULT_GRPC_CALL_TIMEOUT` seconds, and the protobuf responses are
    converted to this package's own types before being returned.
    """

    def __init__(
        self,
        server_url: str,
        *,
        auth_key: str | None = None,
        sign_secret: str | None = None,
        channel_defaults: channel.ChannelOptions = channel.ChannelOptions(),
        connect: bool = True,
    ) -> None:
        """
        Initialize the AssetsApiClient.

        Args:
            server_url: The location of the microgrid API server in the form of a URL.
                The following format is expected:
                "grpc://hostname{:`port`}{?ssl=`ssl`}",
                where the `port` should be an int between 0 and 65535 (defaulting to
                9090) and `ssl` should be a boolean (defaulting to `true`).
                For example: `grpc://localhost:1090?ssl=true`.
            auth_key: The authentication key to use for the connection.
            sign_secret: The secret to use for signing requests.
            channel_defaults: The default options used to create the channel when not
                specified in the URL.
            connect: Whether to connect to the server as soon as a client instance is
                created. If `False`, the client will not connect to the server until
                [connect()][frequenz.client.base.client.BaseApiClient.connect] is
                called.
        """
        super().__init__(
            server_url,
            assets_pb2_grpc.PlatformAssetsStub,
            connect=connect,
            channel_defaults=channel_defaults,
            auth_key=auth_key,
            sign_secret=sign_secret,
        )

    @property
    def stub(self) -> assets_pb2_grpc.PlatformAssetsAsyncStub:
        """
        The gRPC stub for the Assets API.

        Returns:
            The gRPC stub for the Assets API.

        Raises:
            ClientNotConnected: If the client is not connected to the server.
        """
        if self._channel is None or self._stub is None:
            raise ClientNotConnected(server_url=self.server_url, operation="stub")
        # This type: ignore is needed because the stub is a sync stub, but we need to
        # use the async stub, so we cast the sync stub to the async stub.
        return self._stub  # type: ignore

    async def get_microgrid(  # noqa: DOC502 (raises ApiClientError indirectly)
        self, microgrid_id: MicrogridId
    ) -> Microgrid:
        """
        Get the details of a microgrid.

        Args:
            microgrid_id: The ID of the microgrid to get the details of.

        Returns:
            The details of the microgrid.

        Raises:
            ApiClientError: If there are any errors communicating with the Assets API,
                most likely a subclass of [GrpcError][frequenz.client.base.exception.GrpcError].
        """
        response = await call_stub_method(
            self,
            lambda: self.stub.GetMicrogrid(
                assets_pb2.GetMicrogridRequest(microgrid_id=int(microgrid_id)),
                timeout=DEFAULT_GRPC_CALL_TIMEOUT,
            ),
            method_name="GetMicrogrid",
        )
        return microgrid_from_proto(response.microgrid)

    async def list_microgrid_electrical_components(  # noqa: DOC502
        self, microgrid_id: MicrogridId
    ) -> list[ElectricalComponent]:
        """
        Get the electrical components of a microgrid.

        Args:
            microgrid_id: The ID of the microgrid to get the electrical components of.

        Returns:
            The electrical components of the microgrid.

        Raises:
            ApiClientError: If there are any errors communicating with the Assets API,
                most likely a subclass of [GrpcError][frequenz.client.base.exception.GrpcError].
        """
        response = await call_stub_method(
            self,
            lambda: self.stub.ListMicrogridElectricalComponents(
                assets_pb2.ListMicrogridElectricalComponentsRequest(
                    microgrid_id=int(microgrid_id),
                ),
                timeout=DEFAULT_GRPC_CALL_TIMEOUT,
            ),
            method_name="ListMicrogridElectricalComponents",
        )
        return [
            electrical_component_proto(component) for component in response.components
        ]

    async def list_microgrid_electrical_component_connections(  # noqa: DOC502
        self,
        microgrid_id: MicrogridId,
        source_component_ids: Iterable[ElectricalComponentId] | None = (),
        destination_component_ids: Iterable[ElectricalComponentId] | None = (),
    ) -> list[ComponentConnection | None]:
        """
        Get the electrical component connections of a microgrid.

        Args:
            microgrid_id: The ID of the microgrid to get the electrical
                component connections of.
            source_component_ids: Only return connections that originate from
                these component IDs. If None or empty, no filtering is applied.
            destination_component_ids: Only return connections that terminate at
                these component IDs. If None or empty, no filtering is applied.

        Returns:
            The electrical component connections of the microgrid.

        Raises:
            ApiClientError: If there are any errors communicating with the Assets API,
                most likely a subclass of [GrpcError][frequenz.client.base.exception.GrpcError].
        """
        # `None` is documented as "no filtering", so it is mapped to an empty filter
        # instead of being iterated (which would raise a TypeError). The IDs are
        # materialized as lists so the request never holds a one-shot generator.
        source_ids = (
            []
            if source_component_ids is None
            else [int(component_id) for component_id in source_component_ids]
        )
        destination_ids = (
            []
            if destination_component_ids is None
            else [int(component_id) for component_id in destination_component_ids]
        )
        request = assets_pb2.ListMicrogridElectricalComponentConnectionsRequest(
            microgrid_id=int(microgrid_id),
            source_component_ids=source_ids,
            destination_component_ids=destination_ids,
        )
        response = await call_stub_method(
            self,
            lambda: self.stub.ListMicrogridElectricalComponentConnections(
                request,
                timeout=DEFAULT_GRPC_CALL_TIMEOUT,
            ),
            method_name="ListMicrogridElectricalComponentConnections",
        )
        # Falsy entries of the response are skipped before conversion.
        # NOTE(review): the declared return type allows `None` entries, presumably
        # for connections `component_connection_from_proto` cannot convert; confirm
        # against that helper whether they should be dropped here instead.
        return [
            component_connection_from_proto(connection)
            for connection in response.connections
            if connection
        ]