diff --git a/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/InferredSpansProcessor.java b/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/InferredSpansProcessor.java index 27d1da9b7..4a8c1630b 100644 --- a/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/InferredSpansProcessor.java +++ b/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/InferredSpansProcessor.java @@ -49,10 +49,12 @@ public class InferredSpansProcessor implements SpanProcessor { SpanAnchoredClock clock, boolean startScheduledProfiling, @Nullable File activationEventsFile, - @Nullable File jfrFile) { + @Nullable File jfrFile, + @Nullable File tempDir) { this.config = config; profiler = - new SamplingProfiler(config, clock, this::getTracer, activationEventsFile, jfrFile, null); + new SamplingProfiler( + config, clock, this::getTracer, activationEventsFile, jfrFile, tempDir); if (startScheduledProfiling) { profiler.start(); } diff --git a/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/InferredSpansProcessorBuilder.java b/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/InferredSpansProcessorBuilder.java index a96f3b942..ee30d6b13 100644 --- a/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/InferredSpansProcessorBuilder.java +++ b/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/InferredSpansProcessorBuilder.java @@ -52,6 +52,7 @@ public class InferredSpansProcessorBuilder { private boolean startScheduledProfiling = true; @Nullable private File activationEventsFile = null; @Nullable private File jfrFile = null; + @Nullable private File tempDir = null; private BiConsumer parentOverrideHandler = CallTree.DEFAULT_PARENT_OVERRIDE; @@ -75,7 +76,7 @@ public InferredSpansProcessor build() { parentOverrideHandler); InferredSpansProcessor processor = new InferredSpansProcessor( - config, clock, startScheduledProfiling, activationEventsFile, jfrFile); + 
config, clock, startScheduledProfiling, activationEventsFile, jfrFile, tempDir); InferredSpans.setInstance(processor); return processor; } @@ -205,6 +206,12 @@ InferredSpansProcessorBuilder jfrFile(@Nullable File jfrFile) { return this; } + /** For testing only. */ + public InferredSpansProcessorBuilder tempDir(@Nullable File tempDir) { + this.tempDir = tempDir; + return this; + } + /** * Defines the action to perform when a inferred span is discovered to actually be the parent of a * normal span. The first argument of the handler is the modifiable inferred span, the second diff --git a/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/internal/SamplingProfiler.java b/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/internal/SamplingProfiler.java index 2b5dff2b6..d272f52b9 100644 --- a/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/internal/SamplingProfiler.java +++ b/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/internal/SamplingProfiler.java @@ -43,6 +43,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -153,6 +154,7 @@ public class SamplingProfiler implements Runnable { @Nullable private final File tempDir; private final AsyncProfiler profiler; + private final ReentrantLock profilerLock = new ReentrantLock(); @Nullable private volatile Future profilingTask; /** @@ -317,6 +319,9 @@ public boolean onActivation(Span activeSpan, @Nullable Span previouslyActive) { if (previouslyActive == null) { profiler.addThread(Thread.currentThread()); } + if (!config.isPostProcessingEnabled()) { + return true; + } boolean success = eventBuffer.tryPublishEvent(activationEventTranslator, activeSpan, previouslyActive); if (!success) { @@ -343,6 +348,9 @@ public 
boolean onDeactivation(Span deactivatedSpan, @Nullable Span previouslyAct if (previouslyActive == null) { profiler.removeThread(Thread.currentThread()); } + if (!config.isPostProcessingEnabled()) { + return true; + } boolean success = eventBuffer.tryPublishEvent( deactivationEventTranslator, deactivatedSpan, previouslyActive); @@ -373,7 +381,9 @@ public void run() { Duration profilingDuration = config.getProfilingDuration(); boolean postProcessingEnabled = config.isPostProcessingEnabled(); - setProfilingSessionOngoing(postProcessingEnabled); + // We need to enable the session so that onActivation is called and threads are added to the + // profiler (profiler.addThread). Otherwise, with the "filter" option, nothing is profiled. + setProfilingSessionOngoing(true); if (postProcessingEnabled) { logger.fine("Start full profiling session (async-profiler and agent processing)"); @@ -394,9 +404,17 @@ public void run() { config.isNonStopProfiling() && !interrupted && postProcessingEnabled; setProfilingSessionOngoing(continueProfilingSession); - if (!interrupted && !scheduler.isShutdown()) { - long delay = config.getProfilingInterval().toMillis() - profilingDuration.toMillis(); - profilingTask = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); + profilerLock.lock(); + try { + // it's possible for an interruption to occur just before the lock was acquired. 
This is + // handled by re-reading Thread.currentThread().isInterrupted() to ensure no task is scheduled + // if an interruption occurred just before acquiring the lock + if (!Thread.currentThread().isInterrupted() && !scheduler.isShutdown()) { + long delay = config.getProfilingInterval().toMillis() - profilingDuration.toMillis(); + profilingTask = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); + } + } finally { + profilerLock.unlock(); } } @@ -404,27 +422,44 @@ public void run() { private void profile(Duration profilingDuration) throws Exception { try { String startCommand = createStartCommand(); - String startMessage = profiler.execute(startCommand); + String startMessage; + try { + startMessage = profiler.execute(startCommand); + } catch (IllegalStateException e) { + if (e.getMessage() != null && e.getMessage().contains("already started")) { + logger.fine("Profiler already started. Stopping and restarting."); + try { + profiler.stop(); + } catch (RuntimeException ignore) { + logger.log(Level.FINE, "Ignored error on stopping profiler", ignore); + } + startMessage = profiler.execute(startCommand); + } else { + throw e; + } + } logger.fine(startMessage); - if (!profiledThreads.isEmpty()) { - restoreFilterState(profiler); + try { + // try-finally because if the code is interrupted we want to ensure the + // profiler.execute("stop") is called + if (!profiledThreads.isEmpty()) { + restoreFilterState(profiler); + } + // Doesn't need to be atomic as this field is being updated only by a single thread + profilingSessions++; + + // When post-processing is disabled activation events are ignored, but we still need to + // invoke this method as it is the one enforcing the sampling session duration. 
As a side + // effect it will also consume residual activation events if post-processing is disabled + // dynamically + consumeActivationEventsFromRingBufferAndWriteToFile(profilingDuration); + } finally { + String stopMessage = profiler.execute("stop"); + logger.fine(stopMessage); } - // Doesn't need to be atomic as this field is being updated only by a single thread - profilingSessions++; - - // When post-processing is disabled activation events are ignored, but we still need to invoke - // this method - // as it is the one enforcing the sampling session duration. As a side effect it will also - // consume - // residual activation events if post-processing is disabled dynamically - consumeActivationEventsFromRingBufferAndWriteToFile(profilingDuration); - - String stopMessage = profiler.execute("stop"); - logger.fine(stopMessage); // When post-processing is disabled, jfr file will not be parsed and the heavy processing will - // not occur - // as this method aborts when no activation events are buffered + // not occur as this method aborts when no activation events are buffered processTraces(); } catch (InterruptedException | ClosedByInterruptException e) { try { @@ -505,6 +540,9 @@ EventPoller.PollState consumeActivationEventsFromRingBufferAndWriteToFile() thro } public void processTraces() throws IOException { + if (!config.isPostProcessingEnabled()) { + return; + } if (jfrParser == null) { jfrParser = new JfrParser(); } @@ -739,18 +777,25 @@ public void start() { @SuppressWarnings({"FutureReturnValueIgnored", "Interruption"}) public void reschedule() { - Future future = this.profilingTask; - if (future != null) { - if (future.cancel(true)) { + profilerLock.lock(); + try { + Future future = this.profilingTask; + if (future != null && future.cancel(true)) { Duration profilingDuration = config.getProfilingDuration(); long delay = config.getProfilingInterval().toMillis() - profilingDuration.toMillis(); profilingTask = scheduler.schedule(this, delay, 
TimeUnit.MILLISECONDS); } + } finally { + profilerLock.unlock(); } } + @SuppressWarnings({"FutureReturnValueIgnored", "Interruption"}) public void stop() throws InterruptedException, IOException { // cancels/interrupts the profiling thread + if (profilingTask != null) { + profilingTask.cancel(true); + } // implicitly clears profiled threads scheduler.shutdown(); scheduler.awaitTermination(10, TimeUnit.SECONDS); diff --git a/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/internal/asyncprofiler/JfrParser.java b/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/internal/asyncprofiler/JfrParser.java index 44c6f9c13..fcb3a19e4 100644 --- a/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/internal/asyncprofiler/JfrParser.java +++ b/inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/internal/asyncprofiler/JfrParser.java @@ -93,6 +93,9 @@ public void parse( this.includedClasses = includedClasses; bufferedFile.setFile(file); long fileSize = bufferedFile.size(); + if (fileSize == 0) { + return; + } int chunkSize = readChunk(0); if (chunkSize < fileSize) { diff --git a/inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/InferredSpansTest.java b/inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/InferredSpansTest.java index e0de5997e..939f3003f 100644 --- a/inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/InferredSpansTest.java +++ b/inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/InferredSpansTest.java @@ -10,17 +10,20 @@ import io.opentelemetry.contrib.inferredspans.internal.SamplingProfiler; import io.opentelemetry.contrib.inferredspans.internal.util.DisabledOnOpenJ9; +import java.nio.file.Path; import java.time.Duration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; 
+import org.junit.jupiter.api.io.TempDir; @DisabledOnOs(OS.WINDOWS) @DisabledOnOpenJ9 class InferredSpansTest { + @TempDir Path tempDir; private ProfilerTestSetup setup; @BeforeEach @@ -40,7 +43,7 @@ void tearDown() { void testIsEnabled() { assertThat(InferredSpans.isEnabled()).isFalse(); - setup = ProfilerTestSetup.create(c -> {}); + setup = ProfilerTestSetup.create(c -> c.tempDir(tempDir.toFile())); assertThat(InferredSpans.isEnabled()).isTrue(); @@ -61,7 +64,8 @@ void testSetProfilerIntervalWhenDisabled() { ProfilerTestSetup.create( c -> c.profilerInterval(Duration.ofSeconds(10)) - .profilingDuration(Duration.ofMillis(500))); + .profilingDuration(Duration.ofMillis(500)) + .tempDir(tempDir.toFile())); // assert that the interval set before the profiler was initialized is ignored assertThat(setup.profiler.getConfig().getProfilingInterval()).isEqualTo(Duration.ofSeconds(10)); @@ -73,7 +77,8 @@ void testSetProfilerInterval() { ProfilerTestSetup.create( c -> c.profilerInterval(Duration.ofSeconds(10)) - .profilingDuration(Duration.ofMillis(500))); + .profilingDuration(Duration.ofMillis(500)) + .tempDir(tempDir.toFile())); SamplingProfiler profiler = setup.profiler; await() diff --git a/inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/internal/SamplingProfilerTest.java b/inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/internal/SamplingProfilerTest.java index 327077f85..6471cb684 100644 --- a/inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/internal/SamplingProfilerTest.java +++ b/inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/internal/SamplingProfilerTest.java @@ -22,7 +22,6 @@ import java.lang.reflect.Method; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.time.Duration; import java.util.List; import java.util.Optional; @@ -44,6 +43,11 @@ @DisabledOnOpenJ9 class SamplingProfilerTest { + static { + // Needed to ensure ordering because tests 
initialize things out of order + ProfilingActivationListener.ensureInitialized(); + } + private ProfilerTestSetup setup; @TempDir private Path tempDir; @@ -58,12 +62,6 @@ void tearDown() { @Test void shouldLazilyCreateTempFilesAndCleanThem() { - for (Path file : getProfilerTempFiles()) { - if (!file.toFile().delete()) { - throw new IllegalStateException("Could not delete temp file: " + file); - } - } - // temporary files should be created on-demand, and properly deleted afterwards setupProfiler(false); @@ -91,9 +89,8 @@ void shouldLazilyCreateTempFilesAndCleanThem() { .isEmpty(); } - private static List getProfilerTempFiles() { - Path tempFolder = Paths.get(System.getProperty("java.io.tmpdir")); - try (Stream files = Files.list(tempFolder)) { + private List getProfilerTempFiles() { + try (Stream files = Files.list(tempDir)) { return files .filter(f -> f.getFileName().toString().startsWith("otel-inferred-")) .sorted() @@ -327,7 +324,8 @@ private void setupProfiler(Consumer configCustomi config .profilingDuration(Duration.ofMillis(500)) .profilerInterval(Duration.ofMillis(500)) - .samplingInterval(Duration.ofMillis(5)); + .samplingInterval(Duration.ofMillis(5)) + .tempDir(tempDir.toFile()); configCustomizer.accept(config); }); } diff --git a/inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/internal/asyncprofiler/JfrParserTest.java b/inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/internal/asyncprofiler/JfrParserTest.java index 60273534b..dd67a3a71 100644 --- a/inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/internal/asyncprofiler/JfrParserTest.java +++ b/inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/internal/asyncprofiler/JfrParserTest.java @@ -60,4 +60,16 @@ void name() throws Exception { }); assertThat(stackTraces.get()).isEqualTo(92); } + + @Test + void testParseEmptyFile() throws Exception { + File file = File.createTempFile("empty", ".jfr"); + try { + JfrParser jfrParser = new JfrParser(); + 
jfrParser.parse(file, Collections.emptyList(), Collections.emptyList()); + jfrParser.consumeStackTraces((threadId, stackTraceId, nanoTime) -> {}); + } finally { + file.delete(); + } + } }