experiment with openlineage spark #8626

Closed · wants to merge 6 commits
@@ -242,6 +242,11 @@ public static void start(
setSystemPropertyDefault(
propertyNameToSystemPropertyName("integration.kafka.enabled"), "true");

if (Config.get().isDataJobsOpenLineageEnabled()) {
setSystemPropertyDefault(
propertyNameToSystemPropertyName("integration.openlineage-spark.enabled"), "true");
}

String javaCommand = System.getProperty("sun.java.command");
String dataJobsCommandPattern = Config.get().getDataJobsCommandPattern();
if (!isDataJobsSupported(javaCommand, dataJobsCommandPattern)) {
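For context on the mechanism: setSystemPropertyDefault only supplies a default, so a value the user sets explicitly always wins. A minimal sketch of that "default only if unset" pattern (the helper body below is an assumption for illustration; the tracer's real implementation lives in its bootstrap code):

// Hypothetical re-implementation of the pattern used above.
static void setSystemPropertyDefault(String key, String value) {
  // Only fill in the default when the user has not already set the property.
  if (System.getProperty(key) == null) {
    System.setProperty(key, value);
  }
}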
@@ -5,8 +5,12 @@

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import net.bytebuddy.asm.Advice;
import org.apache.spark.SparkContext;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService(InstrumenterModule.class)
public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@@ -17,6 +21,7 @@ public String[] helperClassNames() {
packageName + ".DatabricksParentContext",
packageName + ".OpenlineageParentContext",
packageName + ".DatadogSpark212Listener",
packageName + ".PredeterminedTraceIdContext",
packageName + ".RemoveEldestHashMap",
packageName + ".SparkAggregatedTaskMetrics",
packageName + ".SparkConfAllowList",
@@ -41,6 +46,34 @@ public void methodAdvice(MethodTransformer transformer) {
public static class InjectListener {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter(@Advice.This SparkContext sparkContext) {
// Check whether the OpenLineage integration is enabled, available on the classpath, and supports tags
Logger log = LoggerFactory.getLogger(Config.class);
log.info(
"IL: ADSL classloader: ({}) {}",
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
AbstractDatadogSparkListener.class.getClassLoader());

if (Config.get().isDataJobsOpenLineageEnabled()
&& Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
&& Utils.classIsLoadable(
"io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
if (!sparkContext.conf().contains("spark.extraListeners")) {
sparkContext
.conf()
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
} else {
String extraListeners = sparkContext.conf().get("spark.extraListeners");
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
sparkContext
.conf()
.set(
"spark.extraListeners",
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
}
}
}

// We want to add the Datadog listener as the first listener
AbstractDatadogSparkListener.listener =
new DatadogSpark212Listener(
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
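Because spark.extraListeners is a comma-separated list, the advice above appends OpenLineage's listener instead of clobbering listeners the user configured. A self-contained sketch of the same merge logic (the wrapper class, main method, and com.example.UserListener are illustrative additions, not part of the change):

import org.apache.spark.SparkConf;

public class ExtraListenersMergeSketch {
  private static final String OL_LISTENER =
      "io.openlineage.spark.agent.OpenLineageSparkListener";

  // Mirrors the advice: create the key when absent, append when present,
  // and never register the listener twice.
  static void appendOpenLineageListener(SparkConf conf) {
    if (!conf.contains("spark.extraListeners")) {
      conf.set("spark.extraListeners", OL_LISTENER);
    } else {
      String existing = conf.get("spark.extraListeners");
      if (!existing.contains(OL_LISTENER)) {
        conf.set("spark.extraListeners", existing + "," + OL_LISTENER);
      }
    }
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().set("spark.extraListeners", "com.example.UserListener");
    appendOpenLineageListener(conf);
    // Prints: com.example.UserListener,io.openlineage.spark.agent.OpenLineageSparkListener
    System.out.println(conf.get("spark.extraListeners"));
  }
}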
@@ -5,8 +5,10 @@

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import net.bytebuddy.asm.Advice;
import org.apache.spark.SparkContext;
import org.apache.spark.util.Utils;

@AutoService(InstrumenterModule.class)
public class Spark213Instrumentation extends AbstractSparkInstrumentation {
@@ -17,6 +19,7 @@ public String[] helperClassNames() {
packageName + ".DatabricksParentContext",
packageName + ".OpenlineageParentContext",
packageName + ".DatadogSpark213Listener",
packageName + ".PredeterminedTraceIdContext",
packageName + ".RemoveEldestHashMap",
packageName + ".SparkAggregatedTaskMetrics",
packageName + ".SparkConfAllowList",
@@ -41,6 +44,28 @@ public void methodAdvice(MethodTransformer transformer) {
public static class InjectListener {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter(@Advice.This SparkContext sparkContext) {
// Check whether the OpenLineage integration is enabled, available on the classpath, and supports tags
if (Config.get().isDataJobsOpenLineageEnabled()
&& Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
&& Utils.classIsLoadable(
"io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
if (!sparkContext.conf().contains("spark.extraListeners")) {
sparkContext
.conf()
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
} else {
String extraListeners = sparkContext.conf().get("spark.extraListeners");
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
sparkContext
.conf()
.set(
"spark.extraListeners",
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
}
}
}

// We want to add the Datadog listener as the first listener
AbstractDatadogSparkListener.listener =
new DatadogSpark213Listener(
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
@@ -36,6 +36,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.spark.ExceptionFailure;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskFailedReason;
@@ -68,19 +69,28 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
public static volatile AbstractDatadogSparkListener listener = null;

public static volatile boolean finishTraceOnApplicationEnd = true;
public static volatile boolean isPysparkShell = false;

private final int MAX_COLLECTION_SIZE = 5000;
private final int MAX_ACCUMULATOR_SIZE = 50000;
private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags.";
private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage";

private final SparkConf sparkConf;
private final String sparkVersion;
private final String appId;

public static volatile SparkListenerInterface openLineageSparkListener = null;
public static volatile SparkConf openLineageSparkConf = null;

private final AgentTracer.TracerAPI tracer;

// Created by the constructor and used when we're not in another known
// parent context (e.g. Databricks, OpenLineage)
private final PredeterminedTraceIdContext predeterminedTraceIdContext;

private AgentSpan applicationSpan;
private SparkListenerApplicationStart applicationStart;
private final HashMap<String, AgentSpan> streamingBatchSpans = new HashMap<>();
@@ -109,6 +119,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
private final Map<Long, SparkSQLUtils.AccumulatorWithStage> accumulators =
new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);

private volatile boolean isStreamingJob = false;
private final boolean isRunningOnDatabricks;
private final String databricksClusterName;
private final String databricksServiceName;
@@ -135,6 +146,8 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName);
sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks);
predeterminedTraceIdContext =
new PredeterminedTraceIdContext(Config.get().getIdGenerationStrategy().generateTraceId());

// If the JVM exits with System.exit(code), it bypasses the code closing the application span
//
@@ -151,8 +164,47 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
finishApplication(System.currentTimeMillis(), null, 0, null);
}
}));
}

public void setupOpenLineage(DDTraceId traceId) {
log.debug("Setting up OpenLineage: {} {}", openLineageSparkListener, openLineageSparkConf);

if (openLineageSparkListener == null || openLineageSparkConf == null) {
log.info(
"There is no OpenLineage Spark listener or SparkConf in the context. Skipping setup. Listener: {}, conf: {}",
openLineageSparkListener,
openLineageSparkConf);
return;
}

log.info("Created datadog spark listener: {}", this.getClass().getSimpleName());
// The classloader logging below dereferences openLineageSparkListener, so it
// must stay behind the null check above.
log.info(
"Classloader for SL: ({}) {}",
System.identityHashCode(openLineageSparkListener.getClass().getClassLoader()),
openLineageSparkListener.getClass().getClassLoader());
log.info(
"Classloader for ADSL: ({}) {}",
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
AbstractDatadogSparkListener.class.getClassLoader());
log.info(
"Current thread class loader: ({}) {}",
System.identityHashCode(Thread.currentThread().getContextClassLoader()),
Thread.currentThread().getContextClassLoader());

openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
openLineageSparkConf.set("spark.openlineage.transport.transports.agent.type", "http");
openLineageSparkConf.set(
"spark.openlineage.transport.transports.agent.url", getAgentHttpUrl());
openLineageSparkConf.set(
"spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT);
openLineageSparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
openLineageSparkConf.set(
"spark.openlineage.run.tags",
"_dd.trace_id:"
+ traceId.toString()
+ ";_dd.ol_intake.emit_spans:false;dd.ol_service:"
+ sparkServiceName);
}
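For reference, the calls above amount to the following OpenLineage settings, written out as plain Spark conf entries (the agent URL is illustrative; the real one is assembled from Config.get().getAgentHost() and getAgentPort(), and the trace id and service name are filled in at runtime):

spark.openlineage.transport.type=composite
spark.openlineage.transport.continueOnFailure=true
spark.openlineage.transport.transports.agent.type=http
spark.openlineage.transport.transports.agent.url=http://localhost:8126
spark.openlineage.transport.transports.agent.endpoint=openlineage/api/v1/lineage
spark.openlineage.transport.transports.agent.compression=gzip
spark.openlineage.run.tags=_dd.trace_id:<traceId>;_dd.ol_intake.emit_spans:false;dd.ol_service:<sparkServiceName>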

/** Resource name of the spark job. Provide an implementation based on a specific scala version */
@@ -176,13 +228,24 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
@Override
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
this.applicationStart = applicationStart;

if (openLineageSparkListener != null) {
DDTraceId traceId =
OpenlineageParentContext.from(sparkConf)
.map(context -> context.getTraceId())
.orElse(predeterminedTraceIdContext.getTraceId());
setupOpenLineage(traceId);
}
notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
}

private void initApplicationSpanIfNotInitialized() {
if (applicationSpan != null) {
return;
}

log.debug("Starting tracer application span.");

AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null);

if (applicationStart != null) {
@@ -197,42 +260,44 @@ private void initApplicationSpanIfNotInitialized() {
}

captureApplicationParameters(builder);
-captureOpenlineageContextIfPresent(builder);
+Optional<OpenlineageParentContext> openlineageParentContext =
+OpenlineageParentContext.from(sparkConf);
+// We know we're not in Databricks context
+if (openlineageParentContext.isPresent()) {
+captureOpenlineageContextIfPresent(builder, openlineageParentContext.get());
+} else {
+builder.asChildOf(predeterminedTraceIdContext);
+}

applicationSpan = builder.start();
setDataJobsSamplingPriority(applicationSpan);
applicationSpan.setMeasured(true);
}

-private void captureOpenlineageContextIfPresent(AgentTracer.SpanBuilder builder) {
-Optional<OpenlineageParentContext> openlineageParentContext =
-OpenlineageParentContext.from(sparkConf);
+private void captureOpenlineageContextIfPresent(
+AgentTracer.SpanBuilder builder, OpenlineageParentContext context) {
-if (openlineageParentContext.isPresent()) {
-OpenlineageParentContext context = openlineageParentContext.get();
builder.asChildOf(context);

builder.withSpanId(context.getChildRootSpanId());

log.debug(
"Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}",
context,
context.getTraceId(),
context.getChildRootSpanId());

builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace());
builder.withTag("openlineage_parent_job_name", context.getParentJobName());
builder.withTag("openlineage_parent_run_id", context.getParentRunId());
-} else {
-log.debug("Openlineage context not found");
-}
}

@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
log.info(
"Received spark application end event, finish trace on this event: {}",
finishTraceOnApplicationEnd);
notifyOl(x -> openLineageSparkListener.onApplicationEnd(x), applicationEnd);

if (finishTraceOnApplicationEnd) {
finishApplication(applicationEnd.time(), null, 0, null);
@@ -405,6 +470,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
if (sqlSpan != null) {
jobSpanBuilder.asChildOf(sqlSpan.context());
} else if (batchKey != null) {
isStreamingJob = true;
AgentSpan batchSpan =
getOrCreateStreamingBatchSpan(batchKey, jobStart.time(), jobStart.properties());
jobSpanBuilder.asChildOf(batchSpan.context());
@@ -426,6 +492,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
stageToJob.put(stageId, jobStart.jobId());
}
jobSpans.put(jobStart.jobId(), jobSpan);
notifyOl(x -> openLineageSparkListener.onJobStart(x), jobStart);
}

@Override
@@ -456,6 +523,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
if (metrics != null) {
metrics.setSpanMetrics(jobSpan);
}
notifyOl(x -> openLineageSparkListener.onJobEnd(x), jobEnd);

jobSpan.finish(jobEnd.time() * 1000);
}
@@ -624,6 +692,8 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {

Properties props = stageProperties.get(stageSpanKey);
sendTaskSpan(stageSpan, taskEnd, props);

notifyOl(x -> openLineageSparkListener.onTaskEnd(x), taskEnd);
}

private void sendTaskSpan(
@@ -705,6 +775,25 @@ public void onOtherEvent(SparkListenerEvent event) {
updateAdaptiveSQLPlan(event);
}

private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
if (!Config.get().isDataJobsOpenLineageEnabled()) {
log.debug("Ignoring event {} - OpenLineage not enabled", event);
return;
}

if (isRunningOnDatabricks || isStreamingJob) {
log.debug("Not emitting event when running on databricks or on streaming jobs");
return;
}
if (openLineageSparkListener != null) {
log.debug(
"Passing event `{}` to OpenLineageSparkListener", event.getClass().getCanonicalName());
ol.accept(event);
} else {
log.debug("OpenLineageSparkListener is null");
}
}
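Since notifyOl is generic over the event type, each forwarding call stays a one-liner while the gating (config flag, Databricks, streaming, null listener) lives in one place. A hypothetical additional hook would follow the same shape as the existing call sites (onStageCompleted and SparkListenerStageCompleted are standard Spark listener API; this override is not part of the change):

@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
  notifyOl(x -> openLineageSparkListener.onStageCompleted(x), stageCompleted);
}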

private static final Class<?> adaptiveExecutionUpdateClass;
private static final MethodHandle adaptiveExecutionIdMethod;
private static final MethodHandle adaptiveSparkPlanMethod;
@@ -753,6 +842,7 @@ private synchronized void updateAdaptiveSQLPlan(SparkListenerEvent event) {
private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sqlStart) {
sqlPlans.put(sqlStart.executionId(), sqlStart.sparkPlanInfo());
sqlQueries.put(sqlStart.executionId(), sqlStart);
notifyOl(x -> openLineageSparkListener.onOtherEvent(x), sqlStart);
}

private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {
@@ -765,6 +855,7 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
if (metrics != null) {
metrics.setSpanMetrics(span);
}
notifyOl(x -> openLineageSparkListener.onOtherEvent(x), sqlEnd);

span.finish(sqlEnd.time() * 1000);
}
@@ -1260,6 +1351,15 @@ private static String getDatabricksRunName(SparkConf conf) {
return null;
}

private static String getAgentHttpUrl() {
StringBuilder sb =
new StringBuilder("http://")
.append(Config.get().getAgentHost())
.append(":")
.append(Config.get().getAgentPort());
return sb.toString();
}

@SuppressForbidden // called at most once per spark application
private static String removeUuidFromEndOfString(String input) {
return input.replaceAll(