From 35e7069f1cfe34884d0e5011dfc04b0e48006c51 Mon Sep 17 00:00:00 2001 From: sanshi <1715734693@qq.com> Date: Fri, 22 May 2026 15:39:15 +0800 Subject: [PATCH] [server] Allow table-level overrides for log segment/index/flush options --- .../apache/fluss/config/ConfigOptions.java | 50 +++++++ .../apache/fluss/config/FlussConfigUtils.java | 7 +- .../apache/fluss/server/log/LogManager.java | 25 +++- .../apache/fluss/server/log/LogTablet.java | 58 ++++++++ .../apache/fluss/server/replica/Replica.java | 3 +- .../utils/TableDescriptorValidation.java | 16 +++ .../fluss/server/log/LogTabletTest.java | 49 +++++++ .../utils/TableDescriptorValidationTest.java | 126 ++++++++++++++++++ 8 files changed, 331 insertions(+), 3 deletions(-) create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 1818e16ef3..d3a1705c1b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1557,6 +1557,56 @@ public class ConfigOptions { "The number of log segments to retain in local for each table when log tiered storage is enabled. " + "It must be greater that 0. The default is 2."); + public static final ConfigOption TABLE_LOG_SEGMENT_FILE_SIZE = + key("table.log.segment.file-size") + .memoryType() + .noDefaultValue() + .withDescription( + "The segment file size for the log of this table. " + + "When set, overrides the cluster-level '" + + LOG_SEGMENT_FILE_SIZE.key() + + "' for this table only."); + + public static final ConfigOption TABLE_LOG_INDEX_FILE_SIZE = + key("table.log.index.file-size") + .memoryType() + .noDefaultValue() + .withDescription( + "The index file size for the log of this table. " + + "When set, overrides the cluster-level '" + + LOG_INDEX_FILE_SIZE.key() + + "' for this table only."); + + public static final ConfigOption TABLE_LOG_INDEX_INTERVAL_SIZE = + key("table.log.index.interval-size") + .memoryType() + .noDefaultValue() + .withDescription( + "The index interval size for the log of this table. " + + "When set, overrides the cluster-level '" + + LOG_INDEX_INTERVAL_SIZE.key() + + "' for this table only."); + + public static final ConfigOption TABLE_LOG_FILE_PREALLOCATE = + key("table.log.file-preallocate") + .booleanType() + .noDefaultValue() + .withDescription( + "Whether to preallocate log segment files for this table. " + + "When set, overrides the cluster-level '" + + LOG_FILE_PREALLOCATE.key() + + "' for this table only."); + + public static final ConfigOption TABLE_LOG_FLUSH_INTERVAL_MESSAGES = + key("table.log.flush.interval-messages") + .longType() + .noDefaultValue() + .withDescription( + "The flush interval in number of messages for the log of this table. " + + "When set, overrides the cluster-level '" + + LOG_FLUSH_INTERVAL_MESSAGES.key() + + "' for this table only."); + public static final ConfigOption TABLE_DATALAKE_ENABLED = key("table.datalake.enabled") .booleanType() diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index adbbce0af4..ddc7d93560 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -50,7 +50,12 @@ public class FlussConfigUtils { ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key(), - ConfigOptions.TABLE_STATISTICS_COLUMNS.key()); + ConfigOptions.TABLE_STATISTICS_COLUMNS.key(), + ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE.key(), + ConfigOptions.TABLE_LOG_INDEX_FILE_SIZE.key(), + ConfigOptions.TABLE_LOG_INDEX_INTERVAL_SIZE.key(), + ConfigOptions.TABLE_LOG_FILE_PREALLOCATE.key(), + ConfigOptions.TABLE_LOG_FLUSH_INTERVAL_MESSAGES.key()); } public static boolean isTableStorageConfig(String key) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java index ae04328202..2d9b09b510 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java @@ -263,6 +263,7 @@ private void waitForLoadLogsInDir(LogRecoveryTask recoveryTask) throws Throwable * @param logFormat the log format * @param tieredLogLocalSegments the number of segments to retain in local for tiered log * @param isChangelog whether the log is a changelog of primary key table + * @param tableProperties table-level configuration properties */ public LogTablet getOrCreateLog( File dataDir, @@ -270,7 +271,8 @@ public LogTablet getOrCreateLog( TableBucket tableBucket, LogFormat logFormat, int tieredLogLocalSegments, - boolean isChangelog) + boolean isChangelog, + Configuration tableProperties) throws Exception { return inLock( logCreationOrDeletionLock, @@ -287,6 +289,7 @@ public LogTablet getOrCreateLog( tablePath, tabletDir, conf, + tableProperties, serverMetricGroup, 0L, scheduler, @@ -306,6 +309,25 @@ public LogTablet getOrCreateLog( }); } + @VisibleForTesting + public LogTablet getOrCreateLog( + File dataDir, + PhysicalTablePath tablePath, + TableBucket tableBucket, + LogFormat logFormat, + int tieredLogLocalSegments, + boolean isChangelog) + throws Exception { + return getOrCreateLog( + dataDir, + tablePath, + tableBucket, + logFormat, + tieredLogLocalSegments, + isChangelog, + new Configuration()); + } + public Optional getLog(TableBucket tableBucket) { return Optional.ofNullable(currentLogs.get(tableBucket)); } @@ -392,6 +414,7 @@ private LogTablet loadLog( physicalTablePath, tabletDir, conf, + tableInfo.getProperties(), serverMetricGroup, logRecoveryPoint, scheduler, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java index 23c60c82e1..bd14374f76 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java @@ -316,6 +316,38 @@ public static LogTablet create( Clock clock, boolean isCleanShutdown) throws Exception { + return create( + dataDir, + tablePath, + tabletDir, + conf, + new Configuration(), + serverMetricGroup, + recoveryPoint, + scheduler, + logFormat, + tieredLogLocalSegments, + isChangelog, + clock, + isCleanShutdown); + } + + public static LogTablet create( + File dataDir, + PhysicalTablePath tablePath, + File tabletDir, + Configuration conf, + Configuration tableProperties, + TabletServerMetricGroup serverMetricGroup, + long recoveryPoint, + Scheduler scheduler, + LogFormat logFormat, + int tieredLogLocalSegments, + boolean isChangelog, + Clock clock, + boolean isCleanShutdown) + throws Exception { + conf = buildEffectiveLogConfig(conf, tableProperties); // create the log directory if it doesn't exist Files.createDirectories(tabletDir.toPath()); @@ -1378,4 +1410,30 @@ public List getSegments() { public long getMinRetainOffset() { return minRetainOffset; } + + /** + * Builds an effective log configuration by overlaying table-level log.* overrides onto the + * server-level configuration. Table-level values take precedence. Options not set at table + * level transparently fall back to the server value. + */ + private static Configuration buildEffectiveLogConfig( + Configuration serverConf, Configuration tableProperties) { + Configuration effective = new Configuration(serverConf); + tableProperties + .getOptional(ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE) + .ifPresent(v -> effective.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, v)); + tableProperties + .getOptional(ConfigOptions.TABLE_LOG_INDEX_FILE_SIZE) + .ifPresent(v -> effective.set(ConfigOptions.LOG_INDEX_FILE_SIZE, v)); + tableProperties + .getOptional(ConfigOptions.TABLE_LOG_INDEX_INTERVAL_SIZE) + .ifPresent(v -> effective.set(ConfigOptions.LOG_INDEX_INTERVAL_SIZE, v)); + tableProperties + .getOptional(ConfigOptions.TABLE_LOG_FILE_PREALLOCATE) + .ifPresent(v -> effective.set(ConfigOptions.LOG_FILE_PREALLOCATE, v)); + tableProperties + .getOptional(ConfigOptions.TABLE_LOG_FLUSH_INTERVAL_MESSAGES) + .ifPresent(v -> effective.set(ConfigOptions.LOG_FLUSH_INTERVAL_MESSAGES, v)); + return effective; + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 1f8b893ab4..5b3508f66e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -2159,7 +2159,8 @@ private LogTablet createLog( tableBucket, tableConfig.getLogFormat(), tableConfig.getTieredLogLocalSegments(), - isKvTable()); + isKvTable(), + tableInfo.getProperties()); // update high watermark. Optional watermarkOpt = lazyHighWatermarkCheckpoint.fetch(tableBucket); long watermark = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index fcd5f1688a..a038ad6ad6 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -127,6 +127,7 @@ public static void validateTableDescriptor( checkSystemColumns(schema.getRowType()); validateStatisticsConfig(tableDescriptor); checkTableLakeFormatMatchesCluster(tableConf, clusterDataLakeFormat); + checkLogSegmentFileSize(tableConf); } private static void checkTableLakeFormatMatchesCluster( @@ -408,6 +409,21 @@ private static void validateAggregationFunctionParameters(Schema schema) { } } + private static void checkLogSegmentFileSize(Configuration tableConf) { + tableConf + .getOptional(ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE) + .ifPresent( + size -> { + if (size.getBytes() > Integer.MAX_VALUE) { + throw new InvalidConfigException( + String.format( + "Invalid configuration for '%s', it must be less than or equal to %d bytes.", + ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE.key(), + Integer.MAX_VALUE)); + } + }); + } + private static void checkTieredLog(Configuration tableConf) { if (tableConf.get(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS) <= 0) { throw new InvalidConfigException( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java index 46685cfe05..3d8e91ac04 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java @@ -482,7 +482,55 @@ void testDuplicateAppends() throws Exception { logTablet.getTableBucket().getBucket())); } + @Test + void testTableLevelSegmentFileSizeOverridesServerConfig() throws Exception { + // Server-level default is 1024m; set table-level override to a tiny value so that + // each single-row append triggers a segment roll. + MemoryLogRecords records = + genMemoryLogRecordsByObject(Collections.singletonList(new Object[] {1, "a"})); + int recordsSize = records.sizeInBytes(); + + // Without table-level override: segment file size is the server default (1024m), + // appending a few small records stays in one segment. + LogTablet logDefault = createLogTablet(conf); + logDefault.appendAsLeader(records); + logDefault.appendAsLeader( + genMemoryLogRecordsByObject(Collections.singletonList(new Object[] {2, "b"}))); + assertThat(logDefault.logSegments().size()).isEqualTo(1); + + // With table-level override: set segment size just large enough for one batch so that + // the second append triggers a roll to a new segment. + Configuration tableProperties = new Configuration(); + tableProperties.set(ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE, new MemorySize(recordsSize)); + LogTablet logOverride = createLogTablet(conf, tableProperties); + logOverride.appendAsLeader(records); + logOverride.appendAsLeader( + genMemoryLogRecordsByObject(Collections.singletonList(new Object[] {2, "b"}))); + assertThat(logOverride.logSegments().size()).isGreaterThan(1); + } + + @Test + void testTableLevelFlushIntervalMessagesOverridesServerConfig() throws Exception { + // Server-level default is Long.MAX_VALUE (flush only on explicit flush()). + // With table-level override of 1, every message triggers a flush. + Configuration tableProperties = new Configuration(); + tableProperties.set(ConfigOptions.TABLE_LOG_FLUSH_INTERVAL_MESSAGES, 1L); + LogTablet log = createLogTablet(conf, tableProperties); + + MemoryLogRecords records = + genMemoryLogRecordsByObject(Collections.singletonList(new Object[] {1, "a"})); + log.appendAsLeader(records); + // After appending with flushIntervalMessages=1, the unflushed offset should equal + // the log end offset (i.e., the flush happened automatically). + assertThat(log.localLogEndOffset()).isEqualTo(log.getRecoveryPoint()); + } + private LogTablet createLogTablet(Configuration config) throws Exception { + return createLogTablet(config, new Configuration()); + } + + private LogTablet createLogTablet(Configuration config, Configuration tableProperties) + throws Exception { File logDir = LogTestUtils.makeRandomLogTabletDir( tempDir, @@ -496,6 +544,7 @@ private LogTablet createLogTablet(Configuration config) throws Exception { PhysicalTablePath.of(DATA1_TABLE_PATH), logDir, config, + tableProperties, TestingMetricGroups.TABLET_SERVER_METRICS, 0, scheduler, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java b/fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java new file mode 100644 index 0000000000..53747bce6f --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.utils; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.MemorySize; +import org.apache.fluss.exception.InvalidConfigException; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for table-level log config options in {@link TableDescriptorValidation}. */ +class TableDescriptorValidationTest { + + private static final Schema SCHEMA = Schema.newBuilder().column("id", DataTypes.INT()).build(); + + /** Builds a minimal valid {@link TableDescriptor} with the given extra property. */ + private static TableDescriptor descriptorWithProperty(String key, String value) { + return TableDescriptor.builder() + .schema(SCHEMA) + .distributedBy(1) + .property(ConfigOptions.TABLE_REPLICATION_FACTOR.key(), "1") + .property(key, value) + .build(); + } + + @Test + void testTableLogSegmentFileSizeValidValue() { + // A valid value (e.g. 16m) should not throw. + TableDescriptor descriptor = + descriptorWithProperty(ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE.key(), "16m"); + // Should succeed without exception + TableDescriptorValidation.validateTableDescriptor(descriptor, 100, null); + } + + @Test + void testTableLogSegmentFileSizeOverflowThrows() { + // Integer.MAX_VALUE bytes = 2147483647 bytes; set one byte more. + long overflowBytes = (long) Integer.MAX_VALUE + 1; + String overflowValue = overflowBytes + "b"; + TableDescriptor descriptor = + descriptorWithProperty( + ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE.key(), overflowValue); + assertThatThrownBy( + () -> + TableDescriptorValidation.validateTableDescriptor( + descriptor, 100, null)) + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining(ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE.key()) + .hasMessageContaining(String.valueOf(Integer.MAX_VALUE)); + } + + @Test + void testTableLogSegmentFileSizeAtMaxIntegerAllowed() { + // Exactly Integer.MAX_VALUE bytes should be accepted. + MemorySize maxSize = new MemorySize(Integer.MAX_VALUE); + TableDescriptor descriptor = + descriptorWithProperty( + ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE.key(), maxSize.toString()); + // Should succeed without exception + TableDescriptorValidation.validateTableDescriptor(descriptor, 100, null); + } + + @Test + void testTableLogIndexFileSizeValidValue() { + TableDescriptor descriptor = + descriptorWithProperty(ConfigOptions.TABLE_LOG_INDEX_FILE_SIZE.key(), "8m"); + TableDescriptorValidation.validateTableDescriptor(descriptor, 100, null); + } + + @Test + void testTableLogIndexIntervalSizeValidValue() { + TableDescriptor descriptor = + descriptorWithProperty(ConfigOptions.TABLE_LOG_INDEX_INTERVAL_SIZE.key(), "4096b"); + TableDescriptorValidation.validateTableDescriptor(descriptor, 100, null); + } + + @Test + void testTableLogFilePreallocateValidValue() { + TableDescriptor descriptor = + descriptorWithProperty(ConfigOptions.TABLE_LOG_FILE_PREALLOCATE.key(), "true"); + TableDescriptorValidation.validateTableDescriptor(descriptor, 100, null); + } + + @Test + void testTableLogFlushIntervalMessagesValidValue() { + TableDescriptor descriptor = + descriptorWithProperty( + ConfigOptions.TABLE_LOG_FLUSH_INTERVAL_MESSAGES.key(), "100"); + TableDescriptorValidation.validateTableDescriptor(descriptor, 100, null); + } + + @Test + void testTableLogOptionsRecognizedAsTableOptions() { + // All five new options must be recognised as valid table properties (i.e. present in + // TABLE_OPTIONS); otherwise validateTableDescriptor would throw InvalidConfigException + // with "not a recognized Fluss table property". + assertThat(org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS.keySet()) + .contains( + ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE.key(), + ConfigOptions.TABLE_LOG_INDEX_FILE_SIZE.key(), + ConfigOptions.TABLE_LOG_INDEX_INTERVAL_SIZE.key(), + ConfigOptions.TABLE_LOG_FILE_PREALLOCATE.key(), + ConfigOptions.TABLE_LOG_FLUSH_INTERVAL_MESSAGES.key()); + } +}