-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathresources_handler.py
More file actions
681 lines (568 loc) · 31 KB
/
resources_handler.py
File metadata and controls
681 lines (568 loc) · 31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
# Unless explicitly stated otherwise all files in this repository are licensed
# under the 3-clause BSD style license (see LICENSE).
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.
from __future__ import annotations
import asyncio
from collections import defaultdict
from copy import deepcopy
from time import sleep
from typing import Dict, TYPE_CHECKING, List, Optional, Set, Tuple
from click import confirm
from pprint import pformat
from datadog_sync.constants import TRUE, FALSE, FORCE, Command, Origin, Status
from datadog_sync.utils.resource_utils import (
CustomClientHTTPError,
ResourceConnectionError,
SkipResource,
check_diff,
create_global_downtime,
find_attr,
prep_resource,
init_topological_sorter,
)
from datadog_sync.utils.workers import Workers
if TYPE_CHECKING:
from datadog_sync.utils.configuration import Configuration
from graphlib import TopologicalSorter
class ResourcesHandler:
    """Orchestrates import, diff, sync, and cleanup of resources between orgs."""

    def __init__(self, config: Configuration) -> None:
        """Store the configuration and initialize lazily-created members.

        Args:
            config: Shared run configuration (clients, state, logger, flags).
        """
        self.config = config
        # Topological sorters are built per-operation in apply_resources().
        self.sorter: Optional[TopologicalSorter] = None
        self.cleanup_sorter: Optional[TopologicalSorter] = None
        # Workers require async context; created in init_async().
        self.worker: Optional[Workers] = None
        # BUG FIX: this previously read
        #   self._dependency_graph = Optional[Dict[...]]
        # which assigned the typing construct itself to the attribute instead
        # of annotating it. Initialize to None with a proper annotation.
        self._dependency_graph: Optional[Dict[Tuple[str, str], List[Tuple[str, str]]]] = None
async def init_async(self) -> None:
    # Workers are constructed here rather than in __init__ — presumably they
    # need a running event loop / async primitives; confirm before moving.
    self.worker: Workers = Workers(self.config)
async def reset(self) -> None:
    """Delete all synced resources from the destination org.

    When `backup_before_reset` is set, a full (saved) import runs first so the
    state is backed up; otherwise a loud warning is printed and a 5 second
    grace period is given before importing without saving.
    """
    if self.config.backup_before_reset:
        await self.import_resources()
    else:
        # make the warning red and give the user time to hit ctrl-c
        self.config.logger.warning("\n\033[91m\nABOUT TO RESET WITHOUT BACKUP\033[00m\n")
        # BUG FIX: time.sleep() blocked the whole event loop inside this
        # coroutine; asyncio.sleep keeps the loop (and signal handling)
        # responsive during the grace period.
        await asyncio.sleep(5)
        await self.import_resources_without_saving()
    # move the import data from source to destination
    self.config.state._data.destination = self.config.state._data.source
    for resource_type in self.config.resources_arg:
        resources = {}
        for _id, resource in self.config.state._data.destination[resource_type].items():
            resources[(resource_type, _id)] = resource
        if resources:
            delete = _cleanup_prompt(self.config, resources)
            if delete:
                self.config.logger.info("deleting resources...")
                await self.worker.init_workers(self._cleanup_worker, None, None)
                for resource in resources:
                    self.worker.work_queue.put_nowait(resource)
                await self.worker.schedule_workers()
                self.config.logger.info("finished deleting resources")
