Skip to content

Commit 94f686e

Browse files
authored
BugFix: Fix the problem that the CLIENT_POST_SCHED_RECV_MSG filter point is not executed in case of decoding failure (#151)
1 parent a1b8e6c commit 94f686e

File tree

5 files changed

+24
-7
lines changed

5 files changed

+24
-7
lines changed

trpc/future/future.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,13 @@ class FutureImplBase {
452452
int64_t timeout_us = timeout * 1000;
453453
auto start = std::chrono::system_clock::now();
454454

455-
while (!state_base_->has_result) {
455+
while (true) {
456+
{
457+
std::lock_guard<std::mutex> lock(state_base_->mtx);
458+
if (state_base_->has_result) {
459+
break;
460+
}
461+
}
456462
std::this_thread::sleep_for(std::chrono::microseconds(100));
457463
auto now = std::chrono::system_clock::now();
458464
auto diff = std::chrono::duration_cast<std::chrono::microseconds>(now - start);

trpc/transport/client/future/common/utils.cc

+8-2
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,17 @@ void DispatchResponse(CTransportReqMsg* req_msg, CTransportRspMsg* rsp_msg,
8888
}
8989

9090
void DispatchException(CTransportReqMsg* req_msg, int ret, std::string&& err_msg,
91-
const TransInfo::RspDispatchFunction& rsp_dispatch_function) {
91+
const TransInfo::RspDispatchFunction& rsp_dispatch_function,
92+
const TransInfo::RunClientFiltersFunction& run_client_filters_function) {
9293
object_pool::LwUniquePtr<MsgTask> task = object_pool::MakeLwUnique<MsgTask>();
9394
task->task_type = runtime::kResponseMsg;
9495
task->param = req_msg;
95-
task->handler = [req_msg, ret, msg = std::move(err_msg)]() mutable {
96+
task->handler = [req_msg, ret, msg = std::move(err_msg), run_client_filters_function]() mutable {
97+
// For rpcz
98+
if (run_client_filters_function != nullptr) {
99+
run_client_filters_function(FilterPoint::CLIENT_POST_SCHED_RECV_MSG, req_msg);
100+
}
101+
96102
auto* backup_promise = req_msg->extend_info->backup_promise;
97103
Exception ex(CommonException(msg.c_str(), ret));
98104
req_msg->extend_info->promise.SetException(ex);

trpc/transport/client/future/common/utils.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@ void DispatchResponse(CTransportReqMsg* req_msg, CTransportRspMsg* rsp_msg,
4343
/// @param ret Error code.
4444
/// @param err_msg Error message.
4545
/// @param rsp_dispatch_function Function to decide which handle thread to dispatch to.
46+
/// @param run_client_filters_function Function to run client filters.
4647
void DispatchException(CTransportReqMsg* req_msg, int ret, std::string&& err_msg,
47-
const TransInfo::RspDispatchFunction& rsp_dispatch_function);
48+
const TransInfo::RspDispatchFunction& rsp_dispatch_function,
49+
const TransInfo::RunClientFiltersFunction& run_client_filters_function = nullptr);
4850

4951
/// @brief To trigger backup request withdraw.
5052
/// @param promise Related Promise of backup request.

trpc/transport/client/future/conn_pool/future_conn_pool_connection_handler.cc

+4-2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ bool FutureTcpConnPoolConnectionHandler::HandleMessage(const ConnectionPtr& conn
5858
std::string error = "decode failed peer addr = ";
5959
error += options_.group_options->peer_addr.ToString();
6060
future::DispatchException(msg, TrpcRetCode::TRPC_CLIENT_DECODE_ERR, std::move(error),
61-
options_.group_options->trans_info->rsp_dispatch_function);
61+
options_.group_options->trans_info->rsp_dispatch_function,
62+
options_.group_options->trans_info->run_client_filters_function);
6263
// Decode error, let caller close connection.
6364
return false;
6465
}
@@ -158,7 +159,8 @@ bool FutureUdpIoPoolConnectionHandler::HandleMessage(const ConnectionPtr& conn,
158159
std::string error = "decode failed peer addr = ";
159160
error += options_.group_options->peer_addr.ToString();
160161
future::DispatchException(msg, TrpcRetCode::TRPC_CLIENT_DECODE_ERR, std::move(error),
161-
options_.group_options->trans_info->rsp_dispatch_function);
162+
options_.group_options->trans_info->rsp_dispatch_function,
163+
options_.group_options->trans_info->run_client_filters_function);
162164
}
163165

164166
// When there is no pending request to send, just recycle connection.

trpc/transport/client/future/pipeline/future_tcp_pipeline_connector.cc

+2-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ bool FuturePipelineConnectionHandler::HandleMessage(const ConnectionPtr& conn,
7474
std::string error = "decode failed peer addr= ";
7575
error += options_.group_options->peer_addr.ToString();
7676
future::DispatchException(msg, TrpcRetCode::TRPC_CLIENT_DECODE_ERR, std::move(error),
77-
options_.group_options->trans_info->rsp_dispatch_function);
77+
options_.group_options->trans_info->rsp_dispatch_function,
78+
options_.group_options->trans_info->run_client_filters_function);
7879
return false;
7980
}
8081

0 commit comments

Comments
 (0)