diff --git a/builtin-functions/kphp-light/stdlib/rpc.txt b/builtin-functions/kphp-light/stdlib/rpc.txt index c88b464163..c7ccf4a7a0 100644 --- a/builtin-functions/kphp-light/stdlib/rpc.txt +++ b/builtin-functions/kphp-light/stdlib/rpc.txt @@ -40,7 +40,6 @@ final class KphpRpcRequestsExtraInfo { public function get (); } -/** @kphp-extern-func-info interruptible */ function rpc_send_requests($actor ::: string, $arr ::: array, $timeout ::: ?float, @@ -48,7 +47,7 @@ function rpc_send_requests($actor ::: string, \KphpRpcRequestsExtraInfo $requests_extra_info, $need_responses_extra_info ::: bool) ::: int[]; -/** @kphp-extern-func-info tl_common_h_dep interruptible */ +/** @kphp-extern-func-info tl_common_h_dep */ function rpc_send_typed_query_requests($actor ::: string, @tl\RpcFunction[] $query_functions, $timeout ::: ?float, $ignore_answer ::: bool, diff --git a/runtime-light/k2-platform/k2-api.h b/runtime-light/k2-platform/k2-api.h index 617dad5e3f..2f6233646d 100644 --- a/runtime-light/k2-platform/k2-api.h +++ b/runtime-light/k2-platform/k2-api.h @@ -210,6 +210,30 @@ inline int32_t component_access(std::string_view component_name) noexcept { return k2_component_access(component_name.size(), component_name.data()); } +inline std::expected rpc_send_request(std::string_view actor_name, std::span request_buffer) noexcept { + uint64_t rpc_d{}; + if (auto error_code{k2_rpc_send(actor_name.data(), actor_name.size(), request_buffer.data(), request_buffer.size(), RpcKind::TL_RPC, std::addressof(rpc_d))}; + error_code != k2::errno_ok) { + return std::unexpected{error_code}; + } + return {rpc_d}; +} + +inline std::expected rpc_get_response_size(uint64_t rpc_d) noexcept { + size_t size{}; + if (auto error_code{k2_rpc_get_response_size(rpc_d, std::addressof(size))}; error_code != k2::errno_ok) { + return std::unexpected{error_code}; + } + return {size}; +} + +inline std::expected rpc_fetch_response(uint64_t rpc_d, std::span buffer) noexcept { + if (auto error_code{k2_rpc_fetch_response(rpc_d, buffer.data(), buffer.size())}; error_code != errno_ok) { + return std::unexpected{error_code}; + } + return {}; +} + inline void stream_status(k2::descriptor descriptor, StreamStatus* status) noexcept { k2_stream_status(descriptor, status); } diff --git a/runtime-light/k2-platform/k2-header.h b/runtime-light/k2-platform/k2-header.h index 25b8789f7f..b43b38bbc8 100644 --- a/runtime-light/k2-platform/k2-header.h +++ b/runtime-light/k2-platform/k2-header.h @@ -97,6 +97,10 @@ enum UpdateStatus { NewDescriptor = 2, }; +enum RpcKind { + TL_RPC = 0, +}; + struct ImageInfo { // Base const char* image_name; @@ -316,6 +320,42 @@ int32_t k2_unlink(const char* path, size_t path_len); */ int32_t k2_component_access(size_t name_len, const char* name); +/** + * Try to send rpc request. In case of success write descriptor of rpc request to `rpc_d`, otherwise return `errno` != 0, + * which should be later used to call `k2_rpc_fetch_response`. + * + * @return return `0` on success. libc-like `errno` otherwise + * + * Possible `errno` values: + * `EAI_MEMORY` => max descriptors count achieved + * `EINVAL` => invalid `actor_name` or request, or connection pool is empty for this actor. + */ +int32_t k2_rpc_send(const char* actor_name, size_t actor_name_len, const void* request_ptr, size_t request_size, enum RpcKind rpc_kind, uint64_t* rpc_d); + +/** + * Get response size for corresponding request of this `rpc_d`. Write 0 to `response_size` and return `EAGAIN` if response is not ready. + * Write response size value to `response_size` if response is ready. + * + * @return return `0` on success. libc-like `errno` otherwise + * + * Possible `errno` values: + * `EINVAL` => invalid `rpc_d` descriptor, for example, it is unknown descriptor, or not rpc descriptor. + * `EAGAIN` => response is not ready yet. + */ +int32_t k2_rpc_get_response_size(uint64_t rpc_d, size_t* response_size); + +/** + * Write response for corresponding request of this `rpc_d` to `buf`. Return `EAGAIN` if response is not ready. + * If `buf_size` < response size, then write first `buf_size` bytes of response to `buf`. + * + * @return return `0` on success. libc-like `errno` otherwise + * + * Possible `errno` values: + * `EINVAL` => invalid `rpc_d` descriptor, for example, it is unknown descriptor, or not rpc descriptor. + * `EAGAIN` => response is not ready yet. + */ +int32_t k2_rpc_fetch_response(uint64_t rpc_d, void* buf, size_t buf_size); + /** * If the write or read status is `Blocked` - then the platform ensures that * the component receives this `stream_d` via `k2_take_update` when the status is diff --git a/runtime-light/stdlib/rpc/rpc-api.cpp b/runtime-light/stdlib/rpc/rpc-api.cpp index d881fa3c11..d9fc6430d3 100644 --- a/runtime-light/stdlib/rpc/rpc-api.cpp +++ b/runtime-light/stdlib/rpc/rpc-api.cpp @@ -34,8 +34,10 @@ #include "runtime-light/stdlib/rpc/rpc-constants.h" #include "runtime-light/stdlib/rpc/rpc-extra-headers.h" #include "runtime-light/stdlib/rpc/rpc-extra-info.h" +#include "runtime-light/stdlib/rpc/rpc-query-handle.h" #include "runtime-light/stdlib/rpc/rpc-tl-error.h" #include "runtime-light/stdlib/rpc/rpc-tl-query.h" +#include "runtime-light/stdlib/time/util.h" #include "runtime-light/streams/read-ext.h" #include "runtime-light/streams/stream.h" #include "runtime-light/tl/tl-core.h" @@ -111,42 +113,42 @@ class_instance store_function(const mixed& tl_object) noexcept { return rpc_tl_query; } -kphp::coro::task rpc_tl_query_one_impl(std::string_view actor, mixed tl_object, std::optional opt_timeout, - bool collect_resp_extra_info, bool ignore_answer) noexcept { +kphp::rpc::query_info rpc_tl_query_one_impl(std::string_view actor, mixed tl_object, std::optional opt_timeout, bool collect_resp_extra_info, + bool ignore_answer) noexcept { if (!tl_object.is_array()) [[unlikely]] { kphp::log::warning("not an array passed to function rpc_tl_query"); - co_return kphp::rpc::query_info{}; + return kphp::rpc::query_info{}; } f$rpc_clean(); auto rpc_tl_query{store_function(tl_object)}; // THROWING // handle exceptions that could arise during store_function if (!TlRpcError::transform_exception_into_error_if_possible().empty() || rpc_tl_query.is_null()) [[unlikely]] { - co_return kphp::rpc::query_info{}; + return kphp::rpc::query_info{}; } - const auto query_info{co_await kphp::rpc::send_request(actor, opt_timeout, ignore_answer, collect_resp_extra_info)}; + const auto query_info{kphp::rpc::send_request(actor, opt_timeout, ignore_answer, collect_resp_extra_info)}; if (!ignore_answer) { RpcClientInstanceState::get().response_fetcher_instances.emplace(query_info.id, std::move(rpc_tl_query)); } - co_return query_info; + return query_info; } -kphp::coro::task typed_rpc_tl_query_one_impl(std::string_view actor, const RpcRequest& rpc_request, std::optional opt_timeout, - bool collect_responses_extra_info, bool ignore_answer) noexcept { +kphp::rpc::query_info typed_rpc_tl_query_one_impl(std::string_view actor, const RpcRequest& rpc_request, std::optional opt_timeout, + bool collect_responses_extra_info, bool ignore_answer) noexcept { if (rpc_request.empty()) [[unlikely]] { kphp::log::warning("query function is null"); - co_return kphp::rpc::query_info{}; + return kphp::rpc::query_info{}; } f$rpc_clean(); auto fetcher{rpc_request.store_request()}; // THROWING // handle exceptions that could arise during store_request if (!TlRpcError::transform_exception_into_error_if_possible().empty() || !static_cast(fetcher)) [[unlikely]] { - co_return kphp::rpc::query_info{}; + return kphp::rpc::query_info{}; } - const auto query_info{co_await kphp::rpc::send_request(actor, opt_timeout, ignore_answer, collect_responses_extra_info)}; + const auto query_info{kphp::rpc::send_request(actor, opt_timeout, ignore_answer, collect_responses_extra_info)}; if (!ignore_answer) { auto rpc_tl_query{make_instance()}; rpc_tl_query.get()->result_fetcher = std::move(fetcher); @@ -154,7 +156,7 @@ kphp::coro::task typed_rpc_tl_query_one_impl(std::string_ RpcClientInstanceState::get().response_fetcher_instances.emplace(query_info.id, std::move(rpc_tl_query)); } - co_return query_info; + return query_info; } kphp::coro::task> rpc_tl_query_result_one_impl(int64_t query_id) noexcept { @@ -164,26 +166,26 @@ kphp::coro::task> rpc_tl_query_result_one_impl(int64_t query_id) no auto& rpc_client_instance_st{RpcClientInstanceState::get()}; class_instance rpc_query{}; - std::optional>> opt_awaiter_task{}; + std::optional opt_rpc_request_handle{}; { const auto it_response_fetcher{rpc_client_instance_st.response_fetcher_instances.find(query_id)}; - const auto it_fork_task{rpc_client_instance_st.response_awaiter_tasks.find(query_id)}; - const vk::final_action finalizer{[&rpc_client_instance_st, it_response_fetcher, it_fork_task] noexcept { + const auto it_rpc_request_handle{rpc_client_instance_st.rpc_query_handles.find(query_id)}; + const vk::final_action finalizer{[&rpc_client_instance_st, it_response_fetcher, it_rpc_request_handle] noexcept { if (it_response_fetcher != rpc_client_instance_st.response_fetcher_instances.end()) [[likely]] { rpc_client_instance_st.response_fetcher_instances.erase(it_response_fetcher); } - if (it_fork_task != rpc_client_instance_st.response_awaiter_tasks.end()) [[likely]] { - rpc_client_instance_st.response_awaiter_tasks.erase(it_fork_task); + if (it_rpc_request_handle != rpc_client_instance_st.rpc_query_handles.end()) [[likely]] { + rpc_client_instance_st.rpc_query_handles.erase(it_rpc_request_handle); } }}; - if (it_response_fetcher == rpc_client_instance_st.response_fetcher_instances.end() || it_fork_task == rpc_client_instance_st.response_awaiter_tasks.end()) - [[unlikely]] { + if (it_response_fetcher == rpc_client_instance_st.response_fetcher_instances.end() || + it_rpc_request_handle == rpc_client_instance_st.rpc_query_handles.end()) [[unlikely]] { co_return TlRpcError::make_error(TL_ERROR_INTERNAL, string{"unexpectedly could not find query in pending queries"}); } rpc_query = std::move(it_response_fetcher->second); - opt_awaiter_task.emplace(std::move(it_fork_task->second)); + opt_rpc_request_handle.emplace(std::move(it_rpc_request_handle->second)); } if (rpc_query.is_null()) [[unlikely]] { @@ -196,13 +198,14 @@ kphp::coro::task> rpc_tl_query_result_one_impl(int64_t query_id) no co_return TlRpcError::make_error(TL_ERROR_INTERNAL, string{"can't get untyped result from typed TL query. Use consistent API for that"}); } - kphp::log::assertion(opt_awaiter_task.has_value()); - auto opt_response{co_await kphp::forks::id_managed(*std::exchange(opt_awaiter_task, std::nullopt))}; - if (!opt_response) [[unlikely]] { - co_return TlRpcError::make_error(TL_ERROR_QUERY_TIMEOUT, string{"rpc response timeout"}); + kphp::log::assertion(opt_rpc_request_handle.has_value()); + auto response_expected{co_await opt_rpc_request_handle->get_response()}; + if (!response_expected) [[unlikely]] { + std::pair error{std::move(response_expected.error())}; + co_return TlRpcError::make_error(error.first, std::move(error.second)); } - auto response{*std::move(opt_response)}; // don't check response's emptyness; will throw if it's empty, indicating a fetch error + auto response{*std::move(response_expected)}; // don't check response's emptiness; will throw if it's empty, indicating a fetch error f$rpc_clean(); RpcServerInstanceState::get().tl_fetcher = tl::fetcher{{reinterpret_cast(response.c_str()), response.size()}}; @@ -221,26 +224,26 @@ kphp::coro::task> typed_rpc_tl_query_result_ auto& rpc_client_instance_st{RpcClientInstanceState::get()}; class_instance rpc_query{}; - std::optional>> opt_awaiter_task{}; + std::optional opt_rpc_request_handle{}; { const auto it_response_fetcher{rpc_client_instance_st.response_fetcher_instances.find(query_id)}; - const auto it_fork_task{rpc_client_instance_st.response_awaiter_tasks.find(query_id)}; - const vk::final_action finalizer{[&rpc_client_instance_st, it_response_fetcher, it_fork_task] noexcept { + const auto it_rpc_request_handle{rpc_client_instance_st.rpc_query_handles.find(query_id)}; + const vk::final_action finalizer{[&rpc_client_instance_st, it_response_fetcher, it_rpc_request_handle] noexcept { if (it_response_fetcher != rpc_client_instance_st.response_fetcher_instances.end()) [[likely]] { rpc_client_instance_st.response_fetcher_instances.erase(it_response_fetcher); } - if (it_fork_task != rpc_client_instance_st.response_awaiter_tasks.end()) [[likely]] { - rpc_client_instance_st.response_awaiter_tasks.erase(it_fork_task); + if (it_rpc_request_handle != rpc_client_instance_st.rpc_query_handles.end()) [[likely]] { + rpc_client_instance_st.rpc_query_handles.erase(it_rpc_request_handle); } }}; - if (it_response_fetcher == rpc_client_instance_st.response_fetcher_instances.end() || it_fork_task == rpc_client_instance_st.response_awaiter_tasks.end()) - [[unlikely]] { + if (it_response_fetcher == rpc_client_instance_st.response_fetcher_instances.end() || + it_rpc_request_handle == rpc_client_instance_st.rpc_query_handles.end()) [[unlikely]] { co_return error_factory.make_error(TL_ERROR_INTERNAL, string{"unexpectedly could not find query in pending queries"}); } rpc_query = std::move(it_response_fetcher->second); - opt_awaiter_task.emplace(std::move(it_fork_task->second)); + opt_rpc_request_handle.emplace(std::move(it_rpc_request_handle->second)); } if (rpc_query.is_null()) [[unlikely]] { @@ -253,13 +256,14 @@ kphp::coro::task> typed_rpc_tl_query_result_ co_return error_factory.make_error(TL_ERROR_INTERNAL, string{"can't get typed result from untyped TL query. Use consistent API for that"}); } - kphp::log::assertion(opt_awaiter_task.has_value()); - auto opt_response{co_await kphp::forks::id_managed(*std::exchange(opt_awaiter_task, std::nullopt))}; - if (!opt_response) [[unlikely]] { - co_return error_factory.make_error(TL_ERROR_QUERY_TIMEOUT, string{"rpc response timeout"}); + kphp::log::assertion(opt_rpc_request_handle.has_value()); + auto response_expected{co_await opt_rpc_request_handle->get_response()}; + if (!response_expected) [[unlikely]] { + std::pair error{std::move(response_expected.error())}; + co_return error_factory.make_error(error.first, std::move(error.second)); } - auto response{*std::move(opt_response)}; // don't check response's emptyness; will throw if it's empty, indicating a fetch error + auto response{*std::move(response_expected)}; // don't check response's emptiness; will throw if it's empty, indicating a fetch error f$rpc_clean(); RpcServerInstanceState::get().tl_fetcher = tl::fetcher{{reinterpret_cast(response.c_str()), response.size()}}; @@ -273,82 +277,23 @@ kphp::coro::task> typed_rpc_tl_query_result_ } // namespace detail -kphp::coro::task send_request(std::string_view actor, std::optional opt_timeout, bool ignore_answer, - bool collect_responses_extra_info) noexcept { +kphp::rpc::query_info send_request(std::string_view actor, std::optional opt_timeout, bool ignore_answer, bool collect_responses_extra_info) noexcept { auto& rpc_client_instance_st{RpcClientInstanceState::get()}; auto& rpc_server_instance_st{RpcServerInstanceState::get()}; const size_t request_size{rpc_server_instance_st.tl_storer.view().size_bytes()}; const auto timestamp{std::chrono::duration{std::chrono::system_clock::now().time_since_epoch()}.count()}; - auto expected_stream{kphp::component::stream::open(actor, k2::stream_kind::component)}; - if (!expected_stream) [[unlikely]] { - co_return kphp::rpc::query_info{ - .id = kphp::rpc::INVALID_QUERY_ID, .request_size = rpc_server_instance_st.tl_storer.view().size_bytes(), .timestamp = timestamp}; - } - - auto stream{*std::move(expected_stream)}; - { - auto tl_storer{std::exchange(rpc_server_instance_st.tl_storer, tl::storer{0})}; - const vk::final_action finalizer{[&tl_storer, &rpc_server_instance_st] noexcept { - if (tl_storer.capacity() > rpc_server_instance_st.tl_storer.capacity()) { - std::swap(tl_storer, rpc_server_instance_st.tl_storer); - } - }}; - - // prepare and send RPC request - // 'request_buf' will look like this: - // [ RpcExtraHeaders (optional) ] [ payload ] - if (const auto& [opt_new_extra_header, cur_extra_header_size]{kphp::rpc::regularize_extra_headers(tl_storer.view(), ignore_answer)}; opt_new_extra_header) { - std::span request_body{tl_storer.view().subspan(cur_extra_header_size)}; - std::span new_header{reinterpret_cast(std::addressof(*opt_new_extra_header)), - sizeof(std::remove_cvref_t)}; - - if (!co_await stream.write_all(new_header) || !co_await kphp::component::send_request(stream, request_body)) [[unlikely]] { - co_return kphp::rpc::query_info{.id = kphp::rpc::INVALID_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; - } - } else if (!co_await kphp::component::send_request(stream, tl_storer.view())) [[unlikely]] { - co_return kphp::rpc::query_info{.id = kphp::rpc::INVALID_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; + auto tl_storer{std::exchange(rpc_server_instance_st.tl_storer, tl::storer{0})}; + const vk::final_action finalizer{[&tl_storer, &rpc_server_instance_st] noexcept { + if (tl_storer.capacity() > rpc_server_instance_st.tl_storer.capacity()) { + std::swap(tl_storer, rpc_server_instance_st.tl_storer); } - } + }}; const auto query_id{rpc_client_instance_st.current_query_id++}; - // create response extra info - if (collect_responses_extra_info) { - rpc_client_instance_st.rpc_responses_extra_info.emplace(query_id, std::make_pair(response_extra_info_status::not_ready, response_extra_info{0, timestamp})); - } - - // create a task to wait for RPC response. we need to do it even if 'ignore_answer' is 'true' to make sure - // that the stream will not be closed too early. otherwise, platform may even not send RPC request - static constexpr auto awaiter_coroutine{[](int64_t query_id, kphp::component::stream stream, std::chrono::milliseconds timeout, - bool collect_responses_extra_info) noexcept -> kphp::coro::shared_task> { - std::optional opt_response{std::in_place}; - auto fetch_task{kphp::component::fetch_response(stream, kphp::component::read_ext::append(*opt_response))}; - if (auto expected{co_await kphp::coro::io_scheduler::get().schedule(std::move(fetch_task), timeout)}; !expected) [[unlikely]] { - opt_response = std::nullopt; - } - - // update response extra info if needed - if (collect_responses_extra_info) { - auto& extra_info_map{RpcClientInstanceState::get().rpc_responses_extra_info}; - if (const auto it_extra_info{extra_info_map.find(query_id)}; it_extra_info != extra_info_map.end()) [[likely]] { - const auto timestamp{std::chrono::duration{std::chrono::system_clock::now().time_since_epoch()}.count()}; - const auto response_size{opt_response.transform([](const string& response) noexcept { return static_cast(response.size()); }).value_or(0)}; - it_extra_info->second.second = std::make_tuple(response_size, timestamp - std::get<1>(it_extra_info->second.second)); - it_extra_info->second.first = response_extra_info_status::ready; - } else { - kphp::log::warning("can't find extra info for RPC query {}", query_id); - } - } - - co_return std::move(opt_response); - }}; - static constexpr auto ignore_answer_awaiter_coroutine{ - [](kphp::component::stream stream, std::chrono::milliseconds timeout) noexcept -> kphp::coro::shared_task<> { - auto fetch_task{kphp::component::fetch_response(stream, [](std::span) noexcept {})}; - std::ignore = co_await kphp::coro::io_scheduler::get().schedule(std::move(fetch_task), timeout); - }}; + [](query_handle handle, std::chrono::milliseconds timeout) noexcept -> kphp::coro::shared_task<> { std::ignore = co_await handle.get_response(); }}; // normalize timeout using namespace std::chrono_literals; @@ -362,20 +307,24 @@ kphp::coro::task send_request(std::string_view actor, std }) .value_or(DEFAULT_TIMEOUT), MIN_TIMEOUT, MAX_TIMEOUT)}; + + auto query_handle_expected{kphp::rpc::send_and_get_handle(actor, collect_responses_extra_info, timeout, timestamp, query_id, tl_storer.view())}; + if (!query_handle_expected) { + return kphp::rpc::query_info{.id = kphp::rpc::INTERNAL_ERROR, .request_size = request_size, .timestamp = timestamp}; + } + auto query_handle{std::move(*query_handle_expected)}; + if (ignore_answer) { // start ignore answer awaiter task - auto ignore_answer_awaiter_task{ignore_answer_awaiter_coroutine(std::move(stream), timeout)}; + auto ignore_answer_awaiter_task{ignore_answer_awaiter_coroutine(std::move(query_handle), timeout)}; kphp::log::assertion(kphp::coro::io_scheduler::get().start(ignore_answer_awaiter_task)); rpc_client_instance_st.ignore_answer_request_awaiter_tasks.push(std::move(ignore_answer_awaiter_task)); - co_return kphp::rpc::query_info{.id = kphp::rpc::IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; + return kphp::rpc::query_info{.id = kphp::rpc::IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; } - // start awaiter task - auto awaiter_task{awaiter_coroutine(query_id, std::move(stream), timeout, collect_responses_extra_info)}; - kphp::log::assertion(kphp::coro::io_scheduler::get().start(awaiter_task)); - rpc_client_instance_st.response_awaiter_tasks.emplace(query_id, std::move(awaiter_task)); - co_return kphp::rpc::query_info{.id = query_id, .request_size = request_size, .timestamp = timestamp}; + rpc_client_instance_st.rpc_query_handles.emplace(query_id, std::move(query_handle)); + return kphp::rpc::query_info{.id = query_id, .request_size = request_size, .timestamp = timestamp}; } } // namespace kphp::rpc diff --git a/runtime-light/stdlib/rpc/rpc-api.h b/runtime-light/stdlib/rpc/rpc-api.h index 5a9a43872b..324130e975 100644 --- a/runtime-light/stdlib/rpc/rpc-api.h +++ b/runtime-light/stdlib/rpc/rpc-api.h @@ -40,8 +40,7 @@ struct query_info { double timestamp{0.0}; }; -kphp::coro::task send_request(std::string_view actor, std::optional timeout, bool ignore_answer, - bool collect_responses_extra_info) noexcept; +kphp::rpc::query_info send_request(std::string_view actor, std::optional timeout, bool ignore_answer, bool collect_responses_extra_info) noexcept; inline kphp::coro::task> send_response(std::span response) noexcept { auto& rpc_server_instance_st{RpcServerInstanceState::get()}; @@ -59,13 +58,13 @@ inline kphp::coro::task> send_response(std::span rpc_tl_query_one_impl(std::string_view actor, mixed tl_object, std::optional opt_timeout, - bool collect_resp_extra_info, bool ignore_answer) noexcept; +kphp::rpc::query_info rpc_tl_query_one_impl(std::string_view actor, mixed tl_object, std::optional opt_timeout, bool collect_resp_extra_info, + bool ignore_answer) noexcept; kphp::coro::task> rpc_tl_query_result_one_impl(int64_t query_id) noexcept; -kphp::coro::task typed_rpc_tl_query_one_impl(std::string_view actor, const RpcRequest& rpc_request, std::optional opt_timeout, - bool collect_responses_extra_info, bool ignore_answer) noexcept; +kphp::rpc::query_info typed_rpc_tl_query_one_impl(std::string_view actor, const RpcRequest& rpc_request, std::optional opt_timeout, + bool collect_responses_extra_info, bool ignore_answer) noexcept; kphp::coro::task> typed_rpc_tl_query_result_one_impl(int64_t query_id, const RpcErrorFactory& error_factory) noexcept; @@ -287,9 +286,8 @@ inline int64_t f$rpc_tl_pending_queries_count() noexcept { // === client untyped ============================================================================= -inline kphp::coro::task> f$rpc_send_requests(string actor, array tl_objects, Optional timeout, bool ignore_answer, - class_instance requests_extra_info, - bool need_responses_extra_info) noexcept { +inline array f$rpc_send_requests(string actor, array tl_objects, Optional timeout, bool ignore_answer, + class_instance requests_extra_info, bool need_responses_extra_info) noexcept { if (ignore_answer && need_responses_extra_info) [[unlikely]] { kphp::log::warning("both $ignore_answer and $need_responses_extra_info are 'true'. Metrics won't be collected"); } @@ -300,8 +298,8 @@ inline kphp::coro::task> f$rpc_send_requests(string actor, array< auto opt_timeout{timeout.has_value() ? std::optional{timeout.val()} : std::optional{}}; for (const auto& it : std::as_const(tl_objects)) { - const auto query_info{co_await kphp::forks::id_managed( - kphp::rpc::detail::rpc_tl_query_one_impl({actor.c_str(), actor.size()}, it.get_value(), opt_timeout, collect_resp_extra_info, ignore_answer))}; + const auto query_info{ + kphp::rpc::detail::rpc_tl_query_one_impl({actor.c_str(), actor.size()}, it.get_value(), opt_timeout, collect_resp_extra_info, ignore_answer)}; query_ids.set_value(it.get_key(), query_info.id); req_extra_info_arr.set_value(it.get_key(), kphp::rpc::request_extra_info{query_info.request_size}); } @@ -309,7 +307,7 @@ inline kphp::coro::task> f$rpc_send_requests(string actor, array< if (!requests_extra_info.is_null()) { requests_extra_info->extra_info_arr = std::move(req_extra_info_arr); } - co_return std::move(query_ids); + return query_ids; } inline kphp::coro::task>> f$rpc_fetch_responses(array query_ids) noexcept { @@ -337,9 +335,9 @@ kphp::coro::task>> f$rpc_fetch_responses_synchronously(array< // === client typed =============================================================================== template rpc_function_type, std::same_as rpc_request_type = KphpRpcRequest> -kphp::coro::task> -f$rpc_send_typed_query_requests(string actor, array> query_functions, Optional timeout, bool ignore_answer, - class_instance requests_extra_info, bool need_responses_extra_info) noexcept { +array f$rpc_send_typed_query_requests(string actor, array> query_functions, Optional timeout, + bool ignore_answer, class_instance requests_extra_info, + bool need_responses_extra_info) noexcept { if (ignore_answer && need_responses_extra_info) [[unlikely]] { kphp::log::warning("both $ignore_answer and $need_responses_extra_info are 'true'. Metrics won't be collected"); } @@ -350,8 +348,8 @@ f$rpc_send_typed_query_requests(string actor, array{timeout.val()} : std::optional{}}; for (const auto& it : std::as_const(query_functions)) { - const auto query_info{co_await kphp::forks::id_managed(kphp::rpc::detail::typed_rpc_tl_query_one_impl( - {actor.c_str(), actor.size()}, rpc_request_type{it.get_value()}, opt_timeout, collect_resp_extra_info, ignore_answer))}; + const auto query_info{kphp::rpc::detail::typed_rpc_tl_query_one_impl({actor.c_str(), actor.size()}, rpc_request_type{it.get_value()}, opt_timeout, + collect_resp_extra_info, ignore_answer)}; query_ids.set_value(it.get_key(), query_info.id); req_extra_info_arr.set_value(it.get_key(), kphp::rpc::request_extra_info{query_info.request_size}); } @@ -359,7 +357,7 @@ f$rpc_send_typed_query_requests(string actor, arrayextra_info_arr = std::move(req_extra_info_arr); } - co_return std::move(query_ids); + return query_ids; } template query_id_type = int64_t, std::same_as error_factory_type = RpcResponseErrorFactory> diff --git a/runtime-light/stdlib/rpc/rpc-client-state.h b/runtime-light/stdlib/rpc/rpc-client-state.h index c4d86416b2..a53d3f1fca 100644 --- a/runtime-light/stdlib/rpc/rpc-client-state.h +++ b/runtime-light/stdlib/rpc/rpc-client-state.h @@ -16,6 +16,7 @@ #include "runtime-light/coroutine/shared-task.h" #include "runtime-light/stdlib/rpc/rpc-constants.h" #include "runtime-light/stdlib/rpc/rpc-extra-info.h" +#include "runtime-light/stdlib/rpc/rpc-query-handle.h" #include "runtime-light/stdlib/rpc/rpc-tl-defs.h" #include "runtime-light/stdlib/rpc/rpc-tl-query.h" @@ -23,6 +24,7 @@ struct RpcClientInstanceState final : private vk::not_copyable { CurrentTlQuery current_client_query{}; int64_t current_query_id{kphp::rpc::VALID_QUERY_ID_RANGE_START}; + kphp::stl::unordered_map rpc_query_handles; kphp::stl::unordered_map>, kphp::memory::script_allocator> response_awaiter_tasks; kphp::stl::unordered_map, kphp::memory::script_allocator> response_fetcher_instances; kphp::stl::unordered_map, kphp::memory::script_allocator> diff --git a/runtime-light/stdlib/rpc/rpc-constants.h b/runtime-light/stdlib/rpc/rpc-constants.h index 2f004f12da..9f9c7a2034 100644 --- a/runtime-light/stdlib/rpc/rpc-constants.h +++ b/runtime-light/stdlib/rpc/rpc-constants.h @@ -11,6 +11,7 @@ namespace kphp::rpc { inline constexpr int64_t VALID_QUERY_ID_RANGE_START = 1; inline constexpr int64_t INVALID_QUERY_ID = 0; inline constexpr int64_t IGNORED_ANSWER_QUERY_ID = -1; +inline constexpr int64_t INTERNAL_ERROR = 2; enum class error : int32_t { // stream errors diff --git a/runtime-light/stdlib/rpc/rpc-query-handle.cpp b/runtime-light/stdlib/rpc/rpc-query-handle.cpp new file mode 100644 index 0000000000..6d898ad9e3 --- /dev/null +++ b/runtime-light/stdlib/rpc/rpc-query-handle.cpp @@ -0,0 +1,67 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2026 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include +#include +#include +#include +#include +#include + +#include "common/rpc-error-codes.h" +#include "runtime-common/core/runtime-core.h" +#include "runtime-light/k2-platform/k2-api.h" +#include "runtime-light/stdlib/diagnostics/logs.h" +#include "runtime-light/stdlib/rpc/rpc-client-state.h" + +namespace kphp::rpc { + +std::expected send_and_get_handle(std::string_view actor, bool collect_responses_extra_info, std::chrono::milliseconds timeout, + double timestamp, int64_t query_id, std::span request_buffer) noexcept { + auto& rpc_client_instance_st{RpcClientInstanceState::get()}; + + auto rpc_d_exp{k2::rpc_send_request(actor, request_buffer)}; + if (!rpc_d_exp) { + return std::unexpected{rpc_d_exp.error()}; + } + k2::descriptor rpc_d{*rpc_d_exp}; + + // create response extra info + if (collect_responses_extra_info) { + rpc_client_instance_st.rpc_responses_extra_info.emplace(query_id, std::make_pair(response_extra_info_status::not_ready, response_extra_info{0, timestamp})); + } + + std::chrono::nanoseconds deadline{timeout_to_deadline(timeout)}; + + return {query_handle{rpc_d, query_id, deadline, collect_responses_extra_info}}; +} + +std::expected> query_handle::get_ready_response() noexcept { + std::expected first_response_size{k2::rpc_get_response_size(rpc_d)}; + if (!first_response_size) { + return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"error fetching rpc response"})}; + } + string response{reinterpret_cast(k2::alloc(*first_response_size)), static_cast(*first_response_size)}; + std::expected response_fetch_result{k2::rpc_fetch_response(rpc_d, {reinterpret_cast(response.buffer()), response.size()})}; + if (!response_fetch_result) { + return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"error fetching rpc response"})}; + } + + // update response extra info if needed + if (collect_responses_extra_info) { + auto& rpc_client_instance_st{RpcClientInstanceState::get()}; + auto& extra_info_map{rpc_client_instance_st.rpc_responses_extra_info}; + if (const auto it_extra_info{extra_info_map.find(query_id)}; it_extra_info != extra_info_map.end()) [[likely]] { + const auto timestamp{std::chrono::duration{std::chrono::system_clock::now().time_since_epoch()}.count()}; + it_extra_info->second.second = std::make_tuple(response.size(), timestamp - std::get<1>(it_extra_info->second.second)); + it_extra_info->second.first = response_extra_info_status::ready; + } else { + kphp::log::warning("can't find extra info for RPC query {}", query_id); + } + } + + return {response}; +} + +} // namespace kphp::rpc diff --git a/runtime-light/stdlib/rpc/rpc-query-handle.h b/runtime-light/stdlib/rpc/rpc-query-handle.h new file mode 100644 index 0000000000..a74806a404 --- /dev/null +++ b/runtime-light/stdlib/rpc/rpc-query-handle.h @@ -0,0 +1,99 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2026 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "common/rpc-error-codes.h" +#include "rpc-client-state.h" +#include "runtime-common/core/runtime-core.h" +#include "runtime-light/coroutine/io-scheduler.h" +#include "runtime-light/coroutine/task.h" +#include "runtime-light/k2-platform/k2-api.h" +#include "runtime-light/stdlib/diagnostics/logs.h" +#include "runtime-light/stdlib/rpc/rpc-client-state.h" +#include "runtime-light/stdlib/time/util.h" + +namespace kphp::rpc { + +class query_handle { + k2::descriptor rpc_d{k2::INVALID_PLATFORM_DESCRIPTOR}; + int64_t query_id; + std::chrono::nanoseconds deadline{}; + bool collect_responses_extra_info{false}; + +public: + query_handle() = delete; + + query_handle(k2::descriptor _rpc_d, int64_t _query_id, std::chrono::nanoseconds _deadline, bool _collect_responses_extra_info) noexcept + : rpc_d{_rpc_d}, + query_id{_query_id}, + deadline{_deadline}, + collect_responses_extra_info{_collect_responses_extra_info} {} + + query_handle(query_handle&& other) noexcept + : rpc_d{std::exchange(other.rpc_d, k2::INVALID_PLATFORM_DESCRIPTOR)}, + query_id{other.query_id}, + deadline{other.deadline}, + collect_responses_extra_info{other.collect_responses_extra_info} {} + + query_handle& operator=(query_handle&& other) noexcept { + if (this != std::addressof(other)) { + drop(); + rpc_d = std::exchange(other.rpc_d, k2::INVALID_PLATFORM_DESCRIPTOR); + query_id = other.query_id; + deadline = other.deadline; + collect_responses_extra_info = other.collect_responses_extra_info; + } + return *this; + } + + query_handle(const query_handle& other) = delete; + + query_handle& operator=(const query_handle& other) = delete; + + ~query_handle() noexcept { + drop(); + } + + void drop() noexcept { + if (rpc_d != k2::INVALID_PLATFORM_DESCRIPTOR) { + k2::free_descriptor(std::exchange(rpc_d, k2::INVALID_PLATFORM_DESCRIPTOR)); + } + } + + kphp::coro::task>> get_response() noexcept { + if (rpc_d == k2::INVALID_PLATFORM_DESCRIPTOR) { + co_return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"fetching rpc response from empty handle"})}; + } + + kphp::coro::io_scheduler& m_scheduler{kphp::coro::io_scheduler::get()}; + std::chrono::nanoseconds timeout{deadline_to_timeout(deadline)}; + + switch (co_await m_scheduler.poll(rpc_d, kphp::coro::poll_op::read, timeout)) { + case kphp::coro::poll_status::event: + co_return get_ready_response(); + case kphp::coro::poll_status::closed: + case kphp::coro::poll_status::timeout: + co_return std::unexpected{std::make_pair(TL_ERROR_QUERY_TIMEOUT, string{"rpc response timeout"})}; + case kphp::coro::poll_status::error: + co_return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"error fetching rpc response"})}; + } + } + +private: + std::expected> get_ready_response() noexcept; +}; + +std::expected send_and_get_handle(std::string_view actor, bool collect_responses_extra_info, std::chrono::milliseconds timeout, + double timestamp, int64_t query_id, std::span request_buffer) noexcept; + +} // namespace kphp::rpc diff --git a/runtime-light/stdlib/rpc/rpc-queue-functions.h b/runtime-light/stdlib/rpc/rpc-queue-functions.h index d120f452fd..0c894d4813 100644 --- a/runtime-light/stdlib/rpc/rpc-queue-functions.h +++ b/runtime-light/stdlib/rpc/rpc-queue-functions.h @@ -17,20 +17,22 @@ #include "runtime-light/stdlib/diagnostics/logs.h" #include "runtime-light/stdlib/fork/fork-functions.h" #include "runtime-light/stdlib/rpc/rpc-client-state.h" +#include "runtime-light/stdlib/rpc/rpc-query-handle.h" #include "runtime-light/stdlib/rpc/rpc-queue-state.h" +#include "runtime-light/stdlib/time/util.h" namespace kphp::rpc { inline void rpc_queue_push(int64_t queue_id, int64_t request_id) noexcept { - static constexpr auto rpc_queue_wrapper_task{[](kphp::coro::shared_task<> awaiter_task, int64_t request_id) noexcept -> kphp::coro::task { - co_await std::move(awaiter_task).when_ready(); + static constexpr auto rpc_queue_wrapper_task{[](kphp::rpc::query_handle query_handle, int64_t request_id) noexcept -> kphp::coro::task { + std::ignore = co_await query_handle.get_response(); co_return request_id; }}; auto& rpc_client_instance_st{RpcClientInstanceState::get()}; - const auto it_awaiter_task{rpc_client_instance_st.response_awaiter_tasks.find(request_id)}; - if (it_awaiter_task == rpc_client_instance_st.response_awaiter_tasks.end()) [[unlikely]] { + const auto it_rpc_request_info{rpc_client_instance_st.rpc_query_handles.find(request_id)}; + if (it_rpc_request_info == rpc_client_instance_st.rpc_query_handles.end()) [[unlikely]] { kphp::log::warning("could not find rpc query with id {} in pending queries", queue_id); return; } @@ -43,7 +45,7 @@ inline void rpc_queue_push(int64_t queue_id, int64_t request_id) noexcept { } auto& await_set{(*opt_await_set).get()}; - await_set.push(rpc_queue_wrapper_task(static_cast>(it_awaiter_task->second), request_id)); + await_set.push(rpc_queue_wrapper_task(std::move(it_rpc_request_info->second), request_id)); } inline int64_t rpc_queue_create(std::span request_ids) noexcept { diff --git a/runtime-light/stdlib/stdlib.cmake b/runtime-light/stdlib/stdlib.cmake index 92f67cd6b4..5ce1470ba5 100644 --- a/runtime-light/stdlib/stdlib.cmake +++ b/runtime-light/stdlib/stdlib.cmake @@ -29,6 +29,7 @@ prepend( rpc/rpc-client-state.cpp rpc/rpc-extra-headers.cpp rpc/rpc-extra-info.cpp + rpc/rpc-query-handle.cpp rpc/rpc-queue-state.cpp rpc/rpc-tl-builtins.cpp rpc/rpc-tl-error.cpp diff --git a/runtime-light/stdlib/time/util.h b/runtime-light/stdlib/time/util.h new file mode 100644 index 0000000000..4749b958b5 --- /dev/null +++ b/runtime-light/stdlib/time/util.h @@ -0,0 +1,41 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2026 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include + +#include "runtime-light/k2-platform/k2-api.h" +#include "runtime-light/metaprogramming/concepts.h" + +/** + * Calculate time remaining to the deadline. + */ +template +inline duration_type deadline_to_timeout(duration_type deadline) noexcept { + k2::TimePoint now_instant{}; + k2::instant(std::addressof(now_instant)); + + std::chrono::nanoseconds now_ns{now_instant.time_point_ns}; + std::chrono::nanoseconds deadline_ns{duration_cast(deadline)}; + std::chrono::nanoseconds timeout_ns{deadline_ns - now_ns}; + + return duration_cast(timeout_ns); +} + +/** + * Converts timeout to time point, when timeout will elapse - deadline. + */ +template +inline duration_type timeout_to_deadline(duration_type timeout) { + k2::TimePoint now_instant{}; + k2::instant(std::addressof(now_instant)); + + std::chrono::nanoseconds now_ns{now_instant.time_point_ns}; + std::chrono::nanoseconds timeout_ns{duration_cast(timeout)}; + std::chrono::nanoseconds deadline{now_ns + timeout_ns}; + + return duration_cast(deadline); +}