-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathmigrate_roles.py
More file actions
182 lines (162 loc) · 7.49 KB
/
migrate_roles.py
File metadata and controls
182 lines (162 loc) · 7.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import ast
import requests
import os
from migrate_policies import main as migrate_policies
# Role names that main() never re-creates: for these it looks up the role of
# the same name in the target vault instead.
SYSTEM_ROLES = [
    "VAULT_OWNER",
    "VAULT_EDITOR",
    "VAULT_VIEWER",
    "PIPELINE_MANAGER",
    "CONNECTION_MANAGER",
]

# --- Configuration, read once from the environment at import time ---------
# Every value is either a string or None (variable unset).

# Source side.
SOURCE_ENV_URL = os.getenv("SOURCE_ENV_URL")
SOURCE_ACCOUNT_ID = os.getenv("SOURCE_ACCOUNT_ID")
SOURCE_ACCOUNT_AUTH = os.getenv("SOURCE_ACCOUNT_AUTH")
SOURCE_VAULT_ID = os.getenv("SOURCE_VAULT_ID")

# Target side.
TARGET_ENV_URL = os.getenv("TARGET_ENV_URL")
TARGET_ACCOUNT_ID = os.getenv("TARGET_ACCOUNT_ID")
TARGET_ACCOUNT_AUTH = os.getenv("TARGET_ACCOUNT_AUTH")
TARGET_VAULT_ID = os.getenv("TARGET_VAULT_ID")

# Migration options.
# ROLE_IDS is parsed with ast.literal_eval() in main(), so it must hold a
# Python literal (e.g. a list of ID strings).
ROLE_IDS = os.getenv("ROLE_IDS")
# NOTE: the two flags below are tested for truthiness only, so ANY non-empty
# string (including "false" or "0") enables them.
MIGRATE_ALL_ROLES = os.getenv("MIGRATE_ALL_ROLES")
SKIP_ROLE_CREATION_IF_ROLE_EXISTS = os.getenv("SKIP_ROLE_CREATION_IF_ROLE_EXISTS")


def _build_headers(account_id, bearer_token):
    """Return the JSON request headers for one account."""
    return {
        "X-SKYFLOW-ACCOUNT-ID": account_id,
        "Authorization": f"Bearer {bearer_token}",
        "Content-Type": "application/json",
    }


SOURCE_ACCOUNT_HEADERS = _build_headers(SOURCE_ACCOUNT_ID, SOURCE_ACCOUNT_AUTH)
TARGET_ACCOUNT_HEADERS = _build_headers(TARGET_ACCOUNT_ID, TARGET_ACCOUNT_AUTH)
def get_role(role_id):
    """Retrieve one role from the source account by its ID.

    Sends a GET to ``/v1/roles/{role_id}`` under ``SOURCE_ENV_URL`` using the
    source account headers and returns the decoded JSON body.

    Raises:
        requests.exceptions.HTTPError: if the response status is 4xx/5xx.
    """
    url = f"{SOURCE_ENV_URL}/v1/roles/{role_id}"
    reply = requests.get(url, headers=SOURCE_ACCOUNT_HEADERS)
    reply.raise_for_status()
    return reply.json()
def get_system_role(role_name):
    """Look up a role by name among the roles of the target vault.

    Queries ``/v1/roles`` under ``TARGET_ENV_URL``, filtered by ``name`` and by
    the ``VAULT`` resource ``TARGET_VAULT_ID``, and returns the decoded JSON
    body (callers read the matches from its ``"roles"`` list).

    Raises:
        requests.exceptions.HTTPError: if the response status is 4xx/5xx.
    """
    query = f"name={role_name}&resource.type=VAULT&resource.ID={TARGET_VAULT_ID}"
    url = f"{TARGET_ENV_URL}/v1/roles?{query}"
    reply = requests.get(url, headers=TARGET_ACCOUNT_HEADERS)
    reply.raise_for_status()
    return reply.json()
def create_role(role):
    """Create a role in the target account from the given payload.

    ``role`` is sent as the JSON body of a POST to ``/v1/roles`` under
    ``TARGET_ENV_URL``; the decoded JSON response is returned.

    Raises:
        requests.exceptions.HTTPError: if the response status is 4xx/5xx.
    """
    endpoint = f"{TARGET_ENV_URL}/v1/roles"
    reply = requests.post(endpoint, json=role, headers=TARGET_ACCOUNT_HEADERS)
    reply.raise_for_status()
    return reply.json()
