-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrun.py
More file actions
383 lines (324 loc) · 11.6 KB
/
run.py
File metadata and controls
383 lines (324 loc) · 11.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
"""
Simvue Runs
===========
Contains a class for remotely connecting to Simvue runs, or defining
a new run given relevant arguments.
"""
import datetime
import http
import typing

try:
    from typing import Self
except ImportError:
    from typing_extensions import Self

import pydantic

from .base import SimvueObject, staging_check, Visibility, write_only
from simvue.api.request import (
    get as sv_get,
    put as sv_put,
    get_json_from_response,
)
from simvue.api.url import URL
from simvue.models import FOLDER_REGEX, NAME_REGEX, DATETIME_FORMAT
# Permitted values for a run's status; used to type/validate the
# ``Run.status`` getter and setter.
Status = typing.Literal[
    "lost",
    "failed",
    "completed",
    "terminated",
    "running",
    "created",
]
# Public API of this module: only the Run class is exported.
__all__ = [
    "Run",
]
class Run(SimvueObject):
    """Class for interacting with/creating runs on the server."""

    def __init__(self, identifier: str | None = None, **kwargs) -> None:
        """Initialise a Run.

        If an identifier is provided a connection will be made to the
        object matching the identifier on the target server.
        Otherwise a new Run will be created using arguments provided in kwargs.

        Parameters
        ----------
        identifier : str, optional
            the remote server unique id for the target run
        **kwargs : dict
            any additional arguments to be passed to the object initialiser
        """
        self.visibility = Visibility(self)
        super().__init__(identifier, **kwargs)

    @classmethod
    @pydantic.validate_call
    def new(
        cls,
        *,
        folder: typing.Annotated[str, pydantic.Field(pattern=FOLDER_REGEX)],
        system: dict[str, typing.Any] | None = None,
        status: Status = "created",
        offline: bool = False,
        **kwargs,
    ) -> Self:
        """Create a new Run on the Simvue server.

        Parameters
        ----------
        folder : str
            folder to contain this run, must match ``FOLDER_REGEX``
        system : dict[str, Any], optional
            system metadata for this run, default None
        status : Status, optional
            initial status of the run, default "created"
        offline : bool, optional
            create the run in offline mode, default False.
        **kwargs : dict
            any additional attributes passed to the run initialiser

        Returns
        -------
        Run
            run object (of the calling class) with staged changes
        """
        # Use ``cls`` so subclasses get an instance of their own type,
        # as promised by the ``Self`` return annotation.
        return cls(
            folder=folder,
            system=system,
            status=status,
            _read_only=False,
            _offline=offline,
            **kwargs,
        )

    @property
    @staging_check
    def name(self) -> str:
        """Retrieve name associated with this run"""
        return self._get_attribute("name")

    @name.setter
    @write_only
    @pydantic.validate_call
    def name(
        self, name: typing.Annotated[str, pydantic.Field(pattern=NAME_REGEX)]
    ) -> None:
        """Set the name for this run."""
        self._staging["name"] = name

    def delete(self, **kwargs) -> dict[str, typing.Any]:
        """Delete this run, also removing its linked metrics and events.

        Parameters
        ----------
        **kwargs : dict
            additional arguments forwarded to ``SimvueObject.delete``

        Returns
        -------
        dict[str, Any]
            the result returned by ``SimvueObject.delete``
        """
        # Any metric and event entries need to also be removed
        return super().delete(_linked_objects=["metrics", "events"], **kwargs)

    @property
    @staging_check
    def tags(self) -> list[str]:
        """Retrieve the tags associated with this run."""
        return self._get_attribute("tags")

    @tags.setter
    @write_only
    @pydantic.validate_call
    def tags(self, tags: list[str]) -> None:
        """Set the tags for this run."""
        self._staging["tags"] = tags

    @property
    @staging_check
    def status(self) -> Status:
        """Get the run status."""
        return self._get_attribute("status")

    @status.setter
    @write_only
    @pydantic.validate_call
    def status(self, status: Status) -> None:
        """Set the run status."""
        self._staging["status"] = status

    @property
    @staging_check
    def ttl(self) -> int:
        """Return the retention period for this run"""
        return self._get_attribute("ttl")

    @ttl.setter
    @write_only
    @pydantic.validate_call
    def ttl(self, time_seconds: pydantic.NonNegativeInt | None) -> None:
        """Update the retention period for this run"""
        self._staging["ttl"] = time_seconds

    @property
    @staging_check
    def folder(self) -> str:
        """Get the folder associated with this run."""
        return self._get_attribute("folder")

    @folder.setter
    @write_only
    @pydantic.validate_call
    def folder(
        self, folder: typing.Annotated[str, pydantic.Field(pattern=FOLDER_REGEX)]
    ) -> None:
        """Set the folder for this run."""
        self._staging["folder"] = folder

    @property
    @staging_check
    def metadata(self) -> dict[str, typing.Any]:
        """Get the metadata for this run."""
        return self._get_attribute("metadata")

    @metadata.setter
    @write_only
    @pydantic.validate_call
    def metadata(self, metadata: dict[str, typing.Any]) -> None:
        """Set the metadata for this run."""
        self._staging["metadata"] = metadata

    @property
    @staging_check
    def description(self) -> str:
        """Get the description for this run."""
        return self._get_attribute("description")

    @description.setter
    @write_only
    @pydantic.validate_call
    def description(self, description: str | None) -> None:
        """Set the description for this run."""
        self._staging["description"] = description

    # NOTE(review): unlike the other attribute getters this one is not wrapped
    # in @staging_check — confirm whether that is intentional.
    @property
    def system(self) -> dict[str, typing.Any]:
        """Get the system metadata for this run."""
        return self._get_attribute("system")

    @system.setter
    @write_only
    @pydantic.validate_call
    def system(self, system: dict[str, typing.Any]) -> None:
        """Set the system metadata for this run."""
        self._staging["system"] = system

    @property
    @staging_check
    def heartbeat_timeout(self) -> int | None:
        """Get the timeout for the heartbeat of this run."""
        return self._get_attribute("heartbeat_timeout")

    @heartbeat_timeout.setter
    @write_only
    @pydantic.validate_call
    def heartbeat_timeout(self, time_seconds: int | None) -> None:
        """Set the timeout for the heartbeat of this run (None to unset)."""
        self._staging["heartbeat_timeout"] = time_seconds

    @property
    @staging_check
    def notifications(self) -> typing.Literal["none", "all", "error", "lost"]:
        """Get the notification setting for this run.

        Read from the ``state`` entry of the ``notifications`` attribute.
        """
        return self._get_attribute("notifications")["state"]

    @notifications.setter
    @write_only
    @pydantic.validate_call
    def notifications(
        self, notifications: typing.Literal["none", "all", "error", "lost"]
    ) -> None:
        """Set the notification setting for this run."""
        self._staging["notifications"] = {"state": notifications}

    @property
    @staging_check
    def alerts(self) -> typing.Generator[str, None, None]:
        """Yield the identifiers of the alerts attached to this run."""
        for alert in self.get_alert_details():
            yield alert["id"]

    def get_alert_details(self) -> typing.Generator[dict[str, typing.Any], None, None]:
        """Retrieve the full details of alerts for this run"""
        for alert in self._get_attribute("alerts"):
            yield alert["alert"]

    @alerts.setter
    @write_only
    @pydantic.validate_call
    def alerts(self, alerts: list[str]) -> None:
        """Stage alerts to be attached to this run.

        The given identifiers are merged into any alerts already staged:
        duplicates are dropped and first-seen order is kept.

        Parameters
        ----------
        alerts : list[str]
            identifiers of the alerts to attach
        """
        _staged: list[str] = self._staging.get("alerts", [])
        # Merge rather than overwrite: replacing the staged list with only the
        # not-yet-staged entries would discard earlier staged alerts, and
        # re-assigning the same alerts would leave nothing staged.
        self._staging["alerts"] = list(dict.fromkeys([*_staged, *alerts]))

    @property
    @staging_check
    def created(self) -> datetime.datetime | None:
        """Retrieve created datetime for the run"""
        _created: str | None = self._get_attribute("created")
        return (
            datetime.datetime.strptime(_created, DATETIME_FORMAT) if _created else None
        )

    @property
    @staging_check
    def started(self) -> datetime.datetime | None:
        """Retrieve started datetime for the run"""
        _started: str | None = self._get_attribute("started")
        return (
            datetime.datetime.strptime(_started, DATETIME_FORMAT) if _started else None
        )

    @started.setter
    @write_only
    @pydantic.validate_call
    def started(self, started: datetime.datetime) -> None:
        """Set the start time, staged as a string in ``DATETIME_FORMAT``."""
        self._staging["started"] = started.strftime(DATETIME_FORMAT)

    @property
    @staging_check
    def endtime(self) -> datetime.datetime | None:
        """Retrieve endtime datetime for the run"""
        _endtime: str | None = self._get_attribute("endtime")
        return (
            datetime.datetime.strptime(_endtime, DATETIME_FORMAT) if _endtime else None
        )

    @endtime.setter
    @write_only
    @pydantic.validate_call
    def endtime(self, endtime: datetime.datetime) -> None:
        """Set the end time, staged as a string in ``DATETIME_FORMAT``."""
        self._staging["endtime"] = endtime.strftime(DATETIME_FORMAT)

    @property
    def metrics(
        self,
    ) -> typing.Generator[tuple[str, dict[str, int | float | bool]], None, None]:
        """Yield ``(name, values)`` pairs from the ``metrics`` attribute."""
        yield from self._get_attribute("metrics").items()

    @property
    def events(
        self,
    ) -> typing.Generator[tuple[str, dict[str, int | float | bool]], None, None]:
        """Yield ``(name, values)`` pairs from the ``events`` attribute."""
        yield from self._get_attribute("events").items()

    @write_only
    def send_heartbeat(self) -> dict[str, typing.Any] | None:
        """Send a heartbeat for this run via a PUT to ``<id>/heartbeat``.

        Returns
        -------
        dict[str, Any] | None
            the decoded JSON response, or None when offline or when the
            run has no identifier yet
        """
        if self._offline or not self._identifier:
            return None
        _url = self._base_url
        _url /= f"{self._identifier}/heartbeat"
        _response = sv_put(f"{_url}", headers=self._headers, data={})
        return get_json_from_response(
            response=_response,
            expected_status=[http.HTTPStatus.OK],
            scenario="Retrieving heartbeat state",
        )

    @property
    def _abort_url(self) -> URL | None:
        """URL of this run's abort endpoint, or None if the run has no URL."""
        return self.url / "abort" if self.url else None

    @property
    def _artifact_url(self) -> URL | None:
        """URL of this run's artifacts endpoint, or None if unavailable."""
        if not self._identifier or not self.url:
            return None
        _url = self.url
        _url /= "artifacts"
        return _url

    @property
    def abort_trigger(self) -> bool:
        """Query the abort endpoint and return its ``status`` flag.

        Returns False when offline, when the run has no identifier or
        abort URL, or when the response carries no ``status`` entry.
        """
        if self._offline or not self._identifier:
            return False
        # Guard against requesting the literal URL "None" when no URL is set.
        _abort_url = self._abort_url
        if not _abort_url:
            return False
        _response = sv_get(f"{_abort_url}", headers=self._headers)
        _json_response = get_json_from_response(
            response=_response,
            expected_status=[http.HTTPStatus.OK],
            scenario=f"Retrieving abort status for run '{self.id}'",
        )
        return _json_response.get("status", False)

    @property
    def artifacts(self) -> list[dict[str, typing.Any]]:
        """Retrieve the artifacts for this run (empty list if offline)."""
        if self._offline or not self._artifact_url:
            return []
        _response = sv_get(url=self._artifact_url, headers=self._headers)
        return get_json_from_response(
            response=_response,
            expected_status=[http.HTTPStatus.OK],
            scenario=f"Retrieving artifacts for run '{self.id}'",
            expected_type=list,
        )

    @pydantic.validate_call
    def abort(self, reason: str) -> dict[str, typing.Any]:
        """Request an abort of this run via a PUT to its abort endpoint.

        Parameters
        ----------
        reason : str
            reason for the abort, sent in the request body

        Returns
        -------
        dict[str, Any]
            the decoded JSON response

        Raises
        ------
        RuntimeError
            if the run has no abort endpoint
        """
        _abort_url = self._abort_url
        if not _abort_url:
            raise RuntimeError("Cannot abort run, no endpoint defined")
        _response = sv_put(
            f"{_abort_url}", headers=self._headers, data={"reason": reason}
        )
        return get_json_from_response(
            expected_status=[http.HTTPStatus.OK],
            scenario=f"Abort of run '{self.id}'",
            response=_response,
        )

    def on_reconnect(self, id_mapping: dict[str, str]) -> None:
        """Replace staged offline alert IDs with their online equivalents.

        Parameters
        ----------
        id_mapping : dict[str, str]
            mapping from offline identifiers to online identifiers

        Raises
        ------
        KeyError
            if a staged alert ID has no entry in ``id_mapping``
        """
        online_alert_ids: list[str] = []
        for alert_id in self._staging.get("alerts", []):
            try:
                online_alert_ids.append(id_mapping[alert_id])
            except KeyError as err:
                raise KeyError(
                    "Could not find alert ID in offline to online ID mapping."
                ) from err

        # If run is offline, no alerts have been added yet, so add all alerts:
        if self._identifier is not None and self._identifier.startswith("offline"):
            self._staging["alerts"] = online_alert_ids
        # Otherwise, only add alerts which have not yet been added
        else:
            # Fetch the attached alerts once (not once per candidate), and only
            # when there is something to compare against.
            _existing: set[str] = set(self.alerts) if online_alert_ids else set()
            self._staging["alerts"] = [
                alert_id
                for alert_id in online_alert_ids
                if alert_id not in _existing
            ]