-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_client.py
More file actions
358 lines (303 loc) · 10.3 KB
/
api_client.py
File metadata and controls
358 lines (303 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
import os
from datetime import datetime
from datetime import timedelta
from http.cookiejar import DefaultCookiePolicy
from urllib.parse import urljoin
from urllib.parse import urlparse
import requests
from requests.adapters import HTTPAdapter
from requests.auth import AuthBase
from urllib3.util import Retry
import typing as t
from flareio.exceptions import TokenError
from flareio.models import ScrollEventsResult
from flareio.ratelimit import Limiter
from flareio.version import __version__ as _flareio_version
_API_DOMAIN_DEFAULT: str = "api.flare.io"
_ALLOWED_API_DOMAINS: t.Tuple[str, ...] = (
_API_DOMAIN_DEFAULT,
"api.eu.flare.io",
)
class FlareApiClient:
def __init__(
self,
*,
api_key: str,
tenant_id: t.Optional[int] = None,
session: t.Optional[requests.Session] = None,
api_domain: t.Optional[str] = None,
_auth: AuthBase | None = None,
_enable_beta_features: bool = False,
) -> None:
if not api_key:
raise Exception("API Key cannot be empty.")
api_domain = api_domain or _API_DOMAIN_DEFAULT
if api_domain not in _ALLOWED_API_DOMAINS:
raise Exception(
f"Invalid API domain: {api_domain}. Only {_ALLOWED_API_DOMAINS} are supported."
)
if api_domain != _API_DOMAIN_DEFAULT and not _enable_beta_features:
raise Exception("Custom API domains considered a beta feature.")
self._api_domain: str = api_domain
self._api_key: str = api_key
self._tenant_id: t.Optional[int] = tenant_id
self._auth: t.Optional[AuthBase] = _auth
self._api_token: t.Optional[str] = None
self._api_token_exp: t.Optional[datetime] = None
self._session = session or self._create_session()
@classmethod
def from_env(cls) -> "FlareApiClient":
api_key: t.Optional[str] = os.environ.get("FLARE_API_KEY")
if not api_key:
raise Exception(
"Please set the FLARE_API_KEY environment variable. Otherwise, initiate the client using FlareApiClient(api_key=...)."
)
tenant_id: t.Optional[str] = os.environ.get("FLARE_TENANT_ID")
return cls(
api_key=api_key,
tenant_id=int(tenant_id) if tenant_id else None,
)
@staticmethod
def _create_session() -> requests.Session:
session = requests.Session()
# Set User-Agent
session.headers["User-Agent"] = (
f"python-flareio/{_flareio_version} requests/{requests.__version__}"
)
# Don't accept cookies.
session.cookies.set_policy(
policy=DefaultCookiePolicy(
allowed_domains=[],
),
)
# Enable retries
session.mount(
"https://",
HTTPAdapter(max_retries=FlareApiClient._create_retry()),
)
return session
@staticmethod
def _create_retry() -> Retry:
retry = Retry(
total=5,
backoff_factor=2,
status_forcelist=[429, 502, 503, 504],
allowed_methods={"GET", "POST"},
)
# Support for urllib3 < 2.X
if hasattr(retry, "backoff_max"):
setattr(retry, "backoff_max", 15)
return retry
def generate_token(self) -> str:
payload: t.Optional[dict] = None
if self._tenant_id is not None:
payload = {
"tenant_id": self._tenant_id,
}
resp = self._session.post(
f"https://{self._api_domain}/tokens/generate",
json=payload,
headers={
"Authorization": self._api_key,
},
)
try:
resp.raise_for_status()
except Exception as ex:
raise TokenError("Failed to fetch API Token") from ex
token: str = resp.json()["token"]
self._api_token = token
self._api_token_exp = datetime.now() + timedelta(minutes=45)
return token
def _apply_auth(
self,
*,
request: requests.PreparedRequest,
) -> requests.PreparedRequest:
if self._auth:
self._auth(request)
return request
api_token: t.Optional[str] = self._api_token
if not api_token or (
self._api_token_exp and self._api_token_exp < datetime.now()
):
api_token = self.generate_token()
request.headers["Authorization"] = f"Bearer {api_token}"
return request
def _request(
self,
*,
method: str,
url: str,
params: t.Optional[t.Dict[str, t.Any]] = None,
json: t.Optional[t.Dict[str, t.Any]] = None,
headers: t.Optional[t.Dict[str, t.Any]] = None,
) -> requests.Response:
url = urljoin(f"https://{self._api_domain}", url)
netloc: str = urlparse(url).netloc
if not netloc == self._api_domain:
raise Exception(
f"Client was used to access {netloc=} at {url=}. Only the domain {self._api_domain} is supported."
)
request = requests.Request(
method=method,
url=url,
params=params,
json=json,
headers=headers,
)
prepared = self._session.prepare_request(request)
prepared = self._apply_auth(request=prepared)
resp = self._session.send(prepared)
return resp
def post(
self,
url: str,
*,
params: t.Optional[t.Dict[str, t.Any]] = None,
json: t.Optional[t.Dict[str, t.Any]] = None,
headers: t.Optional[t.Dict[str, t.Any]] = None,
) -> requests.Response:
return self._request(
method="POST",
url=url,
params=params,
json=json,
headers=headers,
)
def get(
self,
url: str,
*,
params: t.Optional[t.Dict[str, t.Any]] = None,
json: t.Optional[t.Dict[str, t.Any]] = None,
headers: t.Optional[t.Dict[str, t.Any]] = None,
) -> requests.Response:
return self._request(
method="GET",
url=url,
params=params,
json=json,
headers=headers,
)
def put(
self,
url: str,
*,
params: t.Optional[t.Dict[str, t.Any]] = None,
json: t.Optional[t.Dict[str, t.Any]] = None,
headers: t.Optional[t.Dict[str, t.Any]] = None,
) -> requests.Response:
return self._request(
method="PUT",
url=url,
params=params,
json=json,
headers=headers,
)
def delete(
self,
url: str,
*,
params: t.Optional[t.Dict[str, t.Any]] = None,
json: t.Optional[t.Dict[str, t.Any]] = None,
headers: t.Optional[t.Dict[str, t.Any]] = None,
) -> requests.Response:
return self._request(
method="DELETE",
url=url,
params=params,
json=json,
headers=headers,
)
def scroll(
self,
*,
method: t.Literal[
"GET",
"POST",
],
url: str,
params: t.Optional[t.Dict[str, t.Any]] = None,
json: t.Optional[t.Dict[str, t.Any]] = None,
headers: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Iterator[requests.Response]:
if method not in {"GET", "POST"}:
raise Exception("Scrolling is only supported for GET or POST")
params = dict(params) if params else None
json = dict(json) if json else None
from_in_params: bool = "from" in (params or {})
from_in_json: bool = "from" in (json or {})
if not (from_in_params or from_in_json):
raise Exception("You must specify from either in params or in json")
if from_in_params and from_in_json:
raise Exception("You can't specify from both in params and in json")
while True:
resp = self._request(
method=method,
url=url,
params=params,
json=json,
headers=headers,
)
resp.raise_for_status()
resp_body = resp.json()
yield resp
if "next" not in resp_body:
raise Exception(
"'next' was not found in the response body. Are you sure it supports scrolling?"
)
next_page: t.Optional[str] = resp_body["next"]
if not next_page:
break
if params and from_in_params:
params["from"] = next_page
if json and from_in_json:
json["from"] = next_page
def scroll_events(
self,
*,
method: t.Literal[
"GET",
"POST",
],
pages_url: str,
events_url: str,
params: t.Optional[t.Dict[str, t.Any]] = None,
json: t.Optional[t.Dict[str, t.Any]] = None,
_pages_limiter: t.Optional[Limiter] = None,
_events_limiter: t.Optional[Limiter] = None,
) -> t.Iterator[ScrollEventsResult]:
pages_limiter: Limiter = _pages_limiter or Limiter(
tick_interval=timedelta(seconds=1),
)
events_limiter: Limiter = _events_limiter or Limiter(
tick_interval=timedelta(seconds=0.25),
)
pages_limiter.tick()
for page_resp in self.scroll(
method=method,
url=pages_url,
params=params,
json=json,
):
page_resp.raise_for_status()
page_items: t.List[dict] = page_resp.json()["items"]
page_next: t.Optional[str] = page_resp.json()["next"]
for event_item in page_items:
event_uid: str = event_item["metadata"]["uid"]
events_limiter.tick()
event_resp: requests.Response = self.get(
url=events_url,
params={
"uid": event_uid,
},
)
event_resp.raise_for_status()
event: dict = event_resp.json()
yield ScrollEventsResult(
metadata=event_item,
event=event,
next=page_next,
)
pages_limiter.tick()