From 2cdcfe2698ad44928610af733152a0fc7df35695 Mon Sep 17 00:00:00 2001 From: liaoxin Date: Sun, 11 Jan 2026 17:01:25 +0800 Subject: [PATCH 1/4] [fix](cache) Fix packed file cache cleanup issue Previously, file cache used packed file path as cache key, which caused cache entries to be orphaned when stale rowsets were cleaned up (cleanup uses segment path as key). This fix moves the cache layer from inner reader to PackedFileReader wrapper level, ensuring: 1. Cache key = hash(segment_path.filename()) - matches cleanup key 2. Cache size = segment size - correct boundary 3. Each segment has independent cache entry - no interference --- be/src/io/fs/packed_file_manager.cpp | 102 ++++++++++++++++++++++++++- be/src/io/fs/packed_file_system.cpp | 42 ++++++++--- 2 files changed, 131 insertions(+), 13 deletions(-) diff --git a/be/src/io/fs/packed_file_manager.cpp b/be/src/io/fs/packed_file_manager.cpp index 3cf65f12e49540..3101cfb3d8ed79 100644 --- a/be/src/io/fs/packed_file_manager.cpp +++ b/be/src/io/fs/packed_file_manager.cpp @@ -41,10 +41,15 @@ #include "cloud/config.h" #include "common/config.h" #include "gen_cpp/cloud.pb.h" +#include "io/cache/block_file_cache.h" +#include "io/cache/block_file_cache_factory.h" +#include "io/cache/file_block.h" +#include "io/cache/file_cache_common.h" #include "io/fs/packed_file_trailer.h" #include "olap/storage_engine.h" #include "runtime/exec_env.h" #include "util/coding.h" +#include "util/slice.h" #include "util/uid_util.h" namespace doris::io { @@ -108,6 +113,91 @@ Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_ return writer->append(Slice(trailer)); } +// write small file data to file cache +void do_write_to_file_cache(const std::string& small_file_path, const std::string& data, + int64_t tablet_id) { + if (data.empty()) { + return; + } + + // Generate cache key from small file path (e.g., "rowset_id_seg_id.dat") + Path path(small_file_path); + UInt128Wrapper cache_hash = BlockFileCache::hash(path.filename().native()); + + VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path + << " filename=" << path.filename().native() << " hash=" << cache_hash.to_string() + << " size=" << data.size() << " tablet_id=" << tablet_id; + + BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash); + if (file_cache == nullptr) { + return; // Cache not available, skip + } + + // Allocate cache blocks + CacheContext ctx; + ctx.cache_type = FileCacheType::NORMAL; + ctx.tablet_id = tablet_id; + ReadStatistics stats; + ctx.stats = &stats; + + FileBlocksHolder holder = file_cache->get_or_set(cache_hash, 0, data.size(), ctx); + + // Write data to cache blocks + size_t data_offset = 0; + for (auto& block : holder.file_blocks) { + if (data_offset >= data.size()) { + break; + } + size_t block_size = block->range().size(); + size_t write_size = std::min(block_size, data.size() - data_offset); + + if (block->state() == FileBlock::State::EMPTY) { + block->get_or_set_downloader(); + if (block->is_downloader()) { + Slice s(data.data() + data_offset, write_size); + Status st = block->append(s); + if (st.ok()) { + st = block->finalize(); + } + if (!st.ok()) { + LOG(WARNING) << "Write small file to cache failed: " << st.msg(); + } + } + } + data_offset += write_size; + } +} + +// Async wrapper: submit cache write task to thread pool +// Note: data is copied to ensure lifetime beyond async execution +void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data, + int64_t tablet_id) { + if (!config::enable_file_cache || data.size == 0) { + return; + } + + // Copy data since original buffer may be reused before async task executes + // For small files (< 1MB), copy overhead is acceptable + std::string data_copy(data.data, data.size); + + auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool(); + if (thread_pool == nullptr) { + // Fallback to sync write if thread pool not available + do_write_to_file_cache(small_file_path, data_copy, tablet_id); + return; + } + + Status st = thread_pool->submit_func( + [path = small_file_path, data = std::move(data_copy), tablet_id]() { + do_write_to_file_cache(path, data, tablet_id); + }); + + if (!st.ok()) { + LOG(WARNING) << "Failed to submit cache write task: " << st.msg(); + // Don't block on failure, cache write is best-effort + } +} + } // namespace PackedFileManager* PackedFileManager::instance() { @@ -150,8 +240,11 @@ Status PackedFileManager::create_new_packed_file_context( // Create file writer for the packed file FileWriterPtr new_writer; FileWriterOptions opts; - // enable write file cache for packed file - opts.write_file_cache = true; + // Disable write_file_cache for packed file itself. + // We write file cache for each small file separately in append_small_file() + // using the small file path as cache key, ensuring cache entries can be + // properly cleaned up when stale rowsets are removed. + opts.write_file_cache = false; RETURN_IF_ERROR( packed_file_ctx->file_system->create_file(Path(relative_path), &new_writer, &opts)); packed_file_ctx->writer = std::move(new_writer); @@ -253,6 +346,11 @@ Status PackedFileManager::append_small_file(const std::string& path, const Slice // Write data to current packed file RETURN_IF_ERROR(active_state->writer->append(data)); + // Async write data to file cache using small file path as cache key. + // This ensures cache key matches the cleanup key in Rowset::clear_cache(), + // allowing proper cache cleanup when stale rowsets are removed. + write_small_file_to_cache_async(path, data, info.tablet_id); + // Update index PackedSliceLocation location; location.packed_file_path = active_state->packed_file_path; diff --git a/be/src/io/fs/packed_file_system.cpp b/be/src/io/fs/packed_file_system.cpp index dd5b136ba3b750..7ce027a94a2ddb 100644 --- a/be/src/io/fs/packed_file_system.cpp +++ b/be/src/io/fs/packed_file_system.cpp @@ -20,6 +20,7 @@ #include #include "common/status.h" +#include "io/fs/file_reader.h" #include "io/fs/packed_file_reader.h" #include "io/fs/packed_file_writer.h" @@ -69,18 +70,37 @@ Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader // File is in packed file, open packed file and wrap with PackedFileReader const auto& index = it->second; FileReaderSPtr inner_reader; - // Create a new FileReaderOptions with the correct file size - FileReaderOptions local_opts = opts ? *opts : FileReaderOptions(); - // Set file_size to packed file size to avoid head object request - local_opts.file_size = index.packed_file_size; - LOG(INFO) << "open packed file: " << index.packed_file_path << ", file: " << file.native() - << ", offset: " << index.offset << ", size: " << index.size - << ", packed_file_size: " << index.packed_file_size; + + // Create options for opening the packed file + // Disable cache at this layer - we'll add cache wrapper around PackedFileReader instead + // This ensures cache key is based on segment path, not packed file path + FileReaderOptions inner_opts = opts ? *opts : FileReaderOptions(); + inner_opts.file_size = index.packed_file_size; + inner_opts.cache_type = FileCachePolicy::NO_CACHE; + + VLOG_DEBUG << "open packed file: " << index.packed_file_path << ", file: " << file.native() + << ", offset: " << index.offset << ", size: " << index.size + << ", packed_file_size: " << index.packed_file_size; RETURN_IF_ERROR( - _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &local_opts)); + _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &inner_opts)); + + // Create PackedFileReader with segment path + // PackedFileReader.path() returns segment path, not packed file path + auto packed_reader = std::make_shared(std::move(inner_reader), file, + index.offset, index.size); - *reader = std::make_shared(std::move(inner_reader), file, index.offset, - index.size); + // If cache is requested, wrap PackedFileReader with CachedRemoteFileReader + // This ensures: + // 1. Cache key = hash(segment_path.filename()) - matches cleanup key + // 2. Cache size = segment size - correct boundary + // 3. Each segment has independent cache entry - no interference during cleanup + if (opts && opts->cache_type != FileCachePolicy::NO_CACHE) { + FileReaderOptions cache_opts = *opts; + cache_opts.file_size = index.size; // Use segment size for cache + *reader = DORIS_TRY(create_cached_file_reader(packed_reader, cache_opts)); + } else { + *reader = packed_reader; + } } else { RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts)); } @@ -88,7 +108,7 @@ Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader } Status PackedFileSystem::exists_impl(const Path& path, bool* res) const { - LOG(INFO) << "packed file system exist, rowset id " << _append_info.rowset_id; + VLOG_DEBUG << "packed file system exist, rowset id " << _append_info.rowset_id; if (!_index_map_initialized) { return Status::InternalError("PackedFileSystem index map is not initialized"); } From 26ccf1d8096ec03208de911a49b43ef310cf63c4 Mon Sep 17 00:00:00 2001 From: liaoxin Date: Fri, 16 Jan 2026 15:52:50 +0800 Subject: [PATCH 2/4] fix comment and ut --- be/src/io/fs/packed_file_manager.cpp | 26 ++++++++++++++++--- .../io/fs/packed_file_concurrency_test.cpp | 10 ++++--- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/be/src/io/fs/packed_file_manager.cpp b/be/src/io/fs/packed_file_manager.cpp index 3101cfb3d8ed79..4651fc083053ec 100644 --- a/be/src/io/fs/packed_file_manager.cpp +++ b/be/src/io/fs/packed_file_manager.cpp @@ -80,6 +80,12 @@ bvar::Window g_packed_file_uploading_to_uploaded_ms_window( "packed_file_uploading_to_uploaded_ms", &g_packed_file_uploading_to_uploaded_ms_recorder, /*window_size=*/10); +// Metrics for async small file cache write +bvar::Adder g_packed_file_cache_async_write_count("packed_file_cache", + "async_write_count"); +bvar::Adder g_packed_file_cache_async_write_bytes("packed_file_cache", + "async_write_bytes"); + Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_file_path, const cloud::PackedFileInfoPB& packed_file_info) { if (writer == nullptr) { @@ -169,7 +175,9 @@ void do_write_to_file_cache(const std::string& small_file_path, const std::strin } // Async wrapper: submit cache write task to thread pool -// Note: data is copied to ensure lifetime beyond async execution +// The data is copied into the lambda capture to ensure its lifetime extends beyond +// the async task execution. The original Slice may reference a buffer that gets +// reused or freed before the async task runs. void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data, int64_t tablet_id) { if (!config::enable_file_cache || data.size == 0) { @@ -179,6 +187,7 @@ void write_small_file_to_cache_async(const std::string& small_file_path, const S // Copy data since original buffer may be reused before async task executes // For small files (< 1MB), copy overhead is acceptable std::string data_copy(data.data, data.size); + size_t data_size = data.size; auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool(); if (thread_pool == nullptr) { @@ -187,13 +196,24 @@ void write_small_file_to_cache_async(const std::string& small_file_path, const S return; } + // Track async write count and bytes + g_packed_file_cache_async_write_count << 1; + g_packed_file_cache_async_write_bytes << static_cast(data_size); + Status st = thread_pool->submit_func( - [path = small_file_path, data = std::move(data_copy), tablet_id]() { + [path = small_file_path, data = std::move(data_copy), tablet_id, data_size]() { do_write_to_file_cache(path, data, tablet_id); + // Decrement async write count after completion + g_packed_file_cache_async_write_count << -1; + g_packed_file_cache_async_write_bytes << -static_cast(data_size); }); if (!st.ok()) { - LOG(WARNING) << "Failed to submit cache write task: " << st.msg(); + // Revert metrics since task was not submitted + g_packed_file_cache_async_write_count << -1; + g_packed_file_cache_async_write_bytes << -static_cast(data_size); + LOG(WARNING) << "Failed to submit cache write task for " << small_file_path << ": " + << st.msg(); // Don't block on failure, cache write is best-effort } } diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp b/be/test/io/fs/packed_file_concurrency_test.cpp index 06db472b95e24d..c3d62752091fa4 100644 --- a/be/test/io/fs/packed_file_concurrency_test.cpp +++ b/be/test/io/fs/packed_file_concurrency_test.cpp @@ -718,11 +718,13 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) { opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE; opts.is_doris_table = true; ASSERT_TRUE(reader_fs.open_file(Path(path), &reader, &opts).ok()); - auto* merge_reader = dynamic_cast(reader.get()); - ASSERT_NE(merge_reader, nullptr); - auto* cached_reader = - dynamic_cast(merge_reader->_inner_reader.get()); + // After the fix, CachedRemoteFileReader wraps PackedFileReader (not vice versa) + // This ensures cache key uses segment path for proper cleanup + auto* cached_reader = dynamic_cast(reader.get()); ASSERT_NE(cached_reader, nullptr); + auto* merge_reader = + dynamic_cast(cached_reader->get_remote_reader()); + ASSERT_NE(merge_reader, nullptr); IOContext io_ctx; size_t verified = 0; From 380d01435306e05e62edc342088a254b7a4ff33a Mon Sep 17 00:00:00 2001 From: liaoxin Date: Fri, 16 Jan 2026 18:06:45 +0800 Subject: [PATCH 3/4] fix ut --- be/test/io/fs/packed_file_concurrency_test.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp b/be/test/io/fs/packed_file_concurrency_test.cpp index c3d62752091fa4..e45ad7dccd5de2 100644 --- a/be/test/io/fs/packed_file_concurrency_test.cpp +++ b/be/test/io/fs/packed_file_concurrency_test.cpp @@ -677,7 +677,9 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) { std::uniform_int_distribution read_size_dist(4 * 1024, 32 * 1024); for (int iter = 0; iter < kIterationPerThread; ++iter) { - std::string path = fmt::format("/tablet_{}/rowset_{}/file_{}", tid, iter, iter); + // Use unique file names to avoid cache key conflicts between threads + // since CachedRemoteFileReader uses path().filename() for cache hash + std::string path = fmt::format("/tablet_{}/rowset_{}/file_t{}_i{}", tid, iter, tid, iter); PackedAppendContext append_info; append_info.resource_id = resource_ids[tid]; From 11c2daefed300b79be10a46d2f18fa5bd51b04c0 Mon Sep 17 00:00:00 2001 From: liaoxin Date: Fri, 16 Jan 2026 18:15:09 +0800 Subject: [PATCH 4/4] fix --- be/test/io/fs/packed_file_concurrency_test.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp b/be/test/io/fs/packed_file_concurrency_test.cpp index e45ad7dccd5de2..31c2db19fb8675 100644 --- a/be/test/io/fs/packed_file_concurrency_test.cpp +++ b/be/test/io/fs/packed_file_concurrency_test.cpp @@ -679,7 +679,8 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) { for (int iter = 0; iter < kIterationPerThread; ++iter) { // Use unique file names to avoid cache key conflicts between threads // since CachedRemoteFileReader uses path().filename() for cache hash - std::string path = fmt::format("/tablet_{}/rowset_{}/file_t{}_i{}", tid, iter, tid, iter); + std::string path = + fmt::format("/tablet_{}/rowset_{}/file_t{}_i{}", tid, iter, tid, iter); PackedAppendContext append_info; append_info.resource_id = resource_ids[tid];