Skip to content

Commit 95b0529

Browse files
committed
synchronize setting listener
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 1c43c96 commit 95b0529

File tree

7 files changed

+164
-42
lines changed

7 files changed

+164
-42
lines changed

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import net.bytebuddy.asm.Advice;
1010
import org.apache.spark.SparkContext;
1111
import org.apache.spark.util.Utils;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
1214

1315
@AutoService(InstrumenterModule.class)
1416
public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@@ -44,8 +46,13 @@ public void methodAdvice(MethodTransformer transformer) {
4446
public static class InjectListener {
4547
@Advice.OnMethodEnter(suppress = Throwable.class)
4648
public static void enter(@Advice.This SparkContext sparkContext) {
47-
// checking whether OpenLineage integration is available and that it supports tags
48-
// Disabling this mechanism for this PR. Will be enabled with provided with Config option.
49+
// checking whether OpenLineage integration is enabled, available and that it supports tags
50+
Logger log = LoggerFactory.getLogger(Config.class);
51+
log.info(
52+
"IL: ADSL classloader: ({}) {}",
53+
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
54+
AbstractDatadogSparkListener.class.getClassLoader());
55+
4956
if (Config.get().isDataJobsOpenLineageEnabled()
5057
&& Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
5158
&& Utils.classIsLoadable(

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,7 @@ public void methodAdvice(MethodTransformer transformer) {
4444
public static class InjectListener {
4545
@Advice.OnMethodEnter(suppress = Throwable.class)
4646
public static void enter(@Advice.This SparkContext sparkContext) {
47-
// Checking whether OpenLineage integration is available and that it supports tags
48-
// Disabling this mechanism for this PR. Will be enabled with provided with Config
49-
// option.
50-
47+
// checking whether OpenLineage integration is enabled, available and that it supports tags
5148
if (Config.get().isDataJobsOpenLineageEnabled()
5249
&& Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
5350
&& Utils.classIsLoadable(

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

+34-10
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
6969
private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class);
7070
private static final ObjectMapper objectMapper = new ObjectMapper();
7171
public static volatile AbstractDatadogSparkListener listener = null;
72-
public static volatile SparkListenerInterface openLineageSparkListener = null;
73-
public static volatile SparkConf openLineageSparkConf = null;
7472

7573
public static volatile boolean finishTraceOnApplicationEnd = true;
7674
public static volatile boolean isPysparkShell = false;
@@ -84,6 +82,9 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
8482
private final String sparkVersion;
8583
private final String appId;
8684

85+
public static volatile SparkListenerInterface openLineageSparkListener = null;
86+
public static volatile SparkConf openLineageSparkConf = null;
87+
8788
private final AgentTracer.TracerAPI tracer;
8889

8990
// This is created by constructor, and used if we're not in other known
@@ -166,7 +167,21 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
166167
}
167168

168169
public void setupOpenLineage(DDTraceId traceId) {
169-
log.debug("Setting up OpenLineage tags");
170+
log.debug("Setting up OpenLineage: {} {}", openLineageSparkListener, openLineageSparkConf);
171+
172+
log.info(
173+
"Classloader for SL: ({}) {}",
174+
System.identityHashCode(openLineageSparkListener.getClass().getClassLoader()),
175+
openLineageSparkListener.getClass().getClassLoader());
176+
log.info(
177+
"Classloader for ADSL: ({}) {}",
178+
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
179+
AbstractDatadogSparkListener.class.getClassLoader());
180+
log.info(
181+
"Current thread class loader: ({}) {}",
182+
System.identityHashCode(Thread.currentThread().getContextClassLoader()),
183+
Thread.currentThread().getContextClassLoader());
184+
170185
if (openLineageSparkListener != null) {
171186
openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
172187
openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
@@ -178,10 +193,18 @@ public void setupOpenLineage(DDTraceId traceId) {
178193
openLineageSparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
179194
openLineageSparkConf.set(
180195
"spark.openlineage.run.tags",
181-
"_dd.trace_id:" + traceId.toString() + ";_dd.ol_intake.emit_spans:false");
196+
"_dd.trace_id:"
197+
+ traceId.toString()
198+
+ ";_dd.ol_intake.emit_spans:false;dd.ol_service:"
199+
+ sparkServiceName);
182200
return;
183201
}
184-
log.debug("No OpenLineageSparkListener!");
202+
log.info(
203+
"There is no OpenLineage Spark Listener in the context. Skipping setting tags. {}",
204+
openLineageSparkListener);
205+
log.info(
206+
"There is no OpenLineage SparkConf in the context. Skipping setting tags. {}",
207+
openLineageSparkConf);
185208
}
186209

187210
/** Resource name of the spark job. Provide an implementation based on a specific scala version */
@@ -206,11 +229,12 @@ public void setupOpenLineage(DDTraceId traceId) {
206229
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
207230
this.applicationStart = applicationStart;
208231

209-
if (Config.get().isDataJobsOpenLineageEnabled()) {
210-
setupOpenLineage(
232+
if (openLineageSparkListener != null) {
233+
DDTraceId traceId =
211234
OpenlineageParentContext.from(sparkConf)
212235
.map(context -> context.getTraceId())
213-
.orElse(predeterminedTraceIdContext.getTraceId()));
236+
.orElse(predeterminedTraceIdContext.getTraceId());
237+
setupOpenLineage(traceId);
214238
}
215239
notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
216240
}
@@ -753,7 +777,7 @@ public void onOtherEvent(SparkListenerEvent event) {
753777

754778
private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
755779
if (!Config.get().isDataJobsOpenLineageEnabled()) {
756-
log.trace("Ignoring event {} - OpenLineage not enabled", event);
780+
log.debug("Ignoring event {} - OpenLineage not enabled", event);
757781
return;
758782
}
759783

@@ -766,7 +790,7 @@ private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
766790
"Passing event `{}` to OpenLineageSparkListener", event.getClass().getCanonicalName());
767791
ol.accept(event);
768792
} else {
769-
log.trace("OpenLineageSparkListener is null");
793+
log.debug("OpenLineageSparkListener is null");
770794
}
771795
}
772796

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

+12-11
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import net.bytebuddy.asm.Advice;
1313
import org.apache.spark.deploy.SparkSubmitArguments;
1414
import org.apache.spark.scheduler.SparkListenerInterface;
15+
import org.slf4j.Logger;
1516
import org.slf4j.LoggerFactory;
1617

1718
public abstract class AbstractSparkInstrumentation extends InstrumenterModule.Tracing
@@ -119,17 +120,17 @@ public static class LiveListenerBusAdvice {
119120
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
120121
// If OL is disabled in tracer config but user set it up manually don't interfere
121122
public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) {
122-
if (!Config.get().isDataJobsOpenLineageEnabled()
123-
|| listener == null
124-
|| listener.getClass().getCanonicalName() == null) {
125-
return false;
126-
}
127-
if (listener
128-
.getClass()
129-
.getCanonicalName()
130-
.equals("io.openlineage.spark.agent.OpenLineageSparkListener")) {
131-
LoggerFactory.getLogger(Config.class)
132-
.debug("Detected OL listener, skipping adding to ListenerBus");
123+
Logger log = LoggerFactory.getLogger(Config.class);
124+
log.info(
125+
"LLBA: ADSL classloader: ({}) {}",
126+
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
127+
AbstractDatadogSparkListener.class.getClassLoader());
128+
if (Config.get().isDataJobsOpenLineageEnabled()
129+
&& listener != null
130+
&& "io.openlineage.spark.agent.OpenLineageSparkListener"
131+
.equals(listener.getClass().getCanonicalName())) {
132+
LoggerFactory.getLogger("LiveListenerBusAdvice")
133+
.debug("Detected OpenLineage listener, skipping adding to ListenerBus");
133134
return true;
134135
}
135136
return false;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package datadog.trace.instrumentation.spark;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
public class ClassLoaderUtils {
7+
public static final Logger log = LoggerFactory.getLogger(ClassLoaderUtils.class);
8+
9+
/** Print the classloader hierarchy for the current thread */
10+
public static void printClassLoaderHierarchy() {
11+
ClassLoader loader = Thread.currentThread().getContextClassLoader();
12+
printClassLoaderHierarchy(loader);
13+
}
14+
15+
/** Print the classloader hierarchy for a specific class */
16+
public static void printClassLoaderHierarchy(Class<?> clazz) {
17+
log.info("ClassLoader hierarchy for " + clazz.getName() + ":");
18+
printClassLoaderHierarchy(clazz.getClassLoader());
19+
}
20+
21+
/** Print the classloader hierarchy starting from a specific classloader */
22+
public static void printClassLoaderHierarchy(ClassLoader classLoader) {
23+
if (classLoader == null) {
24+
log.info("Bootstrap ClassLoader (null)");
25+
return;
26+
}
27+
28+
int level = 0;
29+
ClassLoader current = classLoader;
30+
31+
while (current != null) {
32+
log.info(
33+
getIndent(level)
34+
+ "→ "
35+
+ current.getClass().getName()
36+
+ "@"
37+
+ Integer.toHexString(System.identityHashCode(current)));
38+
39+
// Print URLs for URLClassLoaders
40+
// if (current instanceof URLClassLoader) {
41+
// URLClassLoader urlLoader = (URLClassLoader) current;
42+
// for (URL url : urlLoader.getURLs()) {
43+
// log.info(getIndent(level + 1) + "- " + url);
44+
// }
45+
// }
46+
47+
current = current.getParent();
48+
level++;
49+
}
50+
51+
log.info(getIndent(level) + "→ Bootstrap ClassLoader (null)");
52+
}
53+
54+
private static String getIndent(int level) {
55+
StringBuilder indent = new StringBuilder();
56+
for (int i = 0; i < level; i++) {
57+
indent.append(" ");
58+
}
59+
return indent.toString();
60+
}
61+
}

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java

+46-14
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import net.bytebuddy.asm.Advice;
1414
import org.apache.spark.SparkConf;
1515
import org.apache.spark.scheduler.SparkListenerInterface;
16+
import org.slf4j.Logger;
1617
import org.slf4j.LoggerFactory;
1718

1819
@AutoService(InstrumenterModule.class)
@@ -36,6 +37,7 @@ public String[] helperClassNames() {
3637
packageName + ".SparkSQLUtils",
3738
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
3839
packageName + ".SparkSQLUtils$AccumulatorWithStage",
40+
packageName + ".ClassLoaderUtils",
3941
};
4042
}
4143

@@ -46,9 +48,7 @@ public boolean defaultEnabled() {
4648

4749
@Override
4850
public String[] knownMatchingTypes() {
49-
return new String[] {
50-
"io.openlineage.spark.agent.OpenLineageSparkListener", "org.apache.spark.util.Utils"
51-
};
51+
return new String[] {"io.openlineage.spark.agent.OpenLineageSparkListener"};
5252
}
5353

5454
@Override
@@ -63,18 +63,50 @@ public void methodAdvice(MethodTransformer transformer) {
6363

6464
public static class OpenLineageSparkListenerAdvice {
6565
@Advice.OnMethodExit(suppress = Throwable.class)
66-
public static void exit(@Advice.This Object self) throws IllegalAccessException {
67-
LoggerFactory.getLogger(Config.class).debug("Checking for OpenLineageSparkListener");
68-
try {
69-
Field conf = self.getClass().getDeclaredField("conf");
70-
conf.setAccessible(true);
71-
AbstractDatadogSparkListener.openLineageSparkConf = (SparkConf) conf.get(self);
66+
public static void exit(@Advice.This Object self, @Advice.FieldValue("conf") SparkConf conf) {
67+
// try {
68+
Logger log = LoggerFactory.getLogger(Config.class);
69+
ClassLoaderUtils.printClassLoaderHierarchy(
70+
AbstractDatadogSparkListener.class.getClassLoader());
71+
log.info(
72+
"OLSLA: ADSL classloader: ({}) {}",
73+
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
74+
AbstractDatadogSparkListener.class.getClassLoader());
75+
log.info(
76+
"OLSLA: thread class loader ({}) {}",
77+
System.identityHashCode(Thread.currentThread().getContextClassLoader()),
78+
Thread.currentThread().getContextClassLoader());
79+
log.info(
80+
"OLSLA: parent thread class loader ({}) {}",
81+
System.identityHashCode(Thread.currentThread().getContextClassLoader().getParent()),
82+
Thread.currentThread().getContextClassLoader().getParent());
83+
84+
ClassLoader cl = Thread.currentThread().getContextClassLoader();
85+
if (cl.getClass().getName().contains("MutableURLClassLoader")
86+
|| cl.getClass().getName().contains("ChildFirstURLClassLoader")) {
87+
log.debug(
88+
"Detected MutableURLClassLoader. Setting OpenLineage on AbstractDatadogSparkListener.class of parent classloader");
89+
try {
90+
log.debug(
91+
"Parent classloader: ({}) {}",
92+
System.identityHashCode(cl.getParent()),
93+
cl.getParent());
94+
Class clazz = cl.getParent().loadClass(AbstractDatadogSparkListener.class.getName());
95+
Field openLineageSparkListener = clazz.getDeclaredField("openLineageSparkListener");
96+
openLineageSparkListener.setAccessible(true);
97+
openLineageSparkListener.set(null, self);
98+
99+
Field openLineageSparkConf = clazz.getDeclaredField("openLineageSparkConf");
100+
openLineageSparkConf.setAccessible(true);
101+
openLineageSparkConf.set(null, conf);
102+
} catch (Throwable e) {
103+
log.info("Failed to setup OpenLineage", e);
104+
}
105+
} else {
106+
log.debug(
107+
"Detected other classloader than MutableURLClassLoader. Setting OpenLineage on AbstractDatadogSparkListener.class");
72108
AbstractDatadogSparkListener.openLineageSparkListener = (SparkListenerInterface) self;
73-
LoggerFactory.getLogger(Config.class)
74-
.debug("Detected OpenLineageSparkListener, passed to DatadogSparkListener");
75-
} catch (NoSuchFieldException | IllegalAccessException e) {
76-
LoggerFactory.getLogger(Config.class)
77-
.debug("Failed to pass OpenLineageSparkListener to DatadogSparkListener", e);
109+
AbstractDatadogSparkListener.openLineageSparkConf = conf;
78110
}
79111
}
80112
}

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public PathwayContext getPathwayContext() {
145145

146146
@Override
147147
public boolean isRemote() {
148-
return false;
148+
return true;
149149
}
150150

151151
public String getParentJobNamespace() {

0 commit comments

Comments
 (0)