-
Notifications
You must be signed in to change notification settings - Fork 135
Expand file tree
/
Copy pathparser.py
More file actions
559 lines (410 loc) · 24.9 KB
/
parser.py
File metadata and controls
559 lines (410 loc) · 24.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
import argparse
from datetime import datetime, timedelta
import configparser
import wmconstants
from enum import Enum
from os import path
auth_key = ['host',
'username',
'token']
class NotebookFormat(Enum):
dbc = 'DBC'
source = 'SOURCE'
html = 'HTML'
# jupyter is only supported for python notebooks. consider adding this back if there's demand
# jupyter = 'JUPYTER'
def __str__(self):
return self.value
class ValidateSkipTasks(argparse.Action):
def __call__(self, parser, args, values, option_string=None):
valid_tasks = wmconstants.TASK_OBJECTS
for task in values:
if task not in valid_tasks:
raise ValueError(f"invalid task {task}. Skipped tasks must come from {valid_tasks}.")
setattr(args, self.dest, values)
def valid_date(s):
try:
return datetime.strptime(s, "%Y-%m-%d")
except ValueError:
msg = "not a valid date: {0!r}. It must be in YYYY-MM-DD".format(s)
raise argparse.ArgumentTypeError(msg)
def is_azure_creds(creds):
if 'azuredatabricks.net' in creds.get('host', ''):
return True
return False
def convert_args_to_list(arg_str):
arg_list = map(lambda x: x.lstrip().rstrip(), arg_str.split(','))
return list(arg_list)
def get_login_credentials(creds_path='~/.databrickscfg', profile='DEFAULT'):
config = configparser.ConfigParser()
abs_creds_path = path.expanduser(creds_path)
config.read(abs_creds_path)
try:
current_profile = dict(config[profile])
if not current_profile:
raise ValueError(f"Unable to find a defined profile to run this tool. Profile \'{profile}\' not found.")
return current_profile
except KeyError:
raise ValueError(
'Unable to find credentials to load for profile. Profile only supports tokens.')
def get_export_user_parser():
# export workspace items
parser = argparse.ArgumentParser(
description='Export user(s) workspace artifacts from Databricks')
parser.add_argument('--profile', action='store', default='DEFAULT',
help='Profile to parse the credentials')
parser.add_argument('--azure', action='store_true', default=False,
help='Run on Azure. (Default is AWS)')
parser.add_argument('--skip-failed', action='store_true', default=False,
help='Skip retries for any failed hive metastore exports.')
parser.add_argument('--silent', action='store_true', default=False,
help='Silent all logging of export operations.')
# Don't verify ssl
parser.add_argument('--no-ssl-verification', action='store_true',
help='Set Verify=False when making http requests.')
parser.add_argument('--debug', action='store_true',
help='Enable debug logging')
parser.add_argument('--set-export-dir', action='store',
help='Set the base directory to export artifacts')
parser.add_argument('--users', action='store',
help='Download user(s) artifacts such as notebooks, cluster specs, jobs. '
'Provide a list of user ids / emails to export')
return parser
def get_export_parser():
# export workspace items
parser = argparse.ArgumentParser(description='Export full workspace artifacts from Databricks')
# export all users and groups
parser.add_argument('--users', action='store_true',
help='Download all the users and groups in the workspace')
# log all user workspace paths
parser.add_argument('--workspace', action='store_true',
help='Log all the notebook paths in the workspace. (metadata only)')
parser.add_argument('--notebook-format', type=NotebookFormat,
choices=list(NotebookFormat), default=NotebookFormat.dbc,
help='Choose the file format to download the notebooks (default: DBC)')
# download all user workspace notebooks
parser.add_argument('--download', action='store_true',
help='Download all notebooks for the environment')
# add all lib configs
parser.add_argument('--libs', action='store_true',
help='Log all the libs for the environment')
# add all clusters configs
parser.add_argument('--clusters', action='store_true',
help='Log all the clusters for the environment')
# get all job configs
parser.add_argument('--jobs', action='store_true',
help='Log all the job configs for the environment')
# get all metastore
parser.add_argument('--metastore', action='store_true',
help='log all the metastore table definitions')
# get all secret scopes
parser.add_argument('--secrets', action='store_true',
help='log all the secret scopes')
# get all mlflow experiments
parser.add_argument('--mlflow-experiments', action='store_true',
help='log all the mlflow experiments')
# get all mlflow experiments permissions
parser.add_argument('--mlflow-experiments-permissions', action='store_true',
help='log all the mlflow experiments permissions')
# get all mlflow runs
parser.add_argument('--mlflow-runs', action='store_true',
help='log all the mlflow runs')
# get all metastore
parser.add_argument('--metastore-unicode', action='store_true',
help='log all the metastore table definitions including unicode characters')
parser.add_argument('--session', action='store', default='',
help='If set, the script resumes from latest checkpoint of given session; '
'Otherwise, pipeline starts from beginning and creates a new session.')
# get all table ACLs (TODO need to make sure that unicode database object names are supported)
parser.add_argument('--table-acls', action='store_true',
help='log all table ACL grant and deny statements')
# cluster name used to export the metastore
parser.add_argument('--cluster-name', action='store',
help='Cluster name to export the metastore to a specific cluster. Cluster will be started.')
# get database to export for metastore and table ACLs
parser.add_argument('--database', action='store',
help='Database name to export for the metastore and table ACLs. Single database name supported')
# iam role used to export the metastore
parser.add_argument('--iam', action='store',
help='IAM Instance Profile to export metastore entires')
# skip failures
parser.add_argument('--skip-failed', action='store_true', default=False,
help='Skip retries for any failed hive metastore exports.')
# get mount points
parser.add_argument('--mounts', action='store_true', default=False,
help='Log all mount points.')
# get azure logs
parser.add_argument('--azure', action='store_true', default=False,
help='Run on Azure. (Default is AWS)')
#
parser.add_argument('--profile', action='store', default='DEFAULT',
help='Profile to parse the credentials')
parser.add_argument('--single-user', action='store',
help='User\'s email to export their user identity and entitlements')
parser.add_argument('--export-home', action='store',
help='User workspace name to export, typically the users email address')
parser.add_argument('--export-groups', action='store',
help='Group names to export as a set. Includes group, users, and notebooks.')
parser.add_argument('--workspace-acls', action='store_true',
help='Permissions for workspace objects to export')
parser.add_argument('--workspace-top-level-only', action='store_true',
help='Download only top level notebook directories')
parser.add_argument('--silent', action='store_true', default=False,
help='Silent all logging of export operations.')
# Don't verify ssl
parser.add_argument('--no-ssl-verification', action='store_true',
help='Set Verify=False when making http requests.')
parser.add_argument('--debug', action='store_true',
help='Enable debug logging')
parser.add_argument('--reset-exports', action='store_true',
help='Clear export directory')
parser.add_argument('--set-export-dir', action='store',
help='Set the base directory to export artifacts')
parser.add_argument('--pause-all-jobs', action='store_true',
help='Pause all scheduled jobs')
parser.add_argument('--unpause-all-jobs', action='store_true',
help='Unpause all scheduled jobs')
parser.add_argument('--update-account-id', action='store',
help='Set the account id for instance profiles to a new account id')
parser.add_argument('--old-account-id', action='store',
help='Old account ID to filter on')
parser.add_argument('--replace-old-email', action='store',
help='Old email address to update from logs')
parser.add_argument('--update-new-email', action='store',
help='New email address to replace the logs')
parser.add_argument('--replace-email', action='store',
help='Update old emails with new e-mails. NOTE: Similar to replace-old-email but capable of using multiple e-mails. Format old1@email:new1@email.com,old2@email.com:new2@email.com')
parser.add_argument('--bypass-windows-check', action='store_true',
help='By-pass windows os checker')
parser.add_argument('--use-checkpoint', action='store_true',
help='use checkpointing to restart from previous state')
parser.add_argument('--num-parallel', type=int, default=4, help='Number of parallel threads to use to '
'export/import')
parser.add_argument('--retry-total', type=int, default=3, help='Total number or retries when making calls to Databricks API')
parser.add_argument('--retry-backoff', type=float, default=1.0, help='Backoff factor to apply between retry attempts when making calls to Databricks API')
parser.add_argument('--start-date', action='store', default=None,
help='start-date format: YYYY-MM-DD. If not provided, defaults to past 30 days. Currently, only used for exporting ML runs objects.',
type=valid_date)
parser.add_argument('--exclude-work-item-prefixes', nargs='+', type=str, default=[],
help='List of prefixes to skip export for log_all_workspace_items')
return parser
def get_import_parser():
# import workspace items parser
parser = argparse.ArgumentParser(description='Import full workspace artifacts into Databricks')
# import all users and groups
parser.add_argument('--users', action='store_true',
help='Import all the users and groups from the logfile.')
# import all notebooks
parser.add_argument('--workspace', action='store_true',
help='Import all notebooks from export dir into the workspace.')
# skip previous successful imports
parser.add_argument('--restart-from-checkpoint', action='store_true',
help='Restart the workspace import and skip previously successful imports. '
'Only works with --workspace option')
parser.add_argument('--workspace-top-level', action='store_true',
help='Import all top level notebooks from export dir into the workspace. Excluding Users dirs')
parser.add_argument('--workspace-acls', action='store_true',
help='Permissions for workspace objects to import')
parser.add_argument('--overwrite-notebooks', action='store_true', default=False,
help='Flag to overwrite notebooks to forcefully overwrite during notebook imports')
parser.add_argument('--notebook-format', type=NotebookFormat,
choices=list(NotebookFormat), default=NotebookFormat.dbc,
help='Choose the file format of the notebook to import (default: DBC)')
parser.add_argument('--import-home', action='store',
help='User workspace name to import, typically the users email address')
parser.add_argument('--import-groups', action='store_true',
help='Groups to import into a new workspace. Includes group creation and user notebooks.')
# import all notebooks
parser.add_argument('--archive-missing', action='store_true',
help='Import all missing users into the top level /Archive/ directory.')
# import all lib configs
parser.add_argument('--libs', action='store_true',
help='Import all the libs from the logfile into the workspace.')
# import all clusters configs
parser.add_argument('--clusters', action='store_true',
help='Import all the cluster configs for the environment')
# import all job configs
parser.add_argument('--jobs', action='store_true',
help='Import all job configurations to the environment.')
# import all metastore
parser.add_argument('--metastore', action='store_true',
help='Import the metastore to the workspace.')
# import all metastore including defns with unicode
parser.add_argument('--metastore-unicode', action='store_true',
help='Import all the metastore table definitions with unicode characters')
parser.add_argument('--session', action='store', default='',
help='If set, the script resumes from latest checkpoint of given session; '
'Otherwise, pipeline starts from beginning and creates a new session.')
# import all table acls
parser.add_argument('--table-acls', action='store_true',
help='Import table acls to the workspace.')
parser.add_argument('--get-repair-log', action='store_true',
help='Report on current tables requiring repairs')
parser.add_argument('--repair-metastore-tables', action='store_true', default=False,
help='Repair legacy metastore tables')
# cluster name used to import the metastore
parser.add_argument('--cluster-name', action='store',
help='Cluster name to import the metastore to a specific cluster. Cluster will be started.')
# skip failures
parser.add_argument('--skip-failed', action='store_true', default=False,
help='Skip missing users that do not exist when importing user notebooks')
# import all secret scopes
parser.add_argument('--secrets', action='store_true',
help='Import all secret scopes')
# import all mlflow experiments
parser.add_argument('--mlflow-experiments', action='store_true',
help='Import all the mlflow experiments')
# import all mlflow experiments permissions
parser.add_argument('--mlflow-experiments-permissions', action='store_true',
help='Import all the mlflow experiments permissions')
# import all mlflow runs
parser.add_argument('--mlflow-runs', action='store_true',
help='Import all the mlflow runs')
# get azure logs
parser.add_argument('--azure', action='store_true',
help='Run on Azure. (Default is AWS)')
#
parser.add_argument('--profile', action='store', default='DEFAULT',
help='Profile to parse the credentials')
# Source workspace's profile. Necessary for importing mlflow runs objects
parser.add_argument('--src-profile', action='store', default=None,
help='Source Profile to parse the credentials')
parser.add_argument('--single-user', action='store',
help='User\'s email to export their user identity and entitlements')
# Don't verify ssl
parser.add_argument('--no-ssl-verification', action='store_true',
help='Set Verify=False when making http requests.')
parser.add_argument('--silent', action='store_true',
help='Silent all logging of import operations.')
parser.add_argument('--debug', action='store_true',
help='Enable debug logging')
parser.add_argument('--set-export-dir', action='store',
help='Set the base directory to import artifacts if the export dir was a customized')
parser.add_argument('--pause-all-jobs', action='store_true',
help='Pause all scheduled jobs')
parser.add_argument('--unpause-all-jobs', action='store_true',
help='Unpause all scheduled jobs')
parser.add_argument('--delete-all-jobs', action='store_true',
help='Delete all jobs')
parser.add_argument('--single-user-all-jobs', action='store_true',
help='Set all jobs as single user to allow UC enabled clusters')
parser.add_argument('--shared-all-jobs', action='store_true',
help='Set all jobs as shared to allow UC enabled clusters')
parser.add_argument('--set-policy-all-jobs', action='store',
help='Set all jobs with the provided policy')
parser.add_argument('--use-checkpoint', action='store_true',
help='use checkpointing to restart from previous state')
parser.add_argument('--num-parallel', type=int, default=4, help='Number of parallel threads to use to '
'export/import')
parser.add_argument('--retry-total', type=int, default=3, help='Total number or retries when making calls to Databricks API')
parser.add_argument('--retry-backoff', type=float, default=1.0, help='Backoff factor to apply between retry attempts when making calls to Databricks API')
return parser
def prompt_for_input(message):
import sys
# raw_input returns the empty string for "enter", therefore default is no
yes = {'yes', 'y', 'ye'}
no = {'no', 'n', ''}
choice = input(message + '\n').lower()
if choice in yes:
return True
elif choice in no:
return False
else:
sys.stdout.write("Please respond with 'yes' or 'no'")
def build_client_config_without_profile(args):
return build_client_config('', '', '', args)
def build_client_config(profile, url, token, args):
# cant use netrc credentials because requests module tries to load the credentials into http basic auth headers
# aws is the default
config = {'profile': profile,
'url': url,
'token': token,
'is_aws': (not args.azure),
'verbose': (not args.silent),
'verify_ssl': (not args.no_ssl_verification),
'skip_failed': args.skip_failed,
'debug': args.debug,
'file_format': str(args.notebook_format)
}
# this option only exists during imports so we check for existence
if 'overwrite_notebooks' in args:
config['overwrite_notebooks'] = args.overwrite_notebooks
else:
config['overwrite_notebooks'] = False
if args.set_export_dir:
if args.set_export_dir.rstrip()[-1] != '/':
config['export_dir'] = args.set_export_dir + '/'
else:
config['export_dir'] = args.set_export_dir
elif config['is_aws']:
config['export_dir'] = 'logs/'
else:
config['export_dir'] = 'azure_logs/'
config['use_checkpoint'] = args.use_checkpoint
config['num_parallel'] = args.num_parallel
config['retry_total'] = args.retry_total
config['retry_backoff'] = args.retry_backoff
return config
def get_pipeline_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description='Export user(s) workspace artifacts from Databricks')
parser.add_argument('--profile', action='store', default='DEFAULT',
help='Profile to parse the credentials')
parser.add_argument('--azure', action='store_true', default=False,
help='Run on Azure. (Default is AWS)')
parser.add_argument('--silent', action='store_true', default=False,
help='Silent all logging of export operations.')
parser.add_argument('--no-ssl-verification', action='store_true',
help='Set Verify=False when making http requests.')
parser.add_argument('--debug', action='store_true',
help='Enable debug logging')
parser.add_argument('--set-export-dir', action='store',
help='Set the base directory to export artifacts')
parser.add_argument('--cluster-name', action='store', required=False,
help='Cluster name to export the metastore to a specific cluster. Cluster will be started.')
# Workspace arguments
parser.add_argument('--notebook-format', type=NotebookFormat,
choices=list(NotebookFormat), default=NotebookFormat.dbc,
help='Choose the file format to download the notebooks (default: DBC)')
parser.add_argument('--overwrite-notebooks', action='store_true', default=False,
help='Flag to overwrite notebooks to forcefully overwrite during notebook imports')
parser.add_argument('--archive-missing', action='store_true',
help='Import all missing users into the top level /Archive/ directory.')
# Metastore arguments
parser.add_argument('--repair-metastore-tables', action='store_true', default=False,
help='Repair legacy metastore tables')
parser.add_argument('--metastore-unicode', action='store_true',
help='log all the metastore table definitions including unicode characters')
parser.add_argument('--skip-failed', action='store_true', default=False,
help='Skip retries for any failed hive metastore exports.')
parser.add_argument('--session', action='store', default='',
help='If set, pipeline resumes from latest checkpoint of given session; '
'Otherwise, pipeline starts from beginning and creates a new session.')
parser.add_argument('--dry-run', action='store_true', default=False,
help='Dry run the pipeline i.e. will not execute tasks if true.')
parser.add_argument('--export-pipeline', action='store_true',
help='Execute all export tasks.')
parser.add_argument('--import-pipeline', action='store_true',
help='Execute all import tasks.')
parser.add_argument('--validate-pipeline', action='store_true',
help='Validate exported data between source and destination.')
parser.add_argument('--validate-source-session', action='store', default='',
help='Session used by exporting source workspace. Only used for ' +
'--validate-pipeline.')
parser.add_argument('--validate-destination-session', action='store', default='',
help='Session used by exporting destination workspace. Only used for ' +
'--validate-pipeline.')
parser.add_argument('--use-checkpoint', action='store_true',
help='use checkpointing to restart from previous state')
parser.add_argument('--skip-tasks', nargs='+', type=str, action=ValidateSkipTasks, default=[],
help='List of tasks to skip from the pipeline.')
parser.add_argument('--num-parallel', type=int, default=4, help='Number of parallel threads to use to '
'export/import')
parser.add_argument('--retry-total', type=int, default=3, help='Total number or retries when making calls to Databricks API')
parser.add_argument('--retry-backoff', type=float, default=1.0, help='Backoff factor to apply between retry attempts when making calls to Databricks API')
parser.add_argument('--start-date', action='store', default=None,
help='start-date format: YYYY-MM-DD. If not provided, defaults to past 30 days. Currently, only used for exporting ML runs objects.',
type=valid_date)
parser.add_argument('--exclude-work-item-prefixes', nargs='+', type=str, default=[],
help='List of prefixes to skip export for log_all_workspace_items')
return parser