Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ public class InferredSpansProcessor implements SpanProcessor {
SpanAnchoredClock clock,
boolean startScheduledProfiling,
@Nullable File activationEventsFile,
@Nullable File jfrFile) {
@Nullable File jfrFile,
@Nullable File tempDir) {
this.config = config;
profiler =
new SamplingProfiler(config, clock, this::getTracer, activationEventsFile, jfrFile, null);
new SamplingProfiler(
config, clock, this::getTracer, activationEventsFile, jfrFile, tempDir);
if (startScheduledProfiling) {
profiler.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class InferredSpansProcessorBuilder {
private boolean startScheduledProfiling = true;
@Nullable private File activationEventsFile = null;
@Nullable private File jfrFile = null;
@Nullable private File tempDir = null;
private BiConsumer<SpanBuilder, SpanContext> parentOverrideHandler =
CallTree.DEFAULT_PARENT_OVERRIDE;

Expand All @@ -75,7 +76,7 @@ public InferredSpansProcessor build() {
parentOverrideHandler);
InferredSpansProcessor processor =
new InferredSpansProcessor(
config, clock, startScheduledProfiling, activationEventsFile, jfrFile);
config, clock, startScheduledProfiling, activationEventsFile, jfrFile, tempDir);
InferredSpans.setInstance(processor);
return processor;
}
Expand Down Expand Up @@ -205,6 +206,12 @@ InferredSpansProcessorBuilder jfrFile(@Nullable File jfrFile) {
return this;
}

/** For testing only. */
public InferredSpansProcessorBuilder tempDir(@Nullable File tempDir) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] not related to this PR, but we could probably replace usage of File with Path to prevent having to call toFile.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leaving this for another time

this.tempDir = tempDir;
return this;
}

