Skip to content

Commit e04bac7

Browse files
authored
Merge branch 'master' into lance-format
2 parents 34691d9 + ba7f927 commit e04bac7

8 files changed

Lines changed: 63 additions & 11 deletions

File tree

amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ public class AmoroManagementConf {
5353
.defaultValue("admin")
5454
.withDescription("The administrator password");
5555

56+
/** Enable master & slave mode, which supports horizontal scaling of AMS. */
57+
public static final ConfigOption<Boolean> USE_MASTER_SLAVE_MODE =
58+
ConfigOptions.key("use-master-slave-mode")
59+
.booleanType()
60+
.defaultValue(false)
61+
.withDescription(
62+
"This setting controls whether to enable the AMS horizontal scaling feature, "
63+
+ "which is currently under development and testing.");
64+
5665
public static final ConfigOption<Duration> CATALOG_META_CACHE_EXPIRATION_INTERVAL =
5766
ConfigOptions.key("catalog-meta-cache.expiration-interval")
5867
.durationType()

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.amoro.server;
2020

21+
import static org.apache.amoro.server.AmoroManagementConf.USE_MASTER_SLAVE_MODE;
22+
2123
import io.javalin.Javalin;
2224
import io.javalin.http.HttpCode;
2325
import io.javalin.http.staticfiles.Location;
@@ -99,6 +101,7 @@ public class AmoroServiceContainer {
99101
public static final Logger LOG = LoggerFactory.getLogger(AmoroServiceContainer.class);
100102

101103
public static final String SERVER_CONFIG_FILENAME = "config.yaml";
104+
private static boolean IS_MASTER_SLAVE_MODE = false;
102105

103106
private final HighAvailabilityContainer haContainer;
104107
private DataSource dataSource;
@@ -133,17 +136,24 @@ public static void main(String[] args) {
133136
LOG.info("AMS service has been shut down");
134137
}));
135138
service.startRestServices();
136-
while (true) {
137-
try {
138-
// Used to block AMS instances that have not acquired leadership
139-
service.waitLeaderShip();
140-
service.transitionToLeader();
141-
// Used to block AMS instances that have acquired leadership
142-
service.waitFollowerShip();
143-
} catch (Exception e) {
144-
LOG.error("AMS start error", e);
145-
} finally {
146-
service.transitionToFollower();
139+
if (IS_MASTER_SLAVE_MODE) {
140+
// Even if one does not become the master, it cannot block the subsequent logic.
141+
service.registAndElect();
142+
// Regardless of whether tp becomes the master, the service needs to be activated.
143+
service.startOptimizingService();
144+
} else {
145+
while (true) {
146+
try {
147+
// Used to block AMS instances that have not acquired leadership
148+
service.waitLeaderShip();
149+
service.transitionToLeader();
150+
// Used to block AMS instances that have acquired leadership
151+
service.waitFollowerShip();
152+
} catch (Exception e) {
153+
LOG.error("AMS start error", e);
154+
} finally {
155+
service.transitionToFollower();
156+
}
147157
}
148158
}
149159
} catch (Throwable t) {
@@ -152,6 +162,10 @@ public static void main(String[] args) {
152162
}
153163
}
154164

165+
public void registAndElect() throws Exception {
166+
haContainer.registAndElect();
167+
}
168+
155169
public enum HAState {
156170
INITIALIZING(0),
157171
FOLLOWER(1),
@@ -306,6 +320,7 @@ public void dispose() {
306320
private void initConfig() throws Exception {
307321
LOG.info("initializing configurations...");
308322
new ConfigurationHelper().init();
323+
IS_MASTER_SLAVE_MODE = serviceConfig.getBoolean(USE_MASTER_SLAVE_MODE);
309324
}
310325

311326
public Configurations getServiceConfig() {

amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ public void close() {
147147
}
148148
}
149149

150+
@Override
151+
public void registAndElect() throws Exception {}
152+
150153
private class HeartbeatRunnable implements Runnable {
151154
@Override
152155
public void run() {

amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,12 @@ public interface HighAvailabilityContainer {
4242

4343
/** Closes the container and releases resources. */
4444
void close();
45+
46+
/**
47+
* In master-slave mode, this is used for AMS nodes to register and participate in the master
48+
* election process.
49+
*
50+
* @throws Exception If registration fails or participation in the primary election fails.
51+
*/
52+
void registAndElect() throws Exception;
4553
}

amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,7 @@ public void waitFollowerShip() throws InterruptedException {
4646
public void close() {
4747
LOG.info("Noop HA: closed");
4848
}
49+
50+
@Override
51+
public void registAndElect() throws Exception {}
4952
}

amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ public void waitLeaderShip() throws Exception {
140140
LOG.info("Became the leader of AMS");
141141
}
142142

143+
@Override
144+
public void registAndElect() throws Exception {
145+
// TODO Here you can register for AMS and participate in the election.
146+
}
147+
143148
@Override
144149
public void waitFollowerShip() throws Exception {
145150
LOG.info("Waiting to become the follower of AMS");

amoro-ams/src/test/java/org/apache/amoro/server/TestInternalMixedCatalogService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,14 @@ public void before() {
189189
@AfterEach
190190
public void after() {
191191
LOG.info("Test finished.");
192+
try {
193+
// explicitly clean up possible residual table runtime records
194+
if (catalog.tableExists(tableIdentifier)) {
195+
catalog.dropTable(tableIdentifier, true);
196+
}
197+
} catch (Exception e) {
198+
LOG.warn("Failed to drop table during cleanup", e);
199+
}
192200
catalog.dropDatabase(database);
193201
}
194202

docs/configuration/ams-config.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; }
125125
| thrift-server.selector-thread-count | 2 | The number of selector threads for the Thrift server. |
126126
| thrift-server.table-service.bind-port | 1260 | Port that the table service thrift server is bound to. |
127127
| thrift-server.table-service.worker-thread-count | 20 | The number of worker threads for the Thrift server. |
128+
| use-master-slave-mode | false | This setting controls whether to enable the AMS horizontal scaling feature, which is currently under development and testing. |
128129

129130

130131
## Shade Utils Configuration

0 commit comments

Comments
 (0)