bucketScanRecords = scanRecords.records(scanBucket);
if (!bucketScanRecords.isEmpty()) {
final ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
- // We keep the maximum message timestamp in the fetch for calculating lags
- maxConsumerRecordTimestampInFetch =
- Math.max(maxConsumerRecordTimestampInFetch, lastRecord.timestamp());
-
// After processing a record with offset of "stoppingOffset - 1", the split reader
// should not continue fetching because the record with stoppingOffset may not
// exist. Keep polling will just block forever
@@ -474,14 +466,6 @@ public String next() {
}
};
- // We use the timestamp on ScanRecord as the event time to calculate the
- // currentFetchEventTimeLag. This is not totally accurate as the event time could be
- // overridden by user's custom TimestampAssigner configured in source operator.
- if (maxConsumerRecordTimestampInFetch > 0) {
- flinkSourceReaderMetrics.reportRecordEventTime(
- fetchTimestamp - maxConsumerRecordTimestampInFetch);
- }
-
FlinkRecordsWithSplitIds recordsWithSplitIds =
new FlinkRecordsWithSplitIds(
splitRecords,
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LogRecordRowIterator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LogRecordRowIterator.java
new file mode 100644
index 0000000000..55b5b93108
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LogRecordRowIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.reader;
+
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+/**
+ * A row iterator that is backed by {@link LogRecord}s and can preserve record metadata.
+ *
+ * Only iterators whose source records have meaningful {@link LogRecord} metadata should
+ * implement this interface. Row-only snapshot iterators should remain plain {@link
+ * CloseableIterator}s.
+ */
+public interface LogRecordRowIterator extends CloseableIterator {
+
+ /** Returns the next log record. */
+ LogRecord nextLogRecord();
+
+ @Override
+ default InternalRow next() {
+ return nextLogRecord().getRow();
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java
index 255a5b6808..1179f15319 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java
@@ -21,6 +21,7 @@
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.junit.jupiter.api.Test;
@@ -64,6 +65,61 @@ void testCurrentOffsetTracking() {
assertCurrentOffset(t3, 15513L, metricListener);
}
+ @Test
+ void testCurrentFetchEventTimeLagTracksMaxLag() {
+ MetricListener metricListener = new MetricListener();
+ FlinkSourceReaderMetrics flinkSourceReaderMetrics =
+ new FlinkSourceReaderMetrics(
+ InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
+ TableBucket tableBucket0 = new TableBucket(0, 0);
+ TableBucket tableBucket1 = new TableBucket(0, 1);
+
+ long timestamp = System.currentTimeMillis() - 100000L;
+ flinkSourceReaderMetrics.reportRecordEventTime(tableBucket0, timestamp);
+
+ Optional> readerEventTimeLagGauge =
+ metricListener.getGauge(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
+ Optional> bucket0EventTimeLagGauge =
+ metricListener.getGauge(
+ FLUSS_METRIC_GROUP,
+ READER_METRIC_GROUP,
+ BUCKET_GROUP,
+ String.valueOf(tableBucket0.getBucket()),
+ MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
+ assertThat(readerEventTimeLagGauge).isPresent();
+ assertThat(bucket0EventTimeLagGauge).isPresent();
+ long readerEventTimeLag = readerEventTimeLagGauge.get().getValue();
+ long bucket0EventTimeLag = bucket0EventTimeLagGauge.get().getValue();
+
+ flinkSourceReaderMetrics.reportRecordEventTime(tableBucket0, timestamp - 100000L);
+ long maxReaderEventTimeLag = readerEventTimeLagGauge.get().getValue();
+ assertThat(maxReaderEventTimeLag).isGreaterThan(readerEventTimeLag);
+ assertThat((long) bucket0EventTimeLagGauge.get().getValue())
+ .isGreaterThan(bucket0EventTimeLag);
+
+ long newerTimestamp = System.currentTimeMillis();
+ flinkSourceReaderMetrics.reportRecordEventTime(tableBucket1, newerTimestamp);
+ Optional> bucket1EventTimeLagGauge =
+ metricListener.getGauge(
+ FLUSS_METRIC_GROUP,
+ READER_METRIC_GROUP,
+ BUCKET_GROUP,
+ String.valueOf(tableBucket1.getBucket()),
+ MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
+ assertThat(bucket1EventTimeLagGauge).isPresent();
+ assertThat((long) readerEventTimeLagGauge.get().getValue())
+ .isEqualTo(maxReaderEventTimeLag);
+ assertThat((long) bucket1EventTimeLagGauge.get().getValue())
+ .isLessThan(maxReaderEventTimeLag);
+
+ long updatedBucket0Timestamp = newerTimestamp - 50000L;
+ flinkSourceReaderMetrics.reportRecordEventTime(tableBucket0, updatedBucket0Timestamp);
+ assertThat((long) readerEventTimeLagGauge.get().getValue())
+ .isEqualTo(maxReaderEventTimeLag);
+ assertThat((long) bucket0EventTimeLagGauge.get().getValue())
+ .isLessThan(maxReaderEventTimeLag);
+ }
+
// ----------- Assertions --------------
private void assertCurrentOffset(
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java
index 2ecb48ea19..9ee71f25f5 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java
@@ -17,7 +17,10 @@
package org.apache.fluss.flink.source.reader;
+import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.ProjectedRow;
import org.apache.fluss.row.indexed.IndexedRow;
@@ -143,6 +146,25 @@ void testSizeInBytesWithGenericRow() throws IOException {
}
}
+ @Test
+ void testLakeLogRecordMetadataPreserved() throws IOException {
+ List logRecords = new ArrayList<>();
+ logRecords.add(new ScanRecord(1L, 1000L, ChangeType.INSERT, row(1, "a")));
+ logRecords.add(new ScanRecord(2L, 2000L, ChangeType.DELETE, row(2, "b")));
+
+ TestingLakeLogRecordBatchScanner scanner = new TestingLakeLogRecordBatchScanner(logRecords);
+ BoundedSplitReader reader = new BoundedSplitReader(scanner, 0);
+
+ List records = collectRecords(reader);
+ assertThat(records).hasSize(2);
+ assertThat(records.get(0).record().logOffset()).isEqualTo(1L);
+ assertThat(records.get(0).record().timestamp()).isEqualTo(1000L);
+ assertThat(records.get(0).record().getChangeType()).isEqualTo(ChangeType.INSERT);
+ assertThat(records.get(1).record().logOffset()).isEqualTo(2L);
+ assertThat(records.get(1).record().timestamp()).isEqualTo(2000L);
+ assertThat(records.get(1).record().getChangeType()).isEqualTo(ChangeType.DELETE);
+ }
+
/** A testing {@link BatchScanner} with static returned records. */
private static class TestingBatchScanner implements BatchScanner {
@@ -164,6 +186,51 @@ public void close() throws IOException {
}
}
+ /** A testing {@link BatchScanner} with static returned lake log records. */
+ private static class TestingLakeLogRecordBatchScanner implements BatchScanner {
+
+ private final CloseableIterator rowIterator;
+
+ public TestingLakeLogRecordBatchScanner(List records) {
+ this.rowIterator = new TestingLakeLogRecordIterator(records);
+ }
+
+ @Override
+ @Nullable
+ public CloseableIterator pollBatch(Duration timeout) {
+ return rowIterator.hasNext() ? rowIterator : null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+ }
+
+ private static class TestingLakeLogRecordIterator implements LogRecordRowIterator {
+
+ private final CloseableIterator records;
+
+ private TestingLakeLogRecordIterator(List records) {
+ this.records = CloseableIterator.wrap(records.iterator());
+ }
+
+ @Override
+ public void close() {
+ records.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return records.hasNext();
+ }
+
+ @Override
+ public LogRecord nextLogRecord() {
+ return records.next();
+ }
+ }
+
private List mockRows(int numRows) {
List rows = new ArrayList<>(numRows);
for (int i = 0; i < numRows; i++) {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkRecordsWithSplitIdsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkRecordsWithSplitIdsTest.java
new file mode 100644
index 0000000000..171ab7bf74
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkRecordsWithSplitIdsTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.reader;
+
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link FlinkRecordsWithSplitIds}. */
+class FlinkRecordsWithSplitIdsTest {
+
+ @Test
+ void testReportsEventTimeLagForBoundedRecords() {
+ RecordingFlinkSourceReaderMetrics metrics = new RecordingFlinkSourceReaderMetrics();
+ long timestamp = System.currentTimeMillis() - 10000L;
+ RecordAndPos record =
+ new RecordAndPos(
+ new ScanRecord(-1L, timestamp, ChangeType.INSERT, row(1, "a")), 1L);
+ FlinkRecordsWithSplitIds records =
+ new FlinkRecordsWithSplitIds(
+ "split",
+ new TableBucket(1L, 0),
+ CloseableIterator.wrap(Collections.singletonList(record).iterator()),
+ metrics);
+
+ assertThat(records.nextSplit()).isEqualTo("split");
+ assertThat(records.nextRecordFromSplit()).isEqualTo(record);
+ assertThat(metrics.reportedTableBucket).isEqualTo(new TableBucket(1L, 0));
+ assertThat(metrics.reportedTimestamp).isEqualTo(timestamp);
+ }
+
+ @Test
+ void testReportsEventTimeLagForMultiSplitRecords() {
+ RecordingFlinkSourceReaderMetrics metrics = new RecordingFlinkSourceReaderMetrics();
+ TableBucket tableBucket = new TableBucket(1L, 0);
+ long timestamp = System.currentTimeMillis() - 10000L;
+ RecordAndPos record =
+ new RecordAndPos(new ScanRecord(0L, timestamp, ChangeType.INSERT, row(1, "a")), 1L);
+ FlinkRecordsWithSplitIds records =
+ new FlinkRecordsWithSplitIds(
+ Collections.singletonMap(
+ "split",
+ CloseableIterator.wrap(
+ Collections.singletonList(record).iterator())),
+ Collections.singleton("split").iterator(),
+ Collections.singleton(tableBucket).iterator(),
+ new HashSet<>(),
+ metrics);
+
+ assertThat(records.nextSplit()).isEqualTo("split");
+ assertThat(records.nextRecordFromSplit()).isEqualTo(record);
+ assertThat(metrics.reportedTableBucket).isEqualTo(tableBucket);
+ assertThat(metrics.reportedTimestamp).isEqualTo(timestamp);
+ }
+
+ private static class RecordingFlinkSourceReaderMetrics extends FlinkSourceReaderMetrics {
+
+ private TableBucket reportedTableBucket;
+ private long reportedTimestamp = -1L;
+
+ private RecordingFlinkSourceReaderMetrics() {
+ super(InternalSourceReaderMetricGroup.mock(new MetricListener().getMetricGroup()));
+ }
+
+ @Override
+ public void reportRecordEventTime(TableBucket tableBucket, long timestamp) {
+ reportedTableBucket = tableBucket;
+ reportedTimestamp = timestamp;
+ }
+ }
+}