Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,34 @@ public class AmoroManagementConf {
.defaultValue(Duration.ofSeconds(3))
.withDescription("Optimizer polling task timeout.");

// --- Auto-restart of unexpectedly-down optimizers ---------------------------------
// These three options work together: ENABLED turns the feature on, MAX_RETRIES bounds
// how often a single orphaned resource is restarted before being cleaned up, and
// GRACE_PERIOD delays the first restart (and rate-limits retries) so that optimizer
// processes that are merely slow to start are not killed/restarted by mistake.

public static final ConfigOption<Boolean> OPTIMIZER_AUTO_RESTART_ENABLED =
    ConfigOptions.key("optimizer.auto-restart-enabled")
        .booleanType()
        // Disabled by default: restarting dead optimizers is an opt-in behavior change.
        .defaultValue(false)
        .withDescription(
            "Whether to automatically restart optimizers that are unexpectedly down. "
                + "When enabled, the system periodically checks for orphaned resources "
                + "(resources in the resource table without corresponding optimizer instances) "
                + "and restarts the optimizer for those resources.");

public static final ConfigOption<Integer> OPTIMIZER_AUTO_RESTART_MAX_RETRIES =
    ConfigOptions.key("optimizer.auto-restart-max-retries")
        .intType()
        // Counted in memory per AMS lifecycle, so a restart of AMS resets the counter.
        .defaultValue(5)
        .withDescription(
            "Maximum number of restart attempts for a single orphaned resource "
                + "(counted per AMS lifecycle). "
                + "When exceeded, the orphaned resource will be cleaned up instead of retried.");

public static final ConfigOption<Duration> OPTIMIZER_AUTO_RESTART_GRACE_PERIOD =
    ConfigOptions.key("optimizer.auto-restart-grace-period")
        .durationType()
        // 5 minutes: generous enough for slow container schedulers (Flink/Kubernetes).
        .defaultValue(Duration.ofMinutes(5))
        .withDescription(
            "Grace period after an orphaned resource is first detected before it is considered "
                + "for restart. This avoids restarting resources whose optimizer process is still "
                + "starting up (e.g. Flink/Kubernetes containers).");

public static final ConfigOption<Duration> OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL =
ConfigOptions.key("optimizer-group.min-parallelism-check-interval")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@
import org.apache.amoro.exception.IllegalTaskStateException;
import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.exception.PluginRetryAuthException;
import org.apache.amoro.resource.InternalResourceContainer;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceContainer;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.resource.ResourceType;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo;
import org.apache.amoro.server.ha.HighAvailabilityContainer;
import org.apache.amoro.server.manager.AbstractOptimizerContainer;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingQueue;
import org.apache.amoro.server.optimizing.OptimizingStatus;
Expand Down Expand Up @@ -109,6 +109,9 @@ public class DefaultOptimizingService extends StatedPersistentBase
private final long pollingTimeout;
private final boolean breakQuotaLimit;
private final long refreshGroupInterval;
private final boolean autoRestartEnabled;
private final int autoRestartMaxRetries;
private final long autoRestartGracePeriodMs;
private final Map<String, OptimizingQueue> optimizingQueueByGroup = new ConcurrentHashMap<>();
private final Map<String, OptimizingQueue> optimizingQueueByToken = new ConcurrentHashMap<>();
private final Map<String, OptimizerInstance> authOptimizers = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -151,6 +154,12 @@ public DefaultOptimizingService(
AmoroManagementConf.OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL);
this.groupMaxKeepingAttempts =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS);
this.autoRestartEnabled =
serviceConfig.getBoolean(AmoroManagementConf.OPTIMIZER_AUTO_RESTART_ENABLED);
this.autoRestartMaxRetries =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_AUTO_RESTART_MAX_RETRIES);
this.autoRestartGracePeriodMs =
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_AUTO_RESTART_GRACE_PERIOD);
this.tableService = tableService;
this.catalogManager = catalogManager;
this.optimizerManager = optimizerManager;
Expand Down Expand Up @@ -888,6 +897,15 @@ public int getAttempts() {
*/
private class OptimizerGroupKeeper extends AbstractKeeper<OptimizerGroupKeepingTask> {

/**
* Tracks orphaned resource state. Key is resourceId. Value records the timestamp when the
* resource was first detected as orphaned and the number of restart attempts so far. Entries
* are removed when the optimizer successfully registers or when the resource is cleaned up.
*
* <p>Accessed only from the single keeper thread — no synchronization needed.
*/
private final Map<String, OrphanedResourceState> orphanedResourceStates = new HashMap<>();

public OptimizerGroupKeeper(String threadName) {
super(threadName);
}
Expand All @@ -910,7 +928,21 @@ protected void processTask(OptimizerGroupKeepingTask keepingTask) {
return;
}

int requiredCores = keepingTask.tryKeeping(resourceGroup);
// Check and restart orphaned resources if auto-restart is enabled
int orphanedCores = 0;
if (autoRestartEnabled) {
orphanedCores = restartOrphanedOptimizers(resourceGroup);
}

// rawRequiredCores = minParallelism - currently active optimizer cores.
// Used for minParallelism reset so that orphaned cores (whose optimizer processes are
// dead) are NOT counted as "satisfied" capacity.
int rawRequiredCores = keepingTask.tryKeeping(resourceGroup);

// Subtract orphaned cores already being restarted to avoid double-provisioning.
// Clamp to 0: orphanedCores can exceed rawRequiredCores when orphaned resources cover
// more than the group's parallelism deficit.
int requiredCores = Math.max(0, rawRequiredCores - orphanedCores);
if (requiredCores <= 0) {
LOG.debug(
"The Resource Group:{} has sufficient resources, keep it", resourceGroup.getName());
Expand All @@ -925,12 +957,12 @@ protected void processTask(OptimizerGroupKeepingTask keepingTask) {
resourceGroup.getName(),
keepingTask.getAttempts(),
minParallelism,
minParallelism - requiredCores);
minParallelism - rawRequiredCores);
resourceGroup
.getProperties()
.put(
OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM,
String.valueOf(minParallelism - requiredCores));
String.valueOf(minParallelism - rawRequiredCores));
updateResourceGroup(resourceGroup);
optimizerManager.updateResourceGroup(resourceGroup);
keepInTouch(resourceGroup.getName(), 1);
Expand All @@ -945,7 +977,7 @@ protected void processTask(OptimizerGroupKeepingTask keepingTask) {
.build();
ResourceContainer rc = Containers.get(resource.getContainerName());
try {
((AbstractOptimizerContainer) rc).requestResource(resource);
((InternalResourceContainer) rc).requestResource(resource);
optimizerManager.createResource(resource);
} finally {
keepInTouch(resourceGroup.getName(), keepingTask.getAttempts() + 1);
Expand All @@ -955,5 +987,184 @@ protected void processTask(OptimizerGroupKeepingTask keepingTask) {
resourceGroup.getName(),
requiredCores);
}

/**
 * Detect and restart orphaned resources for a resource group. An orphaned resource is one that
 * exists in the resource table but has no corresponding optimizer instance in the optimizer
 * table, indicating the optimizer process died unexpectedly.
 *
 * <p>A grace period ({@link AmoroManagementConf#OPTIMIZER_AUTO_RESTART_GRACE_PERIOD}) is
 * applied before an orphaned resource is considered for restart, to avoid interfering with
 * resources whose optimizer processes are still starting up (e.g. Flink/Kubernetes).
 *
 * <p>For each orphaned resource past the grace period:
 *
 * <ul>
 *   <li>If retries &lt; max retries: attempt to restart the optimizer via the container
 *   <li>If retries &gt;= max retries: clean up the orphaned resource from the resource table
 * </ul>
 *
 * <p>All failures are caught and logged so a broken group cannot break the keeper loop.
 *
 * @param resourceGroup the group whose resources are checked for orphans
 * @return the total thread count of orphaned resources that are pending restart (either in
 *     grace period or actively being restarted), so that {@code tryKeeping} can subtract this
 *     from {@code requiredCores} to avoid double provisioning.
 */
private int restartOrphanedOptimizers(ResourceGroup resourceGroup) {
  int orphanedThreadCount = 0;
  try {
    String groupName = resourceGroup.getName();
    List<Resource> resources = optimizerManager.listResourcesByGroup(groupName);
    if (resources.isEmpty()) {
      // Nothing allocated for this group — no orphans possible.
      return 0;
    }

    // Collect all optimizer resourceIds in this group.
    // Note: these two DB queries are not in the same transaction. An optimizer could
    // register between the two calls, making it appear orphaned. The grace period
    // is intentionally designed to absorb such transient windows.
    // TODO: consider replacing this DB query with authOptimizers (in-memory map) to
    // avoid a full table scan per keeper cycle when many groups are active.
    List<OptimizerInstance> optimizers = optimizerManager.listOptimizers(groupName);
    Set<String> activeResourceIds =
        optimizers.stream()
            .map(OptimizerInstance::getResourceId)
            // External optimizers may register without a resourceId — ignore them here.
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());

    // Find orphaned resources (resources without any active optimizer)
    List<Resource> orphanedResources =
        resources.stream()
            .filter(r -> !activeResourceIds.contains(r.getResourceId()))
            .collect(Collectors.toList());

    // Clean up tracking for resources that are no longer orphaned.
    // NOTE(review): orphanedResourceStates is a field of the keeper, so if this keeper
    // thread processes MULTIPLE groups, this removeIf drops every tracked entry whose
    // resourceId is absent from THIS group's resource list — clobbering state tracked
    // for other groups (resetting their grace period / retry counters each cycle).
    // Confirm the keeper is per-group, or key the map by (group, resourceId).
    // Also O(tracked x resources) per cycle — acceptable only while both stay small.
    orphanedResourceStates
        .keySet()
        .removeIf(
            resourceId ->
                resources.stream().noneMatch(r -> resourceId.equals(r.getResourceId()))
                    || activeResourceIds.contains(resourceId));

    long now = System.currentTimeMillis();
    for (Resource orphanedResource : orphanedResources) {
      String resourceId = orphanedResource.getResourceId();
      // First sighting of this orphan starts its grace-period clock.
      OrphanedResourceState state =
          orphanedResourceStates.computeIfAbsent(
              resourceId, k -> new OrphanedResourceState(now));

      // Grace period: skip restart if the resource was recently first detected or last
      // restarted. This avoids false-positive restarts for newly started optimizers and
      // rate-limits retry attempts after a restart.
      long gracePeriodStart =
          state.lastRestartTime >= 0 ? state.lastRestartTime : state.firstDetectedTime;
      if (now - gracePeriodStart < autoRestartGracePeriodMs) {
        LOG.debug(
            "Orphaned resource {} in group {} is within grace period ({} ms remaining), "
                + "skipping restart",
            resourceId,
            groupName,
            autoRestartGracePeriodMs - (now - gracePeriodStart));
        // Still counted: a pending (in-grace) orphan must suppress fresh provisioning.
        orphanedThreadCount += orphanedResource.getThreadCount();
        continue;
      }

      if (state.restartAttempts >= autoRestartMaxRetries) {
        // Retries exhausted — give up on this resource and remove it from the DB.
        LOG.warn(
            "Orphaned resource {} in group {} has exceeded max restart retries ({}), "
                + "cleaning up the resource",
            resourceId,
            groupName,
            autoRestartMaxRetries);
        try {
          ResourceContainer rc = Containers.get(orphanedResource.getContainerName());
          if (rc instanceof InternalResourceContainer) {
            ((InternalResourceContainer) rc).releaseResource(orphanedResource);
          }
        } catch (Throwable t) {
          // Best-effort release: DB cleanup below proceeds regardless.
          LOG.warn(
              "Failed to release orphaned resource {} via container, "
                  + "will still remove from DB",
              resourceId,
              t);
        }
        // Defensively remove any optimizer that may have registered for this resourceId
        // in the window between our two DB queries (TOCTOU). deleteOptimizer is idempotent.
        optimizerManager.deleteOptimizer(groupName, resourceId);
        optimizerManager.deleteResource(resourceId);
        orphanedResourceStates.remove(resourceId);
      } else {
        LOG.info(
            "Detected orphaned resource {} in group {}, attempting restart (attempt {}/{})",
            resourceId,
            groupName,
            state.restartAttempts + 1,
            autoRestartMaxRetries);
        try {
          ResourceContainer rc = Containers.get(orphanedResource.getContainerName());
          if (rc instanceof InternalResourceContainer) {
            ((InternalResourceContainer) rc).requestResource(orphanedResource);
          } else {
            // Auto-restart is not supported for this container type. Log once to alert
            // operators, then suppress further warnings via lastRestartTime so the
            // grace period silences subsequent cycles. Do NOT consume restartAttempts —
            // the resource requires manual intervention, not automatic cleanup.
            if (state.lastRestartTime < 0) {
              LOG.warn(
                  "Container {} for orphaned resource {} is not an InternalResourceContainer,"
                      + " auto-restart is not supported. Manual intervention required.",
                  orphanedResource.getContainerName(),
                  resourceId);
              state.lastRestartTime = now;
            }
            orphanedThreadCount += orphanedResource.getThreadCount();
            continue;
          }
          // Persist updated properties (e.g. new job-id from doScaleOut)
          optimizerManager.updateResource(orphanedResource);
          state.restartAttempts++;
          // Record restart time so the next cycle waits a grace period before retrying
          state.lastRestartTime = now;
          orphanedThreadCount += orphanedResource.getThreadCount();
        } catch (Throwable t) {
          LOG.error(
              "Failed to restart orphaned resource {} in group {}, attempt {}/{}",
              resourceId,
              groupName,
              state.restartAttempts + 1,
              autoRestartMaxRetries,
              t);
          state.restartAttempts++;
          // Record restart time so failed attempts also respect the grace period
          state.lastRestartTime = now;
          // Intentionally do NOT add to orphanedThreadCount on failure: the restart
          // request itself failed, so no optimizer process was launched. Allowing
          // tryKeeping() to provision a new resource provides fast recovery while
          // the orphaned resource is retried in the background. This may temporarily
          // result in both the orphaned record and a new resource coexisting in the DB,
          // but the orphan will eventually be cleaned up after max retries.
        }
      }
    }
  } catch (Throwable t) {
    // Keeper thread must survive any per-group failure; log and report zero orphans.
    LOG.error("Failed to check orphaned resources for group {}", resourceGroup.getName(), t);
  }
  return orphanedThreadCount;
}
}

