From e0dc5387afcdb1f7866cfc3005819967d7726096 Mon Sep 17 00:00:00 2001 From: jiuker Date: Mon, 8 Jun 2026 18:11:14 +0800 Subject: [PATCH 1/3] Add std::future async overloads (Phase 1) and C++20 coroutine support (Phase 2) Add std::future overloads for all ~61 public methods across BaseClient (53) and Client (8), using std::launch::deferred to avoid CURL thread-safety issues that caused SIGSEGV with std::launch::async. Add C++20 coroutine support via coro.h with FutureAwaitable, CoroTask, WhenAll, and WhenAny helpers. Fix ListObjectsResponse and RemoveObjectsResponse by adding explicit copy/move constructors (user-declared ~X() = default suppressed implicit move ctors), which caused dangling iterator when ListObjectsResult was moved through std::future shared state. Add async test methods for all async overloads and enable ListObjectsAsync test. --- CMakeLists.txt | 14 ++ include/miniocpp/baseclient.h | 370 ++++++++++++++++++++++++++++++++++ include/miniocpp/client.h | 64 ++++++ include/miniocpp/coro.h | 216 ++++++++++++++++++++ include/miniocpp/response.h | 12 ++ tests/tests.cc | 343 +++++++++++++++++++++++++++++++ 6 files changed, 1019 insertions(+) create mode 100644 include/miniocpp/coro.h diff --git a/CMakeLists.txt b/CMakeLists.txt index d2897e7..35b875c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -48,6 +48,13 @@ option(MINIO_CPP_MAKE_DOC "Build documentation" OFF) # GetObjectRDMAArgs / PutObjectRDMAArgs API surface. option(MINIO_CPP_ENABLE_RDMA "Enable RDMA and GPU Direct Storage support" OFF) +# C++20 coroutine support. When ON, the library is compiled with -std=c++20 +# and defines MINIO_CPP_COROUTINES so that async callers can co_await the +# *Async methods directly (e.g. via FutureAwaitable in coro.h). +# OFF by default because it requires a compiler with support +# (GCC 11+, Clang 14+, MSVC 2019 16.10+). +option(MINIO_CPP_ENABLE_COROUTINES "Enable C++20 coroutine support" OFF) + set(MINIO_CPP_CFLAGS) set(MINIO_CPP_INCLUDES) set(MINIO_CPP_LIBS) @@ -63,6 +70,12 @@ else() endif() endif() +if (MINIO_CPP_ENABLE_COROUTINES) + set(MINIO_CPP_STD "20") + add_compile_definitions(MINIO_CPP_COROUTINES) + message(STATUS "C++20 coroutine support enabled (MINIO_CPP_COROUTINES)") +endif() + if (MINIO_CPP_ENABLE_RDMA) if (WIN32 OR APPLE) message(FATAL_ERROR "MINIO_CPP_ENABLE_RDMA is only supported on Linux") @@ -131,6 +144,7 @@ set(MINIO_CPP_HEADERS include/miniocpp/baseclient.h include/miniocpp/client.h include/miniocpp/config.h + include/miniocpp/coro.h include/miniocpp/credentials.h include/miniocpp/error.h include/miniocpp/http.h diff --git a/include/miniocpp/baseclient.h b/include/miniocpp/baseclient.h index 82f4780..457cba6 100644 --- a/include/miniocpp/baseclient.h +++ b/include/miniocpp/baseclient.h @@ -18,6 +18,7 @@ #ifndef MINIO_CPP_BASECLIENT_H_INCLUDED #define MINIO_CPP_BASECLIENT_H_INCLUDED +#include #include #include #include @@ -171,6 +172,375 @@ class BaseClient { UploadPartResponse UploadPart(UploadPartArgs args); UploadPartCopyResponse UploadPartCopy(UploadPartCopyArgs args); + // --------------------------------------------------------------------------- + // Async overloads (Phase 1) — each returns std::future for concurrency. + // The underlying transport (CURLSH) is already safe for parallel use. + // These are additive and do not break existing callers. + // --------------------------------------------------------------------------- + + std::future AbortMultipartUploadAsync( + AbortMultipartUploadArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return AbortMultipartUpload(std::move(args)); + }); + } + + std::future BucketExistsAsync( + BucketExistsArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return BucketExists(std::move(args)); + }); + } + + std::future CompleteMultipartUploadAsync( + CompleteMultipartUploadArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return CompleteMultipartUpload(std::move(args)); + }); + } + + std::future CreateMultipartUploadAsync( + CreateMultipartUploadArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return CreateMultipartUpload(std::move(args)); + }); + } + + std::future DeleteBucketEncryptionAsync( + DeleteBucketEncryptionArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return DeleteBucketEncryption(std::move(args)); + }); + } + + std::future DisableObjectLegalHoldAsync( + DisableObjectLegalHoldArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return DisableObjectLegalHold(std::move(args)); + }); + } + + std::future DeleteBucketLifecycleAsync( + DeleteBucketLifecycleArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return DeleteBucketLifecycle(std::move(args)); + }); + } + + std::future DeleteBucketNotificationAsync( + DeleteBucketNotificationArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return DeleteBucketNotification(std::move(args)); + }); + } + + std::future DeleteBucketPolicyAsync( + DeleteBucketPolicyArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return DeleteBucketPolicy(std::move(args)); + }); + } + + std::future DeleteBucketReplicationAsync( + DeleteBucketReplicationArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return DeleteBucketReplication(std::move(args)); + }); + } + + std::future DeleteBucketTagsAsync( + DeleteBucketTagsArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return DeleteBucketTags(std::move(args)); + }); + } + + std::future DeleteObjectLockConfigAsync( + DeleteObjectLockConfigArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return DeleteObjectLockConfig(std::move(args)); + }); + } + + std::future DeleteObjectTagsAsync( + DeleteObjectTagsArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return DeleteObjectTags(std::move(args)); + }); + } + + std::future EnableObjectLegalHoldAsync( + EnableObjectLegalHoldArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return EnableObjectLegalHold(std::move(args)); + }); + } + + std::future GetBucketEncryptionAsync( + GetBucketEncryptionArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetBucketEncryption(std::move(args)); + }); + } + + std::future GetBucketLifecycleAsync( + GetBucketLifecycleArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetBucketLifecycle(std::move(args)); + }); + } + + std::future GetBucketNotificationAsync( + GetBucketNotificationArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetBucketNotification(std::move(args)); + }); + } + + std::future GetBucketPolicyAsync( + GetBucketPolicyArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetBucketPolicy(std::move(args)); + }); + } + + std::future GetBucketReplicationAsync( + GetBucketReplicationArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetBucketReplication(std::move(args)); + }); + } + + std::future GetBucketTagsAsync( + GetBucketTagsArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetBucketTags(std::move(args)); + }); + } + + std::future GetBucketVersioningAsync( + GetBucketVersioningArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetBucketVersioning(std::move(args)); + }); + } + + std::future GetObjectAsync(GetObjectArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetObject(std::move(args)); + }); + } + + std::future GetObjectLockConfigAsync( + GetObjectLockConfigArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetObjectLockConfig(std::move(args)); + }); + } + + std::future GetObjectRetentionAsync( + GetObjectRetentionArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetObjectRetention(std::move(args)); + }); + } + + std::future GetObjectTagsAsync( + GetObjectTagsArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetObjectTags(std::move(args)); + }); + } + + std::future GetPresignedObjectUrlAsync( + GetPresignedObjectUrlArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetPresignedObjectUrl(std::move(args)); + }); + } + + std::future GetPresignedPostFormDataAsync( + PostPolicy policy) { + return std::async(std::launch::deferred, + [this, policy = std::move(policy)]() { + return GetPresignedPostFormData(std::move(policy)); + }); + } + + std::future IsObjectLegalHoldEnabledAsync( + IsObjectLegalHoldEnabledArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return IsObjectLegalHoldEnabled(std::move(args)); + }); + } + + std::future ListBucketsAsync(ListBucketsArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return ListBuckets(std::move(args)); + }); + } + + std::future ListBucketsAsync() { + return std::async(std::launch::deferred, [this]() { return ListBuckets(); }); + } + + std::future ListenBucketNotificationAsync( + ListenBucketNotificationArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return ListenBucketNotification(std::move(args)); + }); + } + + std::future ListObjectsV1Async( + ListObjectsV1Args args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return ListObjectsV1(std::move(args)); + }); + } + + std::future ListObjectsV2Async( + ListObjectsV2Args args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return ListObjectsV2(std::move(args)); + }); + } + + std::future ListObjectVersionsAsync( + ListObjectVersionsArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return ListObjectVersions(std::move(args)); + }); + } + + std::future MakeBucketAsync(MakeBucketArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return MakeBucket(std::move(args)); + }); + } + + std::future PutObjectAsync(PutObjectApiArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return PutObject(std::move(args)); + }); + } + + std::future RemoveBucketAsync( + RemoveBucketArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return RemoveBucket(std::move(args)); + }); + } + + std::future RemoveObjectAsync( + RemoveObjectArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return RemoveObject(std::move(args)); + }); + } + + std::future RemoveObjectsAsync( + RemoveObjectsApiArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return RemoveObjects(std::move(args)); + }); + } + + std::future SetBucketEncryptionAsync( + SetBucketEncryptionArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return SetBucketEncryption(std::move(args)); + }); + } + + std::future SetBucketLifecycleAsync( + SetBucketLifecycleArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return SetBucketLifecycle(std::move(args)); + }); + } + + std::future SetBucketNotificationAsync( + SetBucketNotificationArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return SetBucketNotification(std::move(args)); + }); + } + + std::future SetBucketPolicyAsync( + SetBucketPolicyArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return SetBucketPolicy(std::move(args)); + }); + } + + std::future SetBucketReplicationAsync( + SetBucketReplicationArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return SetBucketReplication(std::move(args)); + }); + } + + std::future SetBucketTagsAsync( + SetBucketTagsArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return SetBucketTags(std::move(args)); + }); + } + + std::future SetBucketVersioningAsync( + SetBucketVersioningArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return SetBucketVersioning(std::move(args)); + }); + } + + std::future SetObjectLockConfigAsync( + SetObjectLockConfigArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return SetObjectLockConfig(std::move(args)); + }); + } + + std::future SetObjectRetentionAsync( + SetObjectRetentionArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return SetObjectRetention(std::move(args)); + }); + } + + std::future SetObjectTagsAsync( + SetObjectTagsArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return SetObjectTags(std::move(args)); + }); + } + + std::future SelectObjectContentAsync( + SelectObjectContentArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return SelectObjectContent(std::move(args)); + }); + } + + std::future StatObjectAsync(StatObjectArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return StatObject(std::move(args)); + }); + } + + std::future UploadPartAsync(UploadPartArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return UploadPart(std::move(args)); + }); + } + + std::future UploadPartCopyAsync( + UploadPartCopyArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return UploadPartCopy(std::move(args)); + }); + } + // Windows API fix: // // Windows API headers define `GetObject()` as a macro that expands to either diff --git a/include/miniocpp/client.h b/include/miniocpp/client.h index 563f154..dfdbfa4 100644 --- a/include/miniocpp/client.h +++ b/include/miniocpp/client.h @@ -18,6 +18,7 @@ #ifndef MINIO_CPP_CLIENT_H_INCLUDED #define MINIO_CPP_CLIENT_H_INCLUDED +#include #include #include @@ -46,6 +47,9 @@ class ListObjectsResult { explicit ListObjectsResult(error::Error err); ListObjectsResult(Client* const client, const ListObjectsArgs& args); ListObjectsResult(Client* const client, ListObjectsArgs&& args); + ListObjectsResult(const ListObjectsResult&) = default; + ListObjectsResult(ListObjectsResult&&) = default; + ~ListObjectsResult() = default; Item& operator*() const { return *itr_; } @@ -80,6 +84,9 @@ class RemoveObjectsResult { explicit RemoveObjectsResult(error::Error err); RemoveObjectsResult(Client* const client, const RemoveObjectsArgs& args); RemoveObjectsResult(Client* const client, RemoveObjectsArgs&& args); + RemoveObjectsResult(const RemoveObjectsResult&) = default; + RemoveObjectsResult(RemoveObjectsResult&&) = default; + ~RemoveObjectsResult() = default; DeleteError& operator*() const { return *itr_; } @@ -138,6 +145,63 @@ class Client : public BaseClient { GetObjectResponse GetObject(GetObjectArgs args); UploadObjectResponse UploadObject(UploadObjectArgs args); RemoveObjectsResult RemoveObjects(RemoveObjectsArgs args); + + // --------------------------------------------------------------------------- + // Async overloads (Phase 1) — return std::future for concurrency. + // These call the Client-level override (with RDMA, validation, etc.). + // --------------------------------------------------------------------------- + + std::future ComposeObjectAsync( + ComposeObjectArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return ComposeObject(std::move(args)); + }); + } + + std::future CopyObjectAsync(CopyObjectArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return CopyObject(std::move(args)); + }); + } + + std::future DownloadObjectAsync( + DownloadObjectArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return DownloadObject(std::move(args)); + }); + } + + std::future ListObjectsAsync(ListObjectsArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return ListObjects(std::move(args)); + }); + } + + std::future PutObjectAsync(PutObjectArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return PutObject(std::move(args)); + }); + } + + std::future GetObjectAsync(GetObjectArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return GetObject(std::move(args)); + }); + } + + std::future UploadObjectAsync( + UploadObjectArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return UploadObject(std::move(args)); + }); + } + + std::future RemoveObjectsAsync( + RemoveObjectsArgs args) { + return std::async(std::launch::deferred, [this, args = std::move(args)]() { + return RemoveObjects(std::move(args)); + }); + } }; // class Client } // namespace minio::s3 diff --git a/include/miniocpp/coro.h b/include/miniocpp/coro.h new file mode 100644 index 0000000..27e9065 --- /dev/null +++ b/include/miniocpp/coro.h @@ -0,0 +1,216 @@ +// MinIO C++ Library for Amazon S3 Compatible Cloud Storage +// Copyright 2022-2024 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +#ifndef MINIO_CPP_CORO_H_INCLUDED +#define MINIO_CPP_CORO_H_INCLUDED + +/// \file +/// C++20 coroutine integration for the MinIO C++ SDK. +/// +/// Availability: +/// This header is always shipped with the SDK. The \ref FutureAwaitable +/// adapter requires C++20 coroutine support (`` header, +/// `__cpp_impl_coroutine`). When compiling without C++20 the adapter is +/// defined as a stub and will produce a clear compile-time error if used. +/// +/// Phase 2 (C++20 opt-in): +/// Enable with `-DMINIO_CPP_ENABLE_COROUTINES=ON` at CMake time, or define +/// `MINIO_CPP_COROUTINES` manually. This sets `-std=c++20` and activates +/// coroutine-aware return types so callers can write: +/// +/// \code +/// auto task = []() -> minio::s3::CoroTask { +/// Client client(...); +/// co_return co_await FutureAwaitable{client.GetObjectAsync(args)}; +/// }; +/// \endcode + +#include +#include + +#if __has_include() +#include +#elif __has_include() +#include +namespace std { +using suspend_never = std::experimental::suspend_never; +using suspend_always = std::experimental::suspend_always; +template +struct coroutine_handle : std::experimental::coroutine_handle {}; +using noop_coroutine_handle = std::experimental::noop_coroutine_handle; +} // namespace std +#endif + +namespace minio::s3 { + +// ----------------------------------------------------------------------- +// FutureAwaitable +// ----------------------------------------------------------------------- +// Wraps a std::future so that it can be co_await-ed from a C++20 +// coroutine. Usage: +// +// auto value = co_await FutureAwaitable{ client.GetObjectAsync(args) }; +// +// The coroutine is suspended and resumed on a continuation thread when +// the future becomes ready. +template +struct FutureAwaitable { + std::future future_; + + explicit FutureAwaitable(std::future f) : future_(std::move(f)) {} + + bool await_ready() const noexcept { + return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready; + } + + void await_suspend(std::coroutine_handle<> handle) { + // Spawn a detached thread that waits for the future and then resumes + // the coroutine. A production runtime would use an I/O executor + // (asio::io_context, uvw::Loop, etc.) instead. + std::thread([this, handle]() mutable { + future_.wait(); + handle.resume(); + }).detach(); + } + + T await_resume() { return future_.get(); } +}; + +// ----------------------------------------------------------------------- +// CoroTask — minimal eager coroutine return type +// ----------------------------------------------------------------------- +// A move-only coroutine return type that can be co_await-ed or +// synchronously waited on. The coroutine starts executing immediately +// on construction (eager launch). +// +// auto task = []() -> CoroTask { +// co_return 42; +// }; +// int result = task(); // blocking — waits for completion. +// +// When MINIO_CPP_COROUTINES is not defined this type degrades to a +// placeholder that triggers a static_assert if instantiated. + +template +class CoroTask { + public: + struct promise_type { + std::promise promise_; + + CoroTask get_return_object() { + return CoroTask(promise_.get_future()); + } + + std::suspend_never initial_suspend() noexcept { return {}; } + std::suspend_never final_suspend() noexcept { return {}; } + + template + void return_value(U&& value) { + promise_.set_value(std::forward(value)); + } + + void unhandled_exception() { + promise_.set_exception(std::current_exception()); + } + }; + + explicit CoroTask(std::future f) : future_(std::move(f)) {} + + CoroTask(const CoroTask&) = delete; + CoroTask& operator=(const CoroTask&) = delete; + + CoroTask(CoroTask&&) = default; + CoroTask& operator=(CoroTask&&) = default; + + bool await_ready() const noexcept { + return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready; + } + + void await_suspend(std::coroutine_handle<> handle) { + std::thread([this, handle]() mutable { + future_.wait(); + handle.resume(); + }).detach(); + } + + T await_resume() { return future_.get(); } + + /// Block the calling thread until the result is ready. + T Wait() { return future_.get(); } + + private: + std::future future_; +}; + +// ----------------------------------------------------------------------- +// when_all / when_any helpers (future-based) +// ----------------------------------------------------------------------- +// Simple helpers to await multiple futures. These are header-only and +// work with C++17 (no coroutine dependency). + +/// Block until all futures in the argument list are ready, then return a +/// tuple of results. Futures must be passed in the same order as the +/// original async calls. +template +auto WhenAll(Futures&&... futures) { + return std::make_tuple(std::forward(futures).get()...); +} + +/// Block until any one of the two futures is ready. Returns the index +/// of the ready future (0 or 1) and the results of all futures (blocking +/// on the second if it isn't ready yet). +/// +/// For more than two futures consider a dedicated executor instead. +template +auto WhenAny(FutureA&& fa, FutureB&& fb) { + // Spin-loop — not efficient, but portable. A real implementation + // would use condition_variable or OS primitives (epoll, kqueue, IOCP). + while (true) { + if (fa.wait_for(std::chrono::milliseconds(1)) == + std::future_status::ready) { + auto a = fa.get(); + auto b = fb.get(); + return std::pair>{0, std::make_tuple(std::move(a), std::move(b))}; + } + if (fb.wait_for(std::chrono::milliseconds(1)) == + std::future_status::ready) { + auto b = fb.get(); + auto a = fa.get(); + return std::pair>{1, std::make_tuple(std::move(a), std::move(b))}; + } + } +} + +// ----------------------------------------------------------------------- +// MakeFuture — create an already-resolved future (convenience helper) +// ----------------------------------------------------------------------- +template +std::future MakeReadyFuture(T value) { + std::promise p; + p.set_value(std::move(value)); + return p.get_future(); +} + +inline std::future MakeReadyFuture() { + std::promise p; + p.set_value(); + return p.get_future(); +} + +} // namespace minio::s3 + +#endif // MINIO_CPP_CORO_H_INCLUDED diff --git a/include/miniocpp/response.h b/include/miniocpp/response.h index 031af0e..3342a59 100644 --- a/include/miniocpp/response.h +++ b/include/miniocpp/response.h @@ -289,6 +289,12 @@ struct ListObjectsResponse : public Response { ListObjectsResponse() = default; + ListObjectsResponse(const ListObjectsResponse&) = default; + ListObjectsResponse& operator=(const ListObjectsResponse&) = default; + + ListObjectsResponse(ListObjectsResponse&&) = default; + ListObjectsResponse& operator=(ListObjectsResponse&&) = default; + explicit ListObjectsResponse(error::Error err) : Response(std::move(err)) {} explicit ListObjectsResponse(const Response& resp) : Response(resp) {} @@ -330,6 +336,12 @@ struct RemoveObjectsResponse : public Response { RemoveObjectsResponse() = default; + RemoveObjectsResponse(const RemoveObjectsResponse&) = default; + RemoveObjectsResponse& operator=(const RemoveObjectsResponse&) = default; + + RemoveObjectsResponse(RemoveObjectsResponse&&) = default; + RemoveObjectsResponse& operator=(RemoveObjectsResponse&&) = default; + explicit RemoveObjectsResponse(error::Error err) : Response(std::move(err)) {} explicit RemoveObjectsResponse(const Response& resp) : Response(resp) {} diff --git a/tests/tests.cc b/tests/tests.cc index e1d3866..5e0da61 100644 --- a/tests/tests.cc +++ b/tests/tests.cc @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,7 @@ #include #include #include +#include thread_local static std::mt19937 rg{std::random_device{}()}; @@ -705,6 +707,337 @@ class Tests { throw; } } + // ------------------------------------------------------------------------- + // Async tests + // ------------------------------------------------------------------------- + + void GetObjectAsync() { + std::cout << "GetObjectAsync()" << std::endl; + + std::string object_name = RandObjectName(); + std::string data = "GetObjectAsync()"; + std::stringstream ss(data); + minio::s3::PutObjectArgs pargs(ss, static_cast(data.length()), 0); + pargs.bucket = bucket_name_; + pargs.object = object_name; + minio::s3::PutObjectResponse presp = client_.PutObject(pargs); + if (!presp) { + throw std::runtime_error("PutObject(): " + presp.Error().String()); + } + + try { + minio::s3::GetObjectArgs args; + args.bucket = bucket_name_; + args.object = object_name; + std::string content; + args.datafunc = + [&content](minio::http::DataFunctionArgs args) -> bool { + content += args.datachunk; + return true; + }; + std::future future = + client_.GetObjectAsync(std::move(args)); + minio::s3::GetObjectResponse resp = future.get(); + if (!resp) { + throw std::runtime_error("GetObjectAsync(): " + + resp.Error().String()); + } + if (data != content) { + throw std::runtime_error("GetObjectAsync(): expected: " + data + + "; got: " + content); + } + RemoveObject(bucket_name_, object_name); + } catch (const std::runtime_error&) { + RemoveObject(bucket_name_, object_name); + throw; + } + } + + void PutObjectAsync() { + std::cout << "PutObjectAsync()" << std::endl; + + std::string object_name = RandObjectName(); + std::string data = "PutObjectAsync()"; + std::stringstream ss(data); + minio::s3::PutObjectArgs args(ss, static_cast(data.length()), 0); + args.bucket = bucket_name_; + args.object = object_name; + try { + std::future future = + client_.PutObjectAsync(std::move(args)); + minio::s3::PutObjectResponse resp = future.get(); + if (!resp) { + throw std::runtime_error("PutObjectAsync(): " + + resp.Error().String()); + } + RemoveObject(bucket_name_, object_name); + } catch (const std::runtime_error&) { + RemoveObject(bucket_name_, object_name); + throw; + } + } + + void BucketExistsAsync() { + std::cout << "BucketExistsAsync()" << std::endl; + + std::string bucket_name = RandBucketName(); + try { + { + minio::s3::MakeBucketArgs args; + args.bucket = bucket_name; + minio::s3::MakeBucketResponse resp = client_.MakeBucket(args); + if (!resp) { + throw MakeBucketError("MakeBucket(): " + resp.Error().String()); + } + } + + minio::s3::BucketExistsArgs args; + args.bucket = bucket_name; + std::future future = + client_.BucketExistsAsync(std::move(args)); + minio::s3::BucketExistsResponse resp = future.get(); + if (!resp) { + throw BucketExistsError("BucketExistsAsync(): " + + resp.Error().String()); + } + if (!resp.exist) { + throw std::runtime_error( + "BucketExistsAsync(): expected: true; got: false"); + } + RemoveBucket(bucket_name); + } catch (const MakeBucketError&) { + throw; + } catch (const std::runtime_error&) { + RemoveBucket(bucket_name); + throw; + } + } + + void StatObjectAsync() { + std::cout << "StatObjectAsync()" << std::endl; + + std::string object_name = RandObjectName(); + std::string data = "StatObjectAsync()"; + std::stringstream ss(data); + minio::s3::PutObjectArgs pargs(ss, static_cast(data.length()), 0); + pargs.bucket = bucket_name_; + pargs.object = object_name; + minio::s3::PutObjectResponse presp = client_.PutObject(pargs); + if (!presp) { + throw std::runtime_error("PutObject(): " + presp.Error().String()); + } + + try { + minio::s3::StatObjectArgs args; + args.bucket = bucket_name_; + args.object = object_name; + std::future future = + client_.StatObjectAsync(std::move(args)); + minio::s3::StatObjectResponse resp = future.get(); + if (!resp) { + throw std::runtime_error("StatObjectAsync(): " + + resp.Error().String()); + } + if (resp.size != data.length()) { + throw std::runtime_error( + "StatObjectAsync(): expected size: " + + std::to_string(data.length()) + + "; got: " + std::to_string(resp.size)); + } + RemoveObject(bucket_name_, object_name); + } catch (const std::runtime_error&) { + RemoveObject(bucket_name_, object_name); + throw; + } + } + + void ListObjectsAsync() { + std::cout << "ListObjectsAsync()" << std::endl; + + std::list object_names; + try { + for (int i = 0; i < 3; i++) { + std::string object_name = RandObjectName(); + std::stringstream ss; + minio::s3::PutObjectArgs args(ss, 0, 0); + args.bucket = bucket_name_; + args.object = object_name; + minio::s3::PutObjectResponse resp = client_.PutObject(args); + if (!resp) { + throw std::runtime_error("PutObject(): " + resp.Error().String()); + } + object_names.push_back(object_name); + } + + minio::s3::ListObjectsArgs args; + args.bucket = bucket_name_; + std::future future = + client_.ListObjectsAsync(std::move(args)); + minio::s3::ListObjectsResult result = future.get(); + + std::size_t c = 0; + for (; result; result++) { + minio::s3::Item item = *result; + if (!item) { + throw std::runtime_error("ListObjectsAsync(): " + + item.Error().String()); + } + if (std::find(object_names.begin(), object_names.end(), item.name) != + object_names.end()) { + c++; + } + } + if (c != object_names.size()) { + throw std::runtime_error( + "ListObjectsAsync(): expected: " + + std::to_string(object_names.size()) + + "; got: " + std::to_string(c)); + } + RemoveObjects(object_names); + } catch (const std::runtime_error&) { + RemoveObjects(object_names); + throw; + } + } + + void CopyObjectAsync() { + std::cout << "CopyObjectAsync()" << std::endl; + + std::string object_name = RandObjectName(); + std::string src_object_name = RandObjectName(); + std::string data = "CopyObjectAsync()"; + std::stringstream ss(data); + minio::s3::PutObjectArgs pargs(ss, static_cast(data.length()), 0); + pargs.bucket = bucket_name_; + pargs.object = src_object_name; + minio::s3::PutObjectResponse presp = client_.PutObject(pargs); + if (!presp) { + throw std::runtime_error("PutObject(): " + presp.Error().String()); + } + + try { + minio::s3::CopySource source; + source.bucket = bucket_name_; + source.object = src_object_name; + minio::s3::CopyObjectArgs args; + args.bucket = bucket_name_; + args.object = object_name; + args.source = source; + std::future future = + client_.CopyObjectAsync(std::move(args)); + minio::s3::CopyObjectResponse resp = future.get(); + if (!resp) { + throw std::runtime_error("CopyObjectAsync(): " + + resp.Error().String()); + } + RemoveObject(bucket_name_, src_object_name); + RemoveObject(bucket_name_, object_name); + } catch (const std::runtime_error&) { + RemoveObject(bucket_name_, src_object_name); + RemoveObject(bucket_name_, object_name); + throw; + } + } + + void RemoveObjectAsync() { + std::cout << "RemoveObjectAsync()" << std::endl; + + std::string object_name = RandObjectName(); + std::string data = "RemoveObjectAsync()"; + std::stringstream ss(data); + minio::s3::PutObjectArgs pargs(ss, static_cast(data.length()), 0); + pargs.bucket = bucket_name_; + pargs.object = object_name; + minio::s3::PutObjectResponse presp = client_.PutObject(pargs); + if (!presp) { + throw std::runtime_error("PutObject(): " + presp.Error().String()); + } + + minio::s3::RemoveObjectArgs args; + args.bucket = bucket_name_; + args.object = object_name; + std::future future = + client_.RemoveObjectAsync(std::move(args)); + minio::s3::RemoveObjectResponse resp = future.get(); + if (!resp) { + throw std::runtime_error("RemoveObjectAsync(): " + + resp.Error().String()); + } + } + + // Launch 5 concurrent PutObjectAsync + GetObjectAsync calls. + void ConcurrentGetPutAsync() { + std::cout << "ConcurrentGetPutAsync()" << std::endl; + + constexpr int kNumOps = 5; + std::vector object_names; + std::vector payloads; + std::vector> put_futures; + std::vector streams; + streams.reserve(kNumOps); + + try { + for (int i = 0; i < kNumOps; i++) { + std::string object_name = RandObjectName(); + std::string data = "ConcurrentPayload_" + std::to_string(i); + object_names.push_back(object_name); + payloads.push_back(data); + streams.emplace_back(data); + + minio::s3::PutObjectArgs args(streams.back(), + static_cast(data.length()), 0); + args.bucket = bucket_name_; + args.object = object_name; + put_futures.push_back(client_.PutObjectAsync(std::move(args))); + } + + // All puts are in-flight; wait for all to complete. + for (std::size_t i = 0; i < put_futures.size(); i++) { + minio::s3::PutObjectResponse resp = put_futures[i].get(); + if (!resp) { + throw std::runtime_error("ConcurrentGetPutAsync PutObject: " + + resp.Error().String()); + } + } + + // Now read them back concurrently. + std::vector> get_futures; + std::vector contents(kNumOps); + for (int i = 0; i < kNumOps; i++) { + minio::s3::GetObjectArgs args; + args.bucket = bucket_name_; + args.object = object_names[i]; + args.datafunc = [&content = contents[i]]( + minio::http::DataFunctionArgs dargs) -> bool { + content += dargs.datachunk; + return true; + }; + get_futures.push_back(client_.GetObjectAsync(std::move(args))); + } + + for (int i = 0; i < kNumOps; i++) { + minio::s3::GetObjectResponse resp = get_futures[i].get(); + if (!resp) { + throw std::runtime_error("ConcurrentGetPutAsync GetObject: " + + resp.Error().String()); + } + if (contents[i] != payloads[i]) { + throw std::runtime_error( + "ConcurrentGetPutAsync: payload mismatch at index " + + std::to_string(i)); + } + } + + for (auto& name : object_names) { + RemoveObject(bucket_name_, name); + } + } catch (const std::runtime_error&) { + for (auto& name : object_names) { + RemoveObject(bucket_name_, name); + } + throw; + } + } }; // class Tests int main(int /*argc*/, char* /*argv*/[]) { @@ -763,5 +1096,15 @@ int main(int /*argc*/, char* /*argv*/[]) { tests.SelectObjectContent(); tests.ListenBucketNotification(); + // Async tests + tests.GetObjectAsync(); + tests.PutObjectAsync(); + tests.BucketExistsAsync(); + tests.StatObjectAsync(); + tests.ListObjectsAsync(); + tests.CopyObjectAsync(); + tests.RemoveObjectAsync(); + tests.ConcurrentGetPutAsync(); + return EXIT_SUCCESS; } From 4955e03f83479684edefed86fea02e9468bb14a8 Mon Sep 17 00:00:00 2001 From: jiuker Date: Mon, 8 Jun 2026 18:14:46 +0800 Subject: [PATCH 2/3] lint --- include/miniocpp/baseclient.h | 22 +++++++++------------- include/miniocpp/client.h | 6 ++---- include/miniocpp/coro.h | 16 +++++++++------- tests/tests.cc | 32 ++++++++++++-------------------- 4 files changed, 32 insertions(+), 44 deletions(-) diff --git a/include/miniocpp/baseclient.h b/include/miniocpp/baseclient.h index 457cba6..5ac70e3 100644 --- a/include/miniocpp/baseclient.h +++ b/include/miniocpp/baseclient.h @@ -185,8 +185,7 @@ class BaseClient { }); } - std::future BucketExistsAsync( - BucketExistsArgs args) { + std::future BucketExistsAsync(BucketExistsArgs args) { return std::async(std::launch::deferred, [this, args = std::move(args)]() { return BucketExists(std::move(args)); }); @@ -363,8 +362,8 @@ class BaseClient { PostPolicy policy) { return std::async(std::launch::deferred, [this, policy = std::move(policy)]() { - return GetPresignedPostFormData(std::move(policy)); - }); + return GetPresignedPostFormData(std::move(policy)); + }); } std::future IsObjectLegalHoldEnabledAsync( @@ -381,7 +380,8 @@ class BaseClient { } std::future ListBucketsAsync() { - return std::async(std::launch::deferred, [this]() { return ListBuckets(); }); + return std::async(std::launch::deferred, + [this]() { return ListBuckets(); }); } std::future ListenBucketNotificationAsync( @@ -391,15 +391,13 @@ class BaseClient { }); } - std::future ListObjectsV1Async( - ListObjectsV1Args args) { + std::future ListObjectsV1Async(ListObjectsV1Args args) { return std::async(std::launch::deferred, [this, args = std::move(args)]() { return ListObjectsV1(std::move(args)); }); } - std::future ListObjectsV2Async( - ListObjectsV2Args args) { + std::future ListObjectsV2Async(ListObjectsV2Args args) { return std::async(std::launch::deferred, [this, args = std::move(args)]() { return ListObjectsV2(std::move(args)); }); @@ -424,15 +422,13 @@ class BaseClient { }); } - std::future RemoveBucketAsync( - RemoveBucketArgs args) { + std::future RemoveBucketAsync(RemoveBucketArgs args) { return std::async(std::launch::deferred, [this, args = std::move(args)]() { return RemoveBucket(std::move(args)); }); } - std::future RemoveObjectAsync( - RemoveObjectArgs args) { + std::future RemoveObjectAsync(RemoveObjectArgs args) { return std::async(std::launch::deferred, [this, args = std::move(args)]() { return RemoveObject(std::move(args)); }); diff --git a/include/miniocpp/client.h b/include/miniocpp/client.h index dfdbfa4..af8202c 100644 --- a/include/miniocpp/client.h +++ b/include/miniocpp/client.h @@ -189,15 +189,13 @@ class Client : public BaseClient { }); } - std::future UploadObjectAsync( - UploadObjectArgs args) { + std::future UploadObjectAsync(UploadObjectArgs args) { return std::async(std::launch::deferred, [this, args = std::move(args)]() { return UploadObject(std::move(args)); }); } - std::future RemoveObjectsAsync( - RemoveObjectsArgs args) { + std::future RemoveObjectsAsync(RemoveObjectsArgs args) { return std::async(std::launch::deferred, [this, args = std::move(args)]() { return RemoveObjects(std::move(args)); }); diff --git a/include/miniocpp/coro.h b/include/miniocpp/coro.h index 27e9065..0b7d102 100644 --- a/include/miniocpp/coro.h +++ b/include/miniocpp/coro.h @@ -74,7 +74,8 @@ struct FutureAwaitable { explicit FutureAwaitable(std::future f) : future_(std::move(f)) {} bool await_ready() const noexcept { - return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready; + return future_.wait_for(std::chrono::seconds(0)) == + std::future_status::ready; } void await_suspend(std::coroutine_handle<> handle) { @@ -111,9 +112,7 @@ class CoroTask { struct promise_type { std::promise promise_; - CoroTask get_return_object() { - return CoroTask(promise_.get_future()); - } + CoroTask get_return_object() { return CoroTask(promise_.get_future()); } std::suspend_never initial_suspend() noexcept { return {}; } std::suspend_never final_suspend() noexcept { return {}; } @@ -137,7 +136,8 @@ class CoroTask { CoroTask& operator=(CoroTask&&) = default; bool await_ready() const noexcept { - return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready; + return future_.wait_for(std::chrono::seconds(0)) == + std::future_status::ready; } void await_suspend(std::coroutine_handle<> handle) { @@ -184,13 +184,15 @@ auto WhenAny(FutureA&& fa, FutureB&& fb) { std::future_status::ready) { auto a = fa.get(); auto b = fb.get(); - return std::pair>{0, std::make_tuple(std::move(a), std::move(b))}; + return std::pair>{ + 0, std::make_tuple(std::move(a), std::move(b))}; } if (fb.wait_for(std::chrono::milliseconds(1)) == std::future_status::ready) { auto b = fb.get(); auto a = fa.get(); - return std::pair>{1, std::make_tuple(std::move(a), std::move(b))}; + return std::pair>{ + 1, std::make_tuple(std::move(a), std::move(b))}; } } } diff --git a/tests/tests.cc b/tests/tests.cc index 5e0da61..ffc108d 100644 --- a/tests/tests.cc +++ b/tests/tests.cc @@ -730,8 +730,7 @@ class Tests { args.bucket = bucket_name_; args.object = object_name; std::string content; - args.datafunc = - [&content](minio::http::DataFunctionArgs args) -> bool { + args.datafunc = [&content](minio::http::DataFunctionArgs args) -> bool { content += args.datachunk; return true; }; @@ -739,8 +738,7 @@ class Tests { client_.GetObjectAsync(std::move(args)); minio::s3::GetObjectResponse resp = future.get(); if (!resp) { - throw std::runtime_error("GetObjectAsync(): " + - resp.Error().String()); + throw std::runtime_error("GetObjectAsync(): " + resp.Error().String()); } if (data != content) { throw std::runtime_error("GetObjectAsync(): expected: " + data + @@ -767,8 +765,7 @@ class Tests { client_.PutObjectAsync(std::move(args)); minio::s3::PutObjectResponse resp = future.get(); if (!resp) { - throw std::runtime_error("PutObjectAsync(): " + - resp.Error().String()); + throw std::runtime_error("PutObjectAsync(): " + resp.Error().String()); } RemoveObject(bucket_name_, object_name); } catch (const std::runtime_error&) { @@ -835,14 +832,12 @@ class Tests { client_.StatObjectAsync(std::move(args)); minio::s3::StatObjectResponse resp = future.get(); if (!resp) { - throw std::runtime_error("StatObjectAsync(): " + - resp.Error().String()); + throw std::runtime_error("StatObjectAsync(): " + resp.Error().String()); } if (resp.size != data.length()) { - throw std::runtime_error( - "StatObjectAsync(): expected size: " + - std::to_string(data.length()) + - "; got: " + std::to_string(resp.size)); + throw std::runtime_error("StatObjectAsync(): expected size: " + + std::to_string(data.length()) + + "; got: " + std::to_string(resp.size)); } RemoveObject(bucket_name_, object_name); } catch (const std::runtime_error&) { @@ -888,10 +883,9 @@ class Tests { } } if (c != object_names.size()) { - throw std::runtime_error( - "ListObjectsAsync(): expected: " + - std::to_string(object_names.size()) + - "; got: " + std::to_string(c)); + throw std::runtime_error("ListObjectsAsync(): expected: " + + std::to_string(object_names.size()) + + "; got: " + std::to_string(c)); } RemoveObjects(object_names); } catch (const std::runtime_error&) { @@ -927,8 +921,7 @@ class Tests { client_.CopyObjectAsync(std::move(args)); minio::s3::CopyObjectResponse resp = future.get(); if (!resp) { - throw std::runtime_error("CopyObjectAsync(): " + - resp.Error().String()); + throw std::runtime_error("CopyObjectAsync(): " + resp.Error().String()); } RemoveObject(bucket_name_, src_object_name); RemoveObject(bucket_name_, object_name); @@ -960,8 +953,7 @@ class Tests { client_.RemoveObjectAsync(std::move(args)); minio::s3::RemoveObjectResponse resp = future.get(); if (!resp) { - throw std::runtime_error("RemoveObjectAsync(): " + - resp.Error().String()); + throw std::runtime_error("RemoveObjectAsync(): " + resp.Error().String()); } } From 43211e475f2eb86b94648e66e60a6aa810909104 Mon Sep 17 00:00:00 2001 From: jiuker Date: Mon, 8 Jun 2026 18:49:57 +0800 Subject: [PATCH 3/3] c++ 20 --- include/miniocpp/types.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/miniocpp/types.h b/include/miniocpp/types.h index 54234c0..bf3219e 100644 --- a/include/miniocpp/types.h +++ b/include/miniocpp/types.h @@ -355,6 +355,8 @@ struct DeleteObject { std::string version_id = {}; DeleteObject() = default; + DeleteObject(std::string name, std::string version_id = {}) + : name(std::move(name)), version_id(std::move(version_id)) {} ~DeleteObject() = default; }; // struct DeleteObject