diff --git a/.gitmodules b/.gitmodules index 8acc6267..7b1c6310 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,3 +7,6 @@ [submodule "third_party/doxygen-awesome-css"] path = third_party/doxygen-awesome-css url = https://github.com/jothepro/doxygen-awesome-css.git +[submodule "third_party/stdexec"] + path = third_party/stdexec + url = https://github.com/NVIDIA/stdexec.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 69b3be2d..d1071506 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -47,12 +47,18 @@ if(BUILD_EXAMPLES OR BUILD_BENCHMARKS OR BUILD_TESTS OR LINK_LIBURING) set_target_properties(uring PROPERTIES IMPORTED_LOCATION ${LIBURING_SOURCE_DIR}/src/liburing.a) target_include_directories(uring INTERFACE ${LIBURING_SOURCE_DIR}/src/include) add_dependencies(uring liburing_ext) + + # stdexec + add_library(stdexec INTERFACE) + target_include_directories(stdexec INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/third_party/stdexec/include) endif() if(LINK_LIBURING) target_link_libraries(condy INTERFACE uring) endif() +target_link_libraries(condy INTERFACE stdexec) + # Warnings as errors add_compile_options(-Wall -Wextra -Werror) if(CMAKE_CXX_COMPILER_ID MATCHES "GNU") diff --git a/include/condy/execution.hpp b/include/condy/execution.hpp new file mode 100644 index 00000000..6df98a03 --- /dev/null +++ b/include/condy/execution.hpp @@ -0,0 +1,144 @@ +#pragma once + +#include "condy/runtime.hpp" +#include + +namespace condy { + +namespace ex = stdexec; + +namespace detail { + +template struct set_value_traits { + using type = ex::set_value_t(R); +}; +template struct set_value_traits> { + using type = ex::set_value_t(R1, R2); +}; +template struct set_value_traits> { + using type = ex::set_value_t(R...); +}; +template +using set_value_traits_t = typename set_value_traits::type; + +template class StandardSender : public SenderImpl { +public: + using SenderImpl::SenderImpl; + + using sender_concept = ex::sender_t; + using completion_signatures = ex::completion_signatures< + set_value_traits_t, + ex::set_error_t(std::error_code), ex::set_stopped_t()>; + + template auto connect(Receiver receiver) noexcept { + using OpState = decltype(this->connect_impl( + ReceiverWrapper{std::move(receiver)})); + return OperationStateWrapper{ + this->connect_impl(ReceiverWrapper{std::move(receiver)})}; + } + +private: + template struct ReceiverWrapper { + Receiver receiver; + + void operator()(int32_t res) noexcept { + if (res >= 0) { + ex::set_value(std::move(receiver), res); + } else if (res == -ECANCELED) { + ex::set_stopped(std::move(receiver)); + } else { + ex::set_error(std::move(receiver), + std::error_code(-res, std::generic_category())); + } + } + + template + void operator()(std::pair res) noexcept { + auto &[res_code, _] = res; + if (res_code >= 0) { + ex::set_value(std::move(receiver), std::move(res)); + } else if (res_code == -ECANCELED) { + ex::set_stopped(std::move(receiver)); + } else { + ex::set_error( + std::move(receiver), + std::error_code(-res_code, std::generic_category())); + } + } + + template void operator()(T &&res) noexcept { + ex::set_value(std::move(receiver), std::forward(res)); + } + + auto get_stop_token() const noexcept { + auto env = ex::get_env(receiver); + return ex::get_stop_token(env); + } + }; + + template struct OperationStateWrapper { + OperationState op_state; + void start() noexcept { op_state.start(0); } + }; +}; + +class ScheduleSender { +public: + using sender_concept = ex::sender_t; + + using completion_signatures = ex::completion_signatures; + + ScheduleSender(Runtime &runtime) : runtime_(runtime) {} + + template auto connect(Receiver receiver) { + return OperationState>(runtime_, + std::move(receiver)); + } + +private: + template + class OperationState + : public InvokerAdapter, WorkInvoker> { + public: + OperationState(Runtime &runtime, Receiver receiver) + : runtime_(runtime), receiver_(std::move(receiver)) {} + + OperationState(const OperationState &) = delete; + OperationState &operator=(const OperationState &) = delete; + OperationState(OperationState &&) = delete; + OperationState &operator=(OperationState &&) = delete; + + public: + void start() noexcept { runtime_.schedule(this); } + + void invoke() noexcept { ex::set_value(std::move(receiver_)); } + + private: + Runtime &runtime_; + Receiver receiver_; + }; + + Runtime &runtime_; +}; + +} // namespace detail + +class Scheduler { +public: + Scheduler(Runtime &runtime) : runtime_(&runtime) {} + + bool operator==(const Scheduler &other) const noexcept { + return runtime_ == other.runtime_; + } + + auto schedule() const noexcept { return detail::ScheduleSender{*runtime_}; } + + Runtime &runtime() const noexcept { return *runtime_; } + +private: + Runtime *runtime_; +}; + +inline Scheduler get_scheduler(Runtime &runtime) { return {runtime}; } + +} // namespace condy \ No newline at end of file diff --git a/include/condy/senders.hpp b/include/condy/senders.hpp index 191350dd..7f893ed3 100644 --- a/include/condy/senders.hpp +++ b/include/condy/senders.hpp @@ -6,26 +6,69 @@ #pragma once #include "condy/detail/senders.hpp" +#include "condy/execution.hpp" namespace condy { -// TODO: This re-export is intentional. We may adapt these senders to standard -// sender/receiver concepts in the future. -using detail::FlaggedOpSender; -using detail::HardLinkSender; -using detail::LinkSender; -using detail::MultiShotOpSender; -using detail::OpSender; -using detail::ParallelAllSender; -using detail::ParallelAnySender; -using detail::RangedHardLinkSender; -using detail::RangedLinkSender; -using detail::RangedParallelAllSender; -using detail::RangedParallelAnySender; -using detail::RangedWhenAllSender; -using detail::RangedWhenAnySender; -using detail::WhenAllSender; -using detail::WhenAnySender; -using detail::ZeroCopyOpSender; +template +using OpSender = detail::StandardSender>; + +template +using MultiShotOpSender = detail::StandardSender< + detail::MultiShotOpSender>; + +template +using ZeroCopyOpSender = detail::StandardSender< + detail::ZeroCopyOpSender>; + +template +using FlaggedOpSender = + detail::StandardSender>; + +template +using ParallelAllSender = + detail::StandardSender>; + +template +using ParallelAnySender = + detail::StandardSender>; + +template +using WhenAllSender = detail::StandardSender>; + +template +using WhenAnySender = detail::StandardSender>; + +template +using LinkSender = detail::StandardSender>; + +template +using HardLinkSender = + detail::StandardSender>; + +template +using RangedParallelAllSender = + detail::StandardSender>; + +template +using RangedParallelAnySender = + detail::StandardSender>; + +template +using RangedWhenAllSender = + detail::StandardSender>; + +template +using RangedWhenAnySender = + detail::StandardSender>; + +template +using RangedLinkSender = + detail::StandardSender>; + +template +using RangedHardLinkSender = + detail::StandardSender>; } // namespace condy \ No newline at end of file diff --git a/tests/test_execution.cpp b/tests/test_execution.cpp new file mode 100644 index 00000000..b210db1e --- /dev/null +++ b/tests/test_execution.cpp @@ -0,0 +1,157 @@ +#include "condy/async_operations.hpp" +#include "condy/execution.hpp" +#include "condy/runtime.hpp" +#include "condy/sender_operations.hpp" +#include +#include + +namespace ex = stdexec; + +TEST_CASE("test execution - schedule") { + condy::Runtime runtime; + std::thread::id runtime_thread_id; + std::thread runtime_thread([&] { + runtime_thread_id = std::this_thread::get_id(); + runtime.run(); + }); + + auto scheduler = condy::get_scheduler(runtime); + + bool executed = false; + ex::sender auto sender = ex::schedule(scheduler) | ex::then([&] { + executed = true; + return std::this_thread::get_id(); + }); + + auto [thread_id] = ex::sync_wait(sender).value(); + REQUIRE(executed); + REQUIRE(thread_id == runtime_thread_id); + REQUIRE(runtime_thread_id != std::this_thread::get_id()); + + runtime.allow_exit(); + runtime_thread.join(); +} + +TEST_CASE("test execution - sender") { + condy::Runtime runtime; + std::thread runtime_thread([&] { runtime.run(); }); + + auto scheduler = condy::get_scheduler(runtime); + + bool executed = false; + ex::sender auto sender = ex::schedule(scheduler) | ex::let_value([&] { + executed = true; + return condy::async_nop(); + }); + + auto [r] = ex::sync_wait(sender).value(); + REQUIRE(executed); + REQUIRE(r == 0); + + runtime.allow_exit(); + runtime_thread.join(); +} + +TEST_CASE("test execution - when_all") { + condy::Runtime runtime; + std::thread runtime_thread([&] { runtime.run(); }); + + auto scheduler = condy::get_scheduler(runtime); + + bool executed1 = false; + bool executed2 = false; + + auto sender1 = ex::schedule(scheduler) | ex::then([&] { + executed1 = true; + return 42; + }); + auto sender2 = ex::schedule(scheduler) | ex::then([&] { + executed2 = true; + return 0; + }); + + auto when_all_sender = ex::when_all(sender1, sender2); + auto [r1, r2] = ex::sync_wait(when_all_sender).value(); + + REQUIRE(executed1); + REQUIRE(executed2); + REQUIRE(r1 == 42); + REQUIRE(r2 == 0); + + runtime.allow_exit(); + runtime_thread.join(); +} + +TEST_CASE("test execution - when_any") { + condy::Runtime runtime; + std::thread runtime_thread([&] { runtime.run(); }); + + auto scheduler = condy::get_scheduler(runtime); + + __kernel_timespec ts = { + .tv_sec = 60ll * 60ll, + .tv_nsec = 0, + }; + auto sender = ex::schedule(scheduler) | ex::let_value([&] { + return exec::when_any(condy::async_timeout(&ts, 0, 0), + condy::async_nop()); + }); + + auto [r] = ex::sync_wait(sender).value(); + REQUIRE(r == 0); + + runtime.allow_exit(); + runtime_thread.join(); +} + +TEST_CASE("test execution - when_any with different thread") { + condy::Runtime runtime1, runtime2; + std::thread thread1([&] { runtime1.run(); }); + std::thread thread2([&] { runtime2.run(); }); + + auto scheduler1 = condy::get_scheduler(runtime1); + auto scheduler2 = condy::get_scheduler(runtime2); + + __kernel_timespec ts = { + .tv_sec = 60ll * 60ll, + .tv_nsec = 0, + }; + auto sender1 = ex::schedule(scheduler1) | ex::let_value([&] { + return condy::async_timeout(&ts, 0, 0); + }); + auto sender2 = ex::schedule(scheduler2) | ex::then([] { return 42; }); + auto when_any_sender = exec::when_any(sender1, sender2); + auto [r] = ex::sync_wait(when_any_sender).value(); + REQUIRE(r == 42); + + runtime1.allow_exit(); + runtime2.allow_exit(); + thread1.join(); + thread2.join(); +} + +TEST_CASE("test execution - condy when_all") { + using condy::operators::operator&&; + + condy::Runtime runtime; + std::thread runtime_thread([&] { runtime.run(); }); + + auto scheduler = condy::get_scheduler(runtime); + + bool executed = false; + ex::sender auto sender = ex::schedule(scheduler) | ex::let_value([&] { + executed = true; + return condy::async_nop() && + condy::async_nop() && + condy::async_nop(); + }); + + auto [r1, r2, r3] = ex::sync_wait(sender).value(); + REQUIRE(executed); + REQUIRE(r1 == 0); + REQUIRE(r2 == 0); + REQUIRE(r3 == 0); + + runtime.allow_exit(); + runtime_thread.join(); +} \ No newline at end of file diff --git a/third_party/stdexec b/third_party/stdexec new file mode 160000 index 00000000..1a218278 --- /dev/null +++ b/third_party/stdexec @@ -0,0 +1 @@ +Subproject commit 1a21827866b49bb895f26308b4671336770ffac1