diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 1d9d8a53f7bbc..e1beb974dd443 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -107,12 +107,15 @@ static grpc_error_handle CreateEventEngineListener( acceptor->from_server = s; acceptor->port_index = -1; acceptor->fd_index = -1; + std::size_t cq_idx=0; if (!is_external) { auto it = s->listen_fd_to_index_map.find(listener_fd); if (it != s->listen_fd_to_index_map.end()) { acceptor->port_index = std::get<0>(it->second); acceptor->fd_index = std::get<1>(it->second); } + cq_idx=static_cast(gpr_atm_no_barrier_fetch_add( + &s->next_pollset_to_assign_ids[""], 1)) % s->pollsets->size(); } else { // External connection handling. grpc_resolved_address addr; @@ -131,6 +134,18 @@ static grpc_error_handle CreateEventEngineListener( } (void)grpc_set_socket_no_sigpipe_if_possible(fd); auto addr_uri = grpc_sockaddr_to_uri(&addr); + + std::string addr_str = addr_uri.value(); + std::size_t start = addr_str.find_first_of(":") + 1; + std::size_t end = addr_str.find(":", start); + std::string ip = addr_str.substr(start, end - start); + + cq_idx = static_cast(rand()) % s->pollsets->size(); + if (!gpr_atm_no_barrier_cas(&s->next_pollset_to_assign_ids[ip],0,cq_idx)){ + cq_idx=static_cast(gpr_atm_no_barrier_fetch_add( + &s->next_pollset_to_assign_ids[ip], 1)) % s->pollsets->size(); + } + if (!addr_uri.ok()) { gpr_log(GPR_ERROR, "Invalid address: %s", addr_uri.status().ToString().c_str()); @@ -142,10 +157,7 @@ static grpc_error_handle CreateEventEngineListener( addr_uri->c_str()); } } - grpc_pollset* read_notifier_pollset = - (*(s->pollsets))[static_cast(gpr_atm_no_barrier_fetch_add( - &s->next_pollset_to_assign, 1)) % - s->pollsets->size()]; + grpc_pollset* read_notifier_pollset = (*(s->pollsets))[cq_idx]; acceptor->external_connection = is_external; acceptor->listener_fd = listener_fd; grpc_byte_buffer* buf = nullptr; @@ -253,7 +265,7 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete, GPR_ASSERT(s->on_accept_cb); s->memory_quota = s->options.resource_quota->memory_quota(); s->pre_allocated_fd = -1; - gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0); + gpr_atm_no_barrier_store(&s->next_pollset_to_assign_ids[""], 0); s->n_bind_ports = 0; new (&s->listen_fd_to_index_map) absl::flat_hash_map>(); @@ -445,14 +457,23 @@ static void on_read(void* arg, grpc_error_handle err) { addr_uri->c_str()); } - std::string name = absl::StrCat("tcp-server-connection:", addr_uri.value()); + // addr_str format: ipv4/ipv6:ipv6:port + std::string addr_str = addr_uri.value(); + std::size_t start = addr_str.find_first_of(":") + 1; + std::size_t end = addr_str.find(":", start); + std::string ip = addr_str.substr(start, end - start); + + std::string name = absl::StrCat("tcp-server-connection:", addr_str); grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true); - read_notifier_pollset = (*(sp->server->pollsets)) - [static_cast(gpr_atm_no_barrier_fetch_add( - &sp->server->next_pollset_to_assign, 1)) % - sp->server->pollsets->size()]; + std::size_t cq_idx = static_cast(rand()) % sp->server->pollsets->size(); + if (!gpr_atm_no_barrier_cas(&sp->server->next_pollset_to_assign_ids[ip],0,cq_idx)){ + cq_idx = static_cast(gpr_atm_no_barrier_fetch_add( + &sp->server->next_pollset_to_assign_ids[ip], 1)) % + sp->server->pollsets->size(); + } + read_notifier_pollset = (*(sp->server->pollsets))[cq_idx]; grpc_pollset_add_fd(read_notifier_pollset, fdobj); // Create acceptor. @@ -902,12 +923,22 @@ class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler { gpr_log(GPR_INFO, "SERVER_CONNECT: incoming external connection: %s", addr_uri->c_str()); } - std::string name = absl::StrCat("tcp-server-connection:", addr_uri.value()); + + // addr_str format: ipv4/ipv6:ipv6:port + std::string addr_str = addr_uri.value(); + std::size_t start = addr_str.find_first_of(":") + 1; + std::size_t end = addr_str.find(":", start); + std::string ip = addr_str.substr(start, end - start); + + std::string name = absl::StrCat("tcp-server-connection:", addr_str); grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true); - read_notifier_pollset = - (*(s_->pollsets))[static_cast(gpr_atm_no_barrier_fetch_add( - &s_->next_pollset_to_assign, 1)) % - s_->pollsets->size()]; + + std::size_t cq_idx = static_cast(rand()) % s_->pollsets->size(); + if (!gpr_atm_no_barrier_cas(&s_->next_pollset_to_assign_ids[ip],0,cq_idx)){ + cq_idx=static_cast(gpr_atm_no_barrier_fetch_add( + &s_->next_pollset_to_assign_ids[ip], 1)) % s_->pollsets->size(); + } + read_notifier_pollset =(*(s_->pollsets))[cq_idx]; grpc_pollset_add_fd(read_notifier_pollset, fdobj); grpc_tcp_server_acceptor* acceptor = static_cast(gpr_malloc(sizeof(*acceptor))); diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index afe6833e60d94..088f2614ada33 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -32,6 +32,7 @@ #include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/resource_quota/memory_quota.h" +#include // one listening port typedef struct grpc_tcp_listener { @@ -98,8 +99,8 @@ struct grpc_tcp_server { // owned by this struct const std::vector* pollsets = nullptr; - // next pollset to assign a channel to - gpr_atm next_pollset_to_assign = 0; + // next pollset to assign a channel to, it is a map from pollset name to ip address. + std::map next_pollset_to_assign_ids; // Contains config extracted from channel args for this server grpc_core::PosixTcpOptions options;