async def apply_resources(self) -> None:
    """Sync all resources from source to destination in dependency order.

    Phases:
      1. Build the dependency graph; optionally force-import missing deps.
      2. Optionally clean up destination-only resources (ordered, with an
         unordered fallback on unexpected errors).
      3. Run pre-apply hooks, then create/update resources via a
         topological sorter feeding the worker pool.

    Raises:
        ValueError: if the cleanup graph contains a circular dependency.

    Note: the return annotation was previously ``Tuple[int, int]`` but the
    function never returned a value; it is corrected to ``None``.
    """
    # Build dependency graph and missing resources
    self._dependency_graph, missing = self.get_dependency_graph()
    # Import resources that are missing but needed for resource connections
    if self.config.force_missing_dependencies and missing:
        self.config.logger.info("importing missing dependencies...")
        await self.worker.init_workers(self._force_missing_dep_import_cb, None, len(missing))
        for item in missing:
            self.worker.work_queue.put_nowait(item)
        await self.worker.schedule_workers()
        self.config.logger.info("finished importing missing dependencies")
    else:
        self.config.logger.info("did not import missing dependencies...")
    # handle resource cleanups
    if self.config.cleanup != FALSE:
        cleanup_resources = self.config.state.get_resources_to_cleanup(self.config.resources_arg)
        if cleanup_resources:
            cleanup = _cleanup_prompt(self.config, cleanup_resources)
            if cleanup:
                self.config.logger.info("cleaning up resources with dependency ordering...")
                # Build reverse dependency graph for ordered cleanup
                try:
                    cleanup_graph = self.get_cleanup_dependency_graph(cleanup_resources)
                    self.config.logger.info(f"Built cleanup dependency graph with {len(cleanup_graph)} resources")
                    # Detect circular dependencies before attempting deletion.
                    from datadog_sync.utils.resource_utils import detect_circular_dependencies

                    cycle = detect_circular_dependencies(cleanup_graph)
                    if cycle:
                        # Circular dependency detected: deleting in any order
                        # would violate a constraint, so abort loudly.
                        cycle_str = " -> ".join([f"{rt}:{rid}" for rt, rid in cycle])
                        self.config.logger.error(f"Circular dependency detected in cleanup graph: {cycle_str}")
                        self.config.logger.error(
                            "Cannot safely delete resources. Please manually break circular references first."
                        )
                        raise ValueError(
                            f"Circular dependency in cleanup graph: {cycle_str}. "
                            "Manual intervention required to break the cycle."
                        )
                    # Initialize cleanup sorter
                    self.cleanup_sorter = init_topological_sorter(cleanup_graph)
                    # Workers stop once the sorter reports no more active nodes.
                    await self.worker.init_workers(
                        self._cleanup_worker, lambda: not self.cleanup_sorter.is_active(), None
                    )
                    # Run cleanup with ordering
                    if self.config.show_progress_bar:
                        await self.worker.schedule_workers_with_pbar(
                            total=len(cleanup_graph), additional_coros=[self.run_cleanup_sorter()]
                        )
                    else:
                        await self.worker.schedule_workers(additional_coros=[self.run_cleanup_sorter()])
                    self.config.logger.info("finished cleaning up resources")
                except ValueError:
                    # Circular dependency - already logged, re-raise
                    raise
                except Exception as e:
                    # Unexpected error building or running cleanup graph
                    self.config.logger.error(f"Error during ordered cleanup: {str(e)}")
                    self.config.logger.warning(
                        "Falling back to unordered cleanup (may fail due to dependency issues)"
                    )
                    # Fallback to old unordered behavior
                    await self.worker.init_workers(self._cleanup_worker, None, None)
                    for i in cleanup_resources:
                        self.worker.work_queue.put_nowait(i)
                    await self.worker.schedule_workers()
                    self.config.logger.info("finished cleaning up resources (unordered fallback)")
    # Run pre-apply hooks, one per distinct resource type in the graph.
    resource_types = set(i[0] for i in self._dependency_graph)
    await self.worker.init_workers(self._pre_apply_hook_cb, None, len(resource_types))
    for resource_type in resource_types:
        self.worker.work_queue.put_nowait(resource_type)
    await self.worker.schedule_workers()
    # Additional pre-apply actions
    if self.config.create_global_downtime:
        await create_global_downtime(self.config)
    # Initialize the topological sorter and sync in dependency order.
    self.sorter = init_topological_sorter(self._dependency_graph)
    await self.worker.init_workers(self._apply_resource_cb, lambda: not self.sorter.is_active(), None)
    if self.config.show_progress_bar:
        await self.worker.schedule_workers_with_pbar(
            total=len(self._dependency_graph), additional_coros=[self.run_sorter()]
        )
    else:
        await self.worker.schedule_workers(additional_coros=[self.run_sorter()])
    self.config.logger.info(f"finished syncing resource items: {self.worker.counter}.")
    self.config.state.dump_state()
