props = info.getDatabaseDescriptor().getCustomProperties();
+ System.out.println("Properties:");
+ if (props.isEmpty()) {
+ System.out.println(" (none)");
+ } else {
+ props.forEach((k, v) -> System.out.println(" " + k + " = " + v));
+ }
+ }
+
+ void dropDatabase(DatabaseCommandOptions opts) throws Exception {
+ String dbName = opts.database();
+ admin.dropDatabase(dbName, opts.ifExists(), opts.cascade())
+ .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS);
+ System.out.println("Dropped database \"" + dbName + "\".");
+ }
+
+ @Override
+ public void close() throws Exception {
+ connection.close();
+ }
+ }
+
+ /** CLI option parser for database commands. */
+ static class DatabaseCommandOptions extends CommandDefaultOptions {
+ final Option createOpt =
+ Option.builder().longOpt("create").desc("Create a new database.").build();
+ final Option listOpt = Option.builder().longOpt("list").desc("List all databases.").build();
+ final Option describeOpt =
+ Option.builder().longOpt("describe").desc("Describe a database.").build();
+ final Option dropOpt = Option.builder().longOpt("drop").desc("Drop a database.").build();
+ final Option databaseOpt =
+ Option.builder()
+ .longOpt("database")
+ .hasArg()
+ .argName("name")
+ .desc("The database name.")
+ .build();
+ final Option ifExistsOpt =
+ Option.builder()
+ .longOpt("if-exists")
+ .desc("Only execute if the database exists.")
+ .build();
+ final Option ifNotExistsOpt =
+ Option.builder()
+ .longOpt("if-not-exists")
+ .desc("Only execute if the database does not exist.")
+ .build();
+ final Option cascadeOpt =
+ Option.builder()
+ .longOpt("cascade")
+ .desc("Drop all tables in the database (for --drop).")
+ .build();
+ final Option propertyOpt =
+ Option.builder()
+ .longOpt("property")
+ .hasArg()
+ .argName("key=value")
+ .desc("Database property (repeatable).")
+ .build();
+
+ DatabaseCommandOptions(String[] args) throws ParseException {
+ super();
+ options.addOption(createOpt);
+ options.addOption(listOpt);
+ options.addOption(describeOpt);
+ options.addOption(dropOpt);
+ options.addOption(databaseOpt);
+ options.addOption(ifExistsOpt);
+ options.addOption(ifNotExistsOpt);
+ options.addOption(cascadeOpt);
+ options.addOption(propertyOpt);
+ parse(args);
+ }
+
+ boolean hasCreateOption() {
+ return commandLine.hasOption(createOpt);
+ }
+
+ boolean hasListOption() {
+ return commandLine.hasOption(listOpt);
+ }
+
+ boolean hasDescribeOption() {
+ return commandLine.hasOption(describeOpt);
+ }
+
+ boolean hasDropOption() {
+ return commandLine.hasOption(dropOpt);
+ }
+
+ String database() {
+ String db = commandLine.getOptionValue(databaseOpt);
+ if (db == null) {
+ throw new IllegalArgumentException("--database is required.");
+ }
+ return db;
+ }
+
+ boolean ifExists() {
+ return commandLine.hasOption(ifExistsOpt);
+ }
+
+ boolean ifNotExists() {
+ return commandLine.hasOption(ifNotExistsOpt);
+ }
+
+ boolean cascade() {
+ return commandLine.hasOption(cascadeOpt);
+ }
+
+ CommandLine commandLine() {
+ return commandLine;
+ }
+ }
+}
diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/SchemaParser.java b/fluss-cli/src/main/java/org/apache/fluss/cli/SchemaParser.java
new file mode 100644
index 0000000000..b4608c28d3
--- /dev/null
+++ b/fluss-cli/src/main/java/org/apache/fluss/cli/SchemaParser.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cli;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Parses a schema definition string into a {@link Schema}.
+ *
+ * Format: {@code "col1 TYPE1, col2 TYPE2 NOT NULL, col3 TYPE3"}
+ *
+ *
Uses parenthesis-aware splitting to handle types like {@code DECIMAL(10,2)}. Type strings
+ * (including nullability suffix) are parsed by {@link DataTypes#parse(String)}.
+ */
+@Internal
+public final class SchemaParser {
+
+ private SchemaParser() {}
+
+ /**
+ * Parse a schema definition string into a Schema.
+ *
+ * @param schemaStr comma-separated column definitions
+ * @param primaryKeys primary key column names (empty list if no primary key)
+ * @return the parsed Schema
+ */
+ public static Schema parseSchema(String schemaStr, List primaryKeys) {
+ if (schemaStr == null || schemaStr.trim().isEmpty()) {
+ throw new IllegalArgumentException("Schema definition cannot be empty.");
+ }
+
+ Schema.Builder builder = Schema.newBuilder();
+ List columnDefs = splitRespectingParentheses(schemaStr, ',');
+
+ for (String colDef : columnDefs) {
+ String trimmed = colDef.trim();
+ if (trimmed.isEmpty()) {
+ continue;
+ }
+
+ int firstSpace = trimmed.indexOf(' ');
+ if (firstSpace < 0) {
+ throw new IllegalArgumentException(
+ "Invalid column definition: '"
+ + trimmed
+ + "'. Expected format: 'name TYPE [NOT NULL]'.");
+ }
+
+ String name = trimmed.substring(0, firstSpace).trim();
+ String typeStr = trimmed.substring(firstSpace + 1).trim();
+
+ DataType dataType = DataTypes.parse(typeStr);
+ builder.column(name, dataType);
+ }
+
+ if (primaryKeys != null && !primaryKeys.isEmpty()) {
+ builder.primaryKey(primaryKeys);
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Split a string by delimiter, skipping delimiters inside parentheses. Handles types like
+ * {@code DECIMAL(10,2)}.
+ */
+ static List splitRespectingParentheses(String input, char delimiter) {
+ List result = new ArrayList<>();
+ int depth = 0;
+ int start = 0;
+ for (int i = 0; i < input.length(); i++) {
+ char c = input.charAt(i);
+ if (c == '(') {
+ depth++;
+ } else if (c == ')') {
+ depth--;
+ } else if (c == delimiter && depth == 0) {
+ result.add(input.substring(start, i));
+ start = i + 1;
+ }
+ }
+ result.add(input.substring(start));
+ return result;
+ }
+}
diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java b/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java
new file mode 100644
index 0000000000..f7aa8c11e0
--- /dev/null
+++ b/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cli;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.FlussConnection;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.cluster.BucketLocation;
+import org.apache.fluss.cluster.Cluster;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.ParseException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/** CLI tool for managing Fluss tables. */
+@Internal
+public class TableCommand {
+
+ public static void main(String[] args) {
+ System.exit(mainNoExit(args));
+ }
+
+ static int mainNoExit(String[] args) {
+ try {
+ execute(args);
+ return 0;
+ } catch (Throwable e) {
+ System.err.println(
+ "Error while executing table command: "
+ + CommandUtils.unwrapExceptionMessage(e));
+ return 1;
+ }
+ }
+
+ static void execute(String[] args) throws Exception {
+ TableCommandOptions opts = new TableCommandOptions(args);
+ if (opts.hasHelpOption()) {
+ CommandUtils.printUsageAndExit(opts.options(), "fluss-table.sh");
+ }
+ if (opts.hasVersionOption()) {
+ CommandUtils.printVersionAndExit();
+ }
+ CommandUtils.validateActions(
+ opts.commandLine(), opts.createOpt, opts.listOpt, opts.describeOpt, opts.dropOpt);
+
+ try (TableService service = new TableService(opts)) {
+ if (opts.hasCreateOption()) {
+ service.createTable(opts);
+ } else if (opts.hasListOption()) {
+ service.listTables(opts);
+ } else if (opts.hasDescribeOption()) {
+ service.describeTable(opts);
+ } else if (opts.hasDropOption()) {
+ service.dropTable(opts);
+ }
+ }
+ }
+
+ /** Service class that wraps the Admin client for table operations. */
+ static class TableService implements AutoCloseable {
+ private final Admin admin;
+ private final Connection connection;
+
+ TableService(TableCommandOptions opts) {
+ this.connection = CommandUtils.createConnection(opts);
+ this.admin = connection.getAdmin();
+ }
+
+ void createTable(TableCommandOptions opts) throws Exception {
+ TablePath tablePath = opts.tablePath();
+ String schemaStr = opts.schema();
+ List primaryKeys = opts.primaryKeys();
+
+ Schema schema = SchemaParser.parseSchema(schemaStr, primaryKeys);
+
+ TableDescriptor.Builder builder = TableDescriptor.builder().schema(schema);
+
+ List partitionKeys = opts.partitionKeys();
+ if (!partitionKeys.isEmpty()) {
+ builder.partitionedBy(partitionKeys);
+ }
+
+ Integer buckets = opts.buckets();
+ if (buckets != null) {
+ builder.distributedBy(buckets);
+ }
+
+ Map props =
+ CommandUtils.parseProperties(opts.commandLine().getOptionValues("property"));
+ props.forEach(builder::property);
+
+ admin.createTable(tablePath, builder.build(), opts.ifNotExists())
+ .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS);
+ System.out.println("Created table \"" + tablePath + "\".");
+ }
+
+ void listTables(TableCommandOptions opts) throws Exception {
+ String database = opts.database();
+ List tables =
+ admin.listTables(database)
+ .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS);
+ tables.forEach(System.out::println);
+ }
+
+ void describeTable(TableCommandOptions opts) throws Exception {
+ TablePath tablePath = opts.tablePath();
+ TableInfo tableInfo =
+ admin.getTableInfo(tablePath)
+ .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+ Schema schema = tableInfo.getSchema();
+ boolean hasPK = tableInfo.hasPrimaryKey();
+
+ System.out.println("Table: " + tablePath);
+ System.out.println("Type: " + (hasPK ? "PrimaryKey" : "Log"));
+
+ System.out.println("Schema:");
+ for (Schema.Column col : schema.getColumns()) {
+ System.out.println(" " + col.getName() + " " + col.getDataType());
+ }
+
+ if (hasPK) {
+ System.out.println("PrimaryKey: " + String.join(", ", tableInfo.getPrimaryKeys()));
+ }
+
+ List partitionKeys = tableInfo.getPartitionKeys();
+ if (!partitionKeys.isEmpty()) {
+ System.out.println("PartitionKeys: " + String.join(", ", partitionKeys));
+ }
+
+ int numBuckets = tableInfo.getNumBuckets();
+ if (numBuckets > 0) {
+ System.out.println("Buckets: " + numBuckets);
+ }
+
+ List bucketKeys = tableInfo.getBucketKeys();
+ if (!bucketKeys.isEmpty()) {
+ System.out.println("BucketKeys: " + String.join(", ", bucketKeys));
+ }
+
+ Optional comment = tableInfo.getComment();
+ if (comment.isPresent()) {
+ System.out.println("Comment: " + comment.get());
+ }
+
+ Configuration props = tableInfo.getProperties();
+ Configuration customProps = tableInfo.getCustomProperties();
+ Map propsMap = props.toMap();
+ Map customPropsMap = customProps.toMap();
+ if (!propsMap.isEmpty() || !customPropsMap.isEmpty()) {
+ System.out.println("Properties:");
+ propsMap.forEach((k, v) -> System.out.println(" " + k + " = " + v));
+ customPropsMap.forEach((k, v) -> System.out.println(" " + k + " = " + v));
+ }
+
+ printBucketDistribution(tablePath);
+ }
+
+ private void printBucketDistribution(TablePath tablePath) {
+ MetadataUpdater metadataUpdater = ((FlussConnection) connection).getMetadataUpdater();
+ metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
+ Cluster cluster = metadataUpdater.getCluster();
+ PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath);
+ java.util.List bucketLocations =
+ cluster.getAvailableBucketsForPhysicalTablePath(physicalTablePath);
+ if (bucketLocations == null || bucketLocations.isEmpty()) {
+ return;
+ }
+ System.out.println("Bucket Distribution:");
+ System.out.printf(" %-8s%-8s%-12s%s%n", "Bucket", "Leader", "Replicas", "Isr");
+ for (BucketLocation loc : bucketLocations) {
+ String leader = loc.getLeader() == null ? "none" : String.valueOf(loc.getLeader());
+ System.out.printf(
+ " %-8d%-8s%-12s%s%n",
+ loc.getBucketId(),
+ leader,
+ formatIntArray(loc.getReplicas()),
+ formatIntArray(loc.getIsr()));
+ }
+ }
+
+ private static String formatIntArray(int[] arr) {
+ StringBuilder sb = new StringBuilder("[");
+ for (int i = 0; i < arr.length; i++) {
+ if (i > 0) {
+ sb.append(",");
+ }
+ sb.append(arr[i]);
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ void dropTable(TableCommandOptions opts) throws Exception {
+ TablePath tablePath = opts.tablePath();
+ admin.dropTable(tablePath, opts.ifExists())
+ .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS);
+ System.out.println("Dropped table \"" + tablePath + "\".");
+ }
+
+ @Override
+ public void close() throws Exception {
+ connection.close();
+ }
+ }
+
+ /** CLI option parser for table commands. */
+ static class TableCommandOptions extends CommandDefaultOptions {
+ final Option createOpt =
+ Option.builder().longOpt("create").desc("Create a new table.").build();
+ final Option listOpt =
+ Option.builder().longOpt("list").desc("List all tables in a database.").build();
+ final Option describeOpt =
+ Option.builder().longOpt("describe").desc("Describe a table.").build();
+ final Option dropOpt = Option.builder().longOpt("drop").desc("Drop a table.").build();
+ final Option databaseOpt =
+ Option.builder()
+ .longOpt("database")
+ .hasArg()
+ .argName("name")
+ .desc("The database name (for --list).")
+ .build();
+ final Option tableOpt =
+ Option.builder()
+ .longOpt("table")
+ .hasArg()
+ .argName("db.table")
+ .desc("The table path in format 'database.table'.")
+ .build();
+ final Option schemaOpt =
+ Option.builder()
+ .longOpt("schema")
+ .hasArg()
+ .argName("definition")
+ .desc("Column definitions: 'col1 INT, col2 STRING NOT NULL, ...'")
+ .build();
+ final Option primaryKeyOpt =
+ Option.builder()
+ .longOpt("primary-key")
+ .hasArg()
+ .argName("columns")
+ .desc("Primary key columns (comma-separated).")
+ .build();
+ final Option partitionByOpt =
+ Option.builder()
+ .longOpt("partition-by")
+ .hasArg()
+ .argName("columns")
+ .desc("Partition columns (comma-separated).")
+ .build();
+ final Option bucketsOpt =
+ Option.builder()
+ .longOpt("buckets")
+ .hasArg()
+ .argName("num")
+ .desc("Number of buckets.")
+ .build();
+ final Option propertyOpt =
+ Option.builder()
+ .longOpt("property")
+ .hasArg()
+ .argName("key=value")
+ .desc("Table property (repeatable).")
+ .build();
+ final Option ifExistsOpt =
+ Option.builder()
+ .longOpt("if-exists")
+ .desc("Only execute if the table exists.")
+ .build();
+ final Option ifNotExistsOpt =
+ Option.builder()
+ .longOpt("if-not-exists")
+ .desc("Only execute if the table does not exist.")
+ .build();
+
+ TableCommandOptions(String[] args) throws ParseException {
+ super();
+ options.addOption(createOpt);
+ options.addOption(listOpt);
+ options.addOption(describeOpt);
+ options.addOption(dropOpt);
+ options.addOption(databaseOpt);
+ options.addOption(tableOpt);
+ options.addOption(schemaOpt);
+ options.addOption(primaryKeyOpt);
+ options.addOption(partitionByOpt);
+ options.addOption(bucketsOpt);
+ options.addOption(propertyOpt);
+ options.addOption(ifExistsOpt);
+ options.addOption(ifNotExistsOpt);
+ parse(args);
+ }
+
+ boolean hasCreateOption() {
+ return commandLine.hasOption(createOpt);
+ }
+
+ boolean hasListOption() {
+ return commandLine.hasOption(listOpt);
+ }
+
+ boolean hasDescribeOption() {
+ return commandLine.hasOption(describeOpt);
+ }
+
+ boolean hasDropOption() {
+ return commandLine.hasOption(dropOpt);
+ }
+
+ TablePath tablePath() {
+ String table = commandLine.getOptionValue(tableOpt);
+ if (table == null) {
+ throw new IllegalArgumentException("--table is required.");
+ }
+ String[] parts = table.split("\\.", 2);
+ if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
+ throw new IllegalArgumentException(
+ "Invalid table path: '" + table + "'. Expected format: 'database.table'.");
+ }
+ return TablePath.of(parts[0], parts[1]);
+ }
+
+ String database() {
+ String db = commandLine.getOptionValue(databaseOpt);
+ if (db == null) {
+ throw new IllegalArgumentException("--database is required.");
+ }
+ return db;
+ }
+
+ String schema() {
+ String s = commandLine.getOptionValue(schemaOpt);
+ if (s == null) {
+ throw new IllegalArgumentException("--schema is required for --create.");
+ }
+ return s;
+ }
+
+ List primaryKeys() {
+ String pk = commandLine.getOptionValue(primaryKeyOpt);
+ if (pk == null) {
+ return Collections.emptyList();
+ }
+ return Arrays.asList(trimElements(pk.split(",")));
+ }
+
+ List partitionKeys() {
+ String pb = commandLine.getOptionValue(partitionByOpt);
+ if (pb == null) {
+ return Collections.emptyList();
+ }
+ return Arrays.asList(trimElements(pb.split(",")));
+ }
+
+ private static String[] trimElements(String[] elements) {
+ for (int i = 0; i < elements.length; i++) {
+ elements[i] = elements[i].trim();
+ }
+ return elements;
+ }
+
+ Integer buckets() {
+ String b = commandLine.getOptionValue(bucketsOpt);
+ if (b == null) {
+ return null;
+ }
+ return Integer.parseInt(b);
+ }
+
+ boolean ifExists() {
+ return commandLine.hasOption(ifExistsOpt);
+ }
+
+ boolean ifNotExists() {
+ return commandLine.hasOption(ifNotExistsOpt);
+ }
+
+ CommandLine commandLine() {
+ return commandLine;
+ }
+ }
+}
diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/CommandDefaultOptionsTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/CommandDefaultOptionsTest.java
new file mode 100644
index 0000000000..4fb2a19143
--- /dev/null
+++ b/fluss-cli/src/test/java/org/apache/fluss/cli/CommandDefaultOptionsTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cli;
+
+import org.apache.commons.cli.ParseException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link CommandDefaultOptions}. */
+class CommandDefaultOptionsTest {
+
+ @Test
+ void testParseBootstrapServer() throws ParseException {
+ CommandDefaultOptions opts = new CommandDefaultOptions();
+ opts.parse(new String[] {"--bootstrap-server", "localhost:9123"});
+
+ assertThat(opts.bootstrapServer()).isEqualTo("localhost:9123");
+ assertThat(opts.hasHelpOption()).isFalse();
+ assertThat(opts.hasVersionOption()).isFalse();
+ }
+
+ @Test
+ void testMissingBootstrapServerThrows() {
+ CommandDefaultOptions opts = new CommandDefaultOptions();
+
+ assertThatThrownBy(() -> opts.parse(new String[] {"--list"}))
+ .isInstanceOf(ParseException.class);
+ }
+
+ @Test
+ void testHelpOption() throws ParseException {
+ CommandDefaultOptions opts = new CommandDefaultOptions();
+ opts.parse(new String[] {"--help"});
+
+ assertThat(opts.hasHelpOption()).isTrue();
+ }
+
+ @Test
+ void testVersionOption() throws ParseException {
+ CommandDefaultOptions opts = new CommandDefaultOptions();
+ opts.parse(new String[] {"--version"});
+
+ assertThat(opts.hasVersionOption()).isTrue();
+ }
+
+ @Test
+ void testCommandConfigLoadsProperties(@TempDir Path tempDir)
+ throws ParseException, IOException {
+ Path configFile = tempDir.resolve("client.properties");
+ Properties props = new Properties();
+ props.setProperty("security.protocol", "SASL_SSL");
+ try (FileOutputStream fos = new FileOutputStream(configFile.toFile())) {
+ props.store(fos, null);
+ }
+
+ CommandDefaultOptions opts = new CommandDefaultOptions();
+ opts.parse(
+ new String[] {
+ "--bootstrap-server",
+ "localhost:9123",
+ "--command-config",
+ configFile.toString()
+ });
+
+ Properties loaded = opts.commandConfig();
+ assertThat(loaded.getProperty("security.protocol")).isEqualTo("SASL_SSL");
+ }
+
+ @Test
+ void testCommandConfigDefaultsToEmptyProperties() throws ParseException {
+ CommandDefaultOptions opts = new CommandDefaultOptions();
+ opts.parse(new String[] {"--bootstrap-server", "localhost:9123"});
+
+ assertThat(opts.commandConfig()).isEmpty();
+ }
+}
diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/CommandUtilsTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/CommandUtilsTest.java
new file mode 100644
index 0000000000..6bc0ab766d
--- /dev/null
+++ b/fluss-cli/src/test/java/org/apache/fluss/cli/CommandUtilsTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link CommandUtils}. */
+class CommandUtilsTest {
+
+ @Test
+ void testParsePropertiesValid() {
+ Map props =
+ CommandUtils.parseProperties(new String[] {"key1=value1", "key2=value2"});
+
+ assertThat(props).hasSize(2);
+ assertThat(props.get("key1")).isEqualTo("value1");
+ assertThat(props.get("key2")).isEqualTo("value2");
+ }
+
+ @Test
+ void testParsePropertiesWithEqualsInValue() {
+ Map props = CommandUtils.parseProperties(new String[] {"key=val=ue"});
+
+ assertThat(props.get("key")).isEqualTo("val=ue");
+ }
+
+ @Test
+ void testParsePropertiesNull() {
+ Map props = CommandUtils.parseProperties(null);
+
+ assertThat(props).isEmpty();
+ }
+
+ @Test
+ void testParsePropertiesInvalidFormat() {
+ assertThatThrownBy(() -> CommandUtils.parseProperties(new String[] {"noequalssign"}))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid property format");
+ }
+
+ @Test
+ void testValidateActionsExactlyOne() throws Exception {
+ Option createOpt = Option.builder().longOpt("create").build();
+ Option listOpt = Option.builder().longOpt("list").build();
+ Options opts = new Options();
+ opts.addOption(createOpt);
+ opts.addOption(listOpt);
+ CommandLine cmd = new DefaultParser().parse(opts, new String[] {"--create"});
+
+ // should not throw
+ CommandUtils.validateActions(cmd, createOpt, listOpt);
+ }
+
+ @Test
+ void testValidateActionsNoneThrows() throws Exception {
+ Option createOpt = Option.builder().longOpt("create").build();
+ Option listOpt = Option.builder().longOpt("list").build();
+ Options opts = new Options();
+ opts.addOption(createOpt);
+ opts.addOption(listOpt);
+ CommandLine cmd = new DefaultParser().parse(opts, new String[] {});
+
+ assertThatThrownBy(() -> CommandUtils.validateActions(cmd, createOpt, listOpt))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Exactly one of");
+ }
+
+ @Test
+ void testValidateActionsMultipleThrows() throws Exception {
+ Option createOpt = Option.builder().longOpt("create").build();
+ Option listOpt = Option.builder().longOpt("list").build();
+ Options opts = new Options();
+ opts.addOption(createOpt);
+ opts.addOption(listOpt);
+ CommandLine cmd = new DefaultParser().parse(opts, new String[] {"--create", "--list"});
+
+ assertThatThrownBy(() -> CommandUtils.validateActions(cmd, createOpt, listOpt))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Exactly one of");
+ }
+
+ @Test
+ void testUnwrapExecutionException() {
+ ExecutionException e = new ExecutionException(new RuntimeException("root cause"));
+
+ assertThat(CommandUtils.unwrapExceptionMessage(e)).isEqualTo("root cause");
+ }
+
+ @Test
+ void testUnwrapRegularException() {
+ RuntimeException e = new RuntimeException("direct message");
+
+ assertThat(CommandUtils.unwrapExceptionMessage(e)).isEqualTo("direct message");
+ }
+}
diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/DatabaseCommandTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/DatabaseCommandTest.java
new file mode 100644
index 0000000000..066cd473b2
--- /dev/null
+++ b/fluss-cli/src/test/java/org/apache/fluss/cli/DatabaseCommandTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cli;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DatabaseCommand} option parsing. */
+class DatabaseCommandTest {
+
+ @Test
+ void testParseCreateOptions() throws Exception {
+ DatabaseCommand.DatabaseCommandOptions opts =
+ new DatabaseCommand.DatabaseCommandOptions(
+ new String[] {
+ "--bootstrap-server", "localhost:9123", "--create", "--database", "mydb"
+ });
+
+ assertThat(opts.hasCreateOption()).isTrue();
+ assertThat(opts.hasListOption()).isFalse();
+ assertThat(opts.database()).isEqualTo("mydb");
+ }
+
+ @Test
+ void testParseListOptions() throws Exception {
+ DatabaseCommand.DatabaseCommandOptions opts =
+ new DatabaseCommand.DatabaseCommandOptions(
+ new String[] {"--bootstrap-server", "localhost:9123", "--list"});
+
+ assertThat(opts.hasListOption()).isTrue();
+ }
+
+ @Test
+ void testParseDropWithCascadeAndIfExists() throws Exception {
+ DatabaseCommand.DatabaseCommandOptions opts =
+ new DatabaseCommand.DatabaseCommandOptions(
+ new String[] {
+ "--bootstrap-server",
+ "localhost:9123",
+ "--drop",
+ "--database",
+ "mydb",
+ "--cascade",
+ "--if-exists"
+ });
+
+ assertThat(opts.hasDropOption()).isTrue();
+ assertThat(opts.database()).isEqualTo("mydb");
+ assertThat(opts.cascade()).isTrue();
+ assertThat(opts.ifExists()).isTrue();
+ }
+
+ @Test
+ void testMissingDatabaseForCreateThrows() throws Exception {
+ DatabaseCommand.DatabaseCommandOptions opts =
+ new DatabaseCommand.DatabaseCommandOptions(
+ new String[] {"--bootstrap-server", "localhost:9123", "--create"});
+
+ assertThatThrownBy(opts::database)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("--database is required");
+ }
+
+ @Test
+ void testIfNotExistsOption() throws Exception {
+ DatabaseCommand.DatabaseCommandOptions opts =
+ new DatabaseCommand.DatabaseCommandOptions(
+ new String[] {
+ "--bootstrap-server",
+ "localhost:9123",
+ "--create",
+ "--database",
+ "mydb",
+ "--if-not-exists"
+ });
+
+ assertThat(opts.ifNotExists()).isTrue();
+ }
+
+ @Test
+ void testMainNoExitReturnsOneOnError() {
+ // No args at all -> parse error -> exit code 1
+ int exitCode = DatabaseCommand.mainNoExit(new String[] {});
+ assertThat(exitCode).isEqualTo(1);
+ }
+}
diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/SchemaParserTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/SchemaParserTest.java
new file mode 100644
index 0000000000..9793a3397c
--- /dev/null
+++ b/fluss-cli/src/test/java/org/apache/fluss/cli/SchemaParserTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cli;
+
+import org.apache.fluss.metadata.Schema;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link SchemaParser}. */
+class SchemaParserTest {
+
+ @Test
+ void testSimpleColumns() {
+ Schema schema = SchemaParser.parseSchema("id INT, name STRING", Collections.emptyList());
+
+ List cols = schema.getColumns();
+ assertThat(cols).hasSize(2);
+ assertThat(cols.get(0).getName()).isEqualTo("id");
+ assertThat(cols.get(0).getDataType().isNullable()).isTrue();
+ assertThat(cols.get(1).getName()).isEqualTo("name");
+ assertThat(cols.get(1).getDataType().isNullable()).isTrue();
+ }
+
+ @Test
+ void testNotNullColumn() {
+ Schema schema =
+ SchemaParser.parseSchema("id INT NOT NULL, name STRING", Collections.emptyList());
+
+ List cols = schema.getColumns();
+ assertThat(cols.get(0).getName()).isEqualTo("id");
+ assertThat(cols.get(0).getDataType().isNullable()).isFalse();
+ assertThat(cols.get(1).getDataType().isNullable()).isTrue();
+ }
+
+ @Test
+ void testDecimalWithCommaInParens() {
+ Schema schema = SchemaParser.parseSchema("amount DECIMAL(10,2)", Collections.emptyList());
+
+ List cols = schema.getColumns();
+ assertThat(cols).hasSize(1);
+ assertThat(cols.get(0).getName()).isEqualTo("amount");
+ assertThat(cols.get(0).getDataType().toString()).contains("DECIMAL");
+ }
+
+ @Test
+ void testMultipleNotNull() {
+ Schema schema =
+ SchemaParser.parseSchema(
+ "a INT NOT NULL, b STRING NOT NULL", Collections.emptyList());
+
+ List cols = schema.getColumns();
+ assertThat(cols.get(0).getDataType().isNullable()).isFalse();
+ assertThat(cols.get(1).getDataType().isNullable()).isFalse();
+ }
+
+ @Test
+ void testSingleColumn() {
+ Schema schema = SchemaParser.parseSchema("id BIGINT", Collections.emptyList());
+
+ assertThat(schema.getColumns()).hasSize(1);
+ assertThat(schema.getColumns().get(0).getName()).isEqualTo("id");
+ }
+
+ @Test
+ void testExtraWhitespace() {
+ Schema schema =
+ SchemaParser.parseSchema(" id INT , name STRING ", Collections.emptyList());
+
+ List cols = schema.getColumns();
+ assertThat(cols).hasSize(2);
+ assertThat(cols.get(0).getName()).isEqualTo("id");
+ assertThat(cols.get(1).getName()).isEqualTo("name");
+ }
+
+ @Test
+ void testWithPrimaryKey() {
+ Schema schema = SchemaParser.parseSchema("id INT, name STRING", Arrays.asList("id"));
+
+ assertThat(schema.getPrimaryKey()).isPresent();
+ assertThat(schema.getPrimaryKey().get().getColumnNames()).containsExactly("id");
+ }
+
+ @Test
+ void testInvalidColumnDefinition() {
+ assertThatThrownBy(() -> SchemaParser.parseSchema("nospace", Collections.emptyList()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid column definition");
+ }
+
+ @Test
+ void testUnknownTypeThrows() {
+ assertThatThrownBy(() -> SchemaParser.parseSchema("x FOOBAR", Collections.emptyList()))
+ .isInstanceOf(Exception.class);
+ }
+
+ @Test
+ void testEmptySchemaThrows() {
+ assertThatThrownBy(() -> SchemaParser.parseSchema("", Collections.emptyList()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void testAllSupportedTypes() {
+ String schema =
+ "a BOOLEAN, b TINYINT, c SMALLINT, d INT, e BIGINT, "
+ + "f FLOAT, g DOUBLE, h STRING, i DATE, j TIMESTAMP";
+ Schema result = SchemaParser.parseSchema(schema, Collections.emptyList());
+
+ assertThat(result.getColumns()).hasSize(10);
+ }
+}
diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/TableCommandTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/TableCommandTest.java
new file mode 100644
index 0000000000..61f0ac98ad
--- /dev/null
+++ b/fluss-cli/src/test/java/org/apache/fluss/cli/TableCommandTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cli;
+
+import org.apache.fluss.metadata.TablePath;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TableCommand} option parsing. */
+class TableCommandTest {
+
+ @Test
+ void testParseCreateOptions() throws Exception {
+ TableCommand.TableCommandOptions opts =
+ new TableCommand.TableCommandOptions(
+ new String[] {
+ "--bootstrap-server",
+ "localhost:9123",
+ "--create",
+ "--table",
+ "mydb.users",
+ "--schema",
+ "id INT, name STRING",
+ "--primary-key",
+ "id",
+ "--buckets",
+ "4"
+ });
+
+ assertThat(opts.hasCreateOption()).isTrue();
+ assertThat(opts.tablePath()).isEqualTo(TablePath.of("mydb", "users"));
+ assertThat(opts.schema()).isEqualTo("id INT, name STRING");
+ assertThat(opts.primaryKeys()).containsExactly("id");
+ assertThat(opts.buckets()).isEqualTo(4);
+ }
+
+ @Test
+ void testParseListOptions() throws Exception {
+ TableCommand.TableCommandOptions opts =
+ new TableCommand.TableCommandOptions(
+ new String[] {
+ "--bootstrap-server", "localhost:9123", "--list", "--database", "mydb"
+ });
+
+ assertThat(opts.hasListOption()).isTrue();
+ assertThat(opts.database()).isEqualTo("mydb");
+ }
+
+ @Test
+ void testTablePathParsing() throws Exception {
+ TableCommand.TableCommandOptions opts =
+ new TableCommand.TableCommandOptions(
+ new String[] {
+ "--bootstrap-server",
+ "localhost:9123",
+ "--describe",
+ "--table",
+ "mydb.orders"
+ });
+
+ TablePath path = opts.tablePath();
+ assertThat(path.getDatabaseName()).isEqualTo("mydb");
+ assertThat(path.getTableName()).isEqualTo("orders");
+ }
+
+ @Test
+ void testTablePathInvalidFormatNoDot() throws Exception {
+ TableCommand.TableCommandOptions opts =
+ new TableCommand.TableCommandOptions(
+ new String[] {
+ "--bootstrap-server",
+ "localhost:9123",
+ "--describe",
+ "--table",
+ "notable"
+ });
+
+ assertThatThrownBy(opts::tablePath)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid table path");
+ }
+
+ @Test
+ void testTablePathInvalidEmptyParts() throws Exception {
+ TableCommand.TableCommandOptions opts =
+ new TableCommand.TableCommandOptions(
+ new String[] {
+ "--bootstrap-server",
+ "localhost:9123",
+ "--describe",
+ "--table",
+ ".table"
+ });
+
+ assertThatThrownBy(opts::tablePath)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid table path");
+ }
+
+ @Test
+ void testMissingTableThrows() throws Exception {
+ TableCommand.TableCommandOptions opts =
+ new TableCommand.TableCommandOptions(
+ new String[] {"--bootstrap-server", "localhost:9123", "--describe"});
+
+ assertThatThrownBy(opts::tablePath)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("--table is required");
+ }
+
+ @Test
+ void testMissingSchemaForCreateThrows() throws Exception {
+ TableCommand.TableCommandOptions opts =
+ new TableCommand.TableCommandOptions(
+ new String[] {
+ "--bootstrap-server",
+ "localhost:9123",
+ "--create",
+ "--table",
+ "mydb.users"
+ });
+
+ assertThatThrownBy(opts::schema)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("--schema is required");
+ }
+
+ @Test
+ void testPartitionByOption() throws Exception {
+ TableCommand.TableCommandOptions opts =
+ new TableCommand.TableCommandOptions(
+ new String[] {
+ "--bootstrap-server",
+ "localhost:9123",
+ "--create",
+ "--table",
+ "mydb.orders",
+ "--schema",
+ "id INT, dt STRING",
+ "--partition-by",
+ "dt"
+ });
+
+ List keys = opts.partitionKeys();
+ assertThat(keys).containsExactly("dt");
+ }
+
+ @Test
+ void testNoBucketsReturnsNull() throws Exception {
+ TableCommand.TableCommandOptions opts =
+ new TableCommand.TableCommandOptions(
+ new String[] {
+ "--bootstrap-server",
+ "localhost:9123",
+ "--create",
+ "--table",
+ "mydb.users",
+ "--schema",
+ "id INT"
+ });
+
+ assertThat(opts.buckets()).isNull();
+ }
+
+ @Test
+ void testMainNoExitReturnsOneOnError() {
+ int exitCode = TableCommand.mainNoExit(new String[] {});
+ assertThat(exitCode).isEqualTo(1);
+ }
+}
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
index 2990054999..d4d8b86d06 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
@@ -291,6 +291,10 @@ private static List toBucketLocations(
for (int i = 0; i < replicas.length; i++) {
replicas[i] = pbBucketMetadata.getReplicaIdAt(i);
}
+ int[] isr = new int[pbBucketMetadata.getIsrsCount()];
+ for (int i = 0; i < isr.length; i++) {
+ isr[i] = pbBucketMetadata.getIsrAt(i);
+ }
Integer leader = null;
if (pbBucketMetadata.hasLeaderId()) {
leader = pbBucketMetadata.getLeaderId();
@@ -298,7 +302,7 @@ private static List toBucketLocations(
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName);
BucketLocation bucketLocation =
- new BucketLocation(physicalTablePath, tableBucket, leader, replicas);
+ new BucketLocation(physicalTablePath, tableBucket, leader, replicas, isr);
bucketLocations.add(bucketLocation);
}
return bucketLocations;
diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/BucketLocation.java b/fluss-common/src/main/java/org/apache/fluss/cluster/BucketLocation.java
index 36977136de..6977c6139a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/cluster/BucketLocation.java
+++ b/fluss-common/src/main/java/org/apache/fluss/cluster/BucketLocation.java
@@ -33,6 +33,7 @@ public final class BucketLocation {
private final TableBucket tableBucket;
@Nullable private Integer leader;
private final int[] replicas;
+ private final int[] isr;
public BucketLocation(
PhysicalTablePath physicalTablePath,
@@ -40,7 +41,7 @@ public BucketLocation(
int bucketId,
@Nullable Integer leader,
int[] replicas) {
- this(physicalTablePath, new TableBucket(tableId, bucketId), leader, replicas);
+ this(physicalTablePath, new TableBucket(tableId, bucketId), leader, replicas, new int[0]);
}
public BucketLocation(
@@ -48,10 +49,20 @@ public BucketLocation(
TableBucket tableBucket,
@Nullable Integer leader,
int[] replicas) {
+ this(physicalTablePath, tableBucket, leader, replicas, new int[0]);
+ }
+
+ public BucketLocation(
+ PhysicalTablePath physicalTablePath,
+ TableBucket tableBucket,
+ @Nullable Integer leader,
+ int[] replicas,
+ int[] isr) {
this.physicalTablePath = physicalTablePath;
this.tableBucket = tableBucket;
this.leader = leader;
this.replicas = replicas;
+ this.isr = isr;
}
public PhysicalTablePath getPhysicalTablePath() {
@@ -79,6 +90,10 @@ public int[] getReplicas() {
return replicas;
}
+ public int[] getIsr() {
+ return isr;
+ }
+
@Override
public boolean equals(Object object) {
if (this == object) {
@@ -91,22 +106,29 @@ public boolean equals(Object object) {
return Objects.equals(physicalTablePath, that.physicalTablePath)
&& Objects.equals(tableBucket, that.tableBucket)
&& Objects.equals(leader, that.leader)
- && Objects.deepEquals(replicas, that.replicas);
+ && Objects.deepEquals(replicas, that.replicas)
+ && Objects.deepEquals(isr, that.isr);
}
@Override
public int hashCode() {
- return Objects.hash(physicalTablePath, tableBucket, leader, Arrays.hashCode(replicas));
+ return Objects.hash(
+ physicalTablePath,
+ tableBucket,
+ leader,
+ Arrays.hashCode(replicas),
+ Arrays.hashCode(isr));
}
@Override
public String toString() {
return String.format(
- "Bucket(physicalTablePath = %s, %s, leader = %s, replicas = %s)",
+ "Bucket(physicalTablePath = %s, %s, leader = %s, replicas = %s, isr = %s)",
physicalTablePath,
tableBucket,
leader == null ? "none" : leader,
- formatNodeIds(replicas));
+ formatNodeIds(replicas),
+ formatNodeIds(isr));
}
/** Format the node ids from each item in the array for display. */
diff --git a/fluss-common/src/test/java/org/apache/fluss/cluster/BucketLocationTest.java b/fluss-common/src/test/java/org/apache/fluss/cluster/BucketLocationTest.java
index 9e494829c6..32ee31f217 100644
--- a/fluss-common/src/test/java/org/apache/fluss/cluster/BucketLocationTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/cluster/BucketLocationTest.java
@@ -47,7 +47,7 @@ void testToString() {
assertThat(bucketLocation.toString())
.isEqualTo(
"Bucket(physicalTablePath = test_db.test_table, TableBucket{tableId=150001, bucket=0}, "
- + "leader = 0, replicas = [0,1,2])");
+ + "leader = 0, replicas = [0,1,2], isr = [])");
bucketLocation =
new BucketLocation(
@@ -55,6 +55,6 @@ void testToString() {
assertThat(bucketLocation.toString())
.isEqualTo(
"Bucket(physicalTablePath = test_db.test_table, TableBucket{tableId=150001, bucket=0}, "
- + "leader = none, replicas = [0,1,2])");
+ + "leader = none, replicas = [0,1,2], isr = [])");
}
}
diff --git a/fluss-dist/pom.xml b/fluss-dist/pom.xml
index 117f0a6469..eca37bd1fb 100644
--- a/fluss-dist/pom.xml
+++ b/fluss-dist/pom.xml
@@ -39,6 +39,18 @@
provided
+
+ org.apache.fluss
+ fluss-cli
+ ${project.version}
+ provided
+
+
+
+ commons-cli
+ commons-cli
+
+
org.apache.fluss
diff --git a/fluss-dist/src/main/assemblies/bin.xml b/fluss-dist/src/main/assemblies/bin.xml
index 3c1d879f08..5bbc1354d0 100644
--- a/fluss-dist/src/main/assemblies/bin.xml
+++ b/fluss-dist/src/main/assemblies/bin.xml
@@ -41,6 +41,7 @@
org.apache.logging.log4j:log4j-core
org.apache.logging.log4j:log4j-slf4j-impl
org.apache.logging.log4j:log4j-1.2-api
+ commons-cli:commons-cli
@@ -56,6 +57,22 @@
0644
+
+
+ ../fluss-cli/target/fluss-cli-${project.version}.jar
+ lib/
+ fluss-cli-${project.version}.jar
+ 0644
+
+
+
+
+ ../fluss-client/target/fluss-client-${project.version}.jar
+ lib/
+ fluss-client-${project.version}.jar
+ 0644
+
+
src/main/resources/server.yaml
diff --git a/fluss-dist/src/main/resources/bin/fluss-database.sh b/fluss-dist/src/main/resources/bin/fluss-database.sh
new file mode 100755
index 0000000000..51bb5543b4
--- /dev/null
+++ b/fluss-dist/src/main/resources/bin/fluss-database.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+exec "`dirname "$0"`"/fluss-run-class.sh org.apache.fluss.cli.DatabaseCommand "$@"
diff --git a/fluss-dist/src/main/resources/bin/fluss-run-class.sh b/fluss-dist/src/main/resources/bin/fluss-run-class.sh
new file mode 100755
index 0000000000..2d3144d9fe
--- /dev/null
+++ b/fluss-dist/src/main/resources/bin/fluss-run-class.sh
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Launches a Java class with the Fluss classpath.
+# Usage: fluss-run-class.sh [opts]
+
+if [ $# -lt 1 ]; then
+ echo "USAGE: $0 classname [opts]"
+ exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+FLUSS_CLASSPATH=`constructFlussClassPath`
+
+CLASS_TO_RUN=$1
+shift
+
+CLI_LOG_CONF="${FLUSS_CONF_DIR}/log4j-cli.properties"
+if [ -f "$CLI_LOG_CONF" ]; then
+ log_setting=("-Dlog4j.configuration=file:${CLI_LOG_CONF}" "-Dlog4j.configurationFile=file:${CLI_LOG_CONF}")
+else
+ log_setting=("-Dlog4j.configuration=file:${FLUSS_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLUSS_CONF_DIR}/log4j-console.properties")
+fi
+
+FLUSS_ENV_JAVA_OPTS=$(eval echo ${FLUSS_ENV_JAVA_OPTS})
+
+exec "$JAVA_RUN" $JVM_ARGS ${FLUSS_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLUSS_CLASSPATH"`" ${CLASS_TO_RUN} "$@"
diff --git a/fluss-dist/src/main/resources/bin/fluss-table.sh b/fluss-dist/src/main/resources/bin/fluss-table.sh
new file mode 100755
index 0000000000..69b309e61c
--- /dev/null
+++ b/fluss-dist/src/main/resources/bin/fluss-table.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+exec "`dirname "$0"`"/fluss-run-class.sh org.apache.fluss.cli.TableCommand "$@"
diff --git a/fluss-dist/src/main/resources/conf/log4j-cli.properties b/fluss-dist/src/main/resources/conf/log4j-cli.properties
new file mode 100644
index 0000000000..3555f363d8
--- /dev/null
+++ b/fluss-dist/src/main/resources/conf/log4j-cli.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+rootLogger.level=WARN
+rootLogger.appenderRef.console.ref=ConsoleAppender
+
+appender.console.name=ConsoleAppender
+appender.console.type=CONSOLE
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto
index e8381215fc..8d02f97718 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -830,8 +830,8 @@ message PbBucketMetadata {
// optional as some time the leader may not elected yet
optional int32 leader_id = 2;
repeated int32 replica_id = 3 [packed = true];
- // TODO: Add isr here.
optional int32 leader_epoch = 4;
+ repeated int32 isr = 5 [packed = true];
}
message PbProduceLogReqForBucket {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
index 3441053671..37fe669c6c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
@@ -323,6 +323,8 @@ public void addUpdateMetadataRequestForTabletServers(
Integer leaderEpoch =
bucketLeaderAndIsr.map(LeaderAndIsr::leaderEpoch).orElse(null);
Integer leader = bucketLeaderAndIsr.map(LeaderAndIsr::leader).orElse(null);
+ List isr =
+ bucketLeaderAndIsr.map(LeaderAndIsr::isr).orElse(Collections.emptyList());
if (currentPartitionId == null) {
Map> tableAssignment =
coordinatorContext.getTableAssignment(currentTableId);
@@ -331,7 +333,8 @@ public void addUpdateMetadataRequestForTabletServers(
tableBucket.getBucket(),
leader,
leaderEpoch,
- tableAssignment.get(tableBucket.getBucket()));
+ tableAssignment.get(tableBucket.getBucket()),
+ isr);
updateMetadataRequestBucketMap
.computeIfAbsent(currentTableId, k -> new ArrayList<>())
.add(bucketMetadata);
@@ -345,7 +348,8 @@ public void addUpdateMetadataRequestForTabletServers(
tableBucket.getBucket(),
leader,
leaderEpoch,
- partitionAssignment.get(tableBucket.getBucket()));
+ partitionAssignment.get(tableBucket.getBucket()),
+ isr);
updateMetadataRequestPartitionMap
.computeIfAbsent(currentTableId, k -> new HashMap<>())
.computeIfAbsent(tableBucket.getPartitionId(), k -> new ArrayList<>())
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/BucketMetadata.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/BucketMetadata.java
index b6b968f28e..9bc09ece8f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/BucketMetadata.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/BucketMetadata.java
@@ -19,6 +19,7 @@
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
@@ -29,16 +30,27 @@ public class BucketMetadata {
private final @Nullable Integer leaderId;
private final @Nullable Integer leaderEpoch;
private final List replicas;
+ private final List isr;
public BucketMetadata(
int bucketId,
@Nullable Integer leaderId,
@Nullable Integer leaderEpoch,
List replicas) {
+ this(bucketId, leaderId, leaderEpoch, replicas, Collections.emptyList());
+ }
+
+ public BucketMetadata(
+ int bucketId,
+ @Nullable Integer leaderId,
+ @Nullable Integer leaderEpoch,
+ List replicas,
+ List isr) {
this.bucketId = bucketId;
this.leaderId = leaderId;
this.leaderEpoch = leaderEpoch;
this.replicas = replicas;
+ this.isr = isr;
}
public int getBucketId() {
@@ -57,6 +69,10 @@ public List getReplicas() {
return replicas;
}
+ public List getIsr() {
+ return isr;
+ }
+
@Override
public String toString() {
return "BucketMetadata{"
@@ -68,6 +84,8 @@ public String toString() {
+ leaderEpoch
+ ", replicas="
+ replicas
+ + ", isr="
+ + isr
+ '}';
}
@@ -83,11 +101,12 @@ public boolean equals(Object o) {
return bucketId == that.bucketId
&& Objects.equals(leaderId, that.leaderId)
&& Objects.equals(leaderEpoch, that.leaderEpoch)
- && replicas.equals(that.replicas);
+ && replicas.equals(that.replicas)
+ && isr.equals(that.isr);
}
@Override
public int hashCode() {
- return Objects.hash(bucketId, leaderId, leaderEpoch, replicas);
+ return Objects.hash(bucketId, leaderId, leaderEpoch, replicas, isr);
}
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataProvider.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataProvider.java
index 27511d0540..a1b06218c7 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataProvider.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataProvider.java
@@ -30,6 +30,7 @@
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -136,12 +137,15 @@ private static List getBucketMetadataFromContext(
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
Optional optLeaderAndIsr = ctx.getBucketLeaderAndIsr(tableBucket);
Integer leader = optLeaderAndIsr.map(LeaderAndIsr::leader).orElse(null);
+ List isr =
+ optLeaderAndIsr.map(LeaderAndIsr::isr).orElse(Collections.emptyList());
BucketMetadata bucketMetadata =
new BucketMetadata(
bucketId,
leader,
ctx.getBucketLeaderEpoch(tableBucket),
- serverIds);
+ serverIds,
+ isr);
bucketMetadataList.add(bucketMetadata);
});
return bucketMetadataList;
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index f7a89828ab..b0d7c0ea8c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -618,6 +618,10 @@ private static List toPbBucketMetadata(
pbBucketMetadata.addReplicaId(replica);
}
+ for (Integer isrNode : bucketMetadata.getIsr()) {
+ pbBucketMetadata.addIsr(isrNode);
+ }
+
pbBucketMetadataList.add(pbBucketMetadata);
}
return pbBucketMetadataList;
@@ -658,7 +662,8 @@ private static BucketMetadata toBucketMetadata(PbBucketMetadata pbBucketMetadata
pbBucketMetadata.hasLeaderEpoch() ? pbBucketMetadata.getLeaderEpoch() : null,
Arrays.stream(pbBucketMetadata.getReplicaIds())
.boxed()
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()),
+ Arrays.stream(pbBucketMetadata.getIsrs()).boxed().collect(Collectors.toList()));
}
private static PartitionMetadata toPartitionMetadata(PbPartitionMetadata pbPartitionMetadata) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index cb34a2f2ed..284d014825 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -1706,7 +1706,8 @@ private BucketMetadata createBucketMetadata(
Integer leader = leaderAndIsr != null ? leaderAndIsr.leader() : null;
Integer leaderEpoch = leaderAndIsr != null ? leaderAndIsr.leaderEpoch() : null;
List replicas = assignment.getBucketAssignments().get(bucketId).getReplicas();
- return new BucketMetadata(bucketId, leader, leaderEpoch, replicas);
+ List isr = leaderAndIsr != null ? leaderAndIsr.isr() : Collections.emptyList();
+ return new BucketMetadata(bucketId, leader, leaderEpoch, replicas, isr);
}
/** Close the underlying ZooKeeperClient. */
diff --git a/pom.xml b/pom.xml
index dc5ec59d62..a2e46d7e65 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,6 +66,7 @@
fluss-jmh
fluss-lake
fluss-kafka
+ fluss-cli
fluss-docgen
tools/ci/fluss-ci-tools
@@ -440,6 +441,12 @@
${assertj.version}
test
+
+
+ commons-cli
+ commons-cli
+ 1.5.0
+