Skip to content

Commit 6a058f7

Browse files
Changed design of CircuitBreakerInterceptor
1 parent faa0e02 commit 6a058f7

11 files changed

+251
-235
lines changed

grpc-circuitbreaker-utils/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ dependencies {
99

1010
api(platform("io.grpc:grpc-bom:1.68.3"))
1111
api("io.grpc:grpc-api")
12+
api(project(":grpc-context-utils"))
1213

1314
implementation("org.slf4j:slf4j-api:1.7.36")
1415
implementation("io.github.resilience4j:resilience4j-circuitbreaker:1.7.1")

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigProvider.java grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java

+15-37
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,10 @@
66
import lombok.extern.slf4j.Slf4j;
77

88
@Slf4j
9-
public class CircuitBreakerConfigProvider {
9+
public class CircuitBreakerConfigParser {
1010

1111
public static final String DEFAULT_CONFIG_KEY = "default";
1212

13-
// Whether to enable circuit breaker or not.
14-
private static final String ENABLED = "enabled";
15-
1613
// Percentage of failures to trigger OPEN state
1714
private static final String FAILURE_RATE_THRESHOLD = "failureRateThreshold";
1815
// Percentage of slow calls to trigger OPEN state
@@ -31,42 +28,23 @@ public class CircuitBreakerConfigProvider {
3128
"permittedNumberOfCallsInHalfOpenState";
3229
private static final String SLIDING_WINDOW_TYPE = "slidingWindowType";
3330

34-
// Global flag for circuit breaker enablement
35-
private boolean circuitBreakerEnabled = false;
36-
private Map<String, CircuitBreakerConfiguration> circuitBreakerConfigurationMap;
37-
38-
public CircuitBreakerConfigProvider(Config config) {
39-
this.initialize(config);
40-
}
41-
42-
/** Checks if Circuit Breaker is globally enabled. */
43-
public boolean isCircuitBreakerEnabled() {
44-
return circuitBreakerEnabled;
45-
}
46-
47-
private void initialize(Config circuitBreakerConfig) {
48-
circuitBreakerEnabled =
49-
circuitBreakerConfig.hasPath(ENABLED) && circuitBreakerConfig.getBoolean(ENABLED);
50-
this.circuitBreakerConfigurationMap =
51-
circuitBreakerConfig.root().keySet().stream()
52-
.filter(key -> !key.equals(ENABLED)) // Ignore the global enabled flag
31+
public static <T> CircuitBreakerConfiguration.CircuitBreakerConfigurationBuilder<T> parseConfig(
32+
Config config) {
33+
CircuitBreakerConfiguration.CircuitBreakerConfigurationBuilder<T> builder =
34+
CircuitBreakerConfiguration.builder();
35+
Map<String, CircuitBreakerThresholds> circuitBreakerThresholdsMap =
36+
config.root().keySet().stream()
5337
.collect(
5438
Collectors.toMap(
5539
key -> key, // Circuit breaker key
56-
key -> createCircuitBreakerConfig(circuitBreakerConfig.getConfig(key))));
57-
log.info(
58-
"Loaded {} circuit breaker configurations, Global Enabled: {}. Configs: {}",
59-
circuitBreakerConfigurationMap.size(),
60-
circuitBreakerEnabled,
61-
circuitBreakerConfigurationMap);
62-
}
63-
64-
public Map<String, CircuitBreakerConfiguration> getConfigMap() {
65-
return circuitBreakerConfigurationMap;
40+
key -> buildCircuitBreakerThresholds(config.getConfig(key))));
41+
builder.circuitBreakerThresholdsMap(circuitBreakerThresholdsMap);
42+
log.info("Loaded circuit breaker configs: {}", circuitBreakerThresholdsMap);
43+
return builder;
6644
}
6745

68-
private CircuitBreakerConfiguration createCircuitBreakerConfig(Config config) {
69-
return CircuitBreakerConfiguration.builder()
46+
private static CircuitBreakerThresholds buildCircuitBreakerThresholds(Config config) {
47+
return CircuitBreakerThresholds.builder()
7048
.failureRateThreshold((float) config.getDouble(FAILURE_RATE_THRESHOLD))
7149
.slowCallRateThreshold((float) config.getDouble(SLOW_CALL_RATE_THRESHOLD))
7250
.slowCallDurationThreshold(config.getDuration(SLOW_CALL_DURATION_THRESHOLD))
@@ -79,8 +57,8 @@ private CircuitBreakerConfiguration createCircuitBreakerConfig(Config config) {
7957
.build();
8058
}
8159

82-
private CircuitBreakerConfiguration.SlidingWindowType getSlidingWindowType(
60+
private static CircuitBreakerThresholds.SlidingWindowType getSlidingWindowType(
8361
String slidingWindowType) {
84-
return CircuitBreakerConfiguration.SlidingWindowType.valueOf(slidingWindowType);
62+
return CircuitBreakerThresholds.SlidingWindowType.valueOf(slidingWindowType);
8563
}
8664
}
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,16 @@
11
package org.hypertrace.circuitbreaker.grpcutils;
22

3-
import java.time.Duration;
3+
import java.util.Map;
4+
import java.util.function.BiFunction;
45
import lombok.Builder;
5-
import lombok.Setter;
66
import lombok.Value;
7+
import org.hypertrace.core.grpcutils.context.RequestContext;
78

89
@Value
910
@Builder
10-
@Setter
11-
public class CircuitBreakerConfiguration {
12-
// Percentage of failures to trigger OPEN state
13-
float failureRateThreshold;
14-
// Percentage of slow calls to trigger OPEN state
15-
float slowCallRateThreshold;
16-
// Define what a "slow" call is
17-
Duration slowCallDurationThreshold;
18-
// Number of calls to consider in the sliding window
19-
SlidingWindowType slidingWindowType;
20-
int slidingWindowSize;
21-
// Time before retrying after OPEN state
22-
Duration waitDurationInOpenState;
23-
// Minimum calls before evaluating failure rate
24-
int minimumNumberOfCalls;
25-
// Calls allowed in HALF_OPEN state before deciding to
26-
// CLOSE or OPEN again
27-
int permittedNumberOfCallsInHalfOpenState;
28-
29-
public enum SlidingWindowType {
30-
COUNT_BASED,
31-
TIME_BASED
32-
}
11+
public class CircuitBreakerConfiguration<T> {
12+
Class<T> requestClass;
13+
BiFunction<RequestContext, T, String> keyFunction;
14+
boolean enabled;
15+
Map<String, CircuitBreakerThresholds> circuitBreakerThresholdsMap;
3316
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.hypertrace.circuitbreaker.grpcutils;
2+
3+
import java.time.Duration;
4+
import lombok.Builder;
5+
import lombok.Value;
6+
7+
@Value
8+
@Builder
9+
public class CircuitBreakerThresholds {
10+
// Percentage of failures to trigger OPEN state
11+
float failureRateThreshold;
12+
// Percentage of slow calls to trigger OPEN state
13+
float slowCallRateThreshold;
14+
// Define what a "slow" call is
15+
Duration slowCallDurationThreshold;
16+
// Number of calls to consider in the sliding window
17+
SlidingWindowType slidingWindowType;
18+
int slidingWindowSize;
19+
// Time before retrying after OPEN state
20+
Duration waitDurationInOpenState;
21+
// Minimum calls before evaluating failure rate
22+
int minimumNumberOfCalls;
23+
// Calls allowed in HALF_OPEN state before deciding to
24+
// CLOSE or OPEN again
25+
int permittedNumberOfCallsInHalfOpenState;
26+
27+
public enum SlidingWindowType {
28+
COUNT_BASED,
29+
TIME_BASED
30+
}
31+
}

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerConfigParser.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,23 @@
33
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
44
import java.util.Map;
55
import java.util.stream.Collectors;
6-
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfiguration;
6+
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerThresholds;
77

88
/** Utility class to parse CircuitBreakerConfiguration to Resilience4j CircuitBreakerConfig */
99
public class ResilienceCircuitBreakerConfigParser {
1010

1111
public static Map<String, CircuitBreakerConfig> getCircuitBreakerConfigs(
12-
Map<String, CircuitBreakerConfiguration> configurationMap) {
12+
Map<String, CircuitBreakerThresholds> configurationMap) {
1313
return configurationMap.entrySet().stream()
1414
.collect(Collectors.toMap(Map.Entry::getKey, entry -> getConfig(entry.getValue())));
1515
}
1616

17-
static CircuitBreakerConfig getConfig(CircuitBreakerConfiguration configuration) {
17+
static CircuitBreakerConfig getConfig(CircuitBreakerThresholds configuration) {
1818
return CircuitBreakerConfig.custom()
1919
.failureRateThreshold(configuration.getFailureRateThreshold())
2020
.slowCallRateThreshold(configuration.getSlowCallRateThreshold())
2121
.slowCallDurationThreshold(configuration.getSlowCallDurationThreshold())
22-
.slidingWindowType(getSlidingWindowType(configuration))
22+
.slidingWindowType(getSlidingWindowType(configuration.getSlidingWindowType()))
2323
.slidingWindowSize(configuration.getSlidingWindowSize())
2424
.waitDurationInOpenState(configuration.getWaitDurationInOpenState())
2525
.permittedNumberOfCallsInHalfOpenState(
@@ -29,8 +29,11 @@ static CircuitBreakerConfig getConfig(CircuitBreakerConfiguration configuration)
2929
}
3030

3131
private static CircuitBreakerConfig.SlidingWindowType getSlidingWindowType(
32-
CircuitBreakerConfiguration configuration) {
33-
switch (configuration.getSlidingWindowType()) {
32+
CircuitBreakerThresholds.SlidingWindowType slidingWindowType) {
33+
if (slidingWindowType == null) {
34+
return CircuitBreakerConfig.SlidingWindowType.TIME_BASED;
35+
}
36+
switch (slidingWindowType) {
3437
case COUNT_BASED:
3538
return CircuitBreakerConfig.SlidingWindowType.COUNT_BASED;
3639
case TIME_BASED:

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

+35-25
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package org.hypertrace.circuitbreaker.grpcutils.resilience;
22

33
import com.google.common.annotations.VisibleForTesting;
4-
import com.google.inject.Singleton;
5-
import com.typesafe.config.Config;
64
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
75
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
86
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
@@ -20,64 +18,62 @@
2018
import java.util.Map;
2119
import java.util.concurrent.TimeUnit;
2220
import lombok.extern.slf4j.Slf4j;
23-
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfigProvider;
21+
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfiguration;
2422
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerInterceptor;
23+
import org.hypertrace.core.grpcutils.context.RequestContext;
2524

2625
@Slf4j
27-
@Singleton
2826
public class ResilienceCircuitBreakerInterceptor extends CircuitBreakerInterceptor {
2927

30-
public static final CallOptions.Key<String> CIRCUIT_BREAKER_KEY =
31-
CallOptions.Key.createWithDefault("circuitBreakerKey", "default");
3228
private final CircuitBreakerRegistry resilicenceCircuitBreakerRegistry;
33-
private final CircuitBreakerConfigProvider circuitBreakerConfigProvider;
34-
private final Map<String, CircuitBreakerConfig> resilienceCircuitBreakerConfig;
29+
private final Map<String, CircuitBreakerConfig> resilienceCircuitBreakerConfigMap;
3530
private final ResilienceCircuitBreakerProvider resilienceCircuitBreakerProvider;
31+
private final CircuitBreakerConfiguration<?> circuitBreakerConfiguration;
3632
private final Clock clock;
3733

38-
public ResilienceCircuitBreakerInterceptor(Config config, Clock clock) {
39-
this.circuitBreakerConfigProvider = new CircuitBreakerConfigProvider(config);
40-
this.resilienceCircuitBreakerConfig =
34+
public ResilienceCircuitBreakerInterceptor(
35+
CircuitBreakerConfiguration<?> circuitBreakerConfiguration, Clock clock) {
36+
this.circuitBreakerConfiguration = circuitBreakerConfiguration;
37+
this.clock = clock;
38+
this.resilienceCircuitBreakerConfigMap =
4139
ResilienceCircuitBreakerConfigParser.getCircuitBreakerConfigs(
42-
circuitBreakerConfigProvider.getConfigMap());
40+
circuitBreakerConfiguration.getCircuitBreakerThresholdsMap());
4341
this.resilicenceCircuitBreakerRegistry =
44-
new ResilienceCircuitBreakerRegistryProvider(resilienceCircuitBreakerConfig)
42+
new ResilienceCircuitBreakerRegistryProvider(resilienceCircuitBreakerConfigMap)
4543
.getCircuitBreakerRegistry();
4644
this.resilienceCircuitBreakerProvider =
4745
new ResilienceCircuitBreakerProvider(
48-
resilicenceCircuitBreakerRegistry, resilienceCircuitBreakerConfig);
49-
this.clock = clock;
46+
resilicenceCircuitBreakerRegistry, resilienceCircuitBreakerConfigMap);
5047
}
5148

5249
@VisibleForTesting
5350
public ResilienceCircuitBreakerInterceptor(
54-
Config config,
5551
Clock clock,
5652
CircuitBreakerRegistry resilicenceCircuitBreakerRegistry,
57-
ResilienceCircuitBreakerProvider resilienceCircuitBreakerProvider) {
58-
this.circuitBreakerConfigProvider = new CircuitBreakerConfigProvider(config);
59-
this.resilienceCircuitBreakerConfig =
53+
ResilienceCircuitBreakerProvider resilienceCircuitBreakerProvider,
54+
CircuitBreakerConfiguration<?> circuitBreakerConfiguration) {
55+
this.circuitBreakerConfiguration = circuitBreakerConfiguration;
56+
this.resilienceCircuitBreakerConfigMap =
6057
ResilienceCircuitBreakerConfigParser.getCircuitBreakerConfigs(
61-
circuitBreakerConfigProvider.getConfigMap());
58+
circuitBreakerConfiguration.getCircuitBreakerThresholdsMap());
6259
this.resilicenceCircuitBreakerRegistry = resilicenceCircuitBreakerRegistry;
6360
this.resilienceCircuitBreakerProvider = resilienceCircuitBreakerProvider;
6461
this.clock = clock;
6562
}
6663

6764
@Override
6865
protected boolean isCircuitBreakerEnabled() {
69-
return circuitBreakerConfigProvider.isCircuitBreakerEnabled();
66+
return circuitBreakerConfiguration.isEnabled();
7067
}
7168

7269
@Override
7370
protected <ReqT, RespT> ClientCall<ReqT, RespT> createInterceptedCall(
7471
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
75-
// Get circuit breaker key from CallOptions
76-
String circuitBreakerKey = callOptions.getOption(CIRCUIT_BREAKER_KEY);
77-
CircuitBreaker circuitBreaker =
78-
resilienceCircuitBreakerProvider.getCircuitBreaker(circuitBreakerKey);
7972
return new ForwardingClientCall.SimpleForwardingClientCall<>(
8073
next.newCall(method, callOptions)) {
74+
CircuitBreaker circuitBreaker;
75+
String circuitBreakerKey;
76+
8177
@Override
8278
public void start(Listener<RespT> responseListener, Metadata headers) {
8379
Instant startTime = clock.instant();
@@ -87,8 +83,22 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
8783
super.start(wrappedListener, headers);
8884
}
8985

86+
@SuppressWarnings("unchecked")
9087
@Override
9188
public void sendMessage(ReqT message) {
89+
CircuitBreakerConfiguration<ReqT> config =
90+
(CircuitBreakerConfiguration<ReqT>) circuitBreakerConfiguration;
91+
if (config.getRequestClass() == null
92+
|| (!message.getClass().equals(config.getRequestClass()))) {
93+
log.warn("Invalid config for message type: {}", message.getClass());
94+
super.sendMessage(message);
95+
}
96+
if (config.getKeyFunction() != null) {
97+
circuitBreakerKey = config.getKeyFunction().apply(RequestContext.CURRENT.get(), message);
98+
} else {
99+
circuitBreakerKey = "default";
100+
}
101+
circuitBreaker = resilienceCircuitBreakerProvider.getCircuitBreaker(circuitBreakerKey);
92102
if (!circuitBreaker.tryAcquirePermission()) {
93103
logCircuitBreakerRejection(circuitBreakerKey, circuitBreaker);
94104
String rejectionReason =

0 commit comments

Comments
 (0)