Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
383 changes: 142 additions & 241 deletions examples/nvexec/maxwell/snr.cuh

Large diffs are not rendered by default.

39 changes: 23 additions & 16 deletions include/nvexec/stream/bulk.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -401,28 +401,35 @@ namespace nv::execution::_strm
// Stream-domain customization of STDEXEC::bulk_t.
//
// operator() unpacks the bulk data into (policy, shape, fun), looks up the predecessor's
// set_value completion scheduler, and then:
//   * returns a bulk_sender when that scheduler's type is exactly stream_scheduler,
//   * returns a multi_gpu_bulk_sender (seeded with sched.num_devices_) otherwise,
//   * returns the _no_stream_scheduler_in_env diagnostic when the predecessor does not
//     satisfy stream_completing_sender<Sender, Env>.
//
// NOTE(review): two template heads (constrained and unconstrained) and two copies of the
// scheduler dispatch are visible below; this looks like pre- and post-change lines shown
// together -- confirm which set is live before compiling.
// NOTE(review): `policy` is destructured but not referenced in the lines visible here.
template <>
struct transform_sender_for<STDEXEC::bulk_t>
{
template <class Env, class Data, stream_completing_sender<Env> Sender>
template <class Env, class Data, class Sender>
auto operator()(Env const & env, __ignore, Data data, Sender&& sndr) const
{
auto [policy, shape, fun] = static_cast<Data&&>(data);
using shape_t = decltype(shape);
using fun_t = decltype(fun);
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr), env);
if constexpr (__std::same_as<decltype(sched), stream_scheduler>)
if constexpr (stream_completing_sender<Sender, Env>)
{
// Use the bulk sender for a single GPU
using _sender_t = bulk_sender<__decay_t<Sender>, shape_t, fun_t>;
return _sender_t{{}, static_cast<Sender&&>(sndr), shape, static_cast<fun_t&&>(fun)};
auto [policy, shape, fun] = static_cast<Data&&>(data);
using shape_t = decltype(shape);
using fun_t = decltype(fun);
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr), env);
if constexpr (__std::same_as<decltype(sched), stream_scheduler>)
{
// Use the bulk sender for a single GPU
using _sender_t = bulk_sender<__decay_t<Sender>, shape_t, fun_t>;
return _sender_t{{}, static_cast<Sender&&>(sndr), shape, static_cast<fun_t&&>(fun)};
}
else
{
// Use the bulk sender for multiple GPUs
using _sender_t = multi_gpu_bulk_sender<__decay_t<Sender>, shape_t, fun_t>;
return _sender_t{{},
sched.num_devices_,
static_cast<Sender&&>(sndr),
shape,
static_cast<fun_t&&>(fun)};
}
}
else
{
// Use the bulk sender for multiple GPUs
using _sender_t = multi_gpu_bulk_sender<__decay_t<Sender>, shape_t, fun_t>;
return _sender_t{{},
sched.num_devices_,
static_cast<Sender&&>(sndr),
shape,
static_cast<fun_t&&>(fun)};
// No stream scheduler reachable from the predecessor: produce a diagnostic instead of a sender.
return _strm::_no_stream_scheduler_in_env<STDEXEC::bulk_t, Sender, Env>();
}
}
};
Expand Down
88 changes: 57 additions & 31 deletions include/nvexec/stream/common.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ namespace nv::execution
};

#if defined(__clang__) && defined(__CUDA__) && !defined(STDEXEC_CLANG_TIDY_INVOKED)
// Host-only overload, selected by the enclosing `#if` for clang CUDA builds:
// always reports device_type::host.
// NOTE(review): two spellings of the signature are visible (they differ only in the order of
// `inline` and `__host__`); only one can precede the body -- confirm which is live.
__host__ inline auto get_device_type() noexcept -> device_type
inline __host__ auto get_device_type() noexcept -> device_type
{
return device_type::host;
}
Expand All @@ -64,7 +64,7 @@ namespace nv::execution
return device_type::device;
}
#else
// Combined host/device definition used on the `#else` path: NV_IF_TARGET returns
// device_type::host when NV_IS_HOST holds and device_type::device otherwise.
// NOTE(review): two spellings of the signature are visible (they differ only in specifier
// order); only one can precede the body -- confirm which is live.
__host__ __device__ inline auto get_device_type() noexcept -> device_type
inline __host__ __device__ auto get_device_type() noexcept -> device_type
{
NV_IF_TARGET(NV_IS_HOST, (return device_type::host;), (return device_type::device;));
}
Expand All @@ -75,6 +75,12 @@ namespace nv::execution
return get_device_type() == device_type::device;
}

struct stream_context;
struct stream_domain;

struct CANNOT_DISPATCH_THIS_ALGORITHM_TO_THE_CUDA_STREAM_SCHEDULER;
struct BECAUSE_THERE_IS_NO_CUDA_STREAM_SCHEDULER_IN_THE_ENVIRONMENT;

namespace _strm
{
// Used by stream_domain to late-customize senders for execution
Expand All @@ -84,30 +90,64 @@ namespace nv::execution

template <class Tag>
struct apply_sender_for;

struct context;

// Satisfied when `Scheduler`:
//   * models `scheduler`,
//   * has a set_value completion domain (queried with `Env`) that derives from stream_domain,
//   * exposes a `ctx_` member whose type decays to `context`.
template <class Scheduler, class Env>
concept gpu_stream_scheduler =
scheduler<Scheduler>
&& __std::derived_from<__result_of<get_completion_domain<set_value_t>, Scheduler, Env>,
stream_domain>
&& requires(Scheduler sched) {
{ sched.ctx_ } -> __decays_to<context>;
};

// Satisfied when `Sender` models `sender` and the set_value completion scheduler obtained from
// the sender's environment (together with `Env`) is a gpu_stream_scheduler for that `Env`.
// Note the parameter order: the sender comes first, the environment second.
template <class Sender, class Env>
concept stream_completing_sender =
sender<Sender>
&& gpu_stream_scheduler<
__result_of<get_completion_scheduler<set_value_t>, env_of_t<Sender>, Env>,
Env>;

// True when STDEXEC::__structured_apply_t is callable with the stream transform registered for
// the sender's tag (transform_sender_for<tag_of_t<Sender>>), the sender itself, and `Env const &`.
template <class Sender, class Env>
concept has_stream_transform =
STDEXEC::__callable<STDEXEC::__structured_apply_t,
transform_sender_for<STDEXEC::tag_of_t<Sender>>,
Sender,
Env const &>;

// Non-throwing counterpart of has_stream_transform: the same call, checked with
// STDEXEC::__nothrow_callable instead of STDEXEC::__callable.
template <class Sender, class Env>
concept has_nothrow_stream_transform =
STDEXEC::__nothrow_callable<STDEXEC::__structured_apply_t,
transform_sender_for<STDEXEC::tag_of_t<Sender>>,
Sender,
Env const &>;

template <class Tag, class Sender, class Env>
auto _no_stream_scheduler_in_env() noexcept
{
using namespace STDEXEC;
return __not_a_sender<_WHAT_(CANNOT_DISPATCH_THIS_ALGORITHM_TO_THE_CUDA_STREAM_SCHEDULER),
_WHY_(BECAUSE_THERE_IS_NO_CUDA_STREAM_SCHEDULER_IN_THE_ENVIRONMENT),
_WHERE_(_IN_ALGORITHM_, Tag),
_WITH_PRETTY_SENDER_<Sender>,
_WITH_ENVIRONMENT_(Env)>{};
}
} // namespace _strm
} // namespace nv::execution

namespace nvexec = nv::execution;

