diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/IndexedLakeSplitRecordIterator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/IndexedLakeSplitRecordIterator.java index 4af2b5add1..f5fb6a1624 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/IndexedLakeSplitRecordIterator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/IndexedLakeSplitRecordIterator.java @@ -18,8 +18,8 @@ package org.apache.fluss.flink.lake.reader; +import org.apache.fluss.flink.source.reader.LogRecordRowIterator; import org.apache.fluss.record.LogRecord; -import org.apache.fluss.row.InternalRow; import org.apache.fluss.utils.CloseableIterator; /** @@ -39,7 +39,7 @@ *
  • Maintains iterator semantics for sequential data access * */ -public class IndexedLakeSplitRecordIterator implements CloseableIterator { +public class IndexedLakeSplitRecordIterator implements LogRecordRowIterator { private final CloseableIterator logRecordIterators; private final int currentLakeSplitIndex; @@ -64,7 +64,7 @@ public boolean hasNext() { } @Override - public InternalRow next() { - return logRecordIterators.next().getRow(); + public LogRecord nextLogRecord() { + return logRecordIterators.next(); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotScanner.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotScanner.java index 97c57256fa..1b64e7e413 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotScanner.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotScanner.java @@ -19,6 +19,7 @@ import org.apache.fluss.client.table.scanner.batch.BatchScanner; import org.apache.fluss.flink.lake.split.LakeSnapshotSplit; +import org.apache.fluss.flink.source.reader.LogRecordRowIterator; import org.apache.fluss.lake.source.LakeSource; import org.apache.fluss.lake.source.LakeSplit; import org.apache.fluss.record.LogRecord; @@ -66,7 +67,7 @@ public void close() throws IOException { } } - private static class InternalRowIterator implements CloseableIterator { + private static class InternalRowIterator implements LogRecordRowIterator { private final CloseableIterator recordCloseableIterator; @@ -90,8 +91,8 @@ public boolean hasNext() { } @Override - public InternalRow next() { - return recordCloseableIterator.next().getRow(); + public LogRecord nextLogRecord() { + return recordCloseableIterator.next(); } } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetrics.java 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetrics.java index dc98ada820..56931ed932 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetrics.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetrics.java @@ -65,8 +65,9 @@ public class FlinkSourceReaderMetrics { // Map for tracking current consuming offsets private final Map offsets = new HashMap<>(); - // For currentFetchEventTimeLag metric - private volatile long currentFetchEventTimeLag = UNINITIALIZED; + private volatile long maxFetchEventTimeLag = UNINITIALIZED; + // Map for tracking current fetch event time lag by table bucket + private final Map currentFetchEventTimeLags = new HashMap<>(); public FlinkSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) { this.sourceReaderMetricGroup = sourceReaderMetricGroup; @@ -74,17 +75,20 @@ public FlinkSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) sourceReaderMetricGroup.addGroup(FLUSS_METRIC_GROUP).addGroup(READER_METRIC_GROUP); } - public void reportRecordEventTime(long lag) { - if (currentFetchEventTimeLag == UNINITIALIZED) { - // Lazily register the currentFetchEventTimeLag - // Set the lag before registering the metric to avoid metric reporter getting - // the uninitialized value - currentFetchEventTimeLag = lag; - sourceReaderMetricGroup.gauge( - MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, () -> currentFetchEventTimeLag); - return; + public void reportRecordEventTime(TableBucket tableBucket, long timestamp) { + if (!currentFetchEventTimeLags.containsKey(tableBucket)) { + registerEventTimeLagMetricsForTableBucket(tableBucket); + } + long lag = System.currentTimeMillis() - timestamp; + currentFetchEventTimeLags.put(tableBucket, lag); + + if (lag > maxFetchEventTimeLag) { + if (maxFetchEventTimeLag == UNINITIALIZED) { + 
sourceReaderMetricGroup.gauge( + MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, () -> maxFetchEventTimeLag); + } + maxFetchEventTimeLag = lag; } - currentFetchEventTimeLag = lag; } public void registerTableBucket(TableBucket tableBucket) { @@ -105,16 +109,26 @@ public void recordCurrentOffset(TableBucket tb, long offset) { // -------- Helper functions -------- private void registerOffsetMetricsForTableBucket(TableBucket tableBucket) { + getMetricGroupForTableBucket(tableBucket) + .gauge( + CURRENT_OFFSET_METRIC_GAUGE, + () -> offsets.getOrDefault(tableBucket, INITIAL_OFFSET)); + } + + private void registerEventTimeLagMetricsForTableBucket(TableBucket tableBucket) { + getMetricGroupForTableBucket(tableBucket) + .gauge( + MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, + () -> currentFetchEventTimeLags.getOrDefault(tableBucket, UNINITIALIZED)); + } + + private MetricGroup getMetricGroupForTableBucket(TableBucket tableBucket) { final MetricGroup metricGroup = tableBucket.getPartitionId() == null ? this.flussSourceReaderMetricGroup : this.flussSourceReaderMetricGroup.addGroup( PARTITION_GROUP, String.valueOf(tableBucket.getPartitionId())); - final MetricGroup bucketGroup = - metricGroup.addGroup(BUCKET_GROUP, String.valueOf(tableBucket.getBucket())); - bucketGroup.gauge( - CURRENT_OFFSET_METRIC_GAUGE, - () -> offsets.getOrDefault(tableBucket, INITIAL_OFFSET)); + return metricGroup.addGroup(BUCKET_GROUP, String.valueOf(tableBucket.getBucket())); } private void checkTableBucketTracked(TableBucket tableBucket) { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java index 0f842da069..2d366c66e0 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java +++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java @@ -21,6 +21,7 @@ import org.apache.fluss.client.table.scanner.batch.BatchScanner; import org.apache.fluss.flink.lake.reader.IndexedLakeSplitRecordIterator; import org.apache.fluss.flink.source.split.SnapshotSplit; +import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.MemoryAwareGetters; import org.apache.fluss.row.ProjectedRow; @@ -155,7 +156,16 @@ public boolean hasNext() { @Override public ScanRecord next() { - InternalRow row = rowIterator.next(); + LogRecord logRecord = null; + InternalRow row; + // Most bounded scanners expose row-only iterators. LogRecord-backed iterators preserve + // metadata such as timestamp for event-time lag reporting. + if (rowIterator instanceof LogRecordRowIterator) { + logRecord = ((LogRecordRowIterator) rowIterator).nextLogRecord(); + row = logRecord.getRow(); + } else { + row = rowIterator.next(); + } // Extract row size when the underlying row supports it (e.g. BinaryRow in // KV-snapshot paths); falls back to -1 for other row types. 
int sizeInBytes = ScanRecord.UNKNOWN_SIZE_IN_BYTES; @@ -164,6 +174,14 @@ public ScanRecord next() { if (underlyingRow instanceof MemoryAwareGetters) { sizeInBytes = ((MemoryAwareGetters) underlyingRow).getSizeInBytes(); } + if (logRecord != null) { + return new ScanRecord( + logRecord.logOffset(), + logRecord.timestamp(), + logRecord.getChangeType(), + row, + sizeInBytes); + } return new ScanRecord(row, sizeInBytes); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkRecordsWithSplitIds.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkRecordsWithSplitIds.java index cf5c60a671..458a0329d4 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkRecordsWithSplitIds.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkRecordsWithSplitIds.java @@ -142,6 +142,11 @@ public RecordAndPos nextRecordFromSplit() { flinkSourceReaderMetrics.recordCurrentOffset(currentTableBucket, offset); } + long timestamp = recordAndPos.record().timestamp(); + if (timestamp > 0) { + flinkSourceReaderMetrics.reportRecordEventTime(currentTableBucket, timestamp); + } + return recordAndPos; } return null; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java index 90f2b6e9a6..a12c81152d 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java @@ -424,10 +424,6 @@ private void checkSnapshotSplitOrStartNext() { } private FlinkRecordsWithSplitIds forLogRecords(ScanRecords scanRecords) { - // For calculating the currentFetchEventTimeLag - long fetchTimestamp = 
System.currentTimeMillis(); - long maxConsumerRecordTimestampInFetch = -1; - Map<String, CloseableIterator<RecordAndPos>> splitRecords = new HashMap<>(); Map stoppingOffsets = new HashMap<>(); Set finishedSplits = new HashSet<>(); @@ -445,10 +441,6 @@ private FlinkRecordsWithSplitIds forLogRecords(ScanRecords scanRecords) { List<ScanRecord> bucketScanRecords = scanRecords.records(scanBucket); if (!bucketScanRecords.isEmpty()) { final ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1); - // We keep the maximum message timestamp in the fetch for calculating lags - maxConsumerRecordTimestampInFetch = - Math.max(maxConsumerRecordTimestampInFetch, lastRecord.timestamp()); - // After processing a record with offset of "stoppingOffset - 1", the split reader // should not continue fetching because the record with stoppingOffset may not // exist. Keep polling will just block forever @@ -474,14 +466,6 @@ public String next() { } }; - // We use the timestamp on ScanRecord as the event time to calculate the - // currentFetchEventTimeLag. This is not totally accurate as the event time could be - // overridden by user's custom TimestampAssigner configured in source operator. - if (maxConsumerRecordTimestampInFetch > 0) { - flinkSourceReaderMetrics.reportRecordEventTime( - fetchTimestamp - maxConsumerRecordTimestampInFetch); - } - FlinkRecordsWithSplitIds recordsWithSplitIds = new FlinkRecordsWithSplitIds( splitRecords, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LogRecordRowIterator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LogRecordRowIterator.java new file mode 100644 index 0000000000..55b5b93108 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LogRecordRowIterator.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.reader; + +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; + +/** + * A row iterator that is backed by {@link LogRecord}s and can preserve record metadata. + * + *

    Only iterators whose source records have meaningful {@link LogRecord} metadata should + * implement this interface. Row-only snapshot iterators should remain plain {@link + * CloseableIterator}s. + */ +public interface LogRecordRowIterator extends CloseableIterator<InternalRow> { + + /** Returns the next log record. */ + LogRecord nextLogRecord(); + + @Override + default InternalRow next() { + return nextLogRecord().getRow(); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java index 255a5b6808..1179f15319 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java @@ -21,6 +21,7 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; import org.junit.jupiter.api.Test; @@ -64,6 +65,61 @@ void testCurrentOffsetTracking() { assertCurrentOffset(t3, 15513L, metricListener); } + @Test + void testCurrentFetchEventTimeLagTracksMaxLag() { + MetricListener metricListener = new MetricListener(); + FlinkSourceReaderMetrics flinkSourceReaderMetrics = + new FlinkSourceReaderMetrics( + InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup())); + TableBucket tableBucket0 = new TableBucket(0, 0); + TableBucket tableBucket1 = new TableBucket(0, 1); + + long timestamp = System.currentTimeMillis() - 100000L; + flinkSourceReaderMetrics.reportRecordEventTime(tableBucket0, timestamp); + + Optional<Gauge<Long>> readerEventTimeLagGauge = + metricListener.getGauge(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG); + Optional<Gauge<Long>> 
bucket0EventTimeLagGauge = + metricListener.getGauge( + FLUSS_METRIC_GROUP, + READER_METRIC_GROUP, + BUCKET_GROUP, + String.valueOf(tableBucket0.getBucket()), + MetricNames.CURRENT_FETCH_EVENT_TIME_LAG); + assertThat(readerEventTimeLagGauge).isPresent(); + assertThat(bucket0EventTimeLagGauge).isPresent(); + long readerEventTimeLag = readerEventTimeLagGauge.get().getValue(); + long bucket0EventTimeLag = bucket0EventTimeLagGauge.get().getValue(); + + flinkSourceReaderMetrics.reportRecordEventTime(tableBucket0, timestamp - 100000L); + long maxReaderEventTimeLag = readerEventTimeLagGauge.get().getValue(); + assertThat(maxReaderEventTimeLag).isGreaterThan(readerEventTimeLag); + assertThat((long) bucket0EventTimeLagGauge.get().getValue()) + .isGreaterThan(bucket0EventTimeLag); + + long newerTimestamp = System.currentTimeMillis(); + flinkSourceReaderMetrics.reportRecordEventTime(tableBucket1, newerTimestamp); + Optional<Gauge<Long>> bucket1EventTimeLagGauge = + metricListener.getGauge( + FLUSS_METRIC_GROUP, + READER_METRIC_GROUP, + BUCKET_GROUP, + String.valueOf(tableBucket1.getBucket()), + MetricNames.CURRENT_FETCH_EVENT_TIME_LAG); + assertThat(bucket1EventTimeLagGauge).isPresent(); + assertThat((long) readerEventTimeLagGauge.get().getValue()) + .isEqualTo(maxReaderEventTimeLag); + assertThat((long) bucket1EventTimeLagGauge.get().getValue()) + .isLessThan(maxReaderEventTimeLag); + + long updatedBucket0Timestamp = newerTimestamp - 50000L; + flinkSourceReaderMetrics.reportRecordEventTime(tableBucket0, updatedBucket0Timestamp); + assertThat((long) readerEventTimeLagGauge.get().getValue()) + .isEqualTo(maxReaderEventTimeLag); + assertThat((long) bucket0EventTimeLagGauge.get().getValue()) + .isLessThan(maxReaderEventTimeLag); + } + // ----------- Assertions -------------- private void assertCurrentOffset( diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java index 2ecb48ea19..9ee71f25f5 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java @@ -17,7 +17,10 @@ package org.apache.fluss.flink.source.reader; +import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.client.table.scanner.batch.BatchScanner; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.ProjectedRow; import org.apache.fluss.row.indexed.IndexedRow; @@ -143,6 +146,25 @@ void testSizeInBytesWithGenericRow() throws IOException { } } + @Test + void testLakeLogRecordMetadataPreserved() throws IOException { + List logRecords = new ArrayList<>(); + logRecords.add(new ScanRecord(1L, 1000L, ChangeType.INSERT, row(1, "a"))); + logRecords.add(new ScanRecord(2L, 2000L, ChangeType.DELETE, row(2, "b"))); + + TestingLakeLogRecordBatchScanner scanner = new TestingLakeLogRecordBatchScanner(logRecords); + BoundedSplitReader reader = new BoundedSplitReader(scanner, 0); + + List records = collectRecords(reader); + assertThat(records).hasSize(2); + assertThat(records.get(0).record().logOffset()).isEqualTo(1L); + assertThat(records.get(0).record().timestamp()).isEqualTo(1000L); + assertThat(records.get(0).record().getChangeType()).isEqualTo(ChangeType.INSERT); + assertThat(records.get(1).record().logOffset()).isEqualTo(2L); + assertThat(records.get(1).record().timestamp()).isEqualTo(2000L); + assertThat(records.get(1).record().getChangeType()).isEqualTo(ChangeType.DELETE); + } + /** A testing {@link BatchScanner} with static returned records. 
*/ private static class TestingBatchScanner implements BatchScanner { @@ -164,6 +186,51 @@ public void close() throws IOException { } } + /** A testing {@link BatchScanner} with static returned lake log records. */ + private static class TestingLakeLogRecordBatchScanner implements BatchScanner { + + private final CloseableIterator rowIterator; + + public TestingLakeLogRecordBatchScanner(List records) { + this.rowIterator = new TestingLakeLogRecordIterator(records); + } + + @Override + @Nullable + public CloseableIterator pollBatch(Duration timeout) { + return rowIterator.hasNext() ? rowIterator : null; + } + + @Override + public void close() throws IOException { + // do nothing + } + } + + private static class TestingLakeLogRecordIterator implements LogRecordRowIterator { + + private final CloseableIterator records; + + private TestingLakeLogRecordIterator(List records) { + this.records = CloseableIterator.wrap(records.iterator()); + } + + @Override + public void close() { + records.close(); + } + + @Override + public boolean hasNext() { + return records.hasNext(); + } + + @Override + public LogRecord nextLogRecord() { + return records.next(); + } + } + private List mockRows(int numRows) { List rows = new ArrayList<>(numRows); for (int i = 0; i < numRows; i++) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkRecordsWithSplitIdsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkRecordsWithSplitIdsTest.java new file mode 100644 index 0000000000..171ab7bf74 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkRecordsWithSplitIdsTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.reader; + +import org.apache.fluss.client.table.scanner.ScanRecord; +import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.utils.CloseableIterator; + +import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashSet; + +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link FlinkRecordsWithSplitIds}. 
*/ +class FlinkRecordsWithSplitIdsTest { + + @Test + void testReportsEventTimeLagForBoundedRecords() { + RecordingFlinkSourceReaderMetrics metrics = new RecordingFlinkSourceReaderMetrics(); + long timestamp = System.currentTimeMillis() - 10000L; + RecordAndPos record = + new RecordAndPos( + new ScanRecord(-1L, timestamp, ChangeType.INSERT, row(1, "a")), 1L); + FlinkRecordsWithSplitIds records = + new FlinkRecordsWithSplitIds( + "split", + new TableBucket(1L, 0), + CloseableIterator.wrap(Collections.singletonList(record).iterator()), + metrics); + + assertThat(records.nextSplit()).isEqualTo("split"); + assertThat(records.nextRecordFromSplit()).isEqualTo(record); + assertThat(metrics.reportedTableBucket).isEqualTo(new TableBucket(1L, 0)); + assertThat(metrics.reportedTimestamp).isEqualTo(timestamp); + } + + @Test + void testReportsEventTimeLagForMultiSplitRecords() { + RecordingFlinkSourceReaderMetrics metrics = new RecordingFlinkSourceReaderMetrics(); + TableBucket tableBucket = new TableBucket(1L, 0); + long timestamp = System.currentTimeMillis() - 10000L; + RecordAndPos record = + new RecordAndPos(new ScanRecord(0L, timestamp, ChangeType.INSERT, row(1, "a")), 1L); + FlinkRecordsWithSplitIds records = + new FlinkRecordsWithSplitIds( + Collections.singletonMap( + "split", + CloseableIterator.wrap( + Collections.singletonList(record).iterator())), + Collections.singleton("split").iterator(), + Collections.singleton(tableBucket).iterator(), + new HashSet<>(), + metrics); + + assertThat(records.nextSplit()).isEqualTo("split"); + assertThat(records.nextRecordFromSplit()).isEqualTo(record); + assertThat(metrics.reportedTableBucket).isEqualTo(tableBucket); + assertThat(metrics.reportedTimestamp).isEqualTo(timestamp); + } + + private static class RecordingFlinkSourceReaderMetrics extends FlinkSourceReaderMetrics { + + private TableBucket reportedTableBucket; + private long reportedTimestamp = -1L; + + private RecordingFlinkSourceReaderMetrics() { + 
super(InternalSourceReaderMetricGroup.mock(new MetricListener().getMetricGroup())); + } + + @Override + public void reportRecordEventTime(TableBucket tableBucket, long timestamp) { + reportedTableBucket = tableBucket; + reportedTimestamp = timestamp; + } + } +}