Skip to content

Commit e598da6

Browse files
committed
synchronize setting listener
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 1c43c96 commit e598da6

File tree

5 files changed

+94
-52
lines changed

5 files changed

+94
-52
lines changed

Diff for: dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ public void methodAdvice(MethodTransformer transformer) {
4444
public static class InjectListener {
4545
@Advice.OnMethodEnter(suppress = Throwable.class)
4646
public static void enter(@Advice.This SparkContext sparkContext) {
47-
// checking whether OpenLineage integration is available and that it supports tags
48-
// Disabling this mechanism for this PR. Will be enabled with provided with Config option.
47+
// checking whether OpenLineage integration is enabled, available and that it supports tags
48+
4949
if (Config.get().isDataJobsOpenLineageEnabled()
5050
&& Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
5151
&& Utils.classIsLoadable(

Diff for: dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,7 @@ public void methodAdvice(MethodTransformer transformer) {
4444
public static class InjectListener {
4545
@Advice.OnMethodEnter(suppress = Throwable.class)
4646
public static void enter(@Advice.This SparkContext sparkContext) {
47-
// Checking whether OpenLineage integration is available and that it supports tags
48-
// Disabling this mechanism for this PR. Will be enabled with provided with Config
49-
// option.
50-
47+
// checking whether OpenLineage integration is enabled, available and that it supports tags
5148
if (Config.get().isDataJobsOpenLineageEnabled()
5249
&& Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
5350
&& Utils.classIsLoadable(

Diff for: dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

+47-21
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Optional;
3737
import java.util.Properties;
3838
import java.util.UUID;
39+
import java.util.concurrent.atomic.AtomicReference;
3940
import java.util.function.Consumer;
4041
import org.apache.spark.ExceptionFailure;
4142
import org.apache.spark.SparkConf;
@@ -69,7 +70,8 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
6970
private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class);
7071
private static final ObjectMapper objectMapper = new ObjectMapper();
7172
public static volatile AbstractDatadogSparkListener listener = null;
72-
public static volatile SparkListenerInterface openLineageSparkListener = null;
73+
public static volatile AtomicReference<SparkListenerInterface> openLineageSparkListener =
74+
new AtomicReference<>(null);
7375
public static volatile SparkConf openLineageSparkConf = null;
7476

7577
public static volatile boolean finishTraceOnApplicationEnd = true;
@@ -167,21 +169,45 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
167169

168170
public void setupOpenLineage(DDTraceId traceId) {
169171
log.debug("Setting up OpenLineage tags");
170-
if (openLineageSparkListener != null) {
171-
openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
172-
openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
173-
openLineageSparkConf.set("spark.openlineage.transport.transports.agent.type", "http");
174-
openLineageSparkConf.set(
172+
log.info(
173+
"Classloader for current variable: ({}) {}",
174+
System.identityHashCode(
175+
AbstractDatadogSparkListener.openLineageSparkListener
176+
.get()
177+
.getClass()
178+
.getClassLoader()),
179+
AbstractDatadogSparkListener.openLineageSparkListener.get().getClass().getClassLoader());
180+
log.info(
181+
"Current thread class loader: ({}) {}",
182+
System.identityHashCode(Thread.currentThread().getContextClassLoader()),
183+
Thread.currentThread().getContextClassLoader());
184+
if (AbstractDatadogSparkListener.openLineageSparkListener.get() != null) {
185+
AbstractDatadogSparkListener.openLineageSparkConf.set(
186+
"spark.openlineage.transport.type", "composite");
187+
AbstractDatadogSparkListener.openLineageSparkConf.set(
188+
"spark.openlineage.transport.continueOnFailure", "true");
189+
AbstractDatadogSparkListener.openLineageSparkConf.set(
190+
"spark.openlineage.transport.transports.agent.type", "http");
191+
AbstractDatadogSparkListener.openLineageSparkConf.set(
175192
"spark.openlineage.transport.transports.agent.url", getAgentHttpUrl());
176-
openLineageSparkConf.set(
193+
AbstractDatadogSparkListener.openLineageSparkConf.set(
177194
"spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT);
178-
openLineageSparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
179-
openLineageSparkConf.set(
195+
AbstractDatadogSparkListener.openLineageSparkConf.set(
196+
"spark.openlineage.transport.transports.agent.compression", "gzip");
197+
AbstractDatadogSparkListener.openLineageSparkConf.set(
180198
"spark.openlineage.run.tags",
181-
"_dd.trace_id:" + traceId.toString() + ";_dd.ol_intake.emit_spans:false");
199+
"_dd.trace_id:"
200+
+ traceId.toString()
201+
+ ";_dd.ol_intake.emit_spans:false;dd.ol_service:"
202+
+ sparkServiceName);
182203
return;
183204
}
184-
log.debug("No OpenLineageSparkListener!");
205+
log.info(
206+
"There is no OpenLineage Spark Listener in the context. Skipping setting tags. {}",
207+
AbstractDatadogSparkListener.openLineageSparkListener.get());
208+
log.info(
209+
"There is no OpenLineage SparkConf in the context. Skipping setting tags. {}",
210+
AbstractDatadogSparkListener.openLineageSparkConf);
185211
}
186212

187213
/** Resource name of the spark job. Provide an implementation based on a specific scala version */
@@ -212,7 +238,7 @@ public synchronized void onApplicationStart(SparkListenerApplicationStart applic
212238
.map(context -> context.getTraceId())
213239
.orElse(predeterminedTraceIdContext.getTraceId()));
214240
}
215-
notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
241+
notifyOl(x -> openLineageSparkListener.get().onApplicationStart(x), applicationStart);
216242
}
217243

218244
private void initApplicationSpanIfNotInitialized() {
@@ -273,7 +299,7 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
273299
log.info(
274300
"Received spark application end event, finish trace on this event: {}",
275301
finishTraceOnApplicationEnd);
276-
notifyOl(x -> openLineageSparkListener.onApplicationEnd(x), applicationEnd);
302+
notifyOl(x -> openLineageSparkListener.get().onApplicationEnd(x), applicationEnd);
277303

278304
if (finishTraceOnApplicationEnd) {
279305
finishApplication(applicationEnd.time(), null, 0, null);
@@ -468,7 +494,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
468494
stageToJob.put(stageId, jobStart.jobId());
469495
}
470496
jobSpans.put(jobStart.jobId(), jobSpan);
471-
notifyOl(x -> openLineageSparkListener.onJobStart(x), jobStart);
497+
notifyOl(x -> openLineageSparkListener.get().onJobStart(x), jobStart);
472498
}
473499

474500
@Override
@@ -499,7 +525,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
499525
if (metrics != null) {
500526
metrics.setSpanMetrics(jobSpan);
501527
}
502-
notifyOl(x -> openLineageSparkListener.onJobEnd(x), jobEnd);
528+
notifyOl(x -> openLineageSparkListener.get().onJobEnd(x), jobEnd);
503529

504530
jobSpan.finish(jobEnd.time() * 1000);
505531
}
@@ -669,7 +695,7 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
669695
Properties props = stageProperties.get(stageSpanKey);
670696
sendTaskSpan(stageSpan, taskEnd, props);
671697

672-
notifyOl(x -> openLineageSparkListener.onTaskEnd(x), taskEnd);
698+
notifyOl(x -> openLineageSparkListener.get().onTaskEnd(x), taskEnd);
673699
}
674700

675701
private void sendTaskSpan(
@@ -753,20 +779,20 @@ public void onOtherEvent(SparkListenerEvent event) {
753779

754780
private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
755781
if (!Config.get().isDataJobsOpenLineageEnabled()) {
756-
log.trace("Ignoring event {} - OpenLineage not enabled", event);
782+
log.debug("Ignoring event {} - OpenLineage not enabled", event);
757783
return;
758784
}
759785

760786
if (isRunningOnDatabricks || isStreamingJob) {
761787
log.debug("Not emitting event when running on databricks or on streaming jobs");
762788
return;
763789
}
764-
if (openLineageSparkListener != null) {
790+
if (AbstractDatadogSparkListener.openLineageSparkListener.get() != null) {
765791
log.debug(
766792
"Passing event `{}` to OpenLineageSparkListener", event.getClass().getCanonicalName());
767793
ol.accept(event);
768794
} else {
769-
log.trace("OpenLineageSparkListener is null");
795+
log.debug("OpenLineageSparkListener is null");
770796
}
771797
}
772798

@@ -818,7 +844,7 @@ private synchronized void updateAdaptiveSQLPlan(SparkListenerEvent event) {
818844
private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sqlStart) {
819845
sqlPlans.put(sqlStart.executionId(), sqlStart.sparkPlanInfo());
820846
sqlQueries.put(sqlStart.executionId(), sqlStart);
821-
notifyOl(x -> openLineageSparkListener.onOtherEvent(x), sqlStart);
847+
notifyOl(x -> openLineageSparkListener.get().onOtherEvent(x), sqlStart);
822848
}
823849

824850
private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {
@@ -831,7 +857,7 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
831857
if (metrics != null) {
832858
metrics.setSpanMetrics(span);
833859
}
834-
notifyOl(x -> openLineageSparkListener.onOtherEvent(x), sqlEnd);
860+
notifyOl(x -> openLineageSparkListener.get().onOtherEvent(x), sqlEnd);
835861

836862
span.finish(sqlEnd.time() * 1000);
837863
}

Diff for: dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

+6-11
Original file line numberDiff line numberDiff line change
@@ -119,17 +119,12 @@ public static class LiveListenerBusAdvice {
119119
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
120120
// If OL is disabled in tracer config but user set it up manually don't interfere
121121
public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) {
122-
if (!Config.get().isDataJobsOpenLineageEnabled()
123-
|| listener == null
124-
|| listener.getClass().getCanonicalName() == null) {
125-
return false;
126-
}
127-
if (listener
128-
.getClass()
129-
.getCanonicalName()
130-
.equals("io.openlineage.spark.agent.OpenLineageSparkListener")) {
131-
LoggerFactory.getLogger(Config.class)
132-
.debug("Detected OL listener, skipping adding to ListenerBus");
122+
if (Config.get().isDataJobsOpenLineageEnabled()
123+
&& listener != null
124+
&& "io.openlineage.spark.agent.OpenLineageSparkListener"
125+
.equals(listener.getClass().getCanonicalName())) {
126+
LoggerFactory.getLogger("LiveListenerBusAdvice")
127+
.debug("Detected OpenLineage listener, skipping adding to ListenerBus");
133128
return true;
134129
}
135130
return false;

Diff for: dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java

+38-14
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
import datadog.trace.agent.tooling.Instrumenter;
1010
import datadog.trace.agent.tooling.InstrumenterModule;
1111
import datadog.trace.api.Config;
12-
import java.lang.reflect.Field;
1312
import net.bytebuddy.asm.Advice;
1413
import org.apache.spark.SparkConf;
1514
import org.apache.spark.scheduler.SparkListenerInterface;
15+
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
1717

1818
@AutoService(InstrumenterModule.class)
@@ -46,9 +46,7 @@ public boolean defaultEnabled() {
4646

4747
@Override
4848
public String[] knownMatchingTypes() {
49-
return new String[] {
50-
"io.openlineage.spark.agent.OpenLineageSparkListener", "org.apache.spark.util.Utils"
51-
};
49+
return new String[] {"io.openlineage.spark.agent.OpenLineageSparkListener"};
5250
}
5351

5452
@Override
@@ -63,18 +61,44 @@ public void methodAdvice(MethodTransformer transformer) {
6361

6462
public static class OpenLineageSparkListenerAdvice {
6563
@Advice.OnMethodExit(suppress = Throwable.class)
66-
public static void exit(@Advice.This Object self) throws IllegalAccessException {
67-
LoggerFactory.getLogger(Config.class).debug("Checking for OpenLineageSparkListener");
64+
public static void exit(@Advice.This Object self, @Advice.FieldValue("conf") SparkConf conf) {
6865
try {
69-
Field conf = self.getClass().getDeclaredField("conf");
70-
conf.setAccessible(true);
71-
AbstractDatadogSparkListener.openLineageSparkConf = (SparkConf) conf.get(self);
72-
AbstractDatadogSparkListener.openLineageSparkListener = (SparkListenerInterface) self;
73-
LoggerFactory.getLogger(Config.class)
74-
.debug("Detected OpenLineageSparkListener, passed to DatadogSparkListener");
75-
} catch (NoSuchFieldException | IllegalAccessException e) {
66+
Logger log = LoggerFactory.getLogger(Config.class);
67+
AbstractDatadogSparkListener.openLineageSparkConf = conf;
68+
69+
log.debug(
70+
"ADSL classloader ({}) {}",
71+
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
72+
AbstractDatadogSparkListener.class.getClassLoader());
73+
log.debug(
74+
"Current classloader ({}) {}",
75+
System.identityHashCode(Thread.currentThread().getContextClassLoader()),
76+
Thread.currentThread().getContextClassLoader().toString());
77+
log.debug("Got OpenLineageSparkListener: {}", self);
78+
AbstractDatadogSparkListener.openLineageSparkListener.set((SparkListenerInterface) self);
79+
log.debug(
80+
"Set OpenLineageSparkListener: {}",
81+
AbstractDatadogSparkListener.openLineageSparkListener.get());
82+
log.debug("Check for self: {}", self);
83+
if (AbstractDatadogSparkListener.openLineageSparkListener.get() != null) {
84+
log.debug(
85+
"Detected OpenLineageSparkListener, passed to DatadogSparkListener {} on ClassLoader ({}) {}",
86+
AbstractDatadogSparkListener.openLineageSparkListener.get().getClass().getName(),
87+
System.identityHashCode(
88+
AbstractDatadogSparkListener.openLineageSparkListener
89+
.get()
90+
.getClass()
91+
.getClassLoader()),
92+
AbstractDatadogSparkListener.openLineageSparkListener
93+
.get()
94+
.getClass()
95+
.getClassLoader());
96+
} else {
97+
log.debug("WTF it's null");
98+
}
99+
} catch (Exception e) {
76100
LoggerFactory.getLogger(Config.class)
77-
.debug("Failed to pass OpenLineageSparkListener to DatadogSparkListener", e);
101+
.error("Failed to set OpenLineageSparkListener: {}", e.getMessage());
78102
}
79103
}
80104
}

0 commit comments

Comments
 (0)