diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000000..184eed7e39 --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Fluss local development cluster +# Usage: +# docker compose -f docker/docker-compose.yml up -d +# docker compose -f docker/docker-compose.yml exec coordinator ./bin/fluss-database.sh --bootstrap-server coordinator:9123 --list + +services: + zookeeper: + image: zookeeper:3.8 + ports: + - "2181:2181" + environment: + ZOO_MY_ID: 1 + + coordinator: + image: fluss-local + depends_on: + - zookeeper + ports: + - "9123:9123" + environment: + FLUSS_PROPERTIES: | + zookeeper.address: zookeeper:2181 + bind.listeners: FLUSS://coordinator:9123 + remote.data.dir: /tmp/fluss-remote-data + command: coordinatorServer + + tablet-server-0: + image: fluss-local + depends_on: + - coordinator + environment: + FLUSS_PROPERTIES: | + zookeeper.address: zookeeper:2181 + bind.listeners: FLUSS://tablet-server-0:9123 + tablet-server.id: 0 + data.dir: /tmp/fluss-data + remote.data.dir: /tmp/fluss-remote-data + command: tabletServer + + tablet-server-1: + image: fluss-local + depends_on: + - coordinator + environment: + FLUSS_PROPERTIES: | + zookeeper.address: zookeeper:2181 + bind.listeners: FLUSS://tablet-server-1:9123 + tablet-server.id: 1 + data.dir: /tmp/fluss-data + remote.data.dir: /tmp/fluss-remote-data + command: tabletServer + + tablet-server-2: + image: fluss-local + depends_on: + - coordinator + environment: + FLUSS_PROPERTIES: | + zookeeper.address: zookeeper:2181 + bind.listeners: FLUSS://tablet-server-2:9123 + tablet-server.id: 2 + data.dir: /tmp/fluss-data + remote.data.dir: /tmp/fluss-remote-data + command: tabletServer diff --git a/fluss-cli/pom.xml b/fluss-cli/pom.xml new file mode 100644 index 0000000000..8353116736 --- /dev/null +++ b/fluss-cli/pom.xml @@ -0,0 +1,70 @@ + + + + 4.0.0 + + org.apache.fluss + fluss + 1.0-SNAPSHOT + + + fluss-cli + Fluss : CLI + jar + + + + org.apache.fluss + fluss-client + ${project.version} + + + + commons-cli + commons-cli + + + + org.slf4j + slf4j-api + + + + + org.apache.fluss + fluss-test-utils + ${project.version} + test + + + + org.junit.jupiter + junit-jupiter + test + + + + org.assertj + assertj-core + test + + + diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/CommandDefaultOptions.java b/fluss-cli/src/main/java/org/apache/fluss/cli/CommandDefaultOptions.java new file mode 100644 index 0000000000..102e110954 --- /dev/null +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/CommandDefaultOptions.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.annotation.Internal; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +/** + * Base class for CLI command option parsers. Provides common options shared by all CLI tools: + * {@code --help}, {@code --version}, {@code --bootstrap-server}, {@code --command-config}. + */ +@Internal +public class CommandDefaultOptions { + + private static final Option HELP_OPT = + Option.builder().longOpt("help").desc("Print usage information.").build(); + + private static final Option VERSION_OPT = + Option.builder().longOpt("version").desc("Display Fluss version.").build(); + + private static final Option BOOTSTRAP_SERVER_OPT = + Option.builder() + .longOpt("bootstrap-server") + .hasArg() + .argName("server") + .desc("REQUIRED: The Fluss server to connect to.") + .build(); + + private static final Option COMMAND_CONFIG_OPT = + Option.builder() + .longOpt("command-config") + .hasArg() + .argName("file") + .desc("Property file containing configs to be passed to the client.") + .build(); + + protected final Options options; + protected CommandLine commandLine; + + protected CommandDefaultOptions() { + this.options = new Options(); + options.addOption(HELP_OPT); + options.addOption(VERSION_OPT); + options.addOption(BOOTSTRAP_SERVER_OPT); + options.addOption(COMMAND_CONFIG_OPT); + } + + protected void parse(String[] args) throws ParseException { + this.commandLine = new DefaultParser().parse(options, args); + if (!hasHelpOption() && !hasVersionOption() && bootstrapServer() == null) { + throw new ParseException("Missing required option: --bootstrap-server"); + } + } + + /** Returns {@code true} if the {@code --help} flag was specified. */ + public boolean hasHelpOption() { + return commandLine.hasOption(HELP_OPT); + } + + /** Returns {@code true} if the {@code --version} flag was specified. */ + public boolean hasVersionOption() { + return commandLine.hasOption(VERSION_OPT); + } + + /** Returns the value of {@code --bootstrap-server}, or {@code null} if not specified. */ + public String bootstrapServer() { + return commandLine.getOptionValue(BOOTSTRAP_SERVER_OPT); + } + + /** + * Loads the properties file specified by {@code --command-config}. Returns an empty {@link + * Properties} instance if the option was not specified. + */ + public Properties commandConfig() { + Properties props = new Properties(); + String file = commandLine.getOptionValue(COMMAND_CONFIG_OPT); + if (file == null) { + return props; + } + try (FileInputStream fis = new FileInputStream(file)) { + props.load(fis); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to load command config file: " + file, e); + } + return props; + } + + /** Returns the configured {@link Options} for generating help text. */ + public Options options() { + return options; + } +} diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/CommandUtils.java b/fluss-cli/src/main/java/org/apache/fluss/cli/CommandUtils.java new file mode 100644 index 0000000000..7154a31462 --- /dev/null +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/CommandUtils.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.config.Configuration; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** Shared utility methods for CLI commands. */ +@Internal +public final class CommandUtils { + + public static final long DEFAULT_TIMEOUT_SECS = 30; + + private CommandUtils() {} + + /** Create a {@link Connection} from common CLI options. */ + public static Connection createConnection(CommandDefaultOptions opts) { + Configuration conf = new Configuration(); + conf.setString("bootstrap.servers", opts.bootstrapServer()); + Properties cmdConfig = opts.commandConfig(); + cmdConfig.forEach((k, v) -> conf.setString((String) k, (String) v)); + return ConnectionFactory.createConnection(conf); + } + + /** Validate exactly one action flag is specified. */ + public static void validateActions(CommandLine cmd, Option... actions) { + long count = Arrays.stream(actions).filter(cmd::hasOption).count(); + if (count != 1) { + String names = + Arrays.stream(actions) + .map(o -> "--" + o.getLongOpt()) + .collect(Collectors.joining(", ")); + throw new IllegalArgumentException("Exactly one of " + names + " must be specified."); + } + } + + /** Print help and exit. */ + public static void printUsageAndExit(Options options, String cmdName) { + HelpFormatter formatter = new HelpFormatter(); + formatter.setWidth(100); + formatter.printHelp(cmdName, options, true); + System.exit(0); + } + + /** Print version and exit. */ + public static void printVersionAndExit() { + String version = CommandUtils.class.getPackage().getImplementationVersion(); + if (version == null) { + version = "(version unknown)"; + } + System.out.println("Fluss CLI version " + version); + System.exit(0); + } + + /** Parse repeatable {@code --property key=value} options into a Map. */ + public static Map parseProperties(String[] values) { + Map props = new HashMap<>(); + if (values == null) { + return props; + } + for (String prop : values) { + String[] kv = prop.split("=", 2); + if (kv.length != 2) { + throw new IllegalArgumentException( + "Invalid property format: '" + prop + "'. Expected key=value."); + } + props.put(kv[0].trim(), kv[1].trim()); + } + return props; + } + + /** Unwrap {@link ExecutionException} to get the root cause message. */ + public static String unwrapExceptionMessage(Throwable e) { + if (e instanceof ExecutionException && e.getCause() != null) { + return e.getCause().getMessage(); + } + return e.getMessage(); + } +} diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/DatabaseCommand.java b/fluss-cli/src/main/java/org/apache/fluss/cli/DatabaseCommand.java new file mode 100644 index 0000000000..0814242f02 --- /dev/null +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/DatabaseCommand.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.DatabaseInfo; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.ParseException; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** CLI tool for managing Fluss databases. */ +@Internal +public class DatabaseCommand { + + public static void main(String[] args) { + System.exit(mainNoExit(args)); + } + + static int mainNoExit(String[] args) { + try { + execute(args); + return 0; + } catch (Throwable e) { + System.err.println( + "Error while executing database command: " + + CommandUtils.unwrapExceptionMessage(e)); + return 1; + } + } + + static void execute(String[] args) throws Exception { + DatabaseCommandOptions opts = new DatabaseCommandOptions(args); + if (opts.hasHelpOption()) { + CommandUtils.printUsageAndExit(opts.options(), "fluss-database.sh"); + } + if (opts.hasVersionOption()) { + CommandUtils.printVersionAndExit(); + } + CommandUtils.validateActions( + opts.commandLine(), opts.createOpt, opts.listOpt, opts.describeOpt, opts.dropOpt); + + try (DatabaseService service = new DatabaseService(opts)) { + if (opts.hasCreateOption()) { + service.createDatabase(opts); + } else if (opts.hasListOption()) { + service.listDatabases(); + } else if (opts.hasDescribeOption()) { + service.describeDatabase(opts); + } else if (opts.hasDropOption()) { + service.dropDatabase(opts); + } + } + } + + /** Service class that wraps the Admin client for database operations. */ + static class DatabaseService implements AutoCloseable { + private final Admin admin; + private final Connection connection; + + DatabaseService(DatabaseCommandOptions opts) { + this.connection = CommandUtils.createConnection(opts); + this.admin = connection.getAdmin(); + } + + void createDatabase(DatabaseCommandOptions opts) throws Exception { + String dbName = opts.database(); + Map props = + CommandUtils.parseProperties(opts.commandLine().getOptionValues("property")); + DatabaseDescriptor.Builder builder = DatabaseDescriptor.builder(); + props.forEach(builder::customProperty); + admin.createDatabase(dbName, builder.build(), opts.ifNotExists()) + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); + System.out.println("Created database \"" + dbName + "\"."); + } + + void listDatabases() throws Exception { + List databases = + admin.listDatabases().get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); + databases.forEach(System.out::println); + } + + void describeDatabase(DatabaseCommandOptions opts) throws Exception { + String dbName = opts.database(); + DatabaseInfo info = + admin.getDatabaseInfo(dbName) + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); + System.out.println("Database: " + dbName); + info.getDatabaseDescriptor() + .getComment() + .ifPresent(c -> System.out.println("Comment: " + c)); + Map props = info.getDatabaseDescriptor().getCustomProperties(); + System.out.println("Properties:"); + if (props.isEmpty()) { + System.out.println(" (none)"); + } else { + props.forEach((k, v) -> System.out.println(" " + k + " = " + v)); + } + } + + void dropDatabase(DatabaseCommandOptions opts) throws Exception { + String dbName = opts.database(); + admin.dropDatabase(dbName, opts.ifExists(), opts.cascade()) + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); + System.out.println("Dropped database \"" + dbName + "\"."); + } + + @Override + public void close() throws Exception { + connection.close(); + } + } + + /** CLI option parser for database commands. */ + static class DatabaseCommandOptions extends CommandDefaultOptions { + final Option createOpt = + Option.builder().longOpt("create").desc("Create a new database.").build(); + final Option listOpt = Option.builder().longOpt("list").desc("List all databases.").build(); + final Option describeOpt = + Option.builder().longOpt("describe").desc("Describe a database.").build(); + final Option dropOpt = Option.builder().longOpt("drop").desc("Drop a database.").build(); + final Option databaseOpt = + Option.builder() + .longOpt("database") + .hasArg() + .argName("name") + .desc("The database name.") + .build(); + final Option ifExistsOpt = + Option.builder() + .longOpt("if-exists") + .desc("Only execute if the database exists.") + .build(); + final Option ifNotExistsOpt = + Option.builder() + .longOpt("if-not-exists") + .desc("Only execute if the database does not exist.") + .build(); + final Option cascadeOpt = + Option.builder() + .longOpt("cascade") + .desc("Drop all tables in the database (for --drop).") + .build(); + final Option propertyOpt = + Option.builder() + .longOpt("property") + .hasArg() + .argName("key=value") + .desc("Database property (repeatable).") + .build(); + + DatabaseCommandOptions(String[] args) throws ParseException { + super(); + options.addOption(createOpt); + options.addOption(listOpt); + options.addOption(describeOpt); + options.addOption(dropOpt); + options.addOption(databaseOpt); + options.addOption(ifExistsOpt); + options.addOption(ifNotExistsOpt); + options.addOption(cascadeOpt); + options.addOption(propertyOpt); + parse(args); + } + + boolean hasCreateOption() { + return commandLine.hasOption(createOpt); + } + + boolean hasListOption() { + return commandLine.hasOption(listOpt); + } + + boolean hasDescribeOption() { + return commandLine.hasOption(describeOpt); + } + + boolean hasDropOption() { + return commandLine.hasOption(dropOpt); + } + + String database() { + String db = commandLine.getOptionValue(databaseOpt); + if (db == null) { + throw new IllegalArgumentException("--database is required."); + } + return db; + } + + boolean ifExists() { + return commandLine.hasOption(ifExistsOpt); + } + + boolean ifNotExists() { + return commandLine.hasOption(ifNotExistsOpt); + } + + boolean cascade() { + return commandLine.hasOption(cascadeOpt); + } + + CommandLine commandLine() { + return commandLine; + } + } +} diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/SchemaParser.java b/fluss-cli/src/main/java/org/apache/fluss/cli/SchemaParser.java new file mode 100644 index 0000000000..b4608c28d3 --- /dev/null +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/SchemaParser.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; + +import java.util.ArrayList; +import java.util.List; + +/** + * Parses a schema definition string into a {@link Schema}. + * + *

