Skip to content

Commit 8fc6fa9

Browse files
committed
profiling: test using reqwest instead of hyper for exporting
1 parent 6b26318 commit 8fc6fa9

File tree

8 files changed

+1603
-80
lines changed

8 files changed

+1603
-80
lines changed

Cargo.lock

Lines changed: 523 additions & 56 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libdd-profiling/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,11 @@ thiserror = "2"
5050
tokio = {version = "1.23", features = ["rt", "macros"]}
5151
tokio-util = "0.7.1"
5252
zstd = { version = "0.13", default-features = false }
53+
reqwest = { version = "0.12.23", features = ["multipart", "rustls-tls"], default-features = false }
5354

5455
[dev-dependencies]
5556
bolero = "0.13"
5657
criterion = "0.5.1"
5758
lz4_flex = { version = "0.9", default-features = false, features = ["std", "frame"] }
5859
proptest = "1"
60+
wiremock = "0.5"

libdd-profiling/src/exporter/mod.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use libdd_common::{
2121

2222
pub mod config;
2323
mod errors;
24+
pub mod reqwest_exporter;
2425

2526
#[cfg(unix)]
2627
pub use connector::uds::{socket_path_from_uri, socket_path_to_uri};
@@ -235,11 +236,11 @@ impl ProfileExporter {
235236
// between them.
236237
tags_profiler.push_str(self.runtime_platform_tag().as_ref());
237238

238-
let attachments: Vec<String> = files_to_compress_and_export
239+
let attachments: Vec<&str> = files_to_compress_and_export
239240
.iter()
240-
.chain(files_to_export_unmodified.iter())
241-
.map(|file| file.name.to_owned())
242-
.chain(iter::once("profile.pprof".to_string()))
241+
.map(|file| file.name)
242+
.chain(files_to_export_unmodified.iter().map(|file| file.name))
243+
.chain(iter::once("profile.pprof"))
243244
.collect();
244245

245246
let endpoint_counts = if profile.endpoints_stats.is_empty() {
@@ -260,14 +261,13 @@ impl ProfileExporter {
260261
"endpoint_counts" : endpoint_counts,
261262
"internal": internal,
262263
"info": info.unwrap_or_else(|| json!({})),
263-
})
264-
.to_string();
264+
});
265265

266266
form.add_reader_file_with_mime(
267267
// Intake does not look for filename=event.json, it looks for name=event.
268268
"event",
269269
// this one shouldn't be compressed
270-
Cursor::new(event),
270+
Cursor::new(serde_json::to_vec(&event)?),
271271
"event.json",
272272
mime::APPLICATION_JSON,
273273
);
@@ -281,7 +281,7 @@ impl ProfileExporter {
281281
let max_capacity = 10 * 1024 * 1024;
282282
// We haven't yet tested compression for attachments other than
283283
// profiles, which are compressed already before this point. We're
284-
// re-using the same level here for now.
284+
// re-using the same level here for now.
285285
let compression_level = Profile::COMPRESSION_LEVEL;
286286
let mut encoder = Compressor::<DefaultProfileCodec>::try_new(
287287
capacity,
@@ -298,15 +298,13 @@ impl ProfileExporter {
298298
*/
299299
form.add_reader_file(file.name, Cursor::new(encoded), file.name);
300300
}
301-
302301
for file in files_to_export_unmodified {
303-
let encoded = file.bytes.to_vec();
304302
/* The Datadog RFC examples strip off the file extension, but the exact behavior
305303
* isn't specified. This does the simple thing of using the filename
306304
* without modification for the form name because intake does not care
307305
* about these name of the form field for these attachments.
308306
*/
309-
form.add_reader_file(file.name, Cursor::new(encoded), file.name)
307+
form.add_reader_file(file.name, Cursor::new(file.bytes.to_vec()), file.name);
310308
}
311309
// Add the actual pprof
312310
form.add_reader_file(
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Reqwest-based profiling exporter
5+
//!
6+
//! This is a simplified async implementation using reqwest.
7+
8+
use anyhow::Context;
9+
use libdd_common::tag::Tag;
10+
use libdd_common::{azure_app_services, tag, Endpoint};
11+
use serde_json::json;
12+
use std::{future, io::Write};
13+
use tokio_util::sync::CancellationToken;
14+
15+
use crate::internal::{EncodedProfile, Profile};
16+
use crate::profiles::{Compressor, DefaultProfileCodec};
17+
18+
pub struct ProfileExporter {
19+
client: reqwest::Client,
20+
family: String,
21+
base_tags_string: String,
22+
request_url: String,
23+
headers: reqwest::header::HeaderMap,
24+
}
25+
26+
pub struct File<'a> {
27+
pub name: &'a str,
28+
pub bytes: &'a [u8],
29+
}
30+
31+
impl ProfileExporter {
32+
/// Creates a new exporter to be used to report profiling data.
33+
///
34+
/// Note: Reqwest v0.12.23+ includes automatic retry support for transient failures.
35+
/// The default configuration automatically retries safe errors and low-level protocol NACKs.
36+
/// For custom retry policies, users can configure the reqwest client before creating the
37+
/// exporter.
38+
pub fn new(
39+
profiling_library_name: &str,
40+
profiling_library_version: &str,
41+
family: &str,
42+
tags: Vec<Tag>,
43+
endpoint: Endpoint,
44+
) -> anyhow::Result<Self> {
45+
let mut builder = reqwest::Client::builder()
46+
.use_rustls_tls()
47+
.timeout(std::time::Duration::from_millis(endpoint.timeout_ms));
48+
49+
// Check if this is a Unix Domain Socket
50+
#[cfg(unix)]
51+
if endpoint.url.scheme_str() == Some("unix") {
52+
use libdd_common::connector::uds::socket_path_from_uri;
53+
let socket_path = socket_path_from_uri(&endpoint.url)?;
54+
builder = builder.unix_socket(socket_path);
55+
}
56+
57+
// For Unix Domain Sockets, we need to use http://localhost as the URL
58+
// The socket path is configured on the client, so we convert the URL here
59+
let request_url = if endpoint.url.scheme_str() == Some("unix") {
60+
format!("http://localhost{}", endpoint.url.path())
61+
} else {
62+
endpoint.url.to_string()
63+
};
64+
65+
// Pre-build all static headers
66+
let mut headers = reqwest::header::HeaderMap::new();
67+
68+
// Always-present headers
69+
headers.insert(
70+
"Connection",
71+
reqwest::header::HeaderValue::from_static("close"),
72+
);
73+
headers.insert(
74+
"DD-EVP-ORIGIN",
75+
reqwest::header::HeaderValue::from_str(profiling_library_name)?,
76+
);
77+
headers.insert(
78+
"DD-EVP-ORIGIN-VERSION",
79+
reqwest::header::HeaderValue::from_str(profiling_library_version)?,
80+
);
81+
82+
let user_agent = format!("DDProf/{}", env!("CARGO_PKG_VERSION"));
83+
headers.insert(
84+
"User-Agent",
85+
reqwest::header::HeaderValue::from_str(&user_agent)?,
86+
);
87+
88+
// Optional headers (API key, test token)
89+
// These can fail if they contain invalid characters, but we treat that as non-fatal
90+
// since they're provided by the user's configuration
91+
if let Some(api_key) = &endpoint.api_key {
92+
if let Ok(value) = reqwest::header::HeaderValue::from_str(api_key) {
93+
headers.insert("DD-API-KEY", value);
94+
}
95+
}
96+
if let Some(test_token) = &endpoint.test_token {
97+
if let Ok(value) = reqwest::header::HeaderValue::from_str(test_token) {
98+
headers.insert("X-Datadog-Test-Session-Token", value);
99+
}
100+
}
101+
102+
// Precompute the base tags string (includes configured tags + Azure App Services tags)
103+
let mut base_tags_string = String::new();
104+
for tag in &tags {
105+
base_tags_string.push_str(tag.as_ref());
106+
base_tags_string.push(',');
107+
}
108+
109+
// Add Azure App Services tags if available
110+
if let Some(aas) = &*azure_app_services::AAS_METADATA {
111+
for (name, value) in [
112+
("aas.resource.id", aas.get_resource_id()),
113+
(
114+
"aas.environment.extension_version",
115+
aas.get_extension_version(),
116+
),
117+
("aas.environment.instance_id", aas.get_instance_id()),
118+
("aas.environment.instance_name", aas.get_instance_name()),
119+
("aas.environment.os", aas.get_operating_system()),
120+
("aas.resource.group", aas.get_resource_group()),
121+
("aas.site.name", aas.get_site_name()),
122+
("aas.site.kind", aas.get_site_kind()),
123+
("aas.site.type", aas.get_site_type()),
124+
("aas.subscription.id", aas.get_subscription_id()),
125+
] {
126+
if let Ok(tag) = Tag::new(name, value) {
127+
base_tags_string.push_str(tag.as_ref());
128+
base_tags_string.push(',');
129+
}
130+
}
131+
}
132+
133+
Ok(Self {
134+
client: builder.build()?,
135+
family: family.to_string(),
136+
base_tags_string,
137+
request_url,
138+
headers,
139+
})
140+
}
141+
142+
/// Build and send a profile. Returns the HTTP status code.
143+
pub async fn send(
144+
&self,
145+
profile: EncodedProfile,
146+
additional_files: &[File<'_>],
147+
additional_tags: &[Tag],
148+
internal_metadata: Option<serde_json::Value>,
149+
info: Option<serde_json::Value>,
150+
cancel: Option<&CancellationToken>,
151+
) -> anyhow::Result<reqwest::StatusCode> {
152+
let tags_profiler = self.build_tags_string(additional_tags);
153+
let event = self.build_event_json(
154+
&profile,
155+
additional_files,
156+
&tags_profiler,
157+
internal_metadata,
158+
info,
159+
);
160+
161+
let form = self.build_multipart_form(event, profile, additional_files)?;
162+
163+
// Build request
164+
let request = self
165+
.client
166+
.post(&self.request_url)
167+
.headers(self.headers.clone())
168+
.multipart(form)
169+
.build()?;
170+
171+
// Send request with cancellation support
172+
tokio::select! {
173+
_ = async {
174+
match cancel {
175+
Some(token) => token.cancelled().await,
176+
None => future::pending().await,
177+
}
178+
} => Err(anyhow::anyhow!("Operation cancelled by user")),
179+
result = self.client.execute(request) => {
180+
Ok(result?.status())
181+
}
182+
}
183+
}
184+
185+
// Helper methods
186+
187+
fn build_tags_string(&self, additional_tags: &[Tag]) -> String {
188+
// Start with precomputed base tags (includes configured tags + Azure App Services tags)
189+
let mut tags = self.base_tags_string.clone();
190+
191+
// Add additional tags
192+
for tag in additional_tags {
193+
tags.push_str(tag.as_ref());
194+
tags.push(',');
195+
}
196+
197+
// Add runtime platform tag (last, no trailing comma)
198+
tags.push_str(tag!("runtime_platform", target_triple::TARGET).as_ref());
199+
tags
200+
}
201+
202+
fn build_event_json(
203+
&self,
204+
profile: &EncodedProfile,
205+
additional_files: &[File],
206+
tags_profiler: &str,
207+
internal_metadata: Option<serde_json::Value>,
208+
info: Option<serde_json::Value>,
209+
) -> serde_json::Value {
210+
let attachments: Vec<_> = additional_files
211+
.iter()
212+
.map(|f| f.name)
213+
.chain(std::iter::once("profile.pprof"))
214+
.collect();
215+
216+
let mut internal = internal_metadata.unwrap_or_else(|| json!({}));
217+
internal["libdatadog_version"] = json!(env!("CARGO_PKG_VERSION"));
218+
219+
json!({
220+
"attachments": attachments,
221+
"tags_profiler": tags_profiler,
222+
"start": chrono::DateTime::<chrono::Utc>::from(profile.start)
223+
.format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
224+
"end": chrono::DateTime::<chrono::Utc>::from(profile.end)
225+
.format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
226+
"family": self.family,
227+
"version": "4",
228+
"endpoint_counts": if profile.endpoints_stats.is_empty() {
229+
None
230+
} else {
231+
Some(&profile.endpoints_stats)
232+
},
233+
"internal": internal,
234+
"info": info.unwrap_or_else(|| json!({})),
235+
})
236+
}
237+
238+
fn build_multipart_form(
239+
&self,
240+
event: serde_json::Value,
241+
profile: EncodedProfile,
242+
additional_files: &[File],
243+
) -> anyhow::Result<reqwest::multipart::Form> {
244+
let event_bytes = serde_json::to_vec(&event)?;
245+
246+
let mut form = reqwest::multipart::Form::new().part(
247+
"event",
248+
reqwest::multipart::Part::bytes(event_bytes)
249+
.file_name("event.json")
250+
.mime_str("application/json")?,
251+
);
252+
253+
// Add additional files (compressed)
254+
for file in additional_files {
255+
let mut encoder = Compressor::<DefaultProfileCodec>::try_new(
256+
(file.bytes.len() >> 3).next_power_of_two(),
257+
10 * 1024 * 1024,
258+
Profile::COMPRESSION_LEVEL,
259+
)
260+
.context("failed to create compressor")?;
261+
encoder.write_all(file.bytes)?;
262+
263+
form = form.part(
264+
file.name.to_string(),
265+
reqwest::multipart::Part::bytes(encoder.finish()?).file_name(file.name.to_string()),
266+
);
267+
}
268+
269+
// Add profile
270+
Ok(form.part(
271+
"profile.pprof",
272+
reqwest::multipart::Part::bytes(profile.buffer).file_name("profile.pprof"),
273+
))
274+
}
275+
}

0 commit comments

Comments
 (0)