async def _apply_resource_cb(self, q_item: List) -> None:
    """Worker callback: create or update a single resource on the destination.

    Updates when `_id` already exists in destination state (and only if a
    diff is detected), otherwise creates. The sorter node is always marked
    done in `finally` so dependent resources are released even on failure.
    """
    resource_type, _id = q_item
    # BUG FIX: the class lookup is hoisted out of the try block — previously
    # a failed lookup left `r_class` unbound, and the except/finally handlers
    # would raise NameError, masking the original error.
    r_class = self.config.resources[resource_type]
    lock_acquired = False
    try:
        resource = deepcopy(self.config.state.source[resource_type][_id])
        if not r_class.resource_config.concurrent:
            await r_class.resource_config.async_lock.acquire()
            lock_acquired = True
        if not r_class.filter(resource):
            self.worker.counter.increment_filtered()
            return
        # Run hooks
        await r_class._pre_resource_action_hook(_id, resource)
        r_class.connect_resources(_id, resource)
        prep_resource(r_class.resource_config, resource)
        if _id in self.config.state.destination[resource_type]:
            # Compare prepared copies; skip the update when nothing changed.
            destination_copy = deepcopy(self.config.state.destination[resource_type][_id])
            prep_resource(r_class.resource_config, destination_copy)
            diff = check_diff(r_class.resource_config, resource, destination_copy)
            if not diff:
                raise SkipResource(_id, resource_type, "No differences detected.")
            self.config.logger.debug(f"Running update for {resource_type} with {_id}")
            await r_class._update_resource(_id, resource)
            await r_class._send_action_metrics(
                Command.SYNC.value, _id, Status.SUCCESS.value, tags=["action_sub_type:update"]
            )
            self.config.logger.debug(f"Finished update for {resource_type} with {_id}")
        else:
            self.config.logger.debug(f"Running create for {resource_type} with id: {_id}")
            await r_class._create_resource(_id, resource)
            await r_class._send_action_metrics(
                Command.SYNC.value, _id, Status.SUCCESS.value, tags=["action_sub_type:create"]
            )
            self.config.logger.debug(f"finished create for {resource_type} with id: {_id}")
        self.worker.counter.increment_success()
    except SkipResource as e:
        self.config.logger.info(f"skipping resource: {str(e)}", resource_type=resource_type, _id=_id)
        self.worker.counter.increment_skipped()
        await r_class._send_action_metrics(Command.SYNC.value, _id, Status.SKIPPED.value, tags=["reason:unknown"])
    except ResourceConnectionError:
        self.worker.counter.increment_skipped()
        await r_class._send_action_metrics(
            Command.SYNC.value, _id, Status.SKIPPED.value, tags=["reason:connection_error"]
        )
    except Exception as e:
        self.worker.counter.increment_failure()
        self.config.logger.error(str(e), resource_type=resource_type, _id=_id)
        await r_class._send_action_metrics(Command.SYNC.value, _id, Status.FAILURE.value)
    finally:
        # always place in done queue regardless of exception thrown
        self.sorter.done(q_item)
        # BUG FIX: release only if actually acquired — releasing an unacquired
        # asyncio lock raises RuntimeError and masks the original exception.
        if lock_acquired:
            r_class.resource_config.async_lock.release()
async def diffs(self) -> None:
    """Compute and log pending changes (creates/updates/deletes) without applying them."""
    self._dependency_graph, _ = self.get_dependency_graph()
    # Run each resource type's pre-apply hook first.
    resource_types = {key[0] for key in self._dependency_graph}
    await self.worker.init_workers(self._pre_apply_hook_cb, None, len(resource_types))
    for rt in resource_types:
        self.worker.work_queue.put_nowait(rt)
    await self.worker.schedule_workers()
    # Queue every resource item for diffing; the third tuple element marks
    # whether the item is a pending deletion (destination-only resource).
    await self.worker.init_workers(self._diffs_worker_cb, None, None)
    for rt, rid in self._dependency_graph:
        self.worker.work_queue.put_nowait((rt, rid, False))
    if self.config.cleanup != FALSE:
        to_cleanup = self.config.state.get_resources_to_cleanup(self.config.resources_arg)
        for rt, rid in to_cleanup:
            self.worker.work_queue.put_nowait((rt, rid, True))
    await self.worker.schedule_workers()
