backup_manager.py
# (C) 2025 GoodData Corporation
import json
import os
import shutil
import tempfile
import time
import traceback
from pathlib import Path
from typing import Any, Type

import attrs
import requests
import yaml
from gooddata_sdk.utils import PROFILES_FILE_PATH, profile_content

from gooddata_pipelines.api.gooddata_api_wrapper import GoodDataApi
from gooddata_pipelines.backup_and_restore.backup_input_processor import (
    BackupInputProcessor,
)
from gooddata_pipelines.backup_and_restore.constants import (
    BackupSettings,
    DirNames,
)
from gooddata_pipelines.backup_and_restore.models.input_type import InputType
from gooddata_pipelines.backup_and_restore.models.storage import (
    BackupRestoreConfig,
    StorageType,
)
from gooddata_pipelines.backup_and_restore.storage.base_storage import (
    BackupStorage,
)
from gooddata_pipelines.backup_and_restore.storage.local_storage import (
    LocalStorage,
)
from gooddata_pipelines.backup_and_restore.storage.s3_storage import (
    S3Storage,
)
from gooddata_pipelines.logger import LogObserver
from gooddata_pipelines.utils.rate_limiter import RateLimiter


@attrs.define
class BackupBatch:
    list_of_ids: list[str]


class BackupManager:
    storage: BackupStorage

    def __init__(self, host: str, token: str, config: BackupRestoreConfig):
        self._api = GoodDataApi(host, token)
        self.logger = LogObserver()
        self.config = config
        self.storage = self._get_storage(self.config)
        self.org_id = self._api.get_organization_id()
        self.loader = BackupInputProcessor(self._api, self.config.api_page_size)
        self._api_rate_limiter = RateLimiter(
            calls_per_second=self.config.api_calls_per_second,
        )

    @classmethod
    def create(
        cls: Type["BackupManager"],
        config: BackupRestoreConfig,
        host: str,
        token: str,
    ) -> "BackupManager":
        """Creates a backup worker instance using the provided host and token."""
        return cls(host=host, token=token, config=config)

    @classmethod
    def create_from_profile(
        cls: Type["BackupManager"],
        config: BackupRestoreConfig,
        profile: str = "default",
        profiles_path: Path = PROFILES_FILE_PATH,
    ) -> "BackupManager":
        """Creates a backup worker instance using a GoodData profile file."""
        content = profile_content(profile, profiles_path)
        return cls(**content, config=config)
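
    # Illustration only: `create_from_profile` reads a GoodData profiles file
    # (by default ~/.gooddata/profiles.yaml). Assuming the standard layout
    # consumed by gooddata_sdk.utils.profile_content, a minimal profile looks
    # like:
    #
    #   default:
    #     host: "https://example.gooddata.com"
    #     token: "<api-token>"
    #
    # The returned mapping is unpacked into __init__ as host/token.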

    @staticmethod
    def _get_storage(conf: BackupRestoreConfig) -> BackupStorage:
        """Returns the storage class based on the storage type."""
        if conf.storage_type == StorageType.S3:
            return S3Storage(conf)
        elif conf.storage_type == StorageType.LOCAL:
            return LocalStorage(conf)
        else:
            raise RuntimeError(
                f'Unsupported storage type "{conf.storage_type.value}".'
            )
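
    # Sketch of how storage selection plays out (any config fields other than
    # storage_type are assumptions, not confirmed here):
    #
    #   config = BackupRestoreConfig(storage_type=StorageType.LOCAL, ...)
    #   BackupManager._get_storage(config)  # -> LocalStorage(config)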

    def get_user_data_filters(self, ws_id: str) -> dict:
        """Returns the user data filters for the specified workspace."""
        with self._api_rate_limiter:
            response: requests.Response = self._api.get_user_data_filters(ws_id)
        if response.ok:
            return response.json()
        else:
            raise RuntimeError(f"{response.status_code}: {response.text}")

    def _store_user_data_filters(
        self,
        user_data_filters: dict,
        export_path: Path,
        ws_id: str,
    ) -> None:
        """Stores the user data filters in the specified export path."""
        os.mkdir(
            os.path.join(
                export_path,
                "gooddata_layouts",
                self.org_id,
                "workspaces",
                ws_id,
                "user_data_filters",
            )
        )
        for filter in user_data_filters["userDataFilters"]:
            udf_file_path = os.path.join(
                export_path,
                "gooddata_layouts",
                self.org_id,
                "workspaces",
                ws_id,
                "user_data_filters",
                filter["id"] + ".yaml",
            )
            self._write_to_yaml(udf_file_path, filter)

    @staticmethod
    def _move_folder(source: Path, destination: Path) -> None:
        """Moves the source folder to the destination."""
        shutil.move(source, destination)

    @staticmethod
    def _write_to_yaml(path: str, source: Any) -> None:
        """Writes the source to a YAML file."""
        with open(path, "w") as outfile:
            yaml.dump(source, outfile)

    def _get_automations_from_api(self, workspace_id: str) -> Any:
        """Returns automations for the workspace as JSON."""
        with self._api_rate_limiter:
            response: requests.Response = self._api.get_automations(
                workspace_id
            )
        if response.ok:
            return response.json()
        else:
            raise RuntimeError(
                f"Failed to get automations for {workspace_id}. "
                + f"{response.status_code}: {response.text}"
            )

    def _store_automations(self, export_path: Path, workspace_id: str) -> None:
        """Stores the automations in the specified export path."""
        # Get the automations from the API
        automations: Any = self._get_automations_from_api(workspace_id)

        automations_folder_path: Path = Path(
            export_path,
            "gooddata_layouts",
            self.org_id,
            "workspaces",
            workspace_id,
            "automations",
        )
        automations_file_path: Path = Path(
            automations_folder_path, "automations.json"
        )
        os.mkdir(automations_folder_path)

        # Store the automations in a JSON file
        if len(automations["data"]) > 0:
            with open(automations_file_path, "w") as f:
                json.dump(automations, f)

    def store_declarative_filter_views(
        self, export_path: Path, workspace_id: str
    ) -> None:
        """Stores the filter views in the specified export path."""
        # Get the filter views YAML files from the API
        with self._api_rate_limiter:
            self._api.store_declarative_filter_views(workspace_id, export_path)

        # Move filter views to the subfolder containing the analytics model
        self._move_folder(
            Path(export_path, "gooddata_layouts", self.org_id, "filter_views"),
            Path(
                export_path,
                "gooddata_layouts",
                self.org_id,
                "workspaces",
                workspace_id,
                "filter_views",
            ),
        )

    def _get_workspace_export(
        self,
        local_target_path: str,
        workspaces_to_export: list[str],
    ) -> None:
        """
        Iterate over all workspaces in the workspaces_to_export list and store
        their declarative_workspace and their respective user data filters.
        """
        exported = False
        for workspace_id in workspaces_to_export:
            export_path = Path(
                local_target_path,
                self.org_id,
                workspace_id,
                BackupSettings.TIMESTAMP_SDK_FOLDER,
            )
            try:
                user_data_filters = self.get_user_data_filters(workspace_id)
            except Exception as e:
                self.logger.error(
                    f"Skipping backup of {workspace_id} - check if the workspace exists. "
                    + f"{e.__class__.__name__}: {e}"
                )
                continue

            try:
                # TODO: consider using the API to get JSON declarations in memory
                # or check if there is a way to get YAML structures directly from
                # the SDK. That way we could save and package all the declarations
                # directly instead of reorganizing the folder structures. That should
                # be more transparent/readable and possibly safer for threading
                with self._api_rate_limiter:
                    self._api.store_declarative_workspace(
                        workspace_id, export_path
                    )
                self.store_declarative_filter_views(export_path, workspace_id)
                self._store_automations(export_path, workspace_id)
                self._store_user_data_filters(
                    user_data_filters, export_path, workspace_id
                )
                self.logger.info(f"Stored export for {workspace_id}")
                exported = True
            except Exception as e:
                self.logger.error(
                    f"Skipping {workspace_id}. {e.__class__.__name__} encountered: {e}"
                )

        if not exported:
            raise RuntimeError(
                "None of the workspaces were exported. Check that the source file "
                + "is correct and that the workspaces exist."
            )

    def _archive_gooddata_layouts_to_zip(self, folder: str) -> None:
        """Archives the gooddata_layouts directory to a zip file."""
        try:
            target_subdir = ""
            for subdir, dirs, files in os.walk(folder):
                if DirNames.LAYOUTS in dirs:
                    target_subdir = os.path.join(subdir, dirs[0])
                if DirNames.LDM in dirs:
                    inner_layouts_dir = subdir + "/gooddata_layouts"
                    os.mkdir(inner_layouts_dir)
                    for dir in dirs:
                        shutil.move(
                            os.path.join(subdir, dir),
                            os.path.join(inner_layouts_dir),
                        )
                    shutil.make_archive(target_subdir, "zip", subdir)
                    shutil.rmtree(target_subdir)
        except Exception as e:
            self.logger.error(f"Error archiving {folder} to zip: {e}")
            raise
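
    # Illustration (assumed layout, inferred from _get_workspace_export) of
    # what the walk above does for one exported workspace. Before:
    #
    #   <org_id>/<ws_id>/<timestamp>/gooddata_layouts/<org_id>/workspaces/<ws_id>/
    #       ldm/  analytics_model/  user_data_filters/  ...
    #
    # After: the workspace folder's content is nested under a new
    # gooddata_layouts/ folder, zipped into
    # <org_id>/<ws_id>/<timestamp>/gooddata_layouts.zip, and the unzipped
    # tree is removed.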

    @staticmethod
    def _split_to_batches(
        workspaces_to_export: list[str], batch_size: int
    ) -> list[BackupBatch]:
        """Splits the list of workspaces into batches of the specified size.

        Each batch is a BackupBatch wrapping a list of workspace IDs.
        Returns a list of BackupBatch objects.
        """
        list_of_batches = []
        while workspaces_to_export:
            batch = BackupBatch(workspaces_to_export[:batch_size])
            workspaces_to_export = workspaces_to_export[batch_size:]
            list_of_batches.append(batch)
        return list_of_batches
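
    # Example: with batch_size=2, ["a", "b", "c", "d", "e"] yields three
    # batches holding ["a", "b"], ["c", "d"], and ["e"].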

    def _process_batch(
        self,
        batch: BackupBatch,
        retry_count: int = 0,
    ) -> None:
        """Processes a single batch of workspaces for backup.

        If the batch processing fails, the function will wait
        and retry with exponential backoff up to BackupSettings.MAX_RETRIES.
        The base wait time is defined by BackupSettings.RETRY_DELAY.
        """
        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                self._get_workspace_export(tmpdir, batch.list_of_ids)
                self._archive_gooddata_layouts_to_zip(
                    str(Path(tmpdir, self.org_id))
                )
                self.storage.export(tmpdir, self.org_id)
        except Exception as e:
            if retry_count < BackupSettings.MAX_RETRIES:
                # Retry with exponential backoff until MAX_RETRIES
                next_retry = retry_count + 1
                wait_time = BackupSettings.RETRY_DELAY**next_retry
                self.logger.info(
                    f"{e.__class__.__name__} encountered while processing a batch. "
                    + f"Retrying {next_retry}/{BackupSettings.MAX_RETRIES} "
                    + f"in {wait_time} seconds..."
                )
                time.sleep(wait_time)
                self._process_batch(batch, next_retry)
            else:
                # If the batch fails after MAX_RETRIES, raise the error
                self.logger.error(f"Batch failed: {e.__class__.__name__}: {e}")
                raise
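
    # Backoff illustration (assuming RETRY_DELAY=2 and MAX_RETRIES=3; the
    # actual constants live in BackupSettings and are not shown here): the
    # waits are 2**1=2s, 2**2=4s, and 2**3=8s, after which the error is
    # re-raised to the caller.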

    def _process_batches(
        self,
        batches: list[BackupBatch],
    ) -> None:
        """
        Processes batches sequentially to avoid overloading the API.
        If any batch fails, the processing will stop.
        """
        for i, batch in enumerate(batches, 1):
            self.logger.info(f"Processing batch {i}/{len(batches)}...")
            self._process_batch(batch)

    def backup_workspaces(
        self,
        path_to_csv: str | None = None,
        workspace_ids: list[str] | None = None,
    ) -> None:
        """Runs the backup process for a list of workspace IDs.

        Takes the list of workspace IDs, or reads it from a CSV file, and
        creates a backup for each workspace in the storage specified in the
        configuration.

        Args:
            path_to_csv (str): Path to a CSV file containing a list of workspace IDs
            workspace_ids (list[str]): List of workspace IDs
        """
        self._backup(InputType.LIST_OF_WORKSPACES, path_to_csv, workspace_ids)

    def backup_hierarchies(
        self,
        path_to_csv: str | None = None,
        workspace_ids: list[str] | None = None,
    ) -> None:
        """Runs the backup process for a list of hierarchies.

        Takes the list of workspace IDs, or reads it from a CSV file, and
        creates a backup for each of those workspaces' hierarchies in the
        storage specified in the configuration.

        A workspace hierarchy means the workspace itself and all its direct
        and indirect children.

        Args:
            path_to_csv (str): Path to a CSV file containing a list of workspace IDs
            workspace_ids (list[str]): List of workspace IDs
        """
        self._backup(InputType.HIERARCHY, path_to_csv, workspace_ids)

    def backup_entire_organization(self) -> None:
        """Runs the backup process for the entire organization.

        Creates a backup for every workspace in the organization in the
        storage specified in the configuration.
        """
        self._backup(InputType.ORGANIZATION)

    def _backup(
        self,
        input_type: InputType,
        path_to_csv: str | None = None,
        workspace_ids: list[str] | None = None,
    ) -> None:
        """Runs the backup process with the selected input type."""
        try:
            workspaces_to_export: list[str] = self.loader.get_ids_to_backup(
                input_type,
                path_to_csv,
                workspace_ids,
            )
            batches = self._split_to_batches(
                workspaces_to_export, self.config.batch_size
            )
            self.logger.info(
                f"Exporting {len(workspaces_to_export)} workspaces in {len(batches)} batches."
            )
            self._process_batches(batches)
            self.logger.info("Backup completed")
        except Exception as e:
            self.logger.error(f"Backup failed: {e.__class__.__name__}: {e}")
            self.logger.error(traceback.format_exc())
            raise
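

# Usage sketch (illustrative only; the host, token, and workspace IDs below
# are placeholders, and the exact BackupRestoreConfig constructor arguments
# may differ from what is shown):
#
#   config = BackupRestoreConfig(storage_type=StorageType.LOCAL)
#   manager = BackupManager.create(
#       config, host="https://example.gooddata.com", token="<api-token>"
#   )
#   manager.backup_workspaces(workspace_ids=["ws_1", "ws_2"])
#
# Or, using a profile file instead of explicit credentials:
#
#   manager = BackupManager.create_from_profile(config)
#   manager.backup_entire_organization()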