Skip to content

Commit 8ca26b7

Browse files
Addressed comments
1 parent 5f0ecd0 commit 8ca26b7

File tree

7 files changed

+87
-55
lines changed

7 files changed

+87
-55
lines changed

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class CircuitBreakerConfigParser {
2828
private static final String SLIDING_WINDOW_TYPE = "slidingWindowType";
2929
public static final String ENABLED = "enabled";
3030
public static final String DEFAULT_THRESHOLDS = "defaultThresholds";
31-
private static final Set<String> NON_THRESHOLD_KEYS = Set.of(ENABLED, DEFAULT_THRESHOLDS);
31+
private static final Set<String> NON_THRESHOLD_KEYS = Set.of(ENABLED);
3232

3333
public static <T> CircuitBreakerConfiguration.CircuitBreakerConfigurationBuilder<T> parseConfig(
3434
Config config) {
@@ -38,20 +38,19 @@ public static <T> CircuitBreakerConfiguration.CircuitBreakerConfigurationBuilder
3838
builder.enabled(config.getBoolean(ENABLED));
3939
}
4040

41-
if (config.hasPath(DEFAULT_THRESHOLDS)) {
42-
builder.defaultThresholds(
43-
buildCircuitBreakerThresholds(config.getConfig(DEFAULT_THRESHOLDS)));
44-
} else {
45-
builder.defaultThresholds(buildCircuitBreakerDefaultThresholds());
46-
}
47-
4841
Map<String, CircuitBreakerThresholds> circuitBreakerThresholdsMap =
4942
config.root().keySet().stream()
5043
.filter(key -> !NON_THRESHOLD_KEYS.contains(key)) // Filter out non-threshold keys
5144
.collect(
5245
Collectors.toMap(
5346
key -> key, // Circuit breaker key
5447
key -> buildCircuitBreakerThresholds(config.getConfig(key))));
48+
49+
if (!config.hasPath(DEFAULT_THRESHOLDS)) {
50+
builder.defaultThresholds(buildCircuitBreakerDefaultThresholds());
51+
circuitBreakerThresholdsMap.put(DEFAULT_THRESHOLDS, buildCircuitBreakerDefaultThresholds());
52+
}
53+
5554
builder.circuitBreakerThresholdsMap(circuitBreakerThresholdsMap);
5655
log.debug("Loaded circuit breaker configs: {}", builder);
5756
return builder;
@@ -94,6 +93,10 @@ private static CircuitBreakerThresholds buildCircuitBreakerThresholds(Config con
9493
builder.minimumNumberOfCalls(config.getInt(MINIMUM_NUMBER_OF_CALLS));
9594
}
9695

96+
if (config.hasPath(ENABLED)) {
97+
builder.enabled(config.getBoolean(ENABLED));
98+
}
99+
97100
return builder.build();
98101
}
99102

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfiguration.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.hypertrace.circuitbreaker.grpcutils;
22

33
import io.grpc.Status;
4+
import io.grpc.StatusRuntimeException;
45
import java.util.Map;
56
import java.util.function.BiFunction;
67
import java.util.function.Function;
@@ -14,15 +15,13 @@ public class CircuitBreakerConfiguration<T> {
1415
Class<T> requestClass;
1516
BiFunction<RequestContext, T, String> keyFunction;
1617
@Builder.Default boolean enabled = false;
17-
// Default value be "global" if not override.
18-
String defaultCircuitBreakerKey = "global";
1918
// Standard/default thresholds
2019
CircuitBreakerThresholds defaultThresholds;
2120
// Custom overrides for specific cases (less common)
2221
@Builder.Default Map<String, CircuitBreakerThresholds> circuitBreakerThresholdsMap = Map.of();
2322

2423
// New exception builder logic
2524
@Builder.Default
26-
Function<String, RuntimeException> exceptionBuilder =
25+
Function<String, StatusRuntimeException> exceptionBuilder =
2726
reason -> Status.RESOURCE_EXHAUSTED.withDescription(reason).asRuntimeException();
2827
}

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerThresholds.java

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public class CircuitBreakerThresholds {
2323
// Calls allowed in HALF_OPEN state before deciding to
2424
// CLOSE or OPEN again
2525
@Builder.Default int permittedNumberOfCallsInHalfOpenState = 5;
26+
@Builder.Default boolean enabled = true;
2627

2728
public enum SlidingWindowType {
2829
COUNT_BASED,

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerFactory.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ public static ResilienceCircuitBreakerInterceptor getResilienceCircuitBreakerInt
1818
.getCircuitBreakerRegistry();
1919
ResilienceCircuitBreakerProvider resilienceCircuitBreakerProvider =
2020
new ResilienceCircuitBreakerProvider(
21-
resilicenceCircuitBreakerRegistry, resilienceCircuitBreakerConfigMap);
21+
resilicenceCircuitBreakerRegistry,
22+
resilienceCircuitBreakerConfigMap,
23+
circuitBreakerConfiguration.getCircuitBreakerThresholdsMap());
2224
return new ResilienceCircuitBreakerInterceptor(
2325
circuitBreakerConfiguration, clock, resilienceCircuitBreakerProvider);
2426
}

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

