Skip to content

Commit 4650a38

Browse files
chhy2009huiyongchen
authored and
huiyongchen
committed
Feat: Support circuit breaking for faulty nodes in direct and domain selector modes
1 parent 1bcfa53 commit 4650a38

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2412
-370
lines changed

trpc/common/config/BUILD

-28
Original file line numberDiff line numberDiff line change
@@ -288,34 +288,6 @@ cc_test(
288288
],
289289
)
290290

291-
cc_library(
292-
name = "domain_naming_conf",
293-
srcs = ["domain_naming_conf.cc"],
294-
hdrs = ["domain_naming_conf.h"],
295-
deps = [
296-
"//trpc/util/log:logging",
297-
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
298-
],
299-
)
300-
301-
cc_library(
302-
name = "domain_naming_conf_parser",
303-
hdrs = ["domain_naming_conf_parser.h"],
304-
deps = [
305-
":domain_naming_conf",
306-
],
307-
)
308-
309-
cc_test(
310-
name = "domain_naming_conf_test",
311-
srcs = ["domain_naming_conf_test.cc"],
312-
deps = [
313-
":domain_naming_conf",
314-
":domain_naming_conf_parser",
315-
"@com_google_googletest//:gtest_main",
316-
],
317-
)
318-
319291
cc_library(
320292
name = "loadbalance_naming_conf",
321293
srcs = ["loadbalance_naming_conf.cc"],

trpc/naming/BUILD

+2
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ cc_library(
173173
":selector_factory",
174174
"//trpc/filter:filter_manager",
175175
"//trpc/naming:load_balance_factory",
176+
"//trpc/naming/common/util/circuit_break:circuit_breaker_creator_factory",
177+
"//trpc/naming/common/util/circuit_break:default_circuit_breaker",
176178
"//trpc/naming/direct:direct_selector_filter",
177179
"//trpc/naming/direct:selector_direct",
178180
"//trpc/naming/domain:domain_selector_filter",

trpc/naming/common/BUILD

-6
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@ package(default_visibility = ["//visibility:public"])
77
cc_library(
88
name = "common_defs",
99
hdrs = ["common_defs.h"],
10-
visibility = [
11-
"//visibility:public",
12-
],
1310
deps = [
1411
":common_inc_deprecated",
1512
"//trpc/client:client_context",
@@ -34,8 +31,5 @@ cc_library(
3431
cc_library(
3532
name = "constants",
3633
hdrs = ["constants.h"],
37-
visibility = [
38-
"//visibility:public",
39-
],
4034
deps = [],
4135
)
+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
licenses(["notice"])
2+
3+
package(default_visibility = ["//visibility:public"])
4+
5+
cc_library(
6+
name = "bucket_circular_array",
7+
srcs = [
8+
"bucket_circular_array.cc",
9+
],
10+
hdrs = [
11+
"bucket_circular_array.h",
12+
],
13+
deps = [
14+
"//trpc/util/log:logging",
15+
],
16+
)
17+
18+
cc_library(
19+
name = "circuit_break_whitelist",
20+
srcs = [
21+
"circuit_break_whitelist.cc",
22+
],
23+
hdrs = [
24+
"circuit_break_whitelist.h",
25+
],
26+
deps = [
27+
"//trpc/codec/trpc",
28+
],
29+
)
30+
31+
cc_test(
32+
name = "circuit_break_whitelist_test",
33+
srcs = [
34+
"circuit_break_whitelist_test.cc",
35+
],
36+
deps = [
37+
":circuit_break_whitelist",
38+
"@com_google_googletest//:gtest_main",
39+
],
40+
)
41+
42+
cc_library(
43+
name = "circuit_breaker_config",
44+
hdrs = [
45+
"circuit_breaker_config.h",
46+
],
47+
deps = [
48+
"//trpc/util/log:logging",
49+
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
50+
],
51+
)
52+
53+
cc_library(
54+
name = "default_circuit_breaker_config",
55+
hdrs = [
56+
"default_circuit_breaker_config.h",
57+
],
58+
deps = [
59+
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
60+
],
61+
)
62+
63+
cc_library(
64+
name = "default_circuit_breaker",
65+
srcs = [
66+
"default_circuit_breaker.cc",
67+
],
68+
hdrs = [
69+
"default_circuit_breaker.h",
70+
],
71+
deps = [
72+
":bucket_circular_array",
73+
":circuit_breaker",
74+
":default_circuit_breaker_config",
75+
"//trpc/util/log:logging",
76+
],
77+
)
78+
79+
cc_library(
80+
name = "circuit_breaker",
81+
srcs = [],
82+
hdrs = [
83+
"circuit_breaker.h",
84+
],
85+
deps = [
86+
"//trpc/naming/common:common_defs",
87+
],
88+
)
89+
90+
cc_library(
91+
name = "circuit_breaker_creator_factory",
92+
srcs = [],
93+
hdrs = [
94+
"circuit_breaker_creator_factory.h",
95+
],
96+
deps = [
97+
":circuit_breaker",
98+
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
99+
],
100+
)
101+
102+
cc_test(
103+
name = "default_circuit_breaker_test",
104+
srcs = ["default_circuit_breaker_test.cc"],
105+
deps = [
106+
":default_circuit_breaker",
107+
"//trpc/util:time",
108+
"@com_google_googletest//:gtest",
109+
"@com_google_googletest//:gtest_main",
110+
],
111+
)
112+
113+
cc_test(
114+
name = "default_circuit_beaker_config_test",
115+
srcs = ["default_circuit_beaker_config_test.cc"],
116+
deps = [
117+
":default_circuit_breaker_config",
118+
"@com_google_googletest//:gtest",
119+
"@com_google_googletest//:gtest_main",
120+
],
121+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//
2+
//
3+
// Tencent is pleased to support the open source community by making tRPC available.
4+
//
5+
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
6+
// All rights reserved.
7+
//
8+
// If you have downloaded a copy of the tRPC source code from Tencent,
9+
// please note that tRPC source code is licensed under the Apache 2.0 License,
10+
// A copy of the Apache 2.0 License is included in this file.
11+
//
12+
//
13+
14+
#include "trpc/naming/common/util/circuit_break/bucket_circular_array.h"
15+
16+
#include "trpc/util/log/logging.h"
17+
18+
namespace trpc::naming {
19+
20+
BucketCircularArray::BucketCircularArray(uint32_t stat_window_ms, uint32_t buckets_num)
21+
: buckets_num_(buckets_num), stat_window_ms_per_bucket_(stat_window_ms / buckets_num), buckets_(buckets_num) {
22+
for (auto& bucket : buckets_) {
23+
bucket.bucket_time.store(0, std::memory_order_relaxed);
24+
bucket.total_count.store(0, std::memory_order_relaxed);
25+
bucket.error_count.store(0, std::memory_order_relaxed);
26+
}
27+
}
28+
29+
void BucketCircularArray::AddMetrics(uint64_t current_ms, bool success) {
30+
uint64_t bucket_time = current_ms / stat_window_ms_per_bucket_;
31+
int bucket_index = bucket_time % buckets_num_;
32+
auto& bucket = buckets_[bucket_index];
33+
// If it is data from the previous round, reset the data for that window.
34+
uint64_t store_bucket_time = bucket.bucket_time;
35+
if (bucket_time != store_bucket_time) {
36+
if (bucket.bucket_time.compare_exchange_weak(store_bucket_time, bucket_time, std::memory_order_relaxed)) {
37+
bucket.total_count.store(0, std::memory_order_relaxed);
38+
bucket.error_count.store(0, std::memory_order_relaxed);
39+
}
40+
}
41+
42+
bucket.total_count.fetch_add(1, std::memory_order_relaxed);
43+
if (!success) {
44+
bucket.error_count.fetch_add(1, std::memory_order_relaxed);
45+
}
46+
}
47+
48+
void BucketCircularArray::ClearMetrics() {
49+
// Since the time of data is checked when adding metrics, here we only need to reset the bucket_time.
50+
for (auto& bucket : buckets_) {
51+
bucket.bucket_time = 0;
52+
}
53+
}
54+
55+
float BucketCircularArray::GetErrorRate(uint64_t current_ms, uint32_t request_volume_threshold) {
56+
uint64_t bucket_time = current_ms / stat_window_ms_per_bucket_;
57+
uint64_t error_count = 0;
58+
uint64_t total_count = 0;
59+
for (auto& bucket : buckets_) {
60+
// Only collect data from the most recent round.
61+
if (bucket.bucket_time.load(std::memory_order_relaxed) > (bucket_time - buckets_num_)) {
62+
total_count += bucket.total_count.load(std::memory_order_relaxed);
63+
error_count += bucket.error_count.load(std::memory_order_relaxed);
64+
}
65+
}
66+
67+
if (total_count >= request_volume_threshold) {
68+
return static_cast<float>(error_count) / total_count;
69+
}
70+
71+
// If the minimum number of requests is not reached, return a failure rate of 0.
72+
return 0;
73+
}
74+
75+
} // namespace trpc::naming
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
2+
//
3+
//
4+
// Tencent is pleased to support the open source community by making tRPC available.
5+
//
6+
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
7+
// All rights reserved.
8+
//
9+
// If you have downloaded a copy of the tRPC source code from Tencent,
10+
// please note that tRPC source code is licensed under the Apache 2.0 License,
11+
// A copy of the Apache 2.0 License is included in this file.
12+
//
13+
//
14+
15+
#pragma once
16+
17+
#include <atomic>
18+
#include <cstdint>
19+
#include <vector>
20+
21+
namespace trpc::naming {
22+
23+
/// @brief A thread-safe class for tracking invocation statistics using a sliding window implementation.
24+
class BucketCircularArray {
25+
public:
26+
/// @brief Construct a bucket circular array
27+
/// @note It is necessary to ensure that stat_window_ms is divisible by buckets_num.
28+
BucketCircularArray(uint32_t stat_window_ms, uint32_t buckets_num);
29+
30+
/// @brief Add statistical data
31+
void AddMetrics(uint64_t current_ms, bool success);
32+
33+
/// @brief Clear statistical data
34+
void ClearMetrics();
35+
36+
/// @brief Retrieve the failure rate within the statistical time period
37+
float GetErrorRate(uint64_t current_ms, uint32_t request_volume_threshold);
38+
39+
private:
40+
struct Metrics {
41+
std::atomic<uint64_t> bucket_time{0}; // The start time of the bucket
42+
std::atomic<uint32_t> total_count{0}; // The request count during current time period
43+
std::atomic<uint32_t> error_count{0}; // The error count during current time period
44+
};
45+
46+
uint32_t buckets_num_;
47+
uint32_t stat_window_ms_per_bucket_;
48+
std::vector<Metrics> buckets_;
49+
};
50+
51+
} // namespace trpc::naming
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//
2+
//
3+
// Tencent is pleased to support the open source community by making tRPC available.
4+
//
5+
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
6+
// All rights reserved.
7+
//
8+
// If you have downloaded a copy of the tRPC source code from Tencent,
9+
// please note that tRPC source code is licensed under the Apache 2.0 License,
10+
// A copy of the Apache 2.0 License is included in this file.
11+
//
12+
//
13+
14+
#include "trpc/naming/common/util/circuit_break/circuit_break_whitelist.h"
15+
16+
#include "trpc/codec/trpc/trpc.pb.h"
17+
18+
namespace trpc::naming {
19+
20+
CircuitBreakWhiteList::CircuitBreakWhiteList() {
21+
// Add default error code to whitelist
22+
circuitbreak_whitelist_.Writer().insert(TrpcRetCode::TRPC_SERVER_OVERLOAD_ERR);
23+
circuitbreak_whitelist_.Writer().insert(TrpcRetCode::TRPC_SERVER_LIMITED_ERR);
24+
circuitbreak_whitelist_.Swap();
25+
}
26+
27+
void CircuitBreakWhiteList::SetCircuitBreakWhiteList(const std::vector<int>& retcodes) {
28+
auto& writer = circuitbreak_whitelist_.Writer();
29+
writer.clear();
30+
writer.insert(retcodes.begin(), retcodes.end());
31+
circuitbreak_whitelist_.Swap();
32+
}
33+
34+
bool CircuitBreakWhiteList::Contains(int retcode) {
35+
auto& reader = circuitbreak_whitelist_.Reader();
36+
if (reader.find(retcode) != reader.end()) {
37+
return true;
38+
}
39+
40+
return false;
41+
}
42+
43+
} // namespace trpc::naming

0 commit comments

Comments
 (0)