
Commit 0c12dc1

profiling: test using reqwest instead of hyper for exporting
1 parent 6b26318 commit 0c12dc1

File tree

8 files changed, +1619 -65 lines changed

Cargo.lock

Lines changed: 523 additions & 56 deletions

libdd-profiling/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -50,9 +50,11 @@ thiserror = "2"
 tokio = {version = "1.23", features = ["rt", "macros"]}
 tokio-util = "0.7.1"
 zstd = { version = "0.13", default-features = false }
+reqwest = { version = "0.12.23", features = ["multipart", "rustls-tls"], default-features = false }
 
 [dev-dependencies]
 bolero = "0.13"
 criterion = "0.5.1"
 lz4_flex = { version = "0.9", default-features = false, features = ["std", "frame"] }
 proptest = "1"
+wiremock = "0.5"

libdd-profiling/src/exporter/mod.rs

Lines changed: 6 additions & 6 deletions
@@ -21,6 +21,7 @@ use libdd_common::{
 
 pub mod config;
 mod errors;
+pub mod reqwest_exporter;
 
 #[cfg(unix)]
 pub use connector::uds::{socket_path_from_uri, socket_path_to_uri};
@@ -235,11 +236,11 @@ impl ProfileExporter {
         // between them.
         tags_profiler.push_str(self.runtime_platform_tag().as_ref());
 
-        let attachments: Vec<String> = files_to_compress_and_export
+        let attachments: Vec<&str> = files_to_compress_and_export
             .iter()
             .chain(files_to_export_unmodified.iter())
-            .map(|file| file.name.to_owned())
-            .chain(iter::once("profile.pprof".to_string()))
+            .map(|file| file.name)
+            .chain(iter::once("profile.pprof"))
             .collect();
 
         let endpoint_counts = if profile.endpoints_stats.is_empty() {
@@ -260,14 +261,13 @@ impl ProfileExporter {
             "endpoint_counts" : endpoint_counts,
             "internal": internal,
             "info": info.unwrap_or_else(|| json!({})),
-        })
-        .to_string();
+        });
 
         form.add_reader_file_with_mime(
             // Intake does not look for filename=event.json, it looks for name=event.
             "event",
             // this one shouldn't be compressed
-            Cursor::new(event),
+            Cursor::new(serde_json::to_vec(&event)?),
             "event.json",
             mime::APPLICATION_JSON,
         );
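
The last hunk keeps the event payload as a serde_json::Value and serializes it to bytes only when the multipart part is built, instead of going through an intermediate String. A standalone sketch of the equivalence (illustration only, not part of the commit):

use serde_json::json;

fn main() -> serde_json::Result<()> {
    // The event stays a Value until the form part is assembled...
    let event = json!({ "attachments": ["cpu.pprof", "profile.pprof"], "version": "4" });
    // ...then is serialized once, directly to bytes.
    let bytes = serde_json::to_vec(&event)?;
    // Same compact JSON as the old `.to_string()` path, minus the extra String allocation.
    assert_eq!(bytes, event.to_string().into_bytes());
    Ok(())
}
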
libdd-profiling/src/exporter/reqwest_exporter.rs

Lines changed: 270 additions & 0 deletions
@@ -0,0 +1,270 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Reqwest-based profiling exporter
//!
//! This is a simplified async implementation using reqwest.

use anyhow::Context;
use libdd_common::{azure_app_services, tag, Endpoint};
use libdd_common::tag::Tag;
use serde_json::json;
use std::{future, io::Write};
use tokio_util::sync::CancellationToken;

use crate::internal::{EncodedProfile, Profile};
use crate::profiles::{Compressor, DefaultProfileCodec};

pub struct ProfileExporter {
    client: reqwest::Client,
    endpoint: Endpoint,
    family: String,
    profiling_library_name: String,
    profiling_library_version: String,
    tags: Vec<Tag>,
    user_agent: String,
}

pub struct File<'a> {
    pub name: &'a str,
    pub bytes: &'a [u8],
}

impl ProfileExporter {
    /// Creates a new exporter to be used to report profiling data.
    ///
    /// Note: Reqwest v0.12.23+ includes automatic retry support for transient failures.
    /// The default configuration automatically retries safe errors and low-level protocol NACKs.
    /// For custom retry policies, users can configure the reqwest client before creating the exporter.
    pub fn new(
        profiling_library_name: impl Into<String>,
        profiling_library_version: impl Into<String>,
        family: impl Into<String>,
        tags: Vec<Tag>,
        endpoint: Endpoint,
    ) -> anyhow::Result<Self> {
        let mut builder = reqwest::Client::builder().use_rustls_tls();

        // Check if this is a Unix Domain Socket
        #[cfg(unix)]
        if endpoint.url.scheme_str() == Some("unix") {
            use libdd_common::connector::uds::socket_path_from_uri;
            let socket_path = socket_path_from_uri(&endpoint.url)?;
            builder = builder.unix_socket(socket_path);
        }

        Ok(Self {
            client: builder.build()?,
            endpoint,
            family: family.into(),
            profiling_library_name: profiling_library_name.into(),
            profiling_library_version: profiling_library_version.into(),
            tags,
            user_agent: format!("DDProf/{}", env!("CARGO_PKG_VERSION")),
        })
    }

    /// Build and send a profile. Returns the HTTP status code.
    #[allow(clippy::too_many_arguments)]
    pub async fn send<'a>(
        &self,
        profile: EncodedProfile,
        files_to_compress_and_export: &'a [File<'a>],
        files_to_export_unmodified: &'a [File<'a>],
        additional_tags: &[Tag],
        internal_metadata: Option<serde_json::Value>,
        info: Option<serde_json::Value>,
        cancel: Option<&CancellationToken>,
    ) -> anyhow::Result<reqwest::StatusCode> {
        let tags_profiler = self.build_tags_string(additional_tags);
        let event = self.build_event_json(&profile, files_to_compress_and_export,
            files_to_export_unmodified, &tags_profiler, internal_metadata, info);

        let form = self.build_multipart_form(event, profile, files_to_compress_and_export,
            files_to_export_unmodified)?;

        // For Unix Domain Sockets, convert the URL to use http:// scheme
        // The socket path was already configured in the client builder
        let url = if self.endpoint.url.scheme_str() == Some("unix") {
            // Use localhost as a placeholder - the actual connection goes through the UDS
            format!("http://localhost{}", self.endpoint.url.path())
        } else {
            self.endpoint.url.to_string()
        };

        // Build request
        let request = self.client
            .post(url)
            .header("Connection", "close")
            .header("DD-EVP-ORIGIN", &self.profiling_library_name)
            .header("DD-EVP-ORIGIN-VERSION", &self.profiling_library_version)
            .header("User-Agent", &self.user_agent)
            .headers(self.build_optional_headers())
            .timeout(std::time::Duration::from_millis(self.endpoint.timeout_ms))
            .multipart(form)
            .build()?;

        // Send request with cancellation support
        tokio::select! {
            _ = async {
                match cancel {
                    Some(token) => token.cancelled().await,
                    None => future::pending().await,
                }
            } => Err(anyhow::anyhow!("Operation cancelled by user")),
            result = self.client.execute(request) => {
                Ok(result?.status())
            }
        }
    }

    // Helper methods

    fn build_tags_string(&self, additional_tags: &[Tag]) -> String {
        let mut tags = String::new();

        // Add configured tags
        for tag in &self.tags {
            tags.push_str(tag.as_ref());
            tags.push(',');
        }

        // Add additional tags
        for tag in additional_tags {
            tags.push_str(tag.as_ref());
            tags.push(',');
        }

        // Add Azure App Services tags if available
        if let Some(aas) = &*azure_app_services::AAS_METADATA {
            for (name, value) in [
                ("aas.resource.id", aas.get_resource_id()),
                ("aas.environment.extension_version", aas.get_extension_version()),
                ("aas.environment.instance_id", aas.get_instance_id()),
                ("aas.environment.instance_name", aas.get_instance_name()),
                ("aas.environment.os", aas.get_operating_system()),
                ("aas.resource.group", aas.get_resource_group()),
                ("aas.site.name", aas.get_site_name()),
                ("aas.site.kind", aas.get_site_kind()),
                ("aas.site.type", aas.get_site_type()),
                ("aas.subscription.id", aas.get_subscription_id()),
            ] {
                if let Ok(tag) = Tag::new(name, value) {
                    tags.push_str(tag.as_ref());
                    tags.push(',');
                }
            }
        }

        // Add runtime platform tag (last, no trailing comma)
        tags.push_str(tag!("runtime_platform", target_triple::TARGET).as_ref());
        tags
    }

    fn build_event_json(
        &self,
        profile: &EncodedProfile,
        files_to_compress: &[File],
        files_unmodified: &[File],
        tags_profiler: &str,
        internal_metadata: Option<serde_json::Value>,
        info: Option<serde_json::Value>,
    ) -> serde_json::Value {
        let attachments: Vec<_> = files_to_compress.iter()
            .chain(files_unmodified.iter())
            .map(|f| f.name)
            .chain(std::iter::once("profile.pprof"))
            .collect();

        let mut internal = internal_metadata.unwrap_or_else(|| json!({}));
        internal["libdatadog_version"] = json!(env!("CARGO_PKG_VERSION"));

        json!({
            "attachments": attachments,
            "tags_profiler": tags_profiler,
            "start": chrono::DateTime::<chrono::Utc>::from(profile.start)
                .format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
            "end": chrono::DateTime::<chrono::Utc>::from(profile.end)
                .format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
            "family": self.family,
            "version": "4",
            "endpoint_counts": if profile.endpoints_stats.is_empty() {
                None
            } else {
                Some(&profile.endpoints_stats)
            },
            "internal": internal,
            "info": info.unwrap_or_else(|| json!({})),
        })
    }

    fn build_multipart_form(
        &self,
        event: serde_json::Value,
        profile: EncodedProfile,
        files_to_compress: &[File],
        files_unmodified: &[File],
    ) -> anyhow::Result<reqwest::multipart::Form> {
        let event_bytes = serde_json::to_vec(&event)?;

        let mut form = reqwest::multipart::Form::new()
            .part(
                "event",
                reqwest::multipart::Part::bytes(event_bytes)
                    .file_name("event.json")
                    .mime_str("application/json")?,
            );

        // Add compressed files
        for file in files_to_compress {
            let mut encoder = Compressor::<DefaultProfileCodec>::try_new(
                (file.bytes.len() >> 3).next_power_of_two(),
                10 * 1024 * 1024,
                Profile::COMPRESSION_LEVEL,
            )
            .context("failed to create compressor")?;
            encoder.write_all(file.bytes)?;

            form = form.part(
                file.name.to_string(),
                reqwest::multipart::Part::bytes(encoder.finish()?)
                    .file_name(file.name.to_string()),
            );
        }

        // Add uncompressed files
        for file in files_unmodified {
            form = form.part(
                file.name.to_string(),
                reqwest::multipart::Part::bytes(file.bytes.to_vec())
                    .file_name(file.name.to_string()),
            );
        }

        // Add profile
        Ok(form.part(
            "profile.pprof",
            reqwest::multipart::Part::bytes(profile.buffer)
                .file_name("profile.pprof"),
        ))
    }

    fn build_optional_headers(&self) -> reqwest::header::HeaderMap {
        let mut headers = reqwest::header::HeaderMap::new();

        if let Some(api_key) = &self.endpoint.api_key {
            if let Ok(value) = reqwest::header::HeaderValue::from_str(api_key) {
                headers.insert("DD-API-KEY", value);
            }
        }

        if let Some(test_token) = &self.endpoint.test_token {
            if let Ok(value) = reqwest::header::HeaderValue::from_str(test_token) {
                headers.insert("X-Datadog-Test-Session-Token", value);
            }
        }

        headers
    }
}
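
The wiremock dev-dependency added in Cargo.toml points at integration tests for this exporter. A minimal sketch of such a test, assuming the crate exposes helpers along the lines of Endpoint::from_slice and EncodedProfile::test_instance (both names are illustrative and not shown in this diff), with module paths approximated for an in-crate test:

use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

use libdd_common::Endpoint;
use crate::exporter::reqwest_exporter::ProfileExporter;
use crate::internal::EncodedProfile;

#[tokio::test]
async fn send_posts_profile_to_mock_intake() -> anyhow::Result<()> {
    // Local HTTP server standing in for the profiling intake.
    let server = MockServer::start().await;
    Mock::given(method("POST"))
        .and(path("/profiling/v1/input"))
        .respond_with(ResponseTemplate::new(200))
        .mount(&server)
        .await;

    // Hypothetical constructors: the exact Endpoint/EncodedProfile helpers
    // depend on libdd_common / libdd-profiling internals not shown here.
    let endpoint = Endpoint::from_slice(&format!("{}/profiling/v1/input", server.uri()));
    let exporter = ProfileExporter::new("dd-trace-test", "0.0.1", "native", vec![], endpoint)?;
    let profile = EncodedProfile::test_instance()?;

    let status = exporter.send(profile, &[], &[], &[], None, None, None).await?;
    assert_eq!(status, reqwest::StatusCode::OK);
    Ok(())
}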

libdd-profiling/src/internal/profile/mod.rs

Lines changed: 40 additions & 0 deletions
@@ -536,6 +536,46 @@ impl Profile {
         Ok(self)
     }
 
+    /// Convenience method to serialize and export the profile to a reqwest exporter.
+    ///
+    /// # Arguments
+    /// * `exporter` - The reqwest ProfileExporter to send the profile to
+    /// * `files_to_compress_and_export` - Additional files to compress and attach to the profile
+    /// * `files_to_export_unmodified` - Additional files to attach without compression
+    /// * `additional_tags` - Tags to add to this specific profile (in addition to the exporter's configured tags)
+    /// * `internal_metadata` - Internal metadata to include in the event JSON
+    /// * `info` - System info to include in the event JSON
+    /// * `end_time` - Optional end time (defaults to now)
+    /// * `duration` - Optional duration (defaults to end_time - start_time)
+    /// * `cancel` - Optional cancellation token to abort the upload
+    ///
+    /// # Returns
+    /// The HTTP status code from the upload
+    #[allow(clippy::too_many_arguments)]
+    pub async fn export_to_endpoint<'a>(
+        self,
+        exporter: &crate::exporter::reqwest_exporter::ProfileExporter,
+        files_to_compress_and_export: &'a [crate::exporter::reqwest_exporter::File<'a>],
+        files_to_export_unmodified: &'a [crate::exporter::reqwest_exporter::File<'a>],
+        additional_tags: &[libdd_common::tag::Tag],
+        internal_metadata: Option<serde_json::Value>,
+        info: Option<serde_json::Value>,
+        end_time: Option<SystemTime>,
+        duration: Option<Duration>,
+        cancel: Option<&tokio_util::sync::CancellationToken>,
+    ) -> anyhow::Result<reqwest::StatusCode> {
+        let encoded = self.serialize_into_compressed_pprof(end_time, duration)?;
+        exporter.send(
+            encoded,
+            files_to_compress_and_export,
+            files_to_export_unmodified,
+            additional_tags,
+            internal_metadata,
+            info,
+            cancel,
+        ).await
+    }
+
     /// In incident 35390 (JIRA PROF-11456) we observed invalid location_ids being present in
     /// emitted profiles. We're doing extra checks here so that if we see incorrect ids again,
     /// we are 100% sure they were not introduced prior to this stage.
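
For code that holds a live Profile, the new wrapper collapses serialize-plus-upload into one call. A hedged usage sketch (module paths approximated; exporter built as in the test sketch above):

use crate::exporter::reqwest_exporter::ProfileExporter;
use crate::internal::Profile;

async fn upload(profile: Profile, exporter: &ProfileExporter) -> anyhow::Result<()> {
    // Serializes to compressed pprof, then POSTs it; passing None lets the
    // end time default to "now" and the duration to end_time - start_time.
    let status = profile
        .export_to_endpoint(
            exporter,
            &[],   // files_to_compress_and_export
            &[],   // files_to_export_unmodified
            &[],   // additional_tags
            None,  // internal_metadata
            None,  // info
            None,  // end_time
            None,  // duration
            None,  // cancel
        )
        .await?;
    anyhow::ensure!(status.is_success(), "profile intake returned {status}");
    Ok(())
}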
