[AMORO-3988][ams] Automatic restart the optimizers if it is down unexpected #4180
Open
lintingbin wants to merge 4 commits into apache:master
…ectedly

Add auto-restart mechanism for optimizers that die unexpectedly. When an optimizer process crashes, the resource record remains in the DB but the optimizer record is removed by heartbeat timeout. This leaves 'orphaned' resources with no active optimizer.

Changes:
- Add orphaned resource detection in OptimizerGroupKeeper: periodically cross-check the resource table against the optimizer table to find resources without active optimizers
- Restart the orphaned optimizer via container.requestResource()
- Track the retry count per orphaned resource; clean up the resource from the DB after exceeding the configurable max retries
- Add configuration options:
  - optimizer.auto-restart-enabled (default: false)
  - optimizer.auto-restart-max-retries (default: 5)
- Fix ResourceMapper.selectResourcesByGroup result mapping: property names did not match Resource class fields (group->groupName, container->containerName, totalMemory->memoryMb)
- Add 3 test cases for auto-restart scenarios
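The detection and retry-cap steps described in this commit can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class `OrphanDetectionSketch` and its methods are hypothetical, resources are reduced to plain string ids, and the database tables are replaced by in-memory collections.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the orphan check; these are not Amoro's actual classes.
final class OrphanDetectionSketch {
  // Retry counters keyed by resource id (assumed to be touched by a single keeper thread).
  private final Map<String, Integer> retries = new HashMap<>();
  private final int maxRetries;

  OrphanDetectionSketch(int maxRetries) {
    this.maxRetries = maxRetries;
  }

  // A resource is treated as orphaned when it exists in the resource table
  // but no active optimizer references it.
  static List<String> findOrphans(List<String> resourceIds, Set<String> activeOptimizerResourceIds) {
    List<String> orphans = new ArrayList<>();
    for (String id : resourceIds) {
      if (!activeOptimizerResourceIds.contains(id)) {
        orphans.add(id);
      }
    }
    return orphans;
  }

  // Returns true if a restart should be attempted, false once the retry budget
  // is exceeded and the resource record should be cleaned up instead.
  boolean shouldRestart(String resourceId) {
    int attempts = retries.merge(resourceId, 1, Integer::sum);
    if (attempts > maxRetries) {
      retries.remove(resourceId);
      return false;
    }
    return true;
  }

  // Forget a resource once its optimizer has registered again.
  void markRecovered(String resourceId) {
    retries.remove(resourceId);
  }
}
```

In this sketch a caller would run `findOrphans` on every keeper cycle and, for each id, either request the resource again or delete the record depending on `shouldRestart`.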
Fix several issues in the auto-restart mechanism:

1. Prevent double provisioning: restartOrphanedOptimizers now returns the total thread count of orphaned resources being restarted, which is subtracted from requiredCores in tryKeeping to avoid duplicate resource allocation.
2. Add grace period for orphaned resource detection: new config optimizer.auto-restart-grace-period (default 5min) prevents misidentifying resources whose optimizer is still starting up (e.g. Flink/Kubernetes) as orphaned.
3. Persist updated properties after restart: add updateResource method to ResourceMapper/ResourceManager/DefaultOptimizerManager so that the new job-id and other properties from doScaleOut are persisted to the DB after restarting an orphaned resource.
4. Use timestamp-based tracking: replace the simple retry counter with OrphanedResourceState, which tracks both firstDetectedTime and restartAttempts. After a successful restart, the grace period timer is reset to allow the optimizer time to register.
5. Use the InternalResourceContainer interface instead of casting to AbstractOptimizerContainer, with an instanceof check for safety.
6. Improve test stability: replace Thread.sleep with a polling-based waitUntil helper to reduce flakiness in CI environments.
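For item 6, a polling helper along these lines is one common way to replace a fixed Thread.sleep in tests. The signature below is an assumption for illustration; the PR's actual waitUntil helper is not shown on this page and may differ.

```java
import java.util.function.BooleanSupplier;

// Hypothetical test helper; the name mirrors the commit message, the signature is assumed.
final class WaitUntilSketch {
  // Polls the condition until it holds or the timeout elapses.
  // Returns true if the condition held, false on timeout or interruption.
  static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        // Preserve the interrupt flag for the caller and stop waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```

Compared with a fixed sleep, the test finishes as soon as the condition holds and only pays the full timeout when something is actually wrong.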
- Fix grace period to use lastRestartTime instead of firstDetectedTime so retry attempts are rate-limited from the most recent restart, not first detection
- Fix rawRequiredCores vs requiredCores: orphaned cores must not be counted as satisfied capacity when resetting minParallelism after max keeping attempts
- Fix non-InternalResourceContainer: log warn only once, suppress further cycles via grace period, and do not consume restartAttempts (requires manual cleanup)
- Fix TOCTOU: add defensive deleteOptimizer before deleteResource on max retries
- Fix ResourceMapper: remove start_time from selectResourcesByGroup (no mapping), and add start_time = CURRENT_TIMESTAMP to updateResource so restarts are visible
- Fix test race in testNoRestartWhenOptimizerIsActive: authenticate before createResource to close the orphan detection window
- Replace ConcurrentHashMap with HashMap in orphanedResourceStates (single-threaded)
- Add docs/configuration/ams-config.md entries for three new config keys
- Add comments explaining TOCTOU, grace period semantics, and test rationale

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
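The lastRestartTime-based rate limiting described in the first bullet can be illustrated with a small state machine. This is a sketch under assumptions: the field names follow the commit messages, but the class layout, the `Action` enum, and the choice to start the grace period at first detection are hypothetical and may not match the PR's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of grace-period rate limiting; not the PR's implementation.
final class RestartRateLimitSketch {
  // Per-resource bookkeeping; field names follow the commit messages.
  static final class OrphanedResourceState {
    final long firstDetectedTime;
    long lastRestartTime; // assumed to start at first detection, then move to each restart
    int restartAttempts;

    OrphanedResourceState(long now) {
      this.firstDetectedTime = now;
      this.lastRestartTime = now;
    }
  }

  enum Action { WAIT, RESTART, GIVE_UP }

  // Plain HashMap: the sketch assumes a single keeper thread, as the commit notes.
  private final Map<String, OrphanedResourceState> states = new HashMap<>();
  private final long gracePeriodMillis;
  private final int maxRetries;

  RestartRateLimitSketch(long gracePeriodMillis, int maxRetries) {
    this.gracePeriodMillis = gracePeriodMillis;
    this.maxRetries = maxRetries;
  }

  // Called each cycle for a resource that currently has no active optimizer.
  Action onOrphanSeen(String resourceId, long now) {
    OrphanedResourceState state = states.get(resourceId);
    if (state == null) {
      states.put(resourceId, new OrphanedResourceState(now));
      return Action.WAIT; // first sighting: the optimizer may still be starting up
    }
    // Measure from the most recent restart, not from first detection,
    // so every retry gets a full grace period before the next one.
    if (now - state.lastRestartTime < gracePeriodMillis) {
      return Action.WAIT;
    }
    if (state.restartAttempts >= maxRetries) {
      states.remove(resourceId);
      return Action.GIVE_UP;
    }
    state.restartAttempts++;
    state.lastRestartTime = now;
    return Action.RESTART;
  }
}
```

Had the check used firstDetectedTime, every cycle after the first grace period would trigger another restart immediately, which is the behavior this commit sets out to avoid.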
Search before asking
What type of PR is this?
What does this PR do?
Add an automatic restart mechanism for optimizers that die unexpectedly.
Problem
When an optimizer process crashes, the OptimizerKeeper detects the heartbeat timeout and removes the optimizer record from the optimizer table. However, the corresponding resource record in the resource table is not cleaned up, leaving an "orphaned" resource with no active optimizer. Currently there is no mechanism to detect this situation and restart the optimizer.
Solution
Extend the existing OptimizerGroupKeeper to periodically detect orphaned resources and automatically restart the optimizer:
- In OptimizerGroupKeeper.processTask(), cross-check the resource table against the optimizer table to find resources without any active optimizer instances.
- Call container.requestResource(resource) to restart the optimizer process.
Configuration
Two new configuration options are added (disabled by default):
- optimizer.auto-restart-enabled (default: false)
- optimizer.auto-restart-max-retries (default: 5)
Bug fix
Also fixes the ResourceMapper.selectResourcesByGroup result mapping, where property names did not match the Resource class fields (group → groupName, container → containerName, totalMemory → memoryMb).
Checklist
Closes #3988