-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathuse_cases.py
More file actions
161 lines (141 loc) · 5.52 KB
/
use_cases.py
File metadata and controls
161 lines (141 loc) · 5.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""Setup service.
Copyright (c) 2025 MultiFactor
License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE
"""
import copy
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from constants import (
DOMAIN_ADMIN_GROUP_NAME,
FIRST_SETUP_DATA,
USERS_CONTAINER_NAME,
)
from enums import SamAccountTypeCodes
from ldap_protocol.auth.dto import SetupDTO
from ldap_protocol.auth.setup_gateway import SetupGateway
from ldap_protocol.identity.exceptions import (
AlreadyConfiguredError,
ForbiddenError,
)
from ldap_protocol.ldap_schema.entity_type_use_case import EntityTypeUseCase
from ldap_protocol.policies.audit.audit_use_case import AuditUseCase
from ldap_protocol.policies.password import PasswordPolicyUseCases
from ldap_protocol.roles.role_use_case import RoleUseCase
from ldap_protocol.utils.helpers import create_integer_hash, ft_now
class SetupUseCase:
    """Setup manager.

    Orchestrates the first-time setup: entity types, the initial directory
    entries (including the first user), the default password policy, the
    default roles and the audit policies.
    """

    def __init__(
        self,
        setup_gateway: SetupGateway,
        entity_type_use_case: EntityTypeUseCase,
        password_use_cases: PasswordPolicyUseCases,
        role_use_case: RoleUseCase,
        audit_use_case: AuditUseCase,
        session: AsyncSession,
    ) -> None:
        """Initialize Setup manager.

        :param setup_gateway: Setup gateway
        :param entity_type_use_case: Entity Type use case
        :param password_use_cases: Password policy use cases
        :param role_use_case: Role use case
        :param audit_use_case: Audit use case
        :param session: session that is committed or rolled back by setup
        :return: None.
        """
        self._setup_gateway = setup_gateway
        self._entity_type_use_case = entity_type_use_case
        self._password_use_cases = password_use_cases
        self._role_use_case = role_use_case
        self._audit_use_case = audit_use_case
        self._session = session

    async def setup(self, dto: SetupDTO) -> None:
        """Perform the initial setup of structure and policies.

        :param dto: SetupDTO with setup parameters
        :raises AlreadyConfiguredError: if setup already performed
        :raises ForbiddenError: if password policy not passed
        :return: None.
        """
        if await self.is_setup():
            raise AlreadyConfiguredError("Setup already performed")

        await self._entity_type_use_case.create_for_first_setup()

        # Deep copy so that appending the user container below never
        # mutates the shared FIRST_SETUP_DATA constant.
        data = copy.deepcopy(FIRST_SETUP_DATA)
        data.append(self._create_user_data(dto))

        await self._create(dto, data)

    async def is_setup(self) -> bool:
        """Check if setup is performed.

        :return: bool (True if setup is performed, False otherwise)
        """
        return await self._setup_gateway.is_setup()

    def _create_user_data(self, dto: SetupDTO) -> dict:
        """Create user data by request.

        Builds the users container entry with a single child: the user
        described by ``dto``, listed as a member of the domain admin group.

        :param dto: SetupDTO with setup parameters
        :return: dict with user data
        """
        return {
            "name": USERS_CONTAINER_NAME,
            "object_class": "container",
            "attributes": {"objectClass": ["top"]},
            "children": [
                {
                    "name": dto.username,
                    "object_class": "user",
                    "organizationalPerson": {
                        "sam_account_name": dto.username,
                        "user_principal_name": dto.user_principal_name,
                        "mail": dto.mail,
                        "display_name": dto.display_name,
                        "password": dto.password,
                        "groups": [DOMAIN_ADMIN_GROUP_NAME],
                    },
                    "attributes": {
                        "objectClass": [
                            "top",
                            "person",
                            "organizationalPerson",
                            "posixAccount",
                            "shadowAccount",
                            "inetOrgPerson",
                        ],
                        "pwdLastSet": [ft_now()],
                        "loginShell": ["/bin/bash"],
                        "uidNumber": [str(create_integer_hash(dto.username))],
                        "gidNumber": ["513"],
                        "userAccountControl": ["512"],
                        "primaryGroupID": ["512"],
                        "givenName": [dto.username],
                        "sAMAccountType": [
                            str(SamAccountTypeCodes.SAM_USER_OBJECT),
                        ],
                    },
                    # NOTE(review): 500 is presumably the well-known
                    # Administrator RID - confirm against the gateway.
                    "objectSid": 500,
                },
            ],
        }

    async def _create(self, dto: SetupDTO, data: list) -> None:
        """Create setup environment.

        Creates the entries from ``data`` through the gateway, creates the
        default domain password policy, checks the requested password for
        violations, creates the domain admins and read-only roles and the
        audit policies, then commits the session. On failure the session
        is rolled back before the error is propagated.

        :param dto: SetupDTO with setup parameters
        :param data: list of data to create
        :raises ForbiddenError: if the password has policy violations
        :raises AlreadyConfiguredError: if an IntegrityError is raised
            while creating or committing the environment
        :return: None.
        """
        try:
            await self._setup_gateway.setup_enviroment(
                data=data,
                dn=dto.domain,
                is_system=True,
            )
            await self._password_use_cases.create_default_domain_policy()

            # The check runs after the default policy has been created.
            errors = await self._password_use_cases.check_password_violations(
                password=dto.password,
                user=None,
            )
            if errors:
                raise ForbiddenError(errors)

            await self._role_use_case.create_domain_admins_role()
            await self._role_use_case.create_read_only_role()
            await self._audit_use_case.create_policies()
            await self._session.commit()
        except IntegrityError as err:
            await self._session.rollback()
            # Keep the database error attached as the explicit cause.
            raise AlreadyConfiguredError(
                "Setup already performed (locked)",
            ) from err
        except ForbiddenError:
            # The password was rejected after work had already been done
            # in this session; roll it back so a refused setup leaves
            # nothing pending that a later commit could persist.
            await self._session.rollback()
            raise