-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathgatekeeper_utils.py
More file actions
48 lines (39 loc) · 1.61 KB
/
gatekeeper_utils.py
File metadata and controls
48 lines (39 loc) · 1.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from fastapi import FastAPI
import httpx
from src.core import config
# Login to gatekeeper using credentials from config file
async def gk_login() -> str:
login_credentials = {
'username': config.WEATHER_SRV_GATEKEEPER_USER,
'password': config.WEATHER_SRV_GATEKEEPER_PASSWORD
}
async with httpx.AsyncClient() as client:
url = f'{config.GATEKEEPER_URL}/api/login/'
r = await client.post(url, data=login_credentials)
r.raise_for_status()
return (r.json()['access'], r.json()['refresh'])
# Logout to gatekeeper using credentials from config file
async def gk_logout(refresh_token):
async with httpx.AsyncClient() as client:
url = f'{config.GATEKEEPER_URL}/api/logout/'
r = await client.post(url, json={"refresh": refresh_token})
r.raise_for_status()
return
# List registered endpoints in gatekeeper
async def gk_service_directory(token: str) -> dict:
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"Bearer {token}"
}
r = await client.get(f'{config.GATEKEEPER_URL}/api/service_directory/', headers=headers)
r.raise_for_status()
return r.json()
# Register a specific endpoint and method in gatekeeper
async def gk_service_register(token: str, service_data: dict) -> dict:
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"Bearer {token}"
}
r = await client.post(f'{config.GATEKEEPER_URL}/api/register_service/', headers=headers, json=service_data)
r.raise_for_status()
return r.json()