Skip to content

Commit 50b46db

Browse files
authored
FFM-5149 - Avoid using read timeout of 0 for SSE connections (#114)
What SSE client should avoid using a read timeout of 0 since if the socket is not gracefully shutdown we have no way to detect that the socket is dead. Instead enforce a default read time out of 30 mins to force the connection to refresh and remove any stale socket. Why This will allow the client to self heal in the event of a broken connection and avoid a situtation where the client gets out of sync with the FF server. Testing Manual testing, checked client was able to recover its connection even after the socket was broken on purpose.
1 parent c8f9b9f commit 50b46db

File tree

5 files changed

+18
-4
lines changed

5 files changed

+18
-4
lines changed

src/main/java/io/harness/cf/client/api/CfClient.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@
3131
import io.jsonwebtoken.Jwts;
3232
import io.vavr.CheckedRunnable;
3333
import java.util.List;
34+
import java.util.concurrent.TimeUnit;
3435
import java.util.stream.Collectors;
3536
import lombok.Getter;
3637
import lombok.Setter;
3738
import lombok.SneakyThrows;
3839
import lombok.extern.slf4j.Slf4j;
40+
import okhttp3.OkHttpClient;
3941
import okhttp3.Request;
4042
import org.apache.commons.collections4.CollectionUtils;
4143
import org.jetbrains.annotations.NotNull;
@@ -257,7 +259,13 @@ private void initStreamingMode() {
257259
}
258260

259261
void startSSE() {
260-
OkSse okSse = new OkSse();
262+
long readTimeoutInMins = Math.max(config.getSseReadTimeout(), 1);
263+
OkHttpClient sseClient =
264+
new OkHttpClient.Builder()
265+
.readTimeout(readTimeoutInMins, TimeUnit.MINUTES)
266+
.retryOnConnectionFailure(true)
267+
.build();
268+
OkSse okSse = new OkSse(sseClient);
261269
sse = okSse.newServerSentEvent(sseRequest, listener);
262270
}
263271

src/main/java/io/harness/cf/client/api/Config.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ public class Config {
5454
/** If metrics service POST call is taking > this time, we need to know about it */
5555
@Getter @Builder.Default long metricsServiceAcceptableDuration = 10000;
5656

57+
/** read timeout in minutes for SSE connections */
58+
@Getter @Builder.Default long sseReadTimeout = 30;
59+
5760
public int getFrequency() {
5861
return Math.max(frequency, Config.MIN_FREQUENCY);
5962
}

src/main/java/io/harness/cf/client/api/Poller.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public Poller(
4646
protected void runOneIteration() {
4747

4848
if (Thread.currentThread().isInterrupted()) {
49-
49+
log.warn("Polling thread interrupted, skipping iteration");
5050
return;
5151
}
5252
try {

src/main/java/io/harness/cf/client/api/SSEListener.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ private void processFeature(JsonObject jsonObject) {
8383
break;
8484
} else {
8585
log.error(
86-
format("Mismatched versions, payload version [%s] featureConfig version [%s]"),
86+
"Mismatched versions, payload version [{}] featureConfig version [{}]",
8787
version,
8888
featureConfig.getFeature());
8989
}
@@ -124,6 +124,8 @@ public boolean onRetryTime(ServerSentEvent serverSentEvent, long l) {
124124
@Override
125125
public boolean onRetryError(
126126
ServerSentEvent serverSentEvent, Throwable throwable, Response response) {
127+
log.warn("onRetryError got {}", throwable.getMessage());
128+
log.trace("onRetryError throwable is ", throwable);
127129
return false;
128130
}
129131

src/test/java/io/harness/cf/client/api/mock/MockedCfConfiguration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ public MockedCfConfiguration() {
2929
30000,
3030
10000,
3131
true,
32-
10000);
32+
10000,
33+
30);
3334
}
3435

3536
@Override

0 commit comments

Comments
 (0)