async def _diffs_worker_cb(self, q_item: List) -> None:
    """Worker callback: log the pending change for one resource.

    q_item is a (resource_type, _id, delete) triple; delete=True means the
    resource exists only in the destination and would be removed by cleanup.
    """
    resource_type, _id, delete = q_item
    r_class = self.config.resources[resource_type]
    if delete:
        self.config.logger.info(
            "to be deleted: \n {}".format(
                pformat(self.config.state.destination[resource_type][_id]),
            ),
            resource_type=resource_type,
            _id=_id,
        )
    else:
        resource = self.config.state.source[resource_type][_id]
        # Filtered-out resources would not be synced, so no diff is reported.
        if not r_class.filter(resource):
            return
        try:
            await r_class._pre_resource_action_hook(_id, resource)
        except SkipResource as e:
            self.config.logger.warning(f"skipping resource: resource_type:{resource_type} id:{_id}")
            self.config.logger.debug(str(e))
            return
        try:
            r_class.connect_resources(_id, resource)
        except ResourceConnectionError:
            # Unresolvable dependency: resource can't be synced; skip quietly.
            return
        if _id in self.config.state.destination[resource_type]:
            # We have to compare the prepared versions to deal w/ non-nullable attributes
            destination_copy = deepcopy(self.config.state.destination[resource_type][_id])
            resource_copy = deepcopy(resource)
            prep_resource(r_class.resource_config, destination_copy)
            prep_resource(r_class.resource_config, resource_copy)
            diff = check_diff(r_class.resource_config, destination_copy, resource_copy)
            if diff:
                self.config.logger.info("diff: \n {}".format(pformat(diff)), resource_type=resource_type, _id=_id)
        else:
            self.config.logger.info(f"to be created: {resource_type} {_id}")
async def import_resources(self) -> None:
    """Import all configured resources, then persist the source state to disk."""
    await self.import_resources_without_saving()
    self.config.state.dump_state(Origin.SOURCE)
async def import_resources_without_saving(self) -> None:
    """Fetch and import all requested resources without writing state to disk.

    Two phases: (1) list every resource for each requested type, buffering
    results in temporary storage; (2) import each listed item individually.
    """
    # Phase 1: fetch full listings per resource type.
    tmp_storage = defaultdict(list)
    await self.worker.init_workers(self._import_get_resources_cb, None, len(self.config.resources_arg), tmp_storage)
    for resource_type in self.config.resources_arg:
        self.worker.work_queue.put_nowait(resource_type)
    if self.config.show_progress_bar:
        await self.worker.schedule_workers_with_pbar(total=len(self.config.resources_arg))
    else:
        await self.worker.schedule_workers()
    self.config.logger.info(f"Finished getting resources. {self.worker.counter}")
    # Phase 2: import each fetched item individually.
    self.config.logger.info("importing individual resource items")
    await self.worker.init_workers(self._import_resource, None, None)
    total = sum(len(items) for items in tmp_storage.values())
    for resource_type, items in tmp_storage.items():
        for item in items:
            self.worker.work_queue.put_nowait((resource_type, item))
    if self.config.show_progress_bar:
        await self.worker.schedule_workers_with_pbar(total=total)
    else:
        await self.worker.schedule_workers()
    self.config.logger.info(f"finished importing individual resource items: {self.worker.counter}.")
async def _import_get_resources_cb(self, resource_type: str, tmp_storage) -> None:
    """Worker callback: list all resources of one type from the source org.

    Results are buffered in tmp_storage; failures are counted and logged,
    never raised.
    """
    self.config.logger.info("getting resources", resource_type=resource_type)
    r_class = self.config.resources[resource_type]
    # Drop stale source-state entries before re-fetching this type.
    self.config.state.source[resource_type].clear()
    try:
        tmp_storage[resource_type] = await r_class._get_resources(self.config.source_client)
    except TimeoutError:
        self.worker.counter.increment_failure()
        self.config.logger.error(f"TimeoutError while getting resources {resource_type}")
    except Exception as e:
        self.worker.counter.increment_failure()
        self.config.logger.error(f"Error while getting resources {resource_type}: {str(e)}")
    else:
        self.worker.counter.increment_success()
async def _import_resource(self, q_item: List) -> None:
    """Worker callback: import one resource item, tracking success/skip/failure."""
    resource_type, resource = q_item
    _id = resource.get("id")
    r_class = self.config.resources[resource_type]
    # Honor user filters before doing any work.
    if not r_class.filter(resource):
        self.worker.counter.increment_filtered()
        return
    try:
        await r_class._import_resource(resource=resource)
    except SkipResource as e:
        self.worker.counter.increment_skipped()
        await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.SKIPPED.value)
        self.config.logger.info(f"skipping resource: {str(e)}", resource_type=resource_type, _id=_id)
        self.config.logger.debug(str(e))
    except Exception as e:
        self.worker.counter.increment_failure()
        await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.FAILURE.value)
        self.config.logger.error(f"error while importing resource: resource_type:{resource_type} id:{_id}")
        self.config.logger.debug(f"error detail: {str(e)}", resource_type=resource_type)
    else:
        self.worker.counter.increment_success()
        await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.SUCCESS.value)