namespace nv::execution
{
struct stream_context;

// The stream_domain is how the stream scheduler customizes the sender algorithms. All of the
// algorithms use the current scheduler's domain to transform senders before starting them.
struct stream_domain : STDEXEC::default_domain
{
template <::exec::sender_for Sender, class Tag = STDEXEC::tag_of_t<Sender>, class Env>
requires STDEXEC::__callable<STDEXEC::__structured_apply_t,
_strm::transform_sender_for<Tag>,
Sender,
Env const &>
requires _strm::has_stream_transform<Sender, Env>
static auto transform_sender(STDEXEC::set_value_t, Sender&& sndr, Env const & env)
noexcept(STDEXEC::__nothrow_callable<STDEXEC::__structured_apply_t,
_strm::transform_sender_for<Tag>,
Sender,
Env const &>)

noexcept(_strm::has_nothrow_stream_transform<Sender, Env>)
{
return STDEXEC::__structured_apply(_strm::transform_sender_for<Tag>{},
static_cast<Sender&&>(sndr),
Expand Down Expand Up @@ -278,15 +318,6 @@ namespace nv::execution
template <class Sender, class Shape, class Fn>
struct multi_gpu_bulk_sender;

// Satisfied when `Scheduler` models `scheduler`, its set_value completion domain (queried with
// `Env`) derives from stream_domain, and it exposes a `ctx_` member decaying to `context`.
// NOTE(review): an identical definition of this concept also appears earlier in this text, next
// to the `struct context;` forward declaration -- confirm only one of the two is live.
template <class Scheduler, class Env>
concept gpu_stream_scheduler =
scheduler<Scheduler>
&& __std::derived_from<__result_of<get_completion_domain<set_value_t>, Scheduler, Env>,
stream_domain>
&& requires(Scheduler sched) {
{ sched.ctx_ } -> __decays_to<context>;
};

struct stream_sender_base
{
using sender_concept = STDEXEC::sender_t;
Expand Down Expand Up @@ -907,13 +938,6 @@ namespace nv::execution
ctx);
}

// Satisfied when `Sender` models `sender` and the set_value completion scheduler obtained from
// the sender's environment (together with `Env`) is a gpu_stream_scheduler for that `Env`.
// NOTE(review): an identical definition of this concept also appears earlier in this text, right
// after gpu_stream_scheduler -- confirm only one of the two is live.
template <class Sender, class Env>
concept stream_completing_sender =
sender<Sender>
&& gpu_stream_scheduler<
__result_of<get_completion_scheduler<set_value_t>, env_of_t<Sender>, Env>,
Env>;

// The type produced by invoking an InnerReceiverProvider with an lvalue
// opstate_base<OuterReceiver>.
template <class InnerReceiverProvider, class OuterReceiver>
using inner_receiver_t = __call_result_t<InnerReceiverProvider, opstate_base<OuterReceiver>&>;

Expand Down Expand Up @@ -957,8 +981,10 @@ namespace nv::execution
inline constexpr _strm::get_stream_t get_stream{};

#if CUDART_VERSION >= 13'00'0
__host__ inline cudaError_t
cudaMemPrefetchAsync(const void* dev_ptr, size_t count, int dst_device, cudaStream_t stream = 0)
inline __host__ cudaError_t cudaMemPrefetchAsync(void const * dev_ptr,
size_t count,
int dst_device,
cudaStream_t stream = 0)
{
return ::cudaMemPrefetchAsync(dev_ptr,
count,
Expand Down
15 changes: 11 additions & 4 deletions include/nvexec/stream/ensure_started.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,18 @@ namespace nv::execution::_strm
template <class Sender>
using _sender_t = ensure_started_sender<__decay_t<Sender>>;

// Stream transform for ensure_started.
// When the predecessor satisfies stream_completing_sender<Sender, Env>, fetches its set_value
// completion scheduler and builds _sender_t<Sender> from that scheduler's ctx_ and the forwarded
// sender; otherwise returns the _no_stream_scheduler_in_env diagnostic for
// exec::ensure_started_t.
// NOTE(review): a constrained signature with a trailing `-> _sender_t<Sender>` and an
// unconstrained one are both visible, as are two copies of the construction code; this looks
// like pre- and post-change lines shown together -- confirm which set is live.
template <class Env, stream_completing_sender<Env> Sender>
auto operator()(Env const & env, __ignore, __ignore, Sender&& sndr) const -> _sender_t<Sender>
template <class Env, class Sender>
auto operator()(Env const & env, __ignore, __ignore, Sender&& sndr) const
{
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr), env);
return _sender_t<Sender>{sched.ctx_, static_cast<Sender&&>(sndr)};
if constexpr (stream_completing_sender<Sender, Env>)
{
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr), env);
return _sender_t<Sender>{sched.ctx_, static_cast<Sender&&>(sndr)};
}
else
{
return _strm::_no_stream_scheduler_in_env<exec::ensure_started_t, Sender, Env>();
}
}
};
} // namespace nv::execution::_strm
Expand Down
14 changes: 11 additions & 3 deletions include/nvexec/stream/let_xxx.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ namespace nv::execution::_strm
static_cast<CvSender&&>(sndr),
static_cast<Receiver&&>(rcvr),
[this](__ignore) noexcept { return _receiver_t{{}, this}; },
get_completion_scheduler<set_value_t>(get_env(sndr), get_env(rcvr)).ctx_)
get_scheduler(env2).ctx_)
, fun_(static_cast<Fun&&>(fun))
, env2_(env2)
{}
Expand Down Expand Up @@ -308,10 +308,18 @@ namespace nv::execution::_strm
// Stream transform shared by the let_* algorithms, parameterized on the completion tag SetTag.
// When the predecessor satisfies stream_completing_sender<Sender, Env>, wraps the forwarded
// sender and function in a let_sender tagged with SetTag{}; otherwise returns the
// _no_stream_scheduler_in_env diagnostic, using the type of
// STDEXEC::__let::__let_from_set<SetTag> as the algorithm tag.
// NOTE(review): a constrained and an unconstrained template head are both visible, as is an
// unconditional `return let_sender{...}` ahead of the `if constexpr`; this looks like pre- and
// post-change lines shown together -- confirm which set is live.
template <class SetTag>
struct _transform_let_sender
{
template <class Env, class Fun, stream_completing_sender<Env> Sender>
template <class Env, class Fun, class Sender>
auto operator()(Env const &, __ignore, Fun fn, Sender&& sndr) const
{
return let_sender{static_cast<Sender&&>(sndr), static_cast<Fun&&>(fn), SetTag{}};
if constexpr (stream_completing_sender<Sender, Env>)
{
return let_sender{static_cast<Sender&&>(sndr), static_cast<Fun&&>(fn), SetTag{}};
}
else
{
using _let_t = decltype(STDEXEC::__let::__let_from_set<SetTag>);
return _strm::_no_stream_scheduler_in_env<_let_t, Sender, Env>();
}
}
};

