Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1557,6 +1557,56 @@ public class ConfigOptions {
"The number of log segments to retain in local for each table when log tiered storage is enabled. "
+ "It must be greater that 0. The default is 2.");

public static final ConfigOption<MemorySize> TABLE_LOG_SEGMENT_FILE_SIZE =
key("table.log.segment.file-size")
.memoryType()
.noDefaultValue()
.withDescription(
"The segment file size for the log of this table. "
+ "When set, overrides the cluster-level '"
+ LOG_SEGMENT_FILE_SIZE.key()
+ "' for this table only.");

public static final ConfigOption<MemorySize> TABLE_LOG_INDEX_FILE_SIZE =
key("table.log.index.file-size")
.memoryType()
.noDefaultValue()
.withDescription(
"The index file size for the log of this table. "
+ "When set, overrides the cluster-level '"
+ LOG_INDEX_FILE_SIZE.key()
+ "' for this table only.");

public static final ConfigOption<MemorySize> TABLE_LOG_INDEX_INTERVAL_SIZE =
key("table.log.index.interval-size")
.memoryType()
.noDefaultValue()
.withDescription(
"The index interval size for the log of this table. "
+ "When set, overrides the cluster-level '"
+ LOG_INDEX_INTERVAL_SIZE.key()
+ "' for this table only.");

public static final ConfigOption<Boolean> TABLE_LOG_FILE_PREALLOCATE =
key("table.log.file-preallocate")
.booleanType()
.noDefaultValue()
.withDescription(
"Whether to preallocate log segment files for this table. "
+ "When set, overrides the cluster-level '"
+ LOG_FILE_PREALLOCATE.key()
+ "' for this table only.");

public static final ConfigOption<Long> TABLE_LOG_FLUSH_INTERVAL_MESSAGES =
key("table.log.flush.interval-messages")
.longType()
.noDefaultValue()
.withDescription(
"The flush interval in number of messages for the log of this table. "
+ "When set, overrides the cluster-level '"
+ LOG_FLUSH_INTERVAL_MESSAGES.key()
+ "' for this table only.");

public static final ConfigOption<Boolean> TABLE_DATALAKE_ENABLED =
key("table.datalake.enabled")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ public class FlussConfigUtils {
ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(),
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key(),
ConfigOptions.TABLE_STATISTICS_COLUMNS.key());
ConfigOptions.TABLE_STATISTICS_COLUMNS.key(),
ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE.key(),
ConfigOptions.TABLE_LOG_INDEX_FILE_SIZE.key(),
ConfigOptions.TABLE_LOG_INDEX_INTERVAL_SIZE.key(),
ConfigOptions.TABLE_LOG_FILE_PREALLOCATE.key(),
ConfigOptions.TABLE_LOG_FLUSH_INTERVAL_MESSAGES.key());
}

