Skip to content

Commit f23a886

Browse files
authored
Merge branch 'master' into upgrade-iceberg-version
2 parents 79936dc + bb14041 commit f23a886

8 files changed

Lines changed: 797 additions & 8 deletions

File tree

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public static void main(String[] args) {
171171
}
172172

173173
public void registAndElect() throws Exception {
174-
haContainer.registAndElect();
174+
haContainer.registerAndElect();
175175
}
176176

177177
public enum HAState {

amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
3030

31+
import java.util.ArrayList;
32+
import java.util.List;
3133
import java.util.UUID;
3234
import java.util.concurrent.CountDownLatch;
3335
import java.util.concurrent.Executors;
@@ -135,6 +137,44 @@ public void waitFollowerShip() throws InterruptedException {
135137
LOG.info("Became the follower of AMS (Database lease)");
136138
}
137139

140+
@Override
141+
public void registerAndElect() throws Exception {
142+
boolean isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
143+
if (!isMasterSlaveMode) {
144+
LOG.debug("Master-slave mode is not enabled, skip node registration");
145+
return;
146+
}
147+
// In master-slave mode, register node to database by writing OPTIMIZING_SERVICE info
148+
// This is similar to ZK mode registering ephemeral nodes
149+
long now = System.currentTimeMillis();
150+
String optimizingInfoJson = JacksonUtil.toJSONString(optimizingServiceServerInfo);
151+
try {
152+
doAsIgnoreError(
153+
HaLeaseMapper.class,
154+
mapper -> {
155+
int updated =
156+
mapper.updateServerInfo(
157+
clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, optimizingInfoJson, now);
158+
if (updated == 0) {
159+
mapper.insertServerInfoIfAbsent(
160+
clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, optimizingInfoJson, now);
161+
}
162+
});
163+
LOG.info(
164+
"Registered AMS node to database: nodeId={}, optimizingService={}",
165+
nodeId,
166+
optimizingServiceServerInfo);
167+
} catch (Exception e) {
168+
LOG.error("Failed to register node to database", e);
169+
throw e;
170+
}
171+
}
172+
173+
@Override
174+
public boolean hasLeadership() {
175+
return isLeader.get();
176+
}
177+
138178
/** Closes the heartbeat executor safely. */
139179
@Override
140180
public void close() {
@@ -147,9 +187,6 @@ public void close() {
147187
}
148188
}
149189

150-
@Override
151-
public void registAndElect() throws Exception {}
152-
153190
private class HeartbeatRunnable implements Runnable {
154191
@Override
155192
public void run() {
@@ -304,6 +341,40 @@ private void onLeaderLost() {
304341
}
305342
}
306343

344+
@Override
345+
public List<AmsServerInfo> getAliveNodes() {
346+
List<AmsServerInfo> aliveNodes = new ArrayList<>();
347+
if (!isLeader.get()) {
348+
LOG.warn("Only leader node can get alive nodes list");
349+
return aliveNodes;
350+
}
351+
try {
352+
long currentTime = System.currentTimeMillis();
353+
List<HaLeaseMeta> leases =
354+
getAs(
355+
HaLeaseMapper.class,
356+
mapper -> mapper.selectLeasesByService(clusterName, OPTIMIZING_SERVICE));
357+
for (HaLeaseMeta lease : leases) {
358+
// Only include nodes with valid (non-expired) leases
359+
if (lease.getLeaseExpireTs() != null && lease.getLeaseExpireTs() > currentTime) {
360+
if (lease.getServerInfoJson() != null && !lease.getServerInfoJson().isEmpty()) {
361+
try {
362+
AmsServerInfo nodeInfo =
363+
JacksonUtil.parseObject(lease.getServerInfoJson(), AmsServerInfo.class);
364+
aliveNodes.add(nodeInfo);
365+
} catch (Exception e) {
366+
LOG.warn("Failed to parse server info for node {}", lease.getNodeId(), e);
367+
}
368+
}
369+
}
370+
}
371+
} catch (Exception e) {
372+
LOG.error("Failed to get alive nodes from database", e);
373+
throw e;
374+
}
375+
return aliveNodes;
376+
}
377+
307378
private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restBindPort) {
308379
AmsServerInfo amsServerInfo = new AmsServerInfo();
309380
amsServerInfo.setHost(host);

amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818

1919
package org.apache.amoro.server.ha;
2020

21+
import org.apache.amoro.client.AmsServerInfo;
22+
23+
import java.util.List;
24+
2125
/**
2226
* Common interface for high availability (HA) containers.
2327
*
@@ -49,5 +53,19 @@ public interface HighAvailabilityContainer {
4953
*
5054
* @throws Exception If registration fails or participation in the primary election fails.
5155
*/
52-
void registAndElect() throws Exception;
56+
void registerAndElect() throws Exception;
57+
58+
/**
59+
* Used in master-slave mode to obtain information about all currently registered AMS nodes.
60+
*
61+
* @return List<AmsServerInfo>
62+
*/
63+
List<AmsServerInfo> getAliveNodes();
64+
65+
/**
66+
* Used to determine whether the current AMS node is the primary node.
67+
*
68+
* @return true if the current AMS node is the primary node, false otherwise
69+
*/
70+
boolean hasLeadership();
5371
}

amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
package org.apache.amoro.server.ha;
2020

21+
import org.apache.amoro.client.AmsServerInfo;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324

25+
import java.util.List;
2426
import java.util.concurrent.CountDownLatch;
2527

2628
/** No-op HA container that never blocks and performs no leader election. */
@@ -48,5 +50,15 @@ public void close() {
4850
}
4951

5052
@Override
51-
public void registAndElect() throws Exception {}
53+
public void registerAndElect() throws Exception {}
54+
55+
@Override
56+
public List<AmsServerInfo> getAliveNodes() {
57+
return List.of();
58+
}
59+
60+
@Override
61+
public boolean hasLeadership() {
62+
return false;
63+
}
5264
}

amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import java.io.File;
4747
import java.io.IOException;
4848
import java.nio.charset.StandardCharsets;
49+
import java.util.ArrayList;
50+
import java.util.List;
4951
import java.util.Map;
5052
import java.util.concurrent.CountDownLatch;
5153

@@ -55,13 +57,27 @@ public class ZkHighAvailabilityContainer implements HighAvailabilityContainer, L
5557

5658
private final LeaderLatch leaderLatch;
5759
private final CuratorFramework zkClient;
60+
61+
// Package-private accessors for testing
62+
CuratorFramework getZkClient() {
63+
return zkClient;
64+
}
65+
66+
LeaderLatch getLeaderLatch() {
67+
return leaderLatch;
68+
}
69+
5870
private final String tableServiceMasterPath;
5971
private final String optimizingServiceMasterPath;
72+
private final String nodesPath;
6073
private final AmsServerInfo tableServiceServerInfo;
6174
private final AmsServerInfo optimizingServiceServerInfo;
75+
private final boolean isMasterSlaveMode;
6276
private volatile CountDownLatch followerLatch;
77+
private String registeredNodePath;
6378

6479
public ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exception {
80+
this.isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
6581
if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
6682
String zkServerAddress = serviceConfig.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS);
6783
int zkSessionTimeout =
@@ -71,6 +87,7 @@ public ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exceptio
7187
String haClusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
7288
tableServiceMasterPath = AmsHAProperties.getTableServiceMasterPath(haClusterName);
7389
optimizingServiceMasterPath = AmsHAProperties.getOptimizingServiceMasterPath(haClusterName);
90+
nodesPath = AmsHAProperties.getNodesPath(haClusterName);
7491
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
7592
setupZookeeperAuth(serviceConfig);
7693
this.zkClient =
@@ -83,6 +100,7 @@ public ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exceptio
83100
zkClient.start();
84101
createPathIfNeeded(tableServiceMasterPath);
85102
createPathIfNeeded(optimizingServiceMasterPath);
103+
createPathIfNeeded(nodesPath);
86104
String leaderPath = AmsHAProperties.getLeaderPath(haClusterName);
87105
createPathIfNeeded(leaderPath);
88106
leaderLatch = new LeaderLatch(zkClient, leaderPath);
@@ -103,8 +121,10 @@ public ZkHighAvailabilityContainer(Configurations serviceConfig) throws Exceptio
103121
zkClient = null;
104122
tableServiceMasterPath = null;
105123
optimizingServiceMasterPath = null;
124+
nodesPath = null;
106125
tableServiceServerInfo = null;
107126
optimizingServiceServerInfo = null;
127+
registeredNodePath = null;
108128
// block follower latch forever when ha is disabled
109129
followerLatch = new CountDownLatch(1);
110130
}
@@ -141,8 +161,25 @@ public void waitLeaderShip() throws Exception {
141161
}
142162

