Skip to content

Commit 866809f

Browse files
authored
FFM-12087 New configuration options + close related bug fixes (#203)
1 parent c3429e1 commit 866809f

File tree

16 files changed

+331
-48
lines changed

16 files changed

+331
-48
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,14 @@ Add the following Maven dependency in your project's pom.xml file:
7878
<dependency>
7979
<groupId>io.harness</groupId>
8080
<artifactId>ff-java-server-sdk</artifactId>
81-
<version>1.7.0</version>
81+
<version>1.8.0</version>
8282
</dependency>
8383
```
8484

8585
#### Gradle
8686

8787
```
88-
implementation 'io.harness:ff-java-server-sdk:1.7.0'
88+
implementation 'io.harness:ff-java-server-sdk:1.8.0'
8989
```
9090

9191
### Code Sample

examples/src/main/java/io/harness/ff/examples/ConfigExample.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ public static void main(String... args)
3434
HarnessConfig.builder()
3535
.configUrl("http://localhost:3000/api/1.0")
3636
.eventUrl("http://localhost:3000/api/1.0")
37+
.maxRequestRetry(20)
38+
.flushAnalyticsOnClose(true)
39+
.flushAnalyticsOnCloseTimeout(30000)
3740
.build());
3841
client = new CfClient(hc);
3942
client.waitForInitialization();

settings.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ dependencyResolutionManagement {
44
versionCatalogs {
55
libs {
66
// main sdk version
7-
version('sdk', '1.7.0');
7+
version('sdk', '1.8.0');
88

99
// sdk deps
1010
version('okhttp3', '4.12.0')

src/main/java/io/harness/cf/client/api/BaseConfig.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
@Data
1515
public class BaseConfig {
1616
public static final int MIN_FREQUENCY = 60;
17+
public static final long DEFAULT_REQUEST_RETRIES = 10;
1718

1819
@Builder.Default private final boolean streamEnabled = true;
1920
@Builder.Default private final int pollIntervalInSeconds = 60;
@@ -53,4 +54,33 @@ public int getFrequency() {
5354
@Builder.Default private final Cache cache = new CaffeineCache(10000);
5455

5556
private final Storage store;
57+
58+
/**
59+
* Defines the maximum number of retry attempts for certain types of requests:
60+
* authentication, polling, metrics, and reacting to stream events. If a request fails,
61+
* the SDK will retry up to this number of times before giving up.
62+
* <p>
63+
* - Authentication: Used for retrying authentication requests when the server is unreachable.
64+
* - Polling: Applies to requests that fetch feature flags and target groups periodically.
65+
* - Metrics: Applies to analytics requests for sending metrics data to the server.
66+
* - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes,
67+
* where the SDK needs to fetch updated flag or group data.
68+
* <p>
69+
* <p>
70+
* The default value is {@code 10}.
71+
* <p>
72+
* <b>Note:</b> This setting does not apply to streaming requests (either the initial connection or
73+
* reconnecting after a disconnection). Streaming requests will always retry indefinitely
74+
* (infinite retries).
75+
* <p>
76+
* Example usage:
77+
* <pre>
78+
* {@code
79+
* BaseConfig config = BaseConfig.builder()
80+
* .maxRequestRetry(20)
81+
* .build();
82+
* }
83+
* </pre>
84+
*/
85+
@Builder.Default private final long maxRequestRetry = DEFAULT_REQUEST_RETRIES;
5686
}

src/main/java/io/harness/cf/client/api/InnerClient.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf
8787
log.info("Starting SDK client with configuration: {}", this.options);
8888
this.connector = connector;
8989
this.connector.setOnUnauthorized(this::onUnauthorized);
90-
9190
// initialization
9291
repository =
9392
new StorageRepository(
@@ -96,7 +95,9 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf
9695
authService = new AuthService(this.connector, options.getPollIntervalInSeconds(), this);
9796
pollProcessor =
9897
new PollingProcessor(this.connector, repository, options.getPollIntervalInSeconds(), this);
99-
metricsProcessor = new MetricsProcessor(this.connector, this.options, this);
98+
metricsProcessor =
99+
new MetricsProcessor(
100+
this.connector, this.options, this, connector.getShouldFlushAnalyticsOnClose());
100101
updateProcessor = new UpdateProcessor(this.connector, this.repository, this);
101102

102103
// start with authentication
@@ -228,7 +229,9 @@ public void onDisconnected(String reason) {
228229
closing,
229230
options.getPollIntervalInSeconds());
230231
log.debug("SSE disconnect detected - asking poller to refresh flags");
231-
pollProcessor.retrieveAll();
232+
if (!closing) {
233+
pollProcessor.retrieveAll();
234+
}
232235
}
233236
}
234237

@@ -388,6 +391,12 @@ public void processEvaluation(
388391
public void close() {
389392
log.info("Closing the client");
390393
closing = true;
394+
395+
// Mark the connector as shutting down to stop request retries from taking place. The
396+
// connections will eventually
397+
// be evicted when the connector is closed, but this ensures that if metrics are flushed when
398+
// closed then it won't attempt to retry if the first request fails.
399+
connector.setIsShuttingDown();
391400
off();
392401
authService.close();
393402
repository.close();

src/main/java/io/harness/cf/client/api/MetricsProcessor.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static io.harness.cf.client.common.Utils.shutdownExecutorService;
55
import static java.util.concurrent.TimeUnit.SECONDS;
66

7+
import io.harness.cf.Version;
78
import io.harness.cf.client.common.SdkCodes;
89
import io.harness.cf.client.common.StringUtils;
910
import io.harness.cf.client.connector.Connector;
@@ -15,10 +16,7 @@
1516
import io.harness.cf.model.TargetData;
1617
import io.harness.cf.model.Variation;
1718
import java.util.*;
18-
import java.util.concurrent.ConcurrentHashMap;
19-
import java.util.concurrent.Executors;
20-
import java.util.concurrent.ScheduledExecutorService;
21-
import java.util.concurrent.ScheduledFuture;
19+
import java.util.concurrent.*;
2220
import java.util.concurrent.atomic.LongAdder;
2321
import lombok.NonNull;
2422
import lombok.extern.slf4j.Slf4j;
@@ -113,13 +111,19 @@ public boolean containsKey(K key) {
113111
private final LongAdder metricsSent = new LongAdder();
114112
private final int maxFreqMapSize;
115113

114+
private final boolean shouldFlushMetricsOnClose;
115+
116116
public MetricsProcessor(
117-
@NonNull Connector connector, @NonNull BaseConfig config, @NonNull MetricsCallback callback) {
117+
@NonNull Connector connector,
118+
@NonNull BaseConfig config,
119+
@NonNull MetricsCallback callback,
120+
boolean shouldFlushMetricsOnClose) {
118121
this.connector = connector;
119122
this.config = config;
120123
this.frequencyMap = new FrequencyMap<>();
121124
this.targetsSeen = ConcurrentHashMap.newKeySet();
122125
this.maxFreqMapSize = clamp(config.getBufferSize(), 2048, MAX_FREQ_MAP_TO_RETAIN);
126+
this.shouldFlushMetricsOnClose = shouldFlushMetricsOnClose;
123127
callback.onMetricsReady();
124128
}
125129

@@ -218,7 +222,7 @@ protected Metrics prepareSummaryMetricsBody(Map<MetricEvent, Long> data, Set<Tar
218222
new KeyValue(TARGET_ATTRIBUTE, summary.getTargetIdentifier()),
219223
new KeyValue(SDK_TYPE, SERVER),
220224
new KeyValue(SDK_LANGUAGE, "java"),
221-
new KeyValue(SDK_VERSION, io.harness.cf.Version.VERSION)));
225+
new KeyValue(SDK_VERSION, Version.VERSION)));
222226
if (metrics.getMetricsData() != null) {
223227
metrics.getMetricsData().add(metricsData);
224228
}
@@ -305,6 +309,10 @@ public void start() {
305309
}
306310

307311
public void stop() {
312+
if (shouldFlushMetricsOnClose && config.isAnalyticsEnabled()) {
313+
flushQueue();
314+
}
315+
308316
log.debug("Stopping MetricsProcessor");
309317
if (scheduler.isShutdown()) {
310318
return;
@@ -324,7 +332,13 @@ public void close() {
324332
shutdownExecutorService(
325333
scheduler,
326334
SdkCodes::infoMetricsThreadExited,
327-
errMsg -> log.warn("failed to stop metrics scheduler: {}", errMsg));
335+
errMsg -> {
336+
if (shouldFlushMetricsOnClose) {
337+
log.warn("Waited for flush to finish {}", errMsg);
338+
} else {
339+
log.warn("Failed to stop metrics scheduler: {}", errMsg);
340+
}
341+
});
328342

329343
log.debug("Closing MetricsProcessor");
330344
}

src/main/java/io/harness/cf/client/connector/Connector.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,8 @@ public interface Connector {
2929
Service stream(Updater updater) throws ConnectorException;
3030

3131
void close();
32+
33+
boolean getShouldFlushAnalyticsOnClose();
34+
35+
void setIsShuttingDown();
3236
}

src/main/java/io/harness/cf/client/connector/EventSource.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.Map;
1616
import java.util.UUID;
1717
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.atomic.AtomicBoolean;
1819
import javax.net.ssl.*;
1920
import lombok.NonNull;
2021
import lombok.extern.slf4j.Slf4j;
@@ -37,6 +38,7 @@ public class EventSource implements Callback, AutoCloseable, Service {
3738
private final Map<String, String> headers;
3839
private final long sseReadTimeoutMins;
3940
private final List<X509Certificate> trustedCAs;
41+
private final AtomicBoolean isShuttingDown;
4042

4143
static {
4244
LogUtil.setSystemProps();
@@ -48,7 +50,7 @@ public EventSource(
4850
@NonNull Updater updater,
4951
long sseReadTimeoutMins)
5052
throws ConnectorException {
51-
this(url, headers, updater, sseReadTimeoutMins, 2_000, null);
53+
this(url, headers, updater, sseReadTimeoutMins, 2_000, null, new AtomicBoolean(false));
5254
}
5355

5456
EventSource(
@@ -57,14 +59,16 @@ public EventSource(
5759
@NonNull Updater updater,
5860
long sseReadTimeoutMins,
5961
int retryBackoffDelay,
60-
List<X509Certificate> trustedCAs) {
62+
List<X509Certificate> trustedCAs,
63+
AtomicBoolean isShuttingDown) {
6164
this.url = url;
6265
this.headers = headers;
6366
this.updater = updater;
6467
this.sseReadTimeoutMins = sseReadTimeoutMins;
6568
this.retryBackoffDelay = retryBackoffDelay;
6669
this.trustedCAs = trustedCAs;
6770
this.loggingInterceptor = new HttpLoggingInterceptor();
71+
this.isShuttingDown = isShuttingDown;
6872
}
6973

7074
protected OkHttpClient makeStreamClient(long sseReadTimeoutMins, List<X509Certificate> trustedCAs)
@@ -83,7 +87,8 @@ protected OkHttpClient makeStreamClient(long sseReadTimeoutMins, List<X509Certif
8387
httpClientBuilder.interceptors().remove(loggingInterceptor);
8488
}
8589

86-
httpClientBuilder.addInterceptor(new NewRetryInterceptor(retryBackoffDelay));
90+
httpClientBuilder.addInterceptor(
91+
new NewRetryInterceptor(retryBackoffDelay, true, isShuttingDown));
8792
return httpClientBuilder.build();
8893
}
8994

@@ -149,6 +154,7 @@ public void stop() {
149154
public void close() {
150155
stop();
151156
if (this.streamClient != null) {
157+
this.streamClient.dispatcher().executorService().shutdown();
152158
this.streamClient.connectionPool().evictAll();
153159
}
154160
log.debug("EventSource closed");

src/main/java/io/harness/cf/client/connector/HarnessConfig.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,73 @@ public class HarnessConfig {
3535
* should include intermediate CAs too to allow the HTTP client to build a full trust chain.
3636
*/
3737
@Builder.Default List<X509Certificate> tlsTrustedCAs = null;
38+
39+
/**
40+
* Defines the maximum number of retry attempts for certain types of requests:
41+
* authentication, polling, metrics, and reacting to stream events. If a request fails,
42+
* the SDK will retry up to this number of times before giving up.
43+
* <p>
44+
* - Authentication: Used for retrying authentication requests when the server is unreachable.
45+
* - Polling: Applies to requests that fetch feature flags and target groups periodically.
46+
* - Metrics: Applies to analytics requests for sending metrics data to the server.
47+
* - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes,
48+
* where the SDK needs to fetch updated flag or group data.
49+
* <p>
50+
* Note: This setting does not apply to streaming requests (either the initial connection or
51+
* reconnecting after a disconnection). Streaming requests will always retry indefinitely
52+
* (infinite retries).
53+
*/
54+
@Builder.Default private long maxRequestRetry = 10;
55+
56+
/**
57+
* Indicates whether to flush analytics data when the SDK is closed.
58+
* <p>
59+
* When set to {@code true}, any remaining analytics data (such as metrics)
60+
* will be sent to the server before the SDK is fully closed. If {@code false},
61+
* the data will not be flushed, and any unsent analytics data may be lost.
62+
* <p>
63+
* The default value is {@code false}.
64+
* <p>
65+
* <b>Note:</b> The flush will attempt to send the data in a single request.
66+
* Any failures during this process will not be retried, and the analytics data
67+
* may be lost.
68+
*
69+
* <p>Example usage:
70+
* <pre>
71+
* {@code
72+
* HarnessConfig harnessConfig = HarnessConfig.builder()
73+
* .flushAnalyticsOnClose(true)
74+
* .build();
75+
* }
76+
* </pre>
77+
*/
78+
@Builder.Default private final boolean flushAnalyticsOnClose = false;
79+
80+
/**
81+
* The timeout for flushing analytics on SDK close.
82+
* <p>
83+
* This option sets the maximum duration, in milliseconds, the SDK will wait for the
84+
* analytics data to be flushed after the SDK has been closed. If the flush process takes longer
85+
* than this timeout, the request will be canceled, and any remaining data will
86+
* not be sent. This ensures that the SDK does not hang indefinitely during shutdown.
87+
* <p>
88+
* The default value is {@code 30000ms} which is the default read timeout for requests made by the SDK
89+
* <p>
90+
* <b>Note:</b> This timeout only applies to the flush process that happens when
91+
* {@code flushAnalyticsOnClose} is set to {@code true}. It does not affect other
92+
* requests made by the SDK during normal operation.
93+
*
94+
* <p>Example usage:
95+
* <pre>
96+
* {@code
97+
*
98+
* HarnessConfig harnessConfig = HarnessConfig.builder()
99+
* .flushAnalyticsOnClose(true)
100+
* // Timeout the analytics flush request in 3000ms (3 seconds)
101+
* .flushAnalyticsOnCloseTimeout(3000).build();
102+
* .build();
103+
* }
104+
* </pre>
105+
*/
106+
@Builder.Default private final int flushAnalyticsOnCloseTimeout = 30000;
38107
}

0 commit comments

Comments
 (0)