forked from gooddata/gooddata-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprovisioning.py
More file actions
168 lines (137 loc) · 6.02 KB
/
provisioning.py
File metadata and controls
168 lines (137 loc) · 6.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# (C) 2025 GoodData Corporation
"""Provisioning base class for GoodData Pipelines."""
from pathlib import Path
from typing import Generic, Type, TypeVar
from gooddata_sdk.utils import PROFILES_FILE_PATH, profile_content
from gooddata_pipelines.api import GoodDataApi
from gooddata_pipelines.logger.logger import (
LogObserver,
)
from gooddata_pipelines.provisioning.utils.utils import EntityGroupIds
TFullLoadSourceData = TypeVar("TFullLoadSourceData")
TIncrementalSourceData = TypeVar("TIncrementalSourceData")
class Provisioning(Generic[TFullLoadSourceData, TIncrementalSourceData]):
    """Base provisioning class.

    Orchestrates full and incremental provisioning workflows against a
    GoodData backend. Subclasses set FULL_LOAD_TYPE / INCREMENTAL_LOAD_TYPE
    to their concrete source-data models and implement the
    _provision_full_load / _provision_incremental_load hooks.
    """

    # TypeVar lives in the class namespace so the classmethod signatures
    # below can reference it while the class body executes.
    TProvisioning = TypeVar("TProvisioning", bound="Provisioning")

    # Source data cached by full_load / incremental_load for the subclass hooks.
    source_group_full: list[TFullLoadSourceData]
    source_group_incremental: list[TIncrementalSourceData]

    # Concrete record types used to validate incoming source data;
    # must be assigned by each subclass.
    FULL_LOAD_TYPE: type[TFullLoadSourceData]
    INCREMENTAL_LOAD_TYPE: type[TIncrementalSourceData]

    def __init__(self, host: str, token: str) -> None:
        """Initializes API client, logger, and empty bookkeeping state.

        Args:
            host: GoodData host URL.
            token: GoodData API token.
        """
        self.source_id: set[str] = set()
        self.upstream_id: set[str] = set()
        self._api = GoodDataApi(host, token)
        self.logger: LogObserver = LogObserver()
        # Holds the message of the last fatal error, "" if none occurred.
        self.fatal_exception: str = ""

    @classmethod
    def create(
        cls: type[TProvisioning], host: str, token: str
    ) -> TProvisioning:
        """Creates a provisioner instance using provided host and token.

        Raises:
            ValueError: If host and/or token is empty.
        """
        cls._validate_credentials(host, token)
        return cls(host=host, token=token)

    @classmethod
    def create_from_profile(
        cls: type[TProvisioning],
        profile: str = "default",
        profiles_path: Path = PROFILES_FILE_PATH,
    ) -> TProvisioning:
        """Creates a provisioner instance using a GoodData profile file.

        Args:
            profile: Name of the profile entry to read.
            profiles_path: Path to the profiles file.
        """
        content = profile_content(profile, profiles_path)
        return cls(host=content["host"], token=content["token"])

    @staticmethod
    def _validate_credentials(host: str, token: str) -> None:
        """Validates that both host and token are non-empty.

        Raises:
            ValueError: Naming whichever credential(s) are missing.
        """
        # Check the combined case first so the error names both fields.
        if (not host) and (not token):
            raise ValueError("Host and token are required.")
        if not host:
            raise ValueError("Host is required.")
        if not token:
            raise ValueError("Token is required.")

    @staticmethod
    def _create_groups(
        source_id: set[str], panther_id: set[str]
    ) -> EntityGroupIds:
        """Creates groups for provisioning as sets of IDs.

        Sorts the IDs into three categories:
            - IDs that exist both source and upstream (to be checked further)
            - IDs that exist upstream but not in source (to be deleted)
            - IDs that exist in source but not upstream (to be created)
        """
        ids_in_both_systems: set[str] = source_id.intersection(panther_id)
        ids_to_delete: set[str] = panther_id.difference(source_id)
        ids_to_create: set[str] = source_id.difference(panther_id)
        return EntityGroupIds(
            ids_in_both_systems=ids_in_both_systems,
            ids_to_delete=ids_to_delete,
            ids_to_create=ids_to_create,
        )

    def _validate_source_data_type(
        self,
        source_data: list[TFullLoadSourceData] | list[TIncrementalSourceData],
        model: type[TFullLoadSourceData] | type[TIncrementalSourceData],
    ) -> None:
        """Validates data type of the source data.

        Raises:
            TypeError: If any record is not an instance of `model`.
        """
        if not all(isinstance(record, model) for record in source_data):
            raise TypeError(
                f"Not all elements in source data are instances of {model.__name__}"
            )

    def _provision_incremental_load(self) -> None:
        """Incremental provisioning hook; must be overridden by subclasses."""
        raise NotImplementedError(
            "Provisioning method to be implemented in the subclass."
        )

    def _provision_full_load(self) -> None:
        """Full-load provisioning hook; must be overridden by subclasses."""
        raise NotImplementedError(
            "Provisioning method to be implemented in the subclass."
        )

    def full_load(self, source_data: list[TFullLoadSourceData]) -> None:
        """Runs full provisioning workflow with the provided source data.

        Full provisioning is a full load of the source data, where the source
        data is assumed to be a single source of truth and the upstream
        workspaces are updated to match it.

        That means:
            - All workspaces declared in the source data are created if
              missing, or updated to match the source data
            - All child workspaces not declared under the parent workspace in
              the source data are deleted
        """
        try:
            self._validate_source_data_type(source_data, self.FULL_LOAD_TYPE)
            self.source_group_full = source_data
            self._provision_full_load()
            self.logger.info("Provisioning completed.")
        except Exception as e:
            self._handle_fatal_exception(e)

    def incremental_load(
        self, source_data: list[TIncrementalSourceData]
    ) -> None:
        """Runs incremental provisioning workflow with the provided source data.

        Incremental provisioning is used to modify a subset of the upstream
        workspaces based on the source data provided. Only changes requested
        in the source data will be applied.
        """
        try:
            self._validate_source_data_type(
                source_data, self.INCREMENTAL_LOAD_TYPE
            )
            self.source_group_incremental = source_data
            self._provision_incremental_load()
            self.logger.info("Provisioning completed.")
        except Exception as e:
            self._handle_fatal_exception(e)

    def _handle_fatal_exception(self, e: Exception) -> None:
        """Handles fatal exceptions during provisioning.

        Stores the message, logs it with any context found on the exception,
        then raises a new Exception carrying the combined message. The new
        exception is chained to the original (``from e``) so the root-cause
        traceback is preserved.
        """
        self.fatal_exception = str(e)
        if hasattr(e, "__dict__"):
            exception_context = f"Context: {e.__dict__}"
        else:
            exception_context = ""
        exception_message = (
            f"Provisioning failed. Error: {self.fatal_exception}. "
            + exception_context
        )
        self.logger.error(exception_message)
        # Chain to the original error instead of discarding its traceback.
        raise Exception(exception_message) from e