143163
@Override
144-
public void registAndElect() throws Exception {
145-
// TODO Here you can register for AMS and participate in the election.
164+
public void registerAndElect() throws Exception {
165+
if (!isMasterSlaveMode) {
166+
LOG.debug("Master-slave mode is not enabled, skip node registration");
167+
return;
168+
}
169+
if (zkClient == null || nodesPath == null) {
170+
LOG.warn("HA is not enabled, skip node registration");
171+
return;
172+
}
173+
// Register node to ZK using ephemeral node
174+
// The node will be automatically deleted when the session expires
175+
String nodeInfo = JacksonUtil.toJSONString(optimizingServiceServerInfo);
176+
registeredNodePath =
177+
zkClient
178+
.create()
179+
.creatingParentsIfNeeded()
180+
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
181+
.forPath(nodesPath + "/node-", nodeInfo.getBytes(StandardCharsets.UTF_8));
182+
LOG.info("Registered AMS node to ZK: {}", registeredNodePath);
146183
}
147184

148185
@Override
@@ -158,6 +195,18 @@ public void waitFollowerShip() throws Exception {
158195
public void close() {
159196
if (leaderLatch != null) {
160197
try {
198+
// Unregister node from ZK
199+
if (registeredNodePath != null) {
200+
try {
201+
zkClient.delete().forPath(registeredNodePath);
202+
LOG.info("Unregistered AMS node from ZK: {}", registeredNodePath);
203+
} catch (KeeperException.NoNodeException e) {
204+
// Node already deleted, ignore
205+
LOG.debug("Node {} already deleted", registeredNodePath);
206+
} catch (Exception e) {
207+
LOG.warn("Failed to unregister node from ZK: {}", registeredNodePath, e);
208+
}
209+
}
161210
this.leaderLatch.close();
162211
this.zkClient.close();
163212
} catch (IOException e) {
@@ -192,6 +241,60 @@ private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restB
192241
return amsServerInfo;
193242
}
194243

244+
/**
245+
* Get list of alive nodes. Only the leader node can call this method.
246+
*
247+
* @return List of alive node information
248+
*/
249+
public List<AmsServerInfo> getAliveNodes() {
250+
List<AmsServerInfo> aliveNodes = new ArrayList<>();
251+
if (!isMasterSlaveMode) {
252+
LOG.debug("Master-slave mode is not enabled, return empty node list");
253+
return aliveNodes;
254+
}
255+
if (zkClient == null || nodesPath == null) {
256+
LOG.warn("HA is not enabled, return empty node list");
257+
return aliveNodes;
258+
}
259+
if (!leaderLatch.hasLeadership()) {
260+
LOG.warn("Only leader node can get alive nodes list");
261+
return aliveNodes;
262+
}
263+
try {
264+
List<String> nodePaths = zkClient.getChildren().forPath(nodesPath);
265+
for (String nodePath : nodePaths) {
266+
try {
267+
String fullPath = nodesPath + "/" + nodePath;
268+
byte[] data = zkClient.getData().forPath(fullPath);
269+
if (data != null && data.length > 0) {
270+
String nodeInfoJson = new String(data, StandardCharsets.UTF_8);
271+
AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson, AmsServerInfo.class);
272+
aliveNodes.add(nodeInfo);
273+
}
274+
} catch (Exception e) {
275+
LOG.warn("Failed to get node info for path: {}", nodePath, e);
276+
}
277+
}
278+
} catch (KeeperException.NoNodeException e) {
279+
LOG.debug("Nodes path {} does not exist", nodesPath);
280+
} catch (Exception e) {
281+
throw new RuntimeException(e);
282+
}
283+
return aliveNodes;
284+
}
285+
286+
/**
287+
* Check if current node is the leader.
288+
*
289+
* @return true if current node is the leader, false otherwise
290+
*/
291+
public boolean hasLeadership() {
292+
if (leaderLatch == null) {
293+
return false;
294+
}
295+
return leaderLatch.hasLeadership();
296+
}
297+
195298
private void createPathIfNeeded(String path) throws Exception {
196299
try {
197300
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);

amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/HaLeaseMapper.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,20 @@ int renewLease(
107107
HaLeaseMeta selectLease(
108108
@Param("clusterName") String clusterName, @Param("serviceName") String serviceName);
109109

110+
/**
111+
* Select all leases for cluster and service.
112+
*
113+
* @param clusterName cluster name
114+
* @param serviceName service name
115+
* @return list of lease rows
116+
*/
117+
@Select(
118+
"SELECT cluster_name, service_name, node_id, node_ip, server_info_json, lease_expire_ts, version, updated_at "
119+
+ "FROM ha_lease WHERE cluster_name = #{clusterName} AND service_name = #{serviceName}")
120+
@ResultMap("HaLeaseMetaMap")
121+
List<HaLeaseMeta> selectLeasesByService(
122+
@Param("clusterName") String clusterName, @Param("serviceName") String serviceName);
123+
110124
/**
111125
* Select current lease for cluster and service.
112126
*

0 commit comments

Comments
 (0)