Expand Down
17 changes: 12 additions & 5 deletions include/nvexec/stream/repeat_n.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,16 @@ namespace nv::execution::_strm
, sched_(std::move(sched))
, count_(count)
{
_connect();
if (count_ != 0)
{
_connect();
}
}

void _connect()
auto& _connect()
{
inner_opstate_.__emplace_from(STDEXEC::connect,
exec::sequence(STDEXEC::schedule(sched_), sndr_),
//STDEXEC::on(sched_, sndr_),
receiver{*this});
}

Expand All @@ -114,8 +116,7 @@ namespace nv::execution::_strm
}
else
{
_connect();
STDEXEC::start(*inner_opstate_);
STDEXEC::start(_connect());
}
}
else
Expand Down Expand Up @@ -167,6 +168,11 @@ namespace nv::execution::_strm
STDEXEC::set_error_t(cudaError_t)>();
}

// Forwards `sndr` into sndr_ and stores `count` in count_.
// NOTE(review): `count` is presumably the number of repetitions -- confirm against the
// operation state that consumes it.
explicit sender(CvSender&& sndr, std::size_t count)
: sndr_(static_cast<CvSender&&>(sndr))
, count_(count)
{}

template <STDEXEC::receiver Receiver>
auto connect(Receiver rcvr) && -> repeat_n::opstate<CvSender, Receiver>
{
Expand All @@ -186,6 +192,7 @@ namespace nv::execution::_strm
return STDEXEC::get_env(sndr_);
}

private:
CvSender sndr_; // could be a value or a reference
std::size_t count_;
};
Expand Down
14 changes: 1 addition & 13 deletions include/nvexec/stream/schedule_from.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@

namespace nv::execution
{
struct CANNOT_DISPATCH_THE_SCHEDULE_FROM_ALGORITHM_TO_THE_CUDA_STREAM_SCHEDULER;
struct BECAUSE_THERE_IS_NO_CUDA_STREAM_SCHEDULER_IN_THE_ENVIRONMENT;
struct ADD_A_CONTINUES_ON_TRANSITION_TO_THE_CUDA_STREAM_SCHEDULER_BEFORE_THE_SCHEDULE_FROM_ALGORITHM;

namespace _strm
{
namespace _schfr
Expand Down Expand Up @@ -188,15 +184,7 @@ namespace nv::execution
}
else
{
return STDEXEC::__not_a_sender<
STDEXEC::_WHAT_(
CANNOT_DISPATCH_THE_SCHEDULE_FROM_ALGORITHM_TO_THE_CUDA_STREAM_SCHEDULER),
STDEXEC::_WHY_(BECAUSE_THERE_IS_NO_CUDA_STREAM_SCHEDULER_IN_THE_ENVIRONMENT),
STDEXEC::_WHERE_(STDEXEC::_IN_ALGORITHM_, STDEXEC::schedule_from_t),
// STDEXEC::_TO_FIX_THIS_ERROR_(
// ADD_A_CONTINUES_ON_TRANSITION_TO_THE_CUDA_STREAM_SCHEDULER_BEFORE_THE_SCHEDULE_FROM_ALGORITHM),
STDEXEC::_WITH_PRETTY_SENDER_<Sender>,
STDEXEC::_WITH_ENVIRONMENT_(Env)>{};
return _strm::_no_stream_scheduler_in_env<STDEXEC::schedule_from_t, Sender, Env>();
}
}
};
Expand Down
15 changes: 11 additions & 4 deletions include/nvexec/stream/split.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,18 @@ namespace nv::execution::_strm
template <class Sender>
using _sender_t = split_sender<__decay_t<Sender>>;

