This repository was archived by the owner on Jan 16, 2023. It is now read-only.
forked from airbytehq/airbyte
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauthenticator.py
More file actions
147 lines (118 loc) · 5 KB
/
authenticator.py
File metadata and controls
147 lines (118 loc) · 5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import dpath
import pendulum
import requests
from requests.auth import AuthBase
from typing import Any, List, Mapping, MutableMapping, Tuple, Union
from abc import abstractmethod
class AbstractGithubIntunedAuthenticator(AuthBase):
    """
    Base class for authenticators that attach a refreshable bearer token to outgoing requests.

    The refresh flow is implemented here: when the stored token is missing or expired, the
    installation id and secret are POSTed to the refresh endpoint and the ``accessToken`` and
    ``expiresAt`` fields of the JSON response are stored. How the endpoint, credentials, token
    and expiry date are kept is delegated to subclasses through the abstract accessors below.

    NOTE(review): ``AuthBase`` does not appear to use ``ABCMeta``, in which case the
    ``@abstractmethod`` markers below are not enforced at instantiation time — confirm.
    """

    # Upper bound, in seconds, on the token refresh HTTP call. Without a timeout `requests`
    # waits indefinitely on an unresponsive endpoint. Subclasses may override this value.
    refresh_request_timeout: int = 60

    def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
        """Attach the HTTP headers required to authenticate to the outgoing request."""
        request.headers.update(self.get_auth_header())
        return request

    def get_auth_header(self) -> Mapping[str, Any]:
        """Return the ``Authorization`` header carrying the current bearer token."""
        return {"Authorization": f"Bearer {self.get_access_token()}"}

    def get_access_token(self) -> str:
        """
        Return the access token, refreshing it first when it is missing or expired.

        :return: the value of the ``access_token`` property after any refresh
        :raises Exception: if the refresh request fails (see ``refresh_access_token``)
        """
        if self.token_has_expired():
            # Captured before the network call; passed to the expiry setter as `initial_time`.
            refresh_started_at = pendulum.now()
            access_token, expires_at = self.refresh_access_token()
            self.access_token = access_token
            self.set_token_expiry_date(refresh_started_at, expires_at)
        return self.access_token

    def token_has_expired(self) -> bool:
        """Return True if no expiry date is stored yet or the stored expiry date has passed."""
        expiry_date = self.get_token_expiry_date()
        if not expiry_date:
            return True
        return pendulum.now() > expiry_date

    def _get_refresh_access_token_response(self) -> Any:
        """
        POST the refresh request body to the refresh endpoint and return the decoded JSON.

        :raises requests.HTTPError: if the endpoint answers with an error status
        :raises requests.Timeout: if the endpoint does not answer within ``refresh_request_timeout``
        """
        response = requests.request(
            method="POST",
            url=self.get_token_refresh_endpoint(),
            data=self.build_refresh_request_body(),
            timeout=self.refresh_request_timeout,
        )
        response.raise_for_status()
        return response.json()

    def refresh_access_token(self) -> Tuple[str, Union[str, int]]:
        """
        Request a new access token from the refresh endpoint.

        :return: a tuple of (``accessToken``, ``expiresAt``) exactly as found in the JSON response
        :raises Exception: wrapping any failure of the request or a missing response field
        """
        try:
            response_json = self._get_refresh_access_token_response()
            return response_json["accessToken"], response_json["expiresAt"]
        except Exception as e:
            raise Exception(f"Error while refreshing access token: {e}") from e

    def build_refresh_request_body(self) -> Mapping[str, Any]:
        """
        Return the request body to send with the refresh request.

        Override to define additional parameters.
        """
        payload: MutableMapping[str, Any] = {
            "installationId": self.get_installation_id(),
            "secret": self.get_secret(),
        }
        return payload

    @abstractmethod
    def get_token_refresh_endpoint(self) -> str:
        """Returns the endpoint to refresh the access token"""

    @abstractmethod
    def get_installation_id(self) -> Union[int, str]:
        """Returns the installation id sent as ``installationId`` in the refresh request"""

    @abstractmethod
    def get_secret(self) -> str:
        """Returns the secret sent as ``secret`` in the refresh request"""

    @abstractmethod
    def get_token_expiry_date(self) -> pendulum.DateTime:
        """Expiration date of the access token; a falsy value means the token must be refreshed"""

    @abstractmethod
    def set_token_expiry_date(self, initial_time: pendulum.DateTime, value: Union[str, int]):
        """
        Setter for access token expiration date.

        :param initial_time: the time captured just before the refresh request was sent
        :param value: the ``expiresAt`` field of the refresh response
        """

    @property
    @abstractmethod
    def access_token(self) -> str:
        """Returns the access token"""

    @access_token.setter
    @abstractmethod
    def access_token(self, value: str) -> None:
        """Setter for the access token"""
class GithubIntunedAuthenticator(AbstractGithubIntunedAuthenticator):
    """
    Authenticator that keeps its refresh endpoint, installation id, secret, access token and
    token expiry date in memory on the instance.

    The access token and its expiry date start out as ``None``, so the first request that needs
    an ``Authorization`` header triggers the refresh flow inherited from the base class.
    """

    def __init__(
        self,
        token_refresh_endpoint: str,
        installation_id: str,
        secret: str,
    ):
        """
        :param token_refresh_endpoint: URL the refresh request is POSTed to
        :param installation_id: value sent as ``installationId`` in the refresh request
        :param secret: value sent as ``secret`` in the refresh request
        """
        self._token_refresh_endpoint = token_refresh_endpoint
        self.installation_id = installation_id
        self.secret = secret
        self._token_expiry_date = None
        self._access_token = None
        # Previously never assigned, which made get_access_token_name() raise AttributeError.
        # "accessToken" is the key the base class reads from the refresh response.
        self._access_token_name = "accessToken"

    def get_token_refresh_endpoint(self) -> str:
        """Return the URL the refresh request is POSTed to."""
        return self._token_refresh_endpoint

    def get_installation_id(self) -> str:
        """Return the installation id given to the constructor."""
        return self.installation_id

    def get_secret(self) -> str:
        """Return the secret given to the constructor."""
        return self.secret

    def get_access_token_name(self) -> str:
        """Return the name of the refresh-response field that carries the access token."""
        return self._access_token_name

    def get_token_expiry_date(self) -> pendulum.DateTime:
        """Return the stored expiry date, or ``None`` if no token has been fetched yet."""
        return self._token_expiry_date

    def set_token_expiry_date(self, initial_time: pendulum.DateTime, value: Union[str, int]):
        """
        Store the token expiry date parsed from ``value``.

        :param initial_time: accepted for interface compatibility; not used here because
            ``value`` is parsed as a date/time rather than added to a start time
        :param value: the expiry as returned by the refresh endpoint, handed to ``pendulum.parse``
        """
        # NOTE(review): presumably `value` is an ISO-8601 string; `pendulum.parse` is not
        # expected to accept the `int` the signature allows — confirm against the endpoint.
        self._token_expiry_date = pendulum.parse(value)

    @property
    def access_token(self) -> str:
        """Return the stored access token, or ``None`` if no token has been fetched yet."""
        return self._access_token

    @access_token.setter
    def access_token(self, value: str):
        """Store the access token."""
        self._access_token = value