-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathuser_management.py
More file actions
130 lines (100 loc) · 3.75 KB
/
user_management.py
File metadata and controls
130 lines (100 loc) · 3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import os
import pwd
import subprocess
from typing import List, Optional

import ldap
import ldap.dn
import ldap.modlist

import config
class LDAPConn:
    """Connection to an LDAP instance with a simple get/update/delete interface.

    The constructor opens the connection to ``config.LDAP_URI`` and performs a
    synchronous simple bind with ``config.LDAP_ROOTUSER`` /
    ``config.LDAP_ROOTUSER_PASSWD``. The bound connection object is kept in
    ``self.ld``.
    """

    def __init__(self):
        self.ld = ldap.initialize(config.LDAP_URI)
        self.ld.simple_bind_s(who=config.LDAP_ROOTUSER,
                              cred=config.LDAP_ROOTUSER_PASSWD)

    def delete_entry(self, dn: str) -> bool:
        """Delete the entry at ``dn``.

        Returns:
            True if the entry was deleted, False if no entry exists at ``dn``.
            Any other LDAP error propagates to the caller.
        """
        try:
            self.ld.delete_s(dn)
        except ldap.NO_SUCH_OBJECT:
            return False
        return True

    def get_entry(self, dn: str) -> Optional[dict]:
        """Retrieve the single LDAP entry located at ``dn``.

        Returns:
            The entry's attribute dict, or None if no entry exists at ``dn``.
        """
        try:
            # SCOPE_BASE reads exactly the entry named by ``dn``; a subtree
            # search would also return every child of that entry.
            results = self.ld.search_s(dn, ldap.SCOPE_BASE)
        except ldap.NO_SUCH_OBJECT:
            # Searching below a base DN that does not exist raises rather
            # than returning an empty result list.
            return None
        if not results:
            return None
        _, entry = results[0]
        return entry

    def update_entry(self, dn: str, new_entry: dict) -> bool:
        """Make the entry at ``dn`` match ``new_entry``.

        The current entry is fetched and diffed against ``new_entry``; only
        the resulting modifications are sent to the server.

        Returns:
            True if the entry exists (and was updated when needed), False if
            no entry exists at ``dn``.
        """
        old_entry = self.get_entry(dn)
        if old_entry is None:
            return False
        modlist = ldap.modlist.modifyModlist(old_entry, new_entry)
        if not modlist:
            # Nothing differs: skip the round trip. (Some servers may reject
            # an empty modify request -- TODO confirm.)
            return True
        try:
            self.ld.modify_s(dn, modlist)
        except ldap.NO_SUCH_OBJECT:
            # The entry disappeared between the read and the write.
            return False
        return True
def _uid_to_ldap_dn(user_id: str) -> str:
    """Return the DN of the user ``user_id`` under ``ou=people``.

    DN special characters in ``user_id`` are escaped so the value cannot
    alter the structure of the resulting DN.
    """
    escaped_uid = ldap.dn.escape_dn_chars(user_id)
    return f"uid={escaped_uid},ou=people,{config.LDAP_BASE}"
def _gid_to_ldap_dn(group_id: str) -> str:
    """Return the DN of the group ``group_id`` under ``ou=groups``.

    DN special characters in ``group_id`` are escaped so the value cannot
    alter the structure of the resulting DN.
    """
    escaped_gid = ldap.dn.escape_dn_chars(group_id)
    return f"cn={escaped_gid},ou=groups,{config.LDAP_BASE}"
def unix_user_exists(user_id: str) -> bool:
    """Tell whether ``user_id`` resolves to an account in the password database.

    The lookup goes through ``pwd.getpwnam``; a ``KeyError`` from that call
    means the name is unknown.
    """
    try:
        pwd.getpwnam(user_id)
    except KeyError:
        found = False
    else:
        found = True
    return found
def create_ldap_user(user_id: str) -> bool:
    """Create a new user on LDAP by running the external ``adduser.sh`` script.

    The script is invoked through ``bash`` as ``./adduser.sh``, i.e. relative
    to the current working directory, with ``user_id`` as its only argument.

    Returns:
        True if the script exits with status 0, otherwise False.
    """
    # TODO: Rewrite the logic in python
    command = ["bash", "./adduser.sh", user_id]
    exit_status = subprocess.run(command).returncode
    return exit_status == 0