template <class Env, stream_completing_sender<Env> Sender>
auto operator()(Env const & env, __ignore, __ignore, Sender&& sndr) const -> _sender_t<Sender>
template <class Env, class Sender>
auto operator()(Env const & env, __ignore, __ignore, Sender&& sndr) const
{
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr), env);
return _sender_t<Sender>{sched.ctx_, static_cast<Sender&&>(sndr)};
if constexpr (stream_completing_sender<Sender, Env>)
{
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr), env);
return _sender_t<Sender>{sched.ctx_, static_cast<Sender&&>(sndr)};
}
else
{
return _strm::_no_stream_scheduler_in_env<exec::split_t, _sender_t<Sender>, Env>();
}
}
};
} // namespace nv::execution::_strm
Expand Down
13 changes: 10 additions & 3 deletions include/nvexec/stream/then.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,18 @@ namespace nv::execution::_strm
// Stream-domain customization of STDEXEC::then_t.
// When the predecessor satisfies stream_completing_sender<CvSender, Env>, wraps the forwarded
// sender and the function in then_sender<__decay_t<CvSender>, Fn>; otherwise returns the
// _no_stream_scheduler_in_env diagnostic for STDEXEC::then_t.
// NOTE(review): a constrained and an unconstrained template head are both visible, as are two
// copies of the then_sender construction; this looks like pre- and post-change lines shown
// together -- confirm which set is live.
template <>
struct transform_sender_for<STDEXEC::then_t>
{
template <class Env, class Fn, stream_completing_sender<Env> CvSender>
template <class Env, class Fn, class CvSender>
auto operator()(Env const &, __ignore, Fn fun, CvSender&& sndr) const
{
using _sender_t = then_sender<__decay_t<CvSender>, Fn>;
return _sender_t{static_cast<CvSender&&>(sndr), static_cast<Fn&&>(fun)};
if constexpr (stream_completing_sender<CvSender, Env>)
{
using _sender_t = then_sender<__decay_t<CvSender>, Fn>;
return _sender_t{static_cast<CvSender&&>(sndr), static_cast<Fn&&>(fun)};
}
else
{
return _strm::_no_stream_scheduler_in_env<STDEXEC::then_t, CvSender, Env>();
}
}
};
} // namespace nv::execution::_strm
Expand Down
13 changes: 10 additions & 3 deletions include/nvexec/stream/upon_error.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,18 @@ namespace nv::execution::_strm
// Stream-domain customization of STDEXEC::upon_error_t.
// When the predecessor satisfies stream_completing_sender<Sender, Env>, wraps the forwarded
// sender and the function in upon_error_sender<__decay_t<Sender>, Fun>; otherwise returns the
// _no_stream_scheduler_in_env diagnostic for STDEXEC::upon_error_t.
// NOTE(review): a constrained and an unconstrained template head are both visible, as are two
// copies of the upon_error_sender construction; this looks like pre- and post-change lines
// shown together -- confirm which set is live.
template <>
struct transform_sender_for<STDEXEC::upon_error_t>
{
template <class Env, class Fun, stream_completing_sender<Env> Sender>
template <class Env, class Fun, class Sender>
auto operator()(Env const &, __ignore, Fun fun, Sender&& sndr) const
{
using _sender_t = upon_error_sender<__decay_t<Sender>, Fun>;
return _sender_t{static_cast<Sender&&>(sndr), static_cast<Fun&&>(fun)};
if constexpr (stream_completing_sender<Sender, Env>)
{
using _sender_t = upon_error_sender<__decay_t<Sender>, Fun>;
return _sender_t{static_cast<Sender&&>(sndr), static_cast<Fun&&>(fun)};
}
else
{
return _strm::_no_stream_scheduler_in_env<STDEXEC::upon_error_t, Sender, Env>();
}
}
};
} // namespace nv::execution::_strm
Expand Down
Loading
Loading