Format: {@code "col1 TYPE1, col2 TYPE2 NOT NULL, col3 TYPE3"} + * + *

Uses parenthesis-aware splitting to handle types like {@code DECIMAL(10,2)}. Type strings + * (including nullability suffix) are parsed by {@link DataTypes#parse(String)}. + */ +@Internal +public final class SchemaParser { + + private SchemaParser() {} + + /** + * Parse a schema definition string into a Schema. + * + * @param schemaStr comma-separated column definitions + * @param primaryKeys primary key column names (empty list if no primary key) + * @return the parsed Schema + */ + public static Schema parseSchema(String schemaStr, List primaryKeys) { + if (schemaStr == null || schemaStr.trim().isEmpty()) { + throw new IllegalArgumentException("Schema definition cannot be empty."); + } + + Schema.Builder builder = Schema.newBuilder(); + List columnDefs = splitRespectingParentheses(schemaStr, ','); + + for (String colDef : columnDefs) { + String trimmed = colDef.trim(); + if (trimmed.isEmpty()) { + continue; + } + + int firstSpace = trimmed.indexOf(' '); + if (firstSpace < 0) { + throw new IllegalArgumentException( + "Invalid column definition: '" + + trimmed + + "'. Expected format: 'name TYPE [NOT NULL]'."); + } + + String name = trimmed.substring(0, firstSpace).trim(); + String typeStr = trimmed.substring(firstSpace + 1).trim(); + + DataType dataType = DataTypes.parse(typeStr); + builder.column(name, dataType); + } + + if (primaryKeys != null && !primaryKeys.isEmpty()) { + builder.primaryKey(primaryKeys); + } + + return builder.build(); + } + + /** + * Split a string by delimiter, skipping delimiters inside parentheses. Handles types like + * {@code DECIMAL(10,2)}. + */ + static List splitRespectingParentheses(String input, char delimiter) { + List result = new ArrayList<>(); + int depth = 0; + int start = 0; + for (int i = 0; i < input.length(); i++) { + char c = input.charAt(i); + if (c == '(') { + depth++; + } else if (c == ')') { + depth--; + } else if (c == delimiter && depth == 0) { + result.add(input.substring(start, i)); + start = i + 1; + } + } + result.add(input.substring(start)); + return result; + } +} diff --git a/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java b/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java new file mode 100644 index 0000000000..f7aa8c11e0 --- /dev/null +++ b/fluss-cli/src/main/java/org/apache/fluss/cli/TableCommand.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.FlussConnection; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.metadata.MetadataUpdater; +import org.apache.fluss.cluster.BucketLocation; +import org.apache.fluss.cluster.Cluster; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.ParseException; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +/** CLI tool for managing Fluss tables. */ +@Internal +public class TableCommand { + + public static void main(String[] args) { + System.exit(mainNoExit(args)); + } + + static int mainNoExit(String[] args) { + try { + execute(args); + return 0; + } catch (Throwable e) { + System.err.println( + "Error while executing table command: " + + CommandUtils.unwrapExceptionMessage(e)); + return 1; + } + } + + static void execute(String[] args) throws Exception { + TableCommandOptions opts = new TableCommandOptions(args); + if (opts.hasHelpOption()) { + CommandUtils.printUsageAndExit(opts.options(), "fluss-table.sh"); + } + if (opts.hasVersionOption()) { + CommandUtils.printVersionAndExit(); + } + CommandUtils.validateActions( + opts.commandLine(), opts.createOpt, opts.listOpt, opts.describeOpt, opts.dropOpt); + + try (TableService service = new TableService(opts)) { + if (opts.hasCreateOption()) { + service.createTable(opts); + } else if (opts.hasListOption()) { + service.listTables(opts); + } else if (opts.hasDescribeOption()) { + service.describeTable(opts); + } else if (opts.hasDropOption()) { + service.dropTable(opts); + } + } + } + + /** Service class that wraps the Admin client for table operations. */ + static class TableService implements AutoCloseable { + private final Admin admin; + private final Connection connection; + + TableService(TableCommandOptions opts) { + this.connection = CommandUtils.createConnection(opts); + this.admin = connection.getAdmin(); + } + + void createTable(TableCommandOptions opts) throws Exception { + TablePath tablePath = opts.tablePath(); + String schemaStr = opts.schema(); + List primaryKeys = opts.primaryKeys(); + + Schema schema = SchemaParser.parseSchema(schemaStr, primaryKeys); + + TableDescriptor.Builder builder = TableDescriptor.builder().schema(schema); + + List partitionKeys = opts.partitionKeys(); + if (!partitionKeys.isEmpty()) { + builder.partitionedBy(partitionKeys); + } + + Integer buckets = opts.buckets(); + if (buckets != null) { + builder.distributedBy(buckets); + } + + Map props = + CommandUtils.parseProperties(opts.commandLine().getOptionValues("property")); + props.forEach(builder::property); + + admin.createTable(tablePath, builder.build(), opts.ifNotExists()) + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); + System.out.println("Created table \"" + tablePath + "\"."); + } + + void listTables(TableCommandOptions opts) throws Exception { + String database = opts.database(); + List tables = + admin.listTables(database) + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); + tables.forEach(System.out::println); + } + + void describeTable(TableCommandOptions opts) throws Exception { + TablePath tablePath = opts.tablePath(); + TableInfo tableInfo = + admin.getTableInfo(tablePath) + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); + + Schema schema = tableInfo.getSchema(); + boolean hasPK = tableInfo.hasPrimaryKey(); + + System.out.println("Table: " + tablePath); + System.out.println("Type: " + (hasPK ? "PrimaryKey" : "Log")); + + System.out.println("Schema:"); + for (Schema.Column col : schema.getColumns()) { + System.out.println(" " + col.getName() + " " + col.getDataType()); + } + + if (hasPK) { + System.out.println("PrimaryKey: " + String.join(", ", tableInfo.getPrimaryKeys())); + } + + List partitionKeys = tableInfo.getPartitionKeys(); + if (!partitionKeys.isEmpty()) { + System.out.println("PartitionKeys: " + String.join(", ", partitionKeys)); + } + + int numBuckets = tableInfo.getNumBuckets(); + if (numBuckets > 0) { + System.out.println("Buckets: " + numBuckets); + } + + List bucketKeys = tableInfo.getBucketKeys(); + if (!bucketKeys.isEmpty()) { + System.out.println("BucketKeys: " + String.join(", ", bucketKeys)); + } + + Optional comment = tableInfo.getComment(); + if (comment.isPresent()) { + System.out.println("Comment: " + comment.get()); + } + + Configuration props = tableInfo.getProperties(); + Configuration customProps = tableInfo.getCustomProperties(); + Map propsMap = props.toMap(); + Map customPropsMap = customProps.toMap(); + if (!propsMap.isEmpty() || !customPropsMap.isEmpty()) { + System.out.println("Properties:"); + propsMap.forEach((k, v) -> System.out.println(" " + k + " = " + v)); + customPropsMap.forEach((k, v) -> System.out.println(" " + k + " = " + v)); + } + + printBucketDistribution(tablePath); + } + + private void printBucketDistribution(TablePath tablePath) { + MetadataUpdater metadataUpdater = ((FlussConnection) connection).getMetadataUpdater(); + metadataUpdater.updateTableOrPartitionMetadata(tablePath, null); + Cluster cluster = metadataUpdater.getCluster(); + PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath); + java.util.List bucketLocations = + cluster.getAvailableBucketsForPhysicalTablePath(physicalTablePath); + if (bucketLocations == null || bucketLocations.isEmpty()) { + return; + } + System.out.println("Bucket Distribution:"); + System.out.printf(" %-8s%-8s%-12s%s%n", "Bucket", "Leader", "Replicas", "Isr"); + for (BucketLocation loc : bucketLocations) { + String leader = loc.getLeader() == null ? "none" : String.valueOf(loc.getLeader()); + System.out.printf( + " %-8d%-8s%-12s%s%n", + loc.getBucketId(), + leader, + formatIntArray(loc.getReplicas()), + formatIntArray(loc.getIsr())); + } + } + + private static String formatIntArray(int[] arr) { + StringBuilder sb = new StringBuilder("["); + for (int i = 0; i < arr.length; i++) { + if (i > 0) { + sb.append(","); + } + sb.append(arr[i]); + } + sb.append("]"); + return sb.toString(); + } + + void dropTable(TableCommandOptions opts) throws Exception { + TablePath tablePath = opts.tablePath(); + admin.dropTable(tablePath, opts.ifExists()) + .get(CommandUtils.DEFAULT_TIMEOUT_SECS, TimeUnit.SECONDS); + System.out.println("Dropped table \"" + tablePath + "\"."); + } + + @Override + public void close() throws Exception { + connection.close(); + } + } + + /** CLI option parser for table commands. */ + static class TableCommandOptions extends CommandDefaultOptions { + final Option createOpt = + Option.builder().longOpt("create").desc("Create a new table.").build(); + final Option listOpt = + Option.builder().longOpt("list").desc("List all tables in a database.").build(); + final Option describeOpt = + Option.builder().longOpt("describe").desc("Describe a table.").build(); + final Option dropOpt = Option.builder().longOpt("drop").desc("Drop a table.").build(); + final Option databaseOpt = + Option.builder() + .longOpt("database") + .hasArg() + .argName("name") + .desc("The database name (for --list).") + .build(); + final Option tableOpt = + Option.builder() + .longOpt("table") + .hasArg() + .argName("db.table") + .desc("The table path in format 'database.table'.") + .build(); + final Option schemaOpt = + Option.builder() + .longOpt("schema") + .hasArg() + .argName("definition") + .desc("Column definitions: 'col1 INT, col2 STRING NOT NULL, ...'") + .build(); + final Option primaryKeyOpt = + Option.builder() + .longOpt("primary-key") + .hasArg() + .argName("columns") + .desc("Primary key columns (comma-separated).") + .build(); + final Option partitionByOpt = + Option.builder() + .longOpt("partition-by") + .hasArg() + .argName("columns") + .desc("Partition columns (comma-separated).") + .build(); + final Option bucketsOpt = + Option.builder() + .longOpt("buckets") + .hasArg() + .argName("num") + .desc("Number of buckets.") + .build(); + final Option propertyOpt = + Option.builder() + .longOpt("property") + .hasArg() + .argName("key=value") + .desc("Table property (repeatable).") + .build(); + final Option ifExistsOpt = + Option.builder() + .longOpt("if-exists") + .desc("Only execute if the table exists.") + .build(); + final Option ifNotExistsOpt = + Option.builder() + .longOpt("if-not-exists") + .desc("Only execute if the table does not exist.") + .build(); + + TableCommandOptions(String[] args) throws ParseException { + super(); + options.addOption(createOpt); + options.addOption(listOpt); + options.addOption(describeOpt); + options.addOption(dropOpt); + options.addOption(databaseOpt); + options.addOption(tableOpt); + options.addOption(schemaOpt); + options.addOption(primaryKeyOpt); + options.addOption(partitionByOpt); + options.addOption(bucketsOpt); + options.addOption(propertyOpt); + options.addOption(ifExistsOpt); + options.addOption(ifNotExistsOpt); + parse(args); + } + + boolean hasCreateOption() { + return commandLine.hasOption(createOpt); + } + + boolean hasListOption() { + return commandLine.hasOption(listOpt); + } + + boolean hasDescribeOption() { + return commandLine.hasOption(describeOpt); + } + + boolean hasDropOption() { + return commandLine.hasOption(dropOpt); + } + + TablePath tablePath() { + String table = commandLine.getOptionValue(tableOpt); + if (table == null) { + throw new IllegalArgumentException("--table is required."); + } + String[] parts = table.split("\\.", 2); + if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) { + throw new IllegalArgumentException( + "Invalid table path: '" + table + "'. Expected format: 'database.table'."); + } + return TablePath.of(parts[0], parts[1]); + } + + String database() { + String db = commandLine.getOptionValue(databaseOpt); + if (db == null) { + throw new IllegalArgumentException("--database is required."); + } + return db; + } + + String schema() { + String s = commandLine.getOptionValue(schemaOpt); + if (s == null) { + throw new IllegalArgumentException("--schema is required for --create."); + } + return s; + } + + List primaryKeys() { + String pk = commandLine.getOptionValue(primaryKeyOpt); + if (pk == null) { + return Collections.emptyList(); + } + return Arrays.asList(trimElements(pk.split(","))); + } + + List partitionKeys() { + String pb = commandLine.getOptionValue(partitionByOpt); + if (pb == null) { + return Collections.emptyList(); + } + return Arrays.asList(trimElements(pb.split(","))); + } + + private static String[] trimElements(String[] elements) { + for (int i = 0; i < elements.length; i++) { + elements[i] = elements[i].trim(); + } + return elements; + } + + Integer buckets() { + String b = commandLine.getOptionValue(bucketsOpt); + if (b == null) { + return null; + } + return Integer.parseInt(b); + } + + boolean ifExists() { + return commandLine.hasOption(ifExistsOpt); + } + + boolean ifNotExists() { + return commandLine.hasOption(ifNotExistsOpt); + } + + CommandLine commandLine() { + return commandLine; + } + } +} diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/CommandDefaultOptionsTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/CommandDefaultOptionsTest.java new file mode 100644 index 0000000000..4fb2a19143 --- /dev/null +++ b/fluss-cli/src/test/java/org/apache/fluss/cli/CommandDefaultOptionsTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.commons.cli.ParseException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link CommandDefaultOptions}. */ +class CommandDefaultOptionsTest { + + @Test + void testParseBootstrapServer() throws ParseException { + CommandDefaultOptions opts = new CommandDefaultOptions(); + opts.parse(new String[] {"--bootstrap-server", "localhost:9123"}); + + assertThat(opts.bootstrapServer()).isEqualTo("localhost:9123"); + assertThat(opts.hasHelpOption()).isFalse(); + assertThat(opts.hasVersionOption()).isFalse(); + } + + @Test + void testMissingBootstrapServerThrows() { + CommandDefaultOptions opts = new CommandDefaultOptions(); + + assertThatThrownBy(() -> opts.parse(new String[] {"--list"})) + .isInstanceOf(ParseException.class); + } + + @Test + void testHelpOption() throws ParseException { + CommandDefaultOptions opts = new CommandDefaultOptions(); + opts.parse(new String[] {"--help"}); + + assertThat(opts.hasHelpOption()).isTrue(); + } + + @Test + void testVersionOption() throws ParseException { + CommandDefaultOptions opts = new CommandDefaultOptions(); + opts.parse(new String[] {"--version"}); + + assertThat(opts.hasVersionOption()).isTrue(); + } + + @Test + void testCommandConfigLoadsProperties(@TempDir Path tempDir) + throws ParseException, IOException { + Path configFile = tempDir.resolve("client.properties"); + Properties props = new Properties(); + props.setProperty("security.protocol", "SASL_SSL"); + try (FileOutputStream fos = new FileOutputStream(configFile.toFile())) { + props.store(fos, null); + } + + CommandDefaultOptions opts = new CommandDefaultOptions(); + opts.parse( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--command-config", + configFile.toString() + }); + + Properties loaded = opts.commandConfig(); + assertThat(loaded.getProperty("security.protocol")).isEqualTo("SASL_SSL"); + } + + @Test + void testCommandConfigDefaultsToEmptyProperties() throws ParseException { + CommandDefaultOptions opts = new CommandDefaultOptions(); + opts.parse(new String[] {"--bootstrap-server", "localhost:9123"}); + + assertThat(opts.commandConfig()).isEmpty(); + } +} diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/CommandUtilsTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/CommandUtilsTest.java new file mode 100644 index 0000000000..6bc0ab766d --- /dev/null +++ b/fluss-cli/src/test/java/org/apache/fluss/cli/CommandUtilsTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link CommandUtils}. */ +class CommandUtilsTest { + + @Test + void testParsePropertiesValid() { + Map props = + CommandUtils.parseProperties(new String[] {"key1=value1", "key2=value2"}); + + assertThat(props).hasSize(2); + assertThat(props.get("key1")).isEqualTo("value1"); + assertThat(props.get("key2")).isEqualTo("value2"); + } + + @Test + void testParsePropertiesWithEqualsInValue() { + Map props = CommandUtils.parseProperties(new String[] {"key=val=ue"}); + + assertThat(props.get("key")).isEqualTo("val=ue"); + } + + @Test + void testParsePropertiesNull() { + Map props = CommandUtils.parseProperties(null); + + assertThat(props).isEmpty(); + } + + @Test + void testParsePropertiesInvalidFormat() { + assertThatThrownBy(() -> CommandUtils.parseProperties(new String[] {"noequalssign"})) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid property format"); + } + + @Test + void testValidateActionsExactlyOne() throws Exception { + Option createOpt = Option.builder().longOpt("create").build(); + Option listOpt = Option.builder().longOpt("list").build(); + Options opts = new Options(); + opts.addOption(createOpt); + opts.addOption(listOpt); + CommandLine cmd = new DefaultParser().parse(opts, new String[] {"--create"}); + + // should not throw + CommandUtils.validateActions(cmd, createOpt, listOpt); + } + + @Test + void testValidateActionsNoneThrows() throws Exception { + Option createOpt = Option.builder().longOpt("create").build(); + Option listOpt = Option.builder().longOpt("list").build(); + Options opts = new Options(); + opts.addOption(createOpt); + opts.addOption(listOpt); + CommandLine cmd = new DefaultParser().parse(opts, new String[] {}); + + assertThatThrownBy(() -> CommandUtils.validateActions(cmd, createOpt, listOpt)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Exactly one of"); + } + + @Test + void testValidateActionsMultipleThrows() throws Exception { + Option createOpt = Option.builder().longOpt("create").build(); + Option listOpt = Option.builder().longOpt("list").build(); + Options opts = new Options(); + opts.addOption(createOpt); + opts.addOption(listOpt); + CommandLine cmd = new DefaultParser().parse(opts, new String[] {"--create", "--list"}); + + assertThatThrownBy(() -> CommandUtils.validateActions(cmd, createOpt, listOpt)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Exactly one of"); + } + + @Test + void testUnwrapExecutionException() { + ExecutionException e = new ExecutionException(new RuntimeException("root cause")); + + assertThat(CommandUtils.unwrapExceptionMessage(e)).isEqualTo("root cause"); + } + + @Test + void testUnwrapRegularException() { + RuntimeException e = new RuntimeException("direct message"); + + assertThat(CommandUtils.unwrapExceptionMessage(e)).isEqualTo("direct message"); + } +} diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/DatabaseCommandTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/DatabaseCommandTest.java new file mode 100644 index 0000000000..066cd473b2 --- /dev/null +++ b/fluss-cli/src/test/java/org/apache/fluss/cli/DatabaseCommandTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link DatabaseCommand} option parsing. */ +class DatabaseCommandTest { + + @Test + void testParseCreateOptions() throws Exception { + DatabaseCommand.DatabaseCommandOptions opts = + new DatabaseCommand.DatabaseCommandOptions( + new String[] { + "--bootstrap-server", "localhost:9123", "--create", "--database", "mydb" + }); + + assertThat(opts.hasCreateOption()).isTrue(); + assertThat(opts.hasListOption()).isFalse(); + assertThat(opts.database()).isEqualTo("mydb"); + } + + @Test + void testParseListOptions() throws Exception { + DatabaseCommand.DatabaseCommandOptions opts = + new DatabaseCommand.DatabaseCommandOptions( + new String[] {"--bootstrap-server", "localhost:9123", "--list"}); + + assertThat(opts.hasListOption()).isTrue(); + } + + @Test + void testParseDropWithCascadeAndIfExists() throws Exception { + DatabaseCommand.DatabaseCommandOptions opts = + new DatabaseCommand.DatabaseCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--drop", + "--database", + "mydb", + "--cascade", + "--if-exists" + }); + + assertThat(opts.hasDropOption()).isTrue(); + assertThat(opts.database()).isEqualTo("mydb"); + assertThat(opts.cascade()).isTrue(); + assertThat(opts.ifExists()).isTrue(); + } + + @Test + void testMissingDatabaseForCreateThrows() throws Exception { + DatabaseCommand.DatabaseCommandOptions opts = + new DatabaseCommand.DatabaseCommandOptions( + new String[] {"--bootstrap-server", "localhost:9123", "--create"}); + + assertThatThrownBy(opts::database) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("--database is required"); + } + + @Test + void testIfNotExistsOption() throws Exception { + DatabaseCommand.DatabaseCommandOptions opts = + new DatabaseCommand.DatabaseCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--create", + "--database", + "mydb", + "--if-not-exists" + }); + + assertThat(opts.ifNotExists()).isTrue(); + } + + @Test + void testMainNoExitReturnsOneOnError() { + // No args at all -> parse error -> exit code 1 + int exitCode = DatabaseCommand.mainNoExit(new String[] {}); + assertThat(exitCode).isEqualTo(1); + } +} diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/SchemaParserTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/SchemaParserTest.java new file mode 100644 index 0000000000..9793a3397c --- /dev/null +++ b/fluss-cli/src/test/java/org/apache/fluss/cli/SchemaParserTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.metadata.Schema; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link SchemaParser}. */ +class SchemaParserTest { + + @Test + void testSimpleColumns() { + Schema schema = SchemaParser.parseSchema("id INT, name STRING", Collections.emptyList()); + + List cols = schema.getColumns(); + assertThat(cols).hasSize(2); + assertThat(cols.get(0).getName()).isEqualTo("id"); + assertThat(cols.get(0).getDataType().isNullable()).isTrue(); + assertThat(cols.get(1).getName()).isEqualTo("name"); + assertThat(cols.get(1).getDataType().isNullable()).isTrue(); + } + + @Test + void testNotNullColumn() { + Schema schema = + SchemaParser.parseSchema("id INT NOT NULL, name STRING", Collections.emptyList()); + + List cols = schema.getColumns(); + assertThat(cols.get(0).getName()).isEqualTo("id"); + assertThat(cols.get(0).getDataType().isNullable()).isFalse(); + assertThat(cols.get(1).getDataType().isNullable()).isTrue(); + } + + @Test + void testDecimalWithCommaInParens() { + Schema schema = SchemaParser.parseSchema("amount DECIMAL(10,2)", Collections.emptyList()); + + List cols = schema.getColumns(); + assertThat(cols).hasSize(1); + assertThat(cols.get(0).getName()).isEqualTo("amount"); + assertThat(cols.get(0).getDataType().toString()).contains("DECIMAL"); + } + + @Test + void testMultipleNotNull() { + Schema schema = + SchemaParser.parseSchema( + "a INT NOT NULL, b STRING NOT NULL", Collections.emptyList()); + + List cols = schema.getColumns(); + assertThat(cols.get(0).getDataType().isNullable()).isFalse(); + assertThat(cols.get(1).getDataType().isNullable()).isFalse(); + } + + @Test + void testSingleColumn() { + Schema schema = SchemaParser.parseSchema("id BIGINT", Collections.emptyList()); + + assertThat(schema.getColumns()).hasSize(1); + assertThat(schema.getColumns().get(0).getName()).isEqualTo("id"); + } + + @Test + void testExtraWhitespace() { + Schema schema = + SchemaParser.parseSchema(" id INT , name STRING ", Collections.emptyList()); + + List cols = schema.getColumns(); + assertThat(cols).hasSize(2); + assertThat(cols.get(0).getName()).isEqualTo("id"); + assertThat(cols.get(1).getName()).isEqualTo("name"); + } + + @Test + void testWithPrimaryKey() { + Schema schema = SchemaParser.parseSchema("id INT, name STRING", Arrays.asList("id")); + + assertThat(schema.getPrimaryKey()).isPresent(); + assertThat(schema.getPrimaryKey().get().getColumnNames()).containsExactly("id"); + } + + @Test + void testInvalidColumnDefinition() { + assertThatThrownBy(() -> SchemaParser.parseSchema("nospace", Collections.emptyList())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid column definition"); + } + + @Test + void testUnknownTypeThrows() { + assertThatThrownBy(() -> SchemaParser.parseSchema("x FOOBAR", Collections.emptyList())) + .isInstanceOf(Exception.class); + } + + @Test + void testEmptySchemaThrows() { + assertThatThrownBy(() -> SchemaParser.parseSchema("", Collections.emptyList())) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testAllSupportedTypes() { + String schema = + "a BOOLEAN, b TINYINT, c SMALLINT, d INT, e BIGINT, " + + "f FLOAT, g DOUBLE, h STRING, i DATE, j TIMESTAMP"; + Schema result = SchemaParser.parseSchema(schema, Collections.emptyList()); + + assertThat(result.getColumns()).hasSize(10); + } +} diff --git a/fluss-cli/src/test/java/org/apache/fluss/cli/TableCommandTest.java b/fluss-cli/src/test/java/org/apache/fluss/cli/TableCommandTest.java new file mode 100644 index 0000000000..61f0ac98ad --- /dev/null +++ b/fluss-cli/src/test/java/org/apache/fluss/cli/TableCommandTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cli; + +import org.apache.fluss.metadata.TablePath; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link TableCommand} option parsing. */ +class TableCommandTest { + + @Test + void testParseCreateOptions() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--create", + "--table", + "mydb.users", + "--schema", + "id INT, name STRING", + "--primary-key", + "id", + "--buckets", + "4" + }); + + assertThat(opts.hasCreateOption()).isTrue(); + assertThat(opts.tablePath()).isEqualTo(TablePath.of("mydb", "users")); + assertThat(opts.schema()).isEqualTo("id INT, name STRING"); + assertThat(opts.primaryKeys()).containsExactly("id"); + assertThat(opts.buckets()).isEqualTo(4); + } + + @Test + void testParseListOptions() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", "localhost:9123", "--list", "--database", "mydb" + }); + + assertThat(opts.hasListOption()).isTrue(); + assertThat(opts.database()).isEqualTo("mydb"); + } + + @Test + void testTablePathParsing() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--describe", + "--table", + "mydb.orders" + }); + + TablePath path = opts.tablePath(); + assertThat(path.getDatabaseName()).isEqualTo("mydb"); + assertThat(path.getTableName()).isEqualTo("orders"); + } + + @Test + void testTablePathInvalidFormatNoDot() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--describe", + "--table", + "notable" + }); + + assertThatThrownBy(opts::tablePath) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid table path"); + } + + @Test + void testTablePathInvalidEmptyParts() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--describe", + "--table", + ".table" + }); + + assertThatThrownBy(opts::tablePath) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid table path"); + } + + @Test + void testMissingTableThrows() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] {"--bootstrap-server", "localhost:9123", "--describe"}); + + assertThatThrownBy(opts::tablePath) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("--table is required"); + } + + @Test + void testMissingSchemaForCreateThrows() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--create", + "--table", + "mydb.users" + }); + + assertThatThrownBy(opts::schema) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("--schema is required"); + } + + @Test + void testPartitionByOption() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--create", + "--table", + "mydb.orders", + "--schema", + "id INT, dt STRING", + "--partition-by", + "dt" + }); + + List keys = opts.partitionKeys(); + assertThat(keys).containsExactly("dt"); + } + + @Test + void testNoBucketsReturnsNull() throws Exception { + TableCommand.TableCommandOptions opts = + new TableCommand.TableCommandOptions( + new String[] { + "--bootstrap-server", + "localhost:9123", + "--create", + "--table", + "mydb.users", + "--schema", + "id INT" + }); + + assertThat(opts.buckets()).isNull(); + } + + @Test + void testMainNoExitReturnsOneOnError() { + int exitCode = TableCommand.mainNoExit(new String[] {}); + assertThat(exitCode).isEqualTo(1); + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java index 2990054999..d4d8b86d06 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java @@ -291,6 +291,10 @@ private static List toBucketLocations( for (int i = 0; i < replicas.length; i++) { replicas[i] = pbBucketMetadata.getReplicaIdAt(i); } + int[] isr = new int[pbBucketMetadata.getIsrsCount()]; + for (int i = 0; i < isr.length; i++) { + isr[i] = pbBucketMetadata.getIsrAt(i); + } Integer leader = null; if (pbBucketMetadata.hasLeaderId()) { leader = pbBucketMetadata.getLeaderId(); @@ -298,7 +302,7 @@ private static List toBucketLocations( PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName); BucketLocation bucketLocation = - new BucketLocation(physicalTablePath, tableBucket, leader, replicas); + new BucketLocation(physicalTablePath, tableBucket, leader, replicas, isr); bucketLocations.add(bucketLocation); } return bucketLocations; diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/BucketLocation.java b/fluss-common/src/main/java/org/apache/fluss/cluster/BucketLocation.java index 36977136de..6977c6139a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/BucketLocation.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/BucketLocation.java @@ -33,6 +33,7 @@ public final class BucketLocation { private final TableBucket tableBucket; @Nullable private Integer leader; private final int[] replicas; + private final int[] isr; public BucketLocation( PhysicalTablePath physicalTablePath, @@ -40,7 +41,7 @@ public BucketLocation( int bucketId, @Nullable Integer leader, int[] replicas) { - this(physicalTablePath, new TableBucket(tableId, bucketId), leader, replicas); + this(physicalTablePath, new TableBucket(tableId, bucketId), leader, replicas, new int[0]); } public BucketLocation( @@ -48,10 +49,20 @@ public BucketLocation( TableBucket tableBucket, @Nullable Integer leader, int[] replicas) { + this(physicalTablePath, tableBucket, leader, replicas, new int[0]); + } + + public BucketLocation( + PhysicalTablePath physicalTablePath, + TableBucket tableBucket, + @Nullable Integer leader, + int[] replicas, + int[] isr) { this.physicalTablePath = physicalTablePath; this.tableBucket = tableBucket; this.leader = leader; this.replicas = replicas; + this.isr = isr; } public PhysicalTablePath getPhysicalTablePath() { @@ -79,6 +90,10 @@ public int[] getReplicas() { return replicas; } + public int[] getIsr() { + return isr; + } + @Override public boolean equals(Object object) { if (this == object) { @@ -91,22 +106,29 @@ public boolean equals(Object object) { return Objects.equals(physicalTablePath, that.physicalTablePath) && Objects.equals(tableBucket, that.tableBucket) && Objects.equals(leader, that.leader) - && Objects.deepEquals(replicas, that.replicas); + && Objects.deepEquals(replicas, that.replicas) + && Objects.deepEquals(isr, that.isr); } @Override public int hashCode() { - return Objects.hash(physicalTablePath, tableBucket, leader, Arrays.hashCode(replicas)); + return Objects.hash( + physicalTablePath, + tableBucket, + leader, + Arrays.hashCode(replicas), + Arrays.hashCode(isr)); } @Override public String toString() { return String.format( - "Bucket(physicalTablePath = %s, %s, leader = %s, replicas = %s)", + "Bucket(physicalTablePath = %s, %s, leader = %s, replicas = %s, isr = %s)", physicalTablePath, tableBucket, leader == null ? "none" : leader, - formatNodeIds(replicas)); + formatNodeIds(replicas), + formatNodeIds(isr)); } /** Format the node ids from each item in the array for display. */ diff --git a/fluss-common/src/test/java/org/apache/fluss/cluster/BucketLocationTest.java b/fluss-common/src/test/java/org/apache/fluss/cluster/BucketLocationTest.java index 9e494829c6..32ee31f217 100644 --- a/fluss-common/src/test/java/org/apache/fluss/cluster/BucketLocationTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/cluster/BucketLocationTest.java @@ -47,7 +47,7 @@ void testToString() { assertThat(bucketLocation.toString()) .isEqualTo( "Bucket(physicalTablePath = test_db.test_table, TableBucket{tableId=150001, bucket=0}, " - + "leader = 0, replicas = [0,1,2])"); + + "leader = 0, replicas = [0,1,2], isr = [])"); bucketLocation = new BucketLocation( @@ -55,6 +55,6 @@ void testToString() { assertThat(bucketLocation.toString()) .isEqualTo( "Bucket(physicalTablePath = test_db.test_table, TableBucket{tableId=150001, bucket=0}, " - + "leader = none, replicas = [0,1,2])"); + + "leader = none, replicas = [0,1,2], isr = [])"); } } diff --git a/fluss-dist/pom.xml b/fluss-dist/pom.xml index 117f0a6469..eca37bd1fb 100644 --- a/fluss-dist/pom.xml +++ b/fluss-dist/pom.xml @@ -39,6 +39,18 @@ provided + + org.apache.fluss + fluss-cli + ${project.version} + provided + + + + commons-cli + commons-cli + + org.apache.fluss diff --git a/fluss-dist/src/main/assemblies/bin.xml b/fluss-dist/src/main/assemblies/bin.xml index 3c1d879f08..5bbc1354d0 100644 --- a/fluss-dist/src/main/assemblies/bin.xml +++ b/fluss-dist/src/main/assemblies/bin.xml @@ -41,6 +41,7 @@ org.apache.logging.log4j:log4j-core org.apache.logging.log4j:log4j-slf4j-impl org.apache.logging.log4j:log4j-1.2-api + commons-cli:commons-cli @@ -56,6 +57,22 @@ 0644 + + + ../fluss-cli/target/fluss-cli-${project.version}.jar + lib/ + fluss-cli-${project.version}.jar + 0644 + + + + + ../fluss-client/target/fluss-client-${project.version}.jar + lib/ + fluss-client-${project.version}.jar + 0644 + + src/main/resources/server.yaml diff --git a/fluss-dist/src/main/resources/bin/fluss-database.sh b/fluss-dist/src/main/resources/bin/fluss-database.sh new file mode 100755 index 0000000000..51bb5543b4 --- /dev/null +++ b/fluss-dist/src/main/resources/bin/fluss-database.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +exec "`dirname "$0"`"/fluss-run-class.sh org.apache.fluss.cli.DatabaseCommand "$@" diff --git a/fluss-dist/src/main/resources/bin/fluss-run-class.sh b/fluss-dist/src/main/resources/bin/fluss-run-class.sh new file mode 100755 index 0000000000..2d3144d9fe --- /dev/null +++ b/fluss-dist/src/main/resources/bin/fluss-run-class.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Launches a Java class with the Fluss classpath. +# Usage: fluss-run-class.sh [opts] + +if [ $# -lt 1 ]; then + echo "USAGE: $0 classname [opts]" + exit 1 +fi + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +FLUSS_CLASSPATH=`constructFlussClassPath` + +CLASS_TO_RUN=$1 +shift + +CLI_LOG_CONF="${FLUSS_CONF_DIR}/log4j-cli.properties" +if [ -f "$CLI_LOG_CONF" ]; then + log_setting=("-Dlog4j.configuration=file:${CLI_LOG_CONF}" "-Dlog4j.configurationFile=file:${CLI_LOG_CONF}") +else + log_setting=("-Dlog4j.configuration=file:${FLUSS_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLUSS_CONF_DIR}/log4j-console.properties") +fi + +FLUSS_ENV_JAVA_OPTS=$(eval echo ${FLUSS_ENV_JAVA_OPTS}) + +exec "$JAVA_RUN" $JVM_ARGS ${FLUSS_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLUSS_CLASSPATH"`" ${CLASS_TO_RUN} "$@" diff --git a/fluss-dist/src/main/resources/bin/fluss-table.sh b/fluss-dist/src/main/resources/bin/fluss-table.sh new file mode 100755 index 0000000000..69b309e61c --- /dev/null +++ b/fluss-dist/src/main/resources/bin/fluss-table.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +exec "`dirname "$0"`"/fluss-run-class.sh org.apache.fluss.cli.TableCommand "$@" diff --git a/fluss-dist/src/main/resources/conf/log4j-cli.properties b/fluss-dist/src/main/resources/conf/log4j-cli.properties new file mode 100644 index 0000000000..3555f363d8 --- /dev/null +++ b/fluss-dist/src/main/resources/conf/log4j-cli.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +rootLogger.level=WARN +rootLogger.appenderRef.console.ref=ConsoleAppender + +appender.console.name=ConsoleAppender +appender.console.type=CONSOLE +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index e8381215fc..8d02f97718 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -830,8 +830,8 @@ message PbBucketMetadata { // optional as some time the leader may not elected yet optional int32 leader_id = 2; repeated int32 replica_id = 3 [packed = true]; - // TODO: Add isr here. optional int32 leader_epoch = 4; + repeated int32 isr = 5 [packed = true]; } message PbProduceLogReqForBucket { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java index 3441053671..37fe669c6c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -323,6 +323,8 @@ public void addUpdateMetadataRequestForTabletServers( Integer leaderEpoch = bucketLeaderAndIsr.map(LeaderAndIsr::leaderEpoch).orElse(null); Integer leader = bucketLeaderAndIsr.map(LeaderAndIsr::leader).orElse(null); + List isr = + bucketLeaderAndIsr.map(LeaderAndIsr::isr).orElse(Collections.emptyList()); if (currentPartitionId == null) { Map> tableAssignment = coordinatorContext.getTableAssignment(currentTableId); @@ -331,7 +333,8 @@ public void addUpdateMetadataRequestForTabletServers( tableBucket.getBucket(), leader, leaderEpoch, - tableAssignment.get(tableBucket.getBucket())); + tableAssignment.get(tableBucket.getBucket()), + isr); updateMetadataRequestBucketMap .computeIfAbsent(currentTableId, k -> new ArrayList<>()) .add(bucketMetadata); @@ -345,7 +348,8 @@ public void addUpdateMetadataRequestForTabletServers( tableBucket.getBucket(), leader, leaderEpoch, - partitionAssignment.get(tableBucket.getBucket())); + partitionAssignment.get(tableBucket.getBucket()), + isr); updateMetadataRequestPartitionMap .computeIfAbsent(currentTableId, k -> new HashMap<>()) .computeIfAbsent(tableBucket.getPartitionId(), k -> new ArrayList<>()) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/BucketMetadata.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/BucketMetadata.java index b6b968f28e..9bc09ece8f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/BucketMetadata.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/BucketMetadata.java @@ -19,6 +19,7 @@ import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.OptionalInt; @@ -29,16 +30,27 @@ public class BucketMetadata { private final @Nullable Integer leaderId; private final @Nullable Integer leaderEpoch; private final List replicas; + private final List isr; public BucketMetadata( int bucketId, @Nullable Integer leaderId, @Nullable Integer leaderEpoch, List replicas) { + this(bucketId, leaderId, leaderEpoch, replicas, Collections.emptyList()); + } + + public BucketMetadata( + int bucketId, + @Nullable Integer leaderId, + @Nullable Integer leaderEpoch, + List replicas, + List isr) { this.bucketId = bucketId; this.leaderId = leaderId; this.leaderEpoch = leaderEpoch; this.replicas = replicas; + this.isr = isr; } public int getBucketId() { @@ -57,6 +69,10 @@ public List getReplicas() { return replicas; } + public List getIsr() { + return isr; + } + @Override public String toString() { return "BucketMetadata{" @@ -68,6 +84,8 @@ public String toString() { + leaderEpoch + ", replicas=" + replicas + + ", isr=" + + isr + '}'; } @@ -83,11 +101,12 @@ public boolean equals(Object o) { return bucketId == that.bucketId && Objects.equals(leaderId, that.leaderId) && Objects.equals(leaderEpoch, that.leaderEpoch) - && replicas.equals(that.replicas); + && replicas.equals(that.replicas) + && isr.equals(that.isr); } @Override public int hashCode() { - return Objects.hash(bucketId, leaderId, leaderEpoch, replicas); + return Objects.hash(bucketId, leaderId, leaderEpoch, replicas, isr); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataProvider.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataProvider.java index 27511d0540..a1b06218c7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataProvider.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataProvider.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -136,12 +137,15 @@ private static List getBucketMetadataFromContext( TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); Optional optLeaderAndIsr = ctx.getBucketLeaderAndIsr(tableBucket); Integer leader = optLeaderAndIsr.map(LeaderAndIsr::leader).orElse(null); + List isr = + optLeaderAndIsr.map(LeaderAndIsr::isr).orElse(Collections.emptyList()); BucketMetadata bucketMetadata = new BucketMetadata( bucketId, leader, ctx.getBucketLeaderEpoch(tableBucket), - serverIds); + serverIds, + isr); bucketMetadataList.add(bucketMetadata); }); return bucketMetadataList; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index f7a89828ab..b0d7c0ea8c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -618,6 +618,10 @@ private static List toPbBucketMetadata( pbBucketMetadata.addReplicaId(replica); } + for (Integer isrNode : bucketMetadata.getIsr()) { + pbBucketMetadata.addIsr(isrNode); + } + pbBucketMetadataList.add(pbBucketMetadata); } return pbBucketMetadataList; @@ -658,7 +662,8 @@ private static BucketMetadata toBucketMetadata(PbBucketMetadata pbBucketMetadata pbBucketMetadata.hasLeaderEpoch() ? pbBucketMetadata.getLeaderEpoch() : null, Arrays.stream(pbBucketMetadata.getReplicaIds()) .boxed() - .collect(Collectors.toList())); + .collect(Collectors.toList()), + Arrays.stream(pbBucketMetadata.getIsrs()).boxed().collect(Collectors.toList())); } private static PartitionMetadata toPartitionMetadata(PbPartitionMetadata pbPartitionMetadata) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index cb34a2f2ed..284d014825 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -1706,7 +1706,8 @@ private BucketMetadata createBucketMetadata( Integer leader = leaderAndIsr != null ? leaderAndIsr.leader() : null; Integer leaderEpoch = leaderAndIsr != null ? leaderAndIsr.leaderEpoch() : null; List replicas = assignment.getBucketAssignments().get(bucketId).getReplicas(); - return new BucketMetadata(bucketId, leader, leaderEpoch, replicas); + List isr = leaderAndIsr != null ? leaderAndIsr.isr() : Collections.emptyList(); + return new BucketMetadata(bucketId, leader, leaderEpoch, replicas, isr); } /** Close the underlying ZooKeeperClient. */ diff --git a/pom.xml b/pom.xml index dc5ec59d62..a2e46d7e65 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,7 @@ fluss-jmh fluss-lake fluss-kafka + fluss-cli fluss-docgen tools/ci/fluss-ci-tools @@ -440,6 +441,12 @@ ${assertj.version} test + + + commons-cli + commons-cli + 1.5.0 +