From 1643c026ea027dfe10c1b8e4751e06e01f69d968 Mon Sep 17 00:00:00 2001 From: mao-liu <1684060+mao-liu@users.noreply.github.com> Date: Tue, 9 Jun 2026 10:08:55 +1000 Subject: [PATCH 1/3] Expose more manifest cache options + benchmarks Remove cache page size changes - not needed Tidying up --- docs/docs/maintenance/write-performance.md | 16 + docs/generated/catalog_configuration.html | 6 + .../flink_connector_configuration.html | 18 + .../apache/paimon/options/CatalogOptions.java | 14 + .../benchmark/WriteRestoreScanBenchmark.java | 642 ++++++++++++++++++ .../apache/paimon/catalog/CachingCatalog.java | 11 +- .../apache/paimon/utils/SegmentsCache.java | 55 +- .../paimon/catalog/CachingCatalogTest.java | 12 + .../paimon/utils/SegmentsCacheTest.java | 94 +++ .../paimon/flink/FlinkConnectorOptions.java | 38 ++ .../coordinator/TableWriteCoordinator.java | 17 + .../coordinator/WriteOperatorCoordinator.java | 28 +- .../TableWriteCoordinatorTest.java | 85 +++ 13 files changed, 1026 insertions(+), 10 deletions(-) create mode 100644 paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/WriteRestoreScanBenchmark.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/utils/SegmentsCacheTest.java diff --git a/docs/docs/maintenance/write-performance.md b/docs/docs/maintenance/write-performance.md index 670260715eaf..d1ca2a414d4e 100644 --- a/docs/docs/maintenance/write-performance.md +++ b/docs/docs/maintenance/write-performance.md @@ -128,6 +128,22 @@ here (For example, writing a large number of partitions simultaneously), you can to use a Flink coordinator to cache the read manifest data to accelerate initialization. The cache memory for coordinator is `sink.writer-coordinator.cache-memory`, default is 1GB in Job Manager. +The coordinator manifest cache normally holds entries with soft references, so the JVM can reclaim them when the Job +Manager runs low on heap. 
On a heavily loaded Job Manager this can backfire: the JVM reclaims cached manifests, writers +immediately re-read and decompress them, and that work drives heap back up, triggering more reclamation. The cache +thrashes instead of helping. + +If you see this, set `sink.writer-coordinator.cache-soft-values` to `false`. Entries are then held with strong +references, so GC never reclaims them and the thrash loop cannot start. + +With soft references off, the cache no longer shrinks under GC, but it stays bounded by weight: it occupies up to +`sink.writer-coordinator.cache-memory` and evicts entries once that limit is exceeded. Size the Job Manager +total heap memory to at least twice that value. With strong references, an undersized heap fails fast with an +`OutOfMemoryError` instead of degrading silently. Optionally set `sink.writer-coordinator.cache-expire-after-access` +to also release entries that have been idle for a while. + +The catalog manifest cache has equivalent settings: `cache.manifest.soft-values` and `cache.manifest.max-memory`. + ## Write Memory There are three main places in Paimon writer that takes up memory: diff --git a/docs/generated/catalog_configuration.html b/docs/generated/catalog_configuration.html index 74053392d1ca..5464bccf3d38 100644 --- a/docs/generated/catalog_configuration.html +++ b/docs/generated/catalog_configuration.html @@ -68,6 +68,12 @@ MemorySize Controls the threshold of small manifest file. + +
cache.manifest.soft-values
+ true + Boolean + If true (default), manifest cache entries are held with soft references and may be reclaimed by the GC under memory pressure. This can trigger a cache-thrash spiral where reclaimed entries are refetched, spiking heap and forcing further reclamation. Set to false to hold entries with strong references, breaking the spiral; the cache then stays bounded by weight up to 'cache.manifest.max-memory' (size the total heap memory to at least roughly twice that value). +
cache.partition.max-num
0 diff --git a/docs/generated/flink_connector_configuration.html b/docs/generated/flink_connector_configuration.html index 686349e7bdfc..0f2786b038dc 100644 --- a/docs/generated/flink_connector_configuration.html +++ b/docs/generated/flink_connector_configuration.html @@ -302,12 +302,24 @@ Boolean If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator. + +
sink.writer-coordinator.cache-expire-after-access
+ (none) + Duration + Optional idle TTL for writer coordinator manifest cache entries. Disabled by default. When set, an entry that has not been accessed within this duration is evicted, releasing its heap. The cache stays bounded by 'sink.writer-coordinator.cache-memory' regardless of this setting. +
sink.writer-coordinator.cache-memory
2 gb MemorySize Controls the cache memory of writer coordinator to cache manifest files in Job Manager. + +
sink.writer-coordinator.cache-soft-values
+ true + Boolean + If true (default), writer coordinator manifest cache entries are held with soft references and may be reclaimed by the GC under memory pressure. This can trigger a cache-thrash spiral where reclaimed entries are refetched, spiking heap and forcing further reclamation. Set to false to hold entries with strong references, breaking the spiral; the cache then stays bounded by weight up to 'sink.writer-coordinator.cache-memory' (size the Job Manager total heap memory to at least roughly twice that value). +
sink.writer-coordinator.enabled
false @@ -320,6 +332,12 @@ MemorySize Controls the page size for one RPC request of writer coordinator. + +
sink.writer-coordinator.prefetch-manifests
+ false + Boolean + If true, the writer coordinator eagerly reads all data manifests of the latest snapshot during refresh to warm the in-Job-Manager manifest cache. This avoids many concurrent cold manifest reads when high-parallelism writers restore at the same time, reducing Job Manager heap pressure at the cost of one full manifest read per refresh. +
sink.writer-cpu
1.0 diff --git a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java index f900603897bc..609d7ea436b7 100644 --- a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -130,6 +130,20 @@ public class CatalogOptions { .noDefaultValue() .withDescription("Controls the maximum memory to cache manifest content."); + public static final ConfigOption CACHE_MANIFEST_SOFT_VALUES = + key("cache.manifest.soft-values") + .booleanType() + .defaultValue(true) + .withDescription( + "If true (default), manifest cache entries are held with soft references " + + "and may be reclaimed by the GC under memory pressure. This can " + + "trigger a cache-thrash spiral where reclaimed entries are " + + "refetched, spiking heap and forcing further reclamation. Set to " + + "false to hold entries with strong references, breaking the spiral; " + + "the cache then stays bounded by weight up to " + + "'cache.manifest.max-memory' (size the total heap memory to at " + + "least roughly twice that value)."); + public static final ConfigOption CACHE_SNAPSHOT_MAX_NUM_PER_TABLE = key("cache.snapshot.max-num-per-table") .intType() diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/WriteRestoreScanBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/WriteRestoreScanBenchmark.java new file mode 100644 index 000000000000..8a49baeed580 --- /dev/null +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/WriteRestoreScanBenchmark.java @@ -0,0 +1,642 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.benchmark; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fs.FileStatus; +import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.BucketEntry; +import org.apache.paimon.operation.FileSystemWriteRestore; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.IntType; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.SegmentsCache; + +import org.apache.commons.math3.random.RandomDataGenerator; +import org.junit.jupiter.api.Test; + +import java.lang.management.ManagementFactory; +import 
java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryType; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Benchmark for the {@link FileSystemWriteRestore#restoreFiles} hot loop, instrumented to surface + * the manifest-cache memory spike that writers can pay during cold cache population. + * + *

Builds a primary-key table with many partitions and a small number of rows per partition, then + * enumerates every (partition, bucket) pair and invokes {@code restoreFiles} on each — the same + * call pattern a writer pays during restore. The arms isolate the contribution of the {@link + * SegmentsCache} (the byte-level manifest cache) and of its {@code soft-values} configuration: + * + *

+ * + *

Spike-reproduction characteristics, applied uniformly to all arms: + * + *

+ */ +public class WriteRestoreScanBenchmark extends TableBenchmark { + + /** + * Default parallelism for the restore worker pool. Bumping this approximates packing more Flink + * writer subtasks onto a single TM. + */ + private static final int NUM_RESTORE_THREADS = 4; + + /** All tunables for one benchmark run. */ + private static final class BenchParams { + int numPartitions = 2_000; + int rowsPerPartition = 16; + int numBuckets = 4; + + /** Smaller -> more, smaller manifest files (fragmentation). */ + int commitBatchPartitions = 10; + + /** Number of value columns; widens DataFileMeta stats per manifest entry. */ + int valueCount = 10; + + /** Length of each random hex value string; widens per-stat min/max blobs. */ + int valueCharCount = 64; + + /** Parallelism for the restore worker pool. */ + int numRestoreThreads = NUM_RESTORE_THREADS; + + int numWarmupIters = 1; + int numMeasuredIters = 3; + } + + /** + * Manifest-directory bytes on disk, split by file-name prefix. Constant across iterations (the + * table is populated once), but captured per-iteration so each {@link FootprintSample} is + * self-contained. + */ + private static final class DiskFootprint { + final long manifestBytes; + final int manifestCount; + final long manifestListBytes; + final int manifestListCount; + final long indexManifestBytes; + final int indexManifestCount; + final long total; + + private DiskFootprint( + long manifestBytes, + int manifestCount, + long manifestListBytes, + int manifestListCount, + long indexManifestBytes, + int indexManifestCount) { + this.manifestBytes = manifestBytes; + this.manifestCount = manifestCount; + this.manifestListBytes = manifestListBytes; + this.manifestListCount = manifestListCount; + this.indexManifestBytes = indexManifestBytes; + this.indexManifestCount = indexManifestCount; + this.total = manifestBytes + manifestListBytes + indexManifestBytes; + } + + /** List the table's manifest directory and classify each file by name prefix. 
*/ + static DiskFootprint scan(FileStoreTable fst) throws Exception { + Path manifestDir = new Path(fst.snapshotManager().tablePath(), "manifest"); + FileStatus[] statuses = fst.snapshotManager().fileIO().listStatus(manifestDir); + long manifestBytes = 0; + long manifestListBytes = 0; + long indexManifestBytes = 0; + int manifestCount = 0; + int manifestListCount = 0; + int indexManifestCount = 0; + for (FileStatus s : statuses) { + String fileName = s.getPath().getName(); + // INDEX_MANIFEST_PREFIX and MANIFEST_LIST_PREFIX both start with "manifest-", + // so the more specific prefixes must be checked first. + if (fileName.startsWith(FileStorePathFactory.INDEX_MANIFEST_PREFIX)) { + indexManifestBytes += s.getLen(); + indexManifestCount++; + } else if (fileName.startsWith(FileStorePathFactory.MANIFEST_LIST_PREFIX)) { + manifestListBytes += s.getLen(); + manifestListCount++; + } else if (fileName.startsWith(FileStorePathFactory.MANIFEST_PREFIX)) { + manifestBytes += s.getLen(); + manifestCount++; + } + } + return new DiskFootprint( + manifestBytes, + manifestCount, + manifestListBytes, + manifestListCount, + indexManifestBytes, + indexManifestCount); + } + } + + /** + * All numbers captured during a single iteration's footprint print. The aggregate at the end of + * {@link #innerTest} consumes one of these per iteration so it can report SegmentsCache and + * Heap dimensions side by side. + */ + private static final class FootprintSample { + final DiskFootprint disk; + + /** {@code null} when no {@link SegmentsCache} is attached to the table. 
*/ + final Long segmentsCacheBytes; + + final long beforeHeap; + final long peakHeap; + final long afterGcHeap; + + FootprintSample( + DiskFootprint disk, + Long segmentsCacheBytes, + long beforeHeap, + long peakHeap, + long afterGcHeap) { + this.disk = disk; + this.segmentsCacheBytes = segmentsCacheBytes; + this.beforeHeap = beforeHeap; + this.peakHeap = peakHeap; + this.afterGcHeap = afterGcHeap; + } + } + + /** Sum/min/max/count accumulator for the per-metric aggregate over measured iterations. */ + private static final class LongStats { + long sum = 0; + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + int count = 0; + + void accept(long value) { + sum += value; + min = Math.min(min, value); + max = Math.max(max, value); + count++; + } + + long avg() { + return count == 0 ? 0 : sum / count; + } + } + + @Test + public void testRestoreFiles_segmentsCacheDisabled() throws Exception { + Options catalogOptions = new Options(); + catalogOptions.set(CatalogOptions.CACHE_ENABLED, false); + Options tableOptions = new Options(); + + BenchParams p = new BenchParams(); + innerTest("segmentsCacheDisabled", catalogOptions, tableOptions, p); + + /* + OpenJDK 64-Bit Server VM 11.0.28+0 on Mac OS X 26.5 + Apple M4 Pro + segmentsCacheDisabled: Best/Avg Time(ms) Row Rate(K/s) Per Row(ns) Relative + ----------------------------------------------------------------------------------------------------------- + OPERATORTEST_segmentsCacheDisabled_restore 20299 / 20792 0.4 2537410.2 1.0X + + Manifest cache footprint aggregate (segmentsCacheDisabled, 3 measured iters): + Disk manifests=1,703,457 bytes (26 files), manifest-lists=28,363 bytes (20 files), index-manifests=0 bytes (0 files); total=1,731,820 bytes + SegmentsCache n/a (no manifest cache attached to table — cache disabled) + Heap bytes before avg=54,524,533, min=54,460,872, max=54,557,016 + Heap bytes peak avg=470,198,938, min=396,855,096, max=560,410,880 + Heap bytes after-gc avg=54,507,109, min=54,459,880, 
max=54,556,952 + */ + } + + @Test + public void testRestoreFiles_segmentsCacheEnabled() throws Exception { + Options catalogOptions = new Options(); + Options tableOptions = new Options(); + catalogOptions.set( + CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY, MemorySize.ofMebiBytes(2048)); + catalogOptions.set(CatalogOptions.CACHE_MANIFEST_MAX_MEMORY, MemorySize.ofMebiBytes(4096)); + catalogOptions.set(CatalogOptions.CACHE_MANIFEST_SOFT_VALUES, false); + + BenchParams p = new BenchParams(); + innerTest("segmentsCacheEnabled", catalogOptions, tableOptions, p); + /* + OpenJDK 64-Bit Server VM 11.0.28+0 on Mac OS X 26.5 + Apple M4 Pro + segmentsCacheEnabled: Best/Avg Time(ms) Row Rate(K/s) Per Row(ns) Relative + ---------------------------------------------------------------------------------------------------------- + OPERATORTEST_segmentsCacheEnabled_restore 675 / 679 11.9 84382.8 1.0X + + Manifest cache footprint aggregate (segmentsCacheEnabled, 3 measured iters): + Disk manifests=1,773,080 bytes (26 files), manifest-lists=28,406 bytes (20 files), index-manifests=0 bytes (0 files); total=1,801,486 bytes + SegmentsCache bytes avg=16,422,496, min=16,422,496, max=16,422,496 + Heap bytes before avg=71,640,053, min=70,333,608, max=73,453,168 + Heap bytes peak avg=460,976,730, min=443,345,432, max=480,326,824 + Heap bytes after-gc avg=72,115,114, min=71,131,888, max=73,451,672 + */ + } + + private void innerTest(String name, Options catalogOptions, Options tableOptions, BenchParams p) + throws Exception { + Table table = createPartitionedTable(catalogOptions, tableOptions, "T", p); + populateTable(table, p); + + FileStoreTable fst = (FileStoreTable) table; + List bucketEntries = fst.newSnapshotReader().bucketEntries(); + System.out.printf( + "Populated table has %d (partition, bucket) pairs across %d partitions " + + "(%d restore threads, %dx value cols, %d-char values, commit batch=%d).%n", + bucketEntries.size(), + p.numPartitions, + p.numRestoreThreads, + 
p.valueCount, + p.valueCharCount, + p.commitBatchPartitions); + + long valuesPerIteration = bucketEntries.size(); + ExecutorService executor = Executors.newFixedThreadPool(p.numRestoreThreads); + AtomicInteger iterCounter = new AtomicInteger(0); + List perIterSamples = + new ArrayList<>(p.numWarmupIters + p.numMeasuredIters); + + try { + Benchmark benchmark = + new Benchmark(name, valuesPerIteration) + .setNumWarmupIters(p.numWarmupIters) + .setOutputPerIteration(true); + benchmark.addCase( + "restore", + p.numMeasuredIters, + () -> { + int iter = iterCounter.getAndIncrement(); + String iterLabel = + iter < p.numWarmupIters + ? "warmup-" + iter + : "iter-" + (iter - p.numWarmupIters); + try { + perIterSamples.add( + runMeasuredIteration( + name + " " + iterLabel, fst, executor, bucketEntries)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + benchmark.run(); + } finally { + executor.shutdownNow(); + } + + printAggregateFootprint(name, p, perIterSamples); + } + + /** + * Run one iteration end to end: reset the cache to its cold state, sample heap before, drive + * {@code restoreFiles} across every (partition, bucket) pair on the worker pool, sample heap + * peak and post-GC usage, then print and return the footprint. + */ + private FootprintSample runMeasuredIteration( + String label, + FileStoreTable fst, + ExecutorService executor, + List bucketEntries) + throws Exception { + resetCachesForIteration(fst); + // Fully run gc before starting the iteration. + fullGc(); + + // Fresh ThreadLocal each iteration so the first worker access constructs a fresh + // FileSystemWriteRestore + scan that observes the just-reset cache. (AbstractFileStoreScan + // is stateful, so one FSWR per thread is required.) 
+ ThreadLocal threadLocalRestore = + ThreadLocal.withInitial( + () -> + new FileSystemWriteRestore( + fst.coreOptions(), + fst.snapshotManager(), + fst.store().newScan(), + fst.store().newIndexFileHandler())); + + resetHeapPeak(); + long before = currentHeapUsage(); + + runRestoreAcrossBuckets(executor, threadLocalRestore, bucketEntries); + + long peak = peakHeapUsage(); + fullGc(); + long afterGc = currentHeapUsage(); + + return printCacheFootprint(label, fst, before, peak, afterGc); + } + + /** Submit a {@code restoreFiles} task per (partition, bucket) pair and wait for all of them. */ + private static void runRestoreAcrossBuckets( + ExecutorService executor, + ThreadLocal threadLocalRestore, + List bucketEntries) { + List> futures = new ArrayList<>(bucketEntries.size()); + for (BucketEntry entry : bucketEntries) { + futures.add( + executor.submit( + () -> + threadLocalRestore + .get() + .restoreFiles( + entry.partition(), + entry.bucket(), + false, + false))); + } + for (Future f : futures) { + try { + f.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private Table createPartitionedTable( + Options catalogOptions, Options tableOptions, String tableName, BenchParams p) + throws Exception { + catalogOptions.set(CatalogOptions.WAREHOUSE, tempFile.toUri().toString()); + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(catalogOptions)); + String database = "default"; + catalog.createDatabase(database, true); + + List fields = new ArrayList<>(); + fields.add(new DataField(0, "pt", new IntType())); + fields.add(new DataField(1, "k", new IntType())); + for (int i = 0; i < p.valueCount; i++) { + fields.add(new DataField(2 + i, "f" + i, DataTypes.STRING())); + } + + tableOptions.set(CoreOptions.BUCKET, p.numBuckets); + tableOptions.set(CoreOptions.WRITE_ONLY, false); + tableOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 10); + + // Primary keys must include all partition keys, so PK = (pt, k). 
+ Schema schema = + new Schema( + fields, + Collections.singletonList("pt"), + Arrays.asList("pt", "k"), + tableOptions.toMap(), + ""); + Identifier identifier = Identifier.create(database, tableName); + catalog.createTable(identifier, schema, false); + return catalog.getTable(identifier); + } + + private void populateTable(Table table, BenchParams p) throws Exception { + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + RandomDataGenerator random = new RandomDataGenerator(); + for (int batchStart = 0; + batchStart < p.numPartitions; + batchStart += p.commitBatchPartitions) { + int batchEnd = Math.min(batchStart + p.commitBatchPartitions, p.numPartitions); + try (BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit()) { + for (int pt = batchStart; pt < batchEnd; pt++) { + for (int k = 0; k < p.rowsPerPartition; k++) { + write.write(makeRow(pt, k, random, p)); + } + } + commit.commit(write.prepareCommit()); + } + } + } + + private InternalRow makeRow(int pt, int k, RandomDataGenerator random, BenchParams p) { + GenericRow row = new GenericRow(2 + p.valueCount); + row.setField(0, pt); + row.setField(1, k); + for (int i = 0; i < p.valueCount; i++) { + row.setField(2 + i, BinaryString.fromString(random.nextHexString(p.valueCharCount))); + } + return row; + } + + /** + * Reset the manifest {@link SegmentsCache} in play for this table. Always called at the start + * of every iteration so each measured iteration pays the cold-populate cost (the + * production-onset condition we're trying to reproduce). + * + *

If a {@link SegmentsCache} is attached (per-table, attached by {@code + * CachingCatalog.putTableCache} when {@code CACHE_ENABLED=true}), it is replaced with a fresh + * instance preserving {@code pageSize} / {@code maxMemorySize} / {@code maxElementSize} / + * {@code ttl} / {@code softValues}. Replacing (rather than {@code invalidateAll()}-ing) + * sidesteps Caffeine's asynchronous eviction so the cold state is deterministic. A no-op when + * no cache is attached. + */ + private static void resetCachesForIteration(FileStoreTable fst) { + SegmentsCache original = fst.getManifestCache(); + if (original != null) { + fst.setManifestCache( + SegmentsCache.create( + original.pageSize(), + original.maxMemorySize(), + original.maxElementSize(), + original.ttl(), + original.softValues())); + } + } + + /** + * Print a per-iteration footprint summary: total manifest directory bytes on disk (split by + * file-name prefix), the table's {@link SegmentsCache} accounting bytes, and the just-sampled + * heap before/peak and post-GC usage. Returns the same numbers as a {@link FootprintSample} + * for the end-of-run aggregate. + * + *

Caveats: + * + *

    + *
  • {@link SegmentsCache#totalCacheBytes()} walks {@code cache.asMap()} and re-applies the + * weigher per entry — it's an O(N) snapshot, fine here but not a free read. + *
• Peak is the sum of per-pool peaks: {@code MemoryPoolMXBean.getPeakUsage()} is per-pool + * and peaks don't necessarily coincide across pools, so the sum can overcount the true peak. + * Accurate enough for order-of-magnitude spike comparison. + *
+ */ + private FootprintSample printCacheFootprint( + String label, FileStoreTable fst, long before, long peak, long afterGc) + throws Exception { + DiskFootprint disk = DiskFootprint.scan(fst); + + SegmentsCache sc = fst.getManifestCache(); + Long segmentsCacheBytes = sc == null ? null : sc.totalCacheBytes(); + String segmentsCacheLine; + if (sc == null) { + segmentsCacheLine = + "SegmentsCache n/a (no manifest cache attached to table — cache disabled)"; + } else { + segmentsCacheLine = + String.format( + "SegmentsCache bytes=%,d (estimatedSize=%d, maxMemory=%s, maxElementSize=%d)", + segmentsCacheBytes, + sc.estimatedSize(), + sc.maxMemorySize(), + sc.maxElementSize()); + } + + System.out.println(); + System.out.println("Manifest cache footprint (" + label + "):"); + printDiskLine(disk); + System.out.println(" " + segmentsCacheLine); + System.out.printf( + " Heap before=%,d bytes, peak=%,d bytes, after-gc=%,d bytes%n", + before, peak, afterGc); + System.out.println(); + + return new FootprintSample(disk, segmentsCacheBytes, before, peak, afterGc); + } + + private static void printDiskLine(DiskFootprint disk) { + System.out.printf( + " Disk manifests=%,d bytes (%d files), manifest-lists=%,d bytes (%d files), index-manifests=%,d bytes (%d files); total=%,d bytes%n", + disk.manifestBytes, + disk.manifestCount, + disk.manifestListBytes, + disk.manifestListCount, + disk.indexManifestBytes, + disk.indexManifestCount, + disk.total); + } + + /** + * Print a one-block aggregate over the measured iterations (warmup iterations skipped). + * Reports Disk (taken from the first measured sample and printed once), {@link SegmentsCache} + * bytes (avg/min/max, or n/a when no sample carried a cache size), and Heap + * before/peak/after-GC (avg/min/max). Does nothing when no measured sample exists. 
+ */ + private void printAggregateFootprint( + String name, BenchParams p, List samples) { + int start = p.numWarmupIters; + int end = samples.size(); + if (start >= end) { + return; + } + int n = end - start; + DiskFootprint disk = samples.get(start).disk; + + // SegmentsCache: aggregate non-null sample bytes; treat as absent if every sample is null. + LongStats sc = new LongStats(); + LongStats before = new LongStats(); + LongStats peak = new LongStats(); + LongStats afterGc = new LongStats(); + + for (int i = start; i < end; i++) { + FootprintSample s = samples.get(i); + if (s.segmentsCacheBytes != null) { + sc.accept(s.segmentsCacheBytes); + } + before.accept(s.beforeHeap); + peak.accept(s.peakHeap); + afterGc.accept(s.afterGcHeap); + } + + System.out.println( + "Manifest cache footprint aggregate (" + name + ", " + n + " measured iters):"); + printDiskLine(disk); + if (sc.count > 0) { + System.out.printf( + " SegmentsCache bytes avg=%,d, min=%,d, max=%,d%n", sc.avg(), sc.min, sc.max); + } else { + System.out.println( + " SegmentsCache n/a (no manifest cache attached to table — cache disabled)"); + } + System.out.printf( + " Heap bytes before avg=%,d, min=%,d, max=%,d%n", + before.avg(), before.min, before.max); + System.out.printf( + " Heap bytes peak avg=%,d, min=%,d, max=%,d%n", + peak.avg(), peak.min, peak.max); + System.out.printf( + " Heap bytes after-gc avg=%,d, min=%,d, max=%,d%n", + afterGc.avg(), afterGc.min, afterGc.max); + System.out.println(); + } + + private static long currentHeapUsage() { + return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + } + + private static void fullGc() { + System.gc(); + System.runFinalization(); + System.gc(); + } + + private static void resetHeapPeak() { + for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) { + if (pool.getType() == MemoryType.HEAP) { + pool.resetPeakUsage(); + } + } + } + + private static long peakHeapUsage() { + long peak = 0; + for (MemoryPoolMXBean pool : 
ManagementFactory.getMemoryPoolMXBeans()) { + if (pool.getType() == MemoryType.HEAP) { + peak += pool.getPeakUsage().getUsed(); + } + } + return peak; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index 9fc7b124f905..afe4ed8ae6de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -18,6 +18,7 @@ package org.apache.paimon.catalog; +import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.fs.Path; import org.apache.paimon.options.MemorySize; @@ -52,6 +53,7 @@ import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY; import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY; import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD; +import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SOFT_VALUES; import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM; import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -97,7 +99,14 @@ public CachingCatalog(Catalog wrapped, Options options) { } this.snapshotMaxNumPerTable = options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE); - this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold); + boolean manifestCacheSoftValues = options.get(CACHE_MANIFEST_SOFT_VALUES); + this.manifestCache = + SegmentsCache.create( + (int) CoreOptions.PAGE_SIZE.defaultValue().getBytes(), + manifestMaxMemory, + manifestCacheThreshold, + expireAfterAccess, + manifestCacheSoftValues); this.cachedPartitionMaxNum = options.get(CACHE_PARTITION_MAX_NUM); diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java index 12f65036bf09..cfea1ea219bc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java @@ -26,6 +26,8 @@ import javax.annotation.Nullable; +import java.time.Duration; + import static org.apache.paimon.CoreOptions.PAGE_SIZE; /** Cache {@link Segments}. */ @@ -37,18 +39,41 @@ public class SegmentsCache { private final Cache cache; private final MemorySize maxMemorySize; private final long maxElementSize; + @Nullable private final Duration expireAfterAccess; + private final boolean softValues; public SegmentsCache(int pageSize, MemorySize maxMemorySize, long maxElementSize) { + this(pageSize, maxMemorySize, maxElementSize, null, true); + } + + public SegmentsCache( + int pageSize, + MemorySize maxMemorySize, + long maxElementSize, + @Nullable Duration expireAfterAccess, + boolean softValues) { this.pageSize = pageSize; - this.cache = + Caffeine builder = Caffeine.newBuilder() - .softValues() .weigher(this::weigh) .maximumWeight(maxMemorySize.getBytes()) - .executor(Runnable::run) - .build(); + .executor(Runnable::run); + // No idle TTL is applied unless one is explicitly supplied, preserving the original + // behaviour where entries are only evicted by weight (or GC, when soft values are on). + if (expireAfterAccess != null) { + builder.expireAfterAccess(expireAfterAccess); + } + // When soft values are enabled, entries may be reclaimed by the GC under memory pressure, + // which can trigger a cache-thrash spiral. Disabling them pins the working set with strong + // references, breaking the spiral at the cost of deterministic heap occupancy. 
+ if (softValues) { + builder.softValues(); + } + this.cache = builder.build(); this.maxMemorySize = maxMemorySize; this.maxElementSize = maxElementSize; + this.expireAfterAccess = expireAfterAccess; + this.softValues = softValues; } public int pageSize() { @@ -63,6 +88,15 @@ public long maxElementSize() { return maxElementSize; } + @Nullable + public Duration ttl() { + return expireAfterAccess; + } + + public boolean softValues() { + return softValues; + } + @Nullable public Segments getIfPresents(T key) { return cache.getIfPresent(key); @@ -84,11 +118,22 @@ public static SegmentsCache create(MemorySize maxMemorySize, long maxElem @Nullable public static SegmentsCache create( int pageSize, MemorySize maxMemorySize, long maxElementSize) { + return create(pageSize, maxMemorySize, maxElementSize, null, true); + } + + @Nullable + public static SegmentsCache create( + int pageSize, + MemorySize maxMemorySize, + long maxElementSize, + @Nullable Duration expireAfterAccess, + boolean softValues) { if (maxMemorySize.getBytes() == 0) { return null; } - return new SegmentsCache<>(pageSize, maxMemorySize, maxElementSize); + return new SegmentsCache<>( + pageSize, maxMemorySize, maxElementSize, expireAfterAccess, softValues); } public long estimatedSize() { diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java index 53928aef66bf..c28299b79e38 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java @@ -70,6 +70,7 @@ import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY; import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY; import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD; +import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SOFT_VALUES; import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM; import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE; import static org.assertj.core.api.Assertions.assertThat; @@ -537,5 +538,16 @@ public void testManifestCacheOptions() { caching = (CachingCatalog) CachingCatalog.tryToCreate(catalog, options); assertThat(caching.manifestCache.maxMemorySize()).isEqualTo(MemorySize.ofMebiBytes(256)); assertThat(caching.manifestCache.maxElementSize()).isEqualTo(Long.MAX_VALUE); + + // soft values default to on and the manifest cache inherits the catalog idle TTL + assertThat(caching.manifestCache.softValues()).isTrue(); + assertThat(caching.manifestCache.ttl()).isEqualTo(CACHE_EXPIRE_AFTER_ACCESS.defaultValue()); + + // soft values can be turned off to opt into strong references; the cache still inherits + // the catalog idle TTL + options.set(CACHE_MANIFEST_SOFT_VALUES, false); + caching = (CachingCatalog) CachingCatalog.tryToCreate(catalog, options); + assertThat(caching.manifestCache.softValues()).isFalse(); + assertThat(caching.manifestCache.ttl()).isEqualTo(CACHE_EXPIRE_AFTER_ACCESS.defaultValue()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SegmentsCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SegmentsCacheTest.java new file mode 100644 index 000000000000..5b2e704548f8 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SegmentsCacheTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.SingleSegments; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.options.MemorySize; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link SegmentsCache}. */ +public class SegmentsCacheTest { + + @Test + public void testDefaultsSoftValuesEnabledAndNoTtl() { + SegmentsCache cache = + new SegmentsCache<>(1024, MemorySize.ofKibiBytes(64), Long.MAX_VALUE); + assertThat(cache.softValues()).isTrue(); + assertThat(cache.ttl()).isNull(); + } + + @Test + public void testGettersReflectConstructorArgs() { + Duration ttl = Duration.ofMinutes(5); + SegmentsCache cache = + new SegmentsCache<>(1024, MemorySize.ofKibiBytes(64), 100L, ttl, false); + assertThat(cache.softValues()).isFalse(); + assertThat(cache.ttl()).isEqualTo(ttl); + assertThat(cache.pageSize()).isEqualTo(1024); + assertThat(cache.maxElementSize()).isEqualTo(100L); + assertThat(cache.maxMemorySize()).isEqualTo(MemorySize.ofKibiBytes(64)); + } + + @Test + public void testCreateReturnsNullWhenMemoryZero() { + assertThat(SegmentsCache.create(1024, MemorySize.ofBytes(0), Long.MAX_VALUE, null, false)) + .isNull(); + } + + @Test + public void testCreatePassesThroughTtlAndSoftValues() { + Duration ttl = Duration.ofMinutes(7); + SegmentsCache cache = + SegmentsCache.create(2048, MemorySize.ofKibiBytes(64), 100L, ttl, false); + assertThat(cache).isNotNull(); + assertThat(cache.ttl()).isEqualTo(ttl); + 
assertThat(cache.softValues()).isFalse(); + assertThat(cache.pageSize()).isEqualTo(2048); + } + + @Test + public void testCreateDefaultOverloadHasNoTtlAndSoftValues() { + SegmentsCache cache = + SegmentsCache.create(1024, MemorySize.ofKibiBytes(64), Long.MAX_VALUE); + assertThat(cache).isNotNull(); + assertThat(cache.ttl()).isNull(); + assertThat(cache.softValues()).isTrue(); + } + + @Test + public void testStrongRefsAreBoundedByWeight() { + // With soft values disabled the cache holds strong references, so the only thing keeping + // it bounded is weight-based (SIZE) eviction. Insert far more than the budget allows and + // assert the retained footprint stays within the configured maximum. + MemorySize budget = MemorySize.ofKibiBytes(8); + SegmentsCache cache = + new SegmentsCache<>(1024, budget, Long.MAX_VALUE, null, false); + for (int i = 0; i < 100; i++) { + cache.put("k" + i, new SingleSegments(MemorySegment.allocateHeapMemory(1024), 1024)); + } + assertThat(cache.totalCacheBytes()).isLessThanOrEqualTo(budget.getBytes()); + assertThat(cache.estimatedSize()).isLessThan(100); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 61c741fee288..678471ea337c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -500,6 +500,32 @@ public class FlinkConnectorOptions { .withDescription( "Controls the cache memory of writer coordinator to cache manifest files in Job Manager."); + public static final ConfigOption SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS = + key("sink.writer-coordinator.cache-expire-after-access") + .durationType() + .noDefaultValue() + .withDescription( + "Optional idle TTL for writer coordinator manifest cache 
entries. " + + "Disabled by default. When set, an entry that has not been " + + "accessed within this duration is evicted, releasing its heap. " + + "The cache stays bounded by 'sink.writer-coordinator.cache-memory' " + + "regardless of this setting."); + + public static final ConfigOption SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES = + key("sink.writer-coordinator.cache-soft-values") + .booleanType() + .defaultValue(true) + .withDescription( + "If true (default), writer coordinator manifest cache entries are held " + + "with soft references and may be reclaimed by the GC under " + + "memory pressure. This can trigger a cache-thrash spiral " + + "where reclaimed entries are refetched, spiking heap and " + + "forcing further reclamation. Set to false to hold entries " + + "with strong references, breaking the spiral; the cache then " + + "stays bounded by weight up to " + + "'sink.writer-coordinator.cache-memory' (size the Job Manager " + + "total heap memory to at least roughly twice that value)."); + public static final ConfigOption SINK_WRITER_COORDINATOR_PAGE_SIZE = key("sink.writer-coordinator.page-size") .memoryType() @@ -507,6 +533,18 @@ public class FlinkConnectorOptions { .withDescription( "Controls the page size for one RPC request of writer coordinator."); + public static final ConfigOption SINK_WRITER_COORDINATOR_PREFETCH_MANIFESTS = + key("sink.writer-coordinator.prefetch-manifests") + .booleanType() + .defaultValue(false) + .withDescription( + "If true, the writer coordinator eagerly reads all data manifests of the " + + "latest snapshot during refresh to warm the in-Job-Manager manifest " + + "cache. 
This avoids many concurrent cold manifest reads when " + + "high-parallelism writers restore at the same time, reducing Job " + + "Manager heap pressure at the cost of one full manifest read per " + + "refresh."); + public static final ConfigOption FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED = key("filesystem.job-level-settings.enabled") .booleanType() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java index 0f315880e932..337a5707b4a3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java @@ -27,7 +27,9 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.WriteRestore; +import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.Filter; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; @@ -59,6 +61,7 @@ public class TableWriteCoordinator { private final FileStoreScan scan; private final IndexFileHandler indexFileHandler; private final int pageSize; + private final boolean prefetchManifests; private final Cache pagedCoordination; private volatile Snapshot snapshot; @@ -78,6 +81,10 @@ public TableWriteCoordinator(FileStoreTable table) { .toConfiguration() .get(FlinkConnectorOptions.SINK_WRITER_COORDINATOR_PAGE_SIZE) .getBytes(); + this.prefetchManifests = + table.coreOptions() + .toConfiguration() + .get(FlinkConnectorOptions.SINK_WRITER_COORDINATOR_PREFETCH_MANIFESTS); this.pagedCoordination = Caffeine.newBuilder() 
.executor(Runnable::run) @@ -93,6 +100,16 @@ private synchronized void refresh() { } this.snapshot = latestSnapshot.get(); this.scan.withSnapshot(snapshot); + if (prefetchManifests) { + // Eagerly read all data manifests of the current snapshot once to warm the + // table's SegmentsCache (the byte-level manifest cache attached to the table + // inside the Job Manager). This reuses the same threaded `plan()` read path + // that per-task `scan` requests use, so subsequent concurrent requests hit + // warm bytes instead of each performing a cold manifest read. + scan.withPartitionFilter(PartitionPredicate.ALWAYS_TRUE) + .withBucketFilter(Filter.alwaysTrue()) + .plan(); + } } public synchronized PagedCoordinationResponse scan(PagedCoordinationRequest request) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java index 334333fe760b..3fe12cac2d56 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java @@ -18,9 +18,11 @@ package org.apache.paimon.flink.sink.coordinator; +import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.sink.TableWriteOperator; import org.apache.paimon.fs.Path; import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.SegmentsCache; @@ -31,11 +33,14 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import 
java.util.concurrent.ThreadPoolExecutor; +import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_MEMORY; +import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES; import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; /** @@ -56,13 +61,28 @@ public WriteOperatorCoordinator(FileStoreTable table) { @Override public void start() throws Exception { executor = createCachedThreadPool(1, "WriteCoordinator"); - MemorySize cacheMemory = - table.coreOptions().toConfiguration().get(SINK_WRITER_COORDINATOR_CACHE_MEMORY); - SegmentsCache manifestCache = SegmentsCache.create(cacheMemory, Long.MAX_VALUE); - table.setManifestCache(manifestCache); + table.setManifestCache(buildManifestCache(table.coreOptions().toConfiguration())); coordinator = new TableWriteCoordinator(table); } + /** + * Build the writer coordinator manifest cache from the given options. The idle TTL is disabled + * unless {@code sink.writer-coordinator.cache-expire-after-access} is set; the cache stays + * bounded by weight up to {@code sink.writer-coordinator.cache-memory} either way. 
+ */ + static SegmentsCache buildManifestCache(Options tableOptions) { + MemorySize cacheMemory = tableOptions.get(SINK_WRITER_COORDINATOR_CACHE_MEMORY); + Duration cacheExpireAfterAccess = + tableOptions.get(SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS); + boolean cacheSoftValues = tableOptions.get(SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES); + return SegmentsCache.create( + (int) CoreOptions.PAGE_SIZE.defaultValue().getBytes(), + cacheMemory, + Long.MAX_VALUE, + cacheExpireAfterAccess, + cacheSoftValues); + } + @Override public void close() throws Exception { if (executor != null) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java index 8bc952f9baf4..580a4eb03d57 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java @@ -21,16 +21,26 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.SegmentsCache; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.time.Duration; + import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; +import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS; +import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_MEMORY; +import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES; import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -66,6 +76,81 @@ public void testLatestIdentifierAndScan(boolean initSnapshot) throws Exception { assertThat(scan.extractDataFiles().size()).isEqualTo(initSnapshot ? 2 : 1); } + @Test + public void testPrefetchManifestsWarmsCache() throws Exception { + Identifier identifier = new Identifier("db", "table"); + Schema schema = + Schema.newBuilder() + .column("f0", DataTypes.INT()) + .option( + FlinkConnectorOptions.SINK_WRITER_COORDINATOR_PREFETCH_MANIFESTS + .key(), + "true") + .build(); + catalog.createDatabase("db", false); + catalog.createTable(identifier, schema, false); + FileStoreTable table = getTable(identifier); + + write(table, GenericRow.of(1)); + write(table, GenericRow.of(2)); + + // reset the manifest cache to a fresh, cold instance (the writes above may have populated + // it) so we can assert that constructing the coordinator is what warms it + // the existing cache on the table comes from CachingCatalog, which is distinct from + // TableWriteCoordinator + SegmentsCache cache = table.getManifestCache(); + table.setManifestCache( + SegmentsCache.create( + cache.pageSize(), + cache.maxMemorySize(), + cache.maxElementSize(), + cache.ttl(), + cache.softValues())); + assertThat(table.getManifestCache().totalCacheBytes()).isZero(); + + // constructing the coordinator runs refresh() which warms the manifest cache when the + // prefetch option is enabled + TableWriteCoordinator coordinator = new TableWriteCoordinator(table); + 
assertThat(table.getManifestCache().totalCacheBytes()).isGreaterThan(0); + + // scan results remain correct after warming + ScanCoordinationRequest request = + new ScanCoordinationRequest(serializeBinaryRow(EMPTY_ROW), 0, false, false); + ScanCoordinationResponse scan = coordinator.scan(request); + assertThat(scan.snapshot().id()).isEqualTo(table.latestSnapshot().get().id()); + assertThat(scan.extractDataFiles().size()).isEqualTo(2); + } + + @Test + public void testBuildManifestCacheOptions() { + // by default soft values are on and there is no idle TTL; the cache is bounded by memory + Options defaults = new Options(); + SegmentsCache cache = WriteOperatorCoordinator.buildManifestCache(defaults); + assertThat(cache.softValues()).isTrue(); + assertThat(cache.ttl()).isNull(); + assertThat(cache.maxMemorySize()) + .isEqualTo(SINK_WRITER_COORDINATOR_CACHE_MEMORY.defaultValue()); + + // an explicit expire-after-access TTL is honored + Options withTtl = new Options(); + withTtl.set(SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS, Duration.ofMinutes(5)); + cache = WriteOperatorCoordinator.buildManifestCache(withTtl); + assertThat(cache.ttl()).isEqualTo(Duration.ofMinutes(5)); + assertThat(cache.softValues()).isTrue(); + + // disabling soft values switches to strong references; still no TTL by default + Options strongRefs = new Options(); + strongRefs.set(SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES, false); + cache = WriteOperatorCoordinator.buildManifestCache(strongRefs); + assertThat(cache.softValues()).isFalse(); + assertThat(cache.ttl()).isNull(); + + // a zero cache memory disables the cache entirely + Options noCache = new Options(); + noCache.set(SINK_WRITER_COORDINATOR_CACHE_MEMORY, MemorySize.ofBytes(0)); + assertThat(WriteOperatorCoordinator.buildManifestCache(noCache)).isNull(); + } + @Test public void testNoManifestCache() throws Exception { Identifier identifier = new Identifier("db", "table"); From ca0b76e5cb77730283b4320acbfe1f4a5cf69666 Mon Sep 17 00:00:00 
2001 From: mao-liu <1684060+mao-liu@users.noreply.github.com> Date: Wed, 10 Jun 2026 13:32:14 +1000 Subject: [PATCH 2/3] spotless --- .../org/apache/paimon/benchmark/WriteRestoreScanBenchmark.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/WriteRestoreScanBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/WriteRestoreScanBenchmark.java index 8a49baeed580..9edc73ed6dc6 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/WriteRestoreScanBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/WriteRestoreScanBenchmark.java @@ -595,7 +595,8 @@ private void printAggregateFootprint( printDiskLine(disk); if (sc.count > 0) { System.out.printf( - " SegmentsCache bytes avg=%,d, min=%,d, max=%,d%n", sc.avg(), sc.min, sc.max); + " SegmentsCache bytes avg=%,d, min=%,d, max=%,d%n", + sc.avg(), sc.min, sc.max); } else { System.out.println( " SegmentsCache n/a (no manifest cache attached to table — cache disabled)"); From 709c79b808b20f3046c017f73f5e488664ed7002 Mon Sep 17 00:00:00 2001 From: mao-liu <1684060+mao-liu@users.noreply.github.com> Date: Thu, 11 Jun 2026 09:59:38 +1000 Subject: [PATCH 3/3] fix cache prefetch with new scan instance --- .../coordinator/TableWriteCoordinator.java | 16 +++-- .../TableWriteCoordinatorTest.java | 64 +++++++++++++++++++ 2 files changed, 73 insertions(+), 7 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java index 337a5707b4a3..1cff11eeffc1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java @@ -27,9 +27,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.WriteRestore; -import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.utils.Filter; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; @@ -103,12 +101,16 @@ private synchronized void refresh() { if (prefetchManifests) { // Eagerly read all data manifests of the current snapshot once to warm the // table's SegmentsCache (the byte-level manifest cache attached to the table - // inside the Job Manager). This reuses the same threaded `plan()` read path + // inside the Job Manager). This uses the same threaded `plan()` read path // that per-task `scan` requests use, so subsequent concurrent requests hit - // warm bytes instead of each performing a cold manifest read. - scan.withPartitionFilter(PartitionPredicate.ALWAYS_TRUE) - .withBucketFilter(Filter.alwaysTrue()) - .plan(); + // warm bytes instead of each performing a cold manifest read. A fresh scan is + // used so the shared request `scan`'s bucket/partition state never narrows the + // warm-up. 
+ FileStoreScan prefetchScan = table.store().newScan().withSnapshot(snapshot); + if (table.coreOptions().manifestDeleteFileDropStats()) { + prefetchScan.dropStats(); + } + prefetchScan.plan(); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java index 580a4eb03d57..16367204b6c3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java @@ -18,11 +18,13 @@ package org.apache.paimon.flink.sink.coordinator; +import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.GenericRow; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; @@ -35,7 +37,10 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.lang.reflect.Field; import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS; @@ -121,6 +126,65 @@ public void testPrefetchManifestsWarmsCache() throws Exception { assertThat(scan.extractDataFiles().size()).isEqualTo(2); } + @Test + public void testPrefetchWarmsAllManifestsAfterScan() throws Exception { + Identifier identifier = new Identifier("db", "table"); + // a fixed-bucket table so the data spans multiple buckets + 
Schema schema = + Schema.newBuilder() + .column("f0", DataTypes.INT()) + .option(CoreOptions.BUCKET.key(), "2") + .option(CoreOptions.BUCKET_KEY.key(), "f0") + .build(); + catalog.createDatabase("db", false); + catalog.createTable(identifier, schema, false); + FileStoreTable table = getTable(identifier); + + // write each bucket in its own commit so manifests are confined to a single bucket: a scan + // for one bucket must skip the other bucket's manifest at the manifest-file level + writeWithBucketAssigner(table, row -> 0, GenericRow.of(1)); + writeWithBucketAssigner(table, row -> 1, GenericRow.of(2)); + + // the scan returns the entries of both buckets, confirming the table spans more than one + // bucket + Snapshot latest = table.latestSnapshot().get(); + List entries = table.store().newScan().withSnapshot(latest).plan().files(); + assertThat(entries.stream().map(ManifestEntry::bucket).collect(Collectors.toSet())) + .containsExactlyInAnyOrder(0, 1); + + // construct the coordinator with prefetch disabled (the default) on a cold cache, so the + // cache its shared scan is bound to stays cold until the scan request runs + SegmentsCache cache = table.getManifestCache(); + table.setManifestCache( + SegmentsCache.create( + cache.pageSize(), + cache.maxMemorySize(), + cache.maxElementSize(), + cache.ttl(), + cache.softValues())); + TableWriteCoordinator coordinator = new TableWriteCoordinator(table); + assertThat(table.getManifestCache().totalCacheBytes()).isZero(); + + // a scan request for bucket 0 reads only the bucket-0 manifest, skipping the bucket-1 + // manifest at the manifest-file level: the cache therefore holds only a single bucket's + // manifest, proving the bucket filter is active (and leaving the stale bucket state on the + // shared scan) + ScanCoordinationRequest request = + new ScanCoordinationRequest(serializeBinaryRow(EMPTY_ROW), 0, false, false); + coordinator.scan(request); + long filteredCacheBytes = table.getManifestCache().totalCacheBytes(); + 
assertThat(filteredCacheBytes).isGreaterThan(0); + + // enable prefetch via reflection (to avoid widening the coordinator's interface) and run a + // checkpoint refresh; the prefetch must warm the full set of manifests rather than + // inheriting the stale bucket state, so the cache grows beyond the single-bucket subset + Field prefetchManifests = TableWriteCoordinator.class.getDeclaredField("prefetchManifests"); + prefetchManifests.setAccessible(true); + prefetchManifests.setBoolean(coordinator, true); + coordinator.checkpoint(); + assertThat(table.getManifestCache().totalCacheBytes()).isGreaterThan(filteredCacheBytes); + } + @Test public void testBuildManifestCacheOptions() { // by default soft values are on and there is no idle TTL; the cache is bounded by memory