-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathobservations.py
More file actions
122 lines (102 loc) · 4.42 KB
/
observations.py
File metadata and controls
122 lines (102 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from typing import Union
from pydantic import HttpUrl
from consys4py.con_sys_api import ConnectedSystemsRequestBuilder
from consys4py.constants import APITerms
def list_all_observations(server_addr: HttpUrl, api_root: str = APITerms.API.value, headers=None):
"""
Lists all observations
:return:
"""
builder = ConnectedSystemsRequestBuilder()
api_request = (builder.with_server_url(server_addr)
.with_api_root(api_root)
.for_resource_type(APITerms.OBSERVATIONS.value)
.build_url_from_base()
.with_headers(headers)
.with_request_method('GET')
.build())
return api_request.make_request()
def list_observations_from_datastream(server_addr: HttpUrl, datastream_id: str, api_root: str = APITerms.API.value,
headers=None):
"""
Lists all observations of a datastream
:return:
"""
builder = ConnectedSystemsRequestBuilder()
api_request = (builder.with_server_url(server_addr)
.with_api_root(api_root)
.for_resource_type(APITerms.DATASTREAMS.value)
.with_resource_id(datastream_id)
.for_sub_resource_type(APITerms.OBSERVATIONS.value)
.build_url_from_base()
.with_headers(headers)
.with_request_method('GET')
.build())
return api_request.make_request()
def add_observations_to_datastream(server_addr: HttpUrl, datastream_id: str, request_body: Union[str, dict],
api_root: str = APITerms.API.value, headers=None):
"""
Adds an observation to a datastream by its id
:return:
"""
builder = ConnectedSystemsRequestBuilder()
api_request = (builder.with_server_url(server_addr)
.with_api_root(api_root)
.for_resource_type(APITerms.DATASTREAMS.value)
.with_resource_id(datastream_id)
.for_sub_resource_type(APITerms.OBSERVATIONS.value)
.with_request_body(request_body)
.build_url_from_base()
.with_headers(headers)
.with_request_method('POST')
.build())
return api_request.make_request()
def retrieve_observation_by_id(server_addr: HttpUrl, observation_id: str, api_root: str = APITerms.API.value,
headers=None):
"""
Retrieves an observation by its id
:return:
"""
builder = ConnectedSystemsRequestBuilder()
api_request = (builder.with_server_url(server_addr)
.with_api_root(api_root)
.for_resource_type(APITerms.OBSERVATIONS.value)
.with_resource_id(observation_id)
.build_url_from_base()
.with_headers(headers)
.with_request_method('GET')
.build())
return api_request.make_request()
def update_observation_by_id(server_addr: HttpUrl, observation_id: str, request_body: Union[str, dict],
api_root: str = APITerms.API.value, headers=None):
"""
Updates an observation by its id
:return:
"""
builder = ConnectedSystemsRequestBuilder()
api_request = (builder.with_server_url(server_addr)
.with_api_root(api_root)
.for_resource_type(APITerms.OBSERVATIONS.value)
.with_resource_id(observation_id)
.with_request_body(request_body)
.build_url_from_base()
.with_headers(headers)
.with_request_method('PUT')
.build())
return api_request.make_request()
def delete_observation_by_id(server_addr: HttpUrl, observation_id: str, api_root: str = APITerms.API.value,
headers=None):
"""
Deletes an observation by its id
:return:
"""
builder = ConnectedSystemsRequestBuilder()
api_request = (builder.with_server_url(server_addr)
.with_api_root(api_root)
.for_resource_type(APITerms.OBSERVATIONS.value)
.with_resource_id(observation_id)
.build_url_from_base()
.with_headers(headers)
.with_request_method('DELETE')
.build())
return api_request.make_request()