Skip to content

Commit 589ee5c

Browse files
Add grpc circuit breaker utility using interceptors (#68)
* Add grpc circuit breaker utility using interceptors
1 parent 20402cf commit 589ee5c

14 files changed

+803
-1
lines changed

build.gradle.kts

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ plugins {
77
id("org.hypertrace.publish-plugin") version "1.0.5" apply false
88
id("org.hypertrace.jacoco-report-plugin") version "0.2.1" apply false
99
id("org.hypertrace.code-style-plugin") version "2.0.0" apply false
10-
id("org.owasp.dependencycheck") version "10.0.3"
10+
id("org.owasp.dependencycheck") version "12.1.0"
1111
}
1212

1313
subprojects {
+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
plugins {
2+
`java-library`
3+
jacoco
4+
id("org.hypertrace.publish-plugin")
5+
id("org.hypertrace.jacoco-report-plugin")
6+
}
7+
8+
dependencies {
9+
10+
api(platform("io.grpc:grpc-bom:1.68.3"))
11+
api("io.grpc:grpc-api")
12+
api(project(":grpc-context-utils"))
13+
14+
implementation("org.slf4j:slf4j-api:1.7.36")
15+
implementation("io.github.resilience4j:resilience4j-circuitbreaker:1.7.1")
16+
implementation("com.typesafe:config:1.4.2")
17+
implementation("com.google.guava:guava:32.0.1-jre")
18+
19+
annotationProcessor("org.projectlombok:lombok:1.18.24")
20+
compileOnly("org.projectlombok:lombok:1.18.24")
21+
22+
testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
23+
testImplementation("org.mockito:mockito-core:5.8.0")
24+
testImplementation("org.mockito:mockito-junit-jupiter:5.8.0")
25+
}
26+
27+
tasks.test {
28+
useJUnitPlatform()
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package org.hypertrace.circuitbreaker.grpcutils;
2+
3+
import com.typesafe.config.Config;
4+
import java.util.Map;
5+
import java.util.Set;
6+
import java.util.stream.Collectors;
7+
import lombok.extern.slf4j.Slf4j;
8+
9+
@Slf4j
10+
public class CircuitBreakerConfigParser {
11+
12+
// Percentage of failures to trigger OPEN state
13+
private static final String FAILURE_RATE_THRESHOLD = "failureRateThreshold";
14+
// Percentage of slow calls to trigger OPEN state
15+
private static final String SLOW_CALL_RATE_THRESHOLD = "slowCallRateThreshold";
16+
// Define what a "slow" call is
17+
private static final String SLOW_CALL_DURATION_THRESHOLD = "slowCallDurationThreshold";
18+
// Number of calls to consider in the sliding window
19+
private static final String SLIDING_WINDOW_SIZE = "slidingWindowSize";
20+
// Time before retrying after OPEN state
21+
private static final String WAIT_DURATION_IN_OPEN_STATE = "waitDurationInOpenState";
22+
// Minimum calls before evaluating failure rate
23+
private static final String MINIMUM_NUMBER_OF_CALLS = "minimumNumberOfCalls";
24+
// Calls allowed in HALF_OPEN state before deciding to
25+
// CLOSE or OPEN again
26+
private static final String PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE =
27+
"permittedNumberOfCallsInHalfOpenState";
28+
private static final String SLIDING_WINDOW_TYPE = "slidingWindowType";
29+
public static final String ENABLED = "enabled";
30+
public static final String DEFAULT_THRESHOLDS = "defaultThresholds";
31+
private static final Set<String> NON_THRESHOLD_KEYS = Set.of(ENABLED, DEFAULT_THRESHOLDS);
32+
33+
public static <T> CircuitBreakerConfiguration.CircuitBreakerConfigurationBuilder<T> parseConfig(
34+
Config config) {
35+
CircuitBreakerConfiguration.CircuitBreakerConfigurationBuilder<T> builder =
36+
CircuitBreakerConfiguration.builder();
37+
if (config.hasPath(ENABLED)) {
38+
builder.enabled(config.getBoolean(ENABLED));
39+
}
40+
41+
Map<String, CircuitBreakerThresholds> circuitBreakerThresholdsMap =
42+
config.root().keySet().stream()
43+
.filter(key -> !NON_THRESHOLD_KEYS.contains(key)) // Filter out non-threshold keys
44+
.collect(
45+
Collectors.toMap(
46+
key -> key, // Circuit breaker key
47+
key -> buildCircuitBreakerThresholds(config.getConfig(key))));
48+
49+
builder.defaultThresholds(
50+
config.hasPath(DEFAULT_THRESHOLDS)
51+
? buildCircuitBreakerThresholds(config.getConfig(DEFAULT_THRESHOLDS))
52+
: buildCircuitBreakerDefaultThresholds());
53+
54+
builder.circuitBreakerThresholdsMap(circuitBreakerThresholdsMap);
55+
log.debug("Loaded circuit breaker configs: {}", builder);
56+
return builder;
57+
}
58+
59+
private static CircuitBreakerThresholds buildCircuitBreakerThresholds(Config config) {
60+
CircuitBreakerThresholds.CircuitBreakerThresholdsBuilder builder =
61+
CircuitBreakerThresholds.builder();
62+
63+
if (config.hasPath(FAILURE_RATE_THRESHOLD)) {
64+
builder.failureRateThreshold((float) config.getDouble(FAILURE_RATE_THRESHOLD));
65+
}
66+
67+
if (config.hasPath(SLOW_CALL_RATE_THRESHOLD)) {
68+
builder.slowCallRateThreshold((float) config.getDouble(SLOW_CALL_RATE_THRESHOLD));
69+
}
70+
71+
if (config.hasPath(SLOW_CALL_DURATION_THRESHOLD)) {
72+
builder.slowCallDurationThreshold(config.getDuration(SLOW_CALL_DURATION_THRESHOLD));
73+
}
74+
75+
if (config.hasPath(SLIDING_WINDOW_TYPE)) {
76+
builder.slidingWindowType(getSlidingWindowType(config.getString(SLIDING_WINDOW_TYPE)));
77+
}
78+
79+
if (config.hasPath(SLIDING_WINDOW_SIZE)) {
80+
builder.slidingWindowSize(config.getInt(SLIDING_WINDOW_SIZE));
81+
}
82+
83+
if (config.hasPath(WAIT_DURATION_IN_OPEN_STATE)) {
84+
builder.waitDurationInOpenState(config.getDuration(WAIT_DURATION_IN_OPEN_STATE));
85+
}
86+
87+
if (config.hasPath(PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE)) {
88+
builder.permittedNumberOfCallsInHalfOpenState(
89+
config.getInt(PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE));
90+
}
91+
92+
if (config.hasPath(MINIMUM_NUMBER_OF_CALLS)) {
93+
builder.minimumNumberOfCalls(config.getInt(MINIMUM_NUMBER_OF_CALLS));
94+
}
95+
96+
if (config.hasPath(ENABLED)) {
97+
builder.enabled(config.getBoolean(ENABLED));
98+
}
99+
100+
return builder.build();
101+
}
102+
103+
public static CircuitBreakerThresholds buildCircuitBreakerDefaultThresholds() {
104+
return CircuitBreakerThresholds.builder().build();
105+
}
106+
107+
private static CircuitBreakerThresholds.SlidingWindowType getSlidingWindowType(
108+
String slidingWindowType) {
109+
return CircuitBreakerThresholds.SlidingWindowType.valueOf(slidingWindowType);
110+
}
111+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package org.hypertrace.circuitbreaker.grpcutils;
2+
3+
import io.grpc.Status;
4+
import io.grpc.StatusRuntimeException;
5+
import java.util.Map;
6+
import java.util.function.BiFunction;
7+
import java.util.function.Function;
8+
import lombok.Builder;
9+
import lombok.Value;
10+
import org.hypertrace.core.grpcutils.context.RequestContext;
11+
12+
@Value
13+
@Builder
14+
public class CircuitBreakerConfiguration<T> {
15+
Class<T> requestClass;
16+
BiFunction<RequestContext, T, String> keyFunction;
17+
@Builder.Default boolean enabled = false;
18+
// Standard/default thresholds
19+
CircuitBreakerThresholds defaultThresholds;
20+
// Custom overrides for specific cases (less common)
21+
@Builder.Default Map<String, CircuitBreakerThresholds> circuitBreakerThresholdsMap = Map.of();
22+
23+
// New exception builder logic
24+
@Builder.Default
25+
Function<String, StatusRuntimeException> exceptionBuilder =
26+
reason -> Status.RESOURCE_EXHAUSTED.withDescription(reason).asRuntimeException();
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.hypertrace.circuitbreaker.grpcutils;
2+
3+
import io.grpc.CallOptions;
4+
import io.grpc.Channel;
5+
import io.grpc.ClientCall;
6+
import io.grpc.ClientInterceptor;
7+
import io.grpc.MethodDescriptor;
8+
9+
public abstract class CircuitBreakerInterceptor implements ClientInterceptor {
10+
@Override
11+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
12+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
13+
if (!isCircuitBreakerEnabled()) {
14+
return next.newCall(method, callOptions);
15+
}
16+
return createInterceptedCall(method, callOptions, next);
17+
}
18+
19+
protected abstract boolean isCircuitBreakerEnabled();
20+
21+
protected abstract <ReqT, RespT> ClientCall<ReqT, RespT> createInterceptedCall(
22+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next);
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.hypertrace.circuitbreaker.grpcutils;
2+
3+
import java.time.Duration;
4+
import lombok.Builder;
5+
import lombok.Value;
6+
7+
@Value
8+
@Builder
9+
public class CircuitBreakerThresholds {
10+
// Percentage of failures to trigger OPEN state
11+
@Builder.Default float failureRateThreshold = 50f;
12+
// Percentage of slow calls to trigger OPEN state
13+
@Builder.Default float slowCallRateThreshold = 50f;
14+
// Define what a "slow" call is
15+
@Builder.Default Duration slowCallDurationThreshold = Duration.ofSeconds(2);
16+
// Number of calls to consider in the sliding window
17+
@Builder.Default SlidingWindowType slidingWindowType = SlidingWindowType.TIME_BASED;
18+
@Builder.Default int slidingWindowSize = 60;
19+
// Time before retrying after OPEN state
20+
@Builder.Default Duration waitDurationInOpenState = Duration.ofSeconds(60);
21+
// Minimum calls before evaluating failure rate
22+
@Builder.Default int minimumNumberOfCalls = 10;
23+
// Calls allowed in HALF_OPEN state before deciding to
24+
// CLOSE or OPEN again
25+
@Builder.Default int permittedNumberOfCallsInHalfOpenState = 5;
26+
@Builder.Default boolean enabled = true;
27+
28+
public enum SlidingWindowType {
29+
COUNT_BASED,
30+
TIME_BASED
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.hypertrace.circuitbreaker.grpcutils.resilience;
2+
3+
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.stream.Collectors;
7+
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerThresholds;
8+
9+
/** Utility class to parse CircuitBreakerConfiguration to Resilience4j CircuitBreakerConfig */
10+
class ResilienceCircuitBreakerConfigConverter {
11+
12+
public static Map<String, CircuitBreakerConfig> getCircuitBreakerConfigs(
13+
Map<String, CircuitBreakerThresholds> configurationMap) {
14+
return configurationMap.entrySet().stream()
15+
.collect(Collectors.toMap(Map.Entry::getKey, entry -> convertConfig(entry.getValue())));
16+
}
17+
18+
public static List<String> getDisabledKeys(
19+
Map<String, CircuitBreakerThresholds> configurationMap) {
20+
return configurationMap.entrySet().stream()
21+
.filter(entry -> entry.getValue().isEnabled())
22+
.map(Map.Entry::getKey)
23+
.collect(Collectors.toList());
24+
}
25+
26+
static CircuitBreakerConfig convertConfig(CircuitBreakerThresholds configuration) {
27+
return CircuitBreakerConfig.custom()
28+
.failureRateThreshold(configuration.getFailureRateThreshold())
29+
.slowCallRateThreshold(configuration.getSlowCallRateThreshold())
30+
.slowCallDurationThreshold(configuration.getSlowCallDurationThreshold())
31+
.slidingWindowType(getSlidingWindowType(configuration.getSlidingWindowType()))
32+
.slidingWindowSize(configuration.getSlidingWindowSize())
33+
.waitDurationInOpenState(configuration.getWaitDurationInOpenState())
34+
.permittedNumberOfCallsInHalfOpenState(
35+
configuration.getPermittedNumberOfCallsInHalfOpenState())
36+
.minimumNumberOfCalls(configuration.getMinimumNumberOfCalls())
37+
.build();
38+
}
39+
40+
private static CircuitBreakerConfig.SlidingWindowType getSlidingWindowType(
41+
CircuitBreakerThresholds.SlidingWindowType slidingWindowType) {
42+
switch (slidingWindowType) {
43+
case COUNT_BASED:
44+
return CircuitBreakerConfig.SlidingWindowType.COUNT_BASED;
45+
case TIME_BASED:
46+
default:
47+
return CircuitBreakerConfig.SlidingWindowType.TIME_BASED;
48+
}
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.hypertrace.circuitbreaker.grpcutils.resilience;
2+
3+
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
4+
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
5+
import java.time.Clock;
6+
import java.util.Map;
7+
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfiguration;
8+
9+
public class ResilienceCircuitBreakerFactory {
10+
public static ResilienceCircuitBreakerInterceptor getResilienceCircuitBreakerInterceptor(
11+
CircuitBreakerConfiguration<?> circuitBreakerConfiguration, Clock clock) {
12+
Map<String, CircuitBreakerConfig> resilienceCircuitBreakerConfigMap =
13+
ResilienceCircuitBreakerConfigConverter.getCircuitBreakerConfigs(
14+
circuitBreakerConfiguration.getCircuitBreakerThresholdsMap());
15+
CircuitBreakerRegistry resilienceCircuitBreakerRegistry =
16+
new ResilienceCircuitBreakerRegistryProvider(
17+
circuitBreakerConfiguration.getDefaultThresholds())
18+
.getCircuitBreakerRegistry();
19+
ResilienceCircuitBreakerProvider resilienceCircuitBreakerProvider =
20+
new ResilienceCircuitBreakerProvider(
21+
resilienceCircuitBreakerRegistry,
22+
resilienceCircuitBreakerConfigMap,
23+
ResilienceCircuitBreakerConfigConverter.getDisabledKeys(
24+
circuitBreakerConfiguration.getCircuitBreakerThresholdsMap()),
25+
circuitBreakerConfiguration.getDefaultThresholds().isEnabled());
26+
return new ResilienceCircuitBreakerInterceptor(
27+
circuitBreakerConfiguration, clock, resilienceCircuitBreakerProvider);
28+
}
29+
}

0 commit comments

Comments
 (0)