From c06494c46d669e9c2e54e4042a60da2a8b8b14cd Mon Sep 17 00:00:00 2001 From: jiuker Date: Fri, 5 Jun 2026 17:37:38 +0800 Subject: [PATCH 1/2] Add bounded multipart upload parallelism Add bounded multipart upload parallelism --- include/miniocpp/args.h | 1 + src/args.cc | 3 + src/client.cc | 474 +++++++++++++++++++++++++++++----------- tests/tests.cc | 64 ++++++ 4 files changed, 415 insertions(+), 127 deletions(-) diff --git a/include/miniocpp/args.h b/include/miniocpp/args.h index 99a5c3a..2212992 100644 --- a/include/miniocpp/args.h +++ b/include/miniocpp/args.h @@ -340,6 +340,7 @@ struct PutObjectArgs : public PutObjectBaseArgs { std::istream* stream = nullptr; char* buf = nullptr; std::optional size; + unsigned int max_inflight_parts = 1; http::ProgressFunction progressfunc = nullptr; void* progress_userdata = nullptr; #ifdef MINIO_CPP_RDMA diff --git a/src/args.cc b/src/args.cc index cf2af4a..b66b453 100644 --- a/src/args.cc +++ b/src/args.cc @@ -308,6 +308,9 @@ PutObjectArgs::PutObjectArgs(std::istream& istream, long object_size, error::Error PutObjectArgs::Validate() { if (error::Error err = ObjectArgs::Validate()) return err; + if (max_inflight_parts == 0) { + return error::Error("max_inflight_parts must be greater than 0"); + } const bool has_stream = (stream != nullptr); const bool has_buf = (buf != nullptr); if (has_stream == has_buf) { diff --git a/src/client.cc b/src/client.cc index ebbc393..8185e1d 100644 --- a/src/client.cc +++ b/src/client.cc @@ -24,14 +24,17 @@ #endif #include +#include #include #include +#include #include #include #include #include #include #include +#include #include "miniocpp/args.h" #include "miniocpp/baseclient.h" @@ -74,6 +77,17 @@ struct AlignedBuffer { explicit AlignedBuffer(void* p) : ptr(p) {} AlignedBuffer(const AlignedBuffer&) = delete; AlignedBuffer& operator=(const AlignedBuffer&) = delete; + AlignedBuffer(AlignedBuffer&& other) noexcept : ptr(other.ptr) { + other.ptr = nullptr; + } + AlignedBuffer& operator=(AlignedBuffer&& other) noexcept { + if (this != &other) { + if (ptr) AlignedFree(ptr); + ptr = other.ptr; + other.ptr = nullptr; + } + return *this; + } ~AlignedBuffer() { if (ptr) AlignedFree(ptr); } @@ -547,160 +561,225 @@ PutObjectResponse Client::PutObject(PutObjectArgs args, std::string& upload_id, size_t uploaded_size = 0; unsigned int part_number = 0; std::string one_byte; - bool stop = false; std::list parts; long part_count = args.part_count; double uploaded_bytes = 0; // for progress double upload_speed = -1; // for progress + size_t max_inflight_parts = static_cast(args.max_inflight_parts); - while (!stop) { - part_number++; + if (max_inflight_parts <= 1) { + bool stop = false; + while (!stop) { + part_number++; - size_t bytes_read = 0; - if (part_count > 0) { - if (part_number == part_count) { - part_size = object_size - uploaded_size; - stop = true; - } + size_t bytes_read = 0; + if (part_count > 0) { + if (part_number == part_count) { + part_size = object_size - uploaded_size; + stop = true; + } - if (error::Error err = - utils::ReadPart(*args.stream, buf, part_size, bytes_read)) { - return PutObjectResponse(err); - } + if (error::Error err = + utils::ReadPart(*args.stream, buf, part_size, bytes_read)) { + return PutObjectResponse(err); + } - if (bytes_read != part_size) { - return error::make( - "not enough data in the stream; expected: " + - std::to_string(part_size) + ", got: " + std::to_string(bytes_read) + - " bytes"); - } - } else { - char* b = buf; - size_t size = part_size + 1; - - if (!one_byte.empty()) { - buf[0] = one_byte.front(); - b = buf + 1; - size--; - bytes_read = 1; - one_byte = ""; - } + if (bytes_read != part_size) { + return error::make( + "not enough data in the stream; expected: " + + std::to_string(part_size) + + ", got: " + std::to_string(bytes_read) + " bytes"); + } + } else { + char* b = buf; + size_t size = part_size + 1; + + if (!one_byte.empty()) { + buf[0] = one_byte.front(); + b = buf + 1; + size--; + bytes_read = 1; + one_byte = ""; + } - size_t n = 0; - if (error::Error err = utils::ReadPart(*args.stream, b, size, n)) { - return PutObjectResponse(err); - } + size_t n = 0; + if (error::Error err = utils::ReadPart(*args.stream, b, size, n)) { + return PutObjectResponse(err); + } - bytes_read += n; + bytes_read += n; - // If bytes read is less than or equals to part size, then we have reached - // last part. - if (bytes_read <= part_size) { - part_count = part_number; - part_size = bytes_read; - stop = true; - } else { - one_byte = buf[part_size + 1]; + // If bytes read is less than or equals to part size, then we have + // reached last part. + if (bytes_read <= part_size) { + part_count = part_number; + part_size = bytes_read; + stop = true; + } else { + one_byte = buf[part_size]; + } } - } - std::string_view data(buf, part_size); + std::string_view data(buf, part_size); - uploaded_size += part_size; + uploaded_size += part_size; - if (part_count == 1) { - PutObjectApiArgs api_args; - api_args.extra_query_params = args.extra_query_params; - api_args.bucket = args.bucket; - api_args.region = args.region; - api_args.object = args.object; - api_args.data = data; - api_args.buf = buf; - api_args.size = part_size; - api_args.progressfunc = args.progressfunc; - api_args.progress_userdata = args.progress_userdata; - api_args.headers = headers; + if (part_count == 1) { + PutObjectApiArgs api_args; + api_args.extra_query_params = args.extra_query_params; + api_args.bucket = args.bucket; + api_args.region = args.region; + api_args.object = args.object; + api_args.data = data; + api_args.buf = buf; + api_args.size = part_size; + api_args.progressfunc = args.progressfunc; + api_args.progress_userdata = args.progress_userdata; + api_args.headers = headers; - return BaseClient::PutObject(api_args); - } + return BaseClient::PutObject(api_args); + } - if (upload_id.empty()) { - CreateMultipartUploadArgs cmu_args; - cmu_args.extra_query_params = args.extra_query_params; - cmu_args.bucket = args.bucket; - cmu_args.region = args.region; - cmu_args.object = args.object; - cmu_args.headers = headers; + if (upload_id.empty()) { + CreateMultipartUploadArgs cmu_args; + cmu_args.extra_query_params = args.extra_query_params; + cmu_args.bucket = args.bucket; + cmu_args.region = args.region; + cmu_args.object = args.object; + cmu_args.headers = headers; #ifdef MINIO_CPP_RDMA - // Declare CRC64NVME so the server enforces per-part integrity on the - // RDMA UploadPart path (server returns 501 if checksum is missing when - // an algorithm was declared on Create). - cmu_args.headers.Add("x-amz-checksum-algorithm", "CRC64NVME"); + cmu_args.headers.Add("x-amz-checksum-algorithm", "CRC64NVME"); #endif - if (CreateMultipartUploadResponse resp = - CreateMultipartUpload(cmu_args)) { - upload_id = resp.upload_id; - } else { - return PutObjectResponse(resp); + if (CreateMultipartUploadResponse resp = + CreateMultipartUpload(cmu_args)) { + upload_id = resp.upload_id; + } else { + return PutObjectResponse(resp); + } } - } - UploadPartArgs up_args; - up_args.bucket = args.bucket; - up_args.region = args.region; - up_args.object = args.object; - up_args.upload_id = upload_id; - up_args.part_number = part_number; - up_args.data = data; - up_args.buf = buf; - up_args.part_size = part_size; + UploadPartArgs up_args; + up_args.bucket = args.bucket; + up_args.region = args.region; + up_args.object = args.object; + up_args.upload_id = upload_id; + up_args.part_number = part_number; + up_args.data = data; + up_args.buf = buf; + up_args.part_size = part_size; #ifdef MINIO_CPP_RDMA - up_args.rdmaclient = args.rdmaclient; - if (buf != nullptr && - cuObjClient::getMemoryType(buf) == CUOBJ_MEMORY_SYSTEM) { - const std::string crc = utils::Crc64NvmeBase64(buf, part_size); - up_args.checksum_crc64nvme = crc; - up_args.headers.Add("x-amz-checksum-crc64nvme", crc); - } + up_args.rdmaclient = args.rdmaclient; + if (buf != nullptr && + cuObjClient::getMemoryType(buf) == CUOBJ_MEMORY_SYSTEM) { + const std::string crc = utils::Crc64NvmeBase64(buf, part_size); + up_args.checksum_crc64nvme = crc; + up_args.headers.Add("x-amz-checksum-crc64nvme", crc); + } #endif - if (args.progressfunc != nullptr) { - up_args.progressfunc = - [&object_size = object_size, &uploaded_bytes = uploaded_bytes, - &upload_speed = upload_speed, &progressfunc = args.progressfunc, - &progress_userdata = args.progress_userdata]( - http::ProgressFunctionArgs args) -> bool { - if (args.upload_speed > 0) { - if (upload_speed == -1) { - upload_speed = args.upload_speed; - } else { - upload_speed = (upload_speed + args.upload_speed) / 2; + if (args.progressfunc != nullptr) { + up_args.progressfunc = + [&object_size = object_size, &uploaded_bytes = uploaded_bytes, + &upload_speed = upload_speed, &progressfunc = args.progressfunc, + &progress_userdata = args.progress_userdata]( + http::ProgressFunctionArgs args) -> bool { + if (args.upload_speed > 0) { + if (upload_speed == -1) { + upload_speed = args.upload_speed; + } else { + upload_speed = (upload_speed + args.upload_speed) / 2; + } + return true; } - return true; + + http::ProgressFunctionArgs actual_args; + actual_args.upload_total_bytes = static_cast(object_size); + actual_args.uploaded_bytes = uploaded_bytes + args.uploaded_bytes; + actual_args.userdata = progress_userdata; + return progressfunc(actual_args); + }; + } + if (headers.Contains("x-amz-content-sha256")) { + up_args.headers.Add("x-amz-content-sha256", + headers.GetFront("x-amz-content-sha256")); + } + if (args.sse != nullptr) { + if (SseCustomerKey* ssec = dynamic_cast(args.sse)) { + up_args.headers.AddAll(ssec->Headers()); } + } - http::ProgressFunctionArgs actual_args; - actual_args.upload_total_bytes = static_cast(object_size); - actual_args.uploaded_bytes = uploaded_bytes + args.uploaded_bytes; - actual_args.userdata = progress_userdata; - return progressfunc(actual_args); - }; + if (UploadPartResponse resp = UploadPart(up_args)) { + if (args.progressfunc != nullptr) { + uploaded_bytes += static_cast(data.length()); + http::ProgressFunctionArgs actual_args; + actual_args.upload_total_bytes = static_cast(object_size); + actual_args.uploaded_bytes = uploaded_bytes; + actual_args.userdata = args.progress_userdata; + if (!args.progressfunc(actual_args)) { + return UploadPartResponse( + error::Error("aborted by progress function")); + } + } + parts.push_back(Part(part_number, std::move(resp.etag), + std::move(up_args.checksum_crc64nvme))); + } else { + return resp; + } } - // Propagate caller-supplied x-amz-content-sha256 (e.g. UNSIGNED-PAYLOAD - // for GPU-resident buffers) into each UploadPart so the per-part signing - // path also skips hashing the body. - if (headers.Contains("x-amz-content-sha256")) { - up_args.headers.Add("x-amz-content-sha256", - headers.GetFront("x-amz-content-sha256")); + } else { + std::vector extra_buffers; + std::vector buffers; + buffers.reserve(max_inflight_parts); + buffers.push_back(buf); + for (size_t i = 1; i < max_inflight_parts; ++i) { + void* raw = nullptr; + if (AlignedAlloc( + &raw, GetPageSize(), + (args.part_count > 0) ? args.part_size : args.part_size + 1)) { + return error::make( + "unable to allocate system memory with alignment"); + } + extra_buffers.emplace_back(raw); + buffers.push_back(static_cast(raw)); } - if (args.sse != nullptr) { - if (SseCustomerKey* ssec = dynamic_cast(args.sse)) { - up_args.headers.AddAll(ssec->Headers()); + +#ifdef MINIO_CPP_RDMA + std::vector rdma_regs; + if (args.rdmaclient != nullptr) { + for (size_t i = 1; i < max_inflight_parts; ++i) { + char* extra_buf = buffers[i]; + if (args.rdmaclient->cuMemObjGetDescriptor(extra_buf, + args.part_size) == 0) { + rdma_regs.emplace_back(args.rdmaclient, extra_buf); + } } } +#endif - if (UploadPartResponse resp = UploadPart(up_args)) { + struct InflightUpload { + unsigned int part_number = 0; + size_t bytes = 0; + std::string checksum_crc64nvme; + std::future future; + }; + + std::deque inflight; + bool stop = false; + size_t produced_parts = 0; + + auto drain_one = [&]() -> PutObjectResponse { + InflightUpload inflight_upload = std::move(inflight.front()); + inflight.pop_front(); + + UploadPartResponse resp = inflight_upload.future.get(); + if (!resp) { + return resp; + } + + parts.push_back(Part(inflight_upload.part_number, std::move(resp.etag), + std::move(inflight_upload.checksum_crc64nvme))); if (args.progressfunc != nullptr) { - uploaded_bytes += static_cast(data.length()); + uploaded_bytes += static_cast(inflight_upload.bytes); http::ProgressFunctionArgs actual_args; actual_args.upload_total_bytes = static_cast(object_size); actual_args.uploaded_bytes = uploaded_bytes; @@ -710,11 +789,152 @@ PutObjectResponse Client::PutObject(PutObjectArgs args, std::string& upload_id, error::Error("aborted by progress function")); } } - // HTTP fallback leaves resp.checksum_crc64nvme empty; use the local CRC. - parts.push_back(Part(part_number, std::move(resp.etag), - std::move(up_args.checksum_crc64nvme))); - } else { - return resp; + + return PutObjectResponse(); + }; + + while (!stop) { + if (inflight.size() == max_inflight_parts) { + PutObjectResponse drain_resp = drain_one(); + if (!drain_resp) { + return drain_resp; + } + } + + char* part_buf = buffers[produced_parts % max_inflight_parts]; + part_number++; + produced_parts++; + + size_t current_part_size = part_size; + size_t bytes_read = 0; + if (part_count > 0) { + if (part_number == part_count) { + current_part_size = object_size - uploaded_size; + stop = true; + } + + if (error::Error err = utils::ReadPart(*args.stream, part_buf, + current_part_size, bytes_read)) { + return PutObjectResponse(err); + } + + if (bytes_read != current_part_size) { + return error::make( + "not enough data in the stream; expected: " + + std::to_string(current_part_size) + + ", got: " + std::to_string(bytes_read) + " bytes"); + } + } else { + char* b = part_buf; + size_t size = part_size + 1; + + if (!one_byte.empty()) { + part_buf[0] = one_byte.front(); + b = part_buf + 1; + size--; + bytes_read = 1; + one_byte = ""; + } + + size_t n = 0; + if (error::Error err = utils::ReadPart(*args.stream, b, size, n)) { + return PutObjectResponse(err); + } + + bytes_read += n; + + if (bytes_read <= part_size) { + part_count = part_number; + current_part_size = bytes_read; + stop = true; + } else { + one_byte = part_buf[part_size]; + } + } + + uploaded_size += current_part_size; + + std::string_view data(part_buf, current_part_size); + if (part_count == 1) { + PutObjectApiArgs api_args; + api_args.extra_query_params = args.extra_query_params; + api_args.bucket = args.bucket; + api_args.region = args.region; + api_args.object = args.object; + api_args.data = data; + api_args.buf = part_buf; + api_args.size = current_part_size; + api_args.progressfunc = args.progressfunc; + api_args.progress_userdata = args.progress_userdata; + api_args.headers = headers; + + return BaseClient::PutObject(api_args); + } + + if (upload_id.empty()) { + CreateMultipartUploadArgs cmu_args; + cmu_args.extra_query_params = args.extra_query_params; + cmu_args.bucket = args.bucket; + cmu_args.region = args.region; + cmu_args.object = args.object; + cmu_args.headers = headers; +#ifdef MINIO_CPP_RDMA + cmu_args.headers.Add("x-amz-checksum-algorithm", "CRC64NVME"); +#endif + if (CreateMultipartUploadResponse resp = + CreateMultipartUpload(cmu_args)) { + upload_id = resp.upload_id; + } else { + return PutObjectResponse(resp); + } + } + + UploadPartArgs up_args; + up_args.bucket = args.bucket; + up_args.region = args.region; + up_args.object = args.object; + up_args.upload_id = upload_id; + up_args.part_number = part_number; + up_args.data = data; + up_args.buf = part_buf; + up_args.part_size = current_part_size; + if (headers.Contains("x-amz-content-sha256")) { + up_args.headers.Add("x-amz-content-sha256", + headers.GetFront("x-amz-content-sha256")); + } + if (args.sse != nullptr) { + if (SseCustomerKey* ssec = dynamic_cast(args.sse)) { + up_args.headers.AddAll(ssec->Headers()); + } + } +#ifdef MINIO_CPP_RDMA + up_args.rdmaclient = args.rdmaclient; + if (part_buf != nullptr && + cuObjClient::getMemoryType(part_buf) == CUOBJ_MEMORY_SYSTEM) { + const std::string crc = + utils::Crc64NvmeBase64(part_buf, current_part_size); + up_args.checksum_crc64nvme = crc; + up_args.headers.Add("x-amz-checksum-crc64nvme", crc); + } +#endif + + InflightUpload inflight_upload; + inflight_upload.part_number = part_number; + inflight_upload.bytes = current_part_size; + inflight_upload.checksum_crc64nvme = up_args.checksum_crc64nvme; + inflight_upload.future = std::async( + std::launch::async, + [this, up_args = std::move(up_args)]() mutable -> UploadPartResponse { + return UploadPart(up_args); + }); + inflight.push_back(std::move(inflight_upload)); + } + + while (!inflight.empty()) { + PutObjectResponse drain_resp = drain_one(); + if (!drain_resp) { + return drain_resp; + } } } diff --git a/tests/tests.cc b/tests/tests.cc index e1d3866..0d5309d 100644 --- a/tests/tests.cc +++ b/tests/tests.cc @@ -705,6 +705,69 @@ class Tests { throw; } } + void PutObjectWithInflight() { + std::cout << "PutObjectWithInflight()" << std::endl; + + // 10MB data -> auto-calc ~5MiB parts = 2 parts, exercising multipart path + const size_t data_size = 10 * 1024 * 1024; + std::string object_name = RandObjectName(); + std::string original_data = RandomString(charset, data_size); + std::string original_md5 = minio::utils::Md5sumHash(original_data); + + const unsigned int inflight_values[] = {1, 2, 4}; + for (auto inflight : inflight_values) { + std::stringstream ss(original_data); + minio::s3::PutObjectArgs args(ss, + static_cast(data_size), 0); + args.bucket = bucket_name_; + args.object = object_name; + args.max_inflight_parts = inflight; + + minio::s3::PutObjectResponse resp = client_.PutObject(args); + if (!resp) { + throw std::runtime_error( + "PutObjectWithInflight(max_inflight_parts=" + + std::to_string(inflight) + "): " + resp.Error().String()); + } + + try { + std::string downloaded_data; + minio::s3::GetObjectArgs get_args; + get_args.bucket = bucket_name_; + get_args.object = object_name; + get_args.datafunc = + [&downloaded_data]( + minio::http::DataFunctionArgs args) -> bool { + downloaded_data += args.datachunk; + return true; + }; + + minio::s3::GetObjectResponse get_resp = + client_.GetObject(get_args); + if (!get_resp) { + throw std::runtime_error( + "PutObjectWithInflight(max_inflight_parts=" + + std::to_string(inflight) + + "): GetObject failed: " + get_resp.Error().String()); + } + + std::string downloaded_md5 = + minio::utils::Md5sumHash(downloaded_data); + if (original_md5 != downloaded_md5) { + throw std::runtime_error( + "PutObjectWithInflight(max_inflight_parts=" + + std::to_string(inflight) + "): MD5 mismatch; " + + "original: " + original_md5 + + ", downloaded: " + downloaded_md5); + } + + RemoveObject(bucket_name_, object_name); + } catch (const std::runtime_error&) { + RemoveObject(bucket_name_, object_name); + throw; + } + } + } }; // class Tests int main(int /*argc*/, char* /*argv*/[]) { @@ -762,6 +825,7 @@ int main(int /*argc*/, char* /*argv*/[]) { tests.RemoveObjects(); tests.SelectObjectContent(); tests.ListenBucketNotification(); + tests.PutObjectWithInflight(); return EXIT_SUCCESS; } From 23b0b43e5364ad480da5113b1a12a339b8e425d7 Mon Sep 17 00:00:00 2001 From: jiuker Date: Fri, 5 Jun 2026 18:39:40 +0800 Subject: [PATCH 2/2] lint lint --- src/client.cc | 4 ++-- tests/tests.cc | 21 ++++++++------------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/src/client.cc b/src/client.cc index 8185e1d..238f883 100644 --- a/src/client.cc +++ b/src/client.cc @@ -748,8 +748,8 @@ PutObjectResponse Client::PutObject(PutObjectArgs args, std::string& upload_id, if (args.rdmaclient != nullptr) { for (size_t i = 1; i < max_inflight_parts; ++i) { char* extra_buf = buffers[i]; - if (args.rdmaclient->cuMemObjGetDescriptor(extra_buf, - args.part_size) == 0) { + if (args.rdmaclient->cuMemObjGetDescriptor(extra_buf, args.part_size) == + 0) { rdma_regs.emplace_back(args.rdmaclient, extra_buf); } } diff --git a/tests/tests.cc b/tests/tests.cc index 0d5309d..1fb10d8 100644 --- a/tests/tests.cc +++ b/tests/tests.cc @@ -717,17 +717,16 @@ class Tests { const unsigned int inflight_values[] = {1, 2, 4}; for (auto inflight : inflight_values) { std::stringstream ss(original_data); - minio::s3::PutObjectArgs args(ss, - static_cast(data_size), 0); + minio::s3::PutObjectArgs args(ss, static_cast(data_size), 0); args.bucket = bucket_name_; args.object = object_name; args.max_inflight_parts = inflight; minio::s3::PutObjectResponse resp = client_.PutObject(args); if (!resp) { - throw std::runtime_error( - "PutObjectWithInflight(max_inflight_parts=" + - std::to_string(inflight) + "): " + resp.Error().String()); + throw std::runtime_error("PutObjectWithInflight(max_inflight_parts=" + + std::to_string(inflight) + + "): " + resp.Error().String()); } try { @@ -736,14 +735,12 @@ class Tests { get_args.bucket = bucket_name_; get_args.object = object_name; get_args.datafunc = - [&downloaded_data]( - minio::http::DataFunctionArgs args) -> bool { + [&downloaded_data](minio::http::DataFunctionArgs args) -> bool { downloaded_data += args.datachunk; return true; }; - minio::s3::GetObjectResponse get_resp = - client_.GetObject(get_args); + minio::s3::GetObjectResponse get_resp = client_.GetObject(get_args); if (!get_resp) { throw std::runtime_error( "PutObjectWithInflight(max_inflight_parts=" + @@ -751,14 +748,12 @@ class Tests { "): GetObject failed: " + get_resp.Error().String()); } - std::string downloaded_md5 = - minio::utils::Md5sumHash(downloaded_data); + std::string downloaded_md5 = minio::utils::Md5sumHash(downloaded_data); if (original_md5 != downloaded_md5) { throw std::runtime_error( "PutObjectWithInflight(max_inflight_parts=" + std::to_string(inflight) + "): MD5 mismatch; " + - "original: " + original_md5 + - ", downloaded: " + downloaded_md5); + "original: " + original_md5 + ", downloaded: " + downloaded_md5); } RemoveObject(bucket_name_, object_name);