Skip to content
Open
3 changes: 1 addition & 2 deletions builtin-functions/kphp-light/stdlib/rpc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@ final class KphpRpcRequestsExtraInfo {
public function get ();
}

/** @kphp-extern-func-info interruptible */
function rpc_send_requests($actor ::: string,
$arr ::: array,
$timeout ::: ?float,
$ignore_answer ::: bool,
\KphpRpcRequestsExtraInfo $requests_extra_info,
$need_responses_extra_info ::: bool) ::: int[];

/** @kphp-extern-func-info tl_common_h_dep interruptible */
/** @kphp-extern-func-info tl_common_h_dep */
function rpc_send_typed_query_requests($actor ::: string, @tl\RpcFunction[] $query_functions,
$timeout ::: ?float,
$ignore_answer ::: bool,
Expand Down
24 changes: 24 additions & 0 deletions runtime-light/k2-platform/k2-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,30 @@ inline int32_t component_access(std::string_view component_name) noexcept {
return k2_component_access(component_name.size(), component_name.data());
}

inline std::expected<uint64_t, int32_t> rpc_send_request(std::string_view actor_name, std::span<const std::byte> request_buffer) noexcept {
uint64_t rpc_d{};
if (auto error_code{k2_rpc_send(actor_name.data(), actor_name.size(), request_buffer.data(), request_buffer.size(), RpcKind::TL_RPC, std::addressof(rpc_d))};
error_code != k2::errno_ok) {
return std::unexpected{error_code};
}
return {rpc_d};
}

inline std::expected<size_t, int32_t> rpc_get_response_size(uint64_t rpc_d) noexcept {
size_t size{};
if (auto error_code{k2_rpc_get_response_size(rpc_d, std::addressof(size))}; error_code != k2::errno_ok) {
return std::unexpected{error_code};
}
return {size};
}

inline std::expected<void, int32_t> rpc_fetch_response(uint64_t rpc_d, std::span<std::byte> buffer) noexcept {
if (auto error_code{k2_rpc_fetch_response(rpc_d, buffer.data(), buffer.size())}; error_code != errno_ok) {
return std::unexpected{error_code};
}
return {};
}

inline void stream_status(k2::descriptor descriptor, StreamStatus* status) noexcept {
k2_stream_status(descriptor, status);
}
Expand Down
40 changes: 40 additions & 0 deletions runtime-light/k2-platform/k2-header.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ enum UpdateStatus {
NewDescriptor = 2,
};

enum RpcKind {
TL_RPC = 0,
};

struct ImageInfo {
// Base
const char* image_name;
Expand Down Expand Up @@ -316,6 +320,42 @@ int32_t k2_unlink(const char* path, size_t path_len);
*/
int32_t k2_component_access(size_t name_len, const char* name);

/**
* Try to send rpc request. In case of success write descriptor of rpc request to `rpc_d`, otherwise return `errno` != 0,
* which should be later used to call `k2_rpc_fetch_response`.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fix grammar

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

*
* @return return `0` on success. libc-like `errno` otherwise
*
* Possible `errno` values:
* `EAI_MEMORY` => max descriptors count achieved
* `EINVAL` => invalid `actor_name` or request, or connection pool is empty for this actor.
*/
int32_t k2_rpc_send(const char* actor_name, size_t actor_name_len, const void* request_ptr, size_t request_size, enum RpcKind rpc_kind, uint64_t* rpc_d);

/**
* Get response size for corresponding request of this `rpc_d`. Write 0 to `response_size` and return `EAGAIN` if response is not ready.
* Write response size value to `response_size` if response is ready.
*
* @return return `0` on success. libc-like `errno` otherwise
*
* Possible `errno` values:
* `EINVAL` => invalid `rpc_d` descriptor, for example, it is unknown descriptor, or not rpc descriptor.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add something like * Possible errno values:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

* `EAGAIN` => response is not ready yet.
*/
int32_t k2_rpc_get_response_size(uint64_t rpc_d, size_t* response_size);

/**
* Write response for corresponding request of this `rpc_d` to `buf`. Return `EAGAIN` if response is not ready.
* If `buf_size` < response size, then write first `buf_size` bytes of response to `buf`.
*
* @return return `0` on success. libc-like `errno` otherwise
*
* Possible `errno` values:
* `EINVAL` => invalid `rpc_d` descriptor, for example, it is unknown descriptor, or not rpc descriptor.
* `EAGAIN` => response is not ready yet.
*/
int32_t k2_rpc_fetch_response(uint64_t rpc_d, void* buf, size_t buf_size);

/**
* If the write or read status is `Blocked` - then the platform ensures that
* the component receives this `stream_d` via `k2_take_update` when the status is
Expand Down
171 changes: 60 additions & 111 deletions runtime-light/stdlib/rpc/rpc-api.cpp

Large diffs are not rendered by default.

34 changes: 16 additions & 18 deletions runtime-light/stdlib/rpc/rpc-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ struct query_info {
double timestamp{0.0};
};

kphp::coro::task<kphp::rpc::query_info> send_request(std::string_view actor, std::optional<double> timeout, bool ignore_answer,
bool collect_responses_extra_info) noexcept;
kphp::rpc::query_info send_request(std::string_view actor, std::optional<double> timeout, bool ignore_answer, bool collect_responses_extra_info) noexcept;

inline kphp::coro::task<std::expected<void, int32_t>> send_response(std::span<const std::byte> response) noexcept {
auto& rpc_server_instance_st{RpcServerInstanceState::get()};
Expand All @@ -59,13 +58,13 @@ inline kphp::coro::task<std::expected<void, int32_t>> send_response(std::span<co

namespace detail {

kphp::coro::task<kphp::rpc::query_info> rpc_tl_query_one_impl(std::string_view actor, mixed tl_object, std::optional<double> opt_timeout,
bool collect_resp_extra_info, bool ignore_answer) noexcept;
kphp::rpc::query_info rpc_tl_query_one_impl(std::string_view actor, mixed tl_object, std::optional<double> opt_timeout, bool collect_resp_extra_info,
bool ignore_answer) noexcept;

kphp::coro::task<array<mixed>> rpc_tl_query_result_one_impl(int64_t query_id) noexcept;

kphp::coro::task<kphp::rpc::query_info> typed_rpc_tl_query_one_impl(std::string_view actor, const RpcRequest& rpc_request, std::optional<double> opt_timeout,
bool collect_responses_extra_info, bool ignore_answer) noexcept;
kphp::rpc::query_info typed_rpc_tl_query_one_impl(std::string_view actor, const RpcRequest& rpc_request, std::optional<double> opt_timeout,
bool collect_responses_extra_info, bool ignore_answer) noexcept;

kphp::coro::task<class_instance<C$VK$TL$RpcResponse>> typed_rpc_tl_query_result_one_impl(int64_t query_id, const RpcErrorFactory& error_factory) noexcept;

Expand Down Expand Up @@ -287,9 +286,8 @@ inline int64_t f$rpc_tl_pending_queries_count() noexcept {

// === client untyped =============================================================================

inline kphp::coro::task<array<int64_t>> f$rpc_send_requests(string actor, array<mixed> tl_objects, Optional<double> timeout, bool ignore_answer,
class_instance<C$KphpRpcRequestsExtraInfo> requests_extra_info,
bool need_responses_extra_info) noexcept {
inline array<int64_t> f$rpc_send_requests(string actor, array<mixed> tl_objects, Optional<double> timeout, bool ignore_answer,
class_instance<C$KphpRpcRequestsExtraInfo> requests_extra_info, bool need_responses_extra_info) noexcept {
if (ignore_answer && need_responses_extra_info) [[unlikely]] {
kphp::log::warning("both $ignore_answer and $need_responses_extra_info are 'true'. Metrics won't be collected");
}
Expand All @@ -300,16 +298,16 @@ inline kphp::coro::task<array<int64_t>> f$rpc_send_requests(string actor, array<
auto opt_timeout{timeout.has_value() ? std::optional<double>{timeout.val()} : std::optional<double>{}};

for (const auto& it : std::as_const(tl_objects)) {
const auto query_info{co_await kphp::forks::id_managed(
kphp::rpc::detail::rpc_tl_query_one_impl({actor.c_str(), actor.size()}, it.get_value(), opt_timeout, collect_resp_extra_info, ignore_answer))};
const auto query_info{
kphp::rpc::detail::rpc_tl_query_one_impl({actor.c_str(), actor.size()}, it.get_value(), opt_timeout, collect_resp_extra_info, ignore_answer)};
query_ids.set_value(it.get_key(), query_info.id);
req_extra_info_arr.set_value(it.get_key(), kphp::rpc::request_extra_info{query_info.request_size});
}

if (!requests_extra_info.is_null()) {
requests_extra_info->extra_info_arr = std::move(req_extra_info_arr);
}
co_return std::move(query_ids);
return query_ids;
}

inline kphp::coro::task<array<array<mixed>>> f$rpc_fetch_responses(array<int64_t> query_ids) noexcept {
Expand Down Expand Up @@ -337,9 +335,9 @@ kphp::coro::task<array<array<mixed>>> f$rpc_fetch_responses_synchronously(array<
// === client typed ===============================================================================

template<std::derived_from<C$VK$TL$RpcFunction> rpc_function_type, std::same_as<KphpRpcRequest> rpc_request_type = KphpRpcRequest>
kphp::coro::task<array<int64_t>>
f$rpc_send_typed_query_requests(string actor, array<class_instance<rpc_function_type>> query_functions, Optional<double> timeout, bool ignore_answer,
class_instance<C$KphpRpcRequestsExtraInfo> requests_extra_info, bool need_responses_extra_info) noexcept {
array<int64_t> f$rpc_send_typed_query_requests(string actor, array<class_instance<rpc_function_type>> query_functions, Optional<double> timeout,
bool ignore_answer, class_instance<C$KphpRpcRequestsExtraInfo> requests_extra_info,
bool need_responses_extra_info) noexcept {
if (ignore_answer && need_responses_extra_info) [[unlikely]] {
kphp::log::warning("both $ignore_answer and $need_responses_extra_info are 'true'. Metrics won't be collected");
}
Expand All @@ -350,16 +348,16 @@ f$rpc_send_typed_query_requests(string actor, array<class_instance<rpc_function_
auto opt_timeout{timeout.has_value() ? std::optional<double>{timeout.val()} : std::optional<double>{}};

for (const auto& it : std::as_const(query_functions)) {
const auto query_info{co_await kphp::forks::id_managed(kphp::rpc::detail::typed_rpc_tl_query_one_impl(
{actor.c_str(), actor.size()}, rpc_request_type{it.get_value()}, opt_timeout, collect_resp_extra_info, ignore_answer))};
const auto query_info{kphp::rpc::detail::typed_rpc_tl_query_one_impl({actor.c_str(), actor.size()}, rpc_request_type{it.get_value()}, opt_timeout,
collect_resp_extra_info, ignore_answer)};
query_ids.set_value(it.get_key(), query_info.id);
req_extra_info_arr.set_value(it.get_key(), kphp::rpc::request_extra_info{query_info.request_size});
}

if (!requests_extra_info.is_null()) {
requests_extra_info->extra_info_arr = std::move(req_extra_info_arr);
}
co_return std::move(query_ids);
return query_ids;
}

template<std::same_as<int64_t> query_id_type = int64_t, std::same_as<RpcResponseErrorFactory> error_factory_type = RpcResponseErrorFactory>
Expand Down
2 changes: 2 additions & 0 deletions runtime-light/stdlib/rpc/rpc-client-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
#include "runtime-light/coroutine/shared-task.h"
#include "runtime-light/stdlib/rpc/rpc-constants.h"
#include "runtime-light/stdlib/rpc/rpc-extra-info.h"
#include "runtime-light/stdlib/rpc/rpc-query-handle.h"
#include "runtime-light/stdlib/rpc/rpc-tl-defs.h"
#include "runtime-light/stdlib/rpc/rpc-tl-query.h"

struct RpcClientInstanceState final : private vk::not_copyable {
CurrentTlQuery current_client_query{};
int64_t current_query_id{kphp::rpc::VALID_QUERY_ID_RANGE_START};

kphp::stl::unordered_map<int64_t, kphp::rpc::query_handle, kphp::memory::script_allocator> rpc_query_handles;
kphp::stl::unordered_map<int64_t, kphp::coro::shared_task<std::optional<string>>, kphp::memory::script_allocator> response_awaiter_tasks;
kphp::stl::unordered_map<int64_t, class_instance<RpcTlQuery>, kphp::memory::script_allocator> response_fetcher_instances;
kphp::stl::unordered_map<int64_t, std::pair<kphp::rpc::response_extra_info_status, kphp::rpc::response_extra_info>, kphp::memory::script_allocator>
Expand Down
1 change: 1 addition & 0 deletions runtime-light/stdlib/rpc/rpc-constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace kphp::rpc {
inline constexpr int64_t VALID_QUERY_ID_RANGE_START = 1;
inline constexpr int64_t INVALID_QUERY_ID = 0;
inline constexpr int64_t IGNORED_ANSWER_QUERY_ID = -1;
inline constexpr int64_t INTERNAL_ERROR = 2;

enum class error : int32_t {
// stream errors
Expand Down
67 changes: 67 additions & 0 deletions runtime-light/stdlib/rpc/rpc-query-handle.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2026 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <expected>
#include <tuple>
#include <utility>

#include "common/rpc-error-codes.h"
#include "runtime-common/core/runtime-core.h"
#include "runtime-light/k2-platform/k2-api.h"
#include "runtime-light/stdlib/diagnostics/logs.h"
#include "runtime-light/stdlib/rpc/rpc-client-state.h"

namespace kphp::rpc {

std::expected<query_handle, int32_t> send_and_get_handle(std::string_view actor, bool collect_responses_extra_info, std::chrono::milliseconds timeout,
double timestamp, int64_t query_id, std::span<const std::byte> request_buffer) noexcept {
auto& rpc_client_instance_st{RpcClientInstanceState::get()};

auto rpc_d_exp{k2::rpc_send_request(actor, request_buffer)};
if (!rpc_d_exp) {
return std::unexpected{rpc_d_exp.error()};
}
k2::descriptor rpc_d{*rpc_d_exp};

// create response extra info
if (collect_responses_extra_info) {
rpc_client_instance_st.rpc_responses_extra_info.emplace(query_id, std::make_pair(response_extra_info_status::not_ready, response_extra_info{0, timestamp}));
}

std::chrono::nanoseconds deadline{timeout_to_deadline(timeout)};

return {query_handle{rpc_d, query_id, deadline, collect_responses_extra_info}};
}

std::expected<string, std::pair<int32_t, string>> query_handle::get_ready_response() noexcept {
std::expected<size_t, int32_t> first_response_size{k2::rpc_get_response_size(rpc_d)};
if (!first_response_size) {
return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"error fetching rpc response"})};
}
string response{reinterpret_cast<char*>(k2::alloc(*first_response_size)), static_cast<string::size_type>(*first_response_size)};
std::expected<void, int32_t> response_fetch_result{k2::rpc_fetch_response(rpc_d, {reinterpret_cast<std::byte*>(response.buffer()), response.size()})};
if (!response_fetch_result) {
return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"error fetching rpc response"})};
}

// update response extra info if needed
if (collect_responses_extra_info) {
auto& rpc_client_instance_st{RpcClientInstanceState::get()};
auto& extra_info_map{rpc_client_instance_st.rpc_responses_extra_info};
if (const auto it_extra_info{extra_info_map.find(query_id)}; it_extra_info != extra_info_map.end()) [[likely]] {
const auto timestamp{std::chrono::duration<double>{std::chrono::system_clock::now().time_since_epoch()}.count()};
it_extra_info->second.second = std::make_tuple(response.size(), timestamp - std::get<1>(it_extra_info->second.second));
it_extra_info->second.first = response_extra_info_status::ready;
} else {
kphp::log::warning("can't find extra info for RPC query {}", query_id);
}
}

return {response};
}

} // namespace kphp::rpc
99 changes: 99 additions & 0 deletions runtime-light/stdlib/rpc/rpc-query-handle.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2026 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <expected>
#include <memory>
#include <tuple>
#include <utility>

#include "common/rpc-error-codes.h"
#include "rpc-client-state.h"
#include "runtime-common/core/runtime-core.h"
#include "runtime-light/coroutine/io-scheduler.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/k2-platform/k2-api.h"
#include "runtime-light/stdlib/diagnostics/logs.h"
#include "runtime-light/stdlib/rpc/rpc-client-state.h"
#include "runtime-light/stdlib/time/util.h"

namespace kphp::rpc {

class query_handle {
k2::descriptor rpc_d{k2::INVALID_PLATFORM_DESCRIPTOR};
int64_t query_id;
std::chrono::nanoseconds deadline{};
bool collect_responses_extra_info{false};

public:
query_handle() = delete;

query_handle(k2::descriptor _rpc_d, int64_t _query_id, std::chrono::nanoseconds _deadline, bool _collect_responses_extra_info) noexcept
: rpc_d{_rpc_d},
query_id{_query_id},
deadline{_deadline},
collect_responses_extra_info{_collect_responses_extra_info} {}

query_handle(query_handle&& other) noexcept
: rpc_d{std::exchange(other.rpc_d, k2::INVALID_PLATFORM_DESCRIPTOR)},
query_id{other.query_id},
deadline{other.deadline},
collect_responses_extra_info{other.collect_responses_extra_info} {}

query_handle& operator=(query_handle&& other) noexcept {
if (this != std::addressof(other)) {
drop();
rpc_d = std::exchange(other.rpc_d, k2::INVALID_PLATFORM_DESCRIPTOR);
query_id = other.query_id;
deadline = other.deadline;
collect_responses_extra_info = other.collect_responses_extra_info;
}
return *this;
}

query_handle(const query_handle& other) = delete;

query_handle& operator=(const query_handle& other) = delete;

~query_handle() noexcept {
drop();
}

void drop() noexcept {
if (rpc_d != k2::INVALID_PLATFORM_DESCRIPTOR) {
k2::free_descriptor(std::exchange(rpc_d, k2::INVALID_PLATFORM_DESCRIPTOR));
}
}

kphp::coro::task<std::expected<string, std::pair<int32_t, string>>> get_response() noexcept {
if (rpc_d == k2::INVALID_PLATFORM_DESCRIPTOR) {
co_return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"fetching rpc response from empty handle"})};
}

kphp::coro::io_scheduler& m_scheduler{kphp::coro::io_scheduler::get()};
std::chrono::nanoseconds timeout{deadline_to_timeout(deadline)};

switch (co_await m_scheduler.poll(rpc_d, kphp::coro::poll_op::read, timeout)) {
case kphp::coro::poll_status::event:
co_return get_ready_response();
case kphp::coro::poll_status::closed:
case kphp::coro::poll_status::timeout:
co_return std::unexpected{std::make_pair(TL_ERROR_QUERY_TIMEOUT, string{"rpc response timeout"})};
case kphp::coro::poll_status::error:
co_return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"error fetching rpc response"})};
}
}

private:
std::expected<string, std::pair<int32_t, string>> get_ready_response() noexcept;
};

std::expected<query_handle, int32_t> send_and_get_handle(std::string_view actor, bool collect_responses_extra_info, std::chrono::milliseconds timeout,
double timestamp, int64_t query_id, std::span<const std::byte> request_buffer) noexcept;

} // namespace kphp::rpc
Loading
Loading