-
Notifications
You must be signed in to change notification settings - Fork 24
topological order for deletes #439
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
a657d31
2fbfb02
7887c1d
2801973
181e8ee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,7 @@ class ResourcesHandler: | |
| def __init__(self, config: Configuration) -> None: | ||
| self.config = config | ||
| self.sorter: Optional[TopologicalSorter] = None | ||
| self.cleanup_sorter: Optional[TopologicalSorter] = None | ||
| self.worker: Optional[Workers] = None | ||
| self._dependency_graph = Optional[Dict[Tuple[str, str], List[Tuple[str, str]]]] | ||
|
|
||
|
|
@@ -90,12 +91,63 @@ async def apply_resources(self) -> Tuple[int, int]: | |
| if cleanup_resources: | ||
| cleanup = _cleanup_prompt(self.config, cleanup_resources) | ||
| if cleanup: | ||
| self.config.logger.info("cleaning up resources...") | ||
| await self.worker.init_workers(self._cleanup_worker, None, None) | ||
| for i in cleanup_resources: | ||
| self.worker.work_queue.put_nowait(i) | ||
| await self.worker.schedule_workers() | ||
| self.config.logger.info("finished cleaning up resources") | ||
| self.config.logger.info("cleaning up resources with dependency ordering...") | ||
|
|
||
| # Build reverse dependency graph for ordered cleanup | ||
| try: | ||
| cleanup_graph = self.get_cleanup_dependency_graph(cleanup_resources) | ||
| self.config.logger.info(f"Built cleanup dependency graph with {len(cleanup_graph)} resources") | ||
|
|
||
| # Detect circular dependencies | ||
| from datadog_sync.utils.resource_utils import detect_circular_dependencies | ||
|
|
||
| cycle = detect_circular_dependencies(cleanup_graph) | ||
| if cycle: | ||
| # Circular dependency detected! | ||
| cycle_str = " -> ".join([f"{rt}:{rid}" for rt, rid in cycle]) | ||
| self.config.logger.error(f"Circular dependency detected in cleanup graph: {cycle_str}") | ||
| self.config.logger.error( | ||
| "Cannot safely delete resources. Please manually break circular references first." | ||
| ) | ||
| raise ValueError( | ||
| f"Circular dependency in cleanup graph: {cycle_str}. " | ||
| "Manual intervention required to break the cycle." | ||
| ) | ||
|
|
||
| # Initialize cleanup sorter | ||
| self.cleanup_sorter = init_topological_sorter(cleanup_graph) | ||
|
|
||
| # Initialize workers with cleanup sorter stop condition | ||
| await self.worker.init_workers( | ||
| self._cleanup_worker, lambda: not self.cleanup_sorter.is_active(), None | ||
| ) | ||
|
|
||
| # Run cleanup with ordering | ||
| if self.config.show_progress_bar: | ||
| await self.worker.schedule_workers_with_pbar( | ||
| total=len(cleanup_graph), additional_coros=[self.run_cleanup_sorter()] | ||
| ) | ||
| else: | ||
| await self.worker.schedule_workers(additional_coros=[self.run_cleanup_sorter()]) | ||
|
|
||
| self.config.logger.info("finished cleaning up resources") | ||
|
|
||
| except ValueError: | ||
| # Circular dependency - already logged, re-raise | ||
| raise | ||
| except Exception as e: | ||
| # Unexpected error building or running cleanup graph | ||
| self.config.logger.error(f"Error during ordered cleanup: {str(e)}") | ||
| self.config.logger.warning( | ||
| "Falling back to unordered cleanup (may fail due to dependency issues)" | ||
| ) | ||
|
|
||
| # Fallback to old unordered behavior | ||
| await self.worker.init_workers(self._cleanup_worker, None, None) | ||
| for i in cleanup_resources: | ||
| self.worker.work_queue.put_nowait(i) | ||
| await self.worker.schedule_workers() | ||
| self.config.logger.info("finished cleaning up resources (unordered fallback)") | ||
|
|
||
| # Run pre-apply hooks | ||
| resource_types = set(i[0] for i in self._dependency_graph) | ||
|
|
@@ -349,6 +401,10 @@ async def _cleanup_worker(self, q_item: List) -> None: | |
| await r_class._send_action_metrics("delete", _id, Status.FAILURE.value) | ||
| self.config.logger.error(f"error deleting resource {resource_type} with id {_id}: {str(e)}") | ||
| finally: | ||
| # Mark as done in cleanup sorter if it exists | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to test if it was actually deleted successfully first?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally yes, but I think that's a bigger restructure. Calling |
||
| if hasattr(self, "cleanup_sorter") and self.cleanup_sorter: | ||
| self.cleanup_sorter.done(q_item) | ||
|
|
||
| if not r_class.resource_config.concurrent: | ||
| r_class.resource_config.async_lock.release() | ||
|
|
||
|
|
@@ -370,6 +426,30 @@ async def run_sorter(self): | |
| await self.worker.work_queue.put(node) | ||
| await asyncio.sleep(0) | ||
|
|
||
| async def run_cleanup_sorter(self): | ||
| """Mirror of run_sorter() but for cleanup operations. | ||
|
|
||
| Continuously feeds deletion-ready resources to workers in proper order. | ||
| Resources are only deleted after all their dependents are deleted. | ||
| """ | ||
| loop = asyncio.get_event_loop() | ||
|
|
||
| while await loop.run_in_executor(None, self.cleanup_sorter.is_active): | ||
| for node in self.cleanup_sorter.get_ready(): | ||
| resource_type, _id = node | ||
|
|
||
| # Verify resource still exists in destination | ||
| if _id not in self.config.state.destination[resource_type]: | ||
| # Already deleted or doesn't exist | ||
| self.config.logger.debug(f"Resource {resource_type}:{_id} already deleted, marking as done") | ||
| self.cleanup_sorter.done(node) | ||
| continue | ||
|
|
||
| # Add to work queue for deletion | ||
| await self.worker.work_queue.put(node) | ||
|
|
||
| await asyncio.sleep(0) | ||
|
|
||
| def get_dependency_graph(self) -> Tuple[Dict[Tuple[str, str], List[Tuple[str, str]]], Set[Tuple[str, str]]]: | ||
| """Build the dependency graph for all resources. | ||
|
|
||
|
|
@@ -428,6 +508,160 @@ def _resource_connections(self, resource_type: str, _id: str) -> Tuple[Set[Tuple | |
|
|
||
| return failed_connections, missing_resources | ||
|
|
||
| def get_cleanup_dependency_graph( | ||
| self, cleanup_resources: Dict[Tuple[str, str], str | None] | ||
| ) -> Dict[Tuple[str, str], Set[Tuple[str, str]]]: | ||
| """Build REVERSE dependency graph for cleanup. | ||
|
|
||
| For deletion, we need to delete dependents BEFORE dependencies. | ||
| This inverts the normal creation graph. | ||
|
|
||
| Args: | ||
| cleanup_resources: Resources to be deleted from destination | ||
|
|
||
| Returns: | ||
| Dict mapping (resource_type, _id) to set of resources that depend on it | ||
|
|
||
| Example: | ||
| If Dashboard depends on Monitor: | ||
| Creation graph: {("dashboards", "dash-1"): [("monitors", "mon-1")]} | ||
| Cleanup graph: {("monitors", "mon-1"): {("dashboards", "dash-1")}} | ||
|
|
||
| This means: Monitor "mon-1" can only be deleted AFTER Dashboard "dash-1" | ||
| """ | ||
| reverse_graph = defaultdict(set) | ||
|
|
||
| # Initialize all cleanup resources with empty dependencies | ||
| for resource_key in cleanup_resources.keys(): | ||
| reverse_graph[resource_key] = set() | ||
|
|
||
| # Build reverse dependencies by scanning destination state | ||
| for resource_type, _id in cleanup_resources.keys(): | ||
| if resource_type not in self.config.resources: | ||
| continue | ||
|
|
||
| r_config = self.config.resources[resource_type].resource_config | ||
| if not r_config.resource_connections: | ||
| continue | ||
|
|
||
| # Get the actual resource from destination state (already in memory) | ||
| if _id not in self.config.state.destination[resource_type]: | ||
| self.config.logger.debug( | ||
| f"Resource {resource_type}:{_id} not in destination state, skipping dependency analysis" | ||
| ) | ||
| continue | ||
|
|
||
| resource = self.config.state.destination[resource_type][_id] | ||
|
|
||
| # For each dependency this resource has | ||
| for dep_type, attr_paths in r_config.resource_connections.items(): | ||
| for attr_path in attr_paths: | ||
| # Find dependency IDs in the resource | ||
| dep_ids = self._extract_dependency_ids(resource, attr_path, dep_type) | ||
|
|
||
| # For each dependency, add THIS resource as a dependent | ||
| for dep_id in dep_ids: | ||
| dep_key = (dep_type, dep_id) | ||
| # Only include if the dependency is also being cleaned up | ||
| if dep_key in cleanup_resources: | ||
| # This says: "dep_key must be deleted AFTER (resource_type, _id)" | ||
| reverse_graph[dep_key].add((resource_type, _id)) | ||
| self.config.logger.debug(f"Dependency edge: {dep_type}:{dep_id} <- {resource_type}:{_id}") | ||
|
|
||
| return dict(reverse_graph) | ||
|
|
||
| def _extract_dependency_ids(self, resource: Dict, attr_path: str, dep_type: str) -> Set[str]: | ||
| """Extract dependency IDs from a resource at the given attribute path. | ||
|
|
||
| Args: | ||
| resource: The resource dict | ||
| attr_path: Dot-separated path like "widgets.definition.alert_id" | ||
| dep_type: Type of dependency (for context) | ||
|
|
||
| Returns: | ||
| Set of dependency IDs found | ||
| """ | ||
| ids = set() | ||
|
|
||
| def extract_from_obj(obj, path_parts): | ||
| if not obj or not path_parts: | ||
| return | ||
|
|
||
| current_key = path_parts[0] | ||
| remaining = path_parts[1:] | ||
|
|
||
| if isinstance(obj, list): | ||
| for item in obj: | ||
| extract_from_obj(item, path_parts) | ||
| elif isinstance(obj, dict): | ||
| if current_key in obj: | ||
| if not remaining: | ||
| # We're at the target attribute | ||
| value = obj[current_key] | ||
| if value: | ||
| # Handle both lists and single values | ||
| values = [value] if not isinstance(value, list) else value | ||
| for v in values: | ||
| if v: | ||
| # Parse prefixed IDs (e.g., "dashboard:abc-123" → "abc-123") | ||
| parsed_id = self._parse_prefixed_id(str(v), dep_type) | ||
| ids.add(parsed_id) | ||
| else: | ||
| # Keep traversing | ||
| extract_from_obj(obj[current_key], remaining) | ||
|
|
||
| path_parts = attr_path.split(".") | ||
| extract_from_obj(resource, path_parts) | ||
| return ids | ||
|
|
||
| def _parse_prefixed_id(self, value: str, expected_resource_type: str) -> str: | ||
| """Parse IDs that may have resource type prefixes. | ||
|
|
||
| Restriction policies and some other resources use prefixed IDs like | ||
| "dashboard:abc-123" or "slo:xyz-789". This method strips the prefix | ||
| to get the actual resource ID that matches the state keys. | ||
|
|
||
| Examples: | ||
| "dashboard:abc-123" with expected_resource_type="dashboards" → "abc-123" | ||
| "slo:xyz-789" with expected_resource_type="service_level_objectives" → "xyz-789" | ||
| "abc-123" with any type → "abc-123" (passthrough) | ||
|
|
||
| Args: | ||
| value: The ID value, possibly prefixed | ||
| expected_resource_type: The resource type we expect (e.g., "dashboards") | ||
|
|
||
| Returns: | ||
| The parsed ID without prefix | ||
| """ | ||
| # Check if value contains a colon (indicates prefix) | ||
| if ":" not in value: | ||
| return value | ||
|
|
||
| # Split on first colon only | ||
| prefix, actual_id = value.split(":", 1) | ||
|
|
||
| # Map prefix to resource type | ||
| prefix_to_resource_type = { | ||
| "dashboard": "dashboards", | ||
| "slo": "service_level_objectives", | ||
| "notebook": "notebooks", | ||
| "monitor": "monitors", | ||
| "user": "users", | ||
| "role": "roles", | ||
| "team": "teams", | ||
| "security-rule": "security_monitoring_rules", | ||
| } | ||
|
|
||
| # If prefix matches expected resource type, return just the ID | ||
| if prefix_to_resource_type.get(prefix) == expected_resource_type: | ||
| return actual_id | ||
|
|
||
| # If prefix doesn't match, return original value (might be a false positive) | ||
| self.config.logger.debug( | ||
| f"ID '{value}' has prefix '{prefix}' but expected resource type '{expected_resource_type}'" | ||
| ) | ||
| return value | ||
|
|
||
|
|
||
| def _cleanup_prompt( | ||
| config: Configuration, resources_to_cleanup: Dict[Tuple[str, str], str | None], prompt: bool = True | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like an overly broad catch, is there a more specific error we can catch here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, fixed to pull in specifically the CycleError