diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java
index e55d6e99ef9..02a1894a5d4 100644
--- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java
+++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java
@@ -242,6 +242,11 @@ public static void start(
     setSystemPropertyDefault(
         propertyNameToSystemPropertyName("integration.kafka.enabled"), "true");
 
+    if (Config.get().isDataJobsOpenLineageEnabled()) {
+      setSystemPropertyDefault(
+          propertyNameToSystemPropertyName("integration.openlineage-spark.enabled"), "true");
+    }
+
     String javaCommand = System.getProperty("sun.java.command");
     String dataJobsCommandPattern = Config.get().getDataJobsCommandPattern();
    if (!isDataJobsSupported(javaCommand, dataJobsCommandPattern)) {
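Note: `propertyNameToSystemPropertyName` prepends the `dd.` prefix, and `setSystemPropertyDefault` only writes the value when the user has not already set it, so an explicit user setting still wins. A minimal sketch of the behavior being relied on here (hypothetical standalone version, not the agent's actual helper):

```java
// Sketch: apply a default only when the property is not already set.
static void setSystemPropertyDefault(String property, String value) {
  if (System.getProperty(property) == null) {
    System.setProperty(property, value);
  }
}

// "integration.openlineage-spark.enabled" resolves to the system property
// "dd.integration.openlineage-spark.enabled", which users can still override.
```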
diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
index 15e6fa5a80f..b305496f70c 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
@@ -5,8 +5,12 @@
 
 import com.google.auto.service.AutoService;
 import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.api.Config;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
+import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoService(InstrumenterModule.class)
 public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@@ -17,6 +21,7 @@ public String[] helperClassNames() {
       packageName + ".DatabricksParentContext",
       packageName + ".OpenlineageParentContext",
       packageName + ".DatadogSpark212Listener",
+      packageName + ".PredeterminedTraceIdContext",
       packageName + ".RemoveEldestHashMap",
       packageName + ".SparkAggregatedTaskMetrics",
       packageName + ".SparkConfAllowList",
@@ -41,6 +46,34 @@ public void methodAdvice(MethodTransformer transformer) {
   public static class InjectListener {
     @Advice.OnMethodEnter(suppress = Throwable.class)
     public static void enter(@Advice.This SparkContext sparkContext) {
+      // Check whether the OpenLineage integration is enabled and available, and that it supports tags
+      Logger log = LoggerFactory.getLogger(Config.class);
+      log.info(
+          "IL: ADSL classloader: ({}) {}",
+          System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
+          AbstractDatadogSparkListener.class.getClassLoader());
+
+      if (Config.get().isDataJobsOpenLineageEnabled()
+          && Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
+          && Utils.classIsLoadable(
+              "io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
+        if (!sparkContext.conf().contains("spark.extraListeners")) {
+          sparkContext
+              .conf()
+              .set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
+        } else {
+          String extraListeners = sparkContext.conf().get("spark.extraListeners");
+          if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
+            sparkContext
+                .conf()
+                .set(
+                    "spark.extraListeners",
+                    extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
+          }
+        }
+      }
+
+      // We want to add the Datadog listener as the first listener
       AbstractDatadogSparkListener.listener =
           new DatadogSpark212Listener(
               sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
index 0d80eb7553c..25dceeac01c 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
@@ -5,8 +5,10 @@
 
 import com.google.auto.service.AutoService;
 import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.api.Config;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
+import org.apache.spark.util.Utils;
 
 @AutoService(InstrumenterModule.class)
 public class Spark213Instrumentation extends AbstractSparkInstrumentation {
@@ -17,6 +19,7 @@ public String[] helperClassNames() {
       packageName + ".DatabricksParentContext",
       packageName + ".OpenlineageParentContext",
       packageName + ".DatadogSpark213Listener",
+      packageName + ".PredeterminedTraceIdContext",
       packageName + ".RemoveEldestHashMap",
       packageName + ".SparkAggregatedTaskMetrics",
       packageName + ".SparkConfAllowList",
@@ -41,6 +44,28 @@ public void methodAdvice(MethodTransformer transformer) {
   public static class InjectListener {
     @Advice.OnMethodEnter(suppress = Throwable.class)
     public static void enter(@Advice.This SparkContext sparkContext) {
+      // Check whether the OpenLineage integration is enabled and available, and that it supports tags
+      if (Config.get().isDataJobsOpenLineageEnabled()
+          && Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
+          && Utils.classIsLoadable(
+              "io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
+        if (!sparkContext.conf().contains("spark.extraListeners")) {
+          sparkContext
+              .conf()
+              .set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
+        } else {
+          String extraListeners = sparkContext.conf().get("spark.extraListeners");
+          if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
+            sparkContext
+                .conf()
+                .set(
+                    "spark.extraListeners",
+                    extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
+          }
+        }
+      }
+
+      // We want to add the Datadog listener as the first listener
       AbstractDatadogSparkListener.listener =
           new DatadogSpark213Listener(
               sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
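The two `InjectListener` advice bodies above duplicate the same `spark.extraListeners` handling. Factored out, the logic is equivalent to this sketch (hypothetical helper, operating on a plain `SparkConf`):

```java
import org.apache.spark.SparkConf;

class ExtraListenersUtil {
  // Append a listener class to spark.extraListeners exactly once.
  static void appendListener(SparkConf conf, String listenerClass) {
    if (!conf.contains("spark.extraListeners")) {
      conf.set("spark.extraListeners", listenerClass);
    } else {
      String current = conf.get("spark.extraListeners");
      if (!current.contains(listenerClass)) {
        conf.set("spark.extraListeners", current + "," + listenerClass);
      }
    }
  }
}
```

Appending rather than overwriting preserves any listeners the user configured, and the `contains` check keeps the OpenLineage listener from being registered twice.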
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
index bbd03d5b897..7ad789b3c0a 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
@@ -36,6 +36,7 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.function.Consumer;
 import org.apache.spark.ExceptionFailure;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskFailedReason;
@@ -68,19 +69,28 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class);
   private static final ObjectMapper objectMapper = new ObjectMapper();
   public static volatile AbstractDatadogSparkListener listener = null;
+  public static volatile boolean finishTraceOnApplicationEnd = true;
   public static volatile boolean isPysparkShell = false;
 
   private final int MAX_COLLECTION_SIZE = 5000;
   private final int MAX_ACCUMULATOR_SIZE = 50000;
   private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags.";
+  private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage";
 
   private final SparkConf sparkConf;
   private final String sparkVersion;
   private final String appId;
+  public static volatile SparkListenerInterface openLineageSparkListener = null;
+  public static volatile SparkConf openLineageSparkConf = null;
+
   private final AgentTracer.TracerAPI tracer;
 
+  // Created by the constructor, and used when we're not in another known
+  // parent context like Databricks or OpenLineage
+  private final PredeterminedTraceIdContext predeterminedTraceIdContext;
+
   private AgentSpan applicationSpan;
   private SparkListenerApplicationStart applicationStart;
   private final HashMap<String, AgentSpan> streamingBatchSpans = new HashMap<>();
@@ -109,6 +119,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   private final Map<Long, SparkSQLUtils.AccumulatorWithStage> accumulators =
       new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);
 
+  private volatile boolean isStreamingJob = false;
   private final boolean isRunningOnDatabricks;
   private final String databricksClusterName;
   private final String databricksServiceName;
@@ -135,6 +146,8 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
     databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
     databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName);
     sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks);
+    predeterminedTraceIdContext =
+        new PredeterminedTraceIdContext(Config.get().getIdGenerationStrategy().generateTraceId());
 
     // If JVM exiting with System.exit(code), it bypass the code closing the application span
     //
@@ -151,8 +164,47 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
                 finishApplication(System.currentTimeMillis(), null, 0, null);
               }
             }));
+  }
+
+  public void setupOpenLineage(DDTraceId traceId) {
+    log.debug("Setting up OpenLineage: {} {}", openLineageSparkListener, openLineageSparkConf);
 
-    log.info("Created datadog spark listener: {}", this.getClass().getSimpleName());
+    log.info(
+        "Classloader for SL: ({}) {}",
+        System.identityHashCode(openLineageSparkListener.getClass().getClassLoader()),
+        openLineageSparkListener.getClass().getClassLoader());
+    log.info(
+        "Classloader for ADSL: ({}) {}",
+        System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
+        AbstractDatadogSparkListener.class.getClassLoader());
+    log.info(
+        "Current thread class loader: ({}) {}",
+        System.identityHashCode(Thread.currentThread().getContextClassLoader()),
+        Thread.currentThread().getContextClassLoader());
+
+    if (openLineageSparkListener != null) {
+      openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
+      openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
openLineageSparkConf.set("spark.openlineage.transport.transports.agent.type", "http"); + openLineageSparkConf.set( + "spark.openlineage.transport.transports.agent.url", getAgentHttpUrl()); + openLineageSparkConf.set( + "spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT); + openLineageSparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip"); + openLineageSparkConf.set( + "spark.openlineage.run.tags", + "_dd.trace_id:" + + traceId.toString() + + ";_dd.ol_intake.emit_spans:false;dd.ol_service:" + + sparkServiceName); + return; + } + log.info( + "There is no OpenLineage Spark Listener in the context. Skipping setting tags. {}", + openLineageSparkListener); + log.info( + "There is no OpenLineage SparkConf in the context. Skipping setting tags. {}", + openLineageSparkConf); } /** Resource name of the spark job. Provide an implementation based on a specific scala version */ @@ -176,6 +228,15 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp @Override public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) { this.applicationStart = applicationStart; + + if (openLineageSparkListener != null) { + DDTraceId traceId = + OpenlineageParentContext.from(sparkConf) + .map(context -> context.getTraceId()) + .orElse(predeterminedTraceIdContext.getTraceId()); + setupOpenLineage(traceId); + } + notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart); } private void initApplicationSpanIfNotInitialized() { @@ -183,6 +244,8 @@ private void initApplicationSpanIfNotInitialized() { return; } + log.debug("Starting tracer application span."); + AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null); if (applicationStart != null) { @@ -197,35 +260,36 @@ private void initApplicationSpanIfNotInitialized() { } captureApplicationParameters(builder); - captureOpenlineageContextIfPresent(builder); + + Optional<OpenlineageParentContext> openlineageParentContext = + OpenlineageParentContext.from(sparkConf); + // We know we're not in Databricks context + if (openlineageParentContext.isPresent()) { + captureOpenlineageContextIfPresent(builder, openlineageParentContext.get()); + } else { + builder.asChildOf(predeterminedTraceIdContext); + } applicationSpan = builder.start(); setDataJobsSamplingPriority(applicationSpan); applicationSpan.setMeasured(true); } - private void captureOpenlineageContextIfPresent(AgentTracer.SpanBuilder builder) { - Optional<OpenlineageParentContext> openlineageParentContext = - OpenlineageParentContext.from(sparkConf); + private void captureOpenlineageContextIfPresent( + AgentTracer.SpanBuilder builder, OpenlineageParentContext context) { + builder.asChildOf(context); - if (openlineageParentContext.isPresent()) { - OpenlineageParentContext context = openlineageParentContext.get(); - builder.asChildOf(context); + builder.withSpanId(context.getChildRootSpanId()); - builder.withSpanId(context.getChildRootSpanId()); + log.debug( + "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}", + context, + context.getTraceId(), + context.getChildRootSpanId()); - log.debug( - "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}", - context, - context.getTraceId(), - context.getChildRootSpanId()); - - builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace()); - builder.withTag("openlineage_parent_job_name", context.getParentJobName()); - 
builder.withTag("openlineage_parent_run_id", context.getParentRunId()); - } else { - log.debug("Openlineage context not found"); - } + builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace()); + builder.withTag("openlineage_parent_job_name", context.getParentJobName()); + builder.withTag("openlineage_parent_run_id", context.getParentRunId()); } @Override @@ -233,6 +297,7 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { log.info( "Received spark application end event, finish trace on this event: {}", finishTraceOnApplicationEnd); + notifyOl(x -> openLineageSparkListener.onApplicationEnd(x), applicationEnd); if (finishTraceOnApplicationEnd) { finishApplication(applicationEnd.time(), null, 0, null); @@ -405,6 +470,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { if (sqlSpan != null) { jobSpanBuilder.asChildOf(sqlSpan.context()); } else if (batchKey != null) { + isStreamingJob = true; AgentSpan batchSpan = getOrCreateStreamingBatchSpan(batchKey, jobStart.time(), jobStart.properties()); jobSpanBuilder.asChildOf(batchSpan.context()); @@ -426,6 +492,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { stageToJob.put(stageId, jobStart.jobId()); } jobSpans.put(jobStart.jobId(), jobSpan); + notifyOl(x -> openLineageSparkListener.onJobStart(x), jobStart); } @Override @@ -456,6 +523,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { if (metrics != null) { metrics.setSpanMetrics(jobSpan); } + notifyOl(x -> openLineageSparkListener.onJobEnd(x), jobEnd); jobSpan.finish(jobEnd.time() * 1000); } @@ -624,6 +692,8 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) { Properties props = stageProperties.get(stageSpanKey); sendTaskSpan(stageSpan, taskEnd, props); + + notifyOl(x -> openLineageSparkListener.onTaskEnd(x), taskEnd); } private void sendTaskSpan( @@ -705,6 +775,25 @@ public void onOtherEvent(SparkListenerEvent event) { updateAdaptiveSQLPlan(event); } + private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) { + if (!Config.get().isDataJobsOpenLineageEnabled()) { + log.debug("Ignoring event {} - OpenLineage not enabled", event); + return; + } + + if (isRunningOnDatabricks || isStreamingJob) { + log.debug("Not emitting event when running on databricks or on streaming jobs"); + return; + } + if (openLineageSparkListener != null) { + log.debug( + "Passing event `{}` to OpenLineageSparkListener", event.getClass().getCanonicalName()); + ol.accept(event); + } else { + log.debug("OpenLineageSparkListener is null"); + } + } + private static final Class<?> adaptiveExecutionUpdateClass; private static final MethodHandle adaptiveExecutionIdMethod; private static final MethodHandle adaptiveSparkPlanMethod; @@ -753,6 +842,7 @@ private synchronized void updateAdaptiveSQLPlan(SparkListenerEvent event) { private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sqlStart) { sqlPlans.put(sqlStart.executionId(), sqlStart.sparkPlanInfo()); sqlQueries.put(sqlStart.executionId(), sqlStart); + notifyOl(x -> openLineageSparkListener.onOtherEvent(x), sqlStart); } private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) { @@ -765,6 +855,7 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) if (metrics != null) { metrics.setSpanMetrics(span); } + notifyOl(x -> openLineageSparkListener.onOtherEvent(x), sqlEnd); span.finish(sqlEnd.time() * 1000); } @@ -1260,6 +1351,15 @@ private static String 
@@ -1260,6 +1351,15 @@ private static String getDatabricksRunName(SparkConf conf) {
     return null;
   }
 
+  private static String getAgentHttpUrl() {
+    StringBuilder sb =
+        new StringBuilder("http://")
+            .append(Config.get().getAgentHost())
+            .append(":")
+            .append(Config.get().getAgentPort());
+    return sb.toString();
+  }
+
   @SuppressForbidden // called at most once per spark application
   private static String removeUuidFromEndOfString(String input) {
     return input.replaceAll(
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
index 8bfe1ae8000..42f68ed0828 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
@@ -8,8 +8,12 @@
 
 import datadog.trace.agent.tooling.Instrumenter;
 import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.api.Config;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.deploy.SparkSubmitArguments;
+import org.apache.spark.scheduler.SparkListenerInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class AbstractSparkInstrumentation extends InstrumenterModule.Tracing
     implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
@@ -28,7 +32,10 @@ public String[] knownMatchingTypes() {
     return new String[] {
       "org.apache.spark.SparkContext",
       "org.apache.spark.deploy.SparkSubmit",
-      "org.apache.spark.deploy.yarn.ApplicationMaster"
+      "org.apache.spark.deploy.yarn.ApplicationMaster",
+      "org.apache.spark.util.Utils",
+      "org.apache.spark.util.SparkClassUtils",
+      "org.apache.spark.scheduler.LiveListenerBus"
     };
   }
 
@@ -55,6 +62,14 @@ public void methodAdvice(MethodTransformer transformer) {
             .and(named("finish"))
             .and(isDeclaredBy(named("org.apache.spark.deploy.yarn.ApplicationMaster"))),
         AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice");
+
+    // The LiveListenerBus class is used to manage Spark listeners
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("addToSharedQueue"))
+            .and(takesArgument(0, named("org.apache.spark.scheduler.SparkListenerInterface")))
+            .and(isDeclaredBy(named("org.apache.spark.scheduler.LiveListenerBus"))),
+        AbstractSparkInstrumentation.class.getName() + "$LiveListenerBusAdvice");
   }
 
   public static class PrepareSubmitEnvAdvice {
@@ -100,4 +115,25 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
       }
     }
   }
+
+  public static class LiveListenerBusAdvice {
+    @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
+    // If OL is disabled in the tracer config but the user set it up manually, don't interfere
+    public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) {
+      Logger log = LoggerFactory.getLogger(Config.class);
+      log.info(
+          "LLBA: ADSL classloader: ({}) {}",
+          System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
+          AbstractDatadogSparkListener.class.getClassLoader());
+      if (Config.get().isDataJobsOpenLineageEnabled()
+          && listener != null
+          && "io.openlineage.spark.agent.OpenLineageSparkListener"
+              .equals(listener.getClass().getCanonicalName())) {
+        LoggerFactory.getLogger("LiveListenerBusAdvice")
+            .debug("Detected OpenLineage listener, skipping adding to ListenerBus");
+        return true;
+      }
+      return false;
+    }
+  }
 }
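`skipOn = Advice.OnNonDefaultValue.class` is what makes the early exit work: when the enter advice returns `true` (a non-default value for `boolean`), ByteBuddy skips the instrumented method body entirely, so `addToSharedQueue` never registers a second OpenLineage listener. A minimal sketch of the pattern (hypothetical advice, detached from any target):

```java
import net.bytebuddy.asm.Advice;

public class SkipAdviceSketch {
  public static class GuardAdvice {
    @Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
    public static boolean enter() {
      // Returning true (a non-default value for boolean) makes ByteBuddy skip
      // the instrumented method body; returning false lets it run normally.
      return true;
    }
  }
}
```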
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/ClassLoaderUtils.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/ClassLoaderUtils.java
new file mode 100644
index 00000000000..4be56e86e96
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/ClassLoaderUtils.java
@@ -0,0 +1,61 @@
+package datadog.trace.instrumentation.spark;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClassLoaderUtils {
+  public static final Logger log = LoggerFactory.getLogger(ClassLoaderUtils.class);
+
+  /** Print the classloader hierarchy for the current thread */
+  public static void printClassLoaderHierarchy() {
+    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+    printClassLoaderHierarchy(loader);
+  }
+
+  /** Print the classloader hierarchy for a specific class */
+  public static void printClassLoaderHierarchy(Class<?> clazz) {
+    log.info("ClassLoader hierarchy for " + clazz.getName() + ":");
+    printClassLoaderHierarchy(clazz.getClassLoader());
+  }
+
+  /** Print the classloader hierarchy starting from a specific classloader */
+  public static void printClassLoaderHierarchy(ClassLoader classLoader) {
+    if (classLoader == null) {
+      log.info("Bootstrap ClassLoader (null)");
+      return;
+    }
+
+    int level = 0;
+    ClassLoader current = classLoader;
+
+    while (current != null) {
+      log.info(
+          getIndent(level)
+              + "→ "
+              + current.getClass().getName()
+              + "@"
+              + Integer.toHexString(System.identityHashCode(current)));
+
+      // Print URLs for URLClassLoaders
+      // if (current instanceof URLClassLoader) {
+      //   URLClassLoader urlLoader = (URLClassLoader) current;
+      //   for (URL url : urlLoader.getURLs()) {
+      //     log.info(getIndent(level + 1) + "- " + url);
+      //   }
+      // }
+
+      current = current.getParent();
+      level++;
+    }
+
+    log.info(getIndent(level) + "→ Bootstrap ClassLoader (null)");
+  }
+
+  private static String getIndent(int level) {
+    StringBuilder indent = new StringBuilder();
+    for (int i = 0; i < level; i++) {
+      indent.append("  ");
+    }
+    return indent.toString();
+  }
+}
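Typical use of the new helper while debugging the classloader issues this PR deals with; on a Spark driver the walk usually goes from Spark's `MutableURLClassLoader` up to the JVM's application and bootstrap loaders (loader names and hashes below are illustrative, not guaranteed output):

```java
class ClassLoaderUtilsExample {
  public static void main(String[] args) {
    // Walk the hierarchy of the loader that defined SparkContext.
    ClassLoaderUtils.printClassLoaderHierarchy(org.apache.spark.SparkContext.class);
    // Example output shape:
    //   ClassLoader hierarchy for org.apache.spark.SparkContext:
    //   → org.apache.spark.util.MutableURLClassLoader@1b2c3d4
    //     → jdk.internal.loader.ClassLoaders$AppClassLoader@5e6f7a8
    //       → jdk.internal.loader.ClassLoaders$PlatformClassLoader@9b0c1d2
    //         → Bootstrap ClassLoader (null)
  }
}
```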
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java
new file mode 100644
index 00000000000..bc4e5acd100
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java
@@ -0,0 +1,113 @@
+package datadog.trace.instrumentation.spark;
+
+import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
+import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+
+import com.google.auto.service.AutoService;
+import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.api.Config;
+import java.lang.reflect.Field;
+import net.bytebuddy.asm.Advice;
+import org.apache.spark.SparkConf;
+import org.apache.spark.scheduler.SparkListenerInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@AutoService(InstrumenterModule.class)
+public class OpenLineageInstrumentation extends InstrumenterModule.Tracing
+    implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
+
+  public OpenLineageInstrumentation() {
+    super("openlineage-spark");
+  }
+
+  @Override
+  public String[] helperClassNames() {
+    return new String[] {
+      packageName + ".AbstractDatadogSparkListener",
+      packageName + ".DatabricksParentContext",
+      packageName + ".OpenlineageParentContext",
+      packageName + ".PredeterminedTraceIdContext",
+      packageName + ".RemoveEldestHashMap",
+      packageName + ".SparkAggregatedTaskMetrics",
+      packageName + ".SparkConfAllowList",
+      packageName + ".SparkSQLUtils",
+      packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
+      packageName + ".SparkSQLUtils$AccumulatorWithStage",
+      packageName + ".ClassLoaderUtils",
+    };
+  }
+
+  @Override
+  public boolean defaultEnabled() {
+    return false;
+  }
+
+  @Override
+  public String[] knownMatchingTypes() {
+    return new String[] {"io.openlineage.spark.agent.OpenLineageSparkListener"};
+  }
+
+  @Override
+  public void methodAdvice(MethodTransformer transformer) {
+    // This constructor path (via LiveListenerBus) is used when running in a YARN cluster
+    transformer.applyAdvice(
+        isConstructor()
+            .and(isDeclaredBy(named("io.openlineage.spark.agent.OpenLineageSparkListener")))
+            .and(takesArgument(0, named("org.apache.spark.SparkConf"))),
+        OpenLineageInstrumentation.class.getName() + "$OpenLineageSparkListenerAdvice");
+  }
+
+  public static class OpenLineageSparkListenerAdvice {
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void exit(@Advice.This Object self, @Advice.FieldValue("conf") SparkConf conf) {
+      Logger log = LoggerFactory.getLogger(Config.class);
+      ClassLoaderUtils.printClassLoaderHierarchy(
+          AbstractDatadogSparkListener.class.getClassLoader());
+      log.info(
+          "OLSLA: ADSL classloader: ({}) {}",
+          System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
+          AbstractDatadogSparkListener.class.getClassLoader());
+      log.info(
+          "OLSLA: thread class loader ({}) {}",
+          System.identityHashCode(Thread.currentThread().getContextClassLoader()),
+          Thread.currentThread().getContextClassLoader());
+      log.info(
+          "OLSLA: parent thread class loader ({}) {}",
+          System.identityHashCode(Thread.currentThread().getContextClassLoader().getParent()),
+          Thread.currentThread().getContextClassLoader().getParent());
+
+      ClassLoader cl = Thread.currentThread().getContextClassLoader();
+      if (cl.getClass().getName().contains("MutableURLClassLoader")
+          || cl.getClass().getName().contains("ChildFirstURLClassLoader")) {
+        log.debug(
+            "Detected MutableURLClassLoader. Setting OpenLineage on AbstractDatadogSparkListener.class of parent classloader");
+        try {
+          log.debug(
+              "Parent classloader: ({}) {}",
+              System.identityHashCode(cl.getParent()),
+              cl.getParent());
+          Class<?> clazz = cl.getParent().loadClass(AbstractDatadogSparkListener.class.getName());
+          Field openLineageSparkListener = clazz.getDeclaredField("openLineageSparkListener");
+          openLineageSparkListener.setAccessible(true);
+          openLineageSparkListener.set(null, self);
+
+          Field openLineageSparkConf = clazz.getDeclaredField("openLineageSparkConf");
+          openLineageSparkConf.setAccessible(true);
+          openLineageSparkConf.set(null, conf);
+        } catch (Throwable e) {
+          log.info("Failed to setup OpenLineage", e);
+        }
+      } else {
+        log.debug(
+            "Detected a classloader other than MutableURLClassLoader. Setting OpenLineage on AbstractDatadogSparkListener.class");
+        AbstractDatadogSparkListener.openLineageSparkListener = (SparkListenerInterface) self;
+        AbstractDatadogSparkListener.openLineageSparkConf = conf;
+      }
+    }
+  }
+}
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java
index 6a0b28a70c0..523d3220dde 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java
@@ -145,7 +145,7 @@ public PathwayContext getPathwayContext() {
 
   @Override
   public boolean isRemote() {
-    return false;
+    return true;
   }
 
   public String getParentJobNamespace() {
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/PredeterminedTraceIdContext.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/PredeterminedTraceIdContext.java
new file mode 100644
index 00000000000..51164f8bbc8
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/PredeterminedTraceIdContext.java
@@ -0,0 +1,52 @@
+package datadog.trace.instrumentation.spark;
+
+import datadog.trace.api.DDTraceId;
+import datadog.trace.api.datastreams.PathwayContext;
+import datadog.trace.api.sampling.PrioritySampling;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
+import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector;
+import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
+import java.util.Map;
+
+public class PredeterminedTraceIdContext implements AgentSpanContext {
+  private final DDTraceId traceId;
+
+  public PredeterminedTraceIdContext(DDTraceId traceId) {
+    this.traceId = traceId;
+  }
+
+  @Override
+  public DDTraceId getTraceId() {
+    return this.traceId;
+  }
+
+  @Override
+  public long getSpanId() {
+    return 0;
+  }
+
+  @Override
+  public AgentTraceCollector getTraceCollector() {
+    return AgentTracer.NoopAgentTraceCollector.INSTANCE;
+  }
+
+  @Override
+  public int getSamplingPriority() {
+    return PrioritySampling.USER_KEEP;
+  }
+
+  @Override
+  public Iterable<Map.Entry<String, String>> baggageItems() {
+    return null;
+  }
+
+  @Override
+  public PathwayContext getPathwayContext() {
+    return null;
+  }
+
+  @Override
+  public boolean isRemote() {
+    return false;
+  }
+}
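The contract of the new context is easiest to see from its accessors: span id `0` means a span built `asChildOf` this context becomes a trace root, while the trace id is fixed up front so it can be shared with OpenLineage (via the run tags) before the application span exists. A small sketch of the observable behavior, grounded in the class above:

```java
import datadog.trace.api.Config;
import datadog.trace.api.DDTraceId;
import datadog.trace.api.sampling.PrioritySampling;

class PredeterminedTraceIdContextExample {
  public static void main(String[] args) {
    DDTraceId id = Config.get().getIdGenerationStrategy().generateTraceId();
    PredeterminedTraceIdContext ctx = new PredeterminedTraceIdContext(id);

    assert ctx.getTraceId().equals(id); // fixed ahead of time, reusable in OL run tags
    assert ctx.getSpanId() == 0;        // parent span id 0 -> the child becomes a root span
    assert ctx.getSamplingPriority() == PrioritySampling.USER_KEEP; // DJM spans are kept
  }
}
```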
diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy
index f1634a54ce6..e839499b7fd 100644
--- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy
+++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy
@@ -14,6 +14,7 @@ abstract class AbstractSpark24SqlTest extends AgentTestRunner {
   void configurePreAgent() {
     super.configurePreAgent()
     injectSysConfig("dd.integration.spark.enabled", "true")
+    injectSysConfig("dd.integration.openlineage-spark.enabled", "true")
   }
 
   static Dataset<Row> generateSampleDataframe(SparkSession spark) {
diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy
index 0f42dd58f50..b8331ffd33c 100644
--- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy
+++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy
@@ -10,6 +10,7 @@ abstract class AbstractSpark32SqlTest extends AgentTestRunner {
   void configurePreAgent() {
     super.configurePreAgent()
     injectSysConfig("dd.integration.spark.enabled", "true")
+    injectSysConfig("dd.integration.openlineage-spark.enabled", "true")
   }
 
   def "compute a GROUP BY sql query plan"() {
diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy
index 38ea89e0ca0..cace414208d 100644
--- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy
+++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy
@@ -25,6 +25,7 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {
   void configurePreAgent() {
     super.configurePreAgent()
     injectSysConfig("dd.integration.spark.enabled", "true")
+    injectSysConfig("dd.integration.openlineage-spark.enabled", "true")
   }
 
   private SparkSession createSparkSession(String appName) {
diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy
index 36ae6ab5fe0..368ef17c276 100644
--- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy
+++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy
@@ -25,11 +25,16 @@ import spock.lang.IgnoreIf
   Platform.isJ9()
 })
 abstract class AbstractSparkTest extends AgentTestRunner {
+  @Override
+  protected boolean isDataJobsEnabled() {
+    return true
+  }
 
   @Override
   void configurePreAgent() {
     super.configurePreAgent()
     injectSysConfig("dd.integration.spark.enabled", "true")
+    injectSysConfig("dd.integration.openlineage-spark.enabled", "true")
   }
 
   def "generate application span with child job and stages"() {
@@ -53,6 +58,7 @@ abstract class AbstractSparkTest extends AgentTestRunner {
           resourceName "spark.application"
           spanType "spark"
           errored false
+          assert span.context().getTraceId() != DDTraceId.ZERO
           assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
           assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
           parent()
diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy
index a484826e84a..029024f2a35 100644
--- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy
+++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy
@@ -272,6 +272,10 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L
     return false
   }
 
+  protected boolean isDataJobsEnabled() {
+    return false
+  }
+
   protected long dataStreamsBucketDuration() {
     TimeUnit.MILLISECONDS.toNanos(50)
   }
@@ -464,6 +468,7 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L
   protected void configurePreAgent() {
     injectSysConfig(TracerConfig.SCOPE_ITERATION_KEEP_ALIVE, "1") // don't let iteration spans linger
     injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, String.valueOf(isDataStreamsEnabled()))
+    injectSysConfig(GeneralConfig.DATA_JOBS_ENABLED, String.valueOf(isDataJobsEnabled()))
   }
 
   void setup() {
diff --git a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java
index 5e620d344e1..4052b7502ab 100644
--- a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java
+++ b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java
@@ -221,6 +221,7 @@ public final class ConfigDefaults {
   static final int DEFAULT_CWS_TLS_REFRESH = 5000;
 
   static final boolean DEFAULT_DATA_JOBS_ENABLED = false;
+  static final boolean DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED = false;
 
   static final boolean DEFAULT_DATA_STREAMS_ENABLED = false;
   static final int DEFAULT_DATA_STREAMS_BUCKET_DURATION = 10; // seconds
diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java
index a882a097e59..e49baba90e0 100644
--- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java
+++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java
@@ -69,6 +69,7 @@ public final class GeneralConfig {
 
   public static final String DATA_JOBS_ENABLED = "data.jobs.enabled";
   public static final String DATA_JOBS_COMMAND_PATTERN = "data.jobs.command.pattern";
+  public static final String DATA_JOBS_OPENLINEAGE_ENABLED = "data.jobs.openlineage.enabled";
 
   public static final String DATA_STREAMS_ENABLED = "data.streams.enabled";
   public static final String DATA_STREAMS_BUCKET_DURATION_SECONDS =
diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java
index 1f311b576d2..1ea0bc028b2 100644
--- a/internal-api/src/main/java/datadog/trace/api/Config.java
+++ b/internal-api/src/main/java/datadog/trace/api/Config.java
@@ -498,6 +498,7 @@ public static String getHostName() {
 
   private final boolean dataJobsEnabled;
   private final String dataJobsCommandPattern;
+  private final boolean dataJobsOpenLineageEnabled;
 
   private final boolean dataStreamsEnabled;
   private final float dataStreamsBucketDurationSeconds;
@@ -1827,6 +1828,9 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())
     cwsTlsRefresh = configProvider.getInteger(CWS_TLS_REFRESH, DEFAULT_CWS_TLS_REFRESH);
 
     dataJobsEnabled = configProvider.getBoolean(DATA_JOBS_ENABLED, DEFAULT_DATA_JOBS_ENABLED);
+    dataJobsOpenLineageEnabled =
+        configProvider.getBoolean(
+            DATA_JOBS_OPENLINEAGE_ENABLED, DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED);
     dataJobsCommandPattern = configProvider.getString(DATA_JOBS_COMMAND_PATTERN);
 
     dataStreamsEnabled =
@@ -3587,6 +3591,10 @@ public boolean isDataJobsEnabled() {
     return dataJobsEnabled;
   }
 
+  public boolean isDataJobsOpenLineageEnabled() {
+    return dataJobsOpenLineageEnabled;
+  }
+
   public String getDataJobsCommandPattern() {
     return dataJobsCommandPattern;
   }
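Following the tracer's standard config resolution, the new `data.jobs.openlineage.enabled` key should be settable in the usual ways (assuming the standard `dd.` system-property and `DD_` environment-variable mapping applies, as it does for other `GeneralConfig` keys):

```java
import datadog.trace.api.Config;

class OpenLineageConfigExample {
  static boolean resolve() {
    // Enable via -Ddd.data.jobs.openlineage.enabled=true or
    // DD_DATA_JOBS_OPENLINEAGE_ENABLED=true; defaults to false.
    return Config.get().isDataJobsOpenLineageEnabled();
  }
}
```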
diff --git a/internal-api/src/main/java/datadog/trace/api/sampling/SamplingMechanism.java b/internal-api/src/main/java/datadog/trace/api/sampling/SamplingMechanism.java
index b21855bdb3e..ca0e3038bb3 100644
--- a/internal-api/src/main/java/datadog/trace/api/sampling/SamplingMechanism.java
+++ b/internal-api/src/main/java/datadog/trace/api/sampling/SamplingMechanism.java
@@ -69,7 +69,8 @@ public static boolean validateWithSamplingPriority(int mechanism, int priority)
    * @return
    */
   public static boolean canAvoidSamplingPriorityLock(int priority, int mechanism) {
-    return !Config.get().isApmTracingEnabled() && mechanism == SamplingMechanism.APPSEC;
+    return (!Config.get().isApmTracingEnabled() && mechanism == SamplingMechanism.APPSEC)
+        || (Config.get().isDataJobsEnabled() && mechanism == DATA_JOBS);
   }
 
   private SamplingMechanism() {}
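With this change, a trace whose priority was set via the `DATA_JOBS` mechanism can skip the sampling-priority lock when data jobs monitoring is enabled, mirroring the existing AppSec standalone path. A usage sketch grounded in the values asserted in the tests above (`USER_KEEP` priority, `DATA_JOBS` mechanism):

```java
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.api.sampling.SamplingMechanism;

class DataJobsSamplingExample {
  static boolean lockFree() {
    // DJM pins the priority to USER_KEEP up front, so no lock is needed
    // to settle the sampling decision when data jobs monitoring is on.
    return SamplingMechanism.canAvoidSamplingPriorityLock(
        PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
  }
}
```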