diff --git a/be/src/io/fs/packed_file_manager.cpp b/be/src/io/fs/packed_file_manager.cpp index 3cf65f12e49540..53a445502c7df5 100644 --- a/be/src/io/fs/packed_file_manager.cpp +++ b/be/src/io/fs/packed_file_manager.cpp @@ -41,10 +41,15 @@ #include "cloud/config.h" #include "common/config.h" #include "gen_cpp/cloud.pb.h" +#include "io/cache/block_file_cache.h" +#include "io/cache/block_file_cache_factory.h" +#include "io/cache/file_block.h" +#include "io/cache/file_cache_common.h" #include "io/fs/packed_file_trailer.h" #include "olap/storage_engine.h" #include "runtime/exec_env.h" #include "util/coding.h" +#include "util/slice.h" #include "util/uid_util.h" namespace doris::io { @@ -75,6 +80,12 @@ bvar::Window g_packed_file_uploading_to_uploaded_ms_window( "packed_file_uploading_to_uploaded_ms", &g_packed_file_uploading_to_uploaded_ms_recorder, /*window_size=*/10); +// Metrics for async small file cache write +bvar::Adder g_packed_file_cache_async_write_count("packed_file_cache", + "async_write_count"); +bvar::Adder g_packed_file_cache_async_write_bytes("packed_file_cache", + "async_write_bytes"); + Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_file_path, const cloud::PackedFileInfoPB& packed_file_info) { if (writer == nullptr) { @@ -108,6 +119,104 @@ Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_ return writer->append(Slice(trailer)); } +// write small file data to file cache +void do_write_to_file_cache(const std::string& small_file_path, const std::string& data, + int64_t tablet_id) { + if (data.empty()) { + return; + } + + // Generate cache key from small file path (e.g., "rowset_id_seg_id.dat") + Path path(small_file_path); + UInt128Wrapper cache_hash = BlockFileCache::hash(path.filename().native()); + + VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path + << " filename=" << path.filename().native() << " hash=" << cache_hash.to_string() + << " size=" << data.size() << " tablet_id=" << tablet_id; + + BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash); + if (file_cache == nullptr) { + return; // Cache not available, skip + } + + // Allocate cache blocks + CacheContext ctx; + ctx.cache_type = FileCacheType::NORMAL; + ReadStatistics stats; + ctx.stats = &stats; + + FileBlocksHolder holder = file_cache->get_or_set(cache_hash, 0, data.size(), ctx); + + // Write data to cache blocks + size_t data_offset = 0; + for (auto& block : holder.file_blocks) { + if (data_offset >= data.size()) { + break; + } + size_t block_size = block->range().size(); + size_t write_size = std::min(block_size, data.size() - data_offset); + + if (block->state() == FileBlock::State::EMPTY) { + block->get_or_set_downloader(); + if (block->is_downloader()) { + Slice s(data.data() + data_offset, write_size); + Status st = block->append(s); + if (st.ok()) { + st = block->finalize(); + } + if (!st.ok()) { + LOG(WARNING) << "Write small file to cache failed: " << st.msg(); + } + } + } + data_offset += write_size; + } +} + +// Async wrapper: submit cache write task to thread pool +// The data is copied into the lambda capture to ensure its lifetime extends beyond +// the async task execution. The original Slice may reference a buffer that gets +// reused or freed before the async task runs. +void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data, + int64_t tablet_id) { + if (!config::enable_file_cache || data.size == 0) { + return; + } + + // Copy data since original buffer may be reused before async task executes + // For small files (< 1MB), copy overhead is acceptable + std::string data_copy(data.data, data.size); + size_t data_size = data.size; + + auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool(); + if (thread_pool == nullptr) { + // Fallback to sync write if thread pool not available + do_write_to_file_cache(small_file_path, data_copy, tablet_id); + return; + } + + // Track async write count and bytes + g_packed_file_cache_async_write_count << 1; + g_packed_file_cache_async_write_bytes << static_cast(data_size); + + Status st = thread_pool->submit_func( + [path = small_file_path, data = std::move(data_copy), tablet_id, data_size]() { + do_write_to_file_cache(path, data, tablet_id); + // Decrement async write count after completion + g_packed_file_cache_async_write_count << -1; + g_packed_file_cache_async_write_bytes << -static_cast(data_size); + }); + + if (!st.ok()) { + // Revert metrics since task was not submitted + g_packed_file_cache_async_write_count << -1; + g_packed_file_cache_async_write_bytes << -static_cast(data_size); + LOG(WARNING) << "Failed to submit cache write task for " << small_file_path << ": " + << st.msg(); + // Don't block on failure, cache write is best-effort + } +} + } // namespace PackedFileManager* PackedFileManager::instance() { @@ -150,8 +259,11 @@ Status PackedFileManager::create_new_packed_file_context( // Create file writer for the packed file FileWriterPtr new_writer; FileWriterOptions opts; - // enable write file cache for packed file - opts.write_file_cache = true; + // Disable write_file_cache for packed file itself. + // We write file cache for each small file separately in append_small_file() + // using the small file path as cache key, ensuring cache entries can be + // properly cleaned up when stale rowsets are removed. + opts.write_file_cache = false; RETURN_IF_ERROR( packed_file_ctx->file_system->create_file(Path(relative_path), &new_writer, &opts)); packed_file_ctx->writer = std::move(new_writer); @@ -253,6 +365,11 @@ Status PackedFileManager::append_small_file(const std::string& path, const Slice // Write data to current packed file RETURN_IF_ERROR(active_state->writer->append(data)); + // Async write data to file cache using small file path as cache key. + // This ensures cache key matches the cleanup key in Rowset::clear_cache(), + // allowing proper cache cleanup when stale rowsets are removed. + write_small_file_to_cache_async(path, data, info.tablet_id); + // Update index PackedSliceLocation location; location.packed_file_path = active_state->packed_file_path; diff --git a/be/src/io/fs/packed_file_system.cpp b/be/src/io/fs/packed_file_system.cpp index dd5b136ba3b750..7ce027a94a2ddb 100644 --- a/be/src/io/fs/packed_file_system.cpp +++ b/be/src/io/fs/packed_file_system.cpp @@ -20,6 +20,7 @@ #include #include "common/status.h" +#include "io/fs/file_reader.h" #include "io/fs/packed_file_reader.h" #include "io/fs/packed_file_writer.h" @@ -69,18 +70,37 @@ Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader // File is in packed file, open packed file and wrap with PackedFileReader const auto& index = it->second; FileReaderSPtr inner_reader; - // Create a new FileReaderOptions with the correct file size - FileReaderOptions local_opts = opts ? *opts : FileReaderOptions(); - // Set file_size to packed file size to avoid head object request - local_opts.file_size = index.packed_file_size; - LOG(INFO) << "open packed file: " << index.packed_file_path << ", file: " << file.native() - << ", offset: " << index.offset << ", size: " << index.size - << ", packed_file_size: " << index.packed_file_size; + + // Create options for opening the packed file + // Disable cache at this layer - we'll add cache wrapper around PackedFileReader instead + // This ensures cache key is based on segment path, not packed file path + FileReaderOptions inner_opts = opts ? *opts : FileReaderOptions(); + inner_opts.file_size = index.packed_file_size; + inner_opts.cache_type = FileCachePolicy::NO_CACHE; + + VLOG_DEBUG << "open packed file: " << index.packed_file_path << ", file: " << file.native() + << ", offset: " << index.offset << ", size: " << index.size + << ", packed_file_size: " << index.packed_file_size; RETURN_IF_ERROR( - _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &local_opts)); + _inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &inner_opts)); + + // Create PackedFileReader with segment path + // PackedFileReader.path() returns segment path, not packed file path + auto packed_reader = std::make_shared(std::move(inner_reader), file, + index.offset, index.size); - *reader = std::make_shared(std::move(inner_reader), file, index.offset, - index.size); + // If cache is requested, wrap PackedFileReader with CachedRemoteFileReader + // This ensures: + // 1. Cache key = hash(segment_path.filename()) - matches cleanup key + // 2. Cache size = segment size - correct boundary + // 3. Each segment has independent cache entry - no interference during cleanup + if (opts && opts->cache_type != FileCachePolicy::NO_CACHE) { + FileReaderOptions cache_opts = *opts; + cache_opts.file_size = index.size; // Use segment size for cache + *reader = DORIS_TRY(create_cached_file_reader(packed_reader, cache_opts)); + } else { + *reader = packed_reader; + } } else { RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts)); } @@ -88,7 +108,7 @@ Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader } Status PackedFileSystem::exists_impl(const Path& path, bool* res) const { - LOG(INFO) << "packed file system exist, rowset id " << _append_info.rowset_id; + VLOG_DEBUG << "packed file system exist, rowset id " << _append_info.rowset_id; if (!_index_map_initialized) { return Status::InternalError("PackedFileSystem index map is not initialized"); } diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp b/be/test/io/fs/packed_file_concurrency_test.cpp index 06db472b95e24d..31c2db19fb8675 100644 --- a/be/test/io/fs/packed_file_concurrency_test.cpp +++ b/be/test/io/fs/packed_file_concurrency_test.cpp @@ -677,7 +677,10 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) { std::uniform_int_distribution read_size_dist(4 * 1024, 32 * 1024); for (int iter = 0; iter < kIterationPerThread; ++iter) { - std::string path = fmt::format("/tablet_{}/rowset_{}/file_{}", tid, iter, iter); + // Use unique file names to avoid cache key conflicts between threads + // since CachedRemoteFileReader uses path().filename() for cache hash + std::string path = + fmt::format("/tablet_{}/rowset_{}/file_t{}_i{}", tid, iter, tid, iter); PackedAppendContext append_info; append_info.resource_id = resource_ids[tid]; @@ -718,11 +721,13 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) { opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE; opts.is_doris_table = true; ASSERT_TRUE(reader_fs.open_file(Path(path), &reader, &opts).ok()); - auto* merge_reader = dynamic_cast(reader.get()); - ASSERT_NE(merge_reader, nullptr); - auto* cached_reader = - dynamic_cast(merge_reader->_inner_reader.get()); + // After the fix, CachedRemoteFileReader wraps PackedFileReader (not vice versa) + // This ensures cache key uses segment path for proper cleanup + auto* cached_reader = dynamic_cast(reader.get()); ASSERT_NE(cached_reader, nullptr); + auto* merge_reader = + dynamic_cast(cached_reader->get_remote_reader()); + ASSERT_NE(merge_reader, nullptr); IOContext io_ctx; size_t verified = 0;