-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathschema.py
More file actions
108 lines (75 loc) · 3 KB
/
schema.py
File metadata and controls
108 lines (75 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""Schemas for main router.
Copyright (c) 2024 MultiFactor
License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE
"""
from typing import final
from dishka import AsyncContainer
from pydantic import BaseModel, Field, PrivateAttr, SecretStr
from sqlalchemy.sql.elements import ColumnElement, UnaryExpression
from entities import Directory
from ldap_protocol.filter_interpreter import (
Filter,
FilterInterpreterProtocol,
StringFilterInterpreter,
)
from ldap_protocol.ldap_requests import SearchRequest as LDAPSearchRequest
from ldap_protocol.ldap_responses import SearchResultDone, SearchResultEntry
from ldap_protocol.utils.const import GRANT_DN_STRING
class SearchRequest(LDAPSearchRequest):
"""Search request for web api."""
filter: str = Field(..., examples=["(objectClass=*)"]) # type: ignore
_filter_interpreter: FilterInterpreterProtocol = PrivateAttr(
default_factory=StringFilterInterpreter,
)
def _cast_filter(self) -> UnaryExpression | ColumnElement:
"""Cast str filter to sa sql."""
filter_ = self.filter.lower().replace("objectcategory", "objectclass")
return self._filter_interpreter.cast_to_sql(
Filter.parse(filter_).simplify(),
)
@staticmethod
def get_directory_sid(directory: Directory) -> str | None: # type: ignore
for attr in getattr(directory, "attributes", []):
if attr.name and attr.name.lower() == "objectsid" and attr.value:
return attr.value
return None
@staticmethod
def get_directory_guid(directory: Directory) -> str: # type: ignore
return str(directory.object_guid)
@final
async def handle_api( # type: ignore
self,
container: AsyncContainer,
) -> list[SearchResultEntry | SearchResultDone]:
"""Get all responses."""
return await self._handle_api(container) # type: ignore
class SearchResponse(SearchResultDone):
"""Search response for web api."""
search_result: list[SearchResultEntry]
class KerberosSetupRequest(BaseModel):
"""Kerberos setup data."""
krbadmin_password: SecretStr
admin_password: SecretStr
stash_password: SecretStr
class PrincipalAddRequest(BaseModel):
"""Request schema for POST /principal/add."""
principal_name: str
algorithms: list[str] | None = None
password: str | None = None
class KtaddRequest(BaseModel):
"""Request schema for POST /ktadd."""
names: list[str]
is_rand_key: bool = False
class ModifyPrincipalRequest(BaseModel):
"""Request schema for PUT /principal (full modify)."""
principal_name: str
new_name: str | None = None
algorithms: list[str] | None = None
password: str | None = None
class PrimaryGroupRequest(BaseModel):
"""Request schema for setting primary group."""
directory_dn: GRANT_DN_STRING
group_dn: GRANT_DN_STRING
class PrimaryGroupPathDNResponse(BaseModel):
"""Response schema for getting group path DN by primary group ID."""
path_dn: str