async def _force_missing_dep_import_cb(self, q_item: List):
    """Worker callback: import a missing dependency and add it to the graph.

    Newly discovered missing dependencies are fed back into the same work
    queue, so imports cascade transitively.
    """
    resource_type, _id = q_item
    try:
        # _id is deliberately rebound: the importer returns the resource id
        # (possibly normalized) that subsequent connection lookups should use.
        _id = await self.config.resources[resource_type]._import_resource(_id=_id)
    except CustomClientHTTPError as e:
        self.config.logger.error(f"error importing dependency: {str(e)}", resource_type=resource_type, _id=_id)
        return
    failed_connections, missing_deps = self._resource_connections(resource_type, _id)
    # NOTE(review): the graph is keyed by the original q_item tuple even when
    # the importer returned a different _id — confirm this is intended.
    self._dependency_graph[q_item] = failed_connections
    for missing_id in missing_deps:
        self.worker.work_queue.put_nowait(missing_id)
async def _cleanup_worker(self, q_item: List) -> None:
    """Worker callback: delete one resource from the destination org.

    Used by both ordered cleanup (with cleanup_sorter) and the unordered
    fallback; the sorter is only notified when it exists.
    """
    resource_type, _id = q_item
    self.config.logger.info("deleting resource", resource_type=resource_type, _id=_id)
    r_class = self.config.resources[resource_type]
    lock_acquired = False
    try:
        if not r_class.resource_config.concurrent:
            await r_class.resource_config.async_lock.acquire()
            lock_acquired = True
        await r_class._delete_resource(_id)
        self.worker.counter.increment_success()
        await r_class._send_action_metrics("delete", _id, Status.SUCCESS.value)
    except SkipResource as e:
        self.worker.counter.increment_skipped()
        await r_class._send_action_metrics("delete", _id, Status.SKIPPED.value, tags=["reason:unknown"])
        # BUG FIX: the same skip event was previously logged twice with
        # slightly different wording; log it once.
        self.config.logger.info(f"skipping resource: {str(e)}", resource_type=resource_type, _id=_id)
    except Exception as e:
        self.worker.counter.increment_failure()
        await r_class._send_action_metrics("delete", _id, Status.FAILURE.value)
        self.config.logger.error(f"error deleting resource {resource_type} with id {_id}: {str(e)}")
    finally:
        # Mark as done in cleanup sorter if it exists
        if hasattr(self, "cleanup_sorter") and self.cleanup_sorter:
            self.cleanup_sorter.done(q_item)
        # BUG FIX: release only if actually acquired — releasing an unacquired
        # asyncio lock raises RuntimeError and masks the original exception.
        if lock_acquired:
            r_class.resource_config.async_lock.release()
async def _pre_apply_hook_cb(self, resource_type: str) -> None:
    """Run one resource type's pre-apply hook; a failing hook is logged, never fatal."""
    r_class = self.config.resources[resource_type]
    try:
        await r_class._pre_apply_hook()
    except Exception as e:
        self.config.logger.warning(f"error while running pre-apply hook: {str(e)}", resource_type=resource_type)
async def run_sorter(self):
    """Feed apply-ready nodes from the topological sorter to the work queue.

    Runs concurrently with the workers; nodes become ready as workers call
    sorter.done(). is_active() is awaited via the default executor so this
    coroutine yields control between polls.
    """
    loop = asyncio.get_event_loop()
    while await loop.run_in_executor(None, self.sorter.is_active):
        for node in self.sorter.get_ready():
            if node[1] not in self.config.state.source[node[0]]:
                # at this point, we already attempted to import missing resources
                # so mark the node as complete and continue
                self.sorter.done(node)
                continue
            await self.worker.work_queue.put(node)
        # Yield to the event loop so workers can make progress between polls.
        await asyncio.sleep(0)
