Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,38 @@ public class LakeCommitResult {
// 2: committedIsReadable is true, committed snapshot is just also readable
@Nullable private final ReadableSnapshot readableSnapshot;

@Nullable private final TieringStats tieringStats;

private LakeCommitResult(
long committedSnapshotId,
boolean committedIsReadable,
@Nullable ReadableSnapshot readableSnapshot,
@Nullable Long earliestSnapshotIDToKeep) {
@Nullable Long earliestSnapshotIDToKeep,
@Nullable TieringStats tieringStats) {
this.committedSnapshotId = committedSnapshotId;
this.committedIsReadable = committedIsReadable;
this.readableSnapshot = readableSnapshot;
this.earliestSnapshotIDToKeep = earliestSnapshotIDToKeep;
this.tieringStats = tieringStats;
}

public static LakeCommitResult committedIsReadable(long committedSnapshotId) {
return new LakeCommitResult(committedSnapshotId, true, null, KEEP_LATEST);
return committedIsReadable(committedSnapshotId, null);
}

public static LakeCommitResult committedIsReadable(
long committedSnapshotId, @Nullable TieringStats tieringStats) {
return new LakeCommitResult(committedSnapshotId, true, null, KEEP_LATEST, tieringStats);
}

public static LakeCommitResult unknownReadableSnapshot(long committedSnapshotId) {
return new LakeCommitResult(committedSnapshotId, false, null, KEEP_ALL_PREVIOUS);
return unknownReadableSnapshot(committedSnapshotId, null);
}

public static LakeCommitResult unknownReadableSnapshot(
long committedSnapshotId, @Nullable TieringStats tieringStats) {
return new LakeCommitResult(
committedSnapshotId, false, null, KEEP_ALL_PREVIOUS, tieringStats);
}

public static LakeCommitResult withReadableSnapshot(
Expand All @@ -89,12 +104,29 @@ public static LakeCommitResult withReadableSnapshot(
// the readable log end offset for readable snapshot
Map<TableBucket, Long> readableLogEndOffsets,
@Nullable Long earliestSnapshotIDToKeep) {
return withReadableSnapshot(
committedSnapshotId,
readableSnapshotId,
tieredLogEndOffsets,
readableLogEndOffsets,
earliestSnapshotIDToKeep,
null);
}

public static LakeCommitResult withReadableSnapshot(
long committedSnapshotId,
long readableSnapshotId,
Map<TableBucket, Long> tieredLogEndOffsets,
Map<TableBucket, Long> readableLogEndOffsets,
@Nullable Long earliestSnapshotIDToKeep,
@Nullable TieringStats tieringStats) {
return new LakeCommitResult(
committedSnapshotId,
false,
new ReadableSnapshot(
readableSnapshotId, tieredLogEndOffsets, readableLogEndOffsets),
earliestSnapshotIDToKeep);
earliestSnapshotIDToKeep,
tieringStats);
}

public long getCommittedSnapshotId() {
Expand All @@ -110,6 +142,16 @@ public ReadableSnapshot getReadableSnapshot() {
return readableSnapshot;
}

/**
 * Gets the tiering statistics reported by the lake committer for this commit.
 *
 * @return the tiering stats, or {@code null} when the committer did not report any
 */
@Nullable
public TieringStats getTieringStats() {
    return tieringStats;
}

/**
* Gets the earliest snapshot ID to keep.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.lake.committer;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Objects;

/**
* Immutable statistics for a single completed tiering round of a lake table.
*
* <p>Fields use {@code null} to represent "unknown / not supported".
*/
public final class TieringStats implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * A {@code TieringStats} instance where every field is {@code null} (unknown/unsupported). Use
     * this as the default when no stats are available.
     */
    public static final TieringStats UNKNOWN = new TieringStats(null, null);

    // -----------------------------------------------------------------------------------------
    // Lake data stats (reported by the lake committer)
    // -----------------------------------------------------------------------------------------

    /** Cumulative total file size (bytes) of the lake table after this tiering round. */
    @Nullable private final Long fileSize;

    /** Cumulative total record count of the lake table after this tiering round. */
    @Nullable private final Long recordCount;

    /**
     * Creates stats for one tiering round.
     *
     * @param fileSize cumulative file size in bytes, or {@code null} when unknown
     * @param recordCount cumulative record count, or {@code null} when unknown
     */
    public TieringStats(@Nullable Long fileSize, @Nullable Long recordCount) {
        this.fileSize = fileSize;
        this.recordCount = recordCount;
    }

    @Nullable
    public Long getFileSize() {
        return fileSize;
    }

    @Nullable
    public Long getRecordCount() {
        return recordCount;
    }

    /**
     * Returns {@code true} when at least one stat field is non-{@code null}, meaning actual data
     * was written during this tiering round.
     */
    public boolean isAvailableStats() {
        return fileSize != null || recordCount != null;
    }

    // Value semantics: this class is an immutable data carrier shipped inside events, so it
    // defines equals/hashCode over its two fields (null-safe via java.util.Objects).
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TieringStats)) {
            return false;
        }
        TieringStats that = (TieringStats) o;
        return Objects.equals(fileSize, that.fileSize)
                && Objects.equals(recordCount, that.recordCount);
    }

    @Override
    public int hashCode() {
        return Objects.hash(fileSize, recordCount);
    }

    @Override
    public String toString() {
        return "TieringStats{" + "fileSize=" + fileSize + ", recordCount=" + recordCount + '}';
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public class MetricNames {
public static final String LAKE_TIERING_PENDING_TABLES_COUNT = "pendingTablesCount";
public static final String LAKE_TIERING_RUNNING_TABLES_COUNT = "runningTablesCount";

// for lake tiering table-level metrics
// NOTE(review): semantics below inferred from names — confirm against the registering code.
// presumably how far the lake lags behind the Fluss log (offset- or time-based)
public static final String LAKE_TIERING_TABLE_TIER_LAG = "tierLag";
// presumably wall-clock duration of one tiering round
public static final String LAKE_TIERING_TABLE_TIER_DURATION = "tierDuration";
// presumably a monotonically increasing count of failed tiering attempts
public static final String LAKE_TIERING_TABLE_FAILURES_TOTAL = "failuresTotal";
// cumulative lake file size / record count, matching TieringStats fields
public static final String LAKE_TIERING_TABLE_FILE_SIZE = "fileSize";
public static final String LAKE_TIERING_TABLE_RECORD_COUNT = "recordCount";

// --------------------------------------------------------------------------------------------
// metrics for tablet server
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.lake.committer.LakeCommitResult;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.committer.TieringStats;
import org.apache.fluss.lake.writer.LakeTieringFactory;
import org.apache.fluss.lake.writer.LakeWriter;
import org.apache.fluss.metadata.TableBucket;
Expand Down Expand Up @@ -94,6 +95,22 @@ public class TieringCommitOperator<WriteResult, Committable>
private final Map<Long, List<TableBucketWriteResult<WriteResult>>>
collectedTableBucketWriteResults;

/**
 * The result of one table's commit round, holding the lake committable (nullable for empty
 * commits where no data was written) and the associated tiering statistics.
 *
 * <p>NOTE(review): deliberately a non-static inner class — it references the enclosing
 * operator's {@code Committable} type parameter; making it static would require its own
 * type parameter and raw-typed call sites.
 */
private final class CommitResult {
    /** The lake committable, or {@code null} if nothing was written in this round. */
    @Nullable final Committable committable;
    /** Per-table tiering statistics collected during this round, or {@code null} if unknown. */
    @Nullable final TieringStats stats;

    CommitResult(@Nullable Committable committable, @Nullable TieringStats stats) {
        this.committable = committable;
        this.stats = stats;
    }
}

public TieringCommitOperator(
StreamOperatorParameters<CommittableMessage<Committable>> parameters,
Configuration flussConf,
Expand Down Expand Up @@ -135,18 +152,20 @@ public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> str

if (committableWriteResults != null) {
try {
Committable committable =
CommitResult commitResult =
commitWriteResults(
tableId,
tableBucketWriteResult.tablePath(),
committableWriteResults);
// only emit when committable is not-null
if (committable != null) {
output.collect(new StreamRecord<>(new CommittableMessage<>(committable)));
// only emit downstream when actual data was written
if (commitResult.committable != null) {
output.collect(
new StreamRecord<>(new CommittableMessage<>(commitResult.committable)));
}
// notify that the table id has been finished tier
operatorEventGateway.sendEventToCoordinator(
new SourceEventWrapper(new FinishedTieringEvent(tableId)));
new SourceEventWrapper(
new FinishedTieringEvent(tableId, commitResult.stats)));
} catch (Exception e) {
// if any exception happens, send to source coordinator to mark it as failed
operatorEventGateway.sendEventToCoordinator(
Expand All @@ -162,28 +181,31 @@ public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> str
}
}

@Nullable
private Committable commitWriteResults(
/**
* Commits the collected write results for one table to the lake and Fluss.
*
* <p>Always returns a non-null {@link CommitResult}. When all buckets produced no data (empty
* commit), {@link CommitResult#committable} is {@code null} and stats are {@link
* TieringStats#UNKNOWN}.
*/
private CommitResult commitWriteResults(
long tableId,
TablePath tablePath,
List<TableBucketWriteResult<WriteResult>> committableWriteResults)
throws Exception {
// filter out non-null write result
committableWriteResults =
// filter down to buckets that actually produced data
List<TableBucketWriteResult<WriteResult>> nonEmptyResults =
committableWriteResults.stream()
.filter(
writeResultTableBucketWriteResult ->
writeResultTableBucketWriteResult.writeResult() != null)
.filter(r -> r.writeResult() != null)
.collect(Collectors.toList());

// empty, means all write result is null, which is a empty commit,
// return null to skip the empty commit
if (committableWriteResults.isEmpty()) {
// all buckets were empty — nothing to commit to the lake
if (nonEmptyResults.isEmpty()) {
LOG.info(
"Commit tiering write results is empty for table {}, table path {}",
tableId,
tablePath);
return null;
return new CommitResult(null, null);
Comment thread
luoyuxia marked this conversation as resolved.
}

// Check if the table was dropped and recreated during tiering.
Expand All @@ -202,18 +224,15 @@ private Committable commitWriteResults(
try (LakeCommitter<WriteResult, Committable> lakeCommitter =
lakeTieringFactory.createLakeCommitter(
new TieringCommitterInitContext(
tablePath,
admin.getTableInfo(tablePath).get(),
lakeTieringConfig,
flussConfig))) {
tablePath, currentTableInfo, lakeTieringConfig, flussConfig))) {
List<WriteResult> writeResults =
committableWriteResults.stream()
nonEmptyResults.stream()
.map(TableBucketWriteResult::writeResult)
.collect(Collectors.toList());

Map<TableBucket, Long> logEndOffsets = new HashMap<>();
Map<TableBucket, Long> logMaxTieredTimestamps = new HashMap<>();
for (TableBucketWriteResult<WriteResult> writeResult : committableWriteResults) {
for (TableBucketWriteResult<WriteResult> writeResult : nonEmptyResults) {
TableBucket tableBucket = writeResult.tableBucket();
logEndOffsets.put(tableBucket, writeResult.logEndOffset());
logMaxTieredTimestamps.put(tableBucket, writeResult.maxTimestamp());
Expand Down Expand Up @@ -251,7 +270,7 @@ private Committable commitWriteResults(
lakeBucketTieredOffsetsFile,
logEndOffsets,
logMaxTieredTimestamps);
return committable;
return new CommitResult(committable, lakeCommitResult.getTieringStats());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,37 @@

package org.apache.fluss.flink.tiering.event;

import org.apache.fluss.lake.committer.TieringStats;

import org.apache.flink.api.connector.source.SourceEvent;

import javax.annotation.Nullable;

/** SourceEvent used to represent a Fluss table has been tiered finished. */
public class FinishedTieringEvent implements SourceEvent {

private static final long serialVersionUID = 1L;

private final long tableId;

public FinishedTieringEvent(long tableId) {
/** Statistics collected during this tiering round. */
@Nullable private final TieringStats stats;

public FinishedTieringEvent(long tableId, @Nullable TieringStats stats) {
this.tableId = tableId;
this.stats = stats;
}

public FinishedTieringEvent(long tableId) {
this(tableId, null);
}

public long getTableId() {
return tableId;
}

@Nullable
public TieringStats getStats() {
return stats;
}
}
Loading