public static boolean isTableStorageConfig(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,14 +263,16 @@ private void waitForLoadLogsInDir(LogRecoveryTask recoveryTask) throws Throwable
* @param logFormat the log format
* @param tieredLogLocalSegments the number of segments to retain in local for tiered log
* @param isChangelog whether the log is a changelog of primary key table
* @param tableProperties table-level configuration properties
*/
public LogTablet getOrCreateLog(
File dataDir,
PhysicalTablePath tablePath,
TableBucket tableBucket,
LogFormat logFormat,
int tieredLogLocalSegments,
boolean isChangelog)
boolean isChangelog,
Configuration tableProperties)
throws Exception {
return inLock(
logCreationOrDeletionLock,
Expand All @@ -287,6 +289,7 @@ public LogTablet getOrCreateLog(
tablePath,
tabletDir,
conf,
tableProperties,
serverMetricGroup,
0L,
scheduler,
Expand All @@ -306,6 +309,25 @@ public LogTablet getOrCreateLog(
});
}

@VisibleForTesting
public LogTablet getOrCreateLog(
File dataDir,
PhysicalTablePath tablePath,
TableBucket tableBucket,
LogFormat logFormat,
int tieredLogLocalSegments,
boolean isChangelog)
throws Exception {
return getOrCreateLog(
dataDir,
tablePath,
tableBucket,
logFormat,
tieredLogLocalSegments,
isChangelog,
new Configuration());
}

public Optional<LogTablet> getLog(TableBucket tableBucket) {
return Optional.ofNullable(currentLogs.get(tableBucket));
}
Expand Down Expand Up @@ -392,6 +414,7 @@ private LogTablet loadLog(
physicalTablePath,
tabletDir,
conf,
tableInfo.getProperties(),
serverMetricGroup,
logRecoveryPoint,
scheduler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,38 @@ public static LogTablet create(
Clock clock,
boolean isCleanShutdown)
throws Exception {
return create(
dataDir,
tablePath,
tabletDir,
conf,
new Configuration(),
serverMetricGroup,
recoveryPoint,
scheduler,
logFormat,
tieredLogLocalSegments,
isChangelog,
clock,
isCleanShutdown);
}

public static LogTablet create(
File dataDir,
PhysicalTablePath tablePath,
File tabletDir,
Configuration conf,
Configuration tableProperties,
TabletServerMetricGroup serverMetricGroup,
long recoveryPoint,
Scheduler scheduler,
LogFormat logFormat,
int tieredLogLocalSegments,
boolean isChangelog,
Clock clock,
boolean isCleanShutdown)
throws Exception {
conf = buildEffectiveLogConfig(conf, tableProperties);
// create the log directory if it doesn't exist
Files.createDirectories(tabletDir.toPath());

Expand Down Expand Up @@ -1378,4 +1410,30 @@ public List<LogSegment> getSegments() {
public long getMinRetainOffset() {
return minRetainOffset;
}

/**
* Builds an effective log configuration by overlaying table-level log.* overrides onto the
* server-level configuration. Table-level values take precedence. Options not set at table
* level transparently fall back to the server value.
*/
private static Configuration buildEffectiveLogConfig(
Configuration serverConf, Configuration tableProperties) {
Configuration effective = new Configuration(serverConf);
tableProperties
.getOptional(ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE)
.ifPresent(v -> effective.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, v));
tableProperties
.getOptional(ConfigOptions.TABLE_LOG_INDEX_FILE_SIZE)
.ifPresent(v -> effective.set(ConfigOptions.LOG_INDEX_FILE_SIZE, v));
tableProperties
.getOptional(ConfigOptions.TABLE_LOG_INDEX_INTERVAL_SIZE)
.ifPresent(v -> effective.set(ConfigOptions.LOG_INDEX_INTERVAL_SIZE, v));
tableProperties
.getOptional(ConfigOptions.TABLE_LOG_FILE_PREALLOCATE)
.ifPresent(v -> effective.set(ConfigOptions.LOG_FILE_PREALLOCATE, v));
tableProperties
.getOptional(ConfigOptions.TABLE_LOG_FLUSH_INTERVAL_MESSAGES)
.ifPresent(v -> effective.set(ConfigOptions.LOG_FLUSH_INTERVAL_MESSAGES, v));
return effective;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2159,7 +2159,8 @@ private LogTablet createLog(
tableBucket,
tableConfig.getLogFormat(),
tableConfig.getTieredLogLocalSegments(),
isKvTable());
isKvTable(),
tableInfo.getProperties());
// update high watermark.
Optional<Long> watermarkOpt = lazyHighWatermarkCheckpoint.fetch(tableBucket);
long watermark =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public static void validateTableDescriptor(
checkSystemColumns(schema.getRowType());
validateStatisticsConfig(tableDescriptor);
checkTableLakeFormatMatchesCluster(tableConf, clusterDataLakeFormat);
checkLogSegmentFileSize(tableConf);
}

private static void checkTableLakeFormatMatchesCluster(
Expand Down Expand Up @@ -408,6 +409,21 @@ private static void validateAggregationFunctionParameters(Schema schema) {
}
}

private static void checkLogSegmentFileSize(Configuration tableConf) {
tableConf
.getOptional(ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE)
.ifPresent(
size -> {
if (size.getBytes() > Integer.MAX_VALUE) {
throw new InvalidConfigException(
String.format(
"Invalid configuration for '%s', it must be less than or equal to %d bytes.",
ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE.key(),
Integer.MAX_VALUE));
}
});
}

private static void checkTieredLog(Configuration tableConf) {
if (tableConf.get(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS) <= 0) {
throw new InvalidConfigException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,55 @@ void testDuplicateAppends() throws Exception {
logTablet.getTableBucket().getBucket()));
}

@Test
void testTableLevelSegmentFileSizeOverridesServerConfig() throws Exception {
// Server-level default is 1024m; set table-level override to a tiny value so that
// each single-row append triggers a segment roll.
MemoryLogRecords records =
genMemoryLogRecordsByObject(Collections.singletonList(new Object[] {1, "a"}));
int recordsSize = records.sizeInBytes();

// Without table-level override: segment file size is the server default (1024m),
// appending a few small records stays in one segment.
LogTablet logDefault = createLogTablet(conf);
logDefault.appendAsLeader(records);
logDefault.appendAsLeader(
genMemoryLogRecordsByObject(Collections.singletonList(new Object[] {2, "b"})));
assertThat(logDefault.logSegments().size()).isEqualTo(1);

// With table-level override: set segment size just large enough for one batch so that
// the second append triggers a roll to a new segment.
Configuration tableProperties = new Configuration();
tableProperties.set(ConfigOptions.TABLE_LOG_SEGMENT_FILE_SIZE, new MemorySize(recordsSize));
LogTablet logOverride = createLogTablet(conf, tableProperties);
logOverride.appendAsLeader(records);
logOverride.appendAsLeader(
genMemoryLogRecordsByObject(Collections.singletonList(new Object[] {2, "b"})));
assertThat(logOverride.logSegments().size()).isGreaterThan(1);
}

@Test
void testTableLevelFlushIntervalMessagesOverridesServerConfig() throws Exception {
// Server-level default is Long.MAX_VALUE (flush only on explicit flush()).
// With table-level override of 1, every message triggers a flush.
Configuration tableProperties = new Configuration();
tableProperties.set(ConfigOptions.TABLE_LOG_FLUSH_INTERVAL_MESSAGES, 1L);
LogTablet log = createLogTablet(conf, tableProperties);

MemoryLogRecords records =
genMemoryLogRecordsByObject(Collections.singletonList(new Object[] {1, "a"}));
log.appendAsLeader(records);
// After appending with flushIntervalMessages=1, the unflushed offset should equal
// the log end offset (i.e., the flush happened automatically).
assertThat(log.localLogEndOffset()).isEqualTo(log.getRecoveryPoint());
}

private LogTablet createLogTablet(Configuration config) throws Exception {
return createLogTablet(config, new Configuration());
}

private LogTablet createLogTablet(Configuration config, Configuration tableProperties)
throws Exception {
File logDir =
LogTestUtils.makeRandomLogTabletDir(
tempDir,
Expand All @@ -496,6 +544,7 @@ private LogTablet createLogTablet(Configuration config) throws Exception {
PhysicalTablePath.of(DATA1_TABLE_PATH),
logDir,
config,
tableProperties,
TestingMetricGroups.TABLET_SERVER_METRICS,
0,
scheduler,
Expand Down
Loading