-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdelete.py
More file actions
179 lines (150 loc) · 6.01 KB
/
delete.py
File metadata and controls
179 lines (150 loc) · 6.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""Delete protocol.
Copyright (c) 2024 MultiFactor
License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE
"""
from typing import AsyncGenerator, ClassVar
from sqlalchemy import delete, exists, select
from sqlalchemy.orm import joinedload, selectinload
from entities import Directory, Group
from enums import AceType
from ldap_protocol.asn1parser import ASN1Row
from ldap_protocol.kerberos.exceptions import (
KRBAPIConnectionError,
KRBAPIDeletePrincipalError,
KRBAPIPrincipalNotFoundError,
)
from ldap_protocol.ldap_codes import LDAPCodes
from ldap_protocol.ldap_responses import (
INVALID_ACCESS_RESPONSE,
DeleteResponse,
)
from ldap_protocol.objects import ProtocolRequests
from ldap_protocol.utils.helpers import is_dn_in_base_directory
from ldap_protocol.utils.queries import (
get_base_directories,
get_filter_from_path,
is_computer,
validate_entry,
)
from repo.pg.tables import Attribute, queryable_attr as qa
from .base import BaseRequest
from .contexts import LDAPDeleteRequestContext
class DeleteRequest(BaseRequest):
"""Delete request.
DelRequest ::= [APPLICATION 10] LDAPDN
"""
PROTOCOL_OP: ClassVar[int] = ProtocolRequests.DELETE
CONTEXT_TYPE: ClassVar[type] = LDAPDeleteRequestContext
entry: str
@classmethod
def from_data(cls, data: ASN1Row) -> "DeleteRequest":
return cls(entry=data)
async def handle( # noqa: C901
self,
ctx: LDAPDeleteRequestContext,
) -> AsyncGenerator[DeleteResponse, None]:
"""Delete request handler."""
if not ctx.ldap_session.user:
yield DeleteResponse(**INVALID_ACCESS_RESPONSE)
return
if not validate_entry(self.entry.lower()):
yield DeleteResponse(result_code=LDAPCodes.INVALID_DN_SYNTAX)
return
if not ctx.ldap_session.user.role_ids:
yield DeleteResponse(
result_code=LDAPCodes.INSUFFICIENT_ACCESS_RIGHTS,
)
return
query = (
select(Directory)
.options(
joinedload(qa(Directory.user)),
selectinload(qa(Directory.groups)).selectinload(
qa(Group.directory),
),
joinedload(qa(Directory.group)).selectinload(
qa(Group.members),
),
selectinload(qa(Directory.attributes)),
)
.filter(get_filter_from_path(self.entry))
)
query = ctx.access_manager.mutate_query_with_ace_load(
user_role_ids=ctx.ldap_session.user.role_ids,
query=query,
ace_types=[AceType.DELETE],
require_attribute_type_null=True,
)
directory = await ctx.session.scalar(query)
if not directory:
yield DeleteResponse(result_code=LDAPCodes.NO_SUCH_OBJECT)
return
if directory.is_system:
yield DeleteResponse(
result_code=LDAPCodes.UNWILLING_TO_PERFORM,
)
return
self.set_event_data(
{"before_attrs": self.get_directory_attrs(directory)},
)
if directory.is_domain:
yield DeleteResponse(result_code=LDAPCodes.UNWILLING_TO_PERFORM)
return
if directory.group:
primary_group_members_query = exists(Attribute).where(
qa(Attribute.name) == "primaryGroupID",
qa(Attribute.value) == directory.relative_id,
)
if await ctx.session.scalar(select(primary_group_members_query)):
yield DeleteResponse(
result_code=LDAPCodes.ENTRY_ALREADY_EXISTS,
error_message=(
"Can't delete group with members having"
" it as primary group."
),
)
return
has_access_to_delete = ctx.access_manager.check_entity_level_access(
aces=directory.access_control_entries,
entity_type_id=directory.entity_type_id,
)
if not has_access_to_delete:
yield DeleteResponse(
result_code=LDAPCodes.INSUFFICIENT_ACCESS_RIGHTS,
)
return
for base_directory in await get_base_directories(ctx.session):
if is_dn_in_base_directory(base_directory, self.entry):
base_dn = base_directory
break
try:
if directory.user:
if directory.path_dn == ctx.ldap_session.user.dn:
yield DeleteResponse(
result_code=LDAPCodes.OPERATIONS_ERROR,
error_message="Cannot delete yourself.",
)
return
await ctx.session_storage.clear_user_sessions(
directory.user.id,
)
await ctx.kadmin.del_principal(directory.user.sam_account_name)
if await is_computer(directory.id, ctx.session):
computer_sam_account_names = directory.attributes_dict.get("sAMAccountName") # noqa: E501 # fmt: skip
if computer_sam_account_names:
computer_sam_account_name = computer_sam_account_names[0]
await ctx.kadmin.del_principal(f"host/{computer_sam_account_name}") # noqa: E501 # fmt: skip
await ctx.kadmin.del_principal(f"host/{computer_sam_account_name}.{base_dn.name}") # noqa: E501 # fmt: skip
else:
raise KRBAPIDeletePrincipalError
except KRBAPIPrincipalNotFoundError:
pass
except (KRBAPIDeletePrincipalError, KRBAPIConnectionError):
yield DeleteResponse(
result_code=LDAPCodes.UNAVAILABLE,
errorMessage="KerberosError",
)
return
await ctx.session.execute(delete(Directory).filter_by(id=directory.id))
await ctx.session.commit()
yield DeleteResponse(result_code=LDAPCodes.SUCCESS)