def delete_ldap_user(user_id: str) -> bool:
    """Remove a user from LDAP and delete their home directory.

    The LDAP entry is deleted first; only when that succeeds is
    ``/home/<user_id>`` removed (via ``sudo rm -rf``) if it is a directory.

    Returns:
        True if the LDAP entry was deleted, False if ``user_id`` is not a
        usable single path component or no such LDAP entry exists.

    Raises:
        subprocess.CalledProcessError: if removing the home directory fails.
    """
    # The ID becomes part of a path handed to ``rm -rf``: refuse anything
    # that would not name a single directory directly below /home.
    if (not user_id or user_id in (".", "..")
            or "/" in user_id or "\x00" in user_id):
        return False

    conn = LDAPConn()
    if not conn.delete_entry(_uid_to_ldap_dn(user_id)):
        return False

    home_dir = os.path.join("/home", user_id)
    if os.path.isdir(home_dir):
        # ``--`` ends option parsing so the path is never read as a flag;
        # ``check=True`` replaces an assert that would vanish under ``-O``.
        subprocess.run(["sudo", "rm", "-rf", "--", home_dir], check=True)
    return True
def update_user_sshkey_list(user_id: str, keys: List[str]) -> bool:
    """Replace a user's list of SSH public keys.

    Args:
        user_id: The user whose LDAP entry is updated.
        keys: The complete new list of public keys; it replaces the current
            ``sshPublicKey`` values. Duplicates are dropped, keeping order.

    Returns:
        True if the entry was updated, False if the user has no LDAP entry.
    """
    conn = LDAPConn()
    user_dn = _uid_to_ldap_dn(user_id)
    # LDAP attribute values form a set, so duplicates are removed up front.
    byte_keys = list(dict.fromkeys(key.encode('utf-8') for key in keys))
    try:
        user_entry = conn.get_entry(user_dn)
        if user_entry is None:
            return False
        user_entry['sshPublicKey'] = byte_keys
        # Report what the update actually did instead of assuming success.
        return conn.update_entry(user_dn, user_entry)
    except ldap.NO_SUCH_OBJECT:
        # Either call may raise this when the DN does not exist.
        return False
def update_group_member_list(group_id: str, member_uids: List[str]) -> bool:
    """Replace a group's member list.

    Args:
        group_id: The group whose LDAP entry is updated.
        member_uids: The complete new list of member user IDs; it replaces
            the current ``memberUid`` values. Duplicates are dropped, keeping
            order.

    Returns:
        True if the entry was updated, False if the group has no LDAP entry.
    """
    conn = LDAPConn()
    group_dn = _gid_to_ldap_dn(group_id)
    # LDAP attribute values form a set, so duplicates are removed up front.
    byte_member_uids = list(
        dict.fromkeys(uid.encode('utf-8') for uid in member_uids))
    try:
        group_entry = conn.get_entry(group_dn)
        if group_entry is None:
            return False
        group_entry['memberUid'] = byte_member_uids
        # Report what the update actually did instead of assuming success.
        return conn.update_entry(group_dn, group_entry)
    except ldap.NO_SUCH_OBJECT:
        # Either call may raise this when the DN does not exist.
        return False
def update_user_login_shell(user_id: str, shell: str) -> bool:
    """Set a user's login shell.

    Args:
        user_id: The user whose LDAP entry is updated.
        shell: The new value for the ``loginShell`` attribute.

    Returns:
        True if the entry was updated, False if the user has no LDAP entry.
    """
    conn = LDAPConn()
    user_dn = _uid_to_ldap_dn(user_id)
    try:
        user_entry = conn.get_entry(user_dn)
        if user_entry is None:
            return False
        user_entry['loginShell'] = [shell.encode('utf-8')]
        # Report what the update actually did instead of assuming success.
        return conn.update_entry(user_dn, user_entry)
    except ldap.NO_SUCH_OBJECT:
        # Either call may raise this when the DN does not exist.
        return False