Skip to content

Commit a2f5f28

Browse files
authored
fix: possible tight busy loop on certain connection errors (#1629)
Signed-off-by: Todd Baert <[email protected]>
1 parent 0b720f3 commit a2f5f28

File tree

2 files changed

+84
-13
lines changed

2 files changed

+84
-13
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class SyncStreamQueueSource implements QueueSource {
3838
private static final int QUEUE_SIZE = 5;
3939

4040
private final AtomicBoolean shutdown = new AtomicBoolean(false);
41+
private final AtomicBoolean shouldThrottle = new AtomicBoolean(false);
4142
private final int streamDeadline;
4243
private final int deadline;
4344
private final int maxBackoffMs;
@@ -102,7 +103,10 @@ public BlockingQueue<QueuePayload> getStreamQueue() {
102103
* @throws InterruptedException if stream can't be closed within deadline.
103104
*/
104105
public void shutdown() throws InterruptedException {
105-
if (shutdown.getAndSet(true)) {
106+
// Use atomic compareAndSet to ensure shutdown is only executed once
107+
// This prevents race conditions when shutdown is called from multiple threads
108+
if (!shutdown.compareAndSet(false, true)) {
109+
log.debug("Shutdown already in progress or completed");
106110
return;
107111
}
108112
this.channelConnector.shutdown();
@@ -117,16 +121,26 @@ private void observeSyncStream() {
117121
// error conditions
118122
while (!shutdown.get()) {
119123
try {
124+
if (shouldThrottle.getAndSet(false)) {
125+
log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs);
126+
Thread.sleep(this.maxBackoffMs);
127+
128+
// Check shutdown again after sleep to avoid unnecessary work
129+
if (shutdown.get()) {
130+
break;
131+
}
132+
}
133+
120134
log.debug("Initializing sync stream request");
121-
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);
135+
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle);
122136
try {
123137
observer.metadata = getMetadata();
124138
} catch (Exception metaEx) {
125139
// retry if getMetadata fails
126140
String message = metaEx.getMessage();
127141
log.debug("Metadata request error: {}, will restart", message, metaEx);
128142
enqueueError(String.format("Error in getMetadata request: %s", message));
129-
Thread.sleep(this.maxBackoffMs);
143+
shouldThrottle.set(true);
130144
continue;
131145
}
132146

@@ -135,7 +149,7 @@ private void observeSyncStream() {
135149
} catch (Exception ex) {
136150
log.error("Unexpected sync stream exception, will restart.", ex);
137151
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
138-
Thread.sleep(this.maxBackoffMs);
152+
shouldThrottle.set(true);
139153
}
140154
} catch (InterruptedException ie) {
141155
log.debug("Stream loop interrupted, most likely shutdown was invoked", ie);
@@ -209,12 +223,14 @@ private static void enqueueError(BlockingQueue<QueuePayload> queue, String messa
209223

210224
private static class SyncStreamObserver implements StreamObserver<SyncFlagsResponse> {
211225
private final BlockingQueue<QueuePayload> outgoingQueue;
226+
private final AtomicBoolean shouldThrottle;
212227
private final Awaitable done = new Awaitable();
213228

214229
private Struct metadata;
215230

216-
public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue) {
231+
public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue, AtomicBoolean shouldThrottle) {
217232
this.outgoingQueue = outgoingQueue;
233+
this.shouldThrottle = shouldThrottle;
218234
}
219235

220236
@Override
@@ -235,6 +251,9 @@ public void onError(Throwable throwable) {
235251
String message = throwable != null ? throwable.getMessage() : "unknown";
236252
log.debug("Stream error: {}, will restart", message, throwable);
237253
enqueueError(outgoingQueue, String.format("Error from stream: %s", message));
254+
255+
// Set throttling flag to ensure backoff before retry
256+
this.shouldThrottle.set(true);
238257
} finally {
239258
done.wakeup();
240259
}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ class SyncStreamQueueSourceTest {
3636
private ChannelConnector mockConnector;
3737
private FlagSyncServiceBlockingStub blockingStub;
3838
private FlagSyncServiceStub stub;
39-
private FlagSyncServiceStub errorStub;
39+
private FlagSyncServiceStub syncErrorStub;
40+
private FlagSyncServiceStub asyncErrorStub;
4041
private StreamObserver<SyncFlagsResponse> observer;
4142
private CountDownLatch latch; // used to wait for observer to be initialized
4243

@@ -60,25 +61,76 @@ public void setup() throws Exception {
6061
.when(stub)
6162
.syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize
6263

63-
errorStub = mock(FlagSyncServiceStub.class);
64-
when(errorStub.withDeadlineAfter(anyLong(), any())).thenReturn(errorStub);
64+
syncErrorStub = mock(FlagSyncServiceStub.class);
65+
when(syncErrorStub.withDeadlineAfter(anyLong(), any())).thenReturn(syncErrorStub);
6566
doAnswer((Answer<Void>) invocation -> {
6667
Object[] args = invocation.getArguments();
6768
observer = (StreamObserver<SyncFlagsResponse>) args[1];
6869
latch.countDown();
6970
throw new StatusRuntimeException(io.grpc.Status.NOT_FOUND);
7071
})
71-
.when(errorStub)
72+
.when(syncErrorStub)
7273
.syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize
74+
75+
asyncErrorStub = mock(FlagSyncServiceStub.class);
76+
when(asyncErrorStub.withDeadlineAfter(anyLong(), any())).thenReturn(asyncErrorStub);
77+
doAnswer((Answer<Void>) invocation -> {
78+
Object[] args = invocation.getArguments();
79+
observer = (StreamObserver<SyncFlagsResponse>) args[1];
80+
latch.countDown();
81+
82+
// Start a thread to call onError after a short delay
83+
new Thread(() -> {
84+
try {
85+
Thread.sleep(10); // Wait 10ms before calling onError
86+
observer.onError(new StatusRuntimeException(io.grpc.Status.INTERNAL));
87+
} catch (InterruptedException e) {
88+
Thread.currentThread().interrupt();
89+
}
90+
})
91+
.start();
92+
93+
return null;
94+
})
95+
.when(asyncErrorStub)
96+
.syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize
97+
}
98+
99+
@Test
100+
void syncInitError_DoesNotBusyWait() throws Exception {
101+
// make sure we do not spin in a busy loop on immediate errors
102+
103+
int maxBackoffMs = 1000;
104+
SyncStreamQueueSource queueSource = new SyncStreamQueueSource(
105+
FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(),
106+
mockConnector,
107+
syncErrorStub,
108+
blockingStub);
109+
latch = new CountDownLatch(1);
110+
queueSource.init();
111+
latch.await();
112+
113+
BlockingQueue<QueuePayload> streamQueue = queueSource.getStreamQueue();
114+
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
115+
assertNotNull(payload);
116+
assertEquals(QueuePayloadType.ERROR, payload.getType());
117+
Thread.sleep(maxBackoffMs + (maxBackoffMs / 2)); // wait 1.5x our delay for retries
118+
119+
// should have retried the stream (2 calls); initial + 1 retry
120+
// it's very important that the retry count is low, to confirm no busy-loop
121+
verify(syncErrorStub, times(2)).syncFlags(any(), any());
73122
}
74123

75124
@Test
76-
void initError_DoesNotBusyWait() throws Exception {
77-
// make sure we do not spin in a busy loop on errors
125+
void asyncInitError_DoesNotBusyWait() throws Exception {
126+
// make sure we do not spin in a busy loop on async errors
78127

79128
int maxBackoffMs = 1000;
80129
SyncStreamQueueSource queueSource = new SyncStreamQueueSource(
81-
FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(), mockConnector, errorStub, blockingStub);
130+
FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(),
131+
mockConnector,
132+
asyncErrorStub,
133+
blockingStub);
82134
latch = new CountDownLatch(1);
83135
queueSource.init();
84136
latch.await();
@@ -91,7 +143,7 @@ void initError_DoesNotBusyWait() throws Exception {
91143

92144
// should have retried the stream (2 calls); initial + 1 retry
93145
// it's very important that the retry count is low, to confirm no busy-loop
94-
verify(errorStub, times(2)).syncFlags(any(), any());
146+
verify(asyncErrorStub, times(2)).syncFlags(any(), any());
95147
}
96148

97149
@Test

0 commit comments

Comments
 (0)