/**
* Defines the action to perform when an inferred span is discovered to actually be the parent of a
* normal span. The first argument of the handler is the modifiable inferred span, the second
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -153,6 +154,7 @@ public class SamplingProfiler implements Runnable {
@Nullable private final File tempDir;

private final AsyncProfiler profiler;
private final ReentrantLock profilerLock = new ReentrantLock();
@Nullable private volatile Future<?> profilingTask;

/**
Expand Down Expand Up @@ -317,6 +319,9 @@ public boolean onActivation(Span activeSpan, @Nullable Span previouslyActive) {
if (previouslyActive == null) {
profiler.addThread(Thread.currentThread());
}
if (!config.isPostProcessingEnabled()) {
return true;
}
boolean success =
eventBuffer.tryPublishEvent(activationEventTranslator, activeSpan, previouslyActive);
if (!success) {
Expand All @@ -343,6 +348,9 @@ public boolean onDeactivation(Span deactivatedSpan, @Nullable Span previouslyAct
if (previouslyActive == null) {
profiler.removeThread(Thread.currentThread());
}
if (!config.isPostProcessingEnabled()) {
return true;
}
boolean success =
eventBuffer.tryPublishEvent(
deactivationEventTranslator, deactivatedSpan, previouslyActive);
Expand Down Expand Up @@ -373,7 +381,9 @@ public void run() {
Duration profilingDuration = config.getProfilingDuration();
boolean postProcessingEnabled = config.isPostProcessingEnabled();

setProfilingSessionOngoing(postProcessingEnabled);
// We need to enable the session so that onActivation is called and threads are added to the
// profiler (profiler.addThread). Otherwise, with the "filter" option, nothing is profiled.
setProfilingSessionOngoing(true);

if (postProcessingEnabled) {
logger.fine("Start full profiling session (async-profiler and agent processing)");
Expand All @@ -394,37 +404,62 @@ public void run() {
config.isNonStopProfiling() && !interrupted && postProcessingEnabled;
setProfilingSessionOngoing(continueProfilingSession);

if (!interrupted && !scheduler.isShutdown()) {
long delay = config.getProfilingInterval().toMillis() - profilingDuration.toMillis();
profilingTask = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
profilerLock.lock();
try {
// it's possible for an interruption to occur just before the lock was acquired. This is
// handled by re-reading Thread.currentThread().isInterrupted() to ensure no task is scheduled
// if an interruption occurred just before acquiring the lock
if (!Thread.currentThread().isInterrupted() && !scheduler.isShutdown()) {
long delay = config.getProfilingInterval().toMillis() - profilingDuration.toMillis();
profilingTask = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
}
} finally {
profilerLock.unlock();
}
}

@SuppressWarnings({"NonAtomicVolatileUpdate", "EmptyCatch"})
private void profile(Duration profilingDuration) throws Exception {
try {
String startCommand = createStartCommand();
String startMessage = profiler.execute(startCommand);
String startMessage;
try {
startMessage = profiler.execute(startCommand);
} catch (IllegalStateException e) {
if (e.getMessage() != null && e.getMessage().contains("already started")) {
logger.fine("Profiler already started. Stopping and restarting.");
try {
profiler.stop();
} catch (RuntimeException ignore) {
logger.log(Level.FINE, "Ignored error on stopping profiler", ignore);
}
startMessage = profiler.execute(startCommand);
} else {
throw e;
}
Comment on lines +429 to +439
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a known failure mode of async-profiler here? If so, then maybe having a dedicated method to wrap this "when an exception is thrown, stop, start and try again" logic could make it slightly more readable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is testing fragility under concurrent test runs; I'd be surprised to see it get hit in production, especially as dynamic updates to the test durations will be few and far between in reality.

}
logger.fine(startMessage);
if (!profiledThreads.isEmpty()) {
restoreFilterState(profiler);
try {
// try-finally because if the code is interrupted we want to ensure the
// profiler.execute("stop") is called
if (!profiledThreads.isEmpty()) {
restoreFilterState(profiler);
}
// Doesn't need to be atomic as this field is being updated only by a single thread
profilingSessions++;

// When post-processing is disabled activation events are ignored, but we still need to
// invoke this method as it is the one enforcing the sampling session duration. As a side
// effect it will also consume residual activation events if post-processing is disabled
// dynamically
consumeActivationEventsFromRingBufferAndWriteToFile(profilingDuration);
} finally {
String stopMessage = profiler.execute("stop");
logger.fine(stopMessage);
}
// Doesn't need to be atomic as this field is being updated only by a single thread
profilingSessions++;

// When post-processing is disabled activation events are ignored, but we still need to invoke
// this method
// as it is the one enforcing the sampling session duration. As a side effect it will also
// consume
// residual activation events if post-processing is disabled dynamically
consumeActivationEventsFromRingBufferAndWriteToFile(profilingDuration);

String stopMessage = profiler.execute("stop");
logger.fine(stopMessage);

// When post-processing is disabled, jfr file will not be parsed and the heavy processing will
// not occur
// as this method aborts when no activation events are buffered
// not occur as this method aborts when no activation events are buffered
processTraces();
} catch (InterruptedException | ClosedByInterruptException e) {
try {
Expand Down Expand Up @@ -505,6 +540,9 @@ EventPoller.PollState consumeActivationEventsFromRingBufferAndWriteToFile() thro
}

public void processTraces() throws IOException {
if (!config.isPostProcessingEnabled()) {
return;
}
if (jfrParser == null) {
jfrParser = new JfrParser();
}
Expand Down Expand Up @@ -739,18 +777,25 @@ public void start() {

@SuppressWarnings({"FutureReturnValueIgnored", "Interruption"})
public void reschedule() {
Future<?> future = this.profilingTask;
if (future != null) {
if (future.cancel(true)) {
profilerLock.lock();
try {
Future<?> future = this.profilingTask;
if (future != null && future.cancel(true)) {
Duration profilingDuration = config.getProfilingDuration();
long delay = config.getProfilingInterval().toMillis() - profilingDuration.toMillis();
profilingTask = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
}
} finally {
profilerLock.unlock();
}
}

@SuppressWarnings({"FutureReturnValueIgnored", "Interruption"})
public void stop() throws InterruptedException, IOException {
// cancels/interrupts the profiling thread
if (profilingTask != null) {
profilingTask.cancel(true);
}
Comment on lines +796 to +798
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] I would suggest also wrapping the access to profilingTask, and the potential modification of its state, with the profilerLock, at least for consistency with the other call to .cancel(true) above. There is, however, no side effect of potentially calling this twice, as the return value of cancel(...) is ignored.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leaving this for another time

// implicitly clears profiled threads
scheduler.shutdown();
scheduler.awaitTermination(10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public void parse(
this.includedClasses = includedClasses;
bufferedFile.setFile(file);
long fileSize = bufferedFile.size();
if (fileSize == 0) {
return;
}
Comment on lines +96 to +98
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the empty-file case covered by any existing test? I haven't seen any change related to this in the tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, test added now


int chunkSize = readChunk(0);
if (chunkSize < fileSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@

import io.opentelemetry.contrib.inferredspans.internal.SamplingProfiler;
import io.opentelemetry.contrib.inferredspans.internal.util.DisabledOnOpenJ9;
import java.nio.file.Path;
import java.time.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.io.TempDir;

@DisabledOnOs(OS.WINDOWS)
@DisabledOnOpenJ9
class InferredSpansTest {

@TempDir Path tempDir;
private ProfilerTestSetup setup;

@BeforeEach
Expand All @@ -40,7 +43,7 @@ void tearDown() {
void testIsEnabled() {
assertThat(InferredSpans.isEnabled()).isFalse();

setup = ProfilerTestSetup.create(c -> {});
setup = ProfilerTestSetup.create(c -> c.tempDir(tempDir.toFile()));

assertThat(InferredSpans.isEnabled()).isTrue();

Expand All @@ -61,7 +64,8 @@ void testSetProfilerIntervalWhenDisabled() {
ProfilerTestSetup.create(
c ->
c.profilerInterval(Duration.ofSeconds(10))
.profilingDuration(Duration.ofMillis(500)));
.profilingDuration(Duration.ofMillis(500))
.tempDir(tempDir.toFile()));

// assert that the interval set before the profiler was initialized is ignored
assertThat(setup.profiler.getConfig().getProfilingInterval()).isEqualTo(Duration.ofSeconds(10));
Expand All @@ -73,7 +77,8 @@ void testSetProfilerInterval() {
ProfilerTestSetup.create(
c ->
c.profilerInterval(Duration.ofSeconds(10))
.profilingDuration(Duration.ofMillis(500)));
.profilingDuration(Duration.ofMillis(500))
.tempDir(tempDir.toFile()));

SamplingProfiler profiler = setup.profiler;
await()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
Expand All @@ -44,6 +43,11 @@
@DisabledOnOpenJ9
class SamplingProfilerTest {

static {
// Needed to ensure initialization ordering because the tests start things out of order
ProfilingActivationListener.ensureInitialized();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the benefit of explicitly calling this in the tests and not in production code? If there is a benefit to doing this, a comment explaining it would be welcome.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ProfilingActivationListener has a static initializer which has to be executed before any Context access. This happens in production, but is not guaranteed in tests because we start things out of order to set up tests.

}

private ProfilerTestSetup setup;

@TempDir private Path tempDir;
Expand All @@ -58,12 +62,6 @@ void tearDown() {

@Test
void shouldLazilyCreateTempFilesAndCleanThem() {
for (Path file : getProfilerTempFiles()) {
if (!file.toFile().delete()) {
throw new IllegalStateException("Could not delete temp file: " + file);
}
}

// temporary files should be created on-demand, and properly deleted afterwards
setupProfiler(false);

Expand Down Expand Up @@ -91,9 +89,8 @@ void shouldLazilyCreateTempFilesAndCleanThem() {
.isEmpty();
}

private static List<Path> getProfilerTempFiles() {
Path tempFolder = Paths.get(System.getProperty("java.io.tmpdir"));
try (Stream<Path> files = Files.list(tempFolder)) {
private List<Path> getProfilerTempFiles() {
try (Stream<Path> files = Files.list(tempDir)) {
return files
.filter(f -> f.getFileName().toString().startsWith("otel-inferred-"))
.sorted()
Expand Down Expand Up @@ -327,7 +324,8 @@ private void setupProfiler(Consumer<InferredSpansProcessorBuilder> configCustomi
config
.profilingDuration(Duration.ofMillis(500))
.profilerInterval(Duration.ofMillis(500))
.samplingInterval(Duration.ofMillis(5));
.samplingInterval(Duration.ofMillis(5))
.tempDir(tempDir.toFile());
configCustomizer.accept(config);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,16 @@ void name() throws Exception {
});
assertThat(stackTraces.get()).isEqualTo(92);
}

/**
 * Feeds a freshly created, zero-length temporary file to {@link JfrParser} and then asks it to
 * replay stack traces. The test has no explicit assertions: it passes as long as neither call
 * throws.
 */
@Test
void testParseEmptyFile() throws Exception {
  File emptyJfr = File.createTempFile("empty", ".jfr");
  try {
    JfrParser parser = new JfrParser();
    // No excluded and no included classes are configured for this run.
    parser.parse(emptyJfr, Collections.emptyList(), Collections.emptyList());
    // The consumer is deliberately a no-op; only the absence of an exception matters.
    parser.consumeStackTraces((threadId, stackTraceId, nanoTime) -> {});
  } finally {
    // Best-effort cleanup of the temporary file, regardless of the outcome above.
    emptyJfr.delete();
  }
}
}