-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkerberos.py
More file actions
167 lines (142 loc) · 4.86 KB
/
kerberos.py
File metadata and controls
167 lines (142 loc) · 4.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""FastAPI adapter for KerberosService.
Copyright (c) 2024 MultiFactor
License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE
"""
from typing import Any, AsyncGenerator
from fastapi import Request, Response
from fastapi.responses import StreamingResponse
from pydantic import SecretStr
from starlette.background import BackgroundTask
from api.base_adapter import BaseAdapter
from api.main.schema import (
KerberosSetupRequest,
KtaddRequest,
PrincipalAddRequest,
PrincipalPutRequest,
)
from ldap_protocol.dialogue import LDAPSession, UserSchema
from ldap_protocol.kerberos import KerberosState
from ldap_protocol.kerberos.service import KerberosService
from ldap_protocol.ldap_requests.contexts import LDAPAddRequestContext
class KerberosFastAPIAdapter(BaseAdapter[KerberosService]):
"""Adapter for using KerberosService with FastAPI and background tasks."""
async def setup_krb_catalogue(
self,
mail: str,
krbadmin_password: SecretStr,
ldap_session: LDAPSession,
ctx: LDAPAddRequestContext,
) -> None:
"""Create Kerberos structure in the LDAP directory.
:raises HTTPException: on Kerberos errors
:return: None
"""
return await self._service.setup_krb_catalogue(
mail,
krbadmin_password,
ldap_session,
ctx,
)
async def setup_kdc(
self,
data: KerberosSetupRequest,
user: UserSchema,
request: Request,
) -> Response:
"""Set up KDC, generate configs, and schedule background task.
:raises HTTPException: on Kerberos errors
:return: BackgroundTask (background task scheduled)
"""
task_struct = await self._service.setup_kdc(
data.krbadmin_password.get_secret_value(),
data.admin_password.get_secret_value(),
data.stash_password.get_secret_value(),
user,
request,
)
task = BackgroundTask(
task_struct.func,
*task_struct.args,
**task_struct.kwargs,
)
return Response(background=task)
async def add_principal(self, request: PrincipalAddRequest) -> None:
"""Create principal in Kerberos with given name.
:raises HTTPException: on Kerberos errors
:return: None
"""
return await self._service.add_principal(
request.principal_name,
password=request.password,
algorithms=request.algorithms,
)
async def rename_principal(self, request: PrincipalPutRequest) -> None:
"""Modify principal (rename, password, algorithms).
:raises HTTPException: on Kerberos errors
:return: None
"""
return await self._service.rename_principal(
principal_name=request.principal_name,
principal_new_name=request.new_principal_name,
algorithms=request.algorithms,
password=request.password,
)
async def reset_principal_pw(
self,
principal_name: str,
new_password: str,
) -> None:
"""Reset principal password in Kerberos.
:raises HTTPException: on Kerberos errors
:return: None
"""
return await self._service.reset_principal_pw(
principal_name,
new_password,
)
async def delete_principal(
self,
principal_name: str,
) -> None:
"""Delete principal in Kerberos.
:raises HTTPException: on Kerberos errors
:return: None
"""
return await self._service.delete_principal(principal_name)
async def ktadd(
self,
data: KtaddRequest,
) -> StreamingResponse:
"""Generate keytab and return as streaming response.
:raises HTTPException: on Kerberos errors
:return: StreamingResponse
"""
aiter_bytes, task_struct = await self._service.ktadd(
data.names,
is_rand_key=data.is_rand_key,
)
task = BackgroundTask(
task_struct.func,
*task_struct.args,
**task_struct.kwargs,
)
if isinstance(aiter_bytes, bytes):
async def _bytes_to_async_iter(
data: bytes,
) -> AsyncGenerator[bytes, Any]:
yield data
aiter_bytes = _bytes_to_async_iter(aiter_bytes)
return StreamingResponse(
aiter_bytes,
media_type="application/txt",
headers={
"Content-Disposition": 'attachment; filename="krb5.keytab"',
},
background=task,
)
async def get_status(self) -> KerberosState:
"""Get Kerberos server state.
:raises HTTPException: on Kerberos errors
:return: KerberosState
"""
return await self._service.get_status()