async def run_cleanup_sorter(self):
    """Mirror of run_sorter() but for cleanup operations.

    Continuously feeds deletion-ready resources to workers in proper order.
    Resources are only deleted after all their dependents are deleted.
    """
    loop = asyncio.get_event_loop()
    while await loop.run_in_executor(None, self.cleanup_sorter.is_active):
        for node in self.cleanup_sorter.get_ready():
            resource_type, _id = node
            # Verify resource still exists in destination
            if _id not in self.config.state.destination[resource_type]:
                # Already deleted or doesn't exist
                self.config.logger.debug(f"Resource {resource_type}:{_id} already deleted, marking as done")
                self.cleanup_sorter.done(node)
                continue
            # Add to work queue for deletion
            await self.worker.work_queue.put(node)
        # Yield to the event loop so workers can make progress between polls.
        await asyncio.sleep(0)
def get_dependency_graph(self) -> Tuple[Dict[Tuple[str, str], List[Tuple[str, str]]], Set[Tuple[str, str]]]:
    """Build the dependency graph for all resources.

    Returns:
        A tuple of (graph keyed by (resource_type, _id) mapping to its
        dependencies, set of resources referenced but not yet imported).
    """
    graph = {}
    not_imported = set()
    all_keys = self.config.state.get_all_resources(self.config.resources_arg).keys()
    for resource_type, _id in all_keys:
        connections, absent = self._resource_connections(resource_type, _id)
        graph[(resource_type, _id)] = connections
        not_imported |= absent
    return graph, not_imported
def _resource_connections(self, resource_type: str, _id: str) -> Tuple[Set[Tuple[str, str]], Set[Tuple[str, str]]]:
    """Returns the failed connections and missing resources for a given resource.

    Failed connections are all dependencies of given resource that is not in
    destination state. Missing resources are all resources that have not been
    imported yet in source state.

    Args:
        resource_type (str): Type of the resource
        _id (str): Resource id

    Returns:
        Tuple[Set[Tuple[str, str]], Set[Tuple[str, str]]]: failed_connections, missing_resources
    """
    failed_connections: Set[Tuple[str, str]] = set()
    missing_resources: Set[Tuple[str, str]] = set()
    # Hoist repeated self.config.resources[resource_type] lookups; the
    # original also re-checked resource_connections after the early return,
    # which was redundant and is removed.
    r_class = self.config.resources[resource_type]
    resource_connections = r_class.resource_config.resource_connections
    if not resource_connections:
        return failed_connections, missing_resources
    resource = deepcopy(self.config.state.source[resource_type][_id])
    for resource_to_connect, attr_connections in resource_connections.items():
        for attr_connection in attr_connections:
            failed = find_attr(
                attr_connection,
                resource_to_connect,
                resource,
                r_class.connect_id,
            )
            if failed:
                # After retrieving all of the failed connections, we check if
                # the resources are imported. Otherwise append to missing with its type.
                for f_id in failed:
                    if f_id not in self.config.state.source[resource_to_connect]:
                        missing_resources.add((resource_to_connect, f_id))
                    failed_connections.add((resource_to_connect, f_id))
    return failed_connections, missing_resources
def get_cleanup_dependency_graph(
    self, cleanup_resources: Dict[Tuple[str, str], str | None]
) -> Dict[Tuple[str, str], Set[Tuple[str, str]]]:
    """Build REVERSE dependency graph for cleanup.

    For deletion, we need to delete dependents BEFORE dependencies.
    This inverts the normal creation graph.

    Args:
        cleanup_resources: Resources to be deleted from destination

    Returns:
        Dict mapping (resource_type, _id) to set of resources that depend on it

    Example:
        If Dashboard depends on Monitor:
        Creation graph: {("dashboards", "dash-1"): [("monitors", "mon-1")]}
        Cleanup graph: {("monitors", "mon-1"): {("dashboards", "dash-1")}}
        This means: Monitor "mon-1" can only be deleted AFTER Dashboard "dash-1"
    """
    reverse_graph: Dict[Tuple[str, str], Set[Tuple[str, str]]] = defaultdict(set)
    # Initialize all cleanup resources with empty dependencies so isolated
    # nodes still appear in the graph.
    for resource_key in cleanup_resources.keys():
        reverse_graph[resource_key] = set()
    # Build reverse dependencies by scanning destination state
    for resource_type, _id in cleanup_resources.keys():
        if resource_type not in self.config.resources:
            continue
        r_config = self.config.resources[resource_type].resource_config
        if not r_config.resource_connections:
            continue
        # Get the actual resource from destination state (already in memory)
        if _id not in self.config.state.destination[resource_type]:
            self.config.logger.debug(
                f"Resource {resource_type}:{_id} not in destination state, skipping dependency analysis"
            )
            continue
        resource = self.config.state.destination[resource_type][_id]
        # For each dependency this resource has
        for dep_type, attr_paths in r_config.resource_connections.items():
            for attr_path in attr_paths:
                # Find dependency IDs in the resource
                dep_ids = self._extract_dependency_ids(resource, attr_path, dep_type)
                # For each dependency, add THIS resource as a dependent
                for dep_id in dep_ids:
                    dep_key = (dep_type, dep_id)
                    # Only include if the dependency is also being cleaned up
                    if dep_key in cleanup_resources:
                        # This says: "dep_key must be deleted AFTER (resource_type, _id)"
                        reverse_graph[dep_key].add((resource_type, _id))
                        self.config.logger.debug(f"Dependency edge: {dep_type}:{dep_id} <- {resource_type}:{_id}")
    return dict(reverse_graph)
