diff --git a/docs/source/user_guide.rst b/docs/source/user_guide.rst index 85dd0b379..522a35a3c 100644 --- a/docs/source/user_guide.rst +++ b/docs/source/user_guide.rst @@ -24,6 +24,7 @@ User Guide user_guide/schema user_guide/snapshot user_guide/manifest + user_guide/manifest_cache user_guide/data_types user_guide/primary_key_table user_guide/append_only_table diff --git a/docs/source/user_guide/manifest_cache.rst b/docs/source/user_guide/manifest_cache.rst new file mode 100644 index 000000000..42dbefe5c --- /dev/null +++ b/docs/source/user_guide/manifest_cache.rst @@ -0,0 +1,85 @@ +.. Copyright 2024-present Alibaba Inc. + +.. Licensed under the Apache License, Version 2.0 (the "License"); +.. you may not use this file except in compliance with the License. +.. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, software +.. distributed under the License is distributed on an "AS IS" BASIS, +.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +.. See the License for the specific language governing permissions and +.. limitations under the License. + +Manifest Cache +============== + +Overview +-------- + +Manifest cache stores raw manifest file bytes at the ``ObjectsFile::Read()`` +layer. It covers data manifests, manifest lists, and index manifests because +they all read through ``ObjectsFile``. + +Repeated ``get``, ``scan``, or batch ``get/scan -f`` requests in the same +process often re-read the same manifest files of the same snapshot. On a +cache hit, the read path skips remote filesystem ``open/read``, builds an +in-memory input stream from cached bytes, and still runs the format reader, +Arrow decoding, and object deserialization. This design primarily reduces +remote IO latency and bandwidth while keeping cache weight aligned with the +actual cached bytes. + +Configuration +------------- + +.. 
code-block:: text + + manifest.cache-max-memory-size=64MB + +The default value is ``64MB``. Set it to ``0`` to disable the manifest cache. + +Implementation Notes +-------------------- + +- The cache key is the full manifest file path with offset ``0`` and a whole-file + sentinel length, so cache hits do not need an extra file-status lookup. +- The cache value stores full manifest file bytes in a ``MemorySegment``. +- The cache weight is the ``MemorySegment`` size, and capacity is controlled by + ``manifest.cache-max-memory-size``. +- The cache is a ``CacheManager`` instance that ``CoreOptions`` creates eagerly + from ``manifest.cache-max-memory-size``; copies of that ``CoreOptions`` share + it, and upper layers pass it explicitly to ``ObjectsFile``. +- On a cache hit, ``MemorySegmentInputStream`` reads cached bytes and keeps the + underlying ``MemorySegment`` alive without copying bytes. Caller filters are + applied after records are decoded. +- The cache is process-local LRU storage and is not shared across processes. +- Reads bypass the cache while ``IOHook`` is active, so IO fault-injection tests + still observe real IO operations. +- ``Read()`` keeps its existing append semantics, which is required when base + and delta manifests are loaded into the same result vector. + +Test Coverage +------------- + +- ``CoreOptionsTest`` validates the ``64MB`` default and the ``0`` disable + switch. +- ``ManifestFileTest.TestReadUsesManifestCache`` validates that a repeated + read hits the cache without reopening the file, and that caller filters still + apply after cached bytes are decoded. +- ``ManifestFileTest.TestManifestCacheCanBeDisabled`` validates that every + read reopens the file when the cache is disabled. +- ``ManifestFileTest.TestManifestCacheBypassesWhenIOHookActive`` validates cache + bypass while ``IOHook`` is active. + +Future Optimizations +-------------------- + +- Expose hit, miss, bypass, and eviction counters through read traces or metrics. 
+- Isolate capacity by table path or catalog to prevent one large table from + evicting unrelated manifest bytes. +- Add single-flight loading for high-concurrency misses on the same manifest + path. +- Evaluate a decoded-records second-level cache, configurable as a + CPU-vs-memory tradeoff. + diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 70f2aa0b0..d74155ca9 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -150,6 +150,10 @@ struct PAIMON_EXPORT Options { /// compaction of manifest, default value is 16MB. static const char MANIFEST_FULL_COMPACTION_FILE_SIZE[]; + /// "manifest.cache-max-memory-size" - Maximum process-local memory used to cache raw manifest + /// file bytes. Set to 0 to disable. Default value is 64MB. + static const char MANIFEST_CACHE_MAX_MEMORY_SIZE[]; + /// "source.split.target-size" - Target size of a source split when scanning a bucket. Default /// value is 128MB. static const char SOURCE_SPLIT_TARGET_SIZE[]; diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index a4e919030..dfd7e8bf7 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -64,6 +64,7 @@ set(PAIMON_COMMON_SRCS common/io/byte_array_input_stream.cpp common/io/data_input_stream.cpp common/io/data_output_stream.cpp + common/io/memory_segment_input_stream.cpp common/io/memory_segment_output_stream.cpp common/io/offset_input_stream.cpp common/io/cache/cache_key.cpp diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index b328213d6..76570c454 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -43,6 +43,7 @@ const char Options::MANIFEST_COMPRESSION[] = "manifest.compression"; const char Options::MANIFEST_MERGE_MIN_COUNT[] = "manifest.merge-min-count"; const char Options::MANIFEST_FULL_COMPACTION_FILE_SIZE[] = "manifest.full-compaction-threshold-size"; +const char Options::MANIFEST_CACHE_MAX_MEMORY_SIZE[] = "manifest.cache-max-memory-size"; const char 
Options::SOURCE_SPLIT_TARGET_SIZE[] = "source.split.target-size"; const char Options::SOURCE_SPLIT_OPEN_FILE_COST[] = "source.split.open-file-cost"; const char Options::SCAN_SNAPSHOT_ID[] = "scan.snapshot-id"; diff --git a/src/paimon/common/factories/io_hook.cpp b/src/paimon/common/factories/io_hook.cpp index 8f576402b..42f67016e 100644 --- a/src/paimon/common/factories/io_hook.cpp +++ b/src/paimon/common/factories/io_hook.cpp @@ -56,6 +56,10 @@ class IOHook::Impl { return io_count_.load(); } + bool IsActive() const { + return pos_.load() >= 0; + } + void Clear() { Reset(-1, IOHook::Mode::SILENT); } @@ -77,6 +81,10 @@ int64_t IOHook::IOCount() const { return impl_->IOCount(); } +bool IOHook::IsActive() const { + return impl_->IsActive(); +} + void IOHook::Clear() { return impl_->Clear(); } diff --git a/src/paimon/common/factories/io_hook.h b/src/paimon/common/factories/io_hook.h index 1676c3a91..a7ebb5f20 100644 --- a/src/paimon/common/factories/io_hook.h +++ b/src/paimon/common/factories/io_hook.h @@ -58,6 +58,9 @@ class PAIMON_EXPORT IOHook : public Singleton { /// @return The number of IO operations executed. int64_t IOCount() const; + /// Whether IOHook currently has an injection position configured. + bool IsActive() const; + /// Clear the state of the IOHook, including resetting IO count and /// any stored exception state. void Clear(); diff --git a/src/paimon/common/io/memory_segment_input_stream.cpp b/src/paimon/common/io/memory_segment_input_stream.cpp new file mode 100644 index 000000000..0feeb0509 --- /dev/null +++ b/src/paimon/common/io/memory_segment_input_stream.cpp @@ -0,0 +1,24 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/io/memory_segment_input_stream.h" + +namespace paimon { + +MemorySegmentInputStream::MemorySegmentInputStream(const MemorySegment& segment) + : ByteArrayInputStream(segment.Data(), segment.Size()), segment_(segment) {} + +} // namespace paimon diff --git a/src/paimon/common/io/memory_segment_input_stream.h b/src/paimon/common/io/memory_segment_input_stream.h new file mode 100644 index 000000000..ca86a7d19 --- /dev/null +++ b/src/paimon/common/io/memory_segment_input_stream.h @@ -0,0 +1,33 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/common/memory/memory_segment.h" +#include "paimon/io/byte_array_input_stream.h" + +namespace paimon { + +/// Input stream for a MemorySegment. The stream keeps the segment alive and does not copy bytes. 
+class MemorySegmentInputStream : public ByteArrayInputStream { + public: + explicit MemorySegmentInputStream(const MemorySegment& segment); + + private: + MemorySegment segment_; +}; + +} // namespace paimon diff --git a/src/paimon/core/append/append_compact_coordinator.cpp b/src/paimon/core/append/append_compact_coordinator.cpp index 4d36436d1..5abefebbb 100644 --- a/src/paimon/core/append/append_compact_coordinator.cpp +++ b/src/paimon/core/append/append_compact_coordinator.cpp @@ -135,7 +135,8 @@ Result> CreateFileStoreScan( PAIMON_ASSIGN_OR_RAISE( std::shared_ptr manifest_list, ManifestList::Create(core_options.GetFileSystem(), core_options.GetManifestFormat(), - core_options.GetManifestCompression(), path_factory, pool)); + core_options.GetManifestCompression(), path_factory, pool, + core_options.GetManifestCacheManager())); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr manifest_file, ManifestFile::Create(core_options.GetFileSystem(), core_options.GetManifestFormat(), diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 818995706..2b05dd3a8 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -24,6 +24,7 @@ #include "fmt/format.h" #include "paimon/common/fs/resolving_file_system.h" +#include "paimon/common/io/cache/cache_manager.h" #include "paimon/common/options/memory_size.h" #include "paimon/common/options/time_duration.h" #include "paimon/common/utils/path_util.h" @@ -363,6 +364,7 @@ struct CoreOptions::Impl { int64_t source_split_target_size = 128 * 1024 * 1024; int64_t source_split_open_file_cost = 4 * 1024 * 1024; int64_t manifest_target_file_size = 8 * 1024 * 1024; + int64_t manifest_cache_max_memory_size = 64 * 1024 * 1024; int64_t deletion_vector_target_file_size = 2 * 1024 * 1024; int64_t manifest_full_compaction_file_size = 16 * 1024 * 1024; int64_t write_buffer_size = 256 * 1024 * 1024; @@ -371,6 +373,7 @@ struct CoreOptions::Impl { std::shared_ptr file_format; std::shared_ptr 
file_system; std::shared_ptr manifest_file_format; + std::shared_ptr manifest_cache_manager; std::optional scan_snapshot_id; std::optional scan_timestamp_millis; @@ -601,9 +604,21 @@ struct CoreOptions::Impl { // Parse manifest.full-compaction-threshold-size - size threshold for full compaction PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::MANIFEST_FULL_COMPACTION_FILE_SIZE, &manifest_full_compaction_file_size)); + // Parse manifest.cache-max-memory-size - raw manifest file bytes cache capacity + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::MANIFEST_CACHE_MAX_MEMORY_SIZE, + &manifest_cache_max_memory_size)); return Status::OK(); } + void InitManifestCacheManager() { + if (manifest_cache_max_memory_size <= 0) { + manifest_cache_manager = nullptr; + return; + } + manifest_cache_manager = std::make_shared(manifest_cache_max_memory_size, + /*high_priority_pool_ratio=*/0.0); + } + // Parse snapshot expiration and retention configurations. Status ParseExpireOptions(const ConfigParser& parser) { // Parse snapshot.num-retained.min - minimum completed snapshots to retain, default 10 @@ -864,11 +879,14 @@ Result CoreOptions::FromMap( PAIMON_RETURN_NOT_OK(impl->ParseIndexOptions(parser)); PAIMON_RETURN_NOT_OK(impl->ParseCompactionOptions(parser)); PAIMON_RETURN_NOT_OK(impl->ParseLookupOptions(parser)); + impl->InitManifestCacheManager(); return options; } -CoreOptions::CoreOptions() : impl_(std::make_unique()) {} +CoreOptions::CoreOptions() : impl_(std::make_unique()) { + impl_->InitManifestCacheManager(); +} CoreOptions::CoreOptions(const CoreOptions& rhs) : impl_(std::make_unique(*(rhs.impl_.get()))) {} @@ -967,6 +985,14 @@ int64_t CoreOptions::GetManifestTargetFileSize() const { return impl_->manifest_target_file_size; } +int64_t CoreOptions::GetManifestCacheMaxMemorySize() const { + return impl_->manifest_cache_max_memory_size; +} + +std::shared_ptr CoreOptions::GetManifestCacheManager() const { + return impl_->manifest_cache_manager; +} + int32_t 
CoreOptions::GetManifestMergeMinCount() const { return impl_->manifest_merge_min_count; } diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index 699506413..a0a075e7e 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -40,6 +40,7 @@ namespace paimon { +class CacheManager; class ExpireConfig; class PAIMON_EXPORT CoreOptions { @@ -77,6 +78,8 @@ class PAIMON_EXPORT CoreOptions { std::optional GetScanTimestampMillis() const; int64_t GetManifestTargetFileSize() const; + int64_t GetManifestCacheMaxMemorySize() const; + std::shared_ptr GetManifestCacheManager() const; StartupMode GetStartupMode() const; int32_t GetReadBatchSize() const; diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 4d8a26884..38c660ad4 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -55,6 +55,8 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_EQ(8 * 1024 * 1024L, core_options.GetManifestTargetFileSize()); ASSERT_EQ(16 * 1024 * 1024L, core_options.GetManifestFullCompactionThresholdSize()); ASSERT_EQ(30, core_options.GetManifestMergeMinCount()); + ASSERT_EQ(64 * 1024 * 1024L, core_options.GetManifestCacheMaxMemorySize()); + ASSERT_NE(nullptr, core_options.GetManifestCacheManager()); ASSERT_EQ(128 * 1024 * 1024L, core_options.GetSourceSplitTargetSize()); ASSERT_EQ(4 * 1024 * 1024L, core_options.GetSourceSplitOpenFileCost()); ASSERT_EQ(1024, core_options.GetReadBatchSize()); @@ -166,6 +168,7 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::MANIFEST_TARGET_FILE_SIZE, "16MB"}, {Options::MANIFEST_FULL_COMPACTION_FILE_SIZE, "32MB"}, {Options::MANIFEST_MERGE_MIN_COUNT, "2"}, + {Options::MANIFEST_CACHE_MAX_MEMORY_SIZE, "0"}, {Options::SOURCE_SPLIT_TARGET_SIZE, "24MB"}, {Options::SOURCE_SPLIT_OPEN_FILE_COST, "32MB"}, {Options::READ_BATCH_SIZE, "2048"}, @@ -283,6 +286,8 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_EQ(16 * 1024 * 1024L, 
core_options.GetManifestTargetFileSize()); ASSERT_EQ(32 * 1024 * 1024L, core_options.GetManifestFullCompactionThresholdSize()); ASSERT_EQ(2, core_options.GetManifestMergeMinCount()); + ASSERT_EQ(0, core_options.GetManifestCacheMaxMemorySize()); + ASSERT_EQ(nullptr, core_options.GetManifestCacheManager()); ASSERT_EQ(24 * 1024 * 1024L, core_options.GetSourceSplitTargetSize()); ASSERT_EQ(32 * 1024 * 1024L, core_options.GetSourceSplitOpenFileCost()); ASSERT_EQ(2048, core_options.GetReadBatchSize()); @@ -852,9 +857,11 @@ TEST(CoreOptionsTest, TestCopyAssignmentOperator) { // Verify the target's ToMap matches the source's ToMap ASSERT_EQ(source.ToMap(), target.ToMap()); + ASSERT_EQ(source.GetManifestCacheManager(), target.GetManifestCacheManager()); CoreOptions target2 = source; ASSERT_EQ(source.ToMap(), target2.ToMap()); + ASSERT_EQ(source.GetManifestCacheManager(), target2.GetManifestCacheManager()); } TEST(CoreOptionsTest, TestAssignmentIndependence) { @@ -885,6 +892,7 @@ TEST(CoreOptionsTest, TestAssignmentIndependence) { // Source should have new values ASSERT_EQ(99, source.GetBucket()); ASSERT_EQ(MergeEngine::DEDUPLICATE, source.GetMergeEngine()); + ASSERT_NE(source.GetManifestCacheManager(), target.GetManifestCacheManager()); } TEST(CoreOptionsTest, TestFallback) { diff --git a/src/paimon/core/manifest/index_manifest_file.cpp b/src/paimon/core/manifest/index_manifest_file.cpp index 4fadbf811..9c7dec585 100644 --- a/src/paimon/core/manifest/index_manifest_file.cpp +++ b/src/paimon/core/manifest/index_manifest_file.cpp @@ -59,9 +59,9 @@ Result> IndexManifestFile::Create( std::shared_ptr index_manifest_file_factory = path_factory->CreateIndexManifestFileFactory(); - return std::unique_ptr( - new IndexManifestFile(file_system, reader_builder, writer_builder, compression, - index_manifest_file_factory, bucket_mode, pool)); + return std::unique_ptr(new IndexManifestFile( + file_system, reader_builder, writer_builder, compression, index_manifest_file_factory, + 
bucket_mode, pool, options.GetManifestCacheManager())); } IndexManifestFile::IndexManifestFile(const std::shared_ptr& file_system, @@ -69,10 +69,11 @@ IndexManifestFile::IndexManifestFile(const std::shared_ptr& file_sys const std::shared_ptr& writer_builder, const std::string& compression, const std::shared_ptr& path_factory, - int32_t bucket_mode, const std::shared_ptr& pool) + int32_t bucket_mode, const std::shared_ptr& pool, + const std::shared_ptr& cache_manager) : ObjectsFile(file_system, reader_builder, writer_builder, std::make_unique(pool), - compression, path_factory, pool), + compression, path_factory, pool, cache_manager), bucket_mode_(bucket_mode) {} Result> IndexManifestFile::WriteIndexFiles( diff --git a/src/paimon/core/manifest/index_manifest_file.h b/src/paimon/core/manifest/index_manifest_file.h index 31473eff8..fe1ce22c0 100644 --- a/src/paimon/core/manifest/index_manifest_file.h +++ b/src/paimon/core/manifest/index_manifest_file.h @@ -20,6 +20,7 @@ #include #include +#include "paimon/common/io/cache/cache_manager.h" #include "paimon/core/core_options.h" #include "paimon/core/manifest/index_manifest_entry.h" #include "paimon/core/utils/file_store_path_factory.h" @@ -60,7 +61,8 @@ class IndexManifestFile : public ObjectsFile { const std::shared_ptr& writer_builder, const std::string& compression, const std::shared_ptr& path_factory, int32_t bucket_mode, - const std::shared_ptr& pool); + const std::shared_ptr& pool, + const std::shared_ptr& cache_manager); const int32_t bucket_mode_; }; diff --git a/src/paimon/core/manifest/manifest_file.cpp b/src/paimon/core/manifest/manifest_file.cpp index a349762d8..a8cc8e98b 100644 --- a/src/paimon/core/manifest/manifest_file.cpp +++ b/src/paimon/core/manifest/manifest_file.cpp @@ -54,7 +54,7 @@ ManifestFile::ManifestFile(const std::shared_ptr& file_system, const std::shared_ptr& partition_type) : ObjectsFile(file_system, reader_builder, writer_builder, std::make_unique(pool), compression, - path_factory, pool), 
+ path_factory, pool, options.GetManifestCacheManager()), target_file_size_(target_file_size), options_(options), partition_type_(partition_type) {} diff --git a/src/paimon/core/manifest/manifest_file_test.cpp b/src/paimon/core/manifest/manifest_file_test.cpp index 8c2adb76d..fcd05bb8e 100644 --- a/src/paimon/core/manifest/manifest_file_test.cpp +++ b/src/paimon/core/manifest/manifest_file_test.cpp @@ -27,6 +27,8 @@ #include "gtest/gtest.h" #include "paimon/common/data/binary_row.h" #include "paimon/common/data/data_define.h" +#include "paimon/common/factories/io_hook.h" +#include "paimon/common/utils/scope_guard.h" #include "paimon/core/io/data_file_meta.h" #include "paimon/core/manifest/file_kind.h" #include "paimon/core/manifest/file_source.h" @@ -44,6 +46,58 @@ #include "paimon/testing/utils/testharness.h" namespace paimon::test { + +class CountingFileSystem : public FileSystem { + public: + Result> Open(const std::string& path) const override { + ++open_count; + return local_.Open(path); + } + + Result> Create(const std::string& path, + bool overwrite) const override { + return local_.Create(path, overwrite); + } + + Status Mkdirs(const std::string& path) const override { + return local_.Mkdirs(path); + } + + Status Rename(const std::string& src, const std::string& dst) const override { + return local_.Rename(src, dst); + } + + Status Delete(const std::string& path, bool recursive = true) const override { + return local_.Delete(path, recursive); + } + + Result> GetFileStatus(const std::string& path) const override { + ++get_file_status_count; + return local_.GetFileStatus(path); + } + + Status ListDir(const std::string& directory, + std::vector>* file_status_list) const override { + return local_.ListDir(directory, file_status_list); + } + + Status ListFileStatus( + const std::string& path, + std::vector>* file_status_list) const override { + return local_.ListFileStatus(path, file_status_list); + } + + Result Exists(const std::string& path) const override { + 
return local_.Exists(path); + } + + mutable int open_count = 0; + mutable int get_file_status_count = 0; + + private: + LocalFileSystem local_; +}; + class ManifestFileTest : public testing::Test { public: std::vector ReadManifestEntry(const std::string& file_format_str, @@ -181,6 +235,123 @@ TEST_F(ManifestFileTest, TestSimple) { ASSERT_EQ(expected_manifest_entries, manifest_entries); } +TEST_F(ManifestFileTest, TestReadUsesManifestCache) { + auto pool = GetDefaultPool(); + auto counting_file_system = std::make_shared(); + EXPECT_OK_AND_ASSIGN(std::shared_ptr file_format, + FileFormatFactory::Get("orc", {})); + std::string root_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09"; + auto unused_schema = arrow::schema(arrow::FieldVector({arrow::field("f0", arrow::utf8())})); + EXPECT_OK_AND_ASSIGN( + std::shared_ptr path_factory, + FileStorePathFactory::Create(root_path, unused_schema, /*partition_keys=*/{}, + /*default_part_value=*/"", file_format->Identifier(), + /*data_file_prefix=*/"data-", + /*legacy_partition_name_enabled=*/true, /*external_paths=*/{}, + /*global_index_external_path=*/std::nullopt, + /*index_file_in_data_file_dir=*/false, pool)); + EXPECT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}, + {Options::MANIFEST_FORMAT, "orc"}, + {Options::MANIFEST_CACHE_MAX_MEMORY_SIZE, "5MB"}})); + EXPECT_OK_AND_ASSIGN( + std::unique_ptr manifest_file, + ManifestFile::Create(counting_file_system, file_format, "zstd", path_factory, + /*target_file_size=*/1024, pool, options, unused_schema)); + + std::vector first_read; + ASSERT_OK(manifest_file->Read("manifest-3ea5ee21-d399-4f1c-a749-2fc63dbf0852-1", + /*filter=*/nullptr, &first_read)); + ASSERT_EQ(5, first_read.size()); + ASSERT_EQ(1, counting_file_system->open_count); + ASSERT_EQ(0, counting_file_system->get_file_status_count); + + std::vector filtered_read; + ASSERT_OK(manifest_file->Read( + "manifest-3ea5ee21-d399-4f1c-a749-2fc63dbf0852-1", + [](const 
ManifestEntry& entry) -> Result { return entry.Kind() == FileKind::Add(); }, + &filtered_read)); + ASSERT_EQ(1, filtered_read.size()); + ASSERT_EQ(1, counting_file_system->open_count); + ASSERT_EQ(0, counting_file_system->get_file_status_count); +} + +TEST_F(ManifestFileTest, TestManifestCacheCanBeDisabled) { + auto pool = GetDefaultPool(); + auto counting_file_system = std::make_shared(); + EXPECT_OK_AND_ASSIGN(std::shared_ptr file_format, + FileFormatFactory::Get("orc", {})); + std::string root_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09"; + auto unused_schema = arrow::schema(arrow::FieldVector({arrow::field("f0", arrow::utf8())})); + EXPECT_OK_AND_ASSIGN( + std::shared_ptr path_factory, + FileStorePathFactory::Create(root_path, unused_schema, /*partition_keys=*/{}, + /*default_part_value=*/"", file_format->Identifier(), + /*data_file_prefix=*/"data-", + /*legacy_partition_name_enabled=*/true, /*external_paths=*/{}, + /*global_index_external_path=*/std::nullopt, + /*index_file_in_data_file_dir=*/false, pool)); + EXPECT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}, + {Options::MANIFEST_FORMAT, "orc"}, + {Options::MANIFEST_CACHE_MAX_MEMORY_SIZE, "0"}})); + EXPECT_OK_AND_ASSIGN( + std::unique_ptr manifest_file, + ManifestFile::Create(counting_file_system, file_format, "zstd", path_factory, + /*target_file_size=*/1024, pool, options, unused_schema)); + + std::vector first_read; + ASSERT_OK(manifest_file->Read("manifest-3ea5ee21-d399-4f1c-a749-2fc63dbf0852-1", + /*filter=*/nullptr, &first_read)); + std::vector second_read; + ASSERT_OK(manifest_file->Read("manifest-3ea5ee21-d399-4f1c-a749-2fc63dbf0852-1", + /*filter=*/nullptr, &second_read)); + + ASSERT_EQ(first_read, second_read); + ASSERT_EQ(2, counting_file_system->open_count); + ASSERT_EQ(0, counting_file_system->get_file_status_count); +} + +TEST_F(ManifestFileTest, TestManifestCacheBypassesWhenIOHookActive) { + auto pool = GetDefaultPool(); + auto 
counting_file_system = std::make_shared(); + EXPECT_OK_AND_ASSIGN(std::shared_ptr file_format, + FileFormatFactory::Get("orc", {})); + std::string root_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09"; + auto unused_schema = arrow::schema(arrow::FieldVector({arrow::field("f0", arrow::utf8())})); + EXPECT_OK_AND_ASSIGN( + std::shared_ptr path_factory, + FileStorePathFactory::Create(root_path, unused_schema, /*partition_keys=*/{}, + /*default_part_value=*/"", file_format->Identifier(), + /*data_file_prefix=*/"data-", + /*legacy_partition_name_enabled=*/true, /*external_paths=*/{}, + /*global_index_external_path=*/std::nullopt, + /*index_file_in_data_file_dir=*/false, pool)); + EXPECT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}, + {Options::MANIFEST_FORMAT, "orc"}, + {Options::MANIFEST_CACHE_MAX_MEMORY_SIZE, "6MB"}})); + EXPECT_OK_AND_ASSIGN( + std::unique_ptr manifest_file, + ManifestFile::Create(counting_file_system, file_format, "zstd", path_factory, + /*target_file_size=*/1024, pool, options, unused_schema)); + + std::vector first_read; + ASSERT_OK(manifest_file->Read("manifest-3ea5ee21-d399-4f1c-a749-2fc63dbf0852-1", + /*filter=*/nullptr, &first_read)); + ASSERT_EQ(1, counting_file_system->open_count); + + auto io_hook = IOHook::GetInstance(); + io_hook->Reset(/*pos=*/100, IOHook::Mode::SILENT); + ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); + + std::vector second_read; + ASSERT_OK(manifest_file->Read("manifest-3ea5ee21-d399-4f1c-a749-2fc63dbf0852-1", + /*filter=*/nullptr, &second_read)); + ASSERT_EQ(first_read, second_read); + ASSERT_EQ(2, counting_file_system->open_count); +} + TEST_F(ManifestFileTest, TestWithNullCount) { auto pool = GetDefaultPool(); auto manifest_entries = diff --git a/src/paimon/core/manifest/manifest_list.cpp b/src/paimon/core/manifest/manifest_list.cpp index de36d3308..c9aed6127 100644 --- a/src/paimon/core/manifest/manifest_list.cpp +++ 
b/src/paimon/core/manifest/manifest_list.cpp @@ -41,15 +41,16 @@ ManifestList::ManifestList(const std::shared_ptr& file_system, const std::shared_ptr& writer_builder, const std::string& compression, const std::shared_ptr& path_factory, - const std::shared_ptr& pool) + const std::shared_ptr& pool, + const std::shared_ptr& cache_manager) : ObjectsFile(file_system, reader_builder, writer_builder, std::make_unique(pool), compression, - std::move(path_factory), pool) {} + std::move(path_factory), pool, cache_manager) {} Result> ManifestList::Create( const std::shared_ptr& fs, const std::shared_ptr& file_format, const std::string& compression, const std::shared_ptr& path_factory, - const std::shared_ptr& pool) { + const std::shared_ptr& pool, const std::shared_ptr& cache_manager) { std::shared_ptr data_type = VersionedObjectSerializer::VersionType(ManifestFileMeta::DataType()); // prepare format reader builder @@ -68,8 +69,9 @@ Result> ManifestList::Create( // create manifest list std::shared_ptr manifest_list_path_factory = path_factory->CreateManifestListFactory(); - return std::unique_ptr(new ManifestList( - fs, reader_builder, writer_builder, compression, manifest_list_path_factory, pool)); + return std::unique_ptr(new ManifestList(fs, reader_builder, writer_builder, + compression, manifest_list_path_factory, + pool, cache_manager)); } Result> ManifestList::Write( diff --git a/src/paimon/core/manifest/manifest_list.h b/src/paimon/core/manifest/manifest_list.h index e36368ac0..d5dd9c37d 100644 --- a/src/paimon/core/manifest/manifest_list.h +++ b/src/paimon/core/manifest/manifest_list.h @@ -24,6 +24,7 @@ #include #include +#include "paimon/common/io/cache/cache_manager.h" #include "paimon/core/manifest/manifest_file_meta.h" #include "paimon/core/snapshot.h" #include "paimon/core/utils/objects_file.h" @@ -50,7 +51,8 @@ class ManifestList : public ObjectsFile { const std::shared_ptr& file_system, const std::shared_ptr& file_format, const std::string& compression, const 
std::shared_ptr& path_factory, - const std::shared_ptr& pool); + const std::shared_ptr& pool, + const std::shared_ptr& cache_manager = nullptr); /// Write several `ManifestFileMeta`s into a manifest list. /// @@ -122,7 +124,8 @@ class ManifestList : public ObjectsFile { const std::shared_ptr& reader_builder, const std::shared_ptr& writer_builder, const std::string& compression, const std::shared_ptr& path_factory, - const std::shared_ptr& pool); + const std::shared_ptr& pool, + const std::shared_ptr& cache_manager); }; } // namespace paimon diff --git a/src/paimon/core/operation/append_only_file_store_write.cpp b/src/paimon/core/operation/append_only_file_store_write.cpp index 0986c6d85..e67b5fa26 100644 --- a/src/paimon/core/operation/append_only_file_store_write.cpp +++ b/src/paimon/core/operation/append_only_file_store_write.cpp @@ -87,7 +87,8 @@ Result> AppendOnlyFileStoreWrite::CreateFileStore PAIMON_ASSIGN_OR_RAISE( std::shared_ptr manifest_list, ManifestList::Create(options_.GetFileSystem(), options_.GetManifestFormat(), - options_.GetManifestCompression(), file_store_path_factory_, pool_)); + options_.GetManifestCompression(), file_store_path_factory_, pool_, + options_.GetManifestCacheManager())); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr manifest_file, ManifestFile::Create(options_.GetFileSystem(), options_.GetManifestFormat(), diff --git a/src/paimon/core/operation/file_store_commit.cpp b/src/paimon/core/operation/file_store_commit.cpp index a7675c041..6f089fdd1 100644 --- a/src/paimon/core/operation/file_store_commit.cpp +++ b/src/paimon/core/operation/file_store_commit.cpp @@ -113,7 +113,8 @@ Result> FileStoreCommit::Create( PAIMON_ASSIGN_OR_RAISE( std::shared_ptr manifest_list, ManifestList::Create(options.GetFileSystem(), options.GetManifestFormat(), - options.GetManifestCompression(), path_factory, ctx->GetMemoryPool())); + options.GetManifestCompression(), path_factory, ctx->GetMemoryPool(), + options.GetManifestCacheManager())); 
PAIMON_ASSIGN_OR_RAISE( std::shared_ptr partition_schema, diff --git a/src/paimon/core/operation/key_value_file_store_write.cpp b/src/paimon/core/operation/key_value_file_store_write.cpp index 16b5180b5..a4acb360f 100644 --- a/src/paimon/core/operation/key_value_file_store_write.cpp +++ b/src/paimon/core/operation/key_value_file_store_write.cpp @@ -79,7 +79,8 @@ Result> KeyValueFileStoreWrite::CreateFileStoreSc PAIMON_ASSIGN_OR_RAISE( std::shared_ptr manifest_list, ManifestList::Create(options_.GetFileSystem(), options_.GetManifestFormat(), - options_.GetManifestCompression(), file_store_path_factory_, pool_)); + options_.GetManifestCompression(), file_store_path_factory_, pool_, + options_.GetManifestCacheManager())); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr manifest_file, ManifestFile::Create(options_.GetFileSystem(), options_.GetManifestFormat(), diff --git a/src/paimon/core/operation/orphan_files_cleaner.cpp b/src/paimon/core/operation/orphan_files_cleaner.cpp index c3a001cca..58fb3a253 100644 --- a/src/paimon/core/operation/orphan_files_cleaner.cpp +++ b/src/paimon/core/operation/orphan_files_cleaner.cpp @@ -195,7 +195,8 @@ Result> OrphanFilesCleaner::Create( PAIMON_ASSIGN_OR_RAISE( std::shared_ptr manifest_list, ManifestList::Create(options.GetFileSystem(), options.GetManifestFormat(), - options.GetManifestCompression(), path_factory, ctx->GetMemoryPool())); + options.GetManifestCompression(), path_factory, ctx->GetMemoryPool(), + options.GetManifestCacheManager())); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr partition_schema, FieldMapping::GetPartitionSchema(arrow_schema, table_schema.value()->PartitionKeys())); diff --git a/src/paimon/core/table/source/table_scan.cpp b/src/paimon/core/table/source/table_scan.cpp index c6e6f3190..22130f271 100644 --- a/src/paimon/core/table/source/table_scan.cpp +++ b/src/paimon/core/table/source/table_scan.cpp @@ -86,7 +86,8 @@ class TableScanImpl { PAIMON_ASSIGN_OR_RAISE( std::shared_ptr manifest_list, ManifestList::Create(fs, 
manifest_file_format, core_options.GetManifestCompression(), - path_factory, memory_pool)); + path_factory, memory_pool, + core_options.GetManifestCacheManager())); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr partition_schema, FieldMapping::GetPartitionSchema(arrow_schema, table_schema->PartitionKeys())); diff --git a/src/paimon/core/utils/objects_file.h b/src/paimon/core/utils/objects_file.h index cc3242b07..b09b949fa 100644 --- a/src/paimon/core/utils/objects_file.h +++ b/src/paimon/core/utils/objects_file.h @@ -16,7 +16,9 @@ #pragma once +#include #include +#include #include #include #include @@ -24,7 +26,13 @@ #include "arrow/c/bridge.h" #include "arrow/c/helpers.h" +#include "fmt/format.h" #include "paimon/common/data/columnar/columnar_row.h" +#include "paimon/common/factories/io_hook.h" +#include "paimon/common/io/cache/cache_key.h" +#include "paimon/common/io/cache/cache_manager.h" +#include "paimon/common/io/memory_segment_input_stream.h" +#include "paimon/common/memory/memory_segment.h" #include "paimon/common/utils/arrow/arrow_utils.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/path_util.h" @@ -50,7 +58,8 @@ class ObjectsFile { const std::shared_ptr& writer_builder, std::unique_ptr>&& serializer, const std::string& compression, const std::shared_ptr& path_factory, - const std::shared_ptr& pool); + const std::shared_ptr& pool, + const std::shared_ptr& cache_manager = nullptr); virtual ~ObjectsFile() = default; @@ -80,6 +89,9 @@ class ObjectsFile { std::shared_ptr file_system_; std::shared_ptr reader_builder_; std::string compression_; + std::shared_ptr cache_manager_; + + Result ReadFileBytes(const std::string& file_path) const; }; template @@ -89,16 +101,16 @@ ObjectsFile::ObjectsFile(const std::shared_ptr& file_system, std::unique_ptr>&& serializer, const std::string& compression, const std::shared_ptr& path_factory, - const std::shared_ptr& pool) + const std::shared_ptr& pool, + const std::shared_ptr& cache_manager) : 
path_factory_(path_factory), pool_(pool), serializer_(std::move(serializer)), writer_builder_(std::move(writer_builder)), file_system_(file_system), reader_builder_(std::move(reader_builder)), - compression_(compression) { - // TODO(xinyu.lxy): add cache -} + compression_(compression), + cache_manager_(cache_manager) {} template Status ObjectsFile::ReadIfFileExist(const std::string& file_name, @@ -117,8 +129,29 @@ Status ObjectsFile::Read(const std::string& file_name, const std::function(const T&)>& filter, std::vector* result) const { std::string file_path = path_factory_->ToPath(file_name); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_input_stream, - file_system_->Open(file_path)); + bool should_cache = cache_manager_ && !IOHook::GetInstance()->IsActive(); + std::shared_ptr file_input_stream; + if (should_cache) { + // Use a whole-file key so cache hits do not need a metadata lookup just to discover file + // length. + std::shared_ptr cache_key = + CacheKey::ForPosition(file_path, /*position=*/0, /*length=*/-1, /*is_index=*/false); + Result segment_result = cache_manager_->GetPage( + cache_key, + [&](const std::shared_ptr&) -> Result { + return ReadFileBytes(file_path); + }, + /*eviction_callback=*/nullptr); + if (segment_result.ok()) { + file_input_stream = std::make_shared(segment_result.value()); + } + } + if (!file_input_stream) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr unique_file_input_stream, + file_system_->Open(file_path)); + file_input_stream = std::shared_ptr(std::move(unique_file_input_stream)); + } + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr batch_reader, reader_builder_->Build(file_input_stream)); auto reader = std::make_unique(std::move(batch_reader), @@ -154,6 +187,39 @@ Status ObjectsFile::Read(const std::string& file_name, return Status::OK(); } +template +Result ObjectsFile::ReadFileBytes(const std::string& file_path) const { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr input_stream, + file_system_->Open(file_path)); + Result input_length_result = 
input_stream->Length(); + if (!input_length_result.ok()) { + return input_length_result.status(); + } + if (input_length_result.value() > std::numeric_limits::max()) { + return Status::Invalid( + fmt::format("file {}, length {} is too large", file_path, input_length_result.value())); + } + int32_t input_length = input_length_result.value(); + + PAIMON_RETURN_NOT_OK(input_stream->Seek(0, FS_SEEK_SET)); + MemorySegment segment = + MemorySegment::AllocateHeapMemory(static_cast(input_length), pool_.get()); + int64_t offset = 0; + while (offset < static_cast(input_length)) { + uint32_t read_size = static_cast(std::min( + static_cast(input_length) - offset, std::numeric_limits::max())); + PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_size, + input_stream->Read(segment.MutableData() + offset, read_size)); + if (actual_read_size <= 0) { + return Status::IOError(fmt::format( + "Unexpected EOF while reading manifest file {}, expected {} bytes, got {} bytes", + file_path, input_length, offset)); + } + offset += actual_read_size; + } + return segment; +} + template Result> ObjectsFile::WriteWithoutRolling( const std::vector& records) {