diff --git a/micrometer-observation/src/main/java/io/micrometer/observation/aop/ObservedAspect.java b/micrometer-observation/src/main/java/io/micrometer/observation/aop/ObservedAspect.java index c6d7d4fdb8..fc6c7c4c6e 100644 --- a/micrometer-observation/src/main/java/io/micrometer/observation/aop/ObservedAspect.java +++ b/micrometer-observation/src/main/java/io/micrometer/observation/aop/ObservedAspect.java @@ -71,6 +71,7 @@ * @author Jonatan Ivanov * @author Yanming Zhou * @author Jeonggi Kim + * @author Maksim Petelin * @since 1.10.0 */ @Aspect @@ -151,16 +152,16 @@ private Object observe(ProceedingJoinPoint pjp, Method method, Observed observed try { Object result = pjp.proceed(); if (result == null) { - stopObservation(observation, scope, null); + stopObservation(observation, null); return result; } else { CompletionStage stage = (CompletionStage) result; - return stage.whenComplete((res, error) -> stopObservation(observation, scope, error)); + return stage.whenComplete((res, error) -> stopObservation(observation, error)); } } catch (Throwable error) { - stopObservation(observation, scope, error); + stopObservation(observation, error); throw error; } finally { @@ -191,11 +192,10 @@ private Method getMethod(ProceedingJoinPoint pjp) throws NoSuchMethodException { return method; } - private void stopObservation(Observation observation, Observation.Scope scope, @Nullable Throwable error) { + private void stopObservation(Observation observation, @Nullable Throwable error) { if (error != null) { observation.error(error); } - scope.close(); observation.stop(); } diff --git a/samples/micrometer-samples-spring-framework6/src/test/java/io/micrometer/samples/spring6/aop/ObservedAspectTests.java b/samples/micrometer-samples-spring-framework6/src/test/java/io/micrometer/samples/spring6/aop/ObservedAspectTests.java index a01ef02ac5..3a02bda71c 100644 --- a/samples/micrometer-samples-spring-framework6/src/test/java/io/micrometer/samples/spring6/aop/ObservedAspectTests.java +++ b/samples/micrometer-samples-spring-framework6/src/test/java/io/micrometer/samples/spring6/aop/ObservedAspectTests.java @@ -170,6 +170,26 @@ void annotatedAsyncCallShouldBeObservedAndErrorRecorded() { .isEqualTo(simulatedException); } + @Test + void observationShouldNotLeakToFutureCompletionThread() { + registry.observationConfig().observationHandler(new ObservationTextPublisher()); + + AspectJProxyFactory pf = new AspectJProxyFactory(new ObservedService()); + pf.addAspect(new ObservedAspect(registry)); + + ObservedService service = pf.getProxy(); + FakeAsyncTask fakeAsyncTask = new FakeAsyncTask("test-result"); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + CompletableFuture asyncResult = service.supply(() -> service.async(fakeAsyncTask, executor)); + // must run in the thread of the executor (async task) + CompletableFuture asyncAssertion = asyncResult + .thenRunAsync(() -> assertThat(registry).doesNotHaveAnyRemainingCurrentObservation(), executor); + fakeAsyncTask.proceed(); + + assertThat(asyncAssertion).succeedsWithin(Duration.ofMillis(200)); + } + @Test void customObservationConventionShouldBeUsed() { registry.observationConfig().observationHandler(new ObservationTextPublisher()); @@ -392,14 +412,28 @@ void error() { @Observed(name = "test.async") CompletableFuture async(FakeAsyncTask fakeAsyncTask) { - System.out.println("async"); ContextSnapshot contextSnapshot = ContextSnapshotFactory.builder() .captureKeyPredicate(key -> true) .contextRegistry(ContextRegistry.getInstance()) .build() .captureAll(); - return CompletableFuture.supplyAsync(fakeAsyncTask, - contextSnapshot.wrapExecutor(Executors.newSingleThreadExecutor())); + return supplyAsync(fakeAsyncTask, contextSnapshot.wrapExecutor(Executors.newSingleThreadExecutor())); + } + + @Observed(name = "test.async") + CompletableFuture async(FakeAsyncTask fakeAsyncTask, Executor singleThreadExecutor) { + return supplyAsync(fakeAsyncTask, singleThreadExecutor); + } + + @Observed(name = "test.supply") + T supply(Supplier supplier) { + System.out.println("supply"); + return supplier.get(); + } + + private CompletableFuture supplyAsync(FakeAsyncTask fakeAsyncTask, Executor executor) { + System.out.println("async"); + return CompletableFuture.supplyAsync(fakeAsyncTask, executor); } }