36
36
import java .util .Optional ;
37
37
import java .util .Properties ;
38
38
import java .util .UUID ;
39
- import java .util .concurrent .atomic .AtomicReference ;
40
39
import java .util .function .Consumer ;
41
40
import org .apache .spark .ExceptionFailure ;
42
41
import org .apache .spark .SparkConf ;
@@ -70,9 +69,6 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
70
69
private static final Logger log = LoggerFactory .getLogger (AbstractDatadogSparkListener .class );
71
70
private static final ObjectMapper objectMapper = new ObjectMapper ();
72
71
public static volatile AbstractDatadogSparkListener listener = null ;
73
- public static volatile AtomicReference <SparkListenerInterface > openLineageSparkListener =
74
- new AtomicReference <>(null );
75
- public static volatile SparkConf openLineageSparkConf = null ;
76
72
77
73
public static volatile boolean finishTraceOnApplicationEnd = true ;
78
74
public static volatile boolean isPysparkShell = false ;
@@ -86,6 +82,9 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
86
82
private final String sparkVersion ;
87
83
private final String appId ;
88
84
85
+ public static volatile SparkListenerInterface openLineageSparkListener = null ;
86
+ public static volatile SparkConf openLineageSparkConf = null ;
87
+
89
88
private final AgentTracer .TracerAPI tracer ;
90
89
91
90
// This is created by constructor, and used if we're not in other known
@@ -168,33 +167,31 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
168
167
}
169
168
170
169
public void setupOpenLineage (DDTraceId traceId ) {
171
- log .debug ("Setting up OpenLineage tags" );
170
+ log .debug ("Setting up OpenLineage: {} {}" , openLineageSparkListener , openLineageSparkConf );
171
+
172
+ log .info (
173
+ "Classloader for SL: ({}) {}" ,
174
+ System .identityHashCode (openLineageSparkListener .getClass ().getClassLoader ()),
175
+ openLineageSparkListener .getClass ().getClassLoader ());
172
176
log .info (
173
- "Classloader for current variable: ({}) {}" ,
174
- System .identityHashCode (
175
- AbstractDatadogSparkListener .openLineageSparkListener
176
- .get ()
177
- .getClass ()
178
- .getClassLoader ()),
179
- AbstractDatadogSparkListener .openLineageSparkListener .get ().getClass ().getClassLoader ());
177
+ "Classloader for ADSL: ({}) {}" ,
178
+ System .identityHashCode (AbstractDatadogSparkListener .class .getClassLoader ()),
179
+ AbstractDatadogSparkListener .class .getClassLoader ());
180
180
log .info (
181
181
"Current thread class loader: ({}) {}" ,
182
182
System .identityHashCode (Thread .currentThread ().getContextClassLoader ()),
183
183
Thread .currentThread ().getContextClassLoader ());
184
- if (AbstractDatadogSparkListener .openLineageSparkListener .get () != null ) {
185
- AbstractDatadogSparkListener .openLineageSparkConf .set (
186
- "spark.openlineage.transport.type" , "composite" );
187
- AbstractDatadogSparkListener .openLineageSparkConf .set (
188
- "spark.openlineage.transport.continueOnFailure" , "true" );
189
- AbstractDatadogSparkListener .openLineageSparkConf .set (
190
- "spark.openlineage.transport.transports.agent.type" , "http" );
191
- AbstractDatadogSparkListener .openLineageSparkConf .set (
184
+
185
+ if (openLineageSparkListener != null ) {
186
+ openLineageSparkConf .set ("spark.openlineage.transport.type" , "composite" );
187
+ openLineageSparkConf .set ("spark.openlineage.transport.continueOnFailure" , "true" );
188
+ openLineageSparkConf .set ("spark.openlineage.transport.transports.agent.type" , "http" );
189
+ openLineageSparkConf .set (
192
190
"spark.openlineage.transport.transports.agent.url" , getAgentHttpUrl ());
193
- AbstractDatadogSparkListener . openLineageSparkConf .set (
191
+ openLineageSparkConf .set (
194
192
"spark.openlineage.transport.transports.agent.endpoint" , AGENT_OL_ENDPOINT );
195
- AbstractDatadogSparkListener .openLineageSparkConf .set (
196
- "spark.openlineage.transport.transports.agent.compression" , "gzip" );
197
- AbstractDatadogSparkListener .openLineageSparkConf .set (
193
+ openLineageSparkConf .set ("spark.openlineage.transport.transports.agent.compression" , "gzip" );
194
+ openLineageSparkConf .set (
198
195
"spark.openlineage.run.tags" ,
199
196
"_dd.trace_id:"
200
197
+ traceId .toString ()
@@ -204,10 +201,10 @@ public void setupOpenLineage(DDTraceId traceId) {
204
201
}
205
202
log .info (
206
203
"There is no OpenLineage Spark Listener in the context. Skipping setting tags. {}" ,
207
- AbstractDatadogSparkListener . openLineageSparkListener . get () );
204
+ openLineageSparkListener );
208
205
log .info (
209
206
"There is no OpenLineage SparkConf in the context. Skipping setting tags. {}" ,
210
- AbstractDatadogSparkListener . openLineageSparkConf );
207
+ openLineageSparkConf );
211
208
}
212
209
213
210
/** Resource name of the spark job. Provide an implementation based on a specific scala version */
@@ -232,13 +229,14 @@ public void setupOpenLineage(DDTraceId traceId) {
232
229
public synchronized void onApplicationStart (SparkListenerApplicationStart applicationStart ) {
233
230
this .applicationStart = applicationStart ;
234
231
235
- if (Config . get (). isDataJobsOpenLineageEnabled () ) {
236
- setupOpenLineage (
232
+ if (openLineageSparkListener != null ) {
233
+ DDTraceId traceId =
237
234
OpenlineageParentContext .from (sparkConf )
238
235
.map (context -> context .getTraceId ())
239
- .orElse (predeterminedTraceIdContext .getTraceId ()));
236
+ .orElse (predeterminedTraceIdContext .getTraceId ());
237
+ setupOpenLineage (traceId );
240
238
}
241
- notifyOl (x -> openLineageSparkListener .get (). onApplicationStart (x ), applicationStart );
239
+ notifyOl (x -> openLineageSparkListener .onApplicationStart (x ), applicationStart );
242
240
}
243
241
244
242
private void initApplicationSpanIfNotInitialized () {
@@ -299,7 +297,7 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
299
297
log .info (
300
298
"Received spark application end event, finish trace on this event: {}" ,
301
299
finishTraceOnApplicationEnd );
302
- notifyOl (x -> openLineageSparkListener .get (). onApplicationEnd (x ), applicationEnd );
300
+ notifyOl (x -> openLineageSparkListener .onApplicationEnd (x ), applicationEnd );
303
301
304
302
if (finishTraceOnApplicationEnd ) {
305
303
finishApplication (applicationEnd .time (), null , 0 , null );
@@ -494,7 +492,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
494
492
stageToJob .put (stageId , jobStart .jobId ());
495
493
}
496
494
jobSpans .put (jobStart .jobId (), jobSpan );
497
- notifyOl (x -> openLineageSparkListener .get (). onJobStart (x ), jobStart );
495
+ notifyOl (x -> openLineageSparkListener .onJobStart (x ), jobStart );
498
496
}
499
497
500
498
@ Override
@@ -525,7 +523,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
525
523
if (metrics != null ) {
526
524
metrics .setSpanMetrics (jobSpan );
527
525
}
528
- notifyOl (x -> openLineageSparkListener .get (). onJobEnd (x ), jobEnd );
526
+ notifyOl (x -> openLineageSparkListener .onJobEnd (x ), jobEnd );
529
527
530
528
jobSpan .finish (jobEnd .time () * 1000 );
531
529
}
@@ -695,7 +693,7 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
695
693
Properties props = stageProperties .get (stageSpanKey );
696
694
sendTaskSpan (stageSpan , taskEnd , props );
697
695
698
- notifyOl (x -> openLineageSparkListener .get (). onTaskEnd (x ), taskEnd );
696
+ notifyOl (x -> openLineageSparkListener .onTaskEnd (x ), taskEnd );
699
697
}
700
698
701
699
private void sendTaskSpan (
@@ -787,7 +785,7 @@ private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
787
785
log .debug ("Not emitting event when running on databricks or on streaming jobs" );
788
786
return ;
789
787
}
790
- if (AbstractDatadogSparkListener . openLineageSparkListener . get () != null ) {
788
+ if (openLineageSparkListener != null ) {
791
789
log .debug (
792
790
"Passing event `{}` to OpenLineageSparkListener" , event .getClass ().getCanonicalName ());
793
791
ol .accept (event );
@@ -844,7 +842,7 @@ private synchronized void updateAdaptiveSQLPlan(SparkListenerEvent event) {
844
842
private synchronized void onSQLExecutionStart (SparkListenerSQLExecutionStart sqlStart ) {
845
843
sqlPlans .put (sqlStart .executionId (), sqlStart .sparkPlanInfo ());
846
844
sqlQueries .put (sqlStart .executionId (), sqlStart );
847
- notifyOl (x -> openLineageSparkListener .get (). onOtherEvent (x ), sqlStart );
845
+ notifyOl (x -> openLineageSparkListener .onOtherEvent (x ), sqlStart );
848
846
}
849
847
850
848
private synchronized void onSQLExecutionEnd (SparkListenerSQLExecutionEnd sqlEnd ) {
@@ -857,7 +855,7 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
857
855
if (metrics != null ) {
858
856
metrics .setSpanMetrics (span );
859
857
}
860
- notifyOl (x -> openLineageSparkListener .get (). onOtherEvent (x ), sqlEnd );
858
+ notifyOl (x -> openLineageSparkListener .onOtherEvent (x ), sqlEnd );
861
859
862
860
span .finish (sqlEnd .time () * 1000 );
863
861
}
0 commit comments