@@ -70,9 +70,6 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
70
70
private static final Logger log = LoggerFactory .getLogger (AbstractDatadogSparkListener .class );
71
71
private static final ObjectMapper objectMapper = new ObjectMapper ();
72
72
public static volatile AbstractDatadogSparkListener listener = null ;
73
- public static volatile AtomicReference <SparkListenerInterface > openLineageSparkListener =
74
- new AtomicReference <>(null );
75
- public static volatile SparkConf openLineageSparkConf = null ;
76
73
77
74
public static volatile boolean finishTraceOnApplicationEnd = true ;
78
75
public static volatile boolean isPysparkShell = false ;
@@ -86,6 +83,10 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
86
83
private final String sparkVersion ;
87
84
private final String appId ;
88
85
86
+ public volatile AtomicReference <SparkListenerInterface > openLineageSparkListener =
87
+ new AtomicReference <>(null );
88
+ public volatile SparkConf openLineageSparkConf = null ;
89
+
89
90
private final AgentTracer .TracerAPI tracer ;
90
91
91
92
// This is created by constructor, and used if we're not in other known
@@ -167,34 +168,38 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
167
168
}));
168
169
}
169
170
170
- public void setupOpenLineage (DDTraceId traceId ) {
171
- log .debug ("Setting up OpenLineage tags" );
171
+ public void setupOpenLineage (SparkConf olSparkconf , SparkListenerInterface olSparkListener ) {
172
+ log .debug ("Setting up OpenLineage: {} {}" , olSparkconf , olSparkListener );
173
+ DDTraceId traceId =
174
+ OpenlineageParentContext .from (sparkConf )
175
+ .map (context -> context .getTraceId ())
176
+ .orElse (predeterminedTraceIdContext .getTraceId ());
177
+
178
+ openLineageSparkListener .set (olSparkListener );
179
+ openLineageSparkConf = olSparkconf ;
180
+
181
+ log .info (
182
+ "Classloader for SL: ({}) {}" ,
183
+ System .identityHashCode (openLineageSparkListener .getClass ().getClassLoader ()),
184
+ openLineageSparkListener .getClass ().getClassLoader ());
172
185
log .info (
173
- "Classloader for current variable: ({}) {}" ,
174
- System .identityHashCode (
175
- AbstractDatadogSparkListener .openLineageSparkListener
176
- .get ()
177
- .getClass ()
178
- .getClassLoader ()),
179
- AbstractDatadogSparkListener .openLineageSparkListener .get ().getClass ().getClassLoader ());
186
+ "Classloader for ADSL: ({}) {}" ,
187
+ System .identityHashCode (AbstractDatadogSparkListener .class .getClassLoader ()),
188
+ AbstractDatadogSparkListener .class .getClassLoader ());
180
189
log .info (
181
190
"Current thread class loader: ({}) {}" ,
182
191
System .identityHashCode (Thread .currentThread ().getContextClassLoader ()),
183
192
Thread .currentThread ().getContextClassLoader ());
184
- if (AbstractDatadogSparkListener .openLineageSparkListener .get () != null ) {
185
- AbstractDatadogSparkListener .openLineageSparkConf .set (
186
- "spark.openlineage.transport.type" , "composite" );
187
- AbstractDatadogSparkListener .openLineageSparkConf .set (
188
- "spark.openlineage.transport.continueOnFailure" , "true" );
189
- AbstractDatadogSparkListener .openLineageSparkConf .set (
190
- "spark.openlineage.transport.transports.agent.type" , "http" );
191
- AbstractDatadogSparkListener .openLineageSparkConf .set (
193
+ if (openLineageSparkListener .get () != null ) {
194
+ openLineageSparkConf .set ("spark.openlineage.transport.type" , "composite" );
195
+ openLineageSparkConf .set ("spark.openlineage.transport.continueOnFailure" , "true" );
196
+ openLineageSparkConf .set ("spark.openlineage.transport.transports.agent.type" , "http" );
197
+ openLineageSparkConf .set (
192
198
"spark.openlineage.transport.transports.agent.url" , getAgentHttpUrl ());
193
- AbstractDatadogSparkListener . openLineageSparkConf .set (
199
+ openLineageSparkConf .set (
194
200
"spark.openlineage.transport.transports.agent.endpoint" , AGENT_OL_ENDPOINT );
195
- AbstractDatadogSparkListener .openLineageSparkConf .set (
196
- "spark.openlineage.transport.transports.agent.compression" , "gzip" );
197
- AbstractDatadogSparkListener .openLineageSparkConf .set (
201
+ openLineageSparkConf .set ("spark.openlineage.transport.transports.agent.compression" , "gzip" );
202
+ openLineageSparkConf .set (
198
203
"spark.openlineage.run.tags" ,
199
204
"_dd.trace_id:"
200
205
+ traceId .toString ()
@@ -204,10 +209,10 @@ public void setupOpenLineage(DDTraceId traceId) {
204
209
}
205
210
log .info (
206
211
"There is no OpenLineage Spark Listener in the context. Skipping setting tags. {}" ,
207
- AbstractDatadogSparkListener . openLineageSparkListener .get ());
212
+ openLineageSparkListener .get ());
208
213
log .info (
209
214
"There is no OpenLineage SparkConf in the context. Skipping setting tags. {}" ,
210
- AbstractDatadogSparkListener . openLineageSparkConf );
215
+ openLineageSparkConf );
211
216
}
212
217
213
218
/** Resource name of the spark job. Provide an implementation based on a specific scala version */
@@ -231,13 +236,6 @@ public void setupOpenLineage(DDTraceId traceId) {
231
236
@ Override
232
237
public synchronized void onApplicationStart (SparkListenerApplicationStart applicationStart ) {
233
238
this .applicationStart = applicationStart ;
234
-
235
- if (Config .get ().isDataJobsOpenLineageEnabled ()) {
236
- setupOpenLineage (
237
- OpenlineageParentContext .from (sparkConf )
238
- .map (context -> context .getTraceId ())
239
- .orElse (predeterminedTraceIdContext .getTraceId ()));
240
- }
241
239
notifyOl (x -> openLineageSparkListener .get ().onApplicationStart (x ), applicationStart );
242
240
}
243
241
@@ -787,7 +785,7 @@ private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
787
785
log .debug ("Not emitting event when running on databricks or on streaming jobs" );
788
786
return ;
789
787
}
790
- if (AbstractDatadogSparkListener . openLineageSparkListener .get () != null ) {
788
+ if (openLineageSparkListener .get () != null ) {
791
789
log .debug (
792
790
"Passing event `{}` to OpenLineageSparkListener" , event .getClass ().getCanonicalName ());
793
791
ol .accept (event );
0 commit comments