forked from line/centraldogma-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbase_client.py
More file actions
116 lines (100 loc) · 3.66 KB
/
base_client.py
File metadata and controls
116 lines (100 loc) · 3.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# Copyright 2025 LINE Corporation
#
# LINE Corporation licenses this file to you under the Apache License,
# version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from typing import Any, Dict, Union, Callable, TypeVar, Optional
from httpx import Client, Limits, Response
from tenacity import stop_after_attempt, wait_exponential, Retrying
from centraldogma.exceptions import to_exception
# Type of the value that a per-status-code handler produces from a Response;
# used in the handler annotations of BaseClient.request / BaseClient._request.
T = TypeVar("T")
class BaseClient:
    """Synchronous HTTP client bound to the ``/api/v1`` prefix of a server.

    Wraps an ``httpx.Client`` and adds:

    * default ``Authorization`` / ``Content-Type`` headers built from ``token``
      (a JSON-patch content type is used for PATCH requests),
    * retrying of failed attempts through ``tenacity.Retrying``,
    * optional conversion of the response by status code.

    The instance is a context manager; leaving the ``with`` block closes the
    underlying ``httpx.Client``.
    """

    def __init__(
        self,
        base_url: str,
        token: str,
        http2: bool = True,
        retries: int = 1,
        max_connections: int = 10,
        max_keepalive_connections: int = 2,
        **configs,
    ):
        """Create the client.

        Args:
            base_url: Server address; one trailing ``/`` is removed before
                ``/api/v1`` is appended.
            token: Token sent as ``Authorization: bearer <token>``.
            http2: Forwarded to ``httpx.Client(http2=...)``.
            retries: Number of extra attempts after the first one fails
                (``0`` disables retrying).
            max_connections: Forwarded to ``httpx.Limits``.
            max_keepalive_connections: Forwarded to ``httpx.Limits``.
            **configs: Extra keyword arguments forwarded to ``httpx.Client``.
                The keys ``transport`` and ``limits`` are discarded.

        Raises:
            AssertionError: If ``retries`` is negative or either connection
                limit is not positive.
        """
        # NOTE: these checks are `assert` statements, so they are skipped when
        # Python runs with -O. Kept as-is so the raised exception type does
        # not change for existing callers.
        assert retries >= 0, "retries must be greater than or equal to zero"
        assert max_connections > 0, "max_connections must be greater than zero"
        assert (
            max_keepalive_connections > 0
        ), "max_keepalive_connections must be greater than zero"

        base_url = base_url[:-1] if base_url[-1] == "/" else base_url

        # `limits` is always built from the explicit arguments below, and a
        # caller-supplied `transport` is likewise never forwarded.
        for key in ("transport", "limits"):
            configs.pop(key, None)

        self.retries = retries
        self.client = Client(
            base_url=f"{base_url}/api/v1",
            http2=http2,
            limits=Limits(
                max_connections=max_connections,
                max_keepalive_connections=max_keepalive_connections,
            ),
            **configs,
        )
        self.token = token
        self.headers = self._get_headers(token)
        self.patch_headers = self._get_patch_headers(token)

    def __enter__(self):
        """Return this client for use as a ``with`` target."""
        return self

    def __exit__(self, *_: Any) -> None:
        """Close the underlying ``httpx.Client``."""
        self.client.close()

    def request(
        self,
        method: str,
        path: str,
        handler: Optional[Dict[int, Callable[[Response], T]]] = None,
        **kwargs,
    ) -> Union[Response, T]:
        """Send a request, retrying failed attempts.

        Up to ``self.retries + 1`` attempts are made with exponential waits
        capped at 60 seconds; the exception of the last attempt is re-raised.

        Args:
            method: HTTP method name, in any letter case.
            path: Request path, passed to ``httpx.Client.request``.
            handler: Optional mapping of status code to a converter called
                with the response. When given, a status code without a
                converter raises the exception built by ``to_exception``.
            **kwargs: Extra arguments for ``httpx.Client.request``. A
                ``headers`` entry is merged over the default headers.

        Returns:
            The converter's result when ``handler`` is given, otherwise the
            raw ``Response``.
        """
        kwargs = self._set_request_headers(method, **kwargs)
        retryer = Retrying(
            stop=stop_after_attempt(self.retries + 1),
            wait=wait_exponential(max=60),
            reraise=True,
        )
        return retryer(self._request, method, path, handler, **kwargs)

    def _set_request_headers(self, method: str, **kwargs) -> Dict:
        """Return ``kwargs`` with the default headers merged into ``headers``.

        PATCH requests start from ``self.patch_headers``; every other method
        starts from ``self.headers``. Headers supplied by the caller override
        the defaults on key collision.
        """
        # Compare case-insensitively so "PATCH" and "patch" both receive the
        # JSON-patch content type.
        is_patch = method.lower() == "patch"
        default_headers = self.patch_headers if is_patch else self.headers
        # Build a new dict so the shared default header dicts are not mutated.
        kwargs["headers"] = {**default_headers, **(kwargs.get("headers") or {})}
        return kwargs

    def _request(
        self,
        method: str,
        path: str,
        handler: Optional[Dict[int, Callable[[Response], T]]] = None,
        **kwargs,
    ):
        """Perform a single attempt and apply ``handler`` to the response.

        Raises:
            Exception: Whatever ``to_exception(resp)`` builds, when ``handler``
                is given but has no converter for the response status code.
        """
        resp = self.client.request(method, path, **kwargs)
        if not handler:
            return resp

        converter = handler.get(resp.status_code)
        if not converter:  # Unexpected response status
            raise to_exception(resp)
        return converter(resp)

    @staticmethod
    def _get_headers(token: str) -> Dict:
        """Return the default headers for JSON requests."""
        return {
            "Authorization": f"bearer {token}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _get_patch_headers(token: str) -> Dict:
        """Return the default headers for JSON-patch (PATCH) requests."""
        return {
            "Authorization": f"bearer {token}",
            "Content-Type": "application/json-patch+json",
        }