/**
 * Per-resource bookkeeping for the auto-restart feature: when the resource was first seen
 * without a live optimizer, when a restart was last attempted, and how many attempts were
 * made so far. Instances are confined to the keeper thread, so no synchronization is used.
 */
private static class OrphanedResourceState {

  /** Epoch millis at which this resource was first observed to be orphaned. */
  final long firstDetectedTime;

  /**
   * Epoch millis of the most recent restart attempt, or {@code -1} while no attempt has been
   * made yet. Consulted to enforce the grace period between consecutive restart attempts.
   */
  long lastRestartTime = -1;

  /** Restart attempts made so far; cleanup is triggered once this reaches the retry limit. */
  int restartAttempts;

  OrphanedResourceState(long firstDetectedTime) {
    this.firstDetectedTime = firstDetectedTime;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,14 @@ public interface ResourceMapper {
ResourceGroup selectResourceGroup(@Param("resourceGroup") String groupName);

@Select(
"SELECT resource_id, group_name, container_name, start_time, thread_count, total_memory, properties"
"SELECT resource_id, group_name, container_name, thread_count, total_memory, properties"
+ " FROM resource WHERE group_name = #{resourceGroup}")
@Results({
@Result(property = "resourceId", column = "resource_id"),
@Result(property = "group", column = "group_name"),
@Result(property = "container", column = "container_name"),
@Result(property = "startTime", column = "start_time", typeHandler = Long2TsConverter.class),
@Result(property = "groupName", column = "group_name"),
@Result(property = "containerName", column = "container_name"),
@Result(property = "threadCount", column = "thread_count"),
@Result(property = "totalMemory", column = "total_memory"),
@Result(property = "memoryMb", column = "total_memory"),
@Result(property = "properties", column = "properties", typeHandler = Map2StringConverter.class)
})
List<Resource> selectResourcesByGroup(@Param("resourceGroup") String groupName);
Expand Down Expand Up @@ -95,6 +94,13 @@ public interface ResourceMapper {
+ " #{resource.properties, typeHandler=org.apache.amoro.server.persistence.converter.JsonObjectConverter})")
void insertResource(@Param("resource") Resource resource);

// Rewrites the mutable columns of an existing resource row. start_time is reset to the
// database's current timestamp so a restarted resource reads as freshly launched;
// properties go through the JSON type handler (e.g. to persist a container-assigned
// job id after a restart). The resource_id key itself is never changed.
@Update(
    "UPDATE resource SET thread_count = #{resource.threadCount}, total_memory = #{resource.memoryMb},"
        + " start_time = CURRENT_TIMESTAMP,"
        + " properties = #{resource.properties, typeHandler=org.apache.amoro.server.persistence.converter.JsonObjectConverter}"
        + " WHERE resource_id = #{resource.resourceId}")
void updateResource(@Param("resource") Resource resource);

@Delete("DELETE FROM resource WHERE resource_id = #{resourceId}")
void deleteResource(@Param("resourceId") String resourceId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public void createResource(Resource resource) {
doAs(ResourceMapper.class, mapper -> mapper.insertResource(resource));
}

/**
 * Persists the mutable fields of an existing resource row by delegating to {@code
 * ResourceMapper.updateResource} inside the standard persistence wrapper.
 */
@Override
public void updateResource(Resource resource) {
  doAs(ResourceMapper.class, mapper -> mapper.updateResource(resource));
}

@Override
public void deleteResource(String resourceId) {
doAs(ResourceMapper.class, mapper -> mapper.deleteResource(resourceId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public static void initTableService() {
configurations.set(
AmoroManagementConf.OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL,
Duration.ofMillis(10L));
// Enable auto-restart with 0ms grace period so that TestOptimizerGroupKeeper's orphan
// detection tests don't need to wait for the default 5-minute grace period.
// Other subclasses are unaffected: auto-restart only fires when orphaned resources
// (resources without an active optimizer) are present, which normal tests avoid.
configurations.set(AmoroManagementConf.OPTIMIZER_AUTO_RESTART_ENABLED, true);
configurations.set(
AmoroManagementConf.OPTIMIZER_AUTO_RESTART_GRACE_PERIOD, Duration.ofMillis(0L));
TABLE_SERVICE =
new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory);
OPTIMIZING_SERVICE =
Expand Down
Loading
Loading