def get_role_policies(role_id):
    """Fetch the policies attached to a role in the source account.

    Sends a GET to ``/v1/roles/{role_id}/policies`` under ``SOURCE_ENV_URL``
    and returns the decoded JSON body (callers read its ``"policies"`` list).

    Raises:
        requests.exceptions.HTTPError: if the response status is 4xx/5xx.
    """
    url = f"{SOURCE_ENV_URL}/v1/roles/{role_id}/policies"
    reply = requests.get(url, headers=SOURCE_ACCOUNT_HEADERS)
    reply.raise_for_status()
    return reply.json()
def get_role_by_role_name(role_name):
    """Search the target vault for an existing role with the given name.

    Queries ``/v1/roles`` under ``TARGET_ENV_URL``, filtered by ``name`` and by
    the ``VAULT`` resource ``TARGET_VAULT_ID``.

    Args:
        role_name: Name of the role to look for. It is percent-encoded before
            being placed in the query string, so names containing reserved
            characters (e.g. ``"R&D analyst"``) are matched literally instead
            of being split into extra query parameters.

    Returns:
        The decoded JSON response body; callers read the matches from its
        ``"roles"`` list.

    Raises:
        requests.exceptions.HTTPError: if the response status is 4xx/5xx.
    """
    # Local import keeps this function self-contained; urllib is stdlib.
    from urllib.parse import quote

    # safe="" also escapes "/", so nothing in the name can alter the URL shape.
    encoded_name = quote(str(role_name), safe="")
    response = requests.get(
        f"{TARGET_ENV_URL}/v1/roles?name={encoded_name}&resource.type=VAULT&resource.ID={TARGET_VAULT_ID}",
        headers=TARGET_ACCOUNT_HEADERS,
    )
    response.raise_for_status()
    return response.json()
def assign_policy_to_role(policy_ids, role_id: list):
    """Attach every policy in ``policy_ids`` to the given target role(s).

    One POST to ``/v1/policies/assign`` under ``TARGET_ENV_URL`` is issued per
    policy. Despite its singular name, ``role_id`` is a list of role IDs and is
    sent unchanged as the ``roleIDs`` field of each request.

    Raises:
        requests.exceptions.HTTPError: on the first request that returns a
            4xx/5xx status; remaining policies are not processed.
    """
    endpoint = f"{TARGET_ENV_URL}/v1/policies/assign"
    for current_policy in policy_ids:
        payload = {"ID": current_policy, "roleIDs": role_id}
        result = requests.post(endpoint, json=payload, headers=TARGET_ACCOUNT_HEADERS)
        result.raise_for_status()
def list_all_roles() -> dict:
    """List the custom roles of the source vault.

    Queries ``/v1/roles`` under ``SOURCE_ENV_URL`` with ``type=CUSTOM`` and the
    ``VAULT`` resource ``SOURCE_VAULT_ID``.

    Returns:
        The decoded JSON response body (a mapping, not a list); callers read
        the role entries from its ``"roles"`` key.

    Raises:
        requests.exceptions.HTTPError: if the response status is 4xx/5xx.
    """
    # NOTE(review): only a single request is made. If this endpoint paginates
    # its results, roles beyond the first page are not returned - confirm.
    response = requests.get(
        f"{SOURCE_ENV_URL}/v1/roles?type=CUSTOM&resource.ID={SOURCE_VAULT_ID}&resource.type=VAULT",
        headers=SOURCE_ACCOUNT_HEADERS,
    )
    response.raise_for_status()
    return response.json()
def transform_role_payload(source_resource):
    """Build the target-vault role creation payload from a source role.

    Args:
        source_resource: Role object as returned for the source account; must
            contain ``["role"]["definition"]`` with a ``"permissions"`` list.
            It is NOT modified.

    Returns:
        A dict with two keys:
        ``"roleDefinition"`` - a copy of the source definition whose
        ``"permissions"`` omit the upstream read permissions, and
        ``"resource"`` - ``{"ID": TARGET_VAULT_ID, "type": "VAULT"}``.

    Raises:
        KeyError: if the expected keys are missing from ``source_resource``.
    """
    # Upstream read permissions are dropped; per the original author's note
    # they are implicitly granted in the target account.
    excluded_permissions = (
        "accounts.read:upstream",
        "workspaces.read:upstream",
        "vaults.read:upstream",
    )
    source_definition = source_resource["role"]["definition"]
    # Shallow copy so that replacing "permissions" below does not rewrite the
    # caller's role object.
    role_definition = dict(source_definition)
    role_definition["permissions"] = [
        permission
        for permission in source_definition["permissions"]
        if permission not in excluded_permissions
    ]
    return {
        "roleDefinition": role_definition,
        "resource": {"ID": TARGET_VAULT_ID, "type": "VAULT"},
    }
