Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,6 @@ public class AmoroManagementConf {
.defaultValue("")
.withDescription("Optional LDAP bind password used when querying role-mapping groups.");

/**
 * Enables master-slave mode, which supports horizontal scaling of AMS.
 *
 * <p>Read from the key {@code use-master-slave-mode}; the default is {@code false}. The option
 * description marks the feature as still under development and testing.
 */
public static final ConfigOption<Boolean> USE_MASTER_SLAVE_MODE =
ConfigOptions.key("use-master-slave-mode")
.booleanType()
.defaultValue(false)
.withDescription(
"This setting controls whether to enable the AMS horizontal scaling feature, "
+ "which is currently under development and testing.");

public static final ConfigOption<Duration> CATALOG_META_CACHE_EXPIRATION_INTERVAL =
ConfigOptions.key("catalog-meta-cache.expiration-interval")
.durationType()
Expand Down Expand Up @@ -340,6 +331,15 @@ public class AmoroManagementConf {
.defaultValue(java.time.Duration.ofSeconds(30))
.withDescription("TTL of HA lease.");

/**
 * Enables master-slave mode, which supports horizontal scaling of AMS.
 *
 * <p>Read from the key {@code ha.use-master-slave-mode}; the default is {@code false}. The option
 * description marks the feature as still under development and testing.
 */
public static final ConfigOption<Boolean> HA_USE_MASTER_SLAVE_MODE =
ConfigOptions.key("ha.use-master-slave-mode")
.booleanType()
.defaultValue(false)
.withDescription(
"This setting controls whether to enable the AMS horizontal scaling feature, "
+ "which is currently under development and testing.");

public static final ConfigOption<Integer> HA_BUCKET_ID_TOTAL_COUNT =
ConfigOptions.key("ha.bucket-id.total-count")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.amoro.server;

import static org.apache.amoro.server.AmoroManagementConf.USE_MASTER_SLAVE_MODE;
import static org.apache.amoro.server.AmoroManagementConf.HA_USE_MASTER_SLAVE_MODE;

import io.javalin.Javalin;
import io.javalin.http.Context;
Expand Down Expand Up @@ -373,7 +373,7 @@ public void dispose() {
private void initConfig() throws Exception {
  LOG.info("initializing configurations...");
  new ConfigurationHelper().init();
  // Resolve the master-slave flag once, from the HA-scoped option. A single assignment avoids
  // a dead store that would be overwritten before anything could read it.
  IS_MASTER_SLAVE_MODE = serviceConfig.getBoolean(HA_USE_MASTER_SLAVE_MODE);
}

public Configurations getServiceConfig() {
Expand Down Expand Up @@ -667,6 +667,12 @@ private void initContainerConfig() {
containerProperties.putIfAbsent(
OptimizerProperties.AMS_OPTIMIZER_URI,
AmsUtil.getAMSThriftAddress(serviceConfig, Constants.THRIFT_OPTIMIZING_SERVICE_NAME));
// When master-slave mode is enabled, automatically inject the flag into container
// properties to ensure the -msm argument is correctly passed when starting optimizers.
if (serviceConfig.getBoolean(AmoroManagementConf.HA_USE_MASTER_SLAVE_MODE)) {
containerProperties.putIfAbsent(
OptimizerProperties.OPTIMIZER_MASTER_SLAVE_MODE_ENABLED, "true");
}
// put addition system properties
container.setProperties(containerProperties);
containerList.add(container);
Expand Down
218 changes: 180 additions & 38 deletions amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public AmsAssignService(
* Start the assignment service. Only works in master-slave mode and when current node is leader.
*/
public void start() {
if (!serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE)) {
if (!serviceConfig.getBoolean(AmoroManagementConf.HA_USE_MASTER_SLAVE_MODE)) {
LOG.info("Master-slave mode is not enabled, skip starting bucket assignment service");
return;
}
Expand Down Expand Up @@ -151,7 +151,20 @@ public void doAssign() {
detectNodeChanges(aliveNodes, currentAssignments, aliveNodeMap, normalized.assignedNodes);

if (!change.needReassign()) {
refreshLastUpdateTime(aliveNodes);
// Even if no node changes, check if current assignments are balanced.
// This handles cases where previous rebalancing was incomplete or
// assignments became uneven due to historical reasons.
if (isUnbalanced(aliveNodes, normalized.assignments)) {
LOG.info(
"No node changes detected, but current assignments are unbalanced. Performing rebalance...");
List<String> allBuckets = generateBucketIds();
Map<AmsServerInfo, List<String>> newAssignments =
buildNewAssignments(aliveNodes, new HashSet<>(), normalized.assignments);
rebalanceExistingAssignments(aliveNodes, allBuckets, newAssignments);
persistAssignments(newAssignments);
} else {
refreshLastUpdateTime(aliveNodes);
}
return;
}

Expand All @@ -175,8 +188,92 @@ public void doAssign() {
}

/**
* Redistribute buckets incrementally to alive nodes using round-robin. This minimizes bucket
* migration by only redistributing buckets from offline nodes.
* Check if the current assignments are unbalanced. Assignments are considered unbalanced if the
* difference between the max and min bucket count across nodes exceeds 1.
*
* @param aliveNodes List of alive nodes
* @param normalizedAssignments Current normalized assignments
* @return true if assignments are unbalanced
*/
private boolean isUnbalanced(
    List<AmsServerInfo> aliveNodes, Map<AmsServerInfo, List<String>> normalizedAssignments) {
  // Fewer than two alive nodes: there is no second node to compare against.
  if (aliveNodes.size() < 2) {
    return false;
  }

  int heaviest = Integer.MIN_VALUE;
  int lightest = Integer.MAX_VALUE;
  for (AmsServerInfo candidate : aliveNodes) {
    // An alive node without an entry in the map is counted as owning zero buckets.
    List<String> owned = normalizedAssignments.getOrDefault(candidate, new ArrayList<>());
    int ownedCount = owned.size();
    if (ownedCount > heaviest) {
      heaviest = ownedCount;
    }
    if (ownedCount < lightest) {
      lightest = ownedCount;
    }
  }

  // A gap of at most one bucket between the fullest and the emptiest node is treated as
  // balanced; anything larger is reported as unbalanced.
  int spread = heaviest - lightest;
  return spread >= 2;
}

/**
 * Evens out bucket ownership across all alive nodes while moving as few buckets as possible.
 *
 * <p>Each node's quota is {@code allBuckets.size() / aliveNodes.size()}. The remainder is handed
 * out, one extra bucket each, to the nodes that currently hold the most buckets, so that
 * already-loaded nodes keep what they have. Buckets above a node's quota are taken from the end
 * of its list and passed to {@code redistributeBucketsIncrementally}.
 *
 * <p>NOTE(review): the quota is derived from {@code allBuckets.size()}, not from the number of
 * buckets actually present in {@code currentAssignments}. If some buckets are unassigned when
 * this runs, the result may still be uneven - confirm callers assign missing buckets first.
 *
 * @param aliveNodes All alive nodes; when empty the method returns without changing anything
 * @param allBuckets All bucket IDs; only the size is used, to derive the per-node quota
 * @param currentAssignments Current assignments map (modified in place); an alive node that has
 *     no list yet receives an empty one
 */
private void rebalanceExistingAssignments(
    List<AmsServerInfo> aliveNodes,
    List<String> allBuckets,
    Map<AmsServerInfo, List<String>> currentAssignments) {
  // Without alive nodes no quota can be computed; the division below would throw.
  if (aliveNodes.isEmpty()) {
    return;
  }

  // Register every alive node up front so that the removals below always operate on the list
  // stored in the map, and so the redistribution step can look each node up directly.
  for (AmsServerInfo node : aliveNodes) {
    currentAssignments.computeIfAbsent(node, ignored -> new ArrayList<>());
  }

  int totalBuckets = allBuckets.size();
  int totalNodes = aliveNodes.size();
  int targetPerNode = totalBuckets / totalNodes;
  int remainder = totalBuckets % totalNodes;

  // Most loaded nodes first, so that excess is collected from them and they are the ones
  // granted the remainder slots. List.sort is stable: ties keep the order of aliveNodes.
  List<AmsServerInfo> sortedNodes = new ArrayList<>(aliveNodes);
  sortedNodes.sort(
      (n1, n2) ->
          Integer.compare(
              currentAssignments.get(n2).size(), currentAssignments.get(n1).size()));

  // First pass: fix each node's quota and collect whatever it holds beyond that quota.
  List<String> excessBuckets = new ArrayList<>();
  int remainderSlots = remainder;
  for (AmsServerInfo node : sortedNodes) {
    int nodeTarget = targetPerNode;
    if (remainderSlots > 0) {
      nodeTarget++;
      remainderSlots--;
    }

    List<String> buckets = currentAssignments.get(node);
    int had = buckets.size();
    int excess = had - nodeTarget;
    if (excess <= 0) {
      continue;
    }
    // Take the surplus from the end of the list, last bucket first.
    for (int i = 0; i < excess; i++) {
      excessBuckets.add(buckets.remove(buckets.size() - 1));
    }
    LOG.info(
        "Collected {} excess buckets from node {} (had: {}, target: {})",
        excess,
        node,
        had,
        nodeTarget);
  }

  // Second pass: hand the collected buckets to the nodes that are below their quota.
  if (!excessBuckets.isEmpty()) {
    redistributeBucketsIncrementally(aliveNodes, excessBuckets, currentAssignments);
  }
}

/**
* Redistribute buckets incrementally to alive nodes, always assigning each bucket to the node
* with the fewest buckets. This ensures even distribution regardless of the current state.
*
* @param aliveNodes List of alive nodes
* @param bucketsToRedistribute Buckets to redistribute (from offline nodes, or excess buckets collected while rebalancing)
Expand All @@ -190,12 +287,18 @@ private void redistributeBucketsIncrementally(
return;
}

// Distribute buckets using round-robin to minimize migration
int nodeIndex = 0;
// Assign each bucket to the node with the fewest buckets to ensure even distribution
for (String bucketId : bucketsToRedistribute) {
AmsServerInfo node = aliveNodes.get(nodeIndex % aliveNodes.size());
currentAssignments.get(node).add(bucketId);
nodeIndex++;
AmsServerInfo minNode = null;
int minCount = Integer.MAX_VALUE;
for (AmsServerInfo node : aliveNodes) {
int count = currentAssignments.get(node).size();
if (count < minCount) {
minCount = count;
minNode = node;
}
}
currentAssignments.get(minNode).add(bucketId);
}
}

Expand All @@ -219,13 +322,24 @@ private void balanceBucketsForNewNodes(
return;
}

// Calculate how many buckets each new node should get
int bucketsPerNewNode = targetBucketsPerNode;
// Determine which alive nodes already have extra buckets (i.e. targetBucketsPerNode + 1)
// so that remainder slots are distributed fairly across all nodes, not just new ones.
int remainderSlotsTaken = 0;
for (AmsServerInfo node : aliveNodes) {
if (!newNodes.contains(node)) {
int count = currentAssignments.getOrDefault(node, new ArrayList<>()).size();
if (count > targetBucketsPerNode) {
remainderSlotsTaken++;
}
}
}

int newNodeIndex = 0;
for (AmsServerInfo newNode : newNodes) {
// First 'remainder' nodes get one extra bucket
int targetForNewNode = bucketsPerNewNode + (newNodeIndex < remainder ? 1 : 0);
int currentCount = currentAssignments.get(newNode).size();
// Assign extra bucket only if there are remaining remainder slots
int targetForNewNode =
targetBucketsPerNode + ((remainderSlotsTaken + newNodeIndex) < remainder ? 1 : 0);
int currentCount = currentAssignments.getOrDefault(newNode, new ArrayList<>()).size();
int needed = targetForNewNode - currentCount;

if (needed > 0) {
Expand Down Expand Up @@ -380,48 +494,76 @@ private NodeChangeResult detectNodeChanges(
Map<AmsServerInfo, List<String>> currentAssignments,
Map<String, AmsServerInfo> aliveNodeMap,
Set<AmsServerInfo> currentAssignedNodes) {
Set<AmsServerInfo> aliveNodeSet = new HashSet<>(aliveNodes);
Set<AmsServerInfo> newNodes = new HashSet<>(aliveNodeSet);
newNodes.removeAll(currentAssignedNodes);

Set<AmsServerInfo> offlineNodes = new HashSet<>();
for (AmsServerInfo storedNode : currentAssignments.keySet()) {
if (!aliveNodeMap.containsKey(getNodeKey(storedNode))) {
offlineNodes.add(storedNode);
// Build nodeKey sets for comparison instead of relying on AmsServerInfo.equals(),
// because ZkBucketAssignStore.parseNodeKey() only sets host and thriftBindPort (restBindPort
// is null), which would cause equals() mismatch with the full AmsServerInfo from
// getAliveNodes().
Set<String> assignedNodeKeys = new HashSet<>();
for (AmsServerInfo node : currentAssignedNodes) {
assignedNodeKeys.add(getNodeKey(node));
}
Set<AmsServerInfo> newNodes = new HashSet<>();
for (AmsServerInfo aliveNode : aliveNodes) {
if (!assignedNodeKeys.contains(getNodeKey(aliveNode))) {
newNodes.add(aliveNode);
}
}

Set<AmsServerInfo> offlineNodes = new HashSet<>();
long currentTime = System.currentTimeMillis();
Set<String> aliveNodeKeys = new HashSet<>();
for (AmsServerInfo node : aliveNodes) {
aliveNodeKeys.add(getNodeKey(node));
}

for (AmsServerInfo node : currentAssignedNodes) {
String nodeKey = getNodeKey(node);
// If the node is currently alive, it should never be considered offline,
// even if its lastUpdateTime is stale (e.g. node restarted within nodeOfflineTimeoutMs
// but lastUpdateTime was not refreshed while it was down).
if (aliveNodeKeys.contains(nodeKey)) {
try {
long lastUpdateTime = assignStore.getLastUpdateTime(node);
if (lastUpdateTime > 0 && (currentTime - lastUpdateTime) > nodeOfflineTimeoutMs) {
for (AmsServerInfo storedNode : currentAssignments.keySet()) {
if (getNodeKey(storedNode).equals(nodeKey)) {
offlineNodes.add(storedNode);
break;
}
}
LOG.warn(
"Node {} is considered offline due to timeout. Last update: {}",
node,
lastUpdateTime);
}
} catch (BucketAssignStoreException e) {
LOG.warn("Failed to get last update time for node {}, treating as offline", node, e);
continue;
}
try {
long lastUpdateTime = assignStore.getLastUpdateTime(node);
boolean shouldMarkOffline;
if (lastUpdateTime <= 0) {
// Missing timestamp means the node's saveAssignments never completed its
// updateLastUpdateTime call (e.g. leader crashed between the two ZK writes)
// or the store data was corrupted. Since the node is already absent from
// the alive list, treat it as offline immediately to avoid stranded buckets.
shouldMarkOffline = true;
LOG.warn(
"Node {} is considered offline (missing last update time, not in alive list)", node);
} else if ((currentTime - lastUpdateTime) > nodeOfflineTimeoutMs) {
shouldMarkOffline = true;
LOG.warn(
"Node {} is considered offline due to timeout. Last update: {}",
node,
lastUpdateTime);
} else {
shouldMarkOffline = false;
LOG.debug(
"Node {} is not in alive list but heartbeat not timeout (last update: {}), waiting for timeout",
node,
lastUpdateTime);
}
if (shouldMarkOffline) {
for (AmsServerInfo storedNode : currentAssignments.keySet()) {
if (getNodeKey(storedNode).equals(nodeKey)) {
offlineNodes.add(storedNode);
break;
}
}
}
} catch (BucketAssignStoreException e) {
LOG.warn("Failed to get last update time for node {}, treating as offline", node, e);
for (AmsServerInfo storedNode : currentAssignments.keySet()) {
if (getNodeKey(storedNode).equals(nodeKey)) {
offlineNodes.add(storedNode);
break;
}
}
}
}
return new NodeChangeResult(newNodes, offlineNodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public DefaultOptimizingService(
this.bucketAssignStore = bucketAssignStore;
this.haContainer = haContainer;
this.isMasterSlaveMode =
haContainer != null && serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
haContainer != null
&& serviceConfig.getBoolean(AmoroManagementConf.HA_USE_MASTER_SLAVE_MODE);
this.tableHandlerChain = new TableRuntimeHandlerImpl();
this.planExecutor =
Executors.newCachedThreadPool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ public void waitFollowerShip() throws InterruptedException {

@Override
public void registerAndElect() throws Exception {
boolean isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
boolean isMasterSlaveMode =
serviceConfig.getBoolean(AmoroManagementConf.HA_USE_MASTER_SLAVE_MODE);
if (!isMasterSlaveMode) {
LOG.debug("Master-slave mode is not enabled, skip node registration");
return;
Expand Down Expand Up @@ -220,7 +221,8 @@ public void close() {
}
// Remove this node from bucket_assignments so the leader immediately stops seeing it
// as alive without waiting for the heartbeat TTL to expire.
boolean isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
boolean isMasterSlaveMode =
serviceConfig.getBoolean(AmoroManagementConf.HA_USE_MASTER_SLAVE_MODE);
if (isMasterSlaveMode) {
try {
String nodeKey = getNodeKey();
Expand Down
Loading
Loading