+20-12
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.time.Duration;
1414
import java.time.Instant;
1515
import java.util.Map;
16+
import java.util.Optional;
1617
import java.util.concurrent.TimeUnit;
1718
import lombok.extern.slf4j.Slf4j;
1819
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfiguration;
@@ -45,7 +46,7 @@ protected <ReqT, RespT> ClientCall<ReqT, RespT> createInterceptedCall(
4546
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
4647
return new ForwardingClientCall.SimpleForwardingClientCall<>(
4748
next.newCall(method, callOptions)) {
48-
CircuitBreaker circuitBreaker;
49+
Optional<CircuitBreaker> circuitBreaker;
4950
String circuitBreakerKey;
5051

5152
@Override
@@ -67,17 +68,21 @@ public void sendMessage(ReqT message) {
6768
super.sendMessage(message);
6869
return;
6970
}
70-
if (config.getKeyFunction() == null) {
71-
log.debug("Circuit breaker will apply to all requests as keyFunction config is not set");
72-
circuitBreakerKey = config.getDefaultCircuitBreakerKey();
73-
} else {
71+
if (config.getKeyFunction() != null) {
7472
circuitBreakerKey = config.getKeyFunction().apply(RequestContext.CURRENT.get(), message);
73+
circuitBreaker = resilienceCircuitBreakerProvider.getCircuitBreaker(circuitBreakerKey);
74+
} else {
75+
log.debug("Circuit breaker will apply to all requests as keyFunction config is not set");
76+
circuitBreaker = resilienceCircuitBreakerProvider.getDefaultCircuitBreaker();
77+
}
78+
if (circuitBreaker.isEmpty()) {
79+
super.sendMessage(message);
80+
return;
7581
}
76-
circuitBreaker = resilienceCircuitBreakerProvider.getCircuitBreaker(circuitBreakerKey);
77-
if (!circuitBreaker.tryAcquirePermission()) {
78-
logCircuitBreakerRejection(circuitBreakerKey, circuitBreaker);
82+
if (!circuitBreaker.get().tryAcquirePermission()) {
83+
logCircuitBreakerRejection(circuitBreakerKey, circuitBreaker.get());
7984
String rejectionReason =
80-
circuitBreaker.getState() == CircuitBreaker.State.HALF_OPEN
85+
circuitBreaker.get().getState() == CircuitBreaker.State.HALF_OPEN
8186
? "Circuit Breaker is HALF-OPEN and rejecting excess requests"
8287
: "Circuit Breaker is OPEN and blocking requests";
8388
throw config.getExceptionBuilder().apply(rejectionReason);
@@ -89,18 +94,21 @@ public void sendMessage(ReqT message) {
8994
wrapListenerWithCircuitBreaker(Listener<RespT> responseListener, Instant startTime) {
9095
return new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(
9196
responseListener) {
97+
@SuppressWarnings("OptionalGetWithoutIsPresent")
9298
@Override
9399
public void onClose(Status status, Metadata trailers) {
94100
long duration = Duration.between(startTime, clock.instant()).toNanos();
95101
if (status.isOk()) {
96-
circuitBreaker.onSuccess(duration, TimeUnit.NANOSECONDS);
102+
circuitBreaker.get().onSuccess(duration, TimeUnit.NANOSECONDS);
97103
} else {
98104
log.debug(
99105
"Circuit Breaker '{}' detected failure. Status: {}, Description: {}",
100-
circuitBreaker.getName(),
106+
circuitBreaker.get().getName(),
101107
status.getCode(),
102108
status.getDescription());
103-
circuitBreaker.onError(duration, TimeUnit.NANOSECONDS, status.asRuntimeException());
109+
circuitBreaker
110+
.get()
111+
.onError(duration, TimeUnit.NANOSECONDS, status.asRuntimeException());
104112
}
105113
super.onClose(status, trailers);
106114
}

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerProvider.java

+41-27
Original file line numberDiff line numberDiff line change
@@ -7,48 +7,62 @@
77
import java.util.Optional;
88
import java.util.concurrent.ConcurrentHashMap;
99
import lombok.extern.slf4j.Slf4j;
10+
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfigParser;
11+
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerThresholds;
1012

1113
/** Utility class to provide Resilience4j CircuitBreaker */
1214
@Slf4j
1315
class ResilienceCircuitBreakerProvider {
1416

1517
private final CircuitBreakerRegistry circuitBreakerRegistry;
1618
private final Map<String, CircuitBreakerConfig> circuitBreakerConfigMap;
19+
private final Map<String, CircuitBreakerThresholds> circuitBreakerThresholdsMap;
1720
private final Map<String, CircuitBreaker> circuitBreakerCache = new ConcurrentHashMap<>();
1821

1922
public ResilienceCircuitBreakerProvider(
2023
CircuitBreakerRegistry circuitBreakerRegistry,
21-
Map<String, CircuitBreakerConfig> circuitBreakerConfigMap) {
24+
Map<String, CircuitBreakerConfig> circuitBreakerConfigMap,
25+
Map<String, CircuitBreakerThresholds> circuitBreakerThresholdsMap) {
2226
this.circuitBreakerRegistry = circuitBreakerRegistry;
2327
this.circuitBreakerConfigMap = circuitBreakerConfigMap;
28+
this.circuitBreakerThresholdsMap = circuitBreakerThresholdsMap;
2429
}
2530

26-
public CircuitBreaker getCircuitBreaker(String circuitBreakerKey) {
27-
return circuitBreakerCache.computeIfAbsent(
28-
circuitBreakerKey,
29-
key -> {
30-
CircuitBreaker circuitBreaker = getCircuitBreakerFromConfigMap(circuitBreakerKey);
31-
circuitBreaker
32-
.getEventPublisher()
33-
.onStateTransition(
34-
event ->
35-
log.info(
36-
"State transition: {} for circuit breaker {}",
37-
event.getStateTransition(),
38-
event.getCircuitBreakerName()))
39-
.onCallNotPermitted(
40-
event ->
41-
log.debug(
42-
"Call not permitted: Circuit is OPEN for circuit breaker {}",
43-
event.getCircuitBreakerName()))
44-
.onEvent(
45-
event ->
46-
log.debug(
47-
"Circuit breaker event type {} for circuit breaker name {}",
48-
event.getEventType(),
49-
event.getCircuitBreakerName()));
50-
return circuitBreaker;
51-
});
31+
public Optional<CircuitBreaker> getCircuitBreaker(String circuitBreakerKey) {
32+
if (circuitBreakerThresholdsMap.containsKey(circuitBreakerKey)
33+
&& !circuitBreakerThresholdsMap.get(circuitBreakerKey).isEnabled()) {
34+
return Optional.empty();
35+
}
36+
return Optional.of(
37+
circuitBreakerCache.computeIfAbsent(
38+
circuitBreakerKey,
39+
key -> {
40+
CircuitBreaker circuitBreaker = getCircuitBreakerFromConfigMap(circuitBreakerKey);
41+
circuitBreaker
42+
.getEventPublisher()
43+
.onStateTransition(
44+
event ->
45+
log.info(
46+
"State transition: {} for circuit breaker {}",
47+
event.getStateTransition(),
48+
event.getCircuitBreakerName()))
49+
.onCallNotPermitted(
50+
event ->
51+
log.debug(
52+
"Call not permitted: Circuit is OPEN for circuit breaker {}",
53+
event.getCircuitBreakerName()))
54+
.onEvent(
55+
event ->
56+
log.debug(
57+
"Circuit breaker event type {} for circuit breaker name {}",
58+
event.getEventType(),
59+
event.getCircuitBreakerName()));
60+
return circuitBreaker;
61+
}));
62+
}
63+
64+
public Optional<CircuitBreaker> getDefaultCircuitBreaker() {
65+
return getCircuitBreaker(CircuitBreakerConfigParser.DEFAULT_THRESHOLDS);
5266
}
5367

5468
private CircuitBreaker getCircuitBreakerFromConfigMap(String circuitBreakerKey) {

grpc-circuitbreaker-utils/src/test/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptorTest.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import static org.junit.jupiter.api.Assertions.assertThrows;
44
import static org.mockito.ArgumentMatchers.any;
5-
import static org.mockito.ArgumentMatchers.anyString;
65
import static org.mockito.Mockito.*;
76

87
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
@@ -11,6 +10,7 @@
1110
import java.time.Clock;
1211
import java.time.Instant;
1312
import java.time.ZoneOffset;
13+
import java.util.Optional;
1414
import java.util.concurrent.TimeUnit;
1515
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfiguration;
1616
import org.junit.jupiter.api.BeforeEach;
@@ -40,14 +40,11 @@ void setUp() {
4040

4141
fixedClock = Clock.fixed(Instant.now(), ZoneOffset.UTC);
4242
when(mockChannel.newCall(any(), any())).thenReturn(mockClientCall);
43-
when(mockCircuitBreakerProvider.getCircuitBreaker(anyString())).thenReturn(mockCircuitBreaker);
44-
when(mockCircuitBreakerConfig.getDefaultCircuitBreakerKey()).thenReturn("global");
4543
}
4644

4745
@Test
4846
void testSendMessage_CallsSuperSendMessage_Success() {
4947
doNothing().when(mockClientCall).sendMessage(any());
50-
when(mockCircuitBreaker.tryAcquirePermission()).thenReturn(true);
5148

5249
ResilienceCircuitBreakerInterceptor interceptor =
5350
new ResilienceCircuitBreakerInterceptor(
@@ -67,6 +64,8 @@ void testSendMessage_CallsSuperSendMessage_Success() {
6764
void testSendMessage_CircuitBreakerRejectsRequest() {
6865
when(mockCircuitBreaker.tryAcquirePermission()).thenReturn(false);
6966
when(mockCircuitBreaker.getState()).thenReturn(CircuitBreaker.State.OPEN);
67+
when(mockCircuitBreakerProvider.getDefaultCircuitBreaker())
68+
.thenReturn(Optional.of(mockCircuitBreaker));
7069
when(mockCircuitBreakerConfig.getExceptionBuilder())
7170
.thenReturn(
7271
reason ->
@@ -94,6 +93,8 @@ void testSendMessage_CircuitBreakerRejectsRequest() {
9493
void testSendMessage_CircuitBreakerInHalfOpenState() {
9594
when(mockCircuitBreaker.tryAcquirePermission()).thenReturn(false);
9695
when(mockCircuitBreaker.getState()).thenReturn(CircuitBreaker.State.HALF_OPEN);
96+
when(mockCircuitBreakerProvider.getDefaultCircuitBreaker())
97+
.thenReturn(Optional.of(mockCircuitBreaker));
9798
when(mockCircuitBreakerConfig.getExceptionBuilder())
9899
.thenReturn(
99100
reason ->
@@ -120,6 +121,8 @@ void testSendMessage_CircuitBreakerInHalfOpenState() {
120121
@Test
121122
void testWrapListenerWithCircuitBreaker_Success() {
122123
when(mockCircuitBreaker.tryAcquirePermission()).thenReturn(true);
124+
when(mockCircuitBreakerProvider.getDefaultCircuitBreaker())
125+
.thenReturn(Optional.of(mockCircuitBreaker));
123126
ResilienceCircuitBreakerInterceptor interceptor =
124127
new ResilienceCircuitBreakerInterceptor(
125128
mockCircuitBreakerConfig, fixedClock, mockCircuitBreakerProvider);
@@ -144,6 +147,8 @@ void testWrapListenerWithCircuitBreaker_Success() {
144147
@Test
145148
void testWrapListenerWithCircuitBreaker_Failure() {
146149
when(mockCircuitBreaker.tryAcquirePermission()).thenReturn(true);
150+
when(mockCircuitBreakerProvider.getDefaultCircuitBreaker())
151+
.thenReturn(Optional.of(mockCircuitBreaker));
147152
ResilienceCircuitBreakerInterceptor interceptor =
148153
new ResilienceCircuitBreakerInterceptor(
149154
mockCircuitBreakerConfig, fixedClock, mockCircuitBreakerProvider);

0 commit comments

Comments
 (0)