def main(role_ids=None):
    """Migrate roles, and the policies attached to them, to the target vault.

    Which roles are migrated:
      * ``MIGRATE_ALL_ROLES`` set: every custom role of ``SOURCE_VAULT_ID``
        (any ``role_ids`` argument is then replaced by that listing).
      * otherwise: ``role_ids`` if given, else the literal in ``ROLE_IDS``.

    For each role:
      * a name in ``SYSTEM_ROLES`` is not created; the role with that name in
        the target vault is looked up and recorded;
      * with ``SKIP_ROLE_CREATION_IF_ROLE_EXISTS`` set, a custom role whose
        name has exactly one match in the target vault is reused;
      * otherwise the role is created, its source policies are migrated via
        ``migrate_policies`` and assigned to the new role.

    Args:
        role_ids: Optional iterable of source role IDs.

    Returns:
        A list with one dict per processed role: the creation response for new
        roles, or ``{"ID": <target role id>}`` for reused/system roles.

    Raises:
        SystemExit: with status 1 after printing a message, on any HTTP or
            other error, or when there is nothing valid to migrate.
    """
    # Bound up front so the HTTP error handler can always report them, even
    # when the failure happens before the first role has been fetched.
    role_id = None
    role_name = ""
    try:
        print("-- Initializing Roles migration --")
        should_enable_custom_role_check = SKIP_ROLE_CREATION_IF_ROLE_EXISTS
        if MIGRATE_ALL_ROLES:
            if SOURCE_VAULT_ID:
                print(f"-- Fetching Roles for the vault {SOURCE_VAULT_ID}")
                roles = list_all_roles()
                role_ids = [role["ID"] for role in roles["roles"]]
            else:
                print("-- Please provide valid input. Source vault ID is required to migrate all roles --")
                if role_ids is None:
                    # Nothing to iterate: stop here instead of failing later
                    # with an unrelated "'NoneType' is not iterable" error.
                    raise SystemExit(1)
        else:
            role_ids = role_ids if role_ids else ast.literal_eval(ROLE_IDS)

        roles_created = []
        for index, role_id in enumerate(role_ids):
            # Reset so a failure while fetching this role is not reported
            # under the previous role's name.
            role_name = ""
            role_info = get_role(role_id)
            role_name = role_info["role"]["definition"]["name"]
            print(f"-- Working on Role {index + 1}: {role_name}, ID: {role_id} --")

            if role_name in SYSTEM_ROLES:
                print('-- SYSTEM_ROLE found, skipping role creation --')
                system_role = get_system_role(role_name)
                roles_created.append({"ID": system_role["roles"][0]["ID"]})
                continue

            should_create_role = True
            if should_enable_custom_role_check:
                print('-- Checking if a role exists for the given vault --')
                role_response = get_role_by_role_name(role_name)
                if len(role_response["roles"]) == 1:
                    print("-- Found an existing CUSTOM_ROLE, skipping role creation --")
                    should_create_role = False
                    roles_created.append({"ID": role_response["roles"][0]["ID"]})
                else:
                    print("-- Role does not exist --")

            if not should_create_role:
                continue

            role_payload = transform_role_payload(role_info)
            print(f"-- Creating role: {role_name} --")
            new_role = create_role(role_payload)
            roles_created.append(new_role)

            # Policies are migrated only for roles created in this run, so
            # new_role is always bound here.
            print("-- Fetching policies for the given role --")
            role_policies = get_role_policies(role_id)
            policy_ids = [policy["ID"] for policy in role_policies["policies"]]
            no_of_policies = len(policy_ids)
            if no_of_policies == 0:
                print('-- No policies found for the given role --')
            else:
                print(f"-- Working on policies migration. No. of policies found for given role: {no_of_policies} --")
                policies_created = migrate_policies(policy_ids)
                created_policy_ids = [policy["ID"] for policy in policies_created]
                assign_policy_to_role(created_policy_ids, [new_role["ID"]])
            print(f"-- Role migrated successfully: {role_name}. Source ROLE_ID: {role_id}, Target ROLE_ID: {new_role['ID']} --")

        print("-- Roles migration script executed successfully --")
        return roles_created
    except requests.exceptions.HTTPError as http_err:
        print(f'-- Role creation failed for {role_name if role_name else ""}, ID: {role_id}. --')
        print(f'-- migrate_roles HTTP error: {http_err.response.content.decode()} --')
        # Same effect as exit(1), without depending on the `site` builtin.
        raise SystemExit(1)
    except Exception as err:
        print(f'-- migrate_roles error: {err} --')
        raise SystemExit(1)
# Run the migration only when this file is executed as a script, not when it
# is imported by another module.
if __name__ == "__main__":
    main()