Skip to content

Commit

Permalink
Move event emitting off the main thread to avoid deadlocks
Browse files Browse the repository at this point in the history
When stacking event emitting inside an EventProvider, when using sychronization
the EventProvider can deadlock, to avoid this move the event emitting of the
main thread.

Signed-off-by: Philipp Fehre <[email protected]>
  • Loading branch information
Philipp Fehre committed Feb 3, 2025
1 parent 208411e commit 20ba91c
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 2 deletions.
32 changes: 31 additions & 1 deletion src/main/java/dev/openfeature/sdk/EventProvider.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package dev.openfeature.sdk;

import dev.openfeature.sdk.internal.TriConsumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

/**
* Abstract EventProvider. Providers must extend this class to support events.
Expand All @@ -14,8 +18,15 @@
*
* @see FeatureProvider
*/
@Slf4j
public abstract class EventProvider implements FeatureProvider {
private EventProviderListener eventProviderListener;
private static final int SHUTDOWN_TIMEOUT_SECONDS = 3;
private final ExecutorService emitterExecutor = Executors.newCachedThreadPool(runnable -> {
final Thread thread = new Thread(runnable);
thread.setDaemon(true);
return thread;
});

void setEventProviderListener(EventProviderListener eventProviderListener) {
this.eventProviderListener = eventProviderListener;
Expand Down Expand Up @@ -46,6 +57,24 @@ void detach() {
this.onEmit = null;
}

/**
* Stop the event emitter executor and block until either termination has completed
* or timeout period has elapsed.
*/
@Override
public void shutdown() {
emitterExecutor.shutdown();
try {
if (!emitterExecutor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn("Emitter executor did not terminate before the timeout period had elapsed");
emitterExecutor.shutdownNow();
}
} catch (InterruptedException e) {
emitterExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}

/**
* Emit the specified {@link ProviderEvent}.
*
Expand All @@ -57,7 +86,7 @@ public void emit(ProviderEvent event, ProviderEventDetails details) {
eventProviderListener.onEmit(event, details);
}
if (this.onEmit != null) {
this.onEmit.accept(this, event, details);
emitterExecutor.submit(() -> this.onEmit.accept(this, event, details));
}
}

Expand All @@ -68,6 +97,7 @@ public void emit(ProviderEvent event, ProviderEventDetails details) {
* @param details The details of the event
*/
public void emitProviderReady(ProviderEventDetails details) {
System.out.println("Details: " + details);
emit(ProviderEvent.PROVIDER_READY, details);
}

Expand Down
111 changes: 110 additions & 1 deletion src/test/java/dev/openfeature/sdk/EventProviderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import dev.openfeature.sdk.internal.TriConsumer;
import java.util.function.Consumer;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class EventProviderTest {

Expand Down Expand Up @@ -75,6 +80,110 @@ void doesNotThrowWhenOnEmitSame() {
eventProvider.attach(onEmit2); // should not throw, same instance. noop
}

@Test
@SneakyThrows
@Timeout(value = 2, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
@DisplayName("should not deadlock on emit called during emit")
void doesNotDeadlockOnEmitStackedCalls() {
StackedEmitCallsProvider provider = new StackedEmitCallsProvider();
OpenFeatureAPI.getInstance().setProviderAndWait(provider);
}

static class StackedEmitCallsProvider extends EventProvider {
private final NestedBlockingEmitter nestedBlockingEmitter = new NestedBlockingEmitter(this::onProviderEvent);

@Override
public Metadata getMetadata() {
return () -> getClass().getSimpleName();
}

@Override
public void initialize(EvaluationContext evaluationContext) throws Exception {
synchronized (nestedBlockingEmitter) {
nestedBlockingEmitter.init();
while (!nestedBlockingEmitter.isReady()) {
try {
nestedBlockingEmitter.wait();
} catch (InterruptedException e) {
}
}
}
}

private void onProviderEvent(ProviderEvent providerEvent) {
synchronized (nestedBlockingEmitter) {
if (providerEvent == ProviderEvent.PROVIDER_READY) {
nestedBlockingEmitter.setReady();
/*
* This line deadlocked in the original implementation without the emitterExecutor see
* https://github.com/open-feature/java-sdk/issues/1299
*/
emitProviderReady(ProviderEventDetails.builder().build());
}
}
}

@Override
public ProviderEvaluation<Boolean> getBooleanEvaluation(
String key, Boolean defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getBooleanEvaluation'");
}

@Override
public ProviderEvaluation<String> getStringEvaluation(String key, String defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getStringEvaluation'");
}

@Override
public ProviderEvaluation<Integer> getIntegerEvaluation(
String key, Integer defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getIntegerEvaluation'");
}

@Override
public ProviderEvaluation<Double> getDoubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getDoubleEvaluation'");
}

@Override
public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getObjectEvaluation'");
}
}

static class NestedBlockingEmitter {

private final Consumer<ProviderEvent> emitProviderEvent;
private volatile boolean isReady;

public NestedBlockingEmitter(Consumer<ProviderEvent> emitProviderEvent) {
this.emitProviderEvent = emitProviderEvent;
}

public void init() {
// run init outside monitored thread
new Thread(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

emitProviderEvent.accept(ProviderEvent.PROVIDER_READY);
})
.start();
}

public boolean isReady() {
return isReady;
}

public synchronized void setReady() {
isReady = true;
this.notifyAll();
}
}

static class TestEventProvider extends EventProvider {

private static final String NAME = "TestEventProvider";
Expand Down

0 comments on commit 20ba91c

Please sign in to comment.