-
Notifications
You must be signed in to change notification settings - Fork 0
Add grpc circuit breaker utility using interceptors #68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bc60d8e
faa0e02
6a058f7
878a8cf
275c854
04ed487
fa3a3c3
90ca22b
5f0ecd0
8ca26b7
75fb8b1
77b0aac
ea47f32
f163a9d
5f3b287
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
plugins { | ||
`java-library` | ||
jacoco | ||
id("org.hypertrace.publish-plugin") | ||
id("org.hypertrace.jacoco-report-plugin") | ||
} | ||
|
||
dependencies { | ||
|
||
api(platform("io.grpc:grpc-bom:1.68.3")) | ||
api("io.grpc:grpc-api") | ||
api(project(":grpc-context-utils")) | ||
|
||
implementation("org.slf4j:slf4j-api:1.7.36") | ||
implementation("io.github.resilience4j:resilience4j-circuitbreaker:1.7.1") | ||
implementation("com.typesafe:config:1.4.2") | ||
implementation("com.google.guava:guava:32.0.1-jre") | ||
|
||
annotationProcessor("org.projectlombok:lombok:1.18.24") | ||
compileOnly("org.projectlombok:lombok:1.18.24") | ||
|
||
testImplementation("org.junit.jupiter:junit-jupiter:5.8.2") | ||
testImplementation("org.mockito:mockito-core:5.8.0") | ||
testImplementation("org.mockito:mockito-junit-jupiter:5.8.0") | ||
} | ||
|
||
tasks.test { | ||
useJUnitPlatform() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
package org.hypertrace.circuitbreaker.grpcutils; | ||
|
||
import com.typesafe.config.Config; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
public class CircuitBreakerConfigParser { | ||
Check warning on line 10 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
|
||
// Percentage of failures to trigger OPEN state | ||
private static final String FAILURE_RATE_THRESHOLD = "failureRateThreshold"; | ||
// Percentage of slow calls to trigger OPEN state | ||
private static final String SLOW_CALL_RATE_THRESHOLD = "slowCallRateThreshold"; | ||
// Define what a "slow" call is | ||
private static final String SLOW_CALL_DURATION_THRESHOLD = "slowCallDurationThreshold"; | ||
// Number of calls to consider in the sliding window | ||
private static final String SLIDING_WINDOW_SIZE = "slidingWindowSize"; | ||
// Time before retrying after OPEN state | ||
private static final String WAIT_DURATION_IN_OPEN_STATE = "waitDurationInOpenState"; | ||
// Minimum calls before evaluating failure rate | ||
private static final String MINIMUM_NUMBER_OF_CALLS = "minimumNumberOfCalls"; | ||
// Calls allowed in HALF_OPEN state before deciding to | ||
// CLOSE or OPEN again | ||
private static final String PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE = | ||
"permittedNumberOfCallsInHalfOpenState"; | ||
private static final String SLIDING_WINDOW_TYPE = "slidingWindowType"; | ||
public static final String ENABLED = "enabled"; | ||
public static final String DEFAULT_THRESHOLDS = "defaultThresholds"; | ||
private static final Set<String> NON_THRESHOLD_KEYS = Set.of(ENABLED, DEFAULT_THRESHOLDS); | ||
Check warning on line 31 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
|
||
public static <T> CircuitBreakerConfiguration.CircuitBreakerConfigurationBuilder<T> parseConfig( | ||
Config config) { | ||
CircuitBreakerConfiguration.CircuitBreakerConfigurationBuilder<T> builder = | ||
CircuitBreakerConfiguration.builder(); | ||
Check warning on line 36 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
if (config.hasPath(ENABLED)) { | ||
builder.enabled(config.getBoolean(ENABLED)); | ||
Check warning on line 38 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
Map<String, CircuitBreakerThresholds> circuitBreakerThresholdsMap = | ||
config.root().keySet().stream() | ||
Check warning on line 42 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
.filter(key -> !NON_THRESHOLD_KEYS.contains(key)) // Filter out non-threshold keys | ||
aaron-steinfeld marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.collect( | ||
Collectors.toMap( | ||
key -> key, // Circuit breaker key | ||
key -> buildCircuitBreakerThresholds(config.getConfig(key)))); | ||
Check warning on line 47 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
|
||
builder.defaultThresholds( | ||
Check warning on line 49 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
config.hasPath(DEFAULT_THRESHOLDS) | ||
? buildCircuitBreakerThresholds(config.getConfig(DEFAULT_THRESHOLDS)) | ||
: buildCircuitBreakerDefaultThresholds()); | ||
Check warning on line 52 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
|
||
builder.circuitBreakerThresholdsMap(circuitBreakerThresholdsMap); | ||
log.debug("Loaded circuit breaker configs: {}", builder); | ||
return builder; | ||
Check warning on line 56 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
private static CircuitBreakerThresholds buildCircuitBreakerThresholds(Config config) { | ||
CircuitBreakerThresholds.CircuitBreakerThresholdsBuilder builder = | ||
CircuitBreakerThresholds.builder(); | ||
Check warning on line 61 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
|
||
if (config.hasPath(FAILURE_RATE_THRESHOLD)) { | ||
builder.failureRateThreshold((float) config.getDouble(FAILURE_RATE_THRESHOLD)); | ||
Check warning on line 64 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
if (config.hasPath(SLOW_CALL_RATE_THRESHOLD)) { | ||
builder.slowCallRateThreshold((float) config.getDouble(SLOW_CALL_RATE_THRESHOLD)); | ||
Check warning on line 68 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
if (config.hasPath(SLOW_CALL_DURATION_THRESHOLD)) { | ||
builder.slowCallDurationThreshold(config.getDuration(SLOW_CALL_DURATION_THRESHOLD)); | ||
Check warning on line 72 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
if (config.hasPath(SLIDING_WINDOW_TYPE)) { | ||
builder.slidingWindowType(getSlidingWindowType(config.getString(SLIDING_WINDOW_TYPE))); | ||
Check warning on line 76 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
if (config.hasPath(SLIDING_WINDOW_SIZE)) { | ||
builder.slidingWindowSize(config.getInt(SLIDING_WINDOW_SIZE)); | ||
Check warning on line 80 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
if (config.hasPath(WAIT_DURATION_IN_OPEN_STATE)) { | ||
builder.waitDurationInOpenState(config.getDuration(WAIT_DURATION_IN_OPEN_STATE)); | ||
Check warning on line 84 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
if (config.hasPath(PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE)) { | ||
builder.permittedNumberOfCallsInHalfOpenState( | ||
config.getInt(PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE)); | ||
Check warning on line 89 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
if (config.hasPath(MINIMUM_NUMBER_OF_CALLS)) { | ||
builder.minimumNumberOfCalls(config.getInt(MINIMUM_NUMBER_OF_CALLS)); | ||
Check warning on line 93 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
if (config.hasPath(ENABLED)) { | ||
builder.enabled(config.getBoolean(ENABLED)); | ||
Check warning on line 97 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
return builder.build(); | ||
Check warning on line 100 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
public static CircuitBreakerThresholds buildCircuitBreakerDefaultThresholds() { | ||
return CircuitBreakerThresholds.builder().build(); | ||
Check warning on line 104 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
|
||
private static CircuitBreakerThresholds.SlidingWindowType getSlidingWindowType( | ||
String slidingWindowType) { | ||
return CircuitBreakerThresholds.SlidingWindowType.valueOf(slidingWindowType); | ||
Check warning on line 109 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package org.hypertrace.circuitbreaker.grpcutils; | ||
|
||
import io.grpc.Status; | ||
import io.grpc.StatusRuntimeException; | ||
import java.util.Map; | ||
import java.util.function.BiFunction; | ||
import java.util.function.Function; | ||
import lombok.Builder; | ||
import lombok.Value; | ||
import org.hypertrace.core.grpcutils.context.RequestContext; | ||
|
||
@Value | ||
@Builder | ||
public class CircuitBreakerConfiguration<T> { | ||
Class<T> requestClass; | ||
BiFunction<RequestContext, T, String> keyFunction; | ||
@Builder.Default boolean enabled = false; | ||
Check warning on line 17 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfiguration.java
|
||
// Standard/default thresholds | ||
CircuitBreakerThresholds defaultThresholds; | ||
Check warning on line 19 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfiguration.java
|
||
// Custom overrides for specific cases (less common) | ||
@Builder.Default Map<String, CircuitBreakerThresholds> circuitBreakerThresholdsMap = Map.of(); | ||
Check warning on line 21 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfiguration.java
|
||
|
||
// New exception builder logic | ||
@Builder.Default | ||
Function<String, StatusRuntimeException> exceptionBuilder = | ||
reason -> Status.RESOURCE_EXHAUSTED.withDescription(reason).asRuntimeException(); | ||
Check warning on line 26 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfiguration.java
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package org.hypertrace.circuitbreaker.grpcutils; | ||
|
||
import io.grpc.CallOptions; | ||
import io.grpc.Channel; | ||
import io.grpc.ClientCall; | ||
import io.grpc.ClientInterceptor; | ||
import io.grpc.MethodDescriptor; | ||
|
||
public abstract class CircuitBreakerInterceptor implements ClientInterceptor { | ||
@Override | ||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( | ||
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { | ||
if (!isCircuitBreakerEnabled()) { | ||
return next.newCall(method, callOptions); | ||
Check warning on line 14 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerInterceptor.java
|
||
} | ||
return createInterceptedCall(method, callOptions, next); | ||
Check warning on line 16 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerInterceptor.java
|
||
} | ||
|
||
protected abstract boolean isCircuitBreakerEnabled(); | ||
|
||
protected abstract <ReqT, RespT> ClientCall<ReqT, RespT> createInterceptedCall( | ||
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package org.hypertrace.circuitbreaker.grpcutils; | ||
|
||
import java.time.Duration; | ||
import lombok.Builder; | ||
import lombok.Value; | ||
|
||
@Value | ||
@Builder | ||
public class CircuitBreakerThresholds { | ||
// Percentage of failures to trigger OPEN state | ||
@Builder.Default float failureRateThreshold = 50f; | ||
// Percentage of slow calls to trigger OPEN state | ||
@Builder.Default float slowCallRateThreshold = 50f; | ||
// Define what a "slow" call is | ||
@Builder.Default Duration slowCallDurationThreshold = Duration.ofSeconds(2); | ||
// Number of calls to consider in the sliding window | ||
@Builder.Default SlidingWindowType slidingWindowType = SlidingWindowType.TIME_BASED; | ||
@Builder.Default int slidingWindowSize = 60; | ||
// Time before retrying after OPEN state | ||
@Builder.Default Duration waitDurationInOpenState = Duration.ofSeconds(60); | ||
// Minimum calls before evaluating failure rate | ||
@Builder.Default int minimumNumberOfCalls = 10; | ||
// Calls allowed in HALF_OPEN state before deciding to | ||
// CLOSE or OPEN again | ||
@Builder.Default int permittedNumberOfCallsInHalfOpenState = 5; | ||
@Builder.Default boolean enabled = true; | ||
Check warning on line 26 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerThresholds.java
|
||
|
||
public enum SlidingWindowType { | ||
COUNT_BASED, | ||
TIME_BASED | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package org.hypertrace.circuitbreaker.grpcutils.resilience; | ||
|
||
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerThresholds; | ||
|
||
/** Utility class to parse CircuitBreakerConfiguration to Resilience4j CircuitBreakerConfig */ | ||
class ResilienceCircuitBreakerConfigConverter { | ||
Check warning on line 10 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerConfigConverter.java
|
||
|
||
public static Map<String, CircuitBreakerConfig> getCircuitBreakerConfigs( | ||
Map<String, CircuitBreakerThresholds> configurationMap) { | ||
return configurationMap.entrySet().stream() | ||
.collect(Collectors.toMap(Map.Entry::getKey, entry -> convertConfig(entry.getValue()))); | ||
} | ||
|
||
public static List<String> getDisabledKeys( | ||
Map<String, CircuitBreakerThresholds> configurationMap) { | ||
return configurationMap.entrySet().stream() | ||
.filter(entry -> entry.getValue().isEnabled()) | ||
.map(Map.Entry::getKey) | ||
.collect(Collectors.toList()); | ||
Check warning on line 23 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerConfigConverter.java
|
||
} | ||
|
||
static CircuitBreakerConfig convertConfig(CircuitBreakerThresholds configuration) { | ||
return CircuitBreakerConfig.custom() | ||
.failureRateThreshold(configuration.getFailureRateThreshold()) | ||
.slowCallRateThreshold(configuration.getSlowCallRateThreshold()) | ||
.slowCallDurationThreshold(configuration.getSlowCallDurationThreshold()) | ||
.slidingWindowType(getSlidingWindowType(configuration.getSlidingWindowType())) | ||
.slidingWindowSize(configuration.getSlidingWindowSize()) | ||
.waitDurationInOpenState(configuration.getWaitDurationInOpenState()) | ||
.permittedNumberOfCallsInHalfOpenState( | ||
configuration.getPermittedNumberOfCallsInHalfOpenState()) | ||
.minimumNumberOfCalls(configuration.getMinimumNumberOfCalls()) | ||
.build(); | ||
} | ||
|
||
private static CircuitBreakerConfig.SlidingWindowType getSlidingWindowType( | ||
CircuitBreakerThresholds.SlidingWindowType slidingWindowType) { | ||
switch (slidingWindowType) { | ||
case COUNT_BASED: | ||
return CircuitBreakerConfig.SlidingWindowType.COUNT_BASED; | ||
Check warning on line 44 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerConfigConverter.java
|
||
case TIME_BASED: | ||
default: | ||
return CircuitBreakerConfig.SlidingWindowType.TIME_BASED; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package org.hypertrace.circuitbreaker.grpcutils.resilience; | ||
|
||
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; | ||
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; | ||
import java.time.Clock; | ||
import java.util.Map; | ||
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfiguration; | ||
|
||
public class ResilienceCircuitBreakerFactory { | ||
Check warning on line 9 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerFactory.java
|
||
public static ResilienceCircuitBreakerInterceptor getResilienceCircuitBreakerInterceptor( | ||
CircuitBreakerConfiguration<?> circuitBreakerConfiguration, Clock clock) { | ||
Map<String, CircuitBreakerConfig> resilienceCircuitBreakerConfigMap = | ||
ResilienceCircuitBreakerConfigConverter.getCircuitBreakerConfigs( | ||
circuitBreakerConfiguration.getCircuitBreakerThresholdsMap()); | ||
CircuitBreakerRegistry resilienceCircuitBreakerRegistry = | ||
Check warning on line 15 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerFactory.java
|
||
new ResilienceCircuitBreakerRegistryProvider( | ||
circuitBreakerConfiguration.getDefaultThresholds()) | ||
aaron-steinfeld marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.getCircuitBreakerRegistry(); | ||
ResilienceCircuitBreakerProvider resilienceCircuitBreakerProvider = | ||
Check warning on line 19 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerFactory.java
|
||
new ResilienceCircuitBreakerProvider( | ||
resilienceCircuitBreakerRegistry, | ||
resilienceCircuitBreakerConfigMap, | ||
ResilienceCircuitBreakerConfigConverter.getDisabledKeys( | ||
circuitBreakerConfiguration.getCircuitBreakerThresholdsMap()), | ||
circuitBreakerConfiguration.getDefaultThresholds().isEnabled()); | ||
return new ResilienceCircuitBreakerInterceptor( | ||
Check warning on line 26 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerFactory.java
|
||
circuitBreakerConfiguration, clock, resilienceCircuitBreakerProvider); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.