-
Notifications
You must be signed in to change notification settings - Fork 110
[k2] rpc client boost #1635
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
T-y-c-o-o-n
wants to merge
17
commits into
master
Choose a base branch
from
n.siniachenko/k2/rpc-client-boost
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
[k2] rpc client boost #1635
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
c82a52a
first impl
9758cab
added kphp rpc client bench
7ef70d1
break in switch + different debug exit codes
f127127
null timeout in bench
2ca69bd
rpc send functions now common functions instead of coroutines
894a467
remove one coroutine on rpc response fetch
bf28c06
removed test trash
b7a3bc9
error handling
83f1317
rpc_queue_push
c69a239
kphp::rpc::request_info
742da8d
fmt
2ad3c3e
added RpcKind + better docs
ed49edb
query_handle
8ed260f
send_and_get_handle
51e1458
refactored query_handle a bit
b55db67
refactorrr
e77636c
k2_rpc_get_response_size() and k2_rpc_fetch_response() now return `EA…
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -97,6 +97,10 @@ enum UpdateStatus { | |
| NewDescriptor = 2, | ||
| }; | ||
|
|
||
| enum RpcKind { | ||
| TL_RPC = 0, | ||
| }; | ||
|
|
||
| struct ImageInfo { | ||
| // Base | ||
| const char* image_name; | ||
|
|
@@ -316,6 +320,42 @@ int32_t k2_unlink(const char* path, size_t path_len); | |
| */ | ||
| int32_t k2_component_access(size_t name_len, const char* name); | ||
|
|
||
| /** | ||
| * Try to send rpc request. In case of success write descriptor of rpc request to `rpc_d`, otherwise return `errno` != 0, | ||
| * which should be later used to call `k2_rpc_fetch_response`. | ||
| * | ||
| * @return return `0` on success. libc-like `errno` otherwise | ||
| * | ||
| * Possible `errno` values: | ||
| * `EAI_MEMORY` => max descriptors count achieved | ||
| * `EINVAL` => invalid `actor_name` or request, or connection pool is empty for this actor. | ||
| */ | ||
| int32_t k2_rpc_send(const char* actor_name, size_t actor_name_len, const void* request_ptr, size_t request_size, enum RpcKind rpc_kind, uint64_t* rpc_d); | ||
|
|
||
| /** | ||
| * Get response size for corresponding request of this `rpc_d`. Write 0 to `response_size` and return `EAGAIN` if response is not ready. | ||
| * Write response size value to `response_size` if response is ready. | ||
| * | ||
| * @return return `0` on success. libc-like `errno` otherwise | ||
| * | ||
| * Possible `errno` values: | ||
| * `EINVAL` => invalid `rpc_d` descriptor, for example, it is unknown descriptor, or not rpc descriptor. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add something like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added |
||
| * `EAGAIN` => response is not ready yet. | ||
| */ | ||
| int32_t k2_rpc_get_response_size(uint64_t rpc_d, size_t* response_size); | ||
|
|
||
| /** | ||
| * Write response for corresponding request of this `rpc_d` to `buf`. Return `EAGAIN` if response is not ready. | ||
| * If `buf_size` < response size, then write first `buf_size` bytes of response to `buf`. | ||
| * | ||
| * @return return `0` on success. libc-like `errno` otherwise | ||
| * | ||
| * Possible `errno` values: | ||
| * `EINVAL` => invalid `rpc_d` descriptor, for example, it is unknown descriptor, or not rpc descriptor. | ||
| * `EAGAIN` => response is not ready yet. | ||
| */ | ||
| int32_t k2_rpc_fetch_response(uint64_t rpc_d, void* buf, size_t buf_size); | ||
|
|
||
| /** | ||
| * If the write or read status is `Blocked` - then the platform ensures that | ||
| * the component receives this `stream_d` via `k2_take_update` when the status is | ||
|
|
||
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| // Compiler for PHP (aka KPHP) | ||
| // Copyright (c) 2026 LLC «V Kontakte» | ||
| // Distributed under the GPL v3 License, see LICENSE.notice.txt | ||
|
|
||
| #include <chrono> | ||
| #include <cstddef> | ||
| #include <cstdint> | ||
| #include <expected> | ||
| #include <tuple> | ||
| #include <utility> | ||
|
|
||
| #include "common/rpc-error-codes.h" | ||
| #include "runtime-common/core/runtime-core.h" | ||
| #include "runtime-light/k2-platform/k2-api.h" | ||
| #include "runtime-light/stdlib/diagnostics/logs.h" | ||
| #include "runtime-light/stdlib/rpc/rpc-client-state.h" | ||
|
|
||
| namespace kphp::rpc { | ||
|
|
||
| std::expected<query_handle, int32_t> send_and_get_handle(std::string_view actor, bool collect_responses_extra_info, std::chrono::milliseconds timeout, | ||
| double timestamp, int64_t query_id, std::span<const std::byte> request_buffer) noexcept { | ||
| auto& rpc_client_instance_st{RpcClientInstanceState::get()}; | ||
|
|
||
| auto rpc_d_exp{k2::rpc_send_request(actor, request_buffer)}; | ||
| if (!rpc_d_exp) { | ||
| return std::unexpected{rpc_d_exp.error()}; | ||
| } | ||
| k2::descriptor rpc_d{*rpc_d_exp}; | ||
|
|
||
| // create response extra info | ||
| if (collect_responses_extra_info) { | ||
| rpc_client_instance_st.rpc_responses_extra_info.emplace(query_id, std::make_pair(response_extra_info_status::not_ready, response_extra_info{0, timestamp})); | ||
| } | ||
|
|
||
| std::chrono::nanoseconds deadline{timeout_to_deadline(timeout)}; | ||
|
|
||
| return {query_handle{rpc_d, query_id, deadline, collect_responses_extra_info}}; | ||
| } | ||
|
|
||
| std::expected<string, std::pair<int32_t, string>> query_handle::get_ready_response() noexcept { | ||
| std::expected<size_t, int32_t> first_response_size{k2::rpc_get_response_size(rpc_d)}; | ||
| if (!first_response_size) { | ||
| return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"error fetching rpc response"})}; | ||
| } | ||
| string response{reinterpret_cast<char*>(k2::alloc(*first_response_size)), static_cast<string::size_type>(*first_response_size)}; | ||
| std::expected<void, int32_t> response_fetch_result{k2::rpc_fetch_response(rpc_d, {reinterpret_cast<std::byte*>(response.buffer()), response.size()})}; | ||
| if (!response_fetch_result) { | ||
| return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"error fetching rpc response"})}; | ||
| } | ||
|
|
||
| // update response extra info if needed | ||
| if (collect_responses_extra_info) { | ||
| auto& rpc_client_instance_st{RpcClientInstanceState::get()}; | ||
| auto& extra_info_map{rpc_client_instance_st.rpc_responses_extra_info}; | ||
| if (const auto it_extra_info{extra_info_map.find(query_id)}; it_extra_info != extra_info_map.end()) [[likely]] { | ||
| const auto timestamp{std::chrono::duration<double>{std::chrono::system_clock::now().time_since_epoch()}.count()}; | ||
| it_extra_info->second.second = std::make_tuple(response.size(), timestamp - std::get<1>(it_extra_info->second.second)); | ||
| it_extra_info->second.first = response_extra_info_status::ready; | ||
| } else { | ||
| kphp::log::warning("can't find extra info for RPC query {}", query_id); | ||
| } | ||
| } | ||
|
|
||
| return {response}; | ||
| } | ||
|
|
||
| } // namespace kphp::rpc |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| // Compiler for PHP (aka KPHP) | ||
| // Copyright (c) 2026 LLC «V Kontakte» | ||
| // Distributed under the GPL v3 License, see LICENSE.notice.txt | ||
|
|
||
| #pragma once | ||
|
|
||
| #include <chrono> | ||
| #include <cstddef> | ||
| #include <cstdint> | ||
| #include <expected> | ||
| #include <memory> | ||
| #include <tuple> | ||
| #include <utility> | ||
|
|
||
| #include "common/rpc-error-codes.h" | ||
| #include "rpc-client-state.h" | ||
| #include "runtime-common/core/runtime-core.h" | ||
| #include "runtime-light/coroutine/io-scheduler.h" | ||
| #include "runtime-light/coroutine/task.h" | ||
| #include "runtime-light/k2-platform/k2-api.h" | ||
| #include "runtime-light/stdlib/diagnostics/logs.h" | ||
| #include "runtime-light/stdlib/rpc/rpc-client-state.h" | ||
| #include "runtime-light/stdlib/time/util.h" | ||
|
|
||
| namespace kphp::rpc { | ||
|
|
||
| class query_handle { | ||
| k2::descriptor rpc_d{k2::INVALID_PLATFORM_DESCRIPTOR}; | ||
| int64_t query_id; | ||
| std::chrono::nanoseconds deadline{}; | ||
| bool collect_responses_extra_info{false}; | ||
|
|
||
| public: | ||
| query_handle() = delete; | ||
|
|
||
| query_handle(k2::descriptor _rpc_d, int64_t _query_id, std::chrono::nanoseconds _deadline, bool _collect_responses_extra_info) noexcept | ||
| : rpc_d{_rpc_d}, | ||
| query_id{_query_id}, | ||
| deadline{_deadline}, | ||
| collect_responses_extra_info{_collect_responses_extra_info} {} | ||
|
|
||
| query_handle(query_handle&& other) noexcept | ||
| : rpc_d{std::exchange(other.rpc_d, k2::INVALID_PLATFORM_DESCRIPTOR)}, | ||
| query_id{other.query_id}, | ||
| deadline{other.deadline}, | ||
| collect_responses_extra_info{other.collect_responses_extra_info} {} | ||
|
|
||
| query_handle& operator=(query_handle&& other) noexcept { | ||
| if (this != std::addressof(other)) { | ||
| drop(); | ||
| rpc_d = std::exchange(other.rpc_d, k2::INVALID_PLATFORM_DESCRIPTOR); | ||
| query_id = other.query_id; | ||
| deadline = other.deadline; | ||
| collect_responses_extra_info = other.collect_responses_extra_info; | ||
| } | ||
| return *this; | ||
| } | ||
|
|
||
| query_handle(const query_handle& other) = delete; | ||
|
|
||
| query_handle& operator=(const query_handle& other) = delete; | ||
|
|
||
| ~query_handle() noexcept { | ||
| drop(); | ||
| } | ||
|
|
||
| void drop() noexcept { | ||
| if (rpc_d != k2::INVALID_PLATFORM_DESCRIPTOR) { | ||
| k2::free_descriptor(std::exchange(rpc_d, k2::INVALID_PLATFORM_DESCRIPTOR)); | ||
| } | ||
| } | ||
|
|
||
| kphp::coro::task<std::expected<string, std::pair<int32_t, string>>> get_response() noexcept { | ||
| if (rpc_d == k2::INVALID_PLATFORM_DESCRIPTOR) { | ||
| co_return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"fetching rpc response from empty handle"})}; | ||
| } | ||
|
|
||
| kphp::coro::io_scheduler& m_scheduler{kphp::coro::io_scheduler::get()}; | ||
| std::chrono::nanoseconds timeout{deadline_to_timeout(deadline)}; | ||
|
|
||
| switch (co_await m_scheduler.poll(rpc_d, kphp::coro::poll_op::read, timeout)) { | ||
| case kphp::coro::poll_status::event: | ||
| co_return get_ready_response(); | ||
| case kphp::coro::poll_status::closed: | ||
| case kphp::coro::poll_status::timeout: | ||
| co_return std::unexpected{std::make_pair(TL_ERROR_QUERY_TIMEOUT, string{"rpc response timeout"})}; | ||
| case kphp::coro::poll_status::error: | ||
| co_return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"error fetching rpc response"})}; | ||
| } | ||
| } | ||
|
|
||
| private: | ||
| std::expected<string, std::pair<int32_t, string>> get_ready_response() noexcept; | ||
| }; | ||
|
|
||
| std::expected<query_handle, int32_t> send_and_get_handle(std::string_view actor, bool collect_responses_extra_info, std::chrono::milliseconds timeout, | ||
| double timestamp, int64_t query_id, std::span<const std::byte> request_buffer) noexcept; | ||
|
|
||
| } // namespace kphp::rpc |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: fix grammar
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed