diff --git a/.github/workflows/ci.cpu.yml b/.github/workflows/ci.cpu.yml index 947577859..e4c90f93c 100644 --- a/.github/workflows/ci.cpu.yml +++ b/.github/workflows/ci.cpu.yml @@ -170,7 +170,7 @@ jobs: - run: echo "CI (CPU) (Windows) success" build-cpu-macos: - runs-on: macos-15-large + runs-on: macos-26-large name: macos-${{ matrix.name }} strategy: fail-fast: false diff --git a/include/exec/any_sender_of.hpp b/include/exec/any_sender_of.hpp index ed24f088b..05b5fb794 100644 --- a/include/exec/any_sender_of.hpp +++ b/include/exec/any_sender_of.hpp @@ -977,7 +977,7 @@ namespace experimental::execution STDEXEC_ATTRIBUTE(no_unique_address) _Receiver __rcvr_; STDEXEC::inplace_stop_source __stop_source_{}; using __stop_callback = typename STDEXEC::stop_token_of_t< - STDEXEC::env_of_t<_Receiver>>::template callback_type; + STDEXEC::env_of_t<_Receiver>>::template callback_type>; std::optional<__stop_callback> __on_stop_{}; }; diff --git a/include/exec/task.hpp b/include/exec/task.hpp index eebf599c1..cc4a31a32 100644 --- a/include/exec/task.hpp +++ b/include/exec/task.hpp @@ -186,7 +186,7 @@ namespace experimental::execution struct __default_awaiter_context<_ParentPromise> { using __stop_token_t = stop_token_of_t>; - using __stop_callback_t = __stop_token_t::template callback_type<__forward_stop_request>; + using __stop_callback_t = __stop_token_t::template callback_type<__forward_stop_request<>>; template <__scheduler_affinity _Affinity> constexpr explicit __default_awaiter_context(__default_task_context_impl<_Affinity>& __self, @@ -199,7 +199,7 @@ namespace experimental::execution { static_assert(std::is_nothrow_constructible_v<__stop_callback_t, __stop_token_t, - __forward_stop_request>); + __forward_stop_request<>>); __self.__stop_token_ = __stop_source_.get_token(); } @@ -251,7 +251,7 @@ namespace experimental::execution // stop_source when stop is requested on the parent coroutine's stop // token. using __stop_token_t = stop_token_of_t>; - using __stop_callback_t = stop_callback_for_t<__stop_token_t, __forward_stop_request>; + using __stop_callback_t = stop_callback_for_t<__stop_token_t, __forward_stop_request<>>; if constexpr (std::same_as<__stop_token_t, inplace_stop_token>) { diff --git a/include/exec/when_any.hpp b/include/exec/when_any.hpp index fa31f4dd6..8691813ba 100644 --- a/include/exec/when_any.hpp +++ b/include/exec/when_any.hpp @@ -95,7 +95,7 @@ namespace experimental::execution {} using __on_stop = - stop_callback_for_t&>, __forward_stop_request>; + stop_callback_for_t&>, __forward_stop_request<>>; inplace_stop_source __stop_source_{}; std::optional<__on_stop> __on_stop_{}; @@ -260,7 +260,7 @@ namespace experimental::execution __copy_cvref_t<_Self, _Senders>...>) -> __opstate_t<_Self, _Receiver> { - return STDEXEC::__apply(STDEXEC::__construct<__opstate_t<_Self, _Receiver>>{}, + return STDEXEC::__apply(STDEXEC::__construct_from<__opstate_t<_Self, _Receiver>>{}, static_cast<_Self&&>(__self).__sndrs_, static_cast<_Receiver&&>(__rcvr)); } diff --git a/include/nvexec/stream/ensure_started.cuh b/include/nvexec/stream/ensure_started.cuh index 385816a86..8844020ae 100644 --- a/include/nvexec/stream/ensure_started.cuh +++ b/include/nvexec/stream/ensure_started.cuh @@ -247,7 +247,7 @@ namespace nv::execution::_strm , public _strm::opstate_base { using on_stop_t = std::optional< - stop_callback_for_t>, __forward_stop_request>>; + stop_callback_for_t>, __forward_stop_request<>>>; on_stop_t on_stop_{}; __intrusive_ptr> shared_state_; diff --git a/include/nvexec/stream/split.cuh b/include/nvexec/stream/split.cuh index 87234c6f6..a63be1bc4 100644 --- a/include/nvexec/stream/split.cuh +++ b/include/nvexec/stream/split.cuh @@ -246,7 +246,7 @@ namespace nv::execution::_strm , public _strm::opstate_base { using on_stop_t = std::optional< - stop_callback_for_t>, __forward_stop_request>>; + stop_callback_for_t>, __forward_stop_request<>>>; on_stop_t on_stop_{}; std::shared_ptr> sh_state_; diff --git a/include/nvexec/stream/when_all.cuh b/include/nvexec/stream/when_all.cuh index 68919b875..c76c5a22b 100644 --- a/include/nvexec/stream/when_all.cuh +++ b/include/nvexec/stream/when_all.cuh @@ -301,7 +301,7 @@ namespace nv::execution::_strm using _indices_t = __indices_for; using _stream_providers_t = std::array; using _stop_callback_t = - stop_callback_for_t>, __forward_stop_request>; + stop_callback_for_t>, __forward_stop_request<>>; template using _child_opstate_t = diff --git a/include/stdexec/__detail/__as_awaitable.hpp b/include/stdexec/__detail/__as_awaitable.hpp index d46e3c4ba..8cb841810 100644 --- a/include/stdexec/__detail/__as_awaitable.hpp +++ b/include/stdexec/__detail/__as_awaitable.hpp @@ -17,6 +17,7 @@ #include "__execution_fwd.hpp" +#include "../functional.hpp" #include "__atomic.hpp" #include "__awaitable.hpp" #include "__completion_signatures_of.hpp" @@ -40,7 +41,10 @@ STDEXEC_PRAGMA_IGNORE_MSVC(4714) // marked as __forceinline not inlined namespace STDEXEC { #if !STDEXEC_NO_STDCPP_COROUTINES() - namespace __detail + ///////////////////////////////////////////////////////////////////////////// + // STDEXEC::as_awaitable [exec.as.awaitable] + + namespace __as_awaitable { template extern __q<__decayed_std_tuple> const __as_single; @@ -74,12 +78,7 @@ namespace STDEXEC template using __adapted_sender_t = __remove_rvalue_reference_t<__call_result_t<__adapt_completion_t<_Sender>, _Sender>>; - } // namespace __detail - ///////////////////////////////////////////////////////////////////////////// - // STDEXEC::as_awaitable [exec.as.awaitable] - namespace __as_awaitable - { struct __void {}; @@ -90,6 +89,8 @@ namespace STDEXEC using __expected_t = std::variant, std::exception_ptr>; + using __connect_await::__has_as_awaitable_member; + template concept __completes_inline_for = __never_sends<_Tag, _Sender, _Env...> || STDEXEC::__completes_inline<_Tag, env_of_t<_Sender>, _Env...>; @@ -278,19 +279,18 @@ namespace STDEXEC }; template - using __sync_receiver_t = __sync_receiver<_Promise, __detail::__value_t<_Sender, _Promise>>; + using __sync_receiver_t = __sync_receiver<_Promise, __value_t<_Sender, _Promise>>; template - using __async_receiver_t = __async_receiver<_Promise, __detail::__value_t<_Sender, _Promise>>; + using __async_receiver_t = __async_receiver<_Promise, __value_t<_Sender, _Promise>>; ////////////////////////////////////////////////////////////////////////////////////// // __sender_awaitable: awaitable type returned by as_awaitable when given a sender // that does not have an as_awaitable member function - template - struct __sender_awaitable - : __sender_awaitable_base<__detail::__value_t<_Sender, _Promise>, false> + template > _Sender> + struct __sender_awaitable : __sender_awaitable_base<__value_t<_Sender, _Promise>, false> { - using __value_t = __detail::__value_t<_Sender, _Promise>; + using __value_t = __as_awaitable::__value_t<_Sender, _Promise>; constexpr explicit __sender_awaitable(_Sender&& __sndr, __std::coroutine_handle<_Promise> __hcoro) @@ -350,12 +350,12 @@ namespace STDEXEC // When the sender is known to complete inline, we can connect and start the operation // in await_suspend. - template + template > _Sender> requires __completes_inline<_Sender, env_of_t<_Promise&>> struct __sender_awaitable<_Promise, _Sender> - : __sender_awaitable_base<__detail::__value_t<_Sender, _Promise>, true> + : __sender_awaitable_base<__value_t<_Sender, _Promise>, true> { - using __value_t = __detail::__value_t<_Sender, _Promise>; + using __value_t = __as_awaitable::__value_t<_Sender, _Promise>; constexpr explicit __sender_awaitable(_Sender&& sndr, __std::coroutine_handle<_Promise> __hcoro) @@ -404,7 +404,7 @@ namespace STDEXEC template concept __awaitable_adapted_sender = sender_in<_Sender, env_of_t<_Promise&>> - && __minvocable_q<__detail::__value_t, _Sender, _Promise> + && __minvocable_q<__value_t, _Sender, _Promise> && requires(_Promise& __promise) { { __promise.unhandled_stopped() @@ -412,8 +412,7 @@ namespace STDEXEC }; template - concept __awaitable_sender = - __awaitable_adapted_sender<__detail::__adapted_sender_t<_Sender>, _Promise>; + concept __awaitable_sender = __awaitable_adapted_sender<__adapted_sender_t<_Sender>, _Promise>; struct __unspecified { @@ -426,92 +425,68 @@ namespace STDEXEC }; template - concept __incompatible_sender = sender<_Sender> - && __merror<__detail::__value_t<_Sender, _Promise>>; - } // namespace __as_awaitable + concept __incompatible_sender = sender<_Sender> && __merror<__value_t<_Sender, _Promise>>; - struct as_awaitable_t - { - template - static consteval auto __get_declfn() noexcept - { - using namespace __as_awaitable; - if constexpr (__connect_await::__has_as_awaitable_member<_Tp, _Promise>) - { - using __result_t = decltype(__declval<_Tp>().as_awaitable(__declval<_Promise&>())); - constexpr bool __is_nothrow = noexcept( - __declval<_Tp>().as_awaitable(__declval<_Promise&>())); - return __declfn<__result_t, __is_nothrow>(); - } - else if constexpr (__awaitable<_Tp, __unspecified>) // NOT __awaitable<_Tp, _Promise> !! - { // NOLINT(bugprone-branch-clone) - return __declfn<_Tp&&>(); - } - else if constexpr (__awaitable_sender<_Tp, _Promise>) - { - using __result_t = decltype( // - __sender_awaitable{__detail::__adapt_sender_for_await(__declval<_Tp>()), - __std::coroutine_handle<_Promise>()}); - constexpr bool __is_nothrow = noexcept( - __sender_awaitable{__detail::__adapt_sender_for_await(__declval<_Tp>()), - __std::coroutine_handle<_Promise>()}); - return __declfn<__result_t, __is_nothrow>(); - } - else if constexpr (__incompatible_sender<_Tp, _Promise>) - { - // NOT TO SPEC: It's a sender, but it isn't a sender in the current promise's - // environment, so we can return the error type that results from trying to - // compute the sender's value type: - return __declfn<__detail::__value_t<_Tp, _Promise>>(); - } - else - { - return __declfn<_Tp&&>(); - } - } + template + concept __has_transform_as_awaitable_member = + sender_in<_Sender, env_of_t<_Promise>> + && __has_as_awaitable_member>, + _Promise>; - template ()> - requires __callable<__mtypeof<_DeclFn>> - auto operator()(_Tp&& __t, _Promise& __promise) const noexcept(noexcept(_DeclFn())) - -> decltype(_DeclFn()) + template + concept __awaitable_transform_sender = // + sender_in<_Sender, env_of_t<_Promise>> + && __awaitable_sender>, _Promise>; + + inline constexpr auto __with_member = // + [] _Tp>(_Tp&& __t, auto& __promise) + STDEXEC_AUTO_RETURN(static_cast<_Tp&&>(__t).as_awaitable(__promise)); + + inline constexpr auto __with_transform_member = // + [] _Tp>(_Tp&& __t, + _Promise& __promise) + STDEXEC_AUTO_RETURN( + STDEXEC::transform_sender(static_cast<_Tp&&>(__t), STDEXEC::get_env(__promise)) + .as_awaitable(__promise)); + + inline constexpr auto __with_await = // + []<__awaitable<__unspecified> _Tp>(_Tp&& __t, __ignore) + STDEXEC_AUTO_RETURN(static_cast<_Tp&&>(__t)); + + inline constexpr auto __with_sender = // + [] _Tp>(_Tp&& __t, _Promise& __promise) + STDEXEC_AUTO_RETURN(__sender_awaitable{ + __as_awaitable::__adapt_sender_for_await( + STDEXEC::transform_sender(static_cast<_Tp&&>(__t), STDEXEC::get_env(__promise))), + __std::coroutine_handle<_Promise>::from_promise(__promise)}); + + // NOT TO SPEC: It's a sender, but it isn't a sender in the current promise's + // environment, so we can return the error type that results from trying to + // compute the sender's value type: + inline constexpr auto __with_incompatible_sender = // + [] _Tp>(_Tp&&, _Promise&) { - using namespace __as_awaitable; - if constexpr (__connect_await::__has_as_awaitable_member<_Tp, _Promise>) - { - return static_cast<_Tp&&>(__t).as_awaitable(__promise); - } - else if constexpr (__awaitable<_Tp, __unspecified>) // NOT __awaitable<_Tp, _Promise> !! - { // NOLINT(bugprone-branch-clone) - return static_cast<_Tp&&>(__t); - } - else if constexpr (__awaitable_sender<_Tp, _Promise>) - { - auto __hcoro = __std::coroutine_handle<_Promise>::from_promise(__promise); - return __sender_awaitable{__detail::__adapt_sender_for_await(static_cast<_Tp&&>(__t)), - __hcoro}; - } - else if constexpr (__incompatible_sender<_Tp, _Promise>) - { - return __detail::__value_t<_Tp, _Promise>(); - } - else - { - return static_cast<_Tp&&>(__t); - } - } + return __value_t<_Tp, _Promise>{}; + }; - template ()> - requires __callable<__mtypeof<_DeclFn>> || __tag_invocable - [[deprecated("the use of tag_invoke for as_awaitable is deprecated")]] - auto operator()(_Tp&& __t, _Promise& __promise) const - noexcept(__nothrow_tag_invocable) - -> __tag_invoke_result_t + inline constexpr auto __identity = // + [](_Tp&& __t, __ignore) noexcept -> decltype(auto) { - using __result_t = __tag_invoke_result_t; - static_assert(__awaitable<__result_t, _Promise>); - return __tag_invoke(*this, static_cast<_Tp&&>(__t), __promise); - } - }; + return static_cast<_Tp&&>(__t); + }; + + inline constexpr auto __as_awaitable_impl = // + __first_callable{__with_member, + __with_transform_member, + __with_await, + __with_sender, + __with_incompatible_sender, + __identity}; + + } // namespace __as_awaitable + + struct as_awaitable_t : decltype(__as_awaitable::__as_awaitable_impl) + {}; inline constexpr as_awaitable_t as_awaitable{}; #endif diff --git a/include/stdexec/__detail/__parallel_scheduler_backend.hpp b/include/stdexec/__detail/__parallel_scheduler_backend.hpp index 502c89264..421f86548 100644 --- a/include/stdexec/__detail/__parallel_scheduler_backend.hpp +++ b/include/stdexec/__detail/__parallel_scheduler_backend.hpp @@ -116,7 +116,7 @@ namespace STDEXEC template struct __stop_callback_for { - using __callback_t = stop_callback_for_t<_Token, __forward_stop_request>; + using __callback_t = stop_callback_for_t<_Token, __forward_stop_request<>>; bool __register_stop_callback(_Token __token) { diff --git a/include/stdexec/__detail/__task.hpp b/include/stdexec/__detail/__task.hpp index a3c125cfd..3577c074f 100644 --- a/include/stdexec/__detail/__task.hpp +++ b/include/stdexec/__detail/__task.hpp @@ -15,11 +15,11 @@ */ #pragma once +#include "../stop_token.hpp" #include "__affine_on.hpp" #include "__as_awaitable.hpp" #include "__config.hpp" #include "__inline_scheduler.hpp" -#include "__manual_lifetime.hpp" #include "__meta.hpp" #include "__optional.hpp" #include "__schedulers.hpp" @@ -58,19 +58,6 @@ namespace STDEXEC constexpr void return_void() {} }; - template - struct __on_stopped - { - void operator()() noexcept - { - __source_.request_stop(); - } - _StopSource& __source_; - }; - - template - __on_stopped(_StopSource&) -> __on_stopped<_StopSource>; - constexpr size_t __divmod(size_t __total_size, size_t __chunk_size) noexcept { return (__total_size / __chunk_size) + (__total_size % __chunk_size != 0); @@ -135,18 +122,68 @@ namespace STDEXEC template using __stop_source_type = _Env::stop_source_type; - template - using __environment_type = _Env::template env_type>; - template - using __error_types = __error_types_t>, - __mcompose<__qf, __q1<__decay_t>>>; - - template - concept __has_allocator_compatible_with = requires(_Rcvr& __rcvr) { - _Alloc(STDEXEC::get_allocator(STDEXEC::get_env(__rcvr))); - } || std::default_initializable<_Alloc>; + using __error_types = _Env::error_types; + + template + using __environment_type = _Env::template env_type>; + + template + concept __has_allocator_compatible_with = requires(_EnvProvider const & __has_env) { + _Alloc(STDEXEC::get_allocator(STDEXEC::get_env(__has_env))); + }; + + template + concept __has_scheduler_compatible_with = requires(_EnvProvider const & __has_env) { + _Scheduler(STDEXEC::get_scheduler(STDEXEC::get_env(__has_env))); + }; + + template + using __stop_source_token_t = decltype(__declval<_StopSource>().get_token()); + + template + struct __stop_callback_box + { + void __register_callback(__ignore, __ignore) noexcept {} + void __reset_callback() noexcept {} + }; + + template + requires __not_same_as<__stop_source_token_t<_StopSource>, _StopToken> + struct __stop_callback_box<_StopToken, _StopSource> + { + using __stop_variant_t = __variant<_StopSource, __stop_source_token_t<_StopSource>>; + using __callback_fn_t = __forward_stop_request<_StopSource>; + using __stop_callback_t = stop_callback_for_t<_StopToken, __callback_fn_t>; + + constexpr __stop_callback_box() {} + + void __register_callback(auto const & __has_env, __stop_variant_t& __stop) + noexcept(__nothrow_constructible_from<__stop_callback_t, _StopToken, _StopSource&>) + { + std::construct_at(&__cb_, get_stop_token(get_env(__has_env)), __var::__get<0>(__stop)); + } + + void __reset_callback() noexcept + { + std::destroy_at(&__cb_); + } + + union + { + __stop_callback_t __cb_; + }; + }; + + template + using __stop_callback_box_t = + __stop_callback_box>, _StopSource>; + + inline constexpr auto __throw_error = __overload{ + []([[maybe_unused]] auto&& __error) { STDEXEC_THROW((decltype(__error)&&) __error); }, + []([[maybe_unused]] std::error_code __ec) { STDEXEC_THROW(std::system_error(__ec)); }, + []([[maybe_unused]] std::exception_ptr __eptr) { std::rethrow_exception(__eptr); }}; + } // namespace __task //////////////////////////////////////////////////////////////////////////////// @@ -169,6 +206,8 @@ namespace STDEXEC struct __promise; template struct __opstate; + template + using __own_env_t = __minvoke_or_q<__task::__environment_type, env<>, _Env, _EnvProvider>; public: using sender_concept = sender_t; using promise_type = __promise; @@ -193,9 +232,10 @@ namespace STDEXEC template constexpr auto connect(_Rcvr rcvr) && -> __opstate<_Rcvr> { + static_assert(__task::__has_allocator_compatible_with<_Rcvr, allocator_type> + || std::default_initializable); STDEXEC_ASSERT(__coro_); - static_assert(__task::__has_allocator_compatible_with<_Rcvr, allocator_type>); - return __opstate<_Rcvr>(std::exchange(__coro_, {}), static_cast<_Rcvr&&>(rcvr)); + return __opstate<_Rcvr>(static_cast(*this), static_cast<_Rcvr&&>(rcvr)); } template @@ -207,133 +247,98 @@ namespace STDEXEC [[nodiscard]] constexpr auto get_env() const noexcept { - return __env{}; + return __attrs{}; + } + + template + constexpr auto as_awaitable(_ParentPromise& __parent) && noexcept + { + return __awaiter<_ParentPromise>(static_cast(*this), __parent); } private: - using __on_stopped_t = __task::__on_stopped; + using __on_stopped_t = __forward_stop_request; + using __stop_variant_t = __variant; - using __error_variant_t = __error_types_t, __q1<__decay_t>>; + template + using __stop_callback_t = + stop_callback_for_t>, __on_stopped_t>; - using __completions_t = __concat_completion_signatures_t< - completion_signatures<__detail::__single_value_sig_t<_Ty>, set_stopped_t()>, - error_types>; - - template - using __stop_callback_t = stop_callback_for_t>, __on_stopped_t>; + template + using __stop_callback_box_t = __task::__stop_callback_box_t<_EnvProvider, stop_source_type>; - template + template static constexpr bool __needs_stop_callback = - __not_same_as>>; - - struct __env - { - template - [[nodiscard]] - constexpr auto query(__get_completion_behavior_t<_Tag>) const noexcept - { - return __completion_behavior::__asynchronous_affine - | __completion_behavior::__inline_completion; - } - }; + __not_same_as>>; - struct __opstate_base - { - constexpr explicit __opstate_base(scheduler_type __sched) noexcept - : __sch_(std::move(__sched)) - {} - - virtual void __completed() noexcept = 0; - virtual void __canceled() noexcept = 0; - virtual auto __get_allocator() noexcept -> allocator_type = 0; - - scheduler_type __sch_; - __error_variant_t __errors_{__no_init}; - }; + template + static constexpr bool __nothrow_callback_registration = noexcept( + __declval<__stop_callback_box_t<_EnvProvider>&>() + .__register_callback(__declval<_EnvProvider&>(), __declval<__stop_variant_t&>())); - constexpr explicit task(__std::coroutine_handle __coro) noexcept - : __coro_(std::move(__coro)) - {} + using __error_variant_t = __error_types_t, __q1<__decay_t>>; - __std::coroutine_handle __coro_; - }; + using __completions_t = __concat_completion_signatures_t< + completion_signatures<__detail::__single_value_sig_t<_Ty>, set_stopped_t()>, + error_types>; - //////////////////////////////////////////////////////////////////////////////////////// - // task::__opstate - template - template - struct STDEXEC_ATTRIBUTE(empty_bases) task<_Ty, _Env>::__opstate final - : __opstate_base - , __if_c<__needs_stop_callback<_Rcvr>, __manual_lifetime<__stop_callback_t<_Rcvr>>, __empty> - { - public: - using operation_state_concept = operation_state_t; + static constexpr void __sink(task) noexcept {} - explicit __opstate(__std::coroutine_handle __coro, _Rcvr&& __rcvr) noexcept - : __opstate_base(__mk_sched(__rcvr)) - , __coro_(std::move(__coro)) - , __rcvr_(static_cast<_Rcvr&&>(__rcvr)) - , __own_env_(__mk_own_env(__rcvr_)) - , __env_(__mk_env(__rcvr_, __own_env_)) + template + [[nodiscard]] + static auto __mk_alloc(_EnvProvider const & __has_env) noexcept -> allocator_type { - // Set the promise's state pointer to this operation state, so it can call back into - // it when the coroutine completes or is stopped. - __coro_.promise().__state_ = this; - // Initialize the promise's stop source if translation is needed between the - // receiver's stop token and the task's stop token: - if constexpr (__needs_stop_callback<_Rcvr>) + if constexpr (__task::__has_allocator_compatible_with<_EnvProvider, allocator_type>) { - __coro_.promise().__stop_.template emplace<0>(); + return allocator_type(get_allocator(STDEXEC::get_env(__has_env))); } else { - __coro_.promise().__stop_.template emplace<1>(get_stop_token(STDEXEC::get_env(__rcvr_))); + return allocator_type{}; } } - ~__opstate() - { - if (__coro_) - __coro_.destroy(); - } - - void start() & noexcept + template + [[nodiscard]] + static auto __mk_sched(_EnvProvider const & __has_env) noexcept -> scheduler_type { - if constexpr (__needs_stop_callback<_Rcvr>) + if constexpr (__task::__has_scheduler_compatible_with<_EnvProvider, scheduler_type>) + { + return scheduler_type(get_scheduler(STDEXEC::get_env(__has_env))); + } + else { - // If the receiver's stop token is different from the task's stop token, then we need - // to set up a callback to request a stop on the task's stop source when the receiver's - // stop token is triggered: - __stop_callback().__construct(get_stop_token(STDEXEC::get_env(__rcvr_)), - __on_stopped_t{__var::__get<0>(__coro_.promise().__stop_)}); + return scheduler_type{}; } - __coro_.resume(); } - private: - using __own_env_t = __minvoke_or_q<__task::__environment_type, env<>, _Env, _Rcvr>; - - static auto __mk_own_env(_Rcvr const & __rcvr) noexcept -> __own_env_t + template + [[nodiscard]] + static auto __mk_own_env(_EnvProvider const & __has_env) noexcept { - if constexpr (__std::constructible_from<__own_env_t, env_of_t<_Rcvr>>) + if constexpr (__std::constructible_from<__own_env_t<_EnvProvider>, env_of_t<_EnvProvider>>) { - return __own_env_t(STDEXEC::get_env(__rcvr)); + return __own_env_t<_EnvProvider>(STDEXEC::get_env(__has_env)); } else { - return __own_env_t{}; + return __own_env_t<_EnvProvider>{}; } } - static auto __mk_env(_Rcvr const & __rcvr, __own_env_t const & __own_env) noexcept -> _Env + template + [[nodiscard]] + static auto + __mk_env(_EnvProvider const & __has_env, __own_env_t<_EnvProvider> const & __own_env) noexcept + -> _Env { - if constexpr (__std::constructible_from<_Env, __own_env_t const &>) + if constexpr (__std::constructible_from<_Env, __own_env_t<_EnvProvider> const &>) { return _Env(__own_env); } - else if constexpr (__std::constructible_from<_Env, env_of_t<_Rcvr>>) + else if constexpr (__std::constructible_from<_Env, env_of_t<_EnvProvider>>) { - return _Env(STDEXEC::get_env(__rcvr)); + return _Env(STDEXEC::get_env(__has_env)); } else { @@ -341,93 +346,247 @@ namespace STDEXEC } } - static auto __mk_sched(_Rcvr const & __rcvr) noexcept -> scheduler_type + struct __opstate_base : allocator_type { - if constexpr (requires { scheduler_type(get_scheduler(STDEXEC::get_env(__rcvr))); }) + template + constexpr explicit __opstate_base(task&& __task, _EnvProvider const & __has_env) noexcept + : allocator_type(__mk_alloc(__has_env)) + , __sch_(__mk_sched(__has_env)) + , __task_(static_cast(__task)) { - return scheduler_type(get_scheduler(STDEXEC::get_env(__rcvr))); + auto& __promise = __task_.__coro_.promise(); + // Set the promise's state pointer to this operation state, so it can call back into + // it when the coroutine completes or is stopped. + __promise.__state_ = this; + + // Initialize the promise's stop source if translation is needed between the + // receiver's stop token and the task's stop token: + if constexpr (__needs_stop_callback<_EnvProvider>) + { + __promise.__stop_.template emplace<0>(); + } + else + { + __promise.__stop_.template emplace<1>(get_stop_token(STDEXEC::get_env(__has_env))); + } } - else + + STDEXEC_IMMOVABLE(__opstate_base); + + virtual auto __completed() noexcept -> __std::coroutine_handle<> = 0; + virtual auto __canceled() noexcept -> __std::coroutine_handle<> = 0; + + [[nodiscard]] + constexpr auto __get_allocator() const noexcept -> allocator_type { - return scheduler_type{}; + return static_cast(*this); } - } - auto __stop_callback() noexcept -> __manual_lifetime<__stop_callback_t<_Rcvr>>& - requires __needs_stop_callback<_Rcvr> - { - return *this; - } + constexpr auto __handle() const noexcept -> __std::coroutine_handle + { + return __task_.__coro_; + } + + scheduler_type __sch_; + task __task_; + __error_variant_t __errors_{__no_init}; + }; - void __completed() noexcept final + template + struct STDEXEC_ATTRIBUTE(empty_bases) __awaiter final + : __opstate_base + , __stop_callback_box_t<_ParentPromise> { - if constexpr (__needs_stop_callback<_Rcvr>) + constexpr explicit __awaiter(task&& __task, _ParentPromise& __parent) noexcept + : __opstate_base(static_cast(__task), __parent) + , __own_env_(__mk_own_env(__parent)) + , __env_(__mk_env(__parent, __own_env_)) + , __parent_(__parent) + {} + + static constexpr auto await_ready() noexcept -> bool { - // If we set up a stop callback on the receiver's stop token, then we need to - // disable it when the operation completes: - __stop_callback().__destroy(); + return false; } - if (this->__errors_.index() != __variant_npos) + constexpr auto await_suspend(__std::coroutine_handle<_ParentPromise> __h) + noexcept(__nothrow_callback_registration<_ParentPromise>) -> __std::coroutine_handle<> { - std::exchange(__coro_, {}).destroy(); - __visit(STDEXEC::set_error, std::move(this->__errors_), static_cast<_Rcvr&&>(__rcvr_)); + auto& __task_promise = this->__handle().promise(); + // If the following throws, the coroutine is immediately resumed and the exception + // is rethrown at the suspension point. + this->__register_callback(__h.promise(), __task_promise.__stop_); + __task_promise.__state_ = this; + __continuation_ = __h; + return this->__handle(); } - else if constexpr (__same_as<_Ty, void>) + + constexpr auto await_resume() -> _Ty { - std::exchange(__coro_, {}).destroy(); - STDEXEC::set_value(static_cast<_Rcvr&&>(__rcvr_)); + // Destroy the coroutine after moving the result/error out of it + auto __task = std::move(this->__task_); + if (!this->__errors_.__is_valueless()) + { + __visit(__task::__throw_error, std::move(this->__errors_)); + } + else if constexpr (__same_as<_Ty, void>) + { + return; + } + else + { + return static_cast<_Ty&&>(*__task.__coro_.promise().__result_); + } + __std::unreachable(); } - else + + [[nodiscard]] + auto __completed() noexcept -> __std::coroutine_handle<> final { - STDEXEC_TRY + this->__reset_callback(); + return __continuation_; + } + + [[nodiscard]] + auto __canceled() noexcept -> __std::coroutine_handle<> final + { + this->__reset_callback(); + return __parent_.unhandled_stopped(); + } + + STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS + __own_env_t<_ParentPromise> __own_env_; + STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS + _Env __env_; + __std::coroutine_handle<> __continuation_; + _ParentPromise& __parent_; + }; + + struct __attrs + { + template + [[nodiscard]] + constexpr auto query(__get_completion_behavior_t<_Tag>, _OtherEnv&&...) const noexcept + { + using __attrs_t = env_of_t>; + + if constexpr (__completes_inline) { - // Move the result out of the promise before destroying the coroutine. - _Ty __result = static_cast<_Ty&&>(*__coro_.promise().__result_); - std::exchange(__coro_, {}).destroy(); - STDEXEC::set_value(static_cast<_Rcvr&&>(__rcvr_), static_cast<_Ty&&>(__result)); + return __completion_behavior::__unknown; } - STDEXEC_CATCH_ALL + else { - if constexpr (!__nothrow_move_constructible<_Ty>) - { - std::exchange(__coro_, {}).destroy(); - STDEXEC::set_error(static_cast<_Rcvr&&>(__rcvr_), std::current_exception()); - } + return __completion_behavior::__asynchronous_affine + | __completion_behavior::__inline_completion; } } - } + }; + + constexpr explicit task(__std::coroutine_handle __coro) noexcept + : __coro_(std::move(__coro)) + {} + + __std::coroutine_handle __coro_; + }; - void __canceled() noexcept final + //////////////////////////////////////////////////////////////////////////////////////// + // task::__opstate + template + template + struct STDEXEC_ATTRIBUTE(empty_bases) task<_Ty, _Env>::__opstate final + : __opstate_base + , __stop_callback_box_t<_Rcvr> + { + public: + using operation_state_concept = operation_state_t; + + explicit __opstate(task&& __task, _Rcvr&& __rcvr) noexcept + : __opstate_base(static_cast(__task), __rcvr) + , __own_env_(__mk_own_env(__rcvr)) + , __env_(__mk_env(__rcvr, __own_env_)) + , __rcvr_(static_cast<_Rcvr&&>(__rcvr)) + {} + + void start() & noexcept { - if constexpr (__needs_stop_callback<_Rcvr>) + STDEXEC_TRY { - __stop_callback().__destroy(); + // Register a stop callback if needed + this->__register_callback(__rcvr_, this->__handle().promise().__stop_); + this->__handle().resume(); + } + STDEXEC_CATCH_ALL + { + if constexpr (__nothrow_callback_registration<_Rcvr>) + { + // no-op + } + else if constexpr (__mapply<__mcontains, + error_types>::value) + { + STDEXEC::set_error(static_cast<_Rcvr&&>(__rcvr_), std::current_exception()); + } + else + { + STDEXEC::__die("Starting the task failed due to an exception being thrown while " + "registering a stop callback, but the task's error_types does not " + "include std::exception_ptr, so the exception cannot be propagated."); + } } - - std::exchange(__coro_, {}).destroy(); - STDEXEC::set_stopped(static_cast<_Rcvr&&>(__rcvr_)); } - [[nodiscard]] - auto __get_allocator() noexcept -> allocator_type final + private: + auto __completed() noexcept -> __std::coroutine_handle<> final { - if constexpr (requires { allocator_type(get_allocator(STDEXEC::get_env(__rcvr_))); }) + STDEXEC_TRY { - return allocator_type(get_allocator(STDEXEC::get_env(__rcvr_))); + this->__reset_callback(); + + if (!this->__errors_.__is_valueless()) + { + // Move the errors out of the promise before destroying the coroutine. + auto __errors = std::move(this->__errors_); + __sink(static_cast(this->__task_)); + __visit(STDEXEC::set_error, std::move(__errors), static_cast<_Rcvr&&>(__rcvr_)); + } + else if constexpr (__same_as<_Ty, void>) + { + __sink(static_cast(this->__task_)); + STDEXEC::set_value(static_cast<_Rcvr&&>(__rcvr_)); + } + else + { + // Move the result out of the promise before destroying the coroutine. + _Ty __result = static_cast<_Ty&&>(*this->__handle().promise().__result_); + __sink(static_cast(this->__task_)); + STDEXEC::set_value(static_cast<_Rcvr&&>(__rcvr_), static_cast<_Ty&&>(__result)); + } } - else + STDEXEC_CATCH_ALL { - return allocator_type{}; + if constexpr (!__nothrow_move_constructible<_Ty> + || !__nothrow_move_constructible<__error_variant_t>) + { + __sink(static_cast(this->__task_)); + STDEXEC::set_error(static_cast<_Rcvr&&>(__rcvr_), std::current_exception()); + } } + return std::noop_coroutine(); + } + + auto __canceled() noexcept -> __std::coroutine_handle<> final + { + this->__reset_callback(); + __sink(static_cast(this->__task_)); + STDEXEC::set_stopped(static_cast<_Rcvr&&>(__rcvr_)); + return std::noop_coroutine(); } - __std::coroutine_handle __coro_; - _Rcvr __rcvr_; STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS - __own_env_t __own_env_; + __own_env_t<_Rcvr> __own_env_; STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS - _Env __env_; + _Env __env_; + _Rcvr __rcvr_; }; //////////////////////////////////////////////////////////////////////////////////////// @@ -450,7 +609,7 @@ namespace STDEXEC auto final_suspend() noexcept { - return __completed_awaitable{}; + return __completed_awaiter{}; } void unhandled_exception() @@ -458,10 +617,8 @@ namespace STDEXEC if constexpr (!__mapply<__mcontains, __error_variant_t>::value) { STDEXEC::__die("A task threw an exception but does not have std::exception_ptr in its " - "error_types. " - "Either add std::exception_ptr to the task's error_types or ensure that all " - "code called " - "by the task is noexcept."); + "error_types. Either add std::exception_ptr to the task's error_types or " + "ensure that all code called by the task is noexcept."); } else { @@ -472,13 +629,12 @@ namespace STDEXEC [[nodiscard]] auto unhandled_stopped() noexcept -> __std::coroutine_handle<> { - __state_->__canceled(); - return std::noop_coroutine(); + return __state_->__canceled(); } template - constexpr auto - yield_value(with_error<_Error> __error) noexcept(__nothrow_decay_copyable<_Error>) + constexpr auto yield_value(with_error<_Error> __error) // + noexcept(__nothrow_decay_copyable<_Error>) { if constexpr (__mapply<__mcontains<__decay_t<_Error>>, __error_variant_t>::value) { @@ -488,13 +644,14 @@ namespace STDEXEC { static_assert(__mnever<_Error>, "Error type not in task's error_types"); } - return __completed_awaitable{}; + return __completed_awaiter{}; } template constexpr auto await_transform(_Sender&& __sndr) noexcept { - if constexpr (__same_as) + using __schedule_sndr_t = schedule_result_t; + if constexpr (__completes_inline, __env>) { return STDEXEC::as_awaitable(static_cast<_Sender&&>(__sndr), *this); } @@ -537,10 +694,10 @@ namespace STDEXEC __task::__divmod(sizeof(__task::__any_alloc<__palloc_t>), sizeof(__task::__memblock)); size_t const __promise_blocks = __task::__divmod(__bytes, sizeof(__task::__memblock)); - __palloc_t __palloc(__alloc); - __pointer_t const __ptr = std::allocator_traits<__palloc_t>::allocate(__palloc, - __promise_blocks - + __alloc_blocks); + __palloc_t __palloc(__alloc); + auto* const __ptr = std::allocator_traits<__palloc_t>::allocate(__palloc, + __promise_blocks + + __alloc_blocks); // construct the allocator in the blocks immediately following the promise object: void* const __alloc_loc = __ptr + __promise_blocks; @@ -557,20 +714,23 @@ namespace STDEXEC } private: + template + friend struct __awaiter; template friend struct __opstate; + friend struct __opstate_base; - struct __completed_awaitable + struct __completed_awaiter { static constexpr bool await_ready() noexcept { return false; } - static constexpr void await_suspend(__std::coroutine_handle<__promise> __coro) noexcept + static constexpr auto await_suspend(__std::coroutine_handle<__promise> __coro) noexcept // + -> __std::coroutine_handle<> { - __promise& __self = __coro.promise(); - __self.__state_->__completed(); + return __coro.promise().__state_->__completed(); } static constexpr void await_resume() noexcept {} @@ -606,8 +766,8 @@ namespace STDEXEC __promise const * __promise_; }; - __variant __stop_{__no_init}; - __opstate_base* __state_ = nullptr; + __stop_variant_t __stop_{__no_init}; + __opstate_base* __state_ = nullptr; }; #endif // !STDEXEC_NO_STDCPP_COROUTINES() } // namespace STDEXEC diff --git a/include/stdexec/__detail/__utility.hpp b/include/stdexec/__detail/__utility.hpp index c3ff206d8..f19373191 100644 --- a/include/stdexec/__detail/__utility.hpp +++ b/include/stdexec/__detail/__utility.hpp @@ -70,16 +70,6 @@ namespace STDEXEC auto operator=(__move_only const &) -> __move_only& = delete; }; - // Helper to combine multiple function objects into one overload set - template - struct __overload : _Fns... - { - using _Fns::operator()...; - }; - - template - STDEXEC_HOST_DEVICE_DEDUCTION_GUIDE __overload(_Fns...) -> __overload<_Fns...>; - template using __call_result_t = decltype(__declval<_Fun>()(__declval<_As>()...)); diff --git a/include/stdexec/functional.hpp b/include/stdexec/functional.hpp index 21f018014..77d5cbf2e 100644 --- a/include/stdexec/functional.hpp +++ b/include/stdexec/functional.hpp @@ -16,6 +16,7 @@ #pragma once #include "__detail/__config.hpp" +#include "__detail/__tuple.hpp" #include "__detail/__utility.hpp" #include "concepts.hpp" // IWYU pragma: keep @@ -28,32 +29,19 @@ namespace STDEXEC template struct __composed { - STDEXEC_ATTRIBUTE(no_unique_address) _Fun0 __t0_; - STDEXEC_ATTRIBUTE(no_unique_address) _Fun1 __t1_; - - template - requires __callable<_Fun1, _Ts...> && __callable<_Fun0, __call_result_t<_Fun1, _Ts...>> + template + requires __callable<__copy_cvref_t<_Self, _Fun1>, _Ts...> + && __callable<__copy_cvref_t<_Self, _Fun0>, + __call_result_t<__copy_cvref_t<_Self, _Fun1>, _Ts...>> STDEXEC_ATTRIBUTE(host, device, always_inline) - constexpr auto - operator()(_Ts &&...__ts) && noexcept(__callable<_Fun1, _Ts...> - && __callable<_Fun0, __call_result_t<_Fun1, _Ts...>>) - -> __call_result_t<_Fun0, __call_result_t<_Fun1, _Ts...>> - { - return static_cast<_Fun0 &&>(__t0_)( - static_cast<_Fun1 &&>(__t1_)(static_cast<_Ts &&>(__ts)...)); - } + constexpr STDEXEC_EXPLICIT_THIS_BEGIN(auto operator())(this _Self &&__self, _Ts &&...__ts) + STDEXEC_AUTO_RETURN( // + static_cast<_Self &&>(__self).__t0_( + static_cast<_Self &&>(__self).__t1_(static_cast<_Ts &&>(__ts)...))) // + STDEXEC_EXPLICIT_THIS_END(operator()) - template - requires __callable<_Fun1 const &, _Ts...> - && __callable<_Fun0 const &, __call_result_t<_Fun1 const &, _Ts...>> - STDEXEC_ATTRIBUTE(host, device, always_inline) - constexpr auto operator()(_Ts &&...__ts) const & noexcept( - __callable<_Fun1 const &, _Ts...> - && __callable<_Fun0 const &, __call_result_t<_Fun1 const &, _Ts...>>) - -> __call_result_t<_Fun0, __call_result_t<_Fun1, _Ts...>> - { - return __t0_(__t1_(static_cast<_Ts &&>(__ts)...)); - } + STDEXEC_ATTRIBUTE(no_unique_address) _Fun0 __t0_; + STDEXEC_ATTRIBUTE(no_unique_address) _Fun1 __t1_; }; inline constexpr struct __compose_t @@ -245,8 +233,8 @@ namespace STDEXEC template STDEXEC_ATTRIBUTE(host, device, always_inline) - constexpr void - operator()(_Ts &&...__ts) const noexcept((__nothrow_callable<_Fn const &, _Ts> && ...)) + constexpr void operator()(_Ts &&...__ts) const // + noexcept((__nothrow_callable<_Fn const &, _Ts> && ...)) { (static_cast(__fn_(static_cast<_Ts &&>(__ts))), ...); } @@ -277,7 +265,7 @@ namespace STDEXEC STDEXEC_HOST_DEVICE_DEDUCTION_GUIDE __always(_Ty) -> __always>; template - struct __construct + struct __construct_from { template requires __std::constructible_from<_Ty, _As...> @@ -288,4 +276,96 @@ namespace STDEXEC return _Ty(static_cast<_As &&>(__as)...); } }; + + //! \brief Helper to combine multiple function objects into one overload set + template + struct __overload : _Fns... + { + using _Fns::operator()...; + }; + + template + STDEXEC_HOST_DEVICE_DEDUCTION_GUIDE __overload(_Fns...) -> __overload<_Fns...>; + + namespace __detail + { + template + struct __get_1st_fn + { + STDEXEC_ATTRIBUTE(host, device, always_inline) + constexpr void operator()() const noexcept {} + + template + STDEXEC_ATTRIBUTE(host, device, always_inline) + constexpr auto operator()(_Fn0 &&__fn0, _Fns &&...__fns) const noexcept -> decltype(auto) + { + if constexpr (__callable<_Fn0, _Args...>) + { + return static_cast<_Fn0 &&>(__fn0); + } + else + { + return (*this)(static_cast<_Fns &&>(__fns)...); + } + } + }; + } // namespace __detail + + //! \brief A callable that wraps a set of functions and calls the first one that is + //! callable with a given set of arguments. + template + struct __first_callable + { + //! \brief Alias for the type of the first function that is callable with a given set of arguments. + template + using __1st_fn_t = + __call_result_t<__detail::__get_1st_fn<_Args...>, __copy_cvref_t<_Self, _Fns>...>; + + //! \brief Calls the first function that is callable with a given set of arguments. + template + requires __callable<__1st_fn_t<_Self, _Args...>, _Args...> + constexpr STDEXEC_EXPLICIT_THIS_BEGIN(auto operator())(this _Self &&__self, _Args &&...__args) + noexcept(__nothrow_callable<__1st_fn_t<_Self, _Args...>, _Args...>) + -> __call_result_t<__1st_fn_t<_Self, _Args...>, _Args...> + { + return __apply(__detail::__get_1st_fn<_Args...>(), + static_cast<_Self &&>(__self).__fns_)(static_cast<_Args &&>(__args)...); + } + STDEXEC_EXPLICIT_THIS_END(operator()) + + __tuple<_Fns...> __fns_; + }; + + template + STDEXEC_HOST_DEVICE_DEDUCTION_GUIDE __first_callable(_Fns...) -> __first_callable<_Fns...>; + + template + struct __back_binder + { + template + requires __callable<_Fn, _Args..., __copy_cvref_t<_Self, _BoundArgs>...> + STDEXEC_ATTRIBUTE(host, device) + constexpr STDEXEC_EXPLICIT_THIS_BEGIN(auto operator())(this _Self &&__self, + _Args &&...__args) // + noexcept(__nothrow_callable<_Fn, _Args..., __copy_cvref_t<_Self, _BoundArgs>...>) // + -> __call_result_t<_Fn, _Args..., __copy_cvref_t<_Self, _BoundArgs>...> + { + return STDEXEC::__apply(static_cast<_Self &&>(__self).__fn_, + static_cast<_Self &&>(__self).__bound_args_, + static_cast<_Args &&>(__args)...); + } + STDEXEC_EXPLICIT_THIS_END(operator()) + + _Fn __fn_; + __tuple<_BoundArgs...> __bound_args_; + }; + + template + constexpr auto __bind_back(_Fn &&__fn, _BoundArgs... __bound_args) + noexcept(__nothrow_move_constructible<_BoundArgs...> && __nothrow_decay_copyable<_Fn>) + { + return __back_binder<__decay_t<_Fn>, _BoundArgs...>{static_cast<_Fn &&>(__fn), + static_cast<_BoundArgs &&>( + __bound_args)...}; + }; } // namespace STDEXEC diff --git a/include/stdexec/stop_token.hpp b/include/stdexec/stop_token.hpp index 6669729c2..15a3decab 100644 --- a/include/stdexec/stop_token.hpp +++ b/include/stdexec/stop_token.hpp @@ -457,6 +457,7 @@ namespace STDEXEC } } // namespace __stok + template struct __forward_stop_request { void operator()() const noexcept @@ -464,8 +465,12 @@ namespace STDEXEC __stop_source_.request_stop(); } - inplace_stop_source& __stop_source_; + _StopSource& __stop_source_; }; + + template + STDEXEC_HOST_DEVICE_DEDUCTION_GUIDE + __forward_stop_request(_StopSource&) -> __forward_stop_request<_StopSource>; } // namespace STDEXEC STDEXEC_PRAGMA_POP()