fields = new ArrayList<>();
+ fields.add(new DataField(0, "pt", new IntType()));
+ fields.add(new DataField(1, "k", new IntType()));
+ for (int i = 0; i < p.valueCount; i++) {
+ fields.add(new DataField(2 + i, "f" + i, DataTypes.STRING()));
+ }
+
+ tableOptions.set(CoreOptions.BUCKET, p.numBuckets);
+ tableOptions.set(CoreOptions.WRITE_ONLY, false);
+ tableOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 10);
+
+ // Primary keys must include all partition keys, so PK = (pt, k).
+ Schema schema =
+ new Schema(
+ fields,
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ tableOptions.toMap(),
+ "");
+ Identifier identifier = Identifier.create(database, tableName);
+ catalog.createTable(identifier, schema, false);
+ return catalog.getTable(identifier);
+ }
+
+    private void populateTable(Table table, BenchParams p) throws Exception {
+        // One generator is shared across all batches; one write/commit pair is opened per batch.
+        RandomDataGenerator generator = new RandomDataGenerator();
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        int step = p.commitBatchPartitions;
+        for (int first = 0; first < p.numPartitions; first += step) {
+            int limit = Math.min(first + step, p.numPartitions);
+            try (BatchTableWrite writer = builder.newWrite();
+                    BatchTableCommit committer = builder.newCommit()) {
+                for (int pt = first; pt < limit; pt++) {
+                    for (int k = 0; k < p.rowsPerPartition; k++) {
+                        writer.write(makeRow(pt, k, generator, p));
+                    }
+                }
+                committer.commit(writer.prepareCommit());
+            }
+        }
+    }
+
+    private InternalRow makeRow(int pt, int k, RandomDataGenerator random, BenchParams p) {
+        GenericRow result = new GenericRow(2 + p.valueCount);
+        result.setField(0, pt);
+        result.setField(1, k);
+        for (int pos = 2; pos < 2 + p.valueCount; pos++) {
+            result.setField(pos, BinaryString.fromString(random.nextHexString(p.valueCharCount)));
+        }
+        return result;
+    }
+
+    /**
+     * Swaps the table's manifest {@link SegmentsCache} for a brand-new, empty one so the next
+     * iteration starts cold (the production-onset condition this benchmark reproduces).
+     *
+     * <p>The replacement is built from the current cache's {@code pageSize}, {@code
+     * maxMemorySize}, {@code maxElementSize}, {@code ttl} and {@code softValues}. A new instance
+     * is installed, rather than calling {@code invalidateAll()} on the old one, so the cold state
+     * does not depend on when Caffeine actually evicts. Does nothing when the table has no
+     * manifest cache attached.
+     */
+    private static void resetCachesForIteration(FileStoreTable fst) {
+        SegmentsCache current = fst.getManifestCache();
+        if (current == null) {
+            return;
+        }
+        SegmentsCache fresh =
+                SegmentsCache.create(
+                        current.pageSize(),
+                        current.maxMemorySize(),
+                        current.maxElementSize(),
+                        current.ttl(),
+                        current.softValues());
+        fst.setManifestCache(fresh);
+    }
+
+    /**
+     * Prints a footprint summary for one iteration and returns the numbers as a {@link
+     * FootprintSample}.
+     *
+     * <p>The output lists the manifest bytes found on disk by {@code DiskFootprint.scan} (split
+     * into manifests, manifest-lists and index-manifests), the accounting of the table's {@link
+     * SegmentsCache} (or an "n/a" line when none is attached), and the caller-supplied heap
+     * readings: before, peak and after GC.
+     *
+     * <p>Caveats:
+     *
+     * <ul>
+     *   <li>{@link SegmentsCache#totalCacheBytes()} is described by the benchmark author as an
+     *       O(N) walk over the cache entries, so it is not a free read.
+     *   <li>{@code peak} is expected to be a sum of per-pool peaks; pools need not peak at the
+     *       same moment, so the sum can overcount slightly.
+     * </ul>
+     */
+    private FootprintSample printCacheFootprint(
+            String label, FileStoreTable fst, long before, long peak, long afterGc)
+            throws Exception {
+        DiskFootprint onDisk = DiskFootprint.scan(fst);
+
+        SegmentsCache manifestCache = fst.getManifestCache();
+        Long cachedBytes = null;
+        String cacheLine =
+                "SegmentsCache n/a (no manifest cache attached to table — cache disabled)";
+        if (manifestCache != null) {
+            cachedBytes = manifestCache.totalCacheBytes();
+            cacheLine =
+                    String.format(
+                            "SegmentsCache bytes=%,d (estimatedSize=%d, maxMemory=%s, maxElementSize=%d)",
+                            cachedBytes,
+                            manifestCache.estimatedSize(),
+                            manifestCache.maxMemorySize(),
+                            manifestCache.maxElementSize());
+        }
+
+        System.out.println();
+        System.out.println("Manifest cache footprint (" + label + "):");
+        printDiskLine(onDisk);
+        System.out.println(" " + cacheLine);
+        System.out.printf(
+                " Heap before=%,d bytes, peak=%,d bytes, after-gc=%,d bytes%n",
+                before, peak, afterGc);
+        System.out.println();
+
+        return new FootprintSample(onDisk, cachedBytes, before, peak, afterGc);
+    }
+
+    /** Prints the on-disk byte and file counts for each manifest file kind, plus the total. */
+    private static void printDiskLine(DiskFootprint disk) {
+        String template =
+                " Disk manifests=%,d bytes (%d files), manifest-lists=%,d bytes (%d files), index-manifests=%,d bytes (%d files); total=%,d bytes%n";
+        System.out.format(
+                template,
+                disk.manifestBytes, disk.manifestCount,
+                disk.manifestListBytes, disk.manifestListCount,
+                disk.indexManifestBytes, disk.indexManifestCount,
+                disk.total);
+    }
+
+    /**
+     * Prints one aggregate block over the measured iterations, i.e. the samples at index {@code
+     * p.numWarmupIters} and later; warmup samples are skipped. Reports the disk footprint of the
+     * first measured sample, then avg/min/max of the {@link SegmentsCache} bytes (samples without
+     * a cache are ignored) and of the heap before/peak/after-GC readings. Prints nothing when
+     * there are no measured samples.
+     */
+    private void printAggregateFootprint(
+            String name, BenchParams p, List<FootprintSample> samples) {
+        int start = p.numWarmupIters;
+        int end = samples.size();
+        if (start >= end) {
+            return;
+        }
+        // The element type is required: the body reads FootprintSample fields off each entry.
+        List<FootprintSample> measured = samples.subList(start, end);
+
+        // SegmentsCache: aggregate non-null sample bytes; treat as absent if every sample is null.
+        LongStats sc = new LongStats();
+        LongStats before = new LongStats();
+        LongStats peak = new LongStats();
+        LongStats afterGc = new LongStats();
+        for (FootprintSample s : measured) {
+            if (s.segmentsCacheBytes != null) {
+                sc.accept(s.segmentsCacheBytes);
+            }
+            before.accept(s.beforeHeap);
+            peak.accept(s.peakHeap);
+            afterGc.accept(s.afterGcHeap);
+        }
+
+        System.out.println(
+                "Manifest cache footprint aggregate ("
+                        + name + ", " + measured.size() + " measured iters):");
+        printDiskLine(measured.get(0).disk);
+        if (sc.count > 0) {
+            System.out.printf(
+                    " SegmentsCache bytes avg=%,d, min=%,d, max=%,d%n",
+                    sc.avg(), sc.min, sc.max);
+        } else {
+            System.out.println(
+                    " SegmentsCache n/a (no manifest cache attached to table — cache disabled)");
+        }
+        System.out.printf(
+                " Heap bytes before avg=%,d, min=%,d, max=%,d%n",
+                before.avg(), before.min, before.max);
+        System.out.printf(
+                " Heap bytes peak avg=%,d, min=%,d, max=%,d%n",
+                peak.avg(), peak.min, peak.max);
+        System.out.printf(
+                " Heap bytes after-gc avg=%,d, min=%,d, max=%,d%n",
+                afterGc.avg(), afterGc.min, afterGc.max);
+        System.out.println();
+    }
+
+    private static long currentHeapUsage() { // heap bytes in use, per the JVM MemoryMXBean
+        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
+    }
+
+    private static void fullGc() {
+        Runtime.getRuntime().gc();
+        Runtime.getRuntime().runFinalization(); // run pending finalizers before the second pass
+        Runtime.getRuntime().gc();
+    }
+
+    private static void resetHeapPeak() {
+        // Clears the recorded peak of every HEAP-type memory pool so that a later
+        // peakHeapUsage() reading only reflects what happened after this call.
+        ManagementFactory.getMemoryPoolMXBeans().stream()
+                .filter(bean -> bean.getType() == MemoryType.HEAP)
+                .forEach(MemoryPoolMXBean::resetPeakUsage);
+    }
+
+    private static long peakHeapUsage() {
+        // Adds up the peak "used" bytes of every HEAP-type memory pool. Each pool tracks its own
+        // peak, and pools need not peak at the same moment, so the sum is an upper estimate
+        // rather than an exact whole-heap peak.
+        return ManagementFactory.getMemoryPoolMXBeans().stream()
+                .filter(bean -> bean.getType() == MemoryType.HEAP)
+                .mapToLong(bean -> bean.getPeakUsage().getUsed())
+                .sum();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 9fc7b124f905..afe4ed8ae6de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -18,6 +18,7 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.MemorySize;
@@ -52,6 +53,7 @@
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
+import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SOFT_VALUES;
import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -97,7 +99,14 @@ public CachingCatalog(Catalog wrapped, Options options) {
}
this.snapshotMaxNumPerTable = options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE);
- this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
+ boolean manifestCacheSoftValues = options.get(CACHE_MANIFEST_SOFT_VALUES);
+ this.manifestCache =
+ SegmentsCache.create(
+ (int) CoreOptions.PAGE_SIZE.defaultValue().getBytes(),
+ manifestMaxMemory,
+ manifestCacheThreshold,
+ expireAfterAccess,
+ manifestCacheSoftValues);
this.cachedPartitionMaxNum = options.get(CACHE_PARTITION_MAX_NUM);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
index 12f65036bf09..cfea1ea219bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
@@ -26,6 +26,8 @@
import javax.annotation.Nullable;
+import java.time.Duration;
+
import static org.apache.paimon.CoreOptions.PAGE_SIZE;
/** Cache {@link Segments}. */
@@ -37,18 +39,41 @@ public class SegmentsCache {
private final Cache cache;
private final MemorySize maxMemorySize;
private final long maxElementSize;
+ @Nullable private final Duration expireAfterAccess;
+ private final boolean softValues;
public SegmentsCache(int pageSize, MemorySize maxMemorySize, long maxElementSize) {
+ this(pageSize, maxMemorySize, maxElementSize, null, true);
+ }
+
+ public SegmentsCache(
+ int pageSize,
+ MemorySize maxMemorySize,
+ long maxElementSize,
+ @Nullable Duration expireAfterAccess,
+ boolean softValues) {
this.pageSize = pageSize;
- this.cache =
+ Caffeine builder =
Caffeine.newBuilder()
- .softValues()
.weigher(this::weigh)
.maximumWeight(maxMemorySize.getBytes())
- .executor(Runnable::run)
- .build();
+ .executor(Runnable::run);
+ // No idle TTL is applied unless one is explicitly supplied, preserving the original
+ // behaviour where entries are only evicted by weight (or GC, when soft values are on).
+ if (expireAfterAccess != null) {
+ builder.expireAfterAccess(expireAfterAccess);
+ }
+ // When soft values are enabled, entries may be reclaimed by the GC under memory pressure,
+ // which can trigger a cache-thrash spiral. Disabling them pins the working set with strong
+ // references, breaking the spiral at the cost of deterministic heap occupancy.
+ if (softValues) {
+ builder.softValues();
+ }
+ this.cache = builder.build();
this.maxMemorySize = maxMemorySize;
this.maxElementSize = maxElementSize;
+ this.expireAfterAccess = expireAfterAccess;
+ this.softValues = softValues;
}
public int pageSize() {
@@ -63,6 +88,15 @@ public long maxElementSize() {
return maxElementSize;
}
+ @Nullable
+ public Duration ttl() {
+ return expireAfterAccess;
+ }
+
+ public boolean softValues() {
+ return softValues;
+ }
+
@Nullable
public Segments getIfPresents(T key) {
return cache.getIfPresent(key);
@@ -84,11 +118,22 @@ public static SegmentsCache create(MemorySize maxMemorySize, long maxElem
@Nullable
public static SegmentsCache create(
int pageSize, MemorySize maxMemorySize, long maxElementSize) {
+ return create(pageSize, maxMemorySize, maxElementSize, null, true);
+ }
+
+ @Nullable
+ public static SegmentsCache create(
+ int pageSize,
+ MemorySize maxMemorySize,
+ long maxElementSize,
+ @Nullable Duration expireAfterAccess,
+ boolean softValues) {
if (maxMemorySize.getBytes() == 0) {
return null;
}
- return new SegmentsCache<>(pageSize, maxMemorySize, maxElementSize);
+ return new SegmentsCache<>(
+ pageSize, maxMemorySize, maxElementSize, expireAfterAccess, softValues);
}
public long estimatedSize() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index 53928aef66bf..c28299b79e38 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -70,6 +70,7 @@
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
+import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SOFT_VALUES;
import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
import static org.assertj.core.api.Assertions.assertThat;
@@ -537,5 +538,16 @@ public void testManifestCacheOptions() {
caching = (CachingCatalog) CachingCatalog.tryToCreate(catalog, options);
assertThat(caching.manifestCache.maxMemorySize()).isEqualTo(MemorySize.ofMebiBytes(256));
assertThat(caching.manifestCache.maxElementSize()).isEqualTo(Long.MAX_VALUE);
+
+ // soft values default to on and the manifest cache inherits the catalog idle TTL
+ assertThat(caching.manifestCache.softValues()).isTrue();
+ assertThat(caching.manifestCache.ttl()).isEqualTo(CACHE_EXPIRE_AFTER_ACCESS.defaultValue());
+
+ // soft values can be turned off to opt into strong references; the cache still inherits
+ // the catalog idle TTL
+ options.set(CACHE_MANIFEST_SOFT_VALUES, false);
+ caching = (CachingCatalog) CachingCatalog.tryToCreate(catalog, options);
+ assertThat(caching.manifestCache.softValues()).isFalse();
+ assertThat(caching.manifestCache.ttl()).isEqualTo(CACHE_EXPIRE_AFTER_ACCESS.defaultValue());
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SegmentsCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SegmentsCacheTest.java
new file mode 100644
index 000000000000..5b2e704548f8
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SegmentsCacheTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.SingleSegments;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.options.MemorySize;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link SegmentsCache}: constructor/factory defaults, getters and weight bounding. */
+public class SegmentsCacheTest {
+
+    @Test
+    public void testDefaultsSoftValuesEnabledAndNoTtl() {
+        SegmentsCache<String> cache =
+                new SegmentsCache<>(1024, MemorySize.ofKibiBytes(64), Long.MAX_VALUE);
+        assertThat(cache.softValues()).isTrue();
+        assertThat(cache.ttl()).isNull();
+    }
+
+    @Test
+    public void testGettersReflectConstructorArgs() {
+        Duration ttl = Duration.ofMinutes(5);
+        SegmentsCache<String> cache =
+                new SegmentsCache<>(1024, MemorySize.ofKibiBytes(64), 100L, ttl, false);
+        assertThat(cache.softValues()).isFalse();
+        assertThat(cache.ttl()).isEqualTo(ttl);
+        assertThat(cache.pageSize()).isEqualTo(1024);
+        assertThat(cache.maxElementSize()).isEqualTo(100L);
+        assertThat(cache.maxMemorySize()).isEqualTo(MemorySize.ofKibiBytes(64));
+    }
+
+    @Test
+    public void testCreateReturnsNullWhenMemoryZero() {
+        assertThat(SegmentsCache.create(1024, MemorySize.ofBytes(0), Long.MAX_VALUE, null, false))
+                .isNull();
+    }
+
+    @Test
+    public void testCreatePassesThroughTtlAndSoftValues() {
+        Duration ttl = Duration.ofMinutes(7);
+        SegmentsCache<String> cache =
+                SegmentsCache.create(2048, MemorySize.ofKibiBytes(64), 100L, ttl, false);
+        assertThat(cache).isNotNull();
+        assertThat(cache.ttl()).isEqualTo(ttl);
+        assertThat(cache.softValues()).isFalse();
+        assertThat(cache.pageSize()).isEqualTo(2048);
+    }
+
+    @Test
+    public void testCreateDefaultOverloadHasNoTtlAndSoftValues() {
+        SegmentsCache<String> cache =
+                SegmentsCache.create(1024, MemorySize.ofKibiBytes(64), Long.MAX_VALUE);
+        assertThat(cache).isNotNull();
+        assertThat(cache.ttl()).isNull();
+        assertThat(cache.softValues()).isTrue();
+    }
+
+    @Test
+    public void testStrongRefsAreBoundedByWeight() {
+        // With soft values disabled the cache holds strong references, so the only thing keeping
+        // it bounded is weight-based (SIZE) eviction. Insert far more than the budget allows and
+        // assert the retained footprint stays within the configured maximum.
+        MemorySize budget = MemorySize.ofKibiBytes(8);
+        SegmentsCache<String> cache =
+                new SegmentsCache<>(1024, budget, Long.MAX_VALUE, null, false);
+        for (int i = 0; i < 100; i++) {
+            cache.put("k" + i, new SingleSegments(MemorySegment.allocateHeapMemory(1024), 1024));
+        }
+        assertThat(cache.totalCacheBytes()).isLessThanOrEqualTo(budget.getBytes());
+        assertThat(cache.estimatedSize()).isLessThan(100);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 61c741fee288..678471ea337c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -500,6 +500,32 @@ public class FlinkConnectorOptions {
.withDescription(
"Controls the cache memory of writer coordinator to cache manifest files in Job Manager.");
+    public static final ConfigOption<Duration> SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS =
+            key("sink.writer-coordinator.cache-expire-after-access")
+                    .durationType()
+                    .noDefaultValue() // unset by default, matching "Disabled by default" below
+                    .withDescription(
+                            "Optional idle TTL for writer coordinator manifest cache entries. "
+                                    + "Disabled by default. When set, an entry that has not been "
+                                    + "accessed within this duration is evicted, releasing its heap. "
+                                    + "The cache stays bounded by 'sink.writer-coordinator.cache-memory' "
+                                    + "regardless of this setting.");
+
+    public static final ConfigOption<Boolean> SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES =
+            key("sink.writer-coordinator.cache-soft-values")
+                    .booleanType()
+                    .defaultValue(true) // soft references stay on unless explicitly disabled
+                    .withDescription(
+                            "If true (default), writer coordinator manifest cache entries are held "
+                                    + "with soft references and may be reclaimed by the GC under "
+                                    + "memory pressure. This can trigger a cache-thrash spiral "
+                                    + "where reclaimed entries are refetched, spiking heap and "
+                                    + "forcing further reclamation. Set to false to hold entries "
+                                    + "with strong references, breaking the spiral; the cache then "
+                                    + "stays bounded by weight up to "
+                                    + "'sink.writer-coordinator.cache-memory' (size the Job Manager "
+                                    + "total heap memory to at least roughly twice that value).");
+
public static final ConfigOption SINK_WRITER_COORDINATOR_PAGE_SIZE =
key("sink.writer-coordinator.page-size")
.memoryType()
@@ -507,6 +533,18 @@ public class FlinkConnectorOptions {
.withDescription(
"Controls the page size for one RPC request of writer coordinator.");
+    public static final ConfigOption<Boolean> SINK_WRITER_COORDINATOR_PREFETCH_MANIFESTS =
+            key("sink.writer-coordinator.prefetch-manifests")
+                    .booleanType()
+                    .defaultValue(false) // prefetching is opt-in
+                    .withDescription(
+                            "If true, the writer coordinator eagerly reads all data manifests of the "
+                                    + "latest snapshot during refresh to warm the in-Job-Manager manifest "
+                                    + "cache. This avoids many concurrent cold manifest reads when "
+                                    + "high-parallelism writers restore at the same time, reducing Job "
+                                    + "Manager heap pressure at the cost of one full manifest read per "
+                                    + "refresh.");
+
public static final ConfigOption FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED =
key("filesystem.job-level-settings.enabled")
.booleanType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
index 0f315880e932..1cff11eeffc1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
@@ -59,6 +59,7 @@ public class TableWriteCoordinator {
private final FileStoreScan scan;
private final IndexFileHandler indexFileHandler;
private final int pageSize;
+ private final boolean prefetchManifests;
private final Cache pagedCoordination;
private volatile Snapshot snapshot;
@@ -78,6 +79,10 @@ public TableWriteCoordinator(FileStoreTable table) {
.toConfiguration()
.get(FlinkConnectorOptions.SINK_WRITER_COORDINATOR_PAGE_SIZE)
.getBytes();
+ this.prefetchManifests =
+ table.coreOptions()
+ .toConfiguration()
+ .get(FlinkConnectorOptions.SINK_WRITER_COORDINATOR_PREFETCH_MANIFESTS);
this.pagedCoordination =
Caffeine.newBuilder()
.executor(Runnable::run)
@@ -93,6 +98,20 @@ private synchronized void refresh() {
}
this.snapshot = latestSnapshot.get();
this.scan.withSnapshot(snapshot);
+ if (prefetchManifests) {
+ // Eagerly read all data manifests of the current snapshot once to warm the
+ // table's SegmentsCache (the byte-level manifest cache attached to the table
+ // inside the Job Manager). This uses the same threaded `plan()` read path
+ // that per-task `scan` requests use, so subsequent concurrent requests hit
+ // warm bytes instead of each performing a cold manifest read. A fresh scan is
+ // used so the shared request `scan`'s bucket/partition state never narrows the
+ // warm-up.
+ FileStoreScan prefetchScan = table.store().newScan().withSnapshot(snapshot);
+ if (table.coreOptions().manifestDeleteFileDropStats()) {
+ prefetchScan.dropStats();
+ }
+ prefetchScan.plan();
+ }
}
public synchronized PagedCoordinationResponse scan(PagedCoordinationRequest request)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
index 334333fe760b..3fe12cac2d56 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
@@ -18,9 +18,11 @@
package org.apache.paimon.flink.sink.coordinator;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SegmentsCache;
@@ -31,11 +33,14 @@
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadPoolExecutor;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_MEMORY;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
/**
@@ -56,13 +61,28 @@ public WriteOperatorCoordinator(FileStoreTable table) {
@Override
public void start() throws Exception {
executor = createCachedThreadPool(1, "WriteCoordinator");
- MemorySize cacheMemory =
- table.coreOptions().toConfiguration().get(SINK_WRITER_COORDINATOR_CACHE_MEMORY);
- SegmentsCache manifestCache = SegmentsCache.create(cacheMemory, Long.MAX_VALUE);
- table.setManifestCache(manifestCache);
+ table.setManifestCache(buildManifestCache(table.coreOptions().toConfiguration()));
coordinator = new TableWriteCoordinator(table);
}
+    /**
+     * Builds the writer coordinator manifest cache, bounded by {@code
+     * sink.writer-coordinator.cache-memory}; an idle TTL applies only when {@code
+     * sink.writer-coordinator.cache-expire-after-access} is set. Null for zero cache memory.
+     */
+    static SegmentsCache<Path> buildManifestCache(Options tableOptions) {
+        MemorySize cacheMemory = tableOptions.get(SINK_WRITER_COORDINATOR_CACHE_MEMORY);
+        Duration cacheExpireAfterAccess =
+                tableOptions.get(SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS);
+        boolean cacheSoftValues = tableOptions.get(SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES);
+        return SegmentsCache.create(
+                (int) CoreOptions.PAGE_SIZE.defaultValue().getBytes(),
+                cacheMemory,
+                Long.MAX_VALUE,
+                cacheExpireAfterAccess,
+                cacheSoftValues);
+    }
+
@Override
public void close() throws Exception {
if (executor != null) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java
index 8bc952f9baf4..16367204b6c3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java
@@ -18,19 +18,34 @@
package org.apache.paimon.flink.sink.coordinator;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.SegmentsCache;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_MEMORY;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -66,6 +81,140 @@ public void testLatestIdentifierAndScan(boolean initSnapshot) throws Exception {
assertThat(scan.extractDataFiles().size()).isEqualTo(initSnapshot ? 2 : 1);
}
+    @Test
+    public void testPrefetchManifestsWarmsCache() throws Exception {
+        Identifier identifier = new Identifier("db", "table");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .option(
+                                FlinkConnectorOptions.SINK_WRITER_COORDINATOR_PREFETCH_MANIFESTS
+                                        .key(),
+                                "true")
+                        .build();
+        catalog.createDatabase("db", false);
+        catalog.createTable(identifier, schema, false);
+        FileStoreTable table = getTable(identifier);
+
+        write(table, GenericRow.of(1));
+        write(table, GenericRow.of(2));
+
+        // Reset the manifest cache to a fresh, cold instance (the writes above may have populated
+        // it) so we can assert that constructing the coordinator is what warms it. The existing
+        // cache on the table comes from CachingCatalog, not from TableWriteCoordinator; only its
+        // settings are copied into the replacement.
+        SegmentsCache<Path> cache = table.getManifestCache();
+        table.setManifestCache(
+                SegmentsCache.create(
+                        cache.pageSize(),
+                        cache.maxMemorySize(),
+                        cache.maxElementSize(),
+                        cache.ttl(),
+                        cache.softValues()));
+        assertThat(table.getManifestCache().totalCacheBytes()).isZero();
+
+        // constructing the coordinator runs refresh() which warms the manifest cache when the
+        // prefetch option is enabled
+        TableWriteCoordinator coordinator = new TableWriteCoordinator(table);
+        assertThat(table.getManifestCache().totalCacheBytes()).isGreaterThan(0);
+
+        // scan results remain correct after warming
+        ScanCoordinationRequest request =
+                new ScanCoordinationRequest(serializeBinaryRow(EMPTY_ROW), 0, false, false);
+        ScanCoordinationResponse scan = coordinator.scan(request);
+        assertThat(scan.snapshot().id()).isEqualTo(table.latestSnapshot().get().id());
+        assertThat(scan.extractDataFiles().size()).isEqualTo(2);
+    }
+
+    @Test
+    public void testPrefetchWarmsAllManifestsAfterScan() throws Exception {
+        Identifier identifier = new Identifier("db", "table");
+        // a fixed-bucket table so the data spans multiple buckets
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .option(CoreOptions.BUCKET.key(), "2")
+                        .option(CoreOptions.BUCKET_KEY.key(), "f0")
+                        .build();
+        catalog.createDatabase("db", false);
+        catalog.createTable(identifier, schema, false);
+        FileStoreTable table = getTable(identifier);
+
+        // write each bucket in its own commit so manifests are confined to a single bucket: a scan
+        // for one bucket must skip the other bucket's manifest at the manifest-file level
+        writeWithBucketAssigner(table, row -> 0, GenericRow.of(1));
+        writeWithBucketAssigner(table, row -> 1, GenericRow.of(2));
+
+        // the scan returns the entries of both buckets, confirming the table spans more than one
+        // bucket
+        Snapshot latest = table.latestSnapshot().get();
+        List<ManifestEntry> entries = table.store().newScan().withSnapshot(latest).plan().files();
+        assertThat(entries.stream().map(ManifestEntry::bucket).collect(Collectors.toSet()))
+                .containsExactlyInAnyOrder(0, 1);
+
+        // construct the coordinator with prefetch disabled (the default) on a cold cache, so the
+        // cache its shared scan is bound to stays cold until the scan request runs
+        SegmentsCache<Path> cache = table.getManifestCache();
+        table.setManifestCache(
+                SegmentsCache.create(
+                        cache.pageSize(),
+                        cache.maxMemorySize(),
+                        cache.maxElementSize(),
+                        cache.ttl(),
+                        cache.softValues()));
+        TableWriteCoordinator coordinator = new TableWriteCoordinator(table);
+        assertThat(table.getManifestCache().totalCacheBytes()).isZero();
+
+        // a scan request for bucket 0 reads only the bucket-0 manifest, skipping the bucket-1
+        // manifest at the manifest-file level: the cache therefore holds only a single bucket's
+        // manifest, proving the bucket filter is active (and leaving the stale bucket state on the
+        // shared scan)
+        ScanCoordinationRequest request =
+                new ScanCoordinationRequest(serializeBinaryRow(EMPTY_ROW), 0, false, false);
+        coordinator.scan(request);
+        long filteredCacheBytes = table.getManifestCache().totalCacheBytes();
+        assertThat(filteredCacheBytes).isGreaterThan(0);
+
+        // enable prefetch via reflection (to avoid widening the coordinator's interface) and run a
+        // checkpoint refresh; the prefetch must warm the full set of manifests rather than
+        // inheriting the stale bucket state, so the cache grows beyond the single-bucket subset
+        Field prefetchManifests = TableWriteCoordinator.class.getDeclaredField("prefetchManifests");
+        prefetchManifests.setAccessible(true);
+        prefetchManifests.setBoolean(coordinator, true);
+        coordinator.checkpoint();
+        assertThat(table.getManifestCache().totalCacheBytes()).isGreaterThan(filteredCacheBytes);
+    }
+
+    @Test
+    public void testBuildManifestCacheOptions() {
+        // by default soft values are on and there is no idle TTL; the cache is bounded by memory
+        Options defaults = new Options();
+        SegmentsCache<Path> cache = WriteOperatorCoordinator.buildManifestCache(defaults);
+        assertThat(cache.softValues()).isTrue();
+        assertThat(cache.ttl()).isNull();
+        assertThat(cache.maxMemorySize())
+                .isEqualTo(SINK_WRITER_COORDINATOR_CACHE_MEMORY.defaultValue());
+
+        // an explicit expire-after-access TTL is honored
+        Options withTtl = new Options();
+        withTtl.set(SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS, Duration.ofMinutes(5));
+        cache = WriteOperatorCoordinator.buildManifestCache(withTtl);
+        assertThat(cache.ttl()).isEqualTo(Duration.ofMinutes(5));
+        assertThat(cache.softValues()).isTrue();
+
+        // disabling soft values switches to strong references; still no TTL by default
+        Options strongRefs = new Options();
+        strongRefs.set(SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES, false);
+        cache = WriteOperatorCoordinator.buildManifestCache(strongRefs);
+        assertThat(cache.softValues()).isFalse();
+        assertThat(cache.ttl()).isNull();
+
+        // a zero cache memory disables the cache entirely
+        Options noCache = new Options();
+        noCache.set(SINK_WRITER_COORDINATOR_CACHE_MEMORY, MemorySize.ofBytes(0));
+        assertThat(WriteOperatorCoordinator.buildManifestCache(noCache)).isNull();
+    }
+
@Test
public void testNoManifestCache() throws Exception {
Identifier identifier = new Identifier("db", "table");