fix: Move event emitting off the main thread to avoid deadlocks #1314

Merged
merged 6 commits into from
Feb 13, 2025
2 changes: 2 additions & 0 deletions pom.xml
@@ -265,6 +265,8 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>3.5.2</version>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
Contributor Author

OK, this is obviously not a solution, but I want to know the reason for the test failures: are they actual issues, or is it cross-test interaction? It seems we are suffering from global setup issues due to the singleton that is OpenFeatureAPI. @toddbaert, any suggestions for how to get around this?

Member

Which tests are failing? Today we merged something that increases the stability of the spec tests.

Member

I think there's a test annotation to run some tests in isolation, which may be the best solution here. If you cannot narrow down the exact suite I'd support you adding forkCount 1 for now, and I can look into it later (or somebody else).

If you want to keep digging at it yourself, you're more than welcome, but it's not 100% related to your improvement, so I don't want to force you to do it if you are sure it's a bad test interaction.

Up to you!

Member

The @Execution(SAME_THREAD) annotation.

Contributor Author

Ah, thanks! Yes, it is always the same tests, so I'm gonna annotate them and remove this global setting. Then this should be done.

Contributor Author

@sideshowcoder Feb 13, 2025

OK, so I tried this out locally and got some fixes, but I still have some failing tests; with annotations it is not possible to get the same isolation as with fork reuse disabled. I'm gonna spend more time on this tomorrow, and if I can't resolve it I would suggest moving this to a follow-up PR.

Member

I'm in favor of merging as is and following up with improvements in a separate PR. @toddbaert @liran2000 any concerns about that?

Member

No objections.

<argLine>
${surefireArgLine}
</argLine>
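The cross-test interaction raised in the review thread above can be illustrated with a small, self-contained sketch. Everything in it is hypothetical: `GlobalRegistry` is only a stand-in for a process-wide singleton such as OpenFeatureAPI, and the two "tests" are plain static methods rather than JUnit tests. It shows one test leaving state behind that a later test silently depends on, and a reset hook that restores the default.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustration only: GlobalRegistry is a hypothetical stand-in, not the SDK's API. */
final class SingletonLeakSketch {

    static final class GlobalRegistry {
        private static final GlobalRegistry INSTANCE = new GlobalRegistry();
        private final AtomicReference<String> provider = new AtomicReference<>("no-op");

        static GlobalRegistry getInstance() {
            return INSTANCE;
        }

        void setProvider(String name) {
            provider.set(name);
        }

        String getProvider() {
            return provider.get();
        }
    }

    /** A "test" that installs its own provider and never cleans up. */
    static void testThatMutatesGlobalState() {
        GlobalRegistry.getInstance().setProvider("stale-provider");
    }

    /** A "test" that assumes the default provider is still installed. */
    static boolean testThatAssumesDefault() {
        return "no-op".equals(GlobalRegistry.getInstance().getProvider());
    }

    /** Cleanup hook, comparable in spirit to resetting the provider after a suite. */
    static void resetGlobalState() {
        GlobalRegistry.getInstance().setProvider("no-op");
    }

    public static void main(String[] args) {
        testThatMutatesGlobalState();
        System.out.println("passes without reset: " + testThatAssumesDefault());
        resetGlobalState();
        System.out.println("passes with reset: " + testThatAssumesDefault());
    }
}
```

Running every test class in a fresh JVM, as `reuseForks=false` does, sidesteps the problem by discarding the singleton along with the process. An explicit reset is cheaper but only works if every suite that touches the global state cleans up after itself.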
30 changes: 28 additions & 2 deletions src/main/java/dev/openfeature/sdk/EventProvider.java
@@ -1,6 +1,10 @@
package dev.openfeature.sdk;

import dev.openfeature.sdk.internal.TriConsumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

/**
* Abstract EventProvider. Providers must extend this class to support events.
@@ -14,8 +18,10 @@
*
* @see FeatureProvider
*/
@Slf4j
public abstract class EventProvider implements FeatureProvider {
private EventProviderListener eventProviderListener;
private final ExecutorService emitterExecutor = Executors.newCachedThreadPool();

void setEventProviderListener(EventProviderListener eventProviderListener) {
this.eventProviderListener = eventProviderListener;
@@ -46,6 +52,24 @@ void detach() {
this.onEmit = null;
}

/**
* Stop the event emitter executor and block until either termination has completed
* or timeout period has elapsed.
*/
@Override
public void shutdown() {
emitterExecutor.shutdown();
try {
if (!emitterExecutor.awaitTermination(EventSupport.SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn("Emitter executor did not terminate before the timeout period had elapsed");
emitterExecutor.shutdownNow();
}
} catch (InterruptedException e) {
emitterExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}

/**
* Emit the specified {@link ProviderEvent}.
*
@@ -56,8 +80,10 @@ public void emit(ProviderEvent event, ProviderEventDetails details) {
if (eventProviderListener != null) {
eventProviderListener.onEmit(event, details);
}
- if (this.onEmit != null) {
- this.onEmit.accept(this, event, details);
+
+ final TriConsumer<EventProvider, ProviderEvent, ProviderEventDetails> localOnEmit = this.onEmit;
+ if (localOnEmit != null) {
+ emitterExecutor.submit(() -> localOnEmit.accept(this, event, details));
}
}
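The effect of the `emit` change above can be sketched without the SDK. The `Emitter` class below is a hypothetical stand-in that only mirrors the shape of the change: it copies the handler into a local variable before the null check and hands the call to an executor. The handler deliberately waits for the caller to move past `emit()`, which a synchronous call on the caller's thread could never satisfy. The `volatile` field and the `String` event type are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Illustration only: a hypothetical emitter, not the SDK's EventProvider. */
final class AsyncEmitSketch {

    static final class Emitter {
        private final ExecutorService executor = Executors.newCachedThreadPool();
        private volatile Consumer<String> onEmit;

        void attach(Consumer<String> handler) {
            this.onEmit = handler;
        }

        void detach() {
            this.onEmit = null;
        }

        void emit(String event) {
            // Read the field once so a concurrent detach() cannot null it
            // between the check and the call.
            final Consumer<String> localOnEmit = this.onEmit;
            if (localOnEmit != null) {
                // The handler runs on a pool thread; emit() returns immediately.
                executor.submit(() -> localOnEmit.accept(event));
            }
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Returns true if the handler finished within the timeout. */
    static boolean handlerCompletes(long timeoutMillis) {
        Emitter emitter = new Emitter();
        CountDownLatch callerContinued = new CountDownLatch(1);
        CountDownLatch handlerFinished = new CountDownLatch(1);

        emitter.attach(event -> {
            try {
                // The handler cannot finish until the caller has returned from emit().
                callerContinued.await();
                handlerFinished.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            emitter.emit("PROVIDER_READY");
            callerContinued.countDown();
            return handlerFinished.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            emitter.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("handler completed: " + handlerCompletes(2000));
    }
}
```

The trade-off is that callers can no longer assume the handler has run by the time `emit()` returns, which is why the tests in this PR switch to timeout-based verification.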

4 changes: 2 additions & 2 deletions src/main/java/dev/openfeature/sdk/EventSupport.java
@@ -19,15 +19,15 @@
@Slf4j
class EventSupport {

+ public static final int SHUTDOWN_TIMEOUT_SECONDS = 3;

// we use a v4 uuid as a "placeholder" for anonymous clients, since
// ConcurrentHashMap doesn't support nulls
private static final String defaultClientUuid = UUID.randomUUID().toString();
- private static final int SHUTDOWN_TIMEOUT_SECONDS = 3;
private final Map<String, HandlerStore> handlerStores = new ConcurrentHashMap<>();
private final HandlerStore globalHandlerStore = new HandlerStore();
private final ExecutorService taskExecutor = Executors.newCachedThreadPool(runnable -> {
final Thread thread = new Thread(runnable);
thread.setDaemon(true);
return thread;
});
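The shutdown sequence added to `EventProvider` follows the common two-phase pattern for an `ExecutorService`: request an orderly shutdown, wait for a bounded time, then force termination. A minimal sketch of that pattern, using only the JDK, is shown below. The helper names `shutdownAndAwait` and `runAndShutdown` are invented for the example, and the daemon thread factory is borrowed from the `taskExecutor` shown above rather than from `emitterExecutor`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustration only: helper names are hypothetical. */
final class GracefulShutdownSketch {

    /** Returns true if the executor terminated on its own within the timeout. */
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks; already submitted tasks still run
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            executor.shutdownNow(); // interrupt tasks that are still running
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Submits the given number of short tasks and returns how many completed, or -1 on timeout. */
    static int runAndShutdown(int tasks) {
        ExecutorService executor = Executors.newCachedThreadPool(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true); // daemon threads never keep the JVM alive
            return thread;
        });
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> completed.incrementAndGet());
        }
        boolean graceful = shutdownAndAwait(executor, 3, TimeUnit.SECONDS);
        return graceful ? completed.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runAndShutdown(5));
    }
}
```

Sharing one timeout constant between both executors, as the visibility change above allows, keeps the two shutdown paths consistent.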

27 changes: 23 additions & 4 deletions src/test/java/dev/openfeature/sdk/EventProviderTest.java
@@ -5,13 +5,18 @@
import static org.mockito.Mockito.*;

import dev.openfeature.sdk.internal.TriConsumer;
import dev.openfeature.sdk.testutils.TestStackedEmitCallsProvider;
import io.cucumber.java.AfterAll;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class EventProviderTest {

private static final int TIMEOUT = 300;

private TestEventProvider eventProvider;

@BeforeEach
@@ -21,6 +26,11 @@ void setup() {
eventProvider.initialize(null);
}

@AfterAll
public static void resetDefaultProvider() {
OpenFeatureAPI.getInstance().setProviderAndWait(new NoOpProvider());
}

@Test
@DisplayName("should run attached onEmit with emitters")
void emitsEventsWhenAttached() {
@@ -34,10 +44,10 @@ void emitsEventsWhenAttached() {
eventProvider.emitProviderStale(details);
eventProvider.emitProviderError(details);

- verify(onEmit, times(2)).accept(eventProvider, ProviderEvent.PROVIDER_READY, details);
- verify(onEmit, times(1)).accept(eventProvider, ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details);
- verify(onEmit, times(1)).accept(eventProvider, ProviderEvent.PROVIDER_STALE, details);
- verify(onEmit, times(1)).accept(eventProvider, ProviderEvent.PROVIDER_ERROR, details);
+ verify(onEmit, timeout(TIMEOUT).times(2)).accept(eventProvider, ProviderEvent.PROVIDER_READY, details);
+ verify(onEmit, timeout(TIMEOUT)).accept(eventProvider, ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details);
+ verify(onEmit, timeout(TIMEOUT)).accept(eventProvider, ProviderEvent.PROVIDER_STALE, details);
+ verify(onEmit, timeout(TIMEOUT)).accept(eventProvider, ProviderEvent.PROVIDER_ERROR, details);
}

@Test
@@ -75,6 +85,15 @@ void doesNotThrowWhenOnEmitSame() {
eventProvider.attach(onEmit2); // should not throw, same instance. noop
}

@Test
@SneakyThrows
@Timeout(value = 2, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
@DisplayName("should not deadlock on emit called during emit")
void doesNotDeadlockOnEmitStackedCalls() {
TestStackedEmitCallsProvider provider = new TestStackedEmitCallsProvider();
OpenFeatureAPI.getInstance().setProviderAndWait(provider);
}

static class TestEventProvider extends EventProvider {

private static final String NAME = "TestEventProvider";
10 changes: 5 additions & 5 deletions src/test/java/dev/openfeature/sdk/EventsTest.java
@@ -19,7 +19,7 @@

class EventsTest {

- private static final int TIMEOUT = 300;
+ private static final int TIMEOUT = 500;
private static final int INIT_DELAY = TIMEOUT / 2;

@AfterAll
@@ -601,13 +601,13 @@ void matchingStaleEventsMustRunImmediately() {
OpenFeatureAPI api = OpenFeatureAPI.getInstance();

// provider which is already stale
- TestEventsProvider provider = TestEventsProvider.newInitializedTestEventsProvider();
+ TestEventsProvider provider = new TestEventsProvider(INIT_DELAY);
Client client = api.getClient(name);
api.setProviderAndWait(name, provider);
provider.emitProviderStale(ProviderEventDetails.builder().build());
assertThat(client.getProviderState()).isEqualTo(ProviderState.STALE);

- // should run even thought handler was added after stale
+ // should run even though handler was added after stale
client.onProviderStale(handler);
verify(handler, timeout(TIMEOUT)).accept(any());
}
@@ -623,13 +623,13 @@ void matchingErrorEventsMustRunImmediately() {
OpenFeatureAPI api = OpenFeatureAPI.getInstance();

// provider which is already in error
- TestEventsProvider provider = new TestEventsProvider();
+ TestEventsProvider provider = new TestEventsProvider(INIT_DELAY);
Client client = api.getClient(name);
api.setProviderAndWait(name, provider);
provider.emitProviderError(ProviderEventDetails.builder().build());
assertThat(client.getProviderState()).isEqualTo(ProviderState.ERROR);

- // should run even thought handler was added after error
+ // should run even though handler was added after error
client.onProviderError(handler);
verify(handler, timeout(TIMEOUT)).accept(any());
}
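Because handlers now run on an executor thread, an assertion made immediately after `emit` can run before the handler does, which is why the test changes above verify with a timeout. The same idea can be sketched with the JDK alone, using a `CountDownLatch` in place of Mockito's `timeout(...)`. The method name and the simulated dispatch delay are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustration only: waits for an asynchronous callback with a bounded timeout. */
final class AwaitAsyncCallbackSketch {

    /** Returns true if a callback dispatched after delayMillis arrives within timeoutMillis. */
    static boolean callbackArrivesWithin(long delayMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch called = new CountDownLatch(1);
        try {
            executor.execute(() -> {
                try {
                    Thread.sleep(delayMillis); // simulated dispatch latency
                    called.countDown();        // the "handler" has run
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Block until the callback fires or the timeout elapses, whichever comes first.
            return called.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow(); // interrupts the simulated delay if it is still sleeping
        }
    }

    public static void main(String[] args) {
        System.out.println("fast callback seen: " + callbackArrivesWithin(10, 2000));
        System.out.println("slow callback seen: " + callbackArrivesWithin(2000, 50));
    }
}
```

A wait of this kind returns as soon as the callback arrives, so a generous timeout slows down only the failing case.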
@@ -0,0 +1,103 @@
package dev.openfeature.sdk.testutils;

import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.EventProvider;
import dev.openfeature.sdk.Metadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.Value;
import java.util.function.Consumer;

public class TestStackedEmitCallsProvider extends EventProvider {
private final NestedBlockingEmitter nestedBlockingEmitter = new NestedBlockingEmitter(this::onProviderEvent);

@Override
public Metadata getMetadata() {
return () -> getClass().getSimpleName();
}

@Override
public void initialize(EvaluationContext evaluationContext) throws Exception {
synchronized (nestedBlockingEmitter) {
nestedBlockingEmitter.init();
while (!nestedBlockingEmitter.isReady()) {
try {
nestedBlockingEmitter.wait();
} catch (InterruptedException e) {
}
}
}
}

private void onProviderEvent(ProviderEvent providerEvent) {
synchronized (nestedBlockingEmitter) {
if (providerEvent == ProviderEvent.PROVIDER_READY) {
nestedBlockingEmitter.setReady();
/*
* This line deadlocked in the original implementation without the emitterExecutor see
* https://github.com/open-feature/java-sdk/issues/1299
*/
emitProviderReady(ProviderEventDetails.builder().build());
}
}
}

@Override
public ProviderEvaluation<Boolean> getBooleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getBooleanEvaluation'");
}

@Override
public ProviderEvaluation<String> getStringEvaluation(String key, String defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getStringEvaluation'");
}

@Override
public ProviderEvaluation<Integer> getIntegerEvaluation(String key, Integer defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getIntegerEvaluation'");
}

@Override
public ProviderEvaluation<Double> getDoubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getDoubleEvaluation'");
}

@Override
public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getObjectEvaluation'");
}

static class NestedBlockingEmitter {

private final Consumer<ProviderEvent> emitProviderEvent;
private volatile boolean isReady;

public NestedBlockingEmitter(Consumer<ProviderEvent> emitProviderEvent) {
this.emitProviderEvent = emitProviderEvent;
}

public void init() {
// run init outside monitored thread
new Thread(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

emitProviderEvent.accept(ProviderEvent.PROVIDER_READY);
})
.start();
}

public boolean isReady() {
return isReady;
}

public synchronized void setReady() {
isReady = true;
this.notifyAll();
}
}
}
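The deadlock test in this PR relies on JUnit's `@Timeout` with `ThreadMode.SEPARATE_THREAD` so that a hung call fails the test instead of hanging the build. A rough JDK-only sketch of the same idea follows: run the suspect call on a separate thread and give up after a deadline. It is similar in spirit only and does not describe how JUnit implements the annotation. `completesWithin` and `blockForever` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustration only: guards a possibly hanging call with a deadline. */
final class DeadlockGuardSketch {

    /** Runs the action on a separate thread; returns false if it does not finish in time. */
    static boolean completesWithin(Runnable action, long timeoutMillis) {
        ExecutorService runner = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "guarded-call");
            thread.setDaemon(true); // a stuck call must not keep the JVM alive
            return thread;
        });
        Future<?> future = runner.submit(action);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // best effort: interrupts the worker thread
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("guarded action failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            runner.shutdownNow();
        }
    }

    /** Blocks until interrupted; stands in for a call that never returns. */
    static void blockForever() {
        try {
            new CountDownLatch(1).await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("quick call finished: " + completesWithin(() -> { }, 1000));
        System.out.println("blocked call finished: " + completesWithin(DeadlockGuardSketch::blockForever, 100));
    }
}
```

The worker is a daemon thread because a thread blocked on entry to a `synchronized` block does not respond to interruption, so cancellation alone cannot be relied on to release a genuinely deadlocked call.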