Skip to content

Commit faa0e02

Browse files
Refactored classes and added test cases
1 parent bc60d8e commit faa0e02

14 files changed

+559
-306
lines changed

grpc-circuitbreaker-utils/build.gradle.kts

-12
Original file line numberDiff line numberDiff line change
@@ -8,30 +8,18 @@ plugins {
88
dependencies {
99

1010
api(platform("io.grpc:grpc-bom:1.68.3"))
11-
api("io.grpc:grpc-context")
1211
api("io.grpc:grpc-api")
13-
api("io.grpc:grpc-inprocess")
14-
api(platform("io.netty:netty-bom:4.1.118.Final"))
15-
constraints {
16-
api("com.google.protobuf:protobuf-java:3.25.5") {
17-
because("https://nvd.nist.gov/vuln/detail/CVE-2024-7254")
18-
}
19-
}
2012

21-
implementation(project(":grpc-context-utils"))
2213
implementation("org.slf4j:slf4j-api:1.7.36")
23-
implementation("io.grpc:grpc-core")
2414
implementation("io.github.resilience4j:resilience4j-circuitbreaker:1.7.1")
2515
implementation("com.typesafe:config:1.4.2")
2616
implementation("com.google.inject:guice:7.0.0")
27-
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.87")
2817

2918
annotationProcessor("org.projectlombok:lombok:1.18.24")
3019
compileOnly("org.projectlombok:lombok:1.18.24")
3120

3221
testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
3322
testImplementation("org.mockito:mockito-core:5.8.0")
34-
testRuntimeOnly("io.grpc:grpc-netty")
3523
}
3624

3725
tasks.test {
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
package org.hypertrace.circuitbreaker.grpcutils;
22

33
import com.typesafe.config.Config;
4-
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
54
import java.util.Map;
6-
import java.util.concurrent.ConcurrentHashMap;
75
import java.util.stream.Collectors;
86
import lombok.extern.slf4j.Slf4j;
97

108
@Slf4j
119
public class CircuitBreakerConfigProvider {
1210

13-
public static final String CIRCUIT_BREAKER_CONFIG = "circuit.breaker.config";
1411
public static final String DEFAULT_CONFIG_KEY = "default";
1512

1613
// Whether to enable circuit breaker or not.
@@ -34,66 +31,42 @@ public class CircuitBreakerConfigProvider {
3431
"permittedNumberOfCallsInHalfOpenState";
3532
private static final String SLIDING_WINDOW_TYPE = "slidingWindowType";
3633

37-
// Cache for storing CircuitBreakerConfig instances
38-
private static final ConcurrentHashMap<String, CircuitBreakerConfig> configCache =
39-
new ConcurrentHashMap<>();
40-
4134
// Global flag for circuit breaker enablement
4235
private boolean circuitBreakerEnabled = false;
36+
private Map<String, CircuitBreakerConfiguration> circuitBreakerConfigurationMap;
4337

4438
public CircuitBreakerConfigProvider(Config config) {
45-
initialize(config);
39+
this.initialize(config);
4640
}
4741

48-
public CircuitBreakerConfigProvider() {}
49-
50-
/** Initializes and caches all CircuitBreaker configurations. */
51-
public void initialize(Config config) {
52-
if (!config.hasPath(CIRCUIT_BREAKER_CONFIG)) {
53-
log.warn("No circuit breaker configurations found in the config file.");
54-
return;
55-
}
56-
57-
Config circuitBreakerConfig = config.getConfig(CIRCUIT_BREAKER_CONFIG);
42+
/** Checks if Circuit Breaker is globally enabled. */
43+
public boolean isCircuitBreakerEnabled() {
44+
return circuitBreakerEnabled;
45+
}
5846

59-
// Read global enabled flag (default to false if not provided)
47+
private void initialize(Config circuitBreakerConfig) {
6048
circuitBreakerEnabled =
6149
circuitBreakerConfig.hasPath(ENABLED) && circuitBreakerConfig.getBoolean(ENABLED);
62-
63-
// Load all circuit breaker configurations and cache them
64-
Map<String, CircuitBreakerConfig> allConfigs =
50+
this.circuitBreakerConfigurationMap =
6551
circuitBreakerConfig.root().keySet().stream()
6652
.filter(key -> !key.equals(ENABLED)) // Ignore the global enabled flag
6753
.collect(
6854
Collectors.toMap(
6955
key -> key, // Circuit breaker key
7056
key -> createCircuitBreakerConfig(circuitBreakerConfig.getConfig(key))));
71-
72-
// Store in cache
73-
configCache.putAll(allConfigs);
74-
7557
log.info(
7658
"Loaded {} circuit breaker configurations, Global Enabled: {}. Configs: {}",
77-
allConfigs.size(),
59+
circuitBreakerConfigurationMap.size(),
7860
circuitBreakerEnabled,
79-
allConfigs);
80-
}
81-
82-
/**
83-
* Retrieves the CircuitBreakerConfig for a specific key. Falls back to default if key-specific
84-
* config is not found.
85-
*/
86-
public CircuitBreakerConfig getConfig(String circuitBreakerKey) {
87-
return configCache.getOrDefault(circuitBreakerKey, configCache.get(DEFAULT_CONFIG_KEY));
61+
circuitBreakerConfigurationMap);
8862
}
8963

90-
/** Checks if Circuit Breaker is globally enabled. */
91-
public boolean isCircuitBreakerEnabled() {
92-
return circuitBreakerEnabled;
64+
public Map<String, CircuitBreakerConfiguration> getConfigMap() {
65+
return circuitBreakerConfigurationMap;
9366
}
9467

95-
private CircuitBreakerConfig createCircuitBreakerConfig(Config config) {
96-
return CircuitBreakerConfig.custom()
68+
private CircuitBreakerConfiguration createCircuitBreakerConfig(Config config) {
69+
return CircuitBreakerConfiguration.builder()
9770
.failureRateThreshold((float) config.getDouble(FAILURE_RATE_THRESHOLD))
9871
.slowCallRateThreshold((float) config.getDouble(SLOW_CALL_RATE_THRESHOLD))
9972
.slowCallDurationThreshold(config.getDuration(SLOW_CALL_DURATION_THRESHOLD))
@@ -106,7 +79,8 @@ private CircuitBreakerConfig createCircuitBreakerConfig(Config config) {
10679
.build();
10780
}
10881

109-
private CircuitBreakerConfig.SlidingWindowType getSlidingWindowType(String slidingWindowType) {
110-
return CircuitBreakerConfig.SlidingWindowType.valueOf(slidingWindowType);
82+
private CircuitBreakerConfiguration.SlidingWindowType getSlidingWindowType(
83+
String slidingWindowType) {
84+
return CircuitBreakerConfiguration.SlidingWindowType.valueOf(slidingWindowType);
11185
}
11286
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.hypertrace.circuitbreaker.grpcutils;
2+
3+
import java.time.Duration;
4+
import lombok.Builder;
5+
import lombok.Setter;
6+
import lombok.Value;
7+
8+
@Value
9+
@Builder
10+
@Setter
11+
public class CircuitBreakerConfiguration {
12+
// Percentage of failures to trigger OPEN state
13+
float failureRateThreshold;
14+
// Percentage of slow calls to trigger OPEN state
15+
float slowCallRateThreshold;
16+
// Define what a "slow" call is
17+
Duration slowCallDurationThreshold;
18+
// Number of calls to consider in the sliding window
19+
SlidingWindowType slidingWindowType;
20+
int slidingWindowSize;
21+
// Time before retrying after OPEN state
22+
Duration waitDurationInOpenState;
23+
// Minimum calls before evaluating failure rate
24+
int minimumNumberOfCalls;
25+
// Calls allowed in HALF_OPEN state before deciding to
26+
// CLOSE or OPEN again
27+
int permittedNumberOfCallsInHalfOpenState;
28+
29+
public enum SlidingWindowType {
30+
COUNT_BASED,
31+
TIME_BASED
32+
}
33+
}

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerEventListener.java

-38
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,130 +1,23 @@
11
package org.hypertrace.circuitbreaker.grpcutils;
22

3-
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
4-
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
53
import io.grpc.CallOptions;
64
import io.grpc.Channel;
75
import io.grpc.ClientCall;
86
import io.grpc.ClientInterceptor;
9-
import io.grpc.ForwardingClientCall;
10-
import io.grpc.ForwardingClientCallListener;
11-
import io.grpc.Metadata;
127
import io.grpc.MethodDescriptor;
13-
import io.grpc.Status;
14-
import java.util.concurrent.TimeUnit;
15-
import lombok.extern.slf4j.Slf4j;
168

17-
@Slf4j
18-
public class CircuitBreakerInterceptor implements ClientInterceptor {
19-
20-
public static final CallOptions.Key<String> CIRCUIT_BREAKER_KEY =
21-
CallOptions.Key.createWithDefault("circuitBreakerKey", "default");
22-
private final CircuitBreakerRegistry circuitBreakerRegistry;
23-
private final CircuitBreakerConfigProvider circuitBreakerConfigProvider;
24-
private final CircuitBreakerMetricsNotifier circuitBreakerMetricsNotifier;
25-
26-
public CircuitBreakerInterceptor(
27-
CircuitBreakerRegistry circuitBreakerRegistry,
28-
CircuitBreakerConfigProvider circuitBreakerConfigProvider,
29-
CircuitBreakerMetricsNotifier circuitBreakerMetricsNotifier) {
30-
this.circuitBreakerRegistry = circuitBreakerRegistry;
31-
this.circuitBreakerConfigProvider = circuitBreakerConfigProvider;
32-
this.circuitBreakerMetricsNotifier = circuitBreakerMetricsNotifier;
33-
}
34-
35-
// Intercepts the call and applies circuit breaker logic
9+
public abstract class CircuitBreakerInterceptor implements ClientInterceptor {
3610
@Override
3711
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
3812
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
39-
if (!circuitBreakerConfigProvider.isCircuitBreakerEnabled()) {
13+
if (!isCircuitBreakerEnabled()) {
4014
return next.newCall(method, callOptions);
4115
}
42-
43-
// Get circuit breaker key from CallOptions
44-
String circuitBreakerKey = callOptions.getOption(CIRCUIT_BREAKER_KEY);
45-
CircuitBreaker circuitBreaker = getCircuitBreaker(circuitBreakerKey);
46-
return new ForwardingClientCall.SimpleForwardingClientCall<>(
47-
next.newCall(method, callOptions)) {
48-
@Override
49-
public void start(Listener<RespT> responseListener, Metadata headers) {
50-
long startTime = System.nanoTime();
51-
52-
// Wrap response listener to track failures
53-
Listener<RespT> wrappedListener =
54-
new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(
55-
responseListener) {
56-
@Override
57-
public void onClose(Status status, Metadata trailers) {
58-
long duration = System.nanoTime() - startTime;
59-
if (status.isOk()) {
60-
circuitBreaker.onSuccess(duration, TimeUnit.NANOSECONDS);
61-
} else {
62-
log.debug(
63-
"Circuit Breaker '{}' detected failure. Status: {}, Description: {}",
64-
circuitBreaker.getName(),
65-
status.getCode(),
66-
status.getDescription());
67-
circuitBreaker.onError(
68-
duration, TimeUnit.NANOSECONDS, status.asRuntimeException());
69-
}
70-
super.onClose(status, trailers);
71-
}
72-
};
73-
74-
super.start(wrappedListener, headers);
75-
}
76-
77-
@Override
78-
public void sendMessage(ReqT message) {
79-
if (!circuitBreaker.tryAcquirePermission()) {
80-
handleCircuitBreakerRejection(circuitBreakerKey, circuitBreaker);
81-
String rejectionReason =
82-
circuitBreaker.getState() == CircuitBreaker.State.HALF_OPEN
83-
? "Circuit Breaker is HALF-OPEN and rejecting excess requests"
84-
: "Circuit Breaker is OPEN and blocking requests";
85-
throw Status.UNAVAILABLE.withDescription(rejectionReason).asRuntimeException();
86-
}
87-
super.sendMessage(message);
88-
}
89-
};
90-
}
91-
92-
private void handleCircuitBreakerRejection(
93-
String circuitBreakerKey, CircuitBreaker circuitBreaker) {
94-
String tenantId = getTenantId(circuitBreakerKey);
95-
if (circuitBreaker.getState() == CircuitBreaker.State.HALF_OPEN) {
96-
circuitBreakerMetricsNotifier.incrementCount(tenantId, "circuitbreaker.halfopen.rejected");
97-
log.debug(
98-
"Circuit Breaker '{}' is HALF-OPEN and rejecting excess requests for tenant '{}'.",
99-
circuitBreakerKey,
100-
tenantId);
101-
} else if (circuitBreaker.getState() == CircuitBreaker.State.OPEN) {
102-
circuitBreakerMetricsNotifier.incrementCount(tenantId, "circuitbreaker.open.blocked");
103-
log.debug(
104-
"Circuit Breaker '{}' is OPEN. Blocking request for tenant '{}'.",
105-
circuitBreakerKey,
106-
tenantId);
107-
} else {
108-
log.debug( // Added unexpected state handling for safety
109-
"Unexpected Circuit Breaker state '{}' for '{}'. Blocking request.",
110-
circuitBreaker.getState(),
111-
circuitBreakerKey);
112-
}
16+
return createInterceptedCall(method, callOptions, next);
11317
}
11418

115-
private static String getTenantId(String circuitBreakerKey) {
116-
if (!circuitBreakerKey.contains(".")) {
117-
return "Unknown";
118-
}
119-
return circuitBreakerKey.split("\\.", 2)[0]; // Ensures only the first split
120-
}
19+
protected abstract boolean isCircuitBreakerEnabled();
12120

122-
/** Retrieve the Circuit Breaker based on the key. */
123-
private CircuitBreaker getCircuitBreaker(String circuitBreakerKey) {
124-
CircuitBreaker circuitBreaker =
125-
circuitBreakerRegistry.circuitBreaker(
126-
circuitBreakerKey, circuitBreakerConfigProvider.getConfig(circuitBreakerKey));
127-
CircuitBreakerEventListener.attachListeners(circuitBreaker);
128-
return circuitBreaker;
129-
}
21+
protected abstract <ReqT, RespT> ClientCall<ReqT, RespT> createInterceptedCall(
22+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next);
13023
}

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerMetricsNotifier.java

-33
This file was deleted.

0 commit comments

Comments
 (0)