Skip to content

Commit 2c7a492

Browse files
*: Round-robin choose channel by source ip
Derived from: tikv#53 Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
1 parent 38a9cd9 commit 2c7a492

2 files changed

Lines changed: 32 additions & 20 deletions

File tree

src/core/lib/iomgr/tcp_server_posix.cc

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
104104
s->fd_handler = nullptr;
105105
s->memory_quota =
106106
grpc_core::ResourceQuotaFromChannelArgs(args)->memory_quota();
107-
gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
107+
gpr_atm_no_barrier_store(&s->next_pollset_to_assign_ids[""], 0);
108108
*server = s;
109109
return GRPC_ERROR_NONE;
110110
}
@@ -253,15 +253,8 @@ static void on_read(void* arg, grpc_error_handle err) {
253253
addr_str.c_str());
254254
}
255255

256-
std::string name = absl::StrCat("tcp-server-connection:", addr_str);
257-
grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
258-
259-
read_notifier_pollset = (*(sp->server->pollsets))
260-
[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
261-
&sp->server->next_pollset_to_assign, 1)) %
262-
sp->server->pollsets->size()];
263-
264-
grpc_pollset_add_fd(read_notifier_pollset, fdobj);
256+
// Create and bind fd randomly with the given addr.
257+
grpc_fd* fdobj = randomly_bind_tcp_server(fd, addr_str, sp);
265258

266259
// Create acceptor.
267260
grpc_tcp_server_acceptor* acceptor =
@@ -276,7 +269,7 @@ static void on_read(void* arg, grpc_error_handle err) {
276269
read_notifier_pollset, acceptor);
277270
}
278271

279-
GPR_UNREACHABLE_CODE(return );
272+
GPR_UNREACHABLE_CODE(return);
280273

281274
error:
282275
gpr_mu_lock(&sp->server->mu);
@@ -600,13 +593,8 @@ class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
600593
gpr_log(GPR_INFO, "SERVER_CONNECT: incoming external connection: %s",
601594
addr_str.c_str());
602595
}
603-
std::string name = absl::StrCat("tcp-server-connection:", addr_str);
604-
grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
605-
read_notifier_pollset =
606-
(*(s_->pollsets))[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
607-
&s_->next_pollset_to_assign, 1)) %
608-
s_->pollsets->size()];
609-
grpc_pollset_add_fd(read_notifier_pollset, fdobj);
596+
// Create and bind fd randomly with the given addr.
597+
grpc_fd* fdobj = randomly_bind_tcp_server(fd, addr_str, s_);
610598
grpc_tcp_server_acceptor* acceptor =
611599
static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
612600
acceptor->from_server = s_;
@@ -631,6 +619,27 @@ static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
631619
return s->fd_handler;
632620
}
633621

622+
grpc_fd* randomly_bind_tcp_server(int fd, const std::string& addr_str,
623+
grpc_tcp_server* s) {
624+
// addr_str format: ipv4/ipv6:ipv6:port
625+
std::size_t start = addr_str.find_first_of(":") + 1;
626+
std::size_t end = addr_str.find(":", start);
627+
std::string ip = addr_str.substr(start, end - start);
628+
629+
std::string name = absl::StrCat("tcp-server-connection:", addr_str);
630+
grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
631+
632+
// Randomly choose one channel idx for this fd.
633+
std::size_t cq_idx = static_cast<size_t>(rand()) % s->pollsets->size();
634+
if (!gpr_atm_no_barrier_cas(&s->next_pollset_to_assign_ids[ip], 0, cq_idx)) {
635+
cq_idx = static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
636+
&s->next_pollset_to_assign_ids[ip], 1)) %
637+
s->pollsets->size();
638+
}
639+
grpc_pollset_add_fd((*(s->pollsets))[cq_idx], fdobj);
640+
return fdobj;
641+
}
642+
634643
grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
635644
tcp_server_create, tcp_server_start,
636645
tcp_server_add_port, tcp_server_create_fd_handler,

src/core/lib/iomgr/tcp_server_utils_posix.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
#include <grpc/support/port_platform.h>
2323

24+
#include <unordered_map>
25+
2426
#include "src/core/lib/iomgr/ev_posix.h"
2527
#include "src/core/lib/iomgr/resolve_address.h"
2628
#include "src/core/lib/iomgr/socket_utils_posix.h"
@@ -87,8 +89,9 @@ struct grpc_tcp_server {
8789
* owned by this struct */
8890
const std::vector<grpc_pollset*>* pollsets = nullptr;
8991

90-
/* next pollset to assign a channel to */
91-
gpr_atm next_pollset_to_assign = 0;
92+
/* next pollset to assign a channel to, it is a map from pollset name to ip
93+
* address */
94+
std::unordered_map<std::string, gpr_atm> next_pollset_to_assign_ids;
9295

9396
/* channel args for this server */
9497
grpc_channel_args* channel_args = nullptr;

0 commit comments

Comments
 (0)