Skip to content
28 changes: 15 additions & 13 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#![deny(unused_assignments)]
#![deny(unused_comparisons)]
#![deny(unreachable_pub)]
#![deny(missing_copy_implementations)]

Check warning on line 9 in bottlecap/src/bin/bottlecap/main.rs

View workflow job for this annotation

GitHub Actions / Format (ubuntu-22.04)

Diff in /home/runner/work/datadog-lambda-extension/datadog-lambda-extension/bottlecap/src/bin/bottlecap/main.rs
#![deny(missing_debug_implementations)]

use bottlecap::{
Expand All @@ -28,14 +28,11 @@
telemetry::{
self,
client::TelemetryApiClient,
events::{Status, TelemetryEvent, TelemetryRecord},

Check warning on line 31 in bottlecap/src/bin/bottlecap/main.rs

View workflow job for this annotation

GitHub Actions / Format (ubuntu-22.04)

Diff in /home/runner/work/datadog-lambda-extension/datadog-lambda-extension/bottlecap/src/bin/bottlecap/main.rs
listener::TelemetryListener,
},
traces::{
stats_aggregator::StatsAggregator,
stats_flusher::{self, StatsFlusher},
stats_processor, trace_agent, trace_aggregator,
trace_flusher::{self, TraceFlusher},
trace_processor,
},
DOGSTATSD_PORT, EXTENSION_ACCEPT_FEATURE_HEADER, EXTENSION_FEATURES, EXTENSION_HOST,
Expand Down Expand Up @@ -67,9 +64,11 @@
};
use telemetry::listener::TelemetryListenerConfig;
use tokio::{sync::mpsc::Sender, sync::Mutex as TokioMutex};
use tokio_util::sync::CancellationToken;

Check warning on line 67 in bottlecap/src/bin/bottlecap/main.rs

View workflow job for this annotation

GitHub Actions / Format (ubuntu-22.04)

Diff in /home/runner/work/datadog-lambda-extension/datadog-lambda-extension/bottlecap/src/bin/bottlecap/main.rs
use tracing::{debug, error};
use tracing_subscriber::EnvFilter;
use bottlecap::traces::trace_aggregator::{BatchAggregator, MAX_CONTENT_SIZE_BYTES_CPS, MAX_CONTENT_SIZE_BYTES_SD};
use bottlecap::traces::trace_flusher::ServerlessBatchFlusher;

#[derive(Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -541,26 +540,29 @@
tags_provider: &Arc<TagProvider>,
) -> (
Sender<datadog_trace_utils::send_data::SendData>,
Arc<trace_flusher::ServerlessTraceFlusher>,
Arc<ServerlessBatchFlusher>,
Arc<trace_processor::ServerlessTraceProcessor>,
Arc<stats_flusher::ServerlessStatsFlusher>,
Arc<ServerlessBatchFlusher>,
) {
// Stats
let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::default()));
let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
resolved_api_key.clone(),
let stats_aggregator = Arc::new(TokioMutex::new(BatchAggregator::new(
MAX_CONTENT_SIZE_BYTES_CPS,
)));
let stats_flusher = Arc::new(ServerlessBatchFlusher::new(
stats_aggregator.clone(),
Arc::clone(config),
));

let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});

// Traces
let trace_aggregator = Arc::new(TokioMutex::new(trace_aggregator::TraceAggregator::default()));
let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {
aggregator: trace_aggregator.clone(),
config: Arc::clone(config),
});
let trace_aggregator = Arc::new(TokioMutex::new(trace_aggregator::BatchAggregator::new(

Check warning on line 559 in bottlecap/src/bin/bottlecap/main.rs

View workflow job for this annotation

GitHub Actions / Format (ubuntu-22.04)

Diff in /home/runner/work/datadog-lambda-extension/datadog-lambda-extension/bottlecap/src/bin/bottlecap/main.rs
MAX_CONTENT_SIZE_BYTES_SD,
)));
let trace_flusher = Arc::new(ServerlessBatchFlusher::new (
trace_aggregator.clone(),
Arc::clone(config),
));

let obfuscation_config = obfuscation_config::ObfuscationConfig::new()
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))
Expand Down
2 changes: 0 additions & 2 deletions bottlecap/src/traces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
pub mod context;
pub mod propagation;
pub mod span_pointers;
pub mod stats_aggregator;
pub mod stats_flusher;
pub mod stats_processor;
pub mod trace_agent;
pub mod trace_aggregator;
Expand Down
167 changes: 0 additions & 167 deletions bottlecap/src/traces/stats_aggregator.rs

This file was deleted.

104 changes: 0 additions & 104 deletions bottlecap/src/traces/stats_flusher.rs

This file was deleted.

13 changes: 7 additions & 6 deletions bottlecap/src/traces/trace_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Mutex;
use tracing::{debug, error};

Check warning on line 14 in bottlecap/src/traces/trace_agent.rs

View workflow job for this annotation

GitHub Actions / Format (ubuntu-22.04)

Diff in /home/runner/work/datadog-lambda-extension/datadog-lambda-extension/bottlecap/src/traces/trace_agent.rs
use crate::config;
use crate::tags::provider;
use crate::traces::{stats_aggregator, stats_processor, trace_aggregator, trace_processor};
use crate::traces::{stats_processor, trace_processor};
use datadog_trace_mini_agent::http_utils::{
self, log_and_create_http_response, log_and_create_traces_success_http_response,
};

Check warning on line 20 in bottlecap/src/traces/trace_agent.rs

View workflow job for this annotation

GitHub Actions / Format (ubuntu-22.04)

Diff in /home/runner/work/datadog-lambda-extension/datadog-lambda-extension/bottlecap/src/traces/trace_agent.rs
use datadog_trace_protobuf::pb;
use datadog_trace_utils::trace_utils::{self, SendData};
use crate::traces::trace_aggregator::{BatchAggregator, BatchData};

const TRACE_AGENT_PORT: usize = 8126;
const V4_TRACE_ENDPOINT_PATH: &str = "/v0.4/traces";
Expand All @@ -33,7 +34,7 @@
pub struct TraceAgent {
pub config: Arc<config::Config>,
pub trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
pub stats_aggregator: Arc<Mutex<stats_aggregator::StatsAggregator>>,
pub stats_aggregator: Arc<Mutex<BatchAggregator>>,
pub stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
pub tags_provider: Arc<provider::Provider>,
tx: Sender<SendData>,
Expand All @@ -50,9 +51,9 @@
#[allow(clippy::too_many_arguments)]
pub fn new(
config: Arc<config::Config>,
trace_aggregator: Arc<Mutex<trace_aggregator::TraceAggregator>>,
trace_aggregator: Arc<Mutex<BatchAggregator>>,
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
stats_aggregator: Arc<Mutex<stats_aggregator::StatsAggregator>>,
stats_aggregator: Arc<Mutex<BatchAggregator>>,
stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
tags_provider: Arc<provider::Provider>,
) -> TraceAgent {
Expand All @@ -68,7 +69,7 @@
tokio::spawn(async move {
while let Some(tracer_payload) = trace_rx.recv().await {
let mut aggregator = trace_aggregator.lock().await;
aggregator.add(tracer_payload);
aggregator.add(BatchData::SD(tracer_payload));
}
});

Expand Down Expand Up @@ -97,7 +98,7 @@
tokio::spawn(async move {
while let Some(stats_payload) = stats_rx.recv().await {
let mut aggregator = stats_aggregator.lock().await;
aggregator.add(stats_payload);
aggregator.add(BatchData::CSP(stats_payload));
}
});

Expand Down
Loading
Loading