Skip to content

Commit 7fd468d

Browse files
authored
[FFM-10760] - Use a single ExecutorService for UpdateProcessor (#182)
* [FFM-10760] - Use a single ExecutorService for UpdateProcessor What - Change UpdateProcessor to use a single thread pool for the life time of the service - Updated `processFlag()` and `processSegment()` to catch `Throwable` to prevent executor threads dying - Drop some verbose logs to trace to make debug level more readable Why We are getting reports in the field that the following error message is being logged `Update processor is terminating/restarting. Update skipped: <EVENT JSON>` Looking at the code this can only happen if the SSE stream drops and we attempt to call `restart()` while events are still arriving from the old stream. We should modify the code to avoid creating/destroying the thread pool on each disconnect and instead use a single thread pool for the lifetime of the UpdateProcessor which is only destroyed when `close()` is called. This will allow the removal of the following defensive check and will help avoid dropping SSE messages that did make it before the disconnect: ``` if (executor.isShutdown() || executor.isTerminated()) { log.warn("Update processor is terminating/restarting. Update skipped: {}", message); return; } ```
1 parent 2d3606a commit 7fd468d

File tree

6 files changed

+31
-33
lines changed

6 files changed

+31
-33
lines changed

examples/src/main/java/io/harness/ff/examples/GettingStarted.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,18 @@
99

1010
public class GettingStarted {
1111
// API Key - set this as an env variable
12-
private static String apiKey = getEnvOrDefault("FF_API_KEY", "");
12+
private static final String apiKey = getEnvOrDefault("FF_API_KEY", "");
1313

1414
// Flag Identifier
15-
private static String flagName = getEnvOrDefault("FF_FLAG_NAME", "harnessappdemodarkmode");
15+
private static final String flagName = getEnvOrDefault("FF_FLAG_NAME", "harnessappdemodarkmode");
1616

1717
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1818

1919
public static void main(String[] args) {
2020
System.out.println("Harness SDK Getting Started");
2121

22-
try {
23-
//Create a Feature Flag Client
24-
CfClient cfClient = new CfClient(apiKey, BaseConfig.builder().build());
22+
//Create a Feature Flag Client
23+
try (CfClient cfClient = new CfClient(apiKey, BaseConfig.builder().build())) {
2524
cfClient.waitForInitialization();
2625

2726
// Create a target (different targets can get different results based on rules. This includes a custom attribute 'location')
@@ -46,7 +45,6 @@ public static void main(String[] args) {
4645
// Close the SDK
4746
System.out.println("Cleaning up...");
4847
scheduler.shutdownNow();
49-
cfClient.close();
5048

5149
} catch (Exception e) {
5250
e.printStackTrace();

src/main/java/io/harness/cf/client/api/CaffeineCache.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,37 +20,37 @@ public class CaffeineCache implements Cache {
2020

2121
public CaffeineCache(int size) {
2222
cache = Caffeine.newBuilder().maximumSize(size).build();
23-
log.debug("CaffeineCache initialized with size {}", size);
23+
log.trace("CaffeineCache initialized with size {}", size);
2424
}
2525

2626
@Override
2727
public void set(@NonNull String key, @NonNull Object value) {
2828
cache.put(key, value);
29-
log.debug("New value in the cache with key {} and value {}", key, value);
29+
log.trace("New value in the cache with key '{}' and value '{}'", key, value);
3030
}
3131

3232
@Override
3333
@Nullable
3434
public Object get(@NonNull String key) {
3535
Object value = cache.getIfPresent(key);
3636
if (value != null) {
37-
log.debug("Key {} found in cache with value {}", key, value);
37+
log.trace("Key '{}' found in cache with value '{}'", key, value);
3838
} else {
39-
log.debug("Key {} not found in cache", key);
39+
log.trace("Key '{}' not found in cache", key);
4040
}
4141
return value;
4242
}
4343

4444
@Override
4545
public void delete(@NonNull String key) {
4646
cache.invalidate(key);
47-
log.debug("Key {} removed from cache", key);
47+
log.trace("Key {} removed from cache", key);
4848
}
4949

5050
@Override
5151
public List<String> keys() {
5252
List<String> keys = new ArrayList<>(cache.asMap().keySet());
53-
log.debug("Keys in cache {}", keys);
53+
log.trace("Keys in cache {}", keys);
5454
return keys;
5555
}
5656
}

src/main/java/io/harness/cf/client/api/Evaluator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ protected Optional<Variation> findVariation(
6060
}
6161
Optional<Variation> variation =
6262
variations.stream().filter(v -> v.getIdentifier().equals(identifier)).findFirst();
63-
log.debug("Variation {} found in variations {}", identifier, variations);
63+
log.trace("Variation {} found in variations {}", identifier, variations);
6464
return variation;
6565
}
6666

src/main/java/io/harness/cf/client/api/UpdateProcessor.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import io.harness.cf.model.Segment;
1010
import java.util.concurrent.ExecutorService;
1111
import java.util.concurrent.Executors;
12-
import java.util.concurrent.TimeUnit;
1312
import lombok.NonNull;
1413
import lombok.extern.slf4j.Slf4j;
1514

@@ -18,7 +17,7 @@ class UpdateProcessor implements AutoCloseable {
1817
private final Connector connector;
1918
private final Repository repository;
2019
private final Updater updater;
21-
private final ExecutorService executor = Executors.newFixedThreadPool(100);
20+
private final ExecutorService executor = Executors.newFixedThreadPool(10);
2221

2322
private Service stream;
2423

@@ -42,6 +41,9 @@ public void start() {
4241
}
4342

4443
try {
44+
if (stream != null) {
45+
stream.close();
46+
}
4547
stream = connector.stream(this.updater);
4648
stream.start();
4749
running = true;
@@ -65,24 +67,14 @@ public void stop() {
6567
stream.stop();
6668
running = false;
6769
}
68-
executor.shutdown();
69-
boolean result = executor.awaitTermination(3, TimeUnit.SECONDS);
70-
if (result) {
71-
log.debug("All tasks done");
72-
} else {
73-
log.warn("UpdateProcessor: timeout while wait threads to finish!");
74-
}
70+
7571
} catch (InterruptedException e) {
7672
log.error("Exception was raised when stopping update tasks", e);
7773
Thread.currentThread().interrupt();
7874
}
7975
}
8076

8177
public void update(@NonNull final Message message) {
82-
if (executor.isShutdown() || executor.isTerminated()) {
83-
log.warn("Update processor is terminating/restarting. Update skipped: {}", message);
84-
return;
85-
}
8678

8779
if (message.getDomain().equals("flag")) {
8880
log.debug("execute processFlag with message {}", message);
@@ -103,13 +95,13 @@ protected Runnable processFlag(@NonNull final Message message) {
10395
final FeatureConfig config = connector.getFlag(message.getIdentifier());
10496
if (config != null) {
10597
repository.setFlag(message.getIdentifier(), config);
106-
log.debug("Set new segment with key {} and value {}", message.getIdentifier(), config);
98+
log.trace("Set new segment with key {} and value {}", message.getIdentifier(), config);
10799
}
108100
} else if (message.getEvent().equals("delete")) {
109101
log.debug("Delete flag with key {}", message.getIdentifier());
110102
repository.deleteFlag(message.getIdentifier());
111103
}
112-
} catch (ConnectorException e) {
104+
} catch (Throwable e) {
113105
log.error(
114106
"Exception was raised when fetching flag '{}' with the message {}",
115107
message.getIdentifier(),
@@ -124,14 +116,14 @@ protected Runnable processSegment(@NonNull final Message message) {
124116
if (message.getEvent().equals("create") || message.getEvent().equals("patch")) {
125117
final Segment segment = connector.getSegment(message.getIdentifier());
126118
if (segment != null) {
127-
log.debug("Set new segment with key {} and value {}", message.getIdentifier(), segment);
119+
log.trace("Set new segment with key {} and value {}", message.getIdentifier(), segment);
128120
repository.setSegment(message.getIdentifier(), segment);
129121
}
130122
} else if (message.getEvent().equals("delete")) {
131123
log.debug("Delete segment with key {}", message.getIdentifier());
132124
repository.deleteSegment(message.getIdentifier());
133125
}
134-
} catch (ConnectorException e) {
126+
} catch (Throwable e) {
135127
log.error(
136128
"Exception was raised when fetching segment '{}' with the message {}",
137129
message.getIdentifier(),
@@ -152,6 +144,8 @@ public void close() {
152144
Thread.currentThread().interrupt();
153145
}
154146
}
147+
148+
executor.shutdownNow();
155149
log.debug("UpdateProcessor closed");
156150
}
157151

src/main/java/io/harness/cf/client/connector/EventSource.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,18 @@ public void onResponse(@NotNull Call call, @NotNull Response response) throws IO
190190
log.warn("End of SSE stream");
191191
updater.onDisconnected("End of SSE stream");
192192
} catch (Throwable ex) {
193-
log.warn("SSE Stream aborted: " + ex.getMessage());
193+
log.warn("SSE Stream aborted: " + getExceptionMsg(ex));
194194
log.trace("SSE Stream aborted trace", ex);
195-
updater.onDisconnected(ex.getMessage());
195+
updater.onDisconnected(getExceptionMsg(ex));
196196
}
197197
}
198198

199+
private String getExceptionMsg(Throwable ex) {
200+
return (ex.getMessage() == null || "null".equals(ex.getMessage()))
201+
? ex.getClass().getSimpleName()
202+
: ex.getMessage();
203+
}
204+
199205
private static class SSEStreamException extends RuntimeException {
200206
public SSEStreamException(String msg, Throwable cause) {
201207
super(msg, cause);

src/main/java/io/harness/cf/client/connector/HarnessConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ public Service stream(@NonNull final Updater updater) throws ConnectorException
404404
map,
405405
updater,
406406
Math.max(options.getSseReadTimeout(), 1),
407-
2_000,
407+
ThreadLocalRandom.current().nextInt(5000, 10000),
408408
options.getTlsTrustedCAs());
409409
return eventSource;
410410
}

0 commit comments

Comments
 (0)