-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathfast_api_utils.py
More file actions
147 lines (126 loc) · 6.72 KB
/
fast_api_utils.py
File metadata and controls
147 lines (126 loc) · 6.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import base64
import json
import os
from mdps_ds_lib.lib.utils.file_utils import FileUtils
from mdps_ds_lib.lib.constants import Constants
from cumulus_lambda_functions.lib.authorization.uds_authorizer_abstract import UDSAuthorizorAbstract
from cumulus_lambda_functions.lib.authorization.uds_authorizer_factory import UDSAuthorizerFactory
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
from fastapi import APIRouter, HTTPException, Request, Response
from cumulus_lambda_functions.lib.uds_db.db_constants import DBConstants
from cumulus_lambda_functions.lib.uds_db.uds_collections import UdsCollections
from cumulus_lambda_functions.uds_api.web_service_constants import WebServiceConstants
LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())
class FastApiUtils:
@staticmethod
def get_authorization_info(request: Request):
"""
:return:
"""
# TODO this can be a factory method
action = request.method
resource = request.url.path
bearer_token = request.headers.get('Authorization', '')
LOGGER.debug(f'raw bearer_token: {bearer_token}')
username_part = bearer_token.split('.')[1]
jwt_decoded = base64.standard_b64decode(f'{username_part}========'.encode()).decode()
LOGGER.debug(f'jwt_decoded: {jwt_decoded}')
jwt_decoded = json.loads(jwt_decoded)
ldap_groups = jwt_decoded['cognito:groups']
username = jwt_decoded['username']
return {
'username': username,
'ldap_groups': list(set(ldap_groups)),
'action': action,
'resource': resource,
}
@staticmethod
def get_cors_origins():
cors_origins = os.environ.get(Constants.CORS_ORIGINS, '').strip().split(',')
return cors_origins
@staticmethod
def get_api_base_prefix():
return os.environ.get(Constants.DAPA_API_PREIFX_KEY) if Constants.DAPA_API_PREIFX_KEY in os.environ else WebServiceConstants.API_PREFIX
@staticmethod
def get_api_url_base():
if Constants.DAPA_API_URL_BASE not in os.environ:
raise ValueError(f'missing DAPA_API_URL_BASE in ENV')
dapa_api_base = os.environ.get(Constants.DAPA_API_URL_BASE)
dapa_api_base = dapa_api_base[:-1] if dapa_api_base.endswith('/') else dapa_api_base
return dapa_api_base
@staticmethod
def replace_in_file(file_path, old_string, new_string):
try:
with open(file_path, 'r') as file:
content = file.read()
content = content.replace(old_string, new_string)
with open(file_path, 'w') as file:
file.write(content)
except Exception as e:
print(f'{file_path}.. {str(e)}')
return
@staticmethod
def replace_in_folder(folder_path, old_string, new_string):
for dirpath, _, filenames in os.walk(folder_path):
for filename in filenames:
file_path = os.path.join(dirpath, filename)
FastApiUtils.replace_in_file(file_path, old_string, new_string)
return
@staticmethod
def prep_stac_browser():
api_base_prefix = FastApiUtils.get_api_base_prefix()
original_static_parent_dir = f'{os.environ.get(WebServiceConstants.STATIC_PARENT_DIR, WebServiceConstants.STATIC_PARENT_DIR_DEFAULT)}stac_browser'
temp_static_parent_dir = f'/tmp/stac_browser'
FileUtils.copy_dir(original_static_parent_dir, temp_static_parent_dir)
stac_browser_prefix = f'/{api_base_prefix}/{WebServiceConstants.STAC_BROWSER}'
# Cannot change READ ONLY system in lambda. Need a workaround.
FastApiUtils.replace_in_folder(temp_static_parent_dir, WebServiceConstants.STAC_BROWSER_REPLACING_PREFIX, stac_browser_prefix)
stac_browser_settings = {'catalogUrl': f'{FastApiUtils.get_api_url_base()}/misc/catalog_list/'}
FastApiUtils.replace_in_folder(temp_static_parent_dir, f'"{WebServiceConstants.SETTING_PLACEHOLDER}"', json.dumps(stac_browser_settings))
FastApiUtils.replace_in_folder(temp_static_parent_dir, f"'{WebServiceConstants.SETTING_PLACEHOLDER}'", json.dumps(stac_browser_settings))
return stac_browser_prefix, temp_static_parent_dir
async def api_authorized_collections(request: Request):
auth_info = FastApiUtils.get_authorization_info(request) # TODO how to get different authorization info?
authorizer: UDSAuthorizorAbstract = UDSAuthorizerFactory() \
.get_instance(os.getenv('AUTHORIZER_TYPE'),
es_url=os.getenv('ES_URL'),
es_port=int(os.getenv('ES_PORT', '443')),
use_ssl=os.getenv('ES_USE_SSL', 'TRUE').strip() is True,
)
collection_regexes = authorizer.get_authorized_collections(DBConstants.read, auth_info['ldap_groups'])
return collection_regexes
async def uds_api_authorize(request: Request):
db_constants_map = {
'GET': DBConstants.read,
'DELETE': DBConstants.delete,
'PUT': DBConstants.create,
'POST': DBConstants.create,
'PATCH': DBConstants.create, # TODO Update?
}
authorization_type = db_constants_map[request.method]
if request.method not in db_constants_map:
raise HTTPException(status_code=405, detail=json.dumps({
'message': f'unknown HTTP method for authorization. Pls use these {db_constants_map}'
}))
if 'collection_id' not in request.path_params:
raise HTTPException(status_code=400, detail=json.dumps({
'message': 'collection_id is needed as path parameter for authorization.'
}))
collection_id = request.path_params['collection_id']
auth_info = FastApiUtils.get_authorization_info(request) # TODO how to get different authorization info?
authorizer: UDSAuthorizorAbstract = UDSAuthorizerFactory() \
.get_instance(os.getenv('AUTHORIZER_TYPE'),
es_url=os.getenv('ES_URL'),
es_port=int(os.getenv('ES_PORT', '443')),
use_ssl=os.getenv('ES_USE_SSL', 'TRUE').strip() is True,
)
collection_identifier = UdsCollections.decode_identifier(collection_id)
if not authorizer.is_authorized_for_collection(authorization_type, collection_id,
auth_info['ldap_groups'],
collection_identifier.tenant,
collection_identifier.venue):
LOGGER.debug(f'user: {auth_info["username"]} is not authorized for {collection_id}')
raise HTTPException(status_code=403, detail=json.dumps({
'message': 'not authorized to execute this action'
}))
return True