Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 119 additions & 2 deletions be/src/io/fs/packed_file_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,15 @@
#include "cloud/config.h"
#include "common/config.h"
#include "gen_cpp/cloud.pb.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
#include "io/fs/packed_file_trailer.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "util/coding.h"
#include "util/slice.h"
#include "util/uid_util.h"

namespace doris::io {
Expand Down Expand Up @@ -75,6 +80,12 @@ bvar::Window<bvar::IntRecorder> g_packed_file_uploading_to_uploaded_ms_window(
"packed_file_uploading_to_uploaded_ms", &g_packed_file_uploading_to_uploaded_ms_recorder,
/*window_size=*/10);

// Metrics for async small file cache write
bvar::Adder<int64_t> g_packed_file_cache_async_write_count("packed_file_cache",
"async_write_count");
bvar::Adder<int64_t> g_packed_file_cache_async_write_bytes("packed_file_cache",
"async_write_bytes");

Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_file_path,
const cloud::PackedFileInfoPB& packed_file_info) {
if (writer == nullptr) {
Expand Down Expand Up @@ -108,6 +119,104 @@ Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_
return writer->append(Slice(trailer));
}

// write small file data to file cache
void do_write_to_file_cache(const std::string& small_file_path, const std::string& data,
int64_t tablet_id) {
if (data.empty()) {
return;
}

// Generate cache key from small file path (e.g., "rowset_id_seg_id.dat")
Path path(small_file_path);
UInt128Wrapper cache_hash = BlockFileCache::hash(path.filename().native());

VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path
<< " filename=" << path.filename().native() << " hash=" << cache_hash.to_string()
<< " size=" << data.size() << " tablet_id=" << tablet_id;

BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash);
if (file_cache == nullptr) {
return; // Cache not available, skip
}

// Allocate cache blocks
CacheContext ctx;
ctx.cache_type = FileCacheType::NORMAL;
ReadStatistics stats;
ctx.stats = &stats;

FileBlocksHolder holder = file_cache->get_or_set(cache_hash, 0, data.size(), ctx);

// Write data to cache blocks
size_t data_offset = 0;
for (auto& block : holder.file_blocks) {
if (data_offset >= data.size()) {
break;
}
size_t block_size = block->range().size();
size_t write_size = std::min(block_size, data.size() - data_offset);

if (block->state() == FileBlock::State::EMPTY) {
block->get_or_set_downloader();
if (block->is_downloader()) {
Slice s(data.data() + data_offset, write_size);
Status st = block->append(s);
if (st.ok()) {
st = block->finalize();
}
if (!st.ok()) {
LOG(WARNING) << "Write small file to cache failed: " << st.msg();
}
}
}
data_offset += write_size;
}
}

// Async wrapper: submit cache write task to thread pool
// The data is copied into the lambda capture to ensure its lifetime extends beyond
// the async task execution. The original Slice may reference a buffer that gets
// reused or freed before the async task runs.
void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data,
int64_t tablet_id) {
if (!config::enable_file_cache || data.size == 0) {
return;
}

// Copy data since original buffer may be reused before async task executes
// For small files (< 1MB), copy overhead is acceptable
std::string data_copy(data.data, data.size);
size_t data_size = data.size;

auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
if (thread_pool == nullptr) {
// Fallback to sync write if thread pool not available
do_write_to_file_cache(small_file_path, data_copy, tablet_id);
return;
}

// Track async write count and bytes
g_packed_file_cache_async_write_count << 1;
g_packed_file_cache_async_write_bytes << static_cast<int64_t>(data_size);

Status st = thread_pool->submit_func(
[path = small_file_path, data = std::move(data_copy), tablet_id, data_size]() {
do_write_to_file_cache(path, data, tablet_id);
// Decrement async write count after completion
g_packed_file_cache_async_write_count << -1;
g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
});

if (!st.ok()) {
// Revert metrics since task was not submitted
g_packed_file_cache_async_write_count << -1;
g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
LOG(WARNING) << "Failed to submit cache write task for " << small_file_path << ": "
<< st.msg();
// Don't block on failure, cache write is best-effort
}
}

} // namespace

PackedFileManager* PackedFileManager::instance() {
Expand Down Expand Up @@ -150,8 +259,11 @@ Status PackedFileManager::create_new_packed_file_context(
// Create file writer for the packed file
FileWriterPtr new_writer;
FileWriterOptions opts;
// enable write file cache for packed file
opts.write_file_cache = true;
// Disable write_file_cache for packed file itself.
// We write file cache for each small file separately in append_small_file()
// using the small file path as cache key, ensuring cache entries can be
// properly cleaned up when stale rowsets are removed.
opts.write_file_cache = false;
RETURN_IF_ERROR(
packed_file_ctx->file_system->create_file(Path(relative_path), &new_writer, &opts));
packed_file_ctx->writer = std::move(new_writer);
Expand Down Expand Up @@ -253,6 +365,11 @@ Status PackedFileManager::append_small_file(const std::string& path, const Slice
// Write data to current packed file
RETURN_IF_ERROR(active_state->writer->append(data));

// Async write data to file cache using small file path as cache key.
// This ensures cache key matches the cleanup key in Rowset::clear_cache(),
// allowing proper cache cleanup when stale rowsets are removed.
write_small_file_to_cache_async(path, data, info.tablet_id);

// Update index
PackedSliceLocation location;
location.packed_file_path = active_state->packed_file_path;
Expand Down
42 changes: 31 additions & 11 deletions be/src/io/fs/packed_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <utility>

#include "common/status.h"
#include "io/fs/file_reader.h"
#include "io/fs/packed_file_reader.h"
#include "io/fs/packed_file_writer.h"

Expand Down Expand Up @@ -69,26 +70,45 @@ Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader
// File is in packed file, open packed file and wrap with PackedFileReader
const auto& index = it->second;
FileReaderSPtr inner_reader;
// Create a new FileReaderOptions with the correct file size
FileReaderOptions local_opts = opts ? *opts : FileReaderOptions();
// Set file_size to packed file size to avoid head object request
local_opts.file_size = index.packed_file_size;
LOG(INFO) << "open packed file: " << index.packed_file_path << ", file: " << file.native()
<< ", offset: " << index.offset << ", size: " << index.size
<< ", packed_file_size: " << index.packed_file_size;

// Create options for opening the packed file
// Disable cache at this layer - we'll add cache wrapper around PackedFileReader instead
// This ensures cache key is based on segment path, not packed file path
FileReaderOptions inner_opts = opts ? *opts : FileReaderOptions();
inner_opts.file_size = index.packed_file_size;
inner_opts.cache_type = FileCachePolicy::NO_CACHE;

VLOG_DEBUG << "open packed file: " << index.packed_file_path << ", file: " << file.native()
<< ", offset: " << index.offset << ", size: " << index.size
<< ", packed_file_size: " << index.packed_file_size;
RETURN_IF_ERROR(
_inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &local_opts));
_inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &inner_opts));

// Create PackedFileReader with segment path
// PackedFileReader.path() returns segment path, not packed file path
auto packed_reader = std::make_shared<PackedFileReader>(std::move(inner_reader), file,
index.offset, index.size);

*reader = std::make_shared<PackedFileReader>(std::move(inner_reader), file, index.offset,
index.size);
// If cache is requested, wrap PackedFileReader with CachedRemoteFileReader
// This ensures:
// 1. Cache key = hash(segment_path.filename()) - matches cleanup key
// 2. Cache size = segment size - correct boundary
// 3. Each segment has independent cache entry - no interference during cleanup
if (opts && opts->cache_type != FileCachePolicy::NO_CACHE) {
FileReaderOptions cache_opts = *opts;
cache_opts.file_size = index.size; // Use segment size for cache
*reader = DORIS_TRY(create_cached_file_reader(packed_reader, cache_opts));
} else {
*reader = packed_reader;
}
} else {
RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts));
}
return Status::OK();
}

Status PackedFileSystem::exists_impl(const Path& path, bool* res) const {
LOG(INFO) << "packed file system exist, rowset id " << _append_info.rowset_id;
VLOG_DEBUG << "packed file system exist, rowset id " << _append_info.rowset_id;
if (!_index_map_initialized) {
return Status::InternalError("PackedFileSystem index map is not initialized");
}
Expand Down
15 changes: 10 additions & 5 deletions be/test/io/fs/packed_file_concurrency_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,10 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) {
std::uniform_int_distribution<int> read_size_dist(4 * 1024, 32 * 1024);

for (int iter = 0; iter < kIterationPerThread; ++iter) {
std::string path = fmt::format("/tablet_{}/rowset_{}/file_{}", tid, iter, iter);
// Use unique file names to avoid cache key conflicts between threads
// since CachedRemoteFileReader uses path().filename() for cache hash
std::string path =
fmt::format("/tablet_{}/rowset_{}/file_t{}_i{}", tid, iter, tid, iter);

PackedAppendContext append_info;
append_info.resource_id = resource_ids[tid];
Expand Down Expand Up @@ -718,11 +721,13 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) {
opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE;
opts.is_doris_table = true;
ASSERT_TRUE(reader_fs.open_file(Path(path), &reader, &opts).ok());
auto* merge_reader = dynamic_cast<PackedFileReader*>(reader.get());
ASSERT_NE(merge_reader, nullptr);
auto* cached_reader =
dynamic_cast<CachedRemoteFileReader*>(merge_reader->_inner_reader.get());
// After the fix, CachedRemoteFileReader wraps PackedFileReader (not vice versa)
// This ensures cache key uses segment path for proper cleanup
auto* cached_reader = dynamic_cast<CachedRemoteFileReader*>(reader.get());
ASSERT_NE(cached_reader, nullptr);
auto* merge_reader =
dynamic_cast<PackedFileReader*>(cached_reader->get_remote_reader());
ASSERT_NE(merge_reader, nullptr);

IOContext io_ctx;
size_t verified = 0;
Expand Down
Loading