def _extract_dependency_ids(self, resource: Dict, attr_path: str, dep_type: str) -> Set[str]:
    """Extract dependency IDs from a resource at the given attribute path.

    Args:
        resource: The resource dict
        attr_path: Dot-separated path like "widgets.definition.alert_id"
        dep_type: Type of dependency (for context)

    Returns:
        Set of dependency IDs found
    """
    found = set()

    def walk(node, parts):
        # Nothing to inspect, or path exhausted.
        if not node or not parts:
            return
        key, rest = parts[0], parts[1:]
        if isinstance(node, list):
            # Lists are transparent: apply the same path to every element.
            for element in node:
                walk(element, parts)
            return
        if not isinstance(node, dict) or key not in node:
            return
        if rest:
            # Keep traversing toward the target attribute.
            walk(node[key], rest)
            return
        # At the target attribute: normalize to a list of candidate values.
        value = node[key]
        if not value:
            return
        candidates = value if isinstance(value, list) else [value]
        for candidate in candidates:
            if candidate:
                # Parse prefixed IDs (e.g., "dashboard:abc-123" -> "abc-123")
                found.add(self._parse_prefixed_id(str(candidate), dep_type))

    walk(resource, attr_path.split("."))
    return found
def _parse_prefixed_id(self, value: str, expected_resource_type: str) -> str:
"""Parse IDs that may have resource type prefixes.
Restriction policies and some other resources use prefixed IDs like
"dashboard:abc-123" or "slo:xyz-789". This method strips the prefix
to get the actual resource ID that matches the state keys.
Examples:
"dashboard:abc-123" with expected_resource_type="dashboards" → "abc-123"
"slo:xyz-789" with expected_resource_type="service_level_objectives" → "xyz-789"
"abc-123" with any type → "abc-123" (passthrough)
Args:
value: The ID value, possibly prefixed
expected_resource_type: The resource type we expect (e.g., "dashboards")
Returns:
The parsed ID without prefix
"""
# Check if value contains a colon (indicates prefix)
if ":" not in value:
return value
# Split on first colon only
prefix, actual_id = value.split(":", 1)
# Map prefix to resource type
prefix_to_resource_type = {
"dashboard": "dashboards",
"slo": "service_level_objectives",
"notebook": "notebooks",
"monitor": "monitors",
"user": "users",
"role": "roles",
"team": "teams",
"security-rule": "security_monitoring_rules",
}
# If prefix matches expected resource type, return just the ID
if prefix_to_resource_type.get(prefix) == expected_resource_type:
return actual_id
# If prefix doesn't match, return original value (might be a false positive)
self.config.logger.debug(
f"ID '{value}' has prefix '{prefix}' but expected resource type '{expected_resource_type}'"
)
return value
def _cleanup_prompt(
    config: Configuration, resources_to_cleanup: Dict[Tuple[str, str], str | None], prompt: bool = True
) -> bool:
    """Decide whether the given destination-only resources should be deleted.

    Returns True without asking when cleanup is forced or prompting is
    disabled; interactively confirms when cleanup is enabled; otherwise False.
    """
    if config.cleanup == FORCE or not prompt:
        return True
    if config.cleanup != TRUE:
        return False
    # Show the full payload of every resource about to be deleted, then ask once.
    for resource_type, _id in resources_to_cleanup:
        config.logger.warning(
            f"Resource will be deleted: \n" f"{pformat(config.state.destination[resource_type][_id])}",
            resource_type=resource_type,
            _id=_id,
        )
    return confirm(f"Delete above {len(resources_to_cleanup)} resources from destination org?")