Skip to content

Commit b219e0e

Browse files
authored
FFM-5251 - Merge FFM-5149 (flags out of sync) from 1.0.5.X to main (#117)
FFM-5251 - Merge FFM-5149 (flags out of sync) from 1.0.5.x to main ###What SSE client should avoid using a read timeout of 0 since if the socket is not gracefully shutdown we have no way to detect that the socket is dead. Instead enforce a default read time out > 0 to force the connection to refresh and remove any stale socket. This change integrates the original bug fix FFM-5149 from 1.0.5.X into main. Main has previsouly been refactored and diverged so a 1:1 merge is not possible so a re-implementation has been done. Additional changes: - okhttp bumped from 3.9.1 to 4.9.0 to match 1.0.5.X version - sseTimeout is now defined in HarnessConfig since Config was deprecated - Fixed an issue where Poller would not restart on SSE disconnect due to the starttime not being set - Added additional logging around socket events and ScheduledService states ###Why This will allow the client to self heal in the event of a broken connection and avoid a situtation where the client gets out of sync with the FF server. ###Testing Manual testing, checked client was able to recover its connection even after the socket was broken on purpose.
1 parent 9700ab4 commit b219e0e

File tree

7 files changed

+83
-10
lines changed

7 files changed

+83
-10
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,12 @@
8181
<dependency>
8282
<groupId>com.squareup.okhttp3</groupId>
8383
<artifactId>okhttp</artifactId>
84-
<version>3.9.1</version>
84+
<version>4.9.0</version>
8585
</dependency>
8686
<dependency>
8787
<groupId>com.squareup.okhttp3</groupId>
8888
<artifactId>logging-interceptor</artifactId>
89-
<version>3.9.1</version>
89+
<version>4.9.0</version>
9090
</dependency>
9191
<dependency>
9292
<groupId>org.apache.maven</groupId>

src/main/java/io/harness/cf/client/api/InnerClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf
111111
metricsProcessor = new MetricsProcessor(this.connector, this.options, this);
112112
updateProcessor = new UpdateProcessor(this.connector, this.repository, this);
113113

114+
pollerStartedAt = new Date();
115+
114116
// start with authentication
115117
authService.startAsync();
116118
}
@@ -205,13 +207,15 @@ public synchronized void onMetricsFailure() {
205207

206208
@Override
207209
public void onConnected() {
210+
log.info("onConnected triggered");
208211
if (pollProcessor.state() == Service.State.RUNNING) {
209212
pollProcessor.stop();
210213
}
211214
}
212215

213216
@Override
214217
public void onDisconnected() {
218+
log.info("onDisconnected triggered, starting poller to get latest flags");
215219
// onDisconnected can be called multiple times from updater because of retries
216220
// and we cannot create many poller instances so we need to check if
217221
// on closing the client, state of the poller and when poller is last time started
@@ -227,32 +231,44 @@ public void onDisconnected() {
227231
new PollingProcessor(connector, repository, options.getPollIntervalInSeconds(), this);
228232
pollProcessor.start();
229233
pollerStartedAt = new Date();
234+
} else {
235+
log.warn(
236+
"Poller was not restarted [closing={} terminated={} pollStartTime+interval={} now={} ]",
237+
closing,
238+
pollProcessor.state() == Service.State.TERMINATED,
239+
instant,
240+
now);
230241
}
231242
}
232243

233244
@Override
234245
public void onReady() {
246+
log.info("onReady triggered");
235247
initialize(Processor.STREAM);
236248
}
237249

238250
@Override
239251
public void onError() {
252+
log.info("onError triggered");
240253
// when error happens on updater (stream)
241254
onDisconnected();
242255
}
243256

244257
@Override
245258
public synchronized void onFailure(@NonNull final String error) {
259+
log.info("onFailure triggered [error={}] ", error);
246260
failure = true;
247261
notifyAll();
248262
}
249263

250264
@Override
251265
public void update(@NonNull final Message message) {
266+
log.info("update triggered [event={}] ", message.getEvent());
252267
updateProcessor.update(message);
253268
}
254269

255270
public void update(@NonNull final Message message, final boolean manual) {
271+
log.info("update triggered [event={} manual={}] ", message.getEvent(), manual);
256272
if (options.isStreamEnabled() && manual) {
257273
log.warn(
258274
"You have run update method manually with the stream enabled. Please turn off the stream in this case.");

src/main/java/io/harness/cf/client/api/PollingProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.harness.cf.client.api;
22

33
import com.google.common.util.concurrent.AbstractScheduledService;
4+
import com.google.common.util.concurrent.MoreExecutors;
5+
import io.harness.cf.client.common.ScheduledServiceStateLogger;
46
import io.harness.cf.client.connector.Connector;
57
import io.harness.cf.client.connector.ConnectorException;
68
import io.harness.cf.model.FeatureConfig;
@@ -29,6 +31,9 @@ public PollingProcessor(
2931
this.pollIntervalSeconds = pollIntervalSeconds;
3032
this.repository = repository;
3133
this.callback = callback;
34+
this.addListener(
35+
new ScheduledServiceStateLogger(PollingProcessor.class.getSimpleName()),
36+
MoreExecutors.directExecutor());
3237
}
3338

3439
public CompletableFuture<List<FeatureConfig>> retrieveFlags() {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.harness.cf.client.common;
2+
3+
import com.google.common.util.concurrent.Service;
4+
import lombok.AllArgsConstructor;
5+
import lombok.extern.slf4j.Slf4j;
6+
7+
@Slf4j
8+
@AllArgsConstructor
9+
public class ScheduledServiceStateLogger extends Service.Listener {
10+
private final String name;
11+
12+
@Override
13+
public void starting() {
14+
log.info("{}: ScheduledService starting", name);
15+
}
16+
17+
@Override
18+
public void running() {
19+
log.info("{}: ScheduledService running", name);
20+
}
21+
22+
@Override
23+
public void stopping(Service.State from) {
24+
log.info("{}: ScheduledService stopping [from={}]", name, from);
25+
}
26+
27+
@Override
28+
public void terminated(Service.State from) {
29+
log.info("{}: ScheduledService terminated [from={}]", name, from);
30+
}
31+
32+
@Override
33+
public void failed(Service.State from, Throwable failure) {
34+
log.warn("{}: ScheduledService failed [from={} message={}]", name, from, failure.getMessage());
35+
log.warn(name + ": ScheduledService failed exception", failure);
36+
}
37+
}

src/main/java/io/harness/cf/client/connector/EventSource.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,25 @@ public class EventSource implements ServerSentEvent.Listener, AutoCloseable, Ser
3232
LogUtil.setSystemProps();
3333
}
3434

35-
public EventSource(@NonNull String url, Map<String, String> headers, @NonNull Updater updater) {
35+
public EventSource(
36+
@NonNull String url,
37+
Map<String, String> headers,
38+
@NonNull Updater updater,
39+
long sseReadTimeoutMins) {
3640
this.updater = updater;
37-
okSse = new OkSse(makeStreamClient());
41+
okSse = new OkSse(makeStreamClient(sseReadTimeoutMins));
3842
builder = new Request.Builder().url(url);
3943
headers.put("User-Agent", "JavaSDK " + io.harness.cf.Version.VERSION);
4044
headers.forEach(builder::header);
4145
updater.onReady();
4246
log.info("EventSource initialized with url {} and headers {}", url, headers);
4347
}
4448

45-
protected OkHttpClient makeStreamClient() {
49+
protected OkHttpClient makeStreamClient(long sseReadTimeoutMins) {
4650
OkHttpClient.Builder httpClientBuilder =
47-
new OkHttpClient.Builder().readTimeout(0L, TimeUnit.SECONDS).retryOnConnectionFailure(true);
51+
new OkHttpClient.Builder()
52+
.readTimeout(sseReadTimeoutMins, TimeUnit.MINUTES)
53+
.retryOnConnectionFailure(true);
4854
if (log.isDebugEnabled()) {
4955
loggingInterceptor = new HttpLoggingInterceptor();
5056
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
@@ -71,6 +77,7 @@ protected OkHttpClient makeStreamClient() {
7177

7278
@Override
7379
public void onOpen(ServerSentEvent serverSentEvent, Response response) {
80+
log.info("EventSource onOpen");
7481
if (updater != null) {
7582
log.info("EventSource connected!");
7683
updater.onConnected();
@@ -79,7 +86,7 @@ public void onOpen(ServerSentEvent serverSentEvent, Response response) {
7986

8087
@Override
8188
public void onMessage(ServerSentEvent sse, String id, String event, String message) {
82-
log.info("EventSource message received {}", message);
89+
log.info("EventSource onMessage {}", message);
8390
Message msg = gson.fromJson(message, Message.class);
8491
updater.update(msg);
8592
}
@@ -98,7 +105,11 @@ public boolean onRetryTime(ServerSentEvent serverSentEvent, long l) {
98105
@Override
99106
public boolean onRetryError(
100107
ServerSentEvent serverSentEvent, Throwable throwable, Response response) {
101-
log.warn("EventSource onRetryError");
108+
log.warn(
109+
"EventSource onRetryError [throwable={} message={}]",
110+
throwable.getClass().getSimpleName(),
111+
throwable.getMessage());
112+
log.trace("onRetryError exception", throwable);
102113
updater.onError();
103114
if (response != null) {
104115
return response.code() == 429 || response.code() >= 500;
@@ -108,7 +119,7 @@ public boolean onRetryError(
108119

109120
@Override
110121
public void onClosed(ServerSentEvent serverSentEvent) {
111-
log.info("EventSource disconnected");
122+
log.info("EventSource onClosed - disconnected");
112123
updater.onDisconnected();
113124
}
114125

src/main/java/io/harness/cf/client/connector/HarnessConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,7 @@ public class HarnessConfig {
2020
@Builder.Default int readTimeout = 30000;
2121
/** timeout in milliseconds for writing data to CF Server */
2222
@Builder.Default int writeTimeout = 10000;
23+
24+
/** read timeout in minutes for SSE connections */
25+
@Builder.Default long sseReadTimeout = 1;
2326
}

src/main/java/io/harness/cf/client/connector/HarnessConnector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ public List<FeatureConfig> getFlags() throws ConnectorException {
221221
featureConfig.size(),
222222
this.environment,
223223
this.cluster);
224+
log.info("Got the following features: " + featureConfig);
224225
return featureConfig;
225226
} catch (ApiException e) {
226227
log.error(
@@ -355,7 +356,7 @@ public Service stream(@NonNull final Updater updater) {
355356
map.put("Authorization", "Bearer " + token);
356357
map.put("API-Key", apiKey);
357358
log.info("Initialize new EventSource instance");
358-
eventSource = new EventSource(sseUrl, map, updater);
359+
eventSource = new EventSource(sseUrl, map, updater, Math.max(options.getSseReadTimeout(), 1));
359360
return eventSource;
360361
}
361362

0 commit comments

Comments
 (0)