From 51bb2349400ed0c638e57182070ab0398f128f0c Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Tue, 19 May 2026 17:56:10 +0800 Subject: [PATCH 01/11] [cli] Add fluss-cli Maven module skeleton --- fluss-cli/pom.xml | 70 ++++++++++++++++++++++++++++++++++++++++++++++ fluss-dist/pom.xml | 7 +++++ pom.xml | 7 +++++ 3 files changed, 84 insertions(+) create mode 100644 fluss-cli/pom.xml diff --git a/fluss-cli/pom.xml b/fluss-cli/pom.xml new file mode 100644 index 0000000000..8353116736 --- /dev/null +++ b/fluss-cli/pom.xml @@ -0,0 +1,70 @@ + + + + 4.0.0 + + org.apache.fluss + fluss + 1.0-SNAPSHOT + + + fluss-cli + Fluss : CLI + jar + + + + org.apache.fluss + fluss-client + ${project.version} + + + + commons-cli + commons-cli + + + + org.slf4j + slf4j-api + + + + + org.apache.fluss + fluss-test-utils + ${project.version} + test + + + + org.junit.jupiter + junit-jupiter + test + + + + org.assertj + assertj-core + test + + + diff --git a/fluss-dist/pom.xml b/fluss-dist/pom.xml index 117f0a6469..e6ea5219d0 100644 --- a/fluss-dist/pom.xml +++ b/fluss-dist/pom.xml @@ -39,6 +39,13 @@ provided + + org.apache.fluss + fluss-cli + ${project.version} + provided + + org.apache.fluss diff --git a/pom.xml b/pom.xml index dc5ec59d62..a2e46d7e65 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,7 @@ fluss-jmh fluss-lake fluss-kafka + fluss-cli fluss-docgen tools/ci/fluss-ci-tools @@ -440,6 +441,12 @@ ${assertj.version} test + + + commons-cli + commons-cli + 1.5.0 + From 22963a398231f8883bcd951edf6a6eb11908e02b Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Tue, 19 May 2026 18:03:20 +0800 Subject: [PATCH 02/11] [cli] Add CommandDefaultOptions base class for CLI option parsing --- .../fluss/cli/CommandDefaultOptions.java | 117 ++++++++++++++++++ .../fluss/cli/CommandDefaultOptionsTest.java | 100 +++++++++++++++ 2 files changed, 217 insertions(+) create mode 100644 fluss-cli/src/main/java/org/apache/fluss/cli/CommandDefaultOptions.java create mode 100644 fluss-cli/src/test/java/org/apache/fluss/cli/CommandDefaultOptionsTest.java diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/CommandDefaultOptions.java b/fluss-cli/src/main/java/org/apache/fluss/cli/CommandDefaultOptions.java new file mode 100644 index 0000000000..102e110954 --- /dev/null +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/CommandDefaultOptions.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.annotation.Internal; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +/** + * Base class for CLI command option parsers. Provides common options shared by all CLI tools: + * {@code --help}, {@code --version}, {@code --bootstrap-server}, {@code --command-config}. + */ +@Internal +public class CommandDefaultOptions { + + private static final Option HELP_OPT = + Option.builder().longOpt("help").desc("Print usage information.").build(); + + private static final Option VERSION_OPT = + Option.builder().longOpt("version").desc("Display Fluss version.").build(); + + private static final Option BOOTSTRAP_SERVER_OPT = + Option.builder() + .longOpt("bootstrap-server") + .hasArg() + .argName("server") + .desc("REQUIRED: The Fluss server to connect to.") + .build(); + + private static final Option COMMAND_CONFIG_OPT = + Option.builder() + .longOpt("command-config") + .hasArg() + .argName("file") + .desc("Property file containing configs to be passed to the client.") + .build(); + + protected final Options options; + protected CommandLine commandLine; + + protected CommandDefaultOptions() { + this.options = new Options(); + options.addOption(HELP_OPT); + options.addOption(VERSION_OPT); + options.addOption(BOOTSTRAP_SERVER_OPT); + options.addOption(COMMAND_CONFIG_OPT); + } + + protected void parse(String[] args) throws ParseException { + this.commandLine = new DefaultParser().parse(options, args); + if (!hasHelpOption() && !hasVersionOption() && bootstrapServer() == null) { + throw new ParseException("Missing required option: --bootstrap-server"); + } + } + + /** Returns {@code true} if the {@code --help} flag was specified. */ + public boolean hasHelpOption() { + return commandLine.hasOption(HELP_OPT); + } + + /** Returns {@code true} if the {@code --version} flag was specified. */ + public boolean hasVersionOption() { + return commandLine.hasOption(VERSION_OPT); + } + + /** Returns the value of {@code --bootstrap-server}, or {@code null} if not specified. */ + public String bootstrapServer() { + return commandLine.getOptionValue(BOOTSTRAP_SERVER_OPT); + } + + /** + * Loads the properties file specified by {@code --command-config}. Returns an empty {@link + * Properties} instance if the option was not specified. + */ + public Properties commandConfig() { + Properties props = new Properties(); + String file = commandLine.getOptionValue(COMMAND_CONFIG_OPT); + if (file == null) { + return props; + } + try (FileInputStream fis = new FileInputStream(file)) { + props.load(fis); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to load command config file: " + file, e); + } + return props; + } + + /** Returns the configured {@link Options} for generating help text. */ + public Options options() { + return options; + } +} diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/CommandDefaultOptionsTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/CommandDefaultOptionsTest.java new file mode 100644 index 0000000000..4fb2a19143 --- /dev/null +++ b/fluss-cli/src/test/java/org/apache/fluss/cli/CommandDefaultOptionsTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.commons.cli.ParseException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link CommandDefaultOptions}. */ +class CommandDefaultOptionsTest { + + @Test + void testParseBootstrapServer() throws ParseException { + CommandDefaultOptions opts = new CommandDefaultOptions(); + opts.parse(new String[] {"--bootstrap-server", "localhost:9123"}); + + assertThat(opts.bootstrapServer()).isEqualTo("localhost:9123"); + assertThat(opts.hasHelpOption()).isFalse(); + assertThat(opts.hasVersionOption()).isFalse(); + } + + @Test + void testMissingBootstrapServerThrows() { + CommandDefaultOptions opts = new CommandDefaultOptions(); + + assertThatThrownBy(() -> opts.parse(new String[] {"--list"})) + .isInstanceOf(ParseException.class); + } + + @Test + void testHelpOption() throws ParseException { + CommandDefaultOptions opts = new CommandDefaultOptions(); + opts.parse(new String[] {"--help"}); + + assertThat(opts.hasHelpOption()).isTrue(); + } + + @Test + void testVersionOption() throws ParseException { + CommandDefaultOptions opts = new CommandDefaultOptions(); + opts.parse(new String[] {"--version"}); + + assertThat(opts.hasVersionOption()).isTrue(); + } + + @Test + void testCommandConfigLoadsProperties(@TempDir Path tempDir) + throws ParseException, IOException { + Path configFile = tempDir.resolve("client.properties"); + Properties props = new Properties(); + props.setProperty("security.protocol", "SASL_SSL"); + try (FileOutputStream fos = new FileOutputStream(configFile.toFile())) { + props.store(fos, null); + } + + CommandDefaultOptions opts = new CommandDefaultOptions(); + opts.parse( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--command-config", + configFile.toString() + }); + + Properties loaded = opts.commandConfig(); + assertThat(loaded.getProperty("security.protocol")).isEqualTo("SASL_SSL"); + } + + @Test + void testCommandConfigDefaultsToEmptyProperties() throws ParseException { + CommandDefaultOptions opts = new CommandDefaultOptions(); + opts.parse(new String[] {"--bootstrap-server", "localhost:9123"}); + + assertThat(opts.commandConfig()).isEmpty(); + } +} From 4f748f6c5970cf1efde15758bf86a7b8b7927931 Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Tue, 19 May 2026 18:06:47 +0800 Subject: [PATCH 03/11] [cli] Add CommandUtils shared utilities --- .../org/apache/fluss/cli/CommandUtils.java | 109 ++++++++++++++++ .../apache/fluss/cli/CommandUtilsTest.java | 121 ++++++++++++++++++ 2 files changed, 230 insertions(+) create mode 100644 fluss-cli/src/main/java/org/apache/fluss/cli/CommandUtils.java create mode 100644 fluss-cli/src/test/java/org/apache/fluss/cli/CommandUtilsTest.java diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/CommandUtils.java b/fluss-cli/src/main/java/org/apache/fluss/cli/CommandUtils.java new file mode 100644 index 0000000000..3837b90e58 --- /dev/null +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/CommandUtils.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.config.Configuration; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** Shared utility methods for CLI commands. */ +@Internal +public final class CommandUtils { + + public static final long DEFAULT_TIMEOUT_MS = 30_000; + + private CommandUtils() {} + + /** Create a {@link Connection} from common CLI options. */ + public static Connection createConnection(CommandDefaultOptions opts) { + Configuration conf = new Configuration(); + conf.setString("bootstrap.servers", opts.bootstrapServer()); + Properties cmdConfig = opts.commandConfig(); + cmdConfig.forEach((k, v) -> conf.setString((String) k, (String) v)); + return ConnectionFactory.createConnection(conf); + } + + /** Validate exactly one action flag is specified. */ + public static void validateActions(CommandLine cmd, Option... actions) { + long count = Arrays.stream(actions).filter(cmd::hasOption).count(); + if (count != 1) { + String names = + Arrays.stream(actions) + .map(o -> "--" + o.getLongOpt()) + .collect(Collectors.joining(", ")); + throw new IllegalArgumentException("Exactly one of " + names + " must be specified."); + } + } + + /** Print help and exit. */ + public static void printUsageAndExit(Options options, String cmdName) { + HelpFormatter formatter = new HelpFormatter(); + formatter.setWidth(100); + formatter.printHelp(cmdName, options, true); + System.exit(0); + } + + /** Print version and exit. */ + public static void printVersionAndExit() { + String version = CommandUtils.class.getPackage().getImplementationVersion(); + if (version == null) { + version = "(version unknown)"; + } + System.out.println("Fluss CLI version " + version); + System.exit(0); + } + + /** Parse repeatable {@code --property key=value} options into a Map. */ + public static Map parseProperties(String[] values) { + Map props = new HashMap<>(); + if (values == null) { + return props; + } + for (String prop : values) { + String[] kv = prop.split("=", 2); + if (kv.length != 2) { + throw new IllegalArgumentException( + "Invalid property format: '" + prop + "'. Expected key=value."); + } + props.put(kv[0].trim(), kv[1].trim()); + } + return props; + } + + /** Unwrap {@link ExecutionException} to get the root cause message. */ + public static String unwrapExceptionMessage(Throwable e) { + if (e instanceof ExecutionException && e.getCause() != null) { + return e.getCause().getMessage(); + } + return e.getMessage(); + } +} diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/CommandUtilsTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/CommandUtilsTest.java new file mode 100644 index 0000000000..6bc0ab766d --- /dev/null +++ b/fluss-cli/src/test/java/org/apache/fluss/cli/CommandUtilsTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link CommandUtils}. */ +class CommandUtilsTest { + + @Test + void testParsePropertiesValid() { + Map props = + CommandUtils.parseProperties(new String[] {"key1=value1", "key2=value2"}); + + assertThat(props).hasSize(2); + assertThat(props.get("key1")).isEqualTo("value1"); + assertThat(props.get("key2")).isEqualTo("value2"); + } + + @Test + void testParsePropertiesWithEqualsInValue() { + Map props = CommandUtils.parseProperties(new String[] {"key=val=ue"}); + + assertThat(props.get("key")).isEqualTo("val=ue"); + } + + @Test + void testParsePropertiesNull() { + Map props = CommandUtils.parseProperties(null); + + assertThat(props).isEmpty(); + } + + @Test + void testParsePropertiesInvalidFormat() { + assertThatThrownBy(() -> CommandUtils.parseProperties(new String[] {"noequalssign"})) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid property format"); + } + + @Test + void testValidateActionsExactlyOne() throws Exception { + Option createOpt = Option.builder().longOpt("create").build(); + Option listOpt = Option.builder().longOpt("list").build(); + Options opts = new Options(); + opts.addOption(createOpt); + opts.addOption(listOpt); + CommandLine cmd = new DefaultParser().parse(opts, new String[] {"--create"}); + + // should not throw + CommandUtils.validateActions(cmd, createOpt, listOpt); + } + + @Test + void testValidateActionsNoneThrows() throws Exception { + Option createOpt = Option.builder().longOpt("create").build(); + Option listOpt = Option.builder().longOpt("list").build(); + Options opts = new Options(); + opts.addOption(createOpt); + opts.addOption(listOpt); + CommandLine cmd = new DefaultParser().parse(opts, new String[] {}); + + assertThatThrownBy(() -> CommandUtils.validateActions(cmd, createOpt, listOpt)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Exactly one of"); + } + + @Test + void testValidateActionsMultipleThrows() throws Exception { + Option createOpt = Option.builder().longOpt("create").build(); + Option listOpt = Option.builder().longOpt("list").build(); + Options opts = new Options(); + opts.addOption(createOpt); + opts.addOption(listOpt); + CommandLine cmd = new DefaultParser().parse(opts, new String[] {"--create", "--list"}); + + assertThatThrownBy(() -> CommandUtils.validateActions(cmd, createOpt, listOpt)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Exactly one of"); + } + + @Test + void testUnwrapExecutionException() { + ExecutionException e = new ExecutionException(new RuntimeException("root cause")); + + assertThat(CommandUtils.unwrapExceptionMessage(e)).isEqualTo("root cause"); + } + + @Test + void testUnwrapRegularException() { + RuntimeException e = new RuntimeException("direct message"); + + assertThat(CommandUtils.unwrapExceptionMessage(e)).isEqualTo("direct message"); + } +} From 29509e8c9a9d75b95c6c331ad3ca281632c209d4 Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Tue, 19 May 2026 18:36:00 +0800 Subject: [PATCH 04/11] [cli] Add SchemaParser for --schema string parsing --- .../org/apache/fluss/cli/SchemaParser.java | 107 ++++++++++++++ .../apache/fluss/cli/SchemaParserTest.java | 134 ++++++++++++++++++ 2 files changed, 241 insertions(+) create mode 100644 fluss-cli/src/main/java/org/apache/fluss/cli/SchemaParser.java create mode 100644 fluss-cli/src/test/java/org/apache/fluss/cli/SchemaParserTest.java diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/SchemaParser.java b/fluss-cli/src/main/java/org/apache/fluss/cli/SchemaParser.java new file mode 100644 index 0000000000..b4608c28d3 --- /dev/null +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/SchemaParser.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; + +import java.util.ArrayList; +import java.util.List; + +/** + * Parses a schema definition string into a {@link Schema}. + * + *

Format: {@code "col1 TYPE1, col2 TYPE2 NOT NULL, col3 TYPE3"} + * + *

Uses parenthesis-aware splitting to handle types like {@code DECIMAL(10,2)}. Type strings + * (including nullability suffix) are parsed by {@link DataTypes#parse(String)}. + */ +@Internal +public final class SchemaParser { + + private SchemaParser() {} + + /** + * Parse a schema definition string into a Schema. + * + * @param schemaStr comma-separated column definitions + * @param primaryKeys primary key column names (empty list if no primary key) + * @return the parsed Schema + */ + public static Schema parseSchema(String schemaStr, List primaryKeys) { + if (schemaStr == null || schemaStr.trim().isEmpty()) { + throw new IllegalArgumentException("Schema definition cannot be empty."); + } + + Schema.Builder builder = Schema.newBuilder(); + List columnDefs = splitRespectingParentheses(schemaStr, ','); + + for (String colDef : columnDefs) { + String trimmed = colDef.trim(); + if (trimmed.isEmpty()) { + continue; + } + + int firstSpace = trimmed.indexOf(' '); + if (firstSpace < 0) { + throw new IllegalArgumentException( + "Invalid column definition: '" + + trimmed + + "'. Expected format: 'name TYPE [NOT NULL]'."); + } + + String name = trimmed.substring(0, firstSpace).trim(); + String typeStr = trimmed.substring(firstSpace + 1).trim(); + + DataType dataType = DataTypes.parse(typeStr); + builder.column(name, dataType); + } + + if (primaryKeys != null && !primaryKeys.isEmpty()) { + builder.primaryKey(primaryKeys); + } + + return builder.build(); + } + + /** + * Split a string by delimiter, skipping delimiters inside parentheses. Handles types like + * {@code DECIMAL(10,2)}. + */ + static List splitRespectingParentheses(String input, char delimiter) { + List result = new ArrayList<>(); + int depth = 0; + int start = 0; + for (int i = 0; i < input.length(); i++) { + char c = input.charAt(i); + if (c == '(') { + depth++; + } else if (c == ')') { + depth--; + } else if (c == delimiter && depth == 0) { + result.add(input.substring(start, i)); + start = i + 1; + } + } + result.add(input.substring(start)); + return result; + } +} diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/SchemaParserTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/SchemaParserTest.java new file mode 100644 index 0000000000..9793a3397c --- /dev/null +++ b/fluss-cli/src/test/java/org/apache/fluss/cli/SchemaParserTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.metadata.Schema; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link SchemaParser}. */ +class SchemaParserTest { + + @Test + void testSimpleColumns() { + Schema schema = SchemaParser.parseSchema("id INT, name STRING", Collections.emptyList()); + + List cols = schema.getColumns(); + assertThat(cols).hasSize(2); + assertThat(cols.get(0).getName()).isEqualTo("id"); + assertThat(cols.get(0).getDataType().isNullable()).isTrue(); + assertThat(cols.get(1).getName()).isEqualTo("name"); + assertThat(cols.get(1).getDataType().isNullable()).isTrue(); + } + + @Test + void testNotNullColumn() { + Schema schema = + SchemaParser.parseSchema("id INT NOT NULL, name STRING", Collections.emptyList()); + + List cols = schema.getColumns(); + assertThat(cols.get(0).getName()).isEqualTo("id"); + assertThat(cols.get(0).getDataType().isNullable()).isFalse(); + assertThat(cols.get(1).getDataType().isNullable()).isTrue(); + } + + @Test + void testDecimalWithCommaInParens() { + Schema schema = SchemaParser.parseSchema("amount DECIMAL(10,2)", Collections.emptyList()); + + List cols = schema.getColumns(); + assertThat(cols).hasSize(1); + assertThat(cols.get(0).getName()).isEqualTo("amount"); + assertThat(cols.get(0).getDataType().toString()).contains("DECIMAL"); + } + + @Test + void testMultipleNotNull() { + Schema schema = + SchemaParser.parseSchema( + "a INT NOT NULL, b STRING NOT NULL", Collections.emptyList()); + + List cols = schema.getColumns(); + assertThat(cols.get(0).getDataType().isNullable()).isFalse(); + assertThat(cols.get(1).getDataType().isNullable()).isFalse(); + } + + @Test + void testSingleColumn() { + Schema schema = SchemaParser.parseSchema("id BIGINT", Collections.emptyList()); + + assertThat(schema.getColumns()).hasSize(1); + assertThat(schema.getColumns().get(0).getName()).isEqualTo("id"); + } + + @Test + void testExtraWhitespace() { + Schema schema = + SchemaParser.parseSchema(" id INT , name STRING ", Collections.emptyList()); + + List cols = schema.getColumns(); + assertThat(cols).hasSize(2); + assertThat(cols.get(0).getName()).isEqualTo("id"); + assertThat(cols.get(1).getName()).isEqualTo("name"); + } + + @Test + void testWithPrimaryKey() { + Schema schema = SchemaParser.parseSchema("id INT, name STRING", Arrays.asList("id")); + + assertThat(schema.getPrimaryKey()).isPresent(); + assertThat(schema.getPrimaryKey().get().getColumnNames()).containsExactly("id"); + } + + @Test + void testInvalidColumnDefinition() { + assertThatThrownBy(() -> SchemaParser.parseSchema("nospace", Collections.emptyList())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid column definition"); + } + + @Test + void testUnknownTypeThrows() { + assertThatThrownBy(() -> SchemaParser.parseSchema("x FOOBAR", Collections.emptyList())) + .isInstanceOf(Exception.class); + } + + @Test + void testEmptySchemaThrows() { + assertThatThrownBy(() -> SchemaParser.parseSchema("", Collections.emptyList())) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testAllSupportedTypes() { + String schema = + "a BOOLEAN, b TINYINT, c SMALLINT, d INT, e BIGINT, " + + "f FLOAT, g DOUBLE, h STRING, i DATE, j TIMESTAMP"; + Schema result = SchemaParser.parseSchema(schema, Collections.emptyList()); + + assertThat(result.getColumns()).hasSize(10); + } +} From 3d058f17877afc015af429475b5b194110cd34dc Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Tue, 19 May 2026 18:40:13 +0800 Subject: [PATCH 05/11] [cli] Add DatabaseCommand for database management --- .../org/apache/fluss/cli/DatabaseCommand.java | 225 ++++++++++++++++++ .../apache/fluss/cli/DatabaseCommandTest.java | 104 ++++++++ 2 files changed, 329 insertions(+) create mode 100644 fluss-cli/src/main/java/org/apache/fluss/cli/DatabaseCommand.java create mode 100644 fluss-cli/src/test/java/org/apache/fluss/cli/DatabaseCommandTest.java diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/DatabaseCommand.java b/fluss-cli/src/main/java/org/apache/fluss/cli/DatabaseCommand.java new file mode 100644 index 0000000000..272a114422 --- /dev/null +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/DatabaseCommand.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.DatabaseInfo; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.ParseException; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** CLI tool for managing Fluss databases. */ +@Internal +public class DatabaseCommand { + + public static void main(String[] args) { + System.exit(mainNoExit(args)); + } + + static int mainNoExit(String[] args) { + try { + execute(args); + return 0; + } catch (Throwable e) { + System.err.println( + "Error while executing database command: " + + CommandUtils.unwrapExceptionMessage(e)); + return 1; + } + } + + static void execute(String[] args) throws Exception { + DatabaseCommandOptions opts = new DatabaseCommandOptions(args); + if (opts.hasHelpOption()) { + CommandUtils.printUsageAndExit(opts.options(), "fluss-database.sh"); + } + if (opts.hasVersionOption()) { + CommandUtils.printVersionAndExit(); + } + CommandUtils.validateActions( + opts.commandLine(), opts.createOpt, opts.listOpt, opts.describeOpt, opts.dropOpt); + + try (DatabaseService service = new DatabaseService(opts)) { + if (opts.hasCreateOption()) { + service.createDatabase(opts); + } else if (opts.hasListOption()) { + service.listDatabases(); + } else if (opts.hasDescribeOption()) { + service.describeDatabase(opts); + } else if (opts.hasDropOption()) { + service.dropDatabase(opts); + } + } + } + + /** Service class that wraps the Admin client for database operations. */ + static class DatabaseService implements AutoCloseable { + private final Admin admin; + private final Connection connection; + + DatabaseService(DatabaseCommandOptions opts) { + this.connection = CommandUtils.createConnection(opts); + this.admin = connection.getAdmin(); + } + + void createDatabase(DatabaseCommandOptions opts) throws Exception { + String dbName = opts.database(); + Map props = + CommandUtils.parseProperties(opts.commandLine().getOptionValues("property")); + DatabaseDescriptor.Builder builder = DatabaseDescriptor.builder(); + props.forEach(builder::customProperty); + admin.createDatabase(dbName, builder.build(), opts.ifNotExists()) + .get(30, TimeUnit.SECONDS); + System.out.println("Created database \"" + dbName + "\"."); + } + + void listDatabases() throws Exception { + List databases = admin.listDatabases().get(30, TimeUnit.SECONDS); + databases.forEach(System.out::println); + } + + void describeDatabase(DatabaseCommandOptions opts) throws Exception { + String dbName = opts.database(); + DatabaseInfo info = admin.getDatabaseInfo(dbName).get(30, TimeUnit.SECONDS); + System.out.println("Database: " + dbName); + info.getDatabaseDescriptor() + .getComment() + .ifPresent(c -> System.out.println("Comment: " + c)); + Map props = info.getDatabaseDescriptor().getCustomProperties(); + System.out.println("Properties:"); + if (props.isEmpty()) { + System.out.println(" (none)"); + } else { + props.forEach((k, v) -> System.out.println(" " + k + " = " + v)); + } + } + + void dropDatabase(DatabaseCommandOptions opts) throws Exception { + String dbName = opts.database(); + admin.dropDatabase(dbName, opts.ifExists(), opts.cascade()).get(30, TimeUnit.SECONDS); + System.out.println("Dropped database \"" + dbName + "\"."); + } + + @Override + public void close() throws Exception { + connection.close(); + } + } + + /** CLI option parser for database commands. */ + static class DatabaseCommandOptions extends CommandDefaultOptions { + final Option createOpt = + Option.builder().longOpt("create").desc("Create a new database.").build(); + final Option listOpt = Option.builder().longOpt("list").desc("List all databases.").build(); + final Option describeOpt = + Option.builder().longOpt("describe").desc("Describe a database.").build(); + final Option dropOpt = Option.builder().longOpt("drop").desc("Drop a database.").build(); + final Option databaseOpt = + Option.builder() + .longOpt("database") + .hasArg() + .argName("name") + .desc("The database name.") + .build(); + final Option ifExistsOpt = + Option.builder() + .longOpt("if-exists") + .desc("Only execute if the database exists.") + .build(); + final Option ifNotExistsOpt = + Option.builder() + .longOpt("if-not-exists") + .desc("Only execute if the database does not exist.") + .build(); + final Option cascadeOpt = + Option.builder() + .longOpt("cascade") + .desc("Drop all tables in the database (for --drop).") + .build(); + final Option propertyOpt = + Option.builder() + .longOpt("property") + .hasArg() + .argName("key=value") + .desc("Database property (repeatable).") + .build(); + + DatabaseCommandOptions(String[] args) throws ParseException { + super(); + options.addOption(createOpt); + options.addOption(listOpt); + options.addOption(describeOpt); + options.addOption(dropOpt); + options.addOption(databaseOpt); + options.addOption(ifExistsOpt); + options.addOption(ifNotExistsOpt); + options.addOption(cascadeOpt); + options.addOption(propertyOpt); + parse(args); + } + + boolean hasCreateOption() { + return commandLine.hasOption(createOpt); + } + + boolean hasListOption() { + return commandLine.hasOption(listOpt); + } + + boolean hasDescribeOption() { + return commandLine.hasOption(describeOpt); + } + + boolean hasDropOption() { + return commandLine.hasOption(dropOpt); + } + + String database() { + String db = commandLine.getOptionValue(databaseOpt); + if (db == null) { + throw new IllegalArgumentException("--database is required."); + } + return db; + } + + boolean ifExists() { + return commandLine.hasOption(ifExistsOpt); + } + + boolean ifNotExists() { + return commandLine.hasOption(ifNotExistsOpt); + } + + boolean cascade() { + return commandLine.hasOption(cascadeOpt); + } + + CommandLine commandLine() { + return commandLine; + } + } +} diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/DatabaseCommandTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/DatabaseCommandTest.java new file mode 100644 index 0000000000..066cd473b2 --- /dev/null +++ b/fluss-cli/src/test/java/org/apache/fluss/cli/DatabaseCommandTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link DatabaseCommand} option parsing. */ +class DatabaseCommandTest { + + @Test + void testParseCreateOptions() throws Exception { + DatabaseCommand.DatabaseCommandOptions opts = + new DatabaseCommand.DatabaseCommandOptions( + new String[] { + "--bootstrap-server", "localhost:9123", "--create", "--database", "mydb" + }); + + assertThat(opts.hasCreateOption()).isTrue(); + assertThat(opts.hasListOption()).isFalse(); + assertThat(opts.database()).isEqualTo("mydb"); + } + + @Test + void testParseListOptions() throws Exception { + DatabaseCommand.DatabaseCommandOptions opts = + new DatabaseCommand.DatabaseCommandOptions( + new String[] {"--bootstrap-server", "localhost:9123", "--list"}); + + assertThat(opts.hasListOption()).isTrue(); + } + + @Test + void testParseDropWithCascadeAndIfExists() throws Exception { + DatabaseCommand.DatabaseCommandOptions opts = + new DatabaseCommand.DatabaseCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--drop", + "--database", + "mydb", + "--cascade", + "--if-exists" + }); + + assertThat(opts.hasDropOption()).isTrue(); + assertThat(opts.database()).isEqualTo("mydb"); + assertThat(opts.cascade()).isTrue(); + assertThat(opts.ifExists()).isTrue(); + } + + @Test + void testMissingDatabaseForCreateThrows() throws Exception { + DatabaseCommand.DatabaseCommandOptions opts = + new DatabaseCommand.DatabaseCommandOptions( + new String[] {"--bootstrap-server", "localhost:9123", "--create"}); + + assertThatThrownBy(opts::database) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("--database is required"); + } + + @Test + void testIfNotExistsOption() throws Exception { + DatabaseCommand.DatabaseCommandOptions opts = + new DatabaseCommand.DatabaseCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--create", + "--database", + "mydb", + "--if-not-exists" + }); + + assertThat(opts.ifNotExists()).isTrue(); + } + + @Test + void testMainNoExitReturnsOneOnError() { + // No args at all -> parse error -> exit code 1 + int exitCode = DatabaseCommand.mainNoExit(new String[] {}); + assertThat(exitCode).isEqualTo(1); + } +} From f95e37973d86f0b03c44c4755f5c95bcfa62d515 Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Tue, 19 May 2026 18:45:12 +0800 Subject: [PATCH 06/11] [cli] Add TableCommand for table management Implements TableCommand with create, list, describe, and drop operations for Fluss tables. Follows the same pattern as DatabaseCommand with TableCommandOptions for CLI option parsing and TableService for Admin client operations. --- .../org/apache/fluss/cli/TableCommand.java | 359 ++++++++++++++++++ .../apache/fluss/cli/TableCommandTest.java | 191 ++++++++++ 2 files changed, 550 insertions(+) create mode 100644 fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java create mode 100644 fluss-cli/src/test/java/org/apache/fluss/cli/TableCommandTest.java diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java b/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java new file mode 100644 index 0000000000..8aa948529f --- /dev/null +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.ParseException; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +/** CLI tool for managing Fluss tables. */ +@Internal +public class TableCommand { + + public static void main(String[] args) { + System.exit(mainNoExit(args)); + } + + static int mainNoExit(String[] args) { + try { + execute(args); + return 0; + } catch (Throwable e) { + System.err.println( + "Error while executing table command: " + + CommandUtils.unwrapExceptionMessage(e)); + return 1; + } + } + + static void execute(String[] args) throws Exception { + TableCommandOptions opts = new TableCommandOptions(args); + if (opts.hasHelpOption()) { + CommandUtils.printUsageAndExit(opts.options(), "fluss-table.sh"); + } + if (opts.hasVersionOption()) { + CommandUtils.printVersionAndExit(); + } + CommandUtils.validateActions( + opts.commandLine(), opts.createOpt, opts.listOpt, opts.describeOpt, opts.dropOpt); + + try (TableService service = new TableService(opts)) { + if (opts.hasCreateOption()) { + service.createTable(opts); + } else if (opts.hasListOption()) { + service.listTables(opts); + } else if (opts.hasDescribeOption()) { + service.describeTable(opts); + } else if (opts.hasDropOption()) { + service.dropTable(opts); + } + } + } + + /** Service class that wraps the Admin client for table operations. */ + static class TableService implements AutoCloseable { + private final Admin admin; + private final Connection connection; + + TableService(TableCommandOptions opts) { + this.connection = CommandUtils.createConnection(opts); + this.admin = connection.getAdmin(); + } + + void createTable(TableCommandOptions opts) throws Exception { + TablePath tablePath = opts.tablePath(); + String schemaStr = opts.schema(); + List primaryKeys = opts.primaryKeys(); + + Schema schema = SchemaParser.parseSchema(schemaStr, primaryKeys); + + TableDescriptor.Builder builder = TableDescriptor.builder().schema(schema); + + List partitionKeys = opts.partitionKeys(); + if (!partitionKeys.isEmpty()) { + builder.partitionedBy(partitionKeys); + } + + Integer buckets = opts.buckets(); + if (buckets != null) { + builder.distributedBy(buckets); + } + + Map props = + CommandUtils.parseProperties(opts.commandLine().getOptionValues("property")); + props.forEach(builder::property); + + admin.createTable(tablePath, builder.build(), opts.ifNotExists()) + .get(30, TimeUnit.SECONDS); + System.out.println("Created table \"" + tablePath + "\"."); + } + + void listTables(TableCommandOptions opts) throws Exception { + String database = opts.database(); + List tables = admin.listTables(database).get(30, TimeUnit.SECONDS); + tables.forEach(System.out::println); + } + + void describeTable(TableCommandOptions opts) throws Exception { + TablePath tablePath = opts.tablePath(); + TableInfo tableInfo = admin.getTableInfo(tablePath).get(30, TimeUnit.SECONDS); + + Schema schema = tableInfo.getSchema(); + boolean hasPK = tableInfo.hasPrimaryKey(); + + System.out.println("Table: " + tablePath); + System.out.println("Type: " + (hasPK ? "PrimaryKey" : "Log")); + + System.out.println("Schema:"); + for (Schema.Column col : schema.getColumns()) { + System.out.println(" " + col.getName() + " " + col.getDataType()); + } + + if (hasPK) { + System.out.println("PrimaryKey: " + String.join(", ", tableInfo.getPrimaryKeys())); + } + + List partitionKeys = tableInfo.getPartitionKeys(); + if (!partitionKeys.isEmpty()) { + System.out.println("PartitionKeys: " + String.join(", ", partitionKeys)); + } + + int numBuckets = tableInfo.getNumBuckets(); + if (numBuckets > 0) { + System.out.println("Buckets: " + numBuckets); + } + + List bucketKeys = tableInfo.getBucketKeys(); + if (!bucketKeys.isEmpty()) { + System.out.println("BucketKeys: " + String.join(", ", bucketKeys)); + } + + Optional comment = tableInfo.getComment(); + if (comment.isPresent()) { + System.out.println("Comment: " + comment.get()); + } + + Configuration props = tableInfo.getProperties(); + Configuration customProps = tableInfo.getCustomProperties(); + Map propsMap = props.toMap(); + Map customPropsMap = customProps.toMap(); + if (!propsMap.isEmpty() || !customPropsMap.isEmpty()) { + System.out.println("Properties:"); + propsMap.forEach((k, v) -> System.out.println(" " + k + " = " + v)); + customPropsMap.forEach((k, v) -> System.out.println(" " + k + " = " + v)); + } + } + + void dropTable(TableCommandOptions opts) throws Exception { + TablePath tablePath = opts.tablePath(); + admin.dropTable(tablePath, opts.ifExists()).get(30, TimeUnit.SECONDS); + System.out.println("Dropped table \"" + tablePath + "\"."); + } + + @Override + public void close() throws Exception { + connection.close(); + } + } + + /** CLI option parser for table commands. */ + static class TableCommandOptions extends CommandDefaultOptions { + final Option createOpt = + Option.builder().longOpt("create").desc("Create a new table.").build(); + final Option listOpt = + Option.builder().longOpt("list").desc("List all tables in a database.").build(); + final Option describeOpt = + Option.builder().longOpt("describe").desc("Describe a table.").build(); + final Option dropOpt = Option.builder().longOpt("drop").desc("Drop a table.").build(); + final Option databaseOpt = + Option.builder() + .longOpt("database") + .hasArg() + .argName("name") + .desc("The database name (for --list).") + .build(); + final Option tableOpt = + Option.builder() + .longOpt("table") + .hasArg() + .argName("db.table") + .desc("The table path in format 'database.table'.") + .build(); + final Option schemaOpt = + Option.builder() + .longOpt("schema") + .hasArg() + .argName("definition") + .desc("Column definitions: 'col1 INT, col2 STRING NOT NULL, ...'") + .build(); + final Option primaryKeyOpt = + Option.builder() + .longOpt("primary-key") + .hasArg() + .argName("columns") + .desc("Primary key columns (comma-separated).") + .build(); + final Option partitionByOpt = + Option.builder() + .longOpt("partition-by") + .hasArg() + .argName("columns") + .desc("Partition columns (comma-separated).") + .build(); + final Option bucketsOpt = + Option.builder() + .longOpt("buckets") + .hasArg() + .argName("num") + .desc("Number of buckets.") + .build(); + final Option propertyOpt = + Option.builder() + .longOpt("property") + .hasArg() + .argName("key=value") + .desc("Table property (repeatable).") + .build(); + final Option ifExistsOpt = + Option.builder() + .longOpt("if-exists") + .desc("Only execute if the table exists.") + .build(); + final Option ifNotExistsOpt = + Option.builder() + .longOpt("if-not-exists") + .desc("Only execute if the table does not exist.") + .build(); + + TableCommandOptions(String[] args) throws ParseException { + super(); + options.addOption(createOpt); + options.addOption(listOpt); + options.addOption(describeOpt); + options.addOption(dropOpt); + options.addOption(databaseOpt); + options.addOption(tableOpt); + options.addOption(schemaOpt); + options.addOption(primaryKeyOpt); + options.addOption(partitionByOpt); + options.addOption(bucketsOpt); + options.addOption(propertyOpt); + options.addOption(ifExistsOpt); + options.addOption(ifNotExistsOpt); + parse(args); + } + + boolean hasCreateOption() { + return commandLine.hasOption(createOpt); + } + + boolean hasListOption() { + return commandLine.hasOption(listOpt); + } + + boolean hasDescribeOption() { + return commandLine.hasOption(describeOpt); + } + + boolean hasDropOption() { + return commandLine.hasOption(dropOpt); + } + + TablePath tablePath() { + String table = commandLine.getOptionValue(tableOpt); + if (table == null) { + throw new IllegalArgumentException("--table is required."); + } + String[] parts = table.split("\\.", 2); + if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) { + throw new IllegalArgumentException( + "Invalid table path: '" + table + "'. Expected format: 'database.table'."); + } + return TablePath.of(parts[0], parts[1]); + } + + String database() { + String db = commandLine.getOptionValue(databaseOpt); + if (db == null) { + throw new IllegalArgumentException("--database is required."); + } + return db; + } + + String schema() { + String s = commandLine.getOptionValue(schemaOpt); + if (s == null) { + throw new IllegalArgumentException("--schema is required for --create."); + } + return s; + } + + List primaryKeys() { + String pk = commandLine.getOptionValue(primaryKeyOpt); + if (pk == null) { + return Collections.emptyList(); + } + return Arrays.asList(pk.split(",")); + } + + List partitionKeys() { + String pb = commandLine.getOptionValue(partitionByOpt); + if (pb == null) { + return Collections.emptyList(); + } + return Arrays.asList(pb.split(",")); + } + + Integer buckets() { + String b = commandLine.getOptionValue(bucketsOpt); + if (b == null) { + return null; + } + return Integer.parseInt(b); + } + + boolean ifExists() { + return commandLine.hasOption(ifExistsOpt); + } + + boolean ifNotExists() { + return commandLine.hasOption(ifNotExistsOpt); + } + + CommandLine commandLine() { + return commandLine; + } + } +} diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/TableCommandTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/TableCommandTest.java new file mode 100644 index 0000000000..61f0ac98ad --- /dev/null +++ b/fluss-cli/src/test/java/org/apache/fluss/cli/TableCommandTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.metadata.TablePath; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link TableCommand} option parsing. */ +class TableCommandTest { + + @Test + void testParseCreateOptions() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--create", + "--table", + "mydb.users", + "--schema", + "id INT, name STRING", + "--primary-key", + "id", + "--buckets", + "4" + }); + + assertThat(opts.hasCreateOption()).isTrue(); + assertThat(opts.tablePath()).isEqualTo(TablePath.of("mydb", "users")); + assertThat(opts.schema()).isEqualTo("id INT, name STRING"); + assertThat(opts.primaryKeys()).containsExactly("id"); + assertThat(opts.buckets()).isEqualTo(4); + } + + @Test + void testParseListOptions() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", "localhost:9123", "--list", "--database", "mydb" + }); + + assertThat(opts.hasListOption()).isTrue(); + assertThat(opts.database()).isEqualTo("mydb"); + } + + @Test + void testTablePathParsing() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--describe", + "--table", + "mydb.orders" + }); + + TablePath path = opts.tablePath(); + assertThat(path.getDatabaseName()).isEqualTo("mydb"); + assertThat(path.getTableName()).isEqualTo("orders"); + } + + @Test + void testTablePathInvalidFormatNoDot() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--describe", + "--table", + "notable" + }); + + assertThatThrownBy(opts::tablePath) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid table path"); + } + + @Test + void testTablePathInvalidEmptyParts() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--describe", + "--table", + ".table" + }); + + assertThatThrownBy(opts::tablePath) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid table path"); + } + + @Test + void testMissingTableThrows() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] {"--bootstrap-server", "localhost:9123", "--describe"}); + + assertThatThrownBy(opts::tablePath) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("--table is required"); + } + + @Test + void testMissingSchemaForCreateThrows() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--create", + "--table", + "mydb.users" + }); + + assertThatThrownBy(opts::schema) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("--schema is required"); + } + + @Test + void testPartitionByOption() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--create", + "--table", + "mydb.orders", + "--schema", + "id INT, dt STRING", + "--partition-by", + "dt" + }); + + List keys = opts.partitionKeys(); + assertThat(keys).containsExactly("dt"); + } + + @Test + void testNoBucketsReturnsNull() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--create", + "--table", + "mydb.users", + "--schema", + "id INT" + }); + + assertThat(opts.buckets()).isNull(); + } + + @Test + void testMainNoExitReturnsOneOnError() { + int exitCode = TableCommand.mainNoExit(new String[] {}); + assertThat(exitCode).isEqualTo(1); + } +} From d8d93500229654f33f613902248755c150803700 Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Tue, 19 May 2026 18:46:27 +0800 Subject: [PATCH 07/11] [cli] Add shell scripts for CLI tools --- .../src/main/resources/bin/fluss-database.sh | 20 +++++++++ .../src/main/resources/bin/fluss-run-class.sh | 42 +++++++++++++++++++ .../src/main/resources/bin/fluss-table.sh | 20 +++++++++ 3 files changed, 82 insertions(+) create mode 100755 fluss-dist/src/main/resources/bin/fluss-database.sh create mode 100755 fluss-dist/src/main/resources/bin/fluss-run-class.sh create mode 100755 fluss-dist/src/main/resources/bin/fluss-table.sh diff --git a/fluss-dist/src/main/resources/bin/fluss-database.sh b/fluss-dist/src/main/resources/bin/fluss-database.sh new file mode 100755 index 0000000000..51bb5543b4 --- /dev/null +++ b/fluss-dist/src/main/resources/bin/fluss-database.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +exec "`dirname "$0"`"/fluss-run-class.sh org.apache.fluss.cli.DatabaseCommand "$@" diff --git a/fluss-dist/src/main/resources/bin/fluss-run-class.sh b/fluss-dist/src/main/resources/bin/fluss-run-class.sh new file mode 100755 index 0000000000..3b27a276eb --- /dev/null +++ b/fluss-dist/src/main/resources/bin/fluss-run-class.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Launches a Java class with the Fluss classpath. +# Usage: fluss-run-class.sh [opts] + +if [ $# -lt 1 ]; then + echo "USAGE: $0 classname [opts]" + exit 1 +fi + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +FLUSS_CLASSPATH=`constructFlussClassPath` + +CLASS_TO_RUN=$1 +shift + +log_setting=("-Dlog4j.configuration=file:${FLUSS_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLUSS_CONF_DIR}/log4j-console.properties") + +FLUSS_ENV_JAVA_OPTS=$(eval echo ${FLUSS_ENV_JAVA_OPTS}) + +exec "$JAVA_RUN" $JVM_ARGS ${FLUSS_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLUSS_CLASSPATH"`" ${CLASS_TO_RUN} "$@" diff --git a/fluss-dist/src/main/resources/bin/fluss-table.sh b/fluss-dist/src/main/resources/bin/fluss-table.sh new file mode 100755 index 0000000000..69b309e61c --- /dev/null +++ b/fluss-dist/src/main/resources/bin/fluss-table.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +exec "`dirname "$0"`"/fluss-run-class.sh org.apache.fluss.cli.TableCommand "$@" From c2d0a89ea0c121be3500cb578a27fae46f7863b1 Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Tue, 19 May 2026 18:54:53 +0800 Subject: [PATCH 08/11] [cli] Fix review issues: trim key values, use timeout constant - Trim whitespace in primaryKeys() and partitionKeys() to handle user input like "id, name" correctly - Replace hardcoded timeout with CommandUtils.DEFAULT_TIMEOUT_SECS --- .../org/apache/fluss/cli/CommandUtils.java | 2 +- .../org/apache/fluss/cli/DatabaseCommand.java | 12 ++++++---- .../org/apache/fluss/cli/TableCommand.java | 24 ++++++++++++++----- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/CommandUtils.java b/fluss-cli/src/main/java/org/apache/fluss/cli/CommandUtils.java index 3837b90e58..7154a31462 100644 --- a/fluss-cli/src/main/java/org/apache/fluss/cli/CommandUtils.java +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/CommandUtils.java @@ -39,7 +39,7 @@ @Internal public final class CommandUtils { - public static final long DEFAULT_TIMEOUT_MS = 30_000; + public static final long DEFAULT_TIMEOUT_SECS = 30; private CommandUtils() {} diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/DatabaseCommand.java b/fluss-cli/src/main/java/org/apache/fluss/cli/DatabaseCommand.java index 272a114422..0814242f02 100644 --- a/fluss-cli/src/main/java/org/apache/fluss/cli/DatabaseCommand.java +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/DatabaseCommand.java @@ -93,18 +93,21 @@ void createDatabase(DatabaseCommandOptions opts) throws Exception { DatabaseDescriptor.Builder builder = DatabaseDescriptor.builder(); props.forEach(builder::customProperty); admin.createDatabase(dbName, builder.build(), opts.ifNotExists()) - .get(30, TimeUnit.SECONDS); + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); System.out.println("Created database \"" + dbName + "\"."); } void listDatabases() throws Exception { - List databases = admin.listDatabases().get(30, TimeUnit.SECONDS); + List databases = + admin.listDatabases().get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); databases.forEach(System.out::println); } void describeDatabase(DatabaseCommandOptions opts) throws Exception { String dbName = opts.database(); - DatabaseInfo info = admin.getDatabaseInfo(dbName).get(30, TimeUnit.SECONDS); + DatabaseInfo info = + admin.getDatabaseInfo(dbName) + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); System.out.println("Database: " + dbName); info.getDatabaseDescriptor() .getComment() @@ -120,7 +123,8 @@ void describeDatabase(DatabaseCommandOptions opts) throws Exception { void dropDatabase(DatabaseCommandOptions opts) throws Exception { String dbName = opts.database(); - admin.dropDatabase(dbName, opts.ifExists(), opts.cascade()).get(30, TimeUnit.SECONDS); + admin.dropDatabase(dbName, opts.ifExists(), opts.cascade()) + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); System.out.println("Dropped database \"" + dbName + "\"."); } diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java b/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java index 8aa948529f..9c2020f494 100644 --- a/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java @@ -116,19 +116,23 @@ void createTable(TableCommandOptions opts) throws Exception { props.forEach(builder::property); admin.createTable(tablePath, builder.build(), opts.ifNotExists()) - .get(30, TimeUnit.SECONDS); + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); System.out.println("Created table \"" + tablePath + "\"."); } void listTables(TableCommandOptions opts) throws Exception { String database = opts.database(); - List tables = admin.listTables(database).get(30, TimeUnit.SECONDS); + List tables = + admin.listTables(database) + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); tables.forEach(System.out::println); } void describeTable(TableCommandOptions opts) throws Exception { TablePath tablePath = opts.tablePath(); - TableInfo tableInfo = admin.getTableInfo(tablePath).get(30, TimeUnit.SECONDS); + TableInfo tableInfo = + admin.getTableInfo(tablePath) + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); Schema schema = tableInfo.getSchema(); boolean hasPK = tableInfo.hasPrimaryKey(); @@ -178,7 +182,8 @@ void describeTable(TableCommandOptions opts) throws Exception { void dropTable(TableCommandOptions opts) throws Exception { TablePath tablePath = opts.tablePath(); - admin.dropTable(tablePath, opts.ifExists()).get(30, TimeUnit.SECONDS); + admin.dropTable(tablePath, opts.ifExists()) + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); System.out.println("Dropped table \"" + tablePath + "\"."); } @@ -325,7 +330,7 @@ List primaryKeys() { if (pk == null) { return Collections.emptyList(); } - return Arrays.asList(pk.split(",")); + return Arrays.asList(trimElements(pk.split(","))); } List partitionKeys() { @@ -333,7 +338,14 @@ List partitionKeys() { if (pb == null) { return Collections.emptyList(); } - return Arrays.asList(pb.split(",")); + return Arrays.asList(trimElements(pb.split(","))); + } + + private static String[] trimElements(String[] elements) { + for (int i = 0; i < elements.length; i++) { + elements[i] = elements[i].trim(); + } + return elements; } Integer buckets() { From cd4706158aeabb1c4e6790f4e07c0e27b2e75aee Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Wed, 20 May 2026 12:04:30 +0800 Subject: [PATCH 09/11] [cli] Add CLI jars to distribution and add docker-compose for local testing Include fluss-cli, fluss-client, and commons-cli in the binary distribution so CLI tools work at runtime. Add docker-compose.yml with ZooKeeper, coordinator, and tablet-server for local development. --- docker/docker-compose.yml | 56 ++++++++++++++++++++++++++ fluss-dist/pom.xml | 5 +++ fluss-dist/src/main/assemblies/bin.xml | 17 ++++++++ 3 files changed, 78 insertions(+) create mode 100644 docker/docker-compose.yml diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000000..d9a7b48f17 --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Fluss local development cluster +# Usage: +# docker compose -f docker/docker-compose.yml up -d +# docker compose -f docker/docker-compose.yml exec coordinator ./bin/fluss-database.sh --bootstrap-server coordinator:9123 --list + +services: + zookeeper: + image: zookeeper:3.8 + ports: + - "2181:2181" + environment: + ZOO_MY_ID: 1 + + coordinator: + image: fluss-local + depends_on: + - zookeeper + ports: + - "9123:9123" + environment: + FLUSS_PROPERTIES: | + zookeeper.address: zookeeper:2181 + bind.listeners: FLUSS://coordinator:9123 + remote.data.dir: /tmp/fluss-remote-data + command: coordinatorServer + + tablet-server-0: + image: fluss-local + depends_on: + - coordinator + environment: + FLUSS_PROPERTIES: | + zookeeper.address: zookeeper:2181 + bind.listeners: FLUSS://tablet-server-0:9123 + tablet-server.id: 0 + data.dir: /tmp/fluss-data + remote.data.dir: /tmp/fluss-remote-data + command: tabletServer diff --git a/fluss-dist/pom.xml b/fluss-dist/pom.xml index e6ea5219d0..eca37bd1fb 100644 --- a/fluss-dist/pom.xml +++ b/fluss-dist/pom.xml @@ -46,6 +46,11 @@ provided + + commons-cli + commons-cli + + org.apache.fluss diff --git a/fluss-dist/src/main/assemblies/bin.xml b/fluss-dist/src/main/assemblies/bin.xml index 3c1d879f08..5bbc1354d0 100644 --- a/fluss-dist/src/main/assemblies/bin.xml +++ b/fluss-dist/src/main/assemblies/bin.xml @@ -41,6 +41,7 @@ org.apache.logging.log4j:log4j-core org.apache.logging.log4j:log4j-slf4j-impl org.apache.logging.log4j:log4j-1.2-api + commons-cli:commons-cli @@ -56,6 +57,22 @@ 0644 + + + ../fluss-cli/target/fluss-cli-${project.version}.jar + lib/ + fluss-cli-${project.version}.jar + 0644 + + + + + ../fluss-client/target/fluss-client-${project.version}.jar + lib/ + fluss-client-${project.version}.jar + 0644 + + src/main/resources/server.yaml From aa8dbc11e9d3071717fe18530f64dddf518cf67c Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Wed, 20 May 2026 16:50:39 +0800 Subject: [PATCH 10/11] [cli] Add bucket distribution with ISR to table describe command Add ISR field to PbBucketMetadata proto, propagate through BucketMetadata, BucketLocation, and MetadataUtils. The table describe command now shows per-bucket leader, replicas, and ISR information similar to Kafka's topic describe output. --- .../org/apache/fluss/cli/TableCommand.java | 42 +++++++++++++++++++ .../fluss/client/utils/MetadataUtils.java | 6 ++- .../apache/fluss/cluster/BucketLocation.java | 32 +++++++++++--- fluss-rpc/src/main/proto/FlussApi.proto | 2 +- .../coordinator/CoordinatorRequestBatch.java | 8 +++- .../fluss/server/metadata/BucketMetadata.java | 23 +++++++++- .../metadata/CoordinatorMetadataProvider.java | 6 ++- .../server/utils/ServerRpcMessageUtils.java | 7 +++- .../fluss/server/zk/ZooKeeperClient.java | 3 +- 9 files changed, 115 insertions(+), 14 deletions(-) diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java b/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java index 9c2020f494..f7aa8c11e0 100644 --- a/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java @@ -20,8 +20,13 @@ import org.apache.fluss.annotation.Internal; import org.apache.fluss.client.Connection; +import org.apache.fluss.client.FlussConnection; import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.metadata.MetadataUpdater; +import org.apache.fluss.cluster.BucketLocation; +import org.apache.fluss.cluster.Cluster; import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; @@ -178,6 +183,43 @@ void describeTable(TableCommandOptions opts) throws Exception { propsMap.forEach((k, v) -> System.out.println(" " + k + " = " + v)); customPropsMap.forEach((k, v) -> System.out.println(" " + k + " = " + v)); } + + printBucketDistribution(tablePath); + } + + private void printBucketDistribution(TablePath tablePath) { + MetadataUpdater metadataUpdater = ((FlussConnection) connection).getMetadataUpdater(); + metadataUpdater.updateTableOrPartitionMetadata(tablePath, null); + Cluster cluster = metadataUpdater.getCluster(); + PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath); + java.util.List bucketLocations = + cluster.getAvailableBucketsForPhysicalTablePath(physicalTablePath); + if (bucketLocations == null || bucketLocations.isEmpty()) { + return; + } + System.out.println("Bucket Distribution:"); + System.out.printf(" %-8s%-8s%-12s%s%n", "Bucket", "Leader", "Replicas", "Isr"); + for (BucketLocation loc : bucketLocations) { + String leader = loc.getLeader() == null ? "none" : String.valueOf(loc.getLeader()); + System.out.printf( + " %-8d%-8s%-12s%s%n", + loc.getBucketId(), + leader, + formatIntArray(loc.getReplicas()), + formatIntArray(loc.getIsr())); + } + } + + private static String formatIntArray(int[] arr) { + StringBuilder sb = new StringBuilder("["); + for (int i = 0; i < arr.length; i++) { + if (i > 0) { + sb.append(","); + } + sb.append(arr[i]); + } + sb.append("]"); + return sb.toString(); } void dropTable(TableCommandOptions opts) throws Exception { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java index 2990054999..d4d8b86d06 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java @@ -291,6 +291,10 @@ private static List toBucketLocations( for (int i = 0; i < replicas.length; i++) { replicas[i] = pbBucketMetadata.getReplicaIdAt(i); } + int[] isr = new int[pbBucketMetadata.getIsrsCount()]; + for (int i = 0; i < isr.length; i++) { + isr[i] = pbBucketMetadata.getIsrAt(i); + } Integer leader = null; if (pbBucketMetadata.hasLeaderId()) { leader = pbBucketMetadata.getLeaderId(); @@ -298,7 +302,7 @@ private static List toBucketLocations( PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName); BucketLocation bucketLocation = - new BucketLocation(physicalTablePath, tableBucket, leader, replicas); + new BucketLocation(physicalTablePath, tableBucket, leader, replicas, isr); bucketLocations.add(bucketLocation); } return bucketLocations; diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/BucketLocation.java b/fluss-common/src/main/java/org/apache/fluss/cluster/BucketLocation.java index 36977136de..6977c6139a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/BucketLocation.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/BucketLocation.java @@ -33,6 +33,7 @@ public final class BucketLocation { private final TableBucket tableBucket; @Nullable private Integer leader; private final int[] replicas; + private final int[] isr; public BucketLocation( PhysicalTablePath physicalTablePath, @@ -40,7 +41,7 @@ public BucketLocation( int bucketId, @Nullable Integer leader, int[] replicas) { - this(physicalTablePath, new TableBucket(tableId, bucketId), leader, replicas); + this(physicalTablePath, new TableBucket(tableId, bucketId), leader, replicas, new int[0]); } public BucketLocation( @@ -48,10 +49,20 @@ public BucketLocation( TableBucket tableBucket, @Nullable Integer leader, int[] replicas) { + this(physicalTablePath, tableBucket, leader, replicas, new int[0]); + } + + public BucketLocation( + PhysicalTablePath physicalTablePath, + TableBucket tableBucket, + @Nullable Integer leader, + int[] replicas, + int[] isr) { this.physicalTablePath = physicalTablePath; this.tableBucket = tableBucket; this.leader = leader; this.replicas = replicas; + this.isr = isr; } public PhysicalTablePath getPhysicalTablePath() { @@ -79,6 +90,10 @@ public int[] getReplicas() { return replicas; } + public int[] getIsr() { + return isr; + } + @Override public boolean equals(Object object) { if (this == object) { @@ -91,22 +106,29 @@ public boolean equals(Object object) { return Objects.equals(physicalTablePath, that.physicalTablePath) && Objects.equals(tableBucket, that.tableBucket) && Objects.equals(leader, that.leader) - && Objects.deepEquals(replicas, that.replicas); + && Objects.deepEquals(replicas, that.replicas) + && Objects.deepEquals(isr, that.isr); } @Override public int hashCode() { - return Objects.hash(physicalTablePath, tableBucket, leader, Arrays.hashCode(replicas)); + return Objects.hash( + physicalTablePath, + tableBucket, + leader, + Arrays.hashCode(replicas), + Arrays.hashCode(isr)); } @Override public String toString() { return String.format( - "Bucket(physicalTablePath = %s, %s, leader = %s, replicas = %s)", + "Bucket(physicalTablePath = %s, %s, leader = %s, replicas = %s, isr = %s)", physicalTablePath, tableBucket, leader == null ? "none" : leader, - formatNodeIds(replicas)); + formatNodeIds(replicas), + formatNodeIds(isr)); } /** Format the node ids from each item in the array for display. */ diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index e8381215fc..8d02f97718 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -830,8 +830,8 @@ message PbBucketMetadata { // optional as some time the leader may not elected yet optional int32 leader_id = 2; repeated int32 replica_id = 3 [packed = true]; - // TODO: Add isr here. optional int32 leader_epoch = 4; + repeated int32 isr = 5 [packed = true]; } message PbProduceLogReqForBucket { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java index 3441053671..37fe669c6c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -323,6 +323,8 @@ public void addUpdateMetadataRequestForTabletServers( Integer leaderEpoch = bucketLeaderAndIsr.map(LeaderAndIsr::leaderEpoch).orElse(null); Integer leader = bucketLeaderAndIsr.map(LeaderAndIsr::leader).orElse(null); + List isr = + bucketLeaderAndIsr.map(LeaderAndIsr::isr).orElse(Collections.emptyList()); if (currentPartitionId == null) { Map> tableAssignment = coordinatorContext.getTableAssignment(currentTableId); @@ -331,7 +333,8 @@ public void addUpdateMetadataRequestForTabletServers( tableBucket.getBucket(), leader, leaderEpoch, - tableAssignment.get(tableBucket.getBucket())); + tableAssignment.get(tableBucket.getBucket()), + isr); updateMetadataRequestBucketMap .computeIfAbsent(currentTableId, k -> new ArrayList<>()) .add(bucketMetadata); @@ -345,7 +348,8 @@ public void addUpdateMetadataRequestForTabletServers( tableBucket.getBucket(), leader, leaderEpoch, - partitionAssignment.get(tableBucket.getBucket())); + partitionAssignment.get(tableBucket.getBucket()), + isr); updateMetadataRequestPartitionMap .computeIfAbsent(currentTableId, k -> new HashMap<>()) .computeIfAbsent(tableBucket.getPartitionId(), k -> new ArrayList<>()) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/BucketMetadata.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/BucketMetadata.java index b6b968f28e..9bc09ece8f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/BucketMetadata.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/BucketMetadata.java @@ -19,6 +19,7 @@ import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.OptionalInt; @@ -29,16 +30,27 @@ public class BucketMetadata { private final @Nullable Integer leaderId; private final @Nullable Integer leaderEpoch; private final List replicas; + private final List isr; public BucketMetadata( int bucketId, @Nullable Integer leaderId, @Nullable Integer leaderEpoch, List replicas) { + this(bucketId, leaderId, leaderEpoch, replicas, Collections.emptyList()); + } + + public BucketMetadata( + int bucketId, + @Nullable Integer leaderId, + @Nullable Integer leaderEpoch, + List replicas, + List isr) { this.bucketId = bucketId; this.leaderId = leaderId; this.leaderEpoch = leaderEpoch; this.replicas = replicas; + this.isr = isr; } public int getBucketId() { @@ -57,6 +69,10 @@ public List getReplicas() { return replicas; } + public List getIsr() { + return isr; + } + @Override public String toString() { return "BucketMetadata{" @@ -68,6 +84,8 @@ public String toString() { + leaderEpoch + ", replicas=" + replicas + + ", isr=" + + isr + '}'; } @@ -83,11 +101,12 @@ public boolean equals(Object o) { return bucketId == that.bucketId && Objects.equals(leaderId, that.leaderId) && Objects.equals(leaderEpoch, that.leaderEpoch) - && replicas.equals(that.replicas); + && replicas.equals(that.replicas) + && isr.equals(that.isr); } @Override public int hashCode() { - return Objects.hash(bucketId, leaderId, leaderEpoch, replicas); + return Objects.hash(bucketId, leaderId, leaderEpoch, replicas, isr); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataProvider.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataProvider.java index 27511d0540..a1b06218c7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataProvider.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataProvider.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -136,12 +137,15 @@ private static List getBucketMetadataFromContext( TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); Optional optLeaderAndIsr = ctx.getBucketLeaderAndIsr(tableBucket); Integer leader = optLeaderAndIsr.map(LeaderAndIsr::leader).orElse(null); + List isr = + optLeaderAndIsr.map(LeaderAndIsr::isr).orElse(Collections.emptyList()); BucketMetadata bucketMetadata = new BucketMetadata( bucketId, leader, ctx.getBucketLeaderEpoch(tableBucket), - serverIds); + serverIds, + isr); bucketMetadataList.add(bucketMetadata); }); return bucketMetadataList; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index f7a89828ab..b0d7c0ea8c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -618,6 +618,10 @@ private static List toPbBucketMetadata( pbBucketMetadata.addReplicaId(replica); } + for (Integer isrNode : bucketMetadata.getIsr()) { + pbBucketMetadata.addIsr(isrNode); + } + pbBucketMetadataList.add(pbBucketMetadata); } return pbBucketMetadataList; @@ -658,7 +662,8 @@ private static BucketMetadata toBucketMetadata(PbBucketMetadata pbBucketMetadata pbBucketMetadata.hasLeaderEpoch() ? pbBucketMetadata.getLeaderEpoch() : null, Arrays.stream(pbBucketMetadata.getReplicaIds()) .boxed() - .collect(Collectors.toList())); + .collect(Collectors.toList()), + Arrays.stream(pbBucketMetadata.getIsrs()).boxed().collect(Collectors.toList())); } private static PartitionMetadata toPartitionMetadata(PbPartitionMetadata pbPartitionMetadata) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index cb34a2f2ed..284d014825 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -1706,7 +1706,8 @@ private BucketMetadata createBucketMetadata( Integer leader = leaderAndIsr != null ? leaderAndIsr.leader() : null; Integer leaderEpoch = leaderAndIsr != null ? leaderAndIsr.leaderEpoch() : null; List replicas = assignment.getBucketAssignments().get(bucketId).getReplicas(); - return new BucketMetadata(bucketId, leader, leaderEpoch, replicas); + List isr = leaderAndIsr != null ? leaderAndIsr.isr() : Collections.emptyList(); + return new BucketMetadata(bucketId, leader, leaderEpoch, replicas, isr); } /** Close the underlying ZooKeeperClient. */ From f7af5bcda433a2667416cbf7c33bcd159a4cb68b Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Thu, 21 May 2026 17:10:29 +0800 Subject: [PATCH 11/11] [cli] Suppress INFO logs for CLI tools and add 3 tablet servers to docker-compose Add log4j-cli.properties with WARN level for CLI scripts to suppress verbose INFO logs from metrics and netty. Update fluss-run-class.sh to prefer the CLI log config when present. Expand docker-compose to run 3 tablet servers for testing replication. --- docker/docker-compose.yml | 26 +++++++++++++++++++ .../fluss/cluster/BucketLocationTest.java | 4 +-- .../src/main/resources/bin/fluss-run-class.sh | 7 ++++- .../main/resources/conf/log4j-cli.properties | 24 +++++++++++++++++ 4 files changed, 58 insertions(+), 3 deletions(-) create mode 100644 fluss-dist/src/main/resources/conf/log4j-cli.properties diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index d9a7b48f17..184eed7e39 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -54,3 +54,29 @@ services: data.dir: /tmp/fluss-data remote.data.dir: /tmp/fluss-remote-data command: tabletServer + + tablet-server-1: + image: fluss-local + depends_on: + - coordinator + environment: + FLUSS_PROPERTIES: | + zookeeper.address: zookeeper:2181 + bind.listeners: FLUSS://tablet-server-1:9123 + tablet-server.id: 1 + data.dir: /tmp/fluss-data + remote.data.dir: /tmp/fluss-remote-data + command: tabletServer + + tablet-server-2: + image: fluss-local + depends_on: + - coordinator + environment: + FLUSS_PROPERTIES: | + zookeeper.address: zookeeper:2181 + bind.listeners: FLUSS://tablet-server-2:9123 + tablet-server.id: 2 + data.dir: /tmp/fluss-data + remote.data.dir: /tmp/fluss-remote-data + command: tabletServer diff --git a/fluss-common/src/test/java/org/apache/fluss/cluster/BucketLocationTest.java b/fluss-common/src/test/java/org/apache/fluss/cluster/BucketLocationTest.java index 9e494829c6..32ee31f217 100644 --- a/fluss-common/src/test/java/org/apache/fluss/cluster/BucketLocationTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/cluster/BucketLocationTest.java @@ -47,7 +47,7 @@ void testToString() { assertThat(bucketLocation.toString()) .isEqualTo( "Bucket(physicalTablePath = test_db.test_table, TableBucket{tableId=150001, bucket=0}, " - + "leader = 0, replicas = [0,1,2])"); + + "leader = 0, replicas = [0,1,2], isr = [])"); bucketLocation = new BucketLocation( @@ -55,6 +55,6 @@ void testToString() { assertThat(bucketLocation.toString()) .isEqualTo( "Bucket(physicalTablePath = test_db.test_table, TableBucket{tableId=150001, bucket=0}, " - + "leader = none, replicas = [0,1,2])"); + + "leader = none, replicas = [0,1,2], isr = [])"); } } diff --git a/fluss-dist/src/main/resources/bin/fluss-run-class.sh b/fluss-dist/src/main/resources/bin/fluss-run-class.sh index 3b27a276eb..2d3144d9fe 100755 --- a/fluss-dist/src/main/resources/bin/fluss-run-class.sh +++ b/fluss-dist/src/main/resources/bin/fluss-run-class.sh @@ -35,7 +35,12 @@ FLUSS_CLASSPATH=`constructFlussClassPath` CLASS_TO_RUN=$1 shift -log_setting=("-Dlog4j.configuration=file:${FLUSS_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLUSS_CONF_DIR}/log4j-console.properties") +CLI_LOG_CONF="${FLUSS_CONF_DIR}/log4j-cli.properties" +if [ -f "$CLI_LOG_CONF" ]; then + log_setting=("-Dlog4j.configuration=file:${CLI_LOG_CONF}" "-Dlog4j.configurationFile=file:${CLI_LOG_CONF}") +else + log_setting=("-Dlog4j.configuration=file:${FLUSS_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLUSS_CONF_DIR}/log4j-console.properties") +fi FLUSS_ENV_JAVA_OPTS=$(eval echo ${FLUSS_ENV_JAVA_OPTS}) diff --git a/fluss-dist/src/main/resources/conf/log4j-cli.properties b/fluss-dist/src/main/resources/conf/log4j-cli.properties new file mode 100644 index 0000000000..3555f363d8 --- /dev/null +++ b/fluss-dist/src/main/resources/conf/log4j-cli.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +rootLogger.level=WARN +rootLogger.appenderRef.console.ref=ConsoleAppender + +appender.console.name=ConsoleAppender +appender.console.type=CONSOLE +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n