From 0dc7235c03d72fb382f8a62279b05068e7ed38ea Mon Sep 17 00:00:00 2001 From: Xin Liao Date: Mon, 19 Jan 2026 16:03:44 +0800 Subject: [PATCH] branch-4.0: [fix](packed-file) Fix packed file cache cleanup issue (#59892) Previously, file cache used packed file path as cache key, which caused cache entries to be orphaned when stale rowsets were cleaned up (cleanup uses segment path as key). This fix moves the cache layer from inner reader to PackedFileReader wrapper level, ensuring: 1. Cache key = hash(segment_path.filename()) - matches cleanup key 2. Cache size = segment size - correct boundary 3. Each segment has independent cache entry - no interference --- be/src/io/fs/packed_file_manager.cpp | 121 +++++++++++++++++- be/src/io/fs/packed_file_system.cpp | 42 ++++-- .../io/fs/packed_file_concurrency_test.cpp | 15 ++- 3 files changed, 160 insertions(+), 18 deletions(-) diff --git a/be/src/io/fs/packed_file_manager.cpp b/be/src/io/fs/packed_file_manager.cpp index 3cf65f12e49540..53a445502c7df5 100644 --- a/be/src/io/fs/packed_file_manager.cpp +++ b/be/src/io/fs/packed_file_manager.cpp @@ -41,10 +41,15 @@ #include "cloud/config.h" #include "common/config.h" #include "gen_cpp/cloud.pb.h" +#include "io/cache/block_file_cache.h" +#include "io/cache/block_file_cache_factory.h" +#include "io/cache/file_block.h" +#include "io/cache/file_cache_common.h" #include "io/fs/packed_file_trailer.h" #include "olap/storage_engine.h" #include "runtime/exec_env.h" #include "util/coding.h" +#include "util/slice.h" #include "util/uid_util.h" namespace doris::io { @@ -75,6 +80,12 @@ bvar::Window g_packed_file_uploading_to_uploaded_ms_window( "packed_file_uploading_to_uploaded_ms", &g_packed_file_uploading_to_uploaded_ms_recorder, /*window_size=*/10); +// Metrics for async small file cache write +bvar::Adder g_packed_file_cache_async_write_count("packed_file_cache", + "async_write_count"); +bvar::Adder g_packed_file_cache_async_write_bytes("packed_file_cache", + "async_write_bytes"); + Status 
append_packed_info_trailer(FileWriter* writer, const std::string& packed_file_path, const cloud::PackedFileInfoPB& packed_file_info) { if (writer == nullptr) { @@ -108,6 +119,104 @@ Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_ return writer->append(Slice(trailer)); } +// write small file data to file cache +void do_write_to_file_cache(const std::string& small_file_path, const std::string& data, + int64_t tablet_id) { + if (data.empty()) { + return; + } + + // Generate cache key from small file path (e.g., "rowset_id_seg_id.dat") + Path path(small_file_path); + UInt128Wrapper cache_hash = BlockFileCache::hash(path.filename().native()); + + VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path + << " filename=" << path.filename().native() << " hash=" << cache_hash.to_string() + << " size=" << data.size() << " tablet_id=" << tablet_id; + + BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash); + if (file_cache == nullptr) { + return; // Cache not available, skip + } + + // Allocate cache blocks + CacheContext ctx; + ctx.cache_type = FileCacheType::NORMAL; + ReadStatistics stats; + ctx.stats = &stats; + + FileBlocksHolder holder = file_cache->get_or_set(cache_hash, 0, data.size(), ctx); + + // Write data to cache blocks + size_t data_offset = 0; + for (auto& block : holder.file_blocks) { + if (data_offset >= data.size()) { + break; + } + size_t block_size = block->range().size(); + size_t write_size = std::min(block_size, data.size() - data_offset); + + if (block->state() == FileBlock::State::EMPTY) { + block->get_or_set_downloader(); + if (block->is_downloader()) { + Slice s(data.data() + data_offset, write_size); + Status st = block->append(s); + if (st.ok()) { + st = block->finalize(); + } + if (!st.ok()) { + LOG(WARNING) << "Write small file to cache failed: " << st.msg(); + } + } + } + data_offset += write_size; + } +} + +// Async wrapper: submit cache write task to thread pool +// The 
data is copied into the lambda capture to ensure its lifetime extends beyond +// the async task execution. The original Slice may reference a buffer that gets +// reused or freed before the async task runs. +void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data, + int64_t tablet_id) { + if (!config::enable_file_cache || data.size == 0) { + return; + } + + // Copy data since original buffer may be reused before async task executes + // For small files (< 1MB), copy overhead is acceptable + std::string data_copy(data.data, data.size); + size_t data_size = data.size; + + auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool(); + if (thread_pool == nullptr) { + // Fallback to sync write if thread pool not available + do_write_to_file_cache(small_file_path, data_copy, tablet_id); + return; + } + + // Track async write count and bytes + g_packed_file_cache_async_write_count << 1; + g_packed_file_cache_async_write_bytes << static_cast(data_size); + + Status st = thread_pool->submit_func( + [path = small_file_path, data = std::move(data_copy), tablet_id, data_size]() { + do_write_to_file_cache(path, data, tablet_id); + // Decrement async write count after completion + g_packed_file_cache_async_write_count << -1; + g_packed_file_cache_async_write_bytes << -static_cast(data_size); + }); + + if (!st.ok()) { + // Revert metrics since task was not submitted + g_packed_file_cache_async_write_count << -1; + g_packed_file_cache_async_write_bytes << -static_cast(data_size); + LOG(WARNING) << "Failed to submit cache write task for " << small_file_path << ": " + << st.msg(); + // Don't block on failure, cache write is best-effort + } +} + } // namespace PackedFileManager* PackedFileManager::instance() { @@ -150,8 +259,11 @@ Status PackedFileManager::create_new_packed_file_context( // Create file writer for the packed file FileWriterPtr new_writer; FileWriterOptions opts; - // enable write file cache for packed file - 
opts.write_file_cache = true; + // Disable write_file_cache for packed file itself. + // We write file cache for each small file separately in append_small_file() + // using the small file path as cache key, ensuring cache entries can be + // properly cleaned up when stale rowsets are removed. + opts.write_file_cache = false; RETURN_IF_ERROR( packed_file_ctx->file_system->create_file(Path(relative_path), &new_writer, &opts)); packed_file_ctx->writer = std::move(new_writer); @@ -253,6 +365,11 @@ Status PackedFileManager::append_small_file(const std::string& path, const Slice // Write data to current packed file RETURN_IF_ERROR(active_state->writer->append(data)); + // Async write data to file cache using small file path as cache key. + // This ensures cache key matches the cleanup key in Rowset::clear_cache(), + // allowing proper cache cleanup when stale rowsets are removed. + write_small_file_to_cache_async(path, data, info.tablet_id); + // Update index PackedSliceLocation location; location.packed_file_path = active_state->packed_file_path; diff --git a/be/src/io/fs/packed_file_system.cpp b/be/src/io/fs/packed_file_system.cpp index dd5b136ba3b750..7ce027a94a2ddb 100644 --- a/be/src/io/fs/packed_file_system.cpp +++ b/be/src/io/fs/packed_file_system.cpp @@ -20,6 +20,7 @@ #include #include "common/status.h" +#include "io/fs/file_reader.h" #include "io/fs/packed_file_reader.h" #include "io/fs/packed_file_writer.h" @@ -69,18 +70,37 @@ Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader // File is in packed file, open packed file and wrap with PackedFileReader const auto& index = it->second; FileReaderSPtr inner_reader; - // Create a new FileReaderOptions with the correct file size - FileReaderOptions local_opts = opts ? 
*opts : FileReaderOptions(); - // Set file_size to packed file size to avoid head object request - local_opts.file_size = index.packed_file_size; - LOG(INFO) << "open packed file: " << index.packed_file_path << ", file: " << file.native() - << ", offset: " << index.offset << ", size: " << index.size - << ", packed_file_size: " << index.packed_file_size; + + // Create options for opening the packed file + // Disable cache at this layer - we'll add cache wrapper around PackedFileReader instead + // This ensures cache key is based on segment path, not packed file path + FileReaderOptions inner_opts = opts ? *opts : FileReaderOptions(); + inner_opts.file_size = index.packed_file_size; + inner_opts.cache_type = FileCachePolicy::NO_CACHE; + + VLOG_DEBUG << "open packed file: " << index.packed_file_path << ", file: " << file.native() + << ", offset: " << index.offset << ", size: " << index.size + << ", packed_file_size: " << index.packed_file_size; RETURN_IF_ERROR( - _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &local_opts)); + _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &inner_opts)); + + // Create PackedFileReader with segment path + // PackedFileReader.path() returns segment path, not packed file path + auto packed_reader = std::make_shared<PackedFileReader>(std::move(inner_reader), file, + index.offset, index.size); - *reader = std::make_shared<PackedFileReader>(std::move(inner_reader), file, index.offset, - index.size); + // If cache is requested, wrap PackedFileReader with CachedRemoteFileReader + // This ensures: + // 1. Cache key = hash(segment_path.filename()) - matches cleanup key + // 2. Cache size = segment size - correct boundary + // 3. 
Each segment has independent cache entry - no interference during cleanup + if (opts && opts->cache_type != FileCachePolicy::NO_CACHE) { + FileReaderOptions cache_opts = *opts; + cache_opts.file_size = index.size; // Use segment size for cache + *reader = DORIS_TRY(create_cached_file_reader(packed_reader, cache_opts)); + } else { + *reader = packed_reader; + } } else { RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts)); } @@ -88,7 +108,7 @@ Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader } Status PackedFileSystem::exists_impl(const Path& path, bool* res) const { - LOG(INFO) << "packed file system exist, rowset id " << _append_info.rowset_id; + VLOG_DEBUG << "packed file system exist, rowset id " << _append_info.rowset_id; if (!_index_map_initialized) { return Status::InternalError("PackedFileSystem index map is not initialized"); } diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp b/be/test/io/fs/packed_file_concurrency_test.cpp index 06db472b95e24d..31c2db19fb8675 100644 --- a/be/test/io/fs/packed_file_concurrency_test.cpp +++ b/be/test/io/fs/packed_file_concurrency_test.cpp @@ -677,7 +677,10 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) { std::uniform_int_distribution read_size_dist(4 * 1024, 32 * 1024); for (int iter = 0; iter < kIterationPerThread; ++iter) { - std::string path = fmt::format("/tablet_{}/rowset_{}/file_{}", tid, iter, iter); + // Use unique file names to avoid cache key conflicts between threads + // since CachedRemoteFileReader uses path().filename() for cache hash + std::string path = + fmt::format("/tablet_{}/rowset_{}/file_t{}_i{}", tid, iter, tid, iter); PackedAppendContext append_info; append_info.resource_id = resource_ids[tid]; @@ -718,11 +721,13 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) { opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE; opts.is_doris_table = true; ASSERT_TRUE(reader_fs.open_file(Path(path), &reader, &opts).ok()); - auto* 
merge_reader = dynamic_cast<PackedFileReader*>(reader.get()); - ASSERT_NE(merge_reader, nullptr); - auto* cached_reader = - dynamic_cast<CachedRemoteFileReader*>(merge_reader->_inner_reader.get()); + // After the fix, CachedRemoteFileReader wraps PackedFileReader (not vice versa) + // This ensures cache key uses segment path for proper cleanup + auto* cached_reader = dynamic_cast<CachedRemoteFileReader*>(reader.get()); ASSERT_NE(cached_reader, nullptr); + auto* merge_reader = + dynamic_cast<PackedFileReader*>(cached_reader->get_remote_reader()); + ASSERT_NE(merge_reader, nullptr); IOContext io_ctx; size_t verified = 0;