Skip to content

Paullgdc/trace exporter/export public send collection #992

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
81 changes: 58 additions & 23 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
use datadog_trace_utils::msgpack_decoder::{self, decode::error::DecodeError};
use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryError};
use datadog_trace_utils::span::SpanBytes;
use datadog_trace_utils::trace_utils::{self, TracerHeaderTags};
use datadog_trace_utils::tracer_payload::{self, TraceCollection};
use ddcommon::header::{
@@ -230,11 +231,11 @@ impl TraceExporter {
match self.input_format {
TraceExporterInputFormat::Proxy => self.send_proxy(data.as_ref(), trace_count),
TraceExporterInputFormat::V04 => match msgpack_decoder::v04::from_slice(data) {
Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)),
Ok((traces, _)) => self.send_trace_collection(TraceCollection::TraceChunk(traces)),
Err(e) => Err(TraceExporterError::Deserialization(e)),
},
TraceExporterInputFormat::V05 => match msgpack_decoder::v05::from_slice(data) {
Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)),
Ok((traces, _)) => self.send_trace_collection(TraceCollection::TraceChunk(traces)),
Err(e) => Err(TraceExporterError::Deserialization(e)),
},
}
@@ -438,6 +439,33 @@ impl TraceExporter {
}
}

/// !!! This function is only for testing purposes !!!
/// Waits for the agent info to be ready by polling the `agent_info` state.
/// Returns `Ok` once the agent info has been fetched at least once, or `Err` if
/// `timeout` is reached first.
///
/// In production:
/// 1) We should not synchronously wait for this to be ready before sending traces
/// 2) It's not guaranteed to ever succeed, since the /info endpoint might not be
///    available.
///
/// The `send` function will check agent_info when running, which will only be available if the
/// fetcher had time to reach the agent.
/// Since agent_info can enable client-side stats computation, waiting for this during testing
/// can make snapshots non-deterministic.
pub fn wait_agent_info_ready(&self, timeout: Duration) -> anyhow::Result<()> {
    let start = std::time::Instant::now();
    loop {
        // Give up once the allotted time has elapsed.
        if start.elapsed() > timeout {
            anyhow::bail!("Timeout waiting for agent info to be ready");
        }
        if self.agent_info.load().is_some() {
            return Ok(());
        }
        // Poll at a coarse interval to avoid busy-waiting on the ArcSwap.
        std::thread::sleep(Duration::from_millis(10));
    }
}

fn send_proxy(&self, data: &[u8], trace_count: usize) -> Result<String, TraceExporterError> {
self.send_data_to_url(
data,
@@ -559,30 +587,34 @@ impl TraceExporter {
/// Add all spans from the given iterator into the stats concentrator
/// # Panic
/// Will panic if another thread panicked will holding the lock on `stats_concentrator`
fn add_spans_to_stats(&self, collection: &TraceCollection) {
if let StatsComputationStatus::Enabled {
stats_concentrator,
cancellation_token: _,
exporter_handle: _,
} = &**self.client_side_stats.load()
{
#[allow(clippy::unwrap_used)]
let mut stats_concentrator = stats_concentrator.lock().unwrap();

match collection {
TraceCollection::TraceChunk(traces) => {
let spans = traces.iter().flat_map(|trace| trace.iter());
for span in spans {
stats_concentrator.add_span(span);
}
fn add_spans_to_stats(
&self,
collection: &TraceCollection,
concentrator: &Mutex<SpanConcentrator>,
) {
match collection {
TraceCollection::TraceChunk(traces) => {
#[allow(clippy::unwrap_used)]
let mut stats_concentrator = concentrator.lock().unwrap();
let spans = traces.iter().flat_map(|trace| trace.iter());
for span in spans {
stats_concentrator.add_span(span);
}
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
TraceCollection::V07(_) => unreachable!(),
}
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
TraceCollection::V07(_) => unreachable!(),
}
}

fn send_deser_ser(
/// Sends a batch of trace chunks to the agent.
///
/// Runs the agent-info check before handing the chunks off to
/// `send_trace_collection` as a v04-style `TraceCollection`.
pub fn send_trace_chunks(
    &self,
    trace_chunks: Vec<Vec<SpanBytes>>,
) -> Result<String, TraceExporterError> {
    self.check_agent_info();
    let collection = TraceCollection::TraceChunk(trace_chunks);
    self.send_trace_collection(collection)
}

fn send_trace_collection(
&self,
mut collection: TraceCollection,
) -> Result<String, TraceExporterError> {
@@ -594,11 +626,14 @@ impl TraceExporter {
let mut header_tags: TracerHeaderTags = self.metadata.borrow().into();

// Stats computation
if let StatsComputationStatus::Enabled { .. } = &**self.client_side_stats.load() {
if let StatsComputationStatus::Enabled {
stats_concentrator, ..
} = &**self.client_side_stats.load()
{
if !self.client_computed_top_level {
collection.set_top_level_spans();
}
self.add_spans_to_stats(&collection);
self.add_spans_to_stats(&collection, stats_concentrator);
// Once stats have been computed we can drop all chunks that are not going to be
// sampled by the agent
let (dropped_p0_traces, dropped_p0_spans) = collection.drop_chunks();
23 changes: 19 additions & 4 deletions tinybytes/src/bytes_string.rs
Original file line number Diff line number Diff line change
@@ -47,6 +47,23 @@ impl BytesString {
})
}

/// Creates a `BytesString` from a `'static` string slice without copying.
///
/// No UTF-8 validation is performed at runtime: `str` already guarantees it.
pub fn from_static(value: &'static str) -> Self {
    // SAFETY: This is safe as a str is always a valid UTF-8 slice.
    unsafe { Self::from_bytes_unchecked(Bytes::from_static(value.as_bytes())) }
}

/// Creates a `BytesString` by taking ownership of a `String`.
///
/// No UTF-8 validation is performed at runtime: `String` already guarantees it.
pub fn from_string(value: String) -> Self {
    // SAFETY: This is safe as a String is always a valid UTF-8 slice.
    unsafe { Self::from_bytes_unchecked(Bytes::from_underlying(value)) }
}

/// Creates a `BytesString` from a clone-on-write string, reusing the static
/// data when borrowed and taking ownership when owned.
pub fn from_cow(cow: std::borrow::Cow<'static, str>) -> Self {
    match cow {
        std::borrow::Cow::Owned(owned) => Self::from_string(owned),
        std::borrow::Cow::Borrowed(borrowed) => Self::from_static(borrowed),
    }
}

/// Creates a `BytesString` from a `tinybytes::Bytes` instance.
///
/// This function validates that the provided `Bytes` instance contains valid UTF-8 data. If the
@@ -144,15 +161,13 @@ impl AsRef<str> for BytesString {

impl From<String> for BytesString {
fn from(value: String) -> Self {
// SAFETY: This is safe as a String is always a valid UTF-8 slice.
unsafe { Self::from_bytes_unchecked(Bytes::from_underlying(value)) }
Self::from_string(value)
}
}

impl From<&'static str> for BytesString {
fn from(value: &'static str) -> Self {
// SAFETY: This is safe as a str is always a valid UTF-8 slice.
unsafe { Self::from_bytes_unchecked(Bytes::from_static(value.as_bytes())) }
Self::from_static(value)
}
}

19 changes: 19 additions & 0 deletions trace-utils/src/test_utils/datadog_test_agent.rs
Original file line number Diff line number Diff line change
@@ -101,6 +101,12 @@ impl DatadogTestAgentContainer {

DatadogTestAgentContainer { mounts, env_vars }
}

/// Builder-style setter: records an environment variable to be passed to the
/// test-agent container, returning `self` for chaining.
pub fn with_env(mut self, key: &str, value: &str) -> Self {
    self.env_vars.insert(key.to_owned(), value.to_owned());
    self
}

// The docker image requires an absolute path when mounting a volume. This function gets the
// absolute path of the workspace and appends the provided relative path.
fn calculate_volume_absolute_path(relative_snapshot_path: &str) -> String {
@@ -199,6 +205,19 @@ impl DatadogTestAgent {
}
}

pub async fn new_create_snapshot(
relative_snapshot_path: Option<&str>,
absolute_socket_path: Option<&str>,
) -> Self {
DatadogTestAgent {
container: DatadogTestAgentContainer::new(relative_snapshot_path, absolute_socket_path)
.with_env("SNAPSHOT_CI", "0")
.start()
.await
.expect("Unable to start DatadogTestAgent, is the Docker Daemon running?"),
}
}

async fn get_base_uri_string(&self) -> String {
let container_host = self.container.get_host().await.unwrap().to_string();
let container_port = self