-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrouter_auth.py
More file actions
246 lines (217 loc) · 7.05 KB
/
router_auth.py
File metadata and controls
246 lines (217 loc) · 7.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""Auth api.
Copyright (c) 2024 MultiFactor
License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE
"""
from ipaddress import IPv4Address, IPv6Address
from typing import Annotated
from dishka import FromDishka
from fastapi import Body, Depends, Request, Response, status
from fastapi_error_map.routing import ErrorAwareRouter
from fastapi_error_map.rules import rule
from api.auth.adapters import AuthFastAPIAdapter
from api.auth.utils import get_ip_from_request, get_user_agent_from_request
from api.error_routing import (
ERROR_MAP_TYPE,
DishkaErrorAwareRoute,
DomainErrorTranslator,
)
from api.utils import require_master_db
from enums import DomainCodes
from ldap_protocol.auth.exceptions.mfa import (
MFAAPIError,
MFAConnectError,
MFARequiredError,
MissingMFACredentialsError,
)
from ldap_protocol.auth.schemas import (
MFAChallengeResponse,
OAuth2Form,
SetupRequest,
)
from ldap_protocol.dialogue import UserSchema
from ldap_protocol.identity.exceptions import (
AlreadyConfiguredError,
AuthValidationError,
ForbiddenError,
LoginFailedError,
PasswordPolicyError,
UnauthorizedError,
UserNotFoundError,
)
from ldap_protocol.kerberos.exceptions import KRBAPIChangePasswordError
from ldap_protocol.session_storage import SessionStorage
from .utils import verify_auth
translator = DomainErrorTranslator(DomainCodes.AUTH)
error_map: ERROR_MAP_TYPE = {
UnauthorizedError: rule(
status=status.HTTP_401_UNAUTHORIZED,
translator=translator,
),
AlreadyConfiguredError: rule(
status=status.HTTP_400_BAD_REQUEST,
translator=translator,
),
ForbiddenError: rule(
status=status.HTTP_400_BAD_REQUEST,
translator=translator,
),
LoginFailedError: rule(
status=status.HTTP_400_BAD_REQUEST,
translator=translator,
),
PasswordPolicyError: rule(
status=status.HTTP_422_UNPROCESSABLE_ENTITY,
translator=translator,
),
UserNotFoundError: rule(
status=status.HTTP_400_BAD_REQUEST,
translator=translator,
),
AuthValidationError: rule(
status=status.HTTP_422_UNPROCESSABLE_ENTITY,
translator=translator,
),
MFARequiredError: rule(
status=status.HTTP_400_BAD_REQUEST,
translator=translator,
),
MissingMFACredentialsError: rule(
status=status.HTTP_400_BAD_REQUEST,
translator=translator,
),
MFAAPIError: rule(
status=status.HTTP_400_BAD_REQUEST,
translator=translator,
),
MFAConnectError: rule(
status=status.HTTP_400_BAD_REQUEST,
translator=translator,
),
PermissionError: rule(
status=status.HTTP_400_BAD_REQUEST,
translator=translator,
),
KRBAPIChangePasswordError: rule(
status=status.HTTP_400_BAD_REQUEST,
translator=translator,
),
}
auth_router = ErrorAwareRouter(
prefix="/auth",
tags=["Auth"],
route_class=DishkaErrorAwareRoute,
)
@auth_router.post("/", error_map=error_map)
async def login(
form: Annotated[OAuth2Form, Depends()],
request: Request,
ip: Annotated[IPv4Address | IPv6Address, Depends(get_ip_from_request)],
user_agent: Annotated[str, Depends(get_user_agent_from_request)],
auth_manager: FromDishka[AuthFastAPIAdapter],
) -> MFAChallengeResponse | None:
"""Create session to cookies and storage.
- **username**: username formats:
`DN`, `userPrincipalName`, `saMAccountName`
- **password**: password
\f
:param Annotated[OAuth2Form, Depends form: login form
:param Response response: FastAPI response
:param Annotated[IPv4Address | IPv6Address, Depends ip: client ip
:param Annotated[str, Depends user_agent: client user agent
:param FromDishka[IdentityFastAPIAdapter] auth_manager: auth manager
:raises HTTPException: 401 if incorrect username or password
:raises HTTPException: 403 if user not part of domain admins
:raises HTTPException: 403 if user account is disabled
:raises HTTPException: 403 if user account is expired
:raises HTTPException: 403 if ip is not provided
:raises HTTPException: 403 if user not part of network policy
:return None: None
"""
return await auth_manager.login(
form=form,
request=request,
ip=ip,
user_agent=user_agent,
)
@auth_router.get("/me", error_map=error_map)
async def users_me(
identity_adapter: FromDishka[AuthFastAPIAdapter],
) -> UserSchema:
"""Get current logged-in user data.
:param identity_adapter: IdentityFastAPIAdapter instance for user
identity operations
:return: UserSchema
"""
return await identity_adapter.get_current_user()
@auth_router.delete(
"/",
response_class=Response,
error_map=error_map,
)
async def logout(
response: Response,
storage: FromDishka[SessionStorage],
identity_adapter: FromDishka[AuthFastAPIAdapter],
) -> None:
"""Delete token cookies and user session.
:param response: FastAPI Response
:param storage: SessionStorage
:param user: UserSchema (current user)
:return: None
"""
user = await identity_adapter.get_current_user()
response.delete_cookie("id", httponly=True)
await storage.delete_user_session(user.session_id)
@auth_router.patch(
"/user/password",
status_code=200,
dependencies=[Depends(verify_auth), Depends(require_master_db)],
error_map=error_map,
)
async def password_reset(
auth_manager: FromDishka[AuthFastAPIAdapter],
identity: Annotated[str, Body(examples=["admin"])],
new_password: Annotated[str, Body(examples=["password"])],
old_password: Annotated[
str | None,
Body(examples=["old_password"]),
] = None,
) -> None:
"""Reset user's (entry) password.
:param identity: user identity (userPrincipalName, saMAccountName or DN)
:param new_password: new password
:param old_password: old password (if verifying)
:param auth_manager: IdentityFastAPIAdapter
:raises HTTPException: 404 if user not found
:raises HTTPException: 422 if password is invalid
:raises HTTPException: 424 if kerberos password update failed
:return: None
"""
await auth_manager.reset_password(identity, new_password, old_password)
@auth_router.get("/setup", error_map=error_map)
async def check_setup(
auth_manager: FromDishka[AuthFastAPIAdapter],
) -> bool:
"""Check if initial setup is required.
:param auth_manager: IdentityFastAPIAdapter
:return: bool
"""
return await auth_manager.check_setup_needed()
@auth_router.post(
"/setup",
status_code=status.HTTP_200_OK,
responses={423: {"detail": "Locked"}},
error_map=error_map,
dependencies=[Depends(require_master_db)],
)
async def first_setup(
request: SetupRequest,
auth_manager: FromDishka[AuthFastAPIAdapter],
) -> None:
"""Perform initial structure and policy setup.
:param request: SetupRequest
:param auth_manager: IdentityFastAPIAdapter
:raises HTTPException: 423 if setup already performed
:return: None
"""
await auth_manager.perform_first_setup(request)