Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/user_guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ User Guide
user_guide/schema
user_guide/snapshot
user_guide/manifest
user_guide/manifest_cache
user_guide/data_types
user_guide/primary_key_table
user_guide/append_only_table
Expand Down
85 changes: 85 additions & 0 deletions docs/source/user_guide/manifest_cache.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
.. Copyright 2024-present Alibaba Inc.

.. Licensed under the Apache License, Version 2.0 (the "License");
.. you may not use this file except in compliance with the License.
.. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing, software
.. distributed under the License is distributed on an "AS IS" BASIS,
.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
.. See the License for the specific language governing permissions and
.. limitations under the License.

Manifest Cache
==============

Overview
--------

Manifest cache stores raw manifest file bytes at the ``ObjectsFile<T>::Read()``
layer. It covers data manifests, manifest lists, and index manifests because
they all read through ``ObjectsFile<T>``.

For repeated ``get``, ``scan``, or batch ``get/scan -f`` requests in the same
process, the same snapshot often reads the same manifest files repeatedly. On a
cache hit, the read path skips remote filesystem ``open/read``, builds an
in-memory input stream from cached bytes, and still runs the format reader,
Arrow decoding, and object deserialization. This design primarily reduces
remote IO latency and bandwidth while keeping cache weight aligned with the
actual cached bytes.

Configuration
-------------

.. code-block:: text

manifest.cache-max-memory-size=64MB

The default value is ``64MB``. Set it to ``0`` to disable manifest cache.

Implementation Notes
--------------------

- The cache key is the full manifest file path with offset ``0`` and a whole-file
sentinel length, so cache hits do not need an extra file-status lookup.
- The cache value stores full manifest file bytes in a ``MemorySegment``.
- The cache weight is the ``MemorySegment`` size, and capacity is controlled by
``manifest.cache-max-memory-size``.
- The cache instance reuses the existing ``CacheManager``. ``CoreOptions``
initializes it eagerly from ``manifest.cache-max-memory-size`` and upper
layers pass it explicitly to ``ObjectsFile<T>``.
- On a cache hit, ``MemorySegmentInputStream`` reads cached bytes and keeps the
underlying ``MemorySegment`` alive without copying bytes. Caller filters are
applied after records are decoded.
- The cache is process-local LRU storage and is not shared across processes.
- Reads bypass the cache while ``IOHook`` is active, so IO fault-injection tests
still observe real IO operations.
- ``Read()`` keeps its existing append semantics, which is required when base
and delta manifests are loaded into the same result vector.

Test Coverage
-------------

- ``CoreOptionsTest`` validates the ``64MB`` default and the ``0`` disable
switch.
- ``ManifestFileTest.TestReadUsesManifestCache`` validates cache hits on
repeated reads, avoids repeated file opens, and checks filter behavior after
cached bytes are decoded.
- ``ManifestFileTest.TestManifestCacheCanBeDisabled`` validates that disabled
cache still reopens files.
- ``ManifestFileTest.TestManifestCacheBypassesWhenIOHookActive`` validates cache
bypass while ``IOHook`` is active.

Future Optimizations
--------------------

- Add hit, miss, bypass, and eviction metrics to read trace or metrics.
- Isolate capacity by table path or catalog to prevent one large table from
evicting unrelated manifest bytes.
- Add single-flight loading for high-concurrency misses on the same manifest
path.
- Evaluate a decoded-records second-level cache, configurable as a
CPU-vs-memory tradeoff.

4 changes: 4 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ struct PAIMON_EXPORT Options {
/// compaction of manifest, default value is 16MB.
static const char MANIFEST_FULL_COMPACTION_FILE_SIZE[];

/// "manifest.cache-max-memory-size" - Maximum process-local memory used to cache raw manifest
/// file bytes. Set to 0 to disable. Default value is 64MB.
static const char MANIFEST_CACHE_MAX_MEMORY_SIZE[];

/// "source.split.target-size" - Target size of a source split when scanning a bucket. Default
/// value is 128MB.
static const char SOURCE_SPLIT_TARGET_SIZE[];
Expand Down
1 change: 1 addition & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ set(PAIMON_COMMON_SRCS
common/io/byte_array_input_stream.cpp
common/io/data_input_stream.cpp
common/io/data_output_stream.cpp
common/io/memory_segment_input_stream.cpp
common/io/memory_segment_output_stream.cpp
common/io/offset_input_stream.cpp
common/io/cache/cache_key.cpp
Expand Down
1 change: 1 addition & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const char Options::MANIFEST_COMPRESSION[] = "manifest.compression";
const char Options::MANIFEST_MERGE_MIN_COUNT[] = "manifest.merge-min-count";
const char Options::MANIFEST_FULL_COMPACTION_FILE_SIZE[] =
"manifest.full-compaction-threshold-size";
const char Options::MANIFEST_CACHE_MAX_MEMORY_SIZE[] = "manifest.cache-max-memory-size";
const char Options::SOURCE_SPLIT_TARGET_SIZE[] = "source.split.target-size";
const char Options::SOURCE_SPLIT_OPEN_FILE_COST[] = "source.split.open-file-cost";
const char Options::SCAN_SNAPSHOT_ID[] = "scan.snapshot-id";
Expand Down
8 changes: 8 additions & 0 deletions src/paimon/common/factories/io_hook.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class IOHook::Impl {
return io_count_.load();
}

bool IsActive() const {
return pos_.load() >= 0;
}

void Clear() {
Reset(-1, IOHook::Mode::SILENT);
}
Expand All @@ -77,6 +81,10 @@ int64_t IOHook::IOCount() const {
return impl_->IOCount();
}

bool IOHook::IsActive() const {
return impl_->IsActive();
}

void IOHook::Clear() {
return impl_->Clear();
}
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/common/factories/io_hook.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class PAIMON_EXPORT IOHook : public Singleton<IOHook> {
/// @return The number of IO operations executed.
int64_t IOCount() const;

/// Whether IOHook currently has an injection position configured.
bool IsActive() const;

/// Clear the state of the IOHook, including resetting IO count and
/// any stored exception state.
void Clear();
Expand Down
24 changes: 24 additions & 0 deletions src/paimon/common/io/memory_segment_input_stream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/common/io/memory_segment_input_stream.h"

namespace paimon {

MemorySegmentInputStream::MemorySegmentInputStream(const MemorySegment& segment)
: ByteArrayInputStream(segment.Data(), segment.Size()), segment_(segment) {}

} // namespace paimon
33 changes: 33 additions & 0 deletions src/paimon/common/io/memory_segment_input_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "paimon/common/memory/memory_segment.h"
#include "paimon/io/byte_array_input_stream.h"

namespace paimon {

/// Input stream for a MemorySegment. The stream keeps the segment alive and does not copy bytes.
class MemorySegmentInputStream : public ByteArrayInputStream {
public:
explicit MemorySegmentInputStream(const MemorySegment& segment);

private:
MemorySegment segment_;
};

} // namespace paimon
3 changes: 2 additions & 1 deletion src/paimon/core/append/append_compact_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ Result<std::unique_ptr<FileStoreScan>> CreateFileStoreScan(
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<ManifestList> manifest_list,
ManifestList::Create(core_options.GetFileSystem(), core_options.GetManifestFormat(),
core_options.GetManifestCompression(), path_factory, pool));
core_options.GetManifestCompression(), path_factory, pool,
core_options.GetManifestCacheManager()));
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<ManifestFile> manifest_file,
ManifestFile::Create(core_options.GetFileSystem(), core_options.GetManifestFormat(),
Expand Down
28 changes: 27 additions & 1 deletion src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "fmt/format.h"
#include "paimon/common/fs/resolving_file_system.h"
#include "paimon/common/io/cache/cache_manager.h"
#include "paimon/common/options/memory_size.h"
#include "paimon/common/options/time_duration.h"
#include "paimon/common/utils/path_util.h"
Expand Down Expand Up @@ -363,6 +364,7 @@ struct CoreOptions::Impl {
int64_t source_split_target_size = 128 * 1024 * 1024;
int64_t source_split_open_file_cost = 4 * 1024 * 1024;
int64_t manifest_target_file_size = 8 * 1024 * 1024;
int64_t manifest_cache_max_memory_size = 64 * 1024 * 1024;
int64_t deletion_vector_target_file_size = 2 * 1024 * 1024;
int64_t manifest_full_compaction_file_size = 16 * 1024 * 1024;
int64_t write_buffer_size = 256 * 1024 * 1024;
Expand All @@ -371,6 +373,7 @@ struct CoreOptions::Impl {
std::shared_ptr<FileFormat> file_format;
std::shared_ptr<FileSystem> file_system;
std::shared_ptr<FileFormat> manifest_file_format;
std::shared_ptr<CacheManager> manifest_cache_manager;

std::optional<int64_t> scan_snapshot_id;
std::optional<int64_t> scan_timestamp_millis;
Expand Down Expand Up @@ -601,9 +604,21 @@ struct CoreOptions::Impl {
// Parse manifest.full-compaction-threshold-size - size threshold for full compaction
PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::MANIFEST_FULL_COMPACTION_FILE_SIZE,
&manifest_full_compaction_file_size));
// Parse manifest.cache-max-memory-size - raw manifest file bytes cache capacity
PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::MANIFEST_CACHE_MAX_MEMORY_SIZE,
&manifest_cache_max_memory_size));
return Status::OK();
}

void InitManifestCacheManager() {
if (manifest_cache_max_memory_size <= 0) {
manifest_cache_manager = nullptr;
return;
}
manifest_cache_manager = std::make_shared<CacheManager>(manifest_cache_max_memory_size,
/*high_priority_pool_ratio=*/0.0);
}

// Parse snapshot expiration and retention configurations.
Status ParseExpireOptions(const ConfigParser& parser) {
// Parse snapshot.num-retained.min - minimum completed snapshots to retain, default 10
Expand Down Expand Up @@ -864,11 +879,14 @@ Result<CoreOptions> CoreOptions::FromMap(
PAIMON_RETURN_NOT_OK(impl->ParseIndexOptions(parser));
PAIMON_RETURN_NOT_OK(impl->ParseCompactionOptions(parser));
PAIMON_RETURN_NOT_OK(impl->ParseLookupOptions(parser));
impl->InitManifestCacheManager();

return options;
}

CoreOptions::CoreOptions() : impl_(std::make_unique<Impl>()) {}
CoreOptions::CoreOptions() : impl_(std::make_unique<Impl>()) {
impl_->InitManifestCacheManager();
}

CoreOptions::CoreOptions(const CoreOptions& rhs)
: impl_(std::make_unique<Impl>(*(rhs.impl_.get()))) {}
Expand Down Expand Up @@ -967,6 +985,14 @@ int64_t CoreOptions::GetManifestTargetFileSize() const {
return impl_->manifest_target_file_size;
}

int64_t CoreOptions::GetManifestCacheMaxMemorySize() const {
return impl_->manifest_cache_max_memory_size;
}

std::shared_ptr<CacheManager> CoreOptions::GetManifestCacheManager() const {
return impl_->manifest_cache_manager;
}

int32_t CoreOptions::GetManifestMergeMinCount() const {
return impl_->manifest_merge_min_count;
}
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/core/core_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

namespace paimon {

class CacheManager;
class ExpireConfig;

class PAIMON_EXPORT CoreOptions {
Expand Down Expand Up @@ -77,6 +78,8 @@ class PAIMON_EXPORT CoreOptions {
std::optional<int64_t> GetScanTimestampMillis() const;

int64_t GetManifestTargetFileSize() const;
int64_t GetManifestCacheMaxMemorySize() const;
std::shared_ptr<CacheManager> GetManifestCacheManager() const;
StartupMode GetStartupMode() const;

int32_t GetReadBatchSize() const;
Expand Down
8 changes: 8 additions & 0 deletions src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ TEST(CoreOptionsTest, TestDefaultValue) {
ASSERT_EQ(8 * 1024 * 1024L, core_options.GetManifestTargetFileSize());
ASSERT_EQ(16 * 1024 * 1024L, core_options.GetManifestFullCompactionThresholdSize());
ASSERT_EQ(30, core_options.GetManifestMergeMinCount());
ASSERT_EQ(64 * 1024 * 1024L, core_options.GetManifestCacheMaxMemorySize());
ASSERT_NE(nullptr, core_options.GetManifestCacheManager());
ASSERT_EQ(128 * 1024 * 1024L, core_options.GetSourceSplitTargetSize());
ASSERT_EQ(4 * 1024 * 1024L, core_options.GetSourceSplitOpenFileCost());
ASSERT_EQ(1024, core_options.GetReadBatchSize());
Expand Down Expand Up @@ -166,6 +168,7 @@ TEST(CoreOptionsTest, TestFromMap) {
{Options::MANIFEST_TARGET_FILE_SIZE, "16MB"},
{Options::MANIFEST_FULL_COMPACTION_FILE_SIZE, "32MB"},
{Options::MANIFEST_MERGE_MIN_COUNT, "2"},
{Options::MANIFEST_CACHE_MAX_MEMORY_SIZE, "0"},
{Options::SOURCE_SPLIT_TARGET_SIZE, "24MB"},
{Options::SOURCE_SPLIT_OPEN_FILE_COST, "32MB"},
{Options::READ_BATCH_SIZE, "2048"},
Expand Down Expand Up @@ -283,6 +286,8 @@ TEST(CoreOptionsTest, TestFromMap) {
ASSERT_EQ(16 * 1024 * 1024L, core_options.GetManifestTargetFileSize());
ASSERT_EQ(32 * 1024 * 1024L, core_options.GetManifestFullCompactionThresholdSize());
ASSERT_EQ(2, core_options.GetManifestMergeMinCount());
ASSERT_EQ(0, core_options.GetManifestCacheMaxMemorySize());
ASSERT_EQ(nullptr, core_options.GetManifestCacheManager());
ASSERT_EQ(24 * 1024 * 1024L, core_options.GetSourceSplitTargetSize());
ASSERT_EQ(32 * 1024 * 1024L, core_options.GetSourceSplitOpenFileCost());
ASSERT_EQ(2048, core_options.GetReadBatchSize());
Expand Down Expand Up @@ -852,9 +857,11 @@ TEST(CoreOptionsTest, TestCopyAssignmentOperator) {

// Verify the target's ToMap matches the source's ToMap
ASSERT_EQ(source.ToMap(), target.ToMap());
ASSERT_EQ(source.GetManifestCacheManager(), target.GetManifestCacheManager());

CoreOptions target2 = source;
ASSERT_EQ(source.ToMap(), target2.ToMap());
ASSERT_EQ(source.GetManifestCacheManager(), target2.GetManifestCacheManager());
}

TEST(CoreOptionsTest, TestAssignmentIndependence) {
Expand Down Expand Up @@ -885,6 +892,7 @@ TEST(CoreOptionsTest, TestAssignmentIndependence) {
// Source should have new values
ASSERT_EQ(99, source.GetBucket());
ASSERT_EQ(MergeEngine::DEDUPLICATE, source.GetMergeEngine());
ASSERT_NE(source.GetManifestCacheManager(), target.GetManifestCacheManager());
}

TEST(CoreOptionsTest, TestFallback) {
Expand Down
11 changes: 6 additions & 5 deletions src/paimon/core/manifest/index_manifest_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,21 @@ Result<std::unique_ptr<IndexManifestFile>> IndexManifestFile::Create(

std::shared_ptr<PathFactory> index_manifest_file_factory =
path_factory->CreateIndexManifestFileFactory();
return std::unique_ptr<IndexManifestFile>(
new IndexManifestFile(file_system, reader_builder, writer_builder, compression,
index_manifest_file_factory, bucket_mode, pool));
return std::unique_ptr<IndexManifestFile>(new IndexManifestFile(
file_system, reader_builder, writer_builder, compression, index_manifest_file_factory,
bucket_mode, pool, options.GetManifestCacheManager()));
}

IndexManifestFile::IndexManifestFile(const std::shared_ptr<FileSystem>& file_system,
const std::shared_ptr<ReaderBuilder>& reader_builder,
const std::shared_ptr<WriterBuilder>& writer_builder,
const std::string& compression,
const std::shared_ptr<PathFactory>& path_factory,
int32_t bucket_mode, const std::shared_ptr<MemoryPool>& pool)
int32_t bucket_mode, const std::shared_ptr<MemoryPool>& pool,
const std::shared_ptr<CacheManager>& cache_manager)
: ObjectsFile<IndexManifestEntry>(file_system, reader_builder, writer_builder,
std::make_unique<IndexManifestEntrySerializer>(pool),
compression, path_factory, pool),
compression, path_factory, pool, cache_manager),
bucket_mode_(bucket_mode) {}

Result<std::optional<std::string>> IndexManifestFile::WriteIndexFiles(
Expand Down
Loading
Loading