From 72e93c8144b54e3d01289f910515ce8a73b8cb54 Mon Sep 17 00:00:00 2001 From: Eric Niebler Date: Thu, 5 Mar 2026 10:11:40 -0800 Subject: [PATCH] change nvexec::let_* to tell the secondary sender where it is executing --- include/nvexec/stream/common.cuh | 200 +++++++++-------- include/nvexec/stream/let_xxx.cuh | 280 ++++++++++++++---------- include/nvexec/stream/repeat_n.cuh | 41 ++-- include/nvexec/stream/schedule_from.cuh | 2 +- include/stdexec/__detail/__meta.hpp | 6 + 5 files changed, 303 insertions(+), 226 deletions(-) diff --git a/include/nvexec/stream/common.cuh b/include/nvexec/stream/common.cuh index ed014a478..0bb263cef 100644 --- a/include/nvexec/stream/common.cuh +++ b/include/nvexec/stream/common.cuh @@ -488,27 +488,31 @@ namespace nv::execution template struct stream_enqueue_receiver { - Env* env_; - Variant* variant_; - queue::task_base* task_; - queue::producer producer_; - - public: using receiver_concept = STDEXEC::receiver_t; + explicit stream_enqueue_receiver(Env const * env, + Variant* variant, + queue::task_base* task, + queue::producer producer) + : env_(env) + , variant_(variant) + , task_(task) + , producer_(producer) + {} + template STDEXEC_ATTRIBUTE(host, device) void set_value(Args&&... args) noexcept { - variant_->template emplace>(set_value_t(), - static_cast( - args)...); + using tuple_t = decayed_tuple_t; + variant_->template emplace(set_value_t(), static_cast(args)...); producer_(task_); } STDEXEC_ATTRIBUTE(host, device) void set_stopped() noexcept { - variant_->template emplace>(set_stopped_t()); + using tuple_t = decayed_tuple_t; + variant_->template emplace(set_stopped_t()); producer_(task_); } @@ -519,14 +523,13 @@ namespace nv::execution if constexpr (__decays_to) { // What is `exception_ptr` but death pending - variant_->template emplace>(STDEXEC::set_error, - cudaErrorUnknown); + using tuple_t = decayed_tuple_t; + variant_->template emplace(STDEXEC::set_error, cudaErrorUnknown); } else { - variant_->template emplace>(set_error_t(), - static_cast( - err)); + using tuple_t = decayed_tuple_t; + variant_->template emplace(set_error_t(), static_cast(err)); } producer_(task_); } @@ -536,15 +539,11 @@ namespace nv::execution return *env_; } - stream_enqueue_receiver(Env* env, - Variant* variant, - queue::task_base* task, - queue::producer producer) - : env_(env) - , variant_(variant) - , task_(task) - , producer_(producer) - {} + private: + Env const * env_; + Variant* variant_; + queue::task_base* task_; + queue::producer producer_; }; template @@ -558,16 +557,10 @@ namespace nv::execution template struct continuation_task : queue::task_base { - Receiver rcvr_; - Variant* variant_; - cudaStream_t stream_{}; - std::pmr::memory_resource* pinned_resource_{}; - cudaError_t status_{cudaSuccess}; - - continuation_task(Receiver rcvr, - Variant* variant, - cudaStream_t stream, - std::pmr::memory_resource* pinned_resource) noexcept + explicit continuation_task(Receiver rcvr, + Variant* variant, + cudaStream_t stream, + std::pmr::memory_resource* pinned_resource) noexcept : rcvr_{rcvr} , variant_{variant} , stream_{stream} @@ -606,6 +599,18 @@ namespace nv::execution status_ = STDEXEC_LOG_CUDA_API(cudaMemsetAsync(this->atom_next_, 0, ptr_size, stream_)); } } + + cudaError_t status() const noexcept + { + return status_; + } + + private: + Receiver rcvr_; + Variant* variant_; + cudaStream_t stream_{}; + std::pmr::memory_resource* pinned_resource_{}; + cudaError_t status_{cudaSuccess}; }; template @@ -695,6 +700,7 @@ namespace nv::execution } } + [[nodiscard]] auto make_env() const noexcept -> env_t { return make_stream_env(get_env(rcvr_), get_stream_provider()); @@ -738,10 +744,12 @@ namespace nv::execution stream_provider stream_provider_; }; - template + template ().make_env())> struct propagate_receiver : stream_receiver_base { - opstate_base& opstate_; + explicit propagate_receiver(OpState& opstate) noexcept + : opstate_(opstate) + {} template void set_value(Args&&... args) noexcept @@ -760,10 +768,14 @@ namespace nv::execution opstate_.propagate_completion_signal(set_stopped_t()); } - auto get_env() const noexcept -> decltype(auto) + [[nodiscard]] + auto get_env() const noexcept -> Env { return opstate_.make_env(); } + + private: + OpState& opstate_; }; template @@ -780,38 +792,6 @@ namespace nv::execution __if_c, InnerReceiver, stream_enqueue_receiver_t>; using inner_opstate_t = connect_result_t; - void start() & noexcept - { - started_.test_and_set(::cuda::std::memory_order::relaxed); - - if (this->stream_provider_.status_ != cudaSuccess) - { - // Couldn't allocate memory for opstate state, complete with error - this->propagate_completion_signal(STDEXEC::set_error, - std::move(this->stream_provider_.status_)); - return; - } - - if constexpr (stream_receiver) - { - if (InnerReceiver::memory_allocation_size()) - { - STDEXEC_TRY - { - this->temp_storage_ = this->ctx_.managed_resource_->allocate( - InnerReceiver::memory_allocation_size()); - } - STDEXEC_CATCH_ALL - { - this->propagate_completion_signal(STDEXEC::set_error, cudaErrorMemoryAllocation); - return; - } - } - } - - STDEXEC::start(inner_op_); - } - template requires stream_sender opstate(CvSender&& sender, @@ -846,7 +826,7 @@ namespace nv::execution { if (this->stream_provider_.status_ == cudaSuccess) { - this->stream_provider_.status_ = task_->status_; + this->stream_provider_.status_ = task_->status(); } } @@ -870,6 +850,39 @@ namespace nv::execution STDEXEC_IMMOVABLE(opstate); + void start() & noexcept + { + started_.test_and_set(::cuda::std::memory_order::relaxed); + + if (this->stream_provider_.status_ != cudaSuccess) + { + // Couldn't allocate memory for opstate state, complete with error + this->propagate_completion_signal(STDEXEC::set_error, + std::move(this->stream_provider_.status_)); + return; + } + + if constexpr (stream_receiver) + { + if (InnerReceiver::memory_allocation_size()) + { + STDEXEC_TRY + { + this->temp_storage_ = this->ctx_.managed_resource_->allocate( + InnerReceiver::memory_allocation_size()); + } + STDEXEC_CATCH_ALL + { + this->propagate_completion_signal(STDEXEC::set_error, cudaErrorMemoryAllocation); + return; + } + } + } + + STDEXEC::start(inner_op_); + } + + private: host_ptr_t storage_; task_t* task_{}; ::cuda::std::atomic_flag started_{}; @@ -880,26 +893,26 @@ namespace nv::execution template requires stream_receiver using exit_opstate_t = - _strm::opstate, OuterReceiver>; + _strm::opstate>, OuterReceiver>; - template - auto exit_opstate(Sender&& sndr, OuterReceiver rcvr, context ctx) noexcept - -> exit_opstate_t + template + auto exit_opstate(CvSender&& sndr, OuterReceiver rcvr, context ctx) noexcept + -> exit_opstate_t { - return exit_opstate_t( - static_cast(sndr), + return exit_opstate_t( + static_cast(sndr), static_cast(rcvr), - [](opstate_base& op) -> propagate_receiver - { return propagate_receiver{{}, op}; }, + [](opstate_base& op) noexcept + { return propagate_receiver>(op); }, ctx); } - template + template concept stream_completing_sender = sender && gpu_stream_scheduler< - __result_of, env_of_t, E>, - E>; + __result_of, env_of_t, Env>, + Env>; template using inner_receiver_t = __call_result_t&>; @@ -907,33 +920,34 @@ namespace nv::execution template using stream_opstate_t = _strm::opstate; - template - requires stream_completing_sender> - auto - stream_opstate(Sender&& sndr, OuterReceiver&& out_receiver, ReceiverProvider receiver_provider) - -> stream_opstate_t, OuterReceiver> + template + requires stream_completing_sender> + auto stream_opstate(CvSender&& sndr, + OuterReceiver&& out_receiver, + ReceiverProvider receiver_provider) + -> stream_opstate_t, OuterReceiver> { auto sch = get_completion_scheduler(get_env(sndr), get_env(out_receiver)); context ctx = sch.ctx_; - return stream_opstate_t, - OuterReceiver>(static_cast(sndr), + OuterReceiver>(static_cast(sndr), static_cast(out_receiver), receiver_provider, ctx); } - template - auto stream_opstate(Sender&& sndr, + template + auto stream_opstate(CvSender&& sndr, OuterReceiver&& out_receiver, ReceiverProvider receiver_provider, context ctx) - -> stream_opstate_t, OuterReceiver> + -> stream_opstate_t, OuterReceiver> { - return stream_opstate_t, - OuterReceiver>(static_cast(sndr), + OuterReceiver>(static_cast(sndr), static_cast(out_receiver), receiver_provider, ctx); diff --git a/include/nvexec/stream/let_xxx.cuh b/include/nvexec/stream/let_xxx.cuh index 37b8d62b8..5f567ec5e 100644 --- a/include/nvexec/stream/let_xxx.cuh +++ b/include/nvexec/stream/let_xxx.cuh @@ -18,6 +18,7 @@ #pragma once +#include "../../stdexec/__detail/__variant.hpp" #include "../../stdexec/execution.hpp" #include "common.cuh" @@ -31,117 +32,135 @@ STDEXEC_PRAGMA_IGNORE_EDG(cuda_compile) namespace nv::execution::_strm { - namespace let_xxx + namespace _let { using namespace STDEXEC; template STDEXEC_ATTRIBUTE(launch_bounds(1)) - __global__ void _let_xxx_kernel(Fun fn, ResultSenderT* result, Args... args) + __global__ void _let_kernel(Fun fn, ResultSenderT* result, Args... args) { static_assert(trivially_copyable); new (result) ResultSenderT(::cuda::std::move(fn)(static_cast(args)...)); } - template - using __decay_ref_t = STDEXEC::__decay_t&; + template + struct _opstate; template - struct _mk_result_sender + struct _mk_result_sender_fn { template - using __f = __remove_rvalue_reference_t<__call_result_t...>>; + using __f = __remove_rvalue_reference_t<__call_result_t&...>>; }; template requires sender_in> - struct __max_sender_size + struct _max_sender_size { - struct _result_sender_size - { - template - using __f = __msize_t, Args...>)>; - }; - - static constexpr std::size_t value = __gather_completions_of_t, - _result_sender_size, - maxsize>::value; + using _env_t = env_of_t; + using _result_sender_size = __mcompose<__msizeof, _mk_result_sender_fn>; + using _value_t = + __gather_completions_of_t; + + static constexpr std::size_t value = _value_t::value; + }; + + // The environment of the receiver used to connect the secondary (result) sender must + // correctly report the scheduler and domain on which the sender's operation will be + // started. + inline constexpr auto _mk_sch_env = + [](CvSender&& sndr, Receiver&& rcvr, SetTag) + { + using cv_fn = __copy_cvref_fn; + return __mk_secondary_env_t()(cv_fn{}, sndr, STDEXEC::get_env(rcvr)); + }; + + template + using _sch_env_t = __result_of<_mk_sch_env, CvSender, Receiver, SetTag>; + + inline constexpr auto _mk_env2 = + []([[maybe_unused]] + SchEnv const & sch_env, + _strm::opstate_base const & opstate) + { + //return opstate.make_env(); + return __env::__join(sch_env, opstate.make_env()); }; - template - using opstate_for_t = __mcompose<__mbind_back_q>, - _mk_result_sender>; + template + using _env2_t = __result_of<_mk_env2, + _sch_env_t const &, + _strm::opstate_base const &>; + + template + using _propagate_receiver_t = propagate_receiver<_opstate, + _env2_t>; - template - struct __tfx_signal_fn + template + using _mk_opstate_fn = __mcompose< + __mbind_back_q>, + _mk_result_sender_fn>; + + template + struct _tfx_signal_fn { template using __f = completion_signatures; }; - template - struct __tfx_signal_fn + template + struct _tfx_signal_fn { template using __f = __transform_completion_signatures_t< - __completion_signatures_of_t<__minvoke<_mk_result_sender, Args...>, StreamEnv...>, + __completion_signatures_of_t<__minvoke<_mk_result_sender_fn, Args...>, StreamEnv...>, completion_signatures>; }; - template - using __tfx_signal_t = __minvoke<__tfx_signal_fn, Fun, StreamEnv...>; - - template - struct opstate; + template + using _tfx_signal_t = __minvoke<_tfx_signal_fn, Fun, StreamEnv...>; - template - struct receiver : public stream_receiver_base + template + struct _receiver : public stream_receiver_base { - using env_t = _strm::opstate_base::env_t; + using _env_t = _strm::opstate_base::env_t; + using _result_tuples_t = __mlist; + static constexpr std::size_t memory_allocation_size() noexcept { - return __max_sender_size, Fun, Set>::value; + using _propagate_receiver_t = _let::_propagate_receiver_t; + return _max_sender_size::value; } - template <__same_as Tag, class... Args> + template <__same_as Tag, class... Args> void _complete(Tag, Args&&... args) noexcept { - using result_sender_t = __minvoke<_mk_result_sender, Args...>; - using opstate_t = __minvoke, Args...>; + using result_sender_t = __minvoke<_mk_result_sender_fn, Args...>; cudaStream_t stream = opstate_->get_stream(); auto* result_sender = static_cast(opstate_->temp_storage_); - _let_xxx_kernel<<<1, 1, 0, stream>>>(std::move(opstate_->fun_), - result_sender, - static_cast(args)...); + _let_kernel<<<1, 1, 0, stream>>>(std::move(opstate_->fun_), + result_sender, + static_cast(args)...); - if (cudaError_t status = STDEXEC_LOG_CUDA_API(cudaStreamSynchronize(stream)); - status == cudaSuccess) + cudaError_t status = STDEXEC_LOG_CUDA_API(cudaStreamSynchronize(stream)); + if (status == cudaSuccess) { opstate_->defer_temp_storage_destruction(result_sender); - auto& op = opstate_->opstate3_.template emplace(__emplace_from{ - [&] - { - return connect(std::move(*result_sender), - propagate_receiver{ - {}, - static_cast<_strm::opstate_base&>(*opstate_)}); - }}); + auto& op = opstate_->_connect_result_sender(std::move(*result_sender)); STDEXEC::start(op); } else { - opstate_->propagate_completion_signal(STDEXEC::set_error, std::move(status)); + opstate_->propagate_completion_signal(STDEXEC::set_error, cudaError_t(status)); } } - template - void _complete(Tag, Args&&... args) noexcept + template + void _complete(SetTag2, Args&&... args) noexcept { - static_assert(__nothrow_callable); - opstate_->propagate_completion_signal(Tag(), static_cast(args)...); + opstate_->propagate_completion_signal(SetTag2(), static_cast(args)...); } template @@ -161,116 +180,151 @@ namespace nv::execution::_strm _complete(set_stopped_t()); } - auto get_env() const noexcept -> env_t + auto get_env() const noexcept -> _env_t { - return opstate_->make_env(); + return static_cast<_strm::opstate_base&>(*opstate_).make_env(); } - using opstate_variant_t = __minvoke< - __mtransform<__muncurry>, __qq<__nullable_std_variant>>, - Tuples...>; - - opstate* opstate_; + _opstate* opstate_; }; - template - using receiver_t = - __gather_completions_of_t>, - __q<__decayed_std_tuple>, - __munique<__mbind_front_q>>; + template + using _receiver_t = __gather_completions_of_t< + SetTag, + Sender, + stream_env_t>, + __q<__decayed_std_tuple>, + __munique<__mbind_front_q<_receiver, Sender, Receiver, Fun, SetTag>>>; - template - using opstate_base_t = _strm::opstate, Receiver>; + template + using _opstate_base_t = + _strm::opstate, Receiver>; - template - struct opstate : opstate_base_t + template + struct _opstate : _opstate_base_t { - using receiver_t = receiver_t; - using opstate_variant_t = receiver_t::opstate_variant_t; + using _env2_t = _sch_env_t; + using _receiver_t = _let::_receiver_t; + using _result_tuples_t = _receiver_t::_result_tuples_t; + using _mk_opstate_fn_t = _mk_opstate_fn; + using _mk_opstate_variant_fn = __mtransform<__muncurry<_mk_opstate_fn_t>, __qq<__variant>>; + using _opstate_variant_t = __mapply<_mk_opstate_variant_fn, _result_tuples_t>; + using _propagate_receiver_t = _let::_propagate_receiver_t; + + explicit _opstate(CvSender&& sndr, Receiver rcvr, Fun fun) + : _opstate(static_cast(sndr), + static_cast(rcvr), + static_cast(fun), + _mk_sch_env(sndr, rcvr, SetTag{})) + {} - opstate(Sender&& sndr, Receiver rcvr, Fun fun) - : opstate_base_t( - static_cast(sndr), + explicit _opstate(CvSender&& sndr, Receiver&& rcvr, Fun fun, _env2_t env2) + : _opstate_base_t( + static_cast(sndr), static_cast(rcvr), - [this](_strm::opstate_base&) -> receiver_t { return receiver_t{{}, this}; }, + [this](__ignore) noexcept { return _receiver_t{{}, this}; }, get_completion_scheduler(get_env(sndr), get_env(rcvr)).ctx_) , fun_(static_cast(fun)) + , env2_(env2) {} - STDEXEC_IMMOVABLE(opstate); + STDEXEC_IMMOVABLE(_opstate); - Fun fun_; - opstate_variant_t opstate3_; + [[nodiscard]] + auto make_env() const noexcept -> _let::_env2_t + { + return _let::_mk_env2(env2_, *this); + } + + template + auto _connect_result_sender(ResultSender&& sndr) + -> connect_result_t& + { + return opstate3_.__emplace_from(STDEXEC::connect, + static_cast(sndr), + _propagate_receiver_t(*this)); + } + + Fun fun_; + _env2_t env2_; + _opstate_variant_t opstate3_{__no_init}; }; - } // namespace let_xxx + } // namespace _let - template + template struct let_sender : public stream_sender_base { + private: template - using opstate_t = let_xxx::opstate<__copy_cvref_t, Receiver, Fun, Set>; + using _opstate_t = _let::_opstate<__copy_cvref_t, Receiver, Fun, SetTag>; template - using _receiver_t = let_xxx::receiver_t<__copy_cvref_t, Receiver, Fun, Set>; + using _receiver_t = _let::_receiver_t<__copy_cvref_t, Receiver, Fun, SetTag>; template using _completions_t = - __mapply<__mtransform<__mbind_back_q, + __mapply<__mtransform<__mbind_back_q<_let::_tfx_signal_t, Fun, SetTag, StreamEnv...>, __mtry_q<__concat_completion_signatures_t>>, __completion_signatures_of_t>; - template <__decays_to Self, receiver Receiver> - requires receiver_of< - Receiver, - _completions_t<__copy_cvref_t, stream_env_t>>> - STDEXEC_EXPLICIT_THIS_BEGIN(auto connect)(this Self&& self, Receiver rcvr) - -> opstate_t - { - return opstate_t{static_cast(self).sndr_, - static_cast(rcvr), - static_cast(self).fun_}; - } - STDEXEC_EXPLICIT_THIS_END(connect) + public: + explicit let_sender(Sender sndr, Fun fun, SetTag) + noexcept(__nothrow_move_constructible) + : sndr_(static_cast(sndr)) + , fun_(static_cast(fun)) + {} + [[nodiscard]] auto get_env() const noexcept -> stream_sender_attrs { return {&sndr_}; } - template <__decays_to Self, class... Env> + template static consteval auto get_completion_signatures() - -> _completions_t<__copy_cvref_t, stream_env_t...> { - return {}; + return _completions_t<__copy_cvref_t, stream_env_t...>{}; + } + + template + STDEXEC_EXPLICIT_THIS_BEGIN(auto connect)(this Self&& self, Receiver rcvr) + -> _opstate_t + { + return _opstate_t{static_cast(self).sndr_, + static_cast(rcvr), + static_cast(self).fun_}; } + STDEXEC_EXPLICIT_THIS_END(connect) + private: Sender sndr_; Fun fun_; }; - template - struct _transform_let_xxx_sender + template + STDEXEC_HOST_DEVICE_DEDUCTION_GUIDE + let_sender(Sender, Fun, SetTag) -> let_sender; + + template + struct _transform_let_sender { template Sender> auto operator()(Env const &, __ignore, Fun fn, Sender&& sndr) const { - using __sender_t = let_sender<__decay_t, Fun, Set>; - return __sender_t{{}, static_cast(sndr), static_cast(fn)}; + return let_sender{static_cast(sndr), static_cast(fn), SetTag{}}; } }; template <> - struct transform_sender_for : _transform_let_xxx_sender + struct transform_sender_for : _transform_let_sender {}; template <> - struct transform_sender_for : _transform_let_xxx_sender + struct transform_sender_for : _transform_let_sender {}; template <> - struct transform_sender_for : _transform_let_xxx_sender + struct transform_sender_for : _transform_let_sender {}; } // namespace nv::execution::_strm @@ -278,9 +332,9 @@ namespace nvexec = nv::execution; namespace STDEXEC::__detail { - template - extern __declfn_t, Fun, Set>> - __demangle_v>; + template + extern __declfn_t, Fun, SetTag>> + __demangle_v>; } // namespace STDEXEC::__detail STDEXEC_PRAGMA_POP() diff --git a/include/nvexec/stream/repeat_n.cuh b/include/nvexec/stream/repeat_n.cuh index e75f4894f..3ecbc01e3 100644 --- a/include/nvexec/stream/repeat_n.cuh +++ b/include/nvexec/stream/repeat_n.cuh @@ -34,7 +34,7 @@ namespace nv::execution::_strm template struct receiver : stream_receiver_base { - receiver(OpState& opstate) noexcept + explicit receiver(OpState& opstate) noexcept : opstate_(opstate) {} @@ -54,6 +54,7 @@ namespace nv::execution::_strm opstate_._complete(STDEXEC::set_stopped); } + [[nodiscard]] auto get_env() const noexcept -> OpState::env_t { return opstate_.make_env(); @@ -62,24 +63,23 @@ namespace nv::execution::_strm OpState& opstate_; }; - template + template struct opstate : _strm::opstate_base { using operation_state_concept = STDEXEC::operation_state_t; - using sender_t = STDEXEC::__decay_t; using scheduler_t = STDEXEC::__result_of, - STDEXEC::env_of_t, + STDEXEC::env_of_t, STDEXEC::env_of_t>; using inner_sender_t = - STDEXEC::__result_of, sender_t&>; + STDEXEC::__result_of, Sender&>; using inner_opstate_t = STDEXEC::connect_result_t>; - explicit opstate(CvSender&& sndr, Receiver rcvr, std::size_t count, scheduler_t sched) + explicit opstate(Sender&& sndr, Receiver rcvr, std::size_t count, scheduler_t sched) : _strm::opstate_base(static_cast(rcvr), sched.ctx_) - , sndr_(static_cast(sndr)) + , sndr_(static_cast(sndr)) , sched_(std::move(sched)) , count_(count) { @@ -90,6 +90,7 @@ namespace nv::execution::_strm { inner_opstate_.__emplace_from(STDEXEC::connect, exec::sequence(STDEXEC::schedule(sched_), sndr_), + //STDEXEC::on(sched_, sndr_), receiver{*this}); } @@ -146,7 +147,7 @@ namespace nv::execution::_strm } } - sender_t sndr_; + Sender sndr_; scheduler_t sched_; STDEXEC::__optional inner_opstate_; std::size_t count_{}; @@ -157,19 +158,22 @@ namespace nv::execution::_strm { using sender_concept = STDEXEC::sender_t; - using completion_signatures = - STDEXEC::completion_signatures; + template + static consteval auto get_completion_signatures() + { + return STDEXEC::completion_signatures(); + } template - auto connect(Receiver rcvr) && // - -> repeat_n::opstate + auto connect(Receiver rcvr) && -> repeat_n::opstate { auto sched = STDEXEC::get_completion_scheduler(STDEXEC::get_env(sndr_), STDEXEC::get_env(rcvr)); + return repeat_n::opstate(static_cast(sndr_), static_cast(rcvr), count_, @@ -190,11 +194,10 @@ namespace nv::execution::_strm template <> struct transform_sender_for { - template - auto operator()(Env const &, STDEXEC::__ignore, size_t count, CvSender&& sndr) const + template + auto operator()(Env const &, STDEXEC::__ignore, size_t count, Sender sndr) const { - using sender_t = repeat_n::sender; - return sender_t{static_cast(sndr), count}; + return repeat_n::sender{static_cast(sndr), count}; } }; } // namespace nv::execution::_strm diff --git a/include/nvexec/stream/schedule_from.cuh b/include/nvexec/stream/schedule_from.cuh index 10e23a29e..b4792247c 100644 --- a/include/nvexec/stream/schedule_from.cuh +++ b/include/nvexec/stream/schedule_from.cuh @@ -93,7 +93,7 @@ namespace nv::execution { if (this->status_ == cudaSuccess) { - this->status_ = task_->status_; + this->status_ = task_->status(); } } diff --git a/include/stdexec/__detail/__meta.hpp b/include/stdexec/__detail/__meta.hpp index 8f2424483..81748771d 100644 --- a/include/stdexec/__detail/__meta.hpp +++ b/include/stdexec/__detail/__meta.hpp @@ -681,6 +681,12 @@ namespace STDEXEC using __f = __msize_t; }; + struct __msizeof + { + template + using __f = __msize_t; + }; + template struct __mcount {