From a88ecc09e317c61710565eb859c133f760008d6d Mon Sep 17 00:00:00 2001 From: Florian Engelhardt Date: Fri, 28 Nov 2025 17:11:41 +0100 Subject: [PATCH] Switch from hyper and tokio to reqwest Co-authored-by: Gemini --- Cargo.lock | 238 ++++++++++++++++++---- libdd-profiling-ffi/Cargo.toml | 5 +- libdd-profiling-ffi/src/exporter.rs | 156 +++------------ libdd-profiling/Cargo.toml | 7 +- libdd-profiling/src/exporter/errors.rs | 25 --- libdd-profiling/src/exporter/mod.rs | 267 +++++++++++++------------ libdd-profiling/tests/form.rs | 20 +- 7 files changed, 371 insertions(+), 347 deletions(-) delete mode 100644 libdd-profiling/src/exporter/errors.rs diff --git a/Cargo.lock b/Cargo.lock index bc75503b2e..c844f049a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -844,22 +844,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" -[[package]] -name = "common-multipart-rfc7578" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f08d53b5e0c302c5830cfa7511ba0edc3f241c691a95c0d184dfb761e11a6cc2" -dependencies = [ - "bytes", - "futures-core", - "futures-util", - "http", - "mime", - "mime_guess", - "rand 0.8.5", - "thiserror 1.0.69", -] - [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1871,9 +1855,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasip2", + "wasm-bindgen", ] [[package]] @@ -2177,19 +2163,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-multipart-rfc7578" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a60fb748074dd040c8d05d8a002725200fb594e0ffcfa0b83fb8f64616b50267" -dependencies = [ - "bytes", - "common-multipart-rfc7578", - "futures-core", - "http", - "hyper", -] - [[package]] name = "hyper-rustls" version = "0.27.7" @@ -2227,6 +2200,7 @@ version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56" dependencies = [ + "base64 0.22.1", "bytes", "futures-channel", "futures-core", @@ -2234,7 +2208,9 @@ dependencies = [ "http", "http-body", "hyper", + "ipnet", "libc", + "percent-encoding", "pin-project-lite", "socket2 0.6.1", "tokio", @@ -2432,6 +2408,22 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ipnet" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" + +[[package]] +name = "iri-string" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is-terminal" version = "0.4.17" @@ -2786,12 +2778,8 @@ dependencies = [ "chrono", "criterion", "crossbeam-utils", - "futures", "hashbrown 0.16.0", "http", - "http-body-util", - "hyper", - "hyper-multipart-rfc7578", "indexmap 2.12.0", "libdd-alloc", "libdd-common", @@ -2801,13 +2789,12 @@ dependencies = [ "parking_lot", "proptest", "prost", + "reqwest", "rustc-hash 1.1.0", "serde", "serde_json", "target-triple 0.1.4", "thiserror 2.0.17", - "tokio", - "tokio-util", "zstd", ] @@ -2819,9 +2806,7 @@ dependencies = [ "build_common", "datadog-ffe-ffi", "function_name", - 
"futures", - "http-body-util", - "hyper", + "http", "libc", "libdd-common", "libdd-common-ffi", @@ -2834,7 +2819,6 @@ dependencies = [ "libdd-telemetry-ffi", "serde_json", "symbolizer-ffi", - "tokio-util", ] [[package]] @@ -2951,7 +2935,6 @@ dependencies = [ "http-body-util", "httpmock", "hyper", - "hyper-http-proxy", "indexmap 2.12.0", "libdd-common", "libdd-tinybytes", @@ -3053,6 +3036,12 @@ dependencies = [ "value-bag", ] +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "lz4_flex" version = "0.9.5" @@ -4080,6 +4069,61 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash 2.1.1", + "rustls", + "socket2 0.6.1", + "thiserror 2.0.17", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +dependencies = [ + "bytes", + "getrandom 0.3.4", + "lru-slab", + "rand 0.9.2", + "ring", + "rustc-hash 2.1.1", + "rustls", + "rustls-pki-types", + "slab", + "thiserror 2.0.17", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2 0.6.1", + "tracing", + "windows-sys 0.60.2", +] + [[package]] name = "quote" version = "1.0.42" @@ -4250,6 +4294,47 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +[[package]] +name = "reqwest" +version = "0.12.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "mime_guess", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tower 0.5.2", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", +] + [[package]] name = "ring" version = "0.17.14" @@ -4369,6 +4454,7 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a" dependencies = [ + "web-time", "zeroize", ] @@ -4664,6 +4750,18 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + 
"form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_with" version = "3.15.1" @@ -5033,6 +5131,9 @@ name = "sync_wrapper" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] [[package]] name = "synstructure" @@ -5329,6 +5430,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.48.0" @@ -5568,6 +5684,25 @@ dependencies = [ "futures-util", "pin-project-lite", "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-http" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf146f99d442e8e68e585f5d798ccd3cad9a7835b917e09728880a862706456" +dependencies = [ + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "iri-string", + "pin-project-lite", + "tower 0.5.2", "tower-layer", "tower-service", ] @@ -5933,6 +6068,19 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "551f88106c6d5e7ccc7cd9a16f312dd3b5d36ea8b4954304657d5dfba115d4a0" +dependencies = [ + "cfg-if", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.105" @@ -5975,6 +6123,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "1.0.4" diff --git a/libdd-profiling-ffi/Cargo.toml b/libdd-profiling-ffi/Cargo.toml index 41e01d1631..2861205302 100644 --- a/libdd-profiling-ffi/Cargo.toml +++ b/libdd-profiling-ffi/Cargo.toml @@ -49,11 +49,8 @@ libdd-telemetry-ffi = { path = "../libdd-telemetry-ffi", default-features = fals libdd-ddsketch-ffi = { path = "../libdd-ddsketch-ffi", default-features = false, optional = true } libdd-log-ffi = { path = "../libdd-log-ffi", default-features = false, optional = true } function_name = "0.3.0" -futures = { version = "0.3", default-features = false } -http-body-util = "0.1" -hyper = { workspace = true} +http = "1.0" libc = "0.2" serde_json = { version = "1.0" } symbolizer-ffi = { path = "../symbolizer-ffi", optional = true, default-features = false } -tokio-util = "0.7.1" datadog-ffe-ffi = { path = "../datadog-ffe-ffi", default-features = false, optional = true } diff --git a/libdd-profiling-ffi/src/exporter.rs b/libdd-profiling-ffi/src/exporter.rs index d5c0cbc295..dc5d29c4b2 100644 --- a/libdd-profiling-ffi/src/exporter.rs +++ b/libdd-profiling-ffi/src/exporter.rs @@ -14,9 +14,6 @@ use libdd_profiling::exporter; use libdd_profiling::exporter::{ProfileExporter, Request}; use libdd_profiling::internal::EncodedProfile; use std::borrow::Cow; -use std::str::FromStr; - -type TokioCancellationToken = 
tokio_util::sync::CancellationToken; #[allow(dead_code)] #[repr(C)] @@ -73,7 +70,7 @@ pub extern "C" fn ddog_prof_Endpoint_agentless<'a>( pub extern "C" fn endpoint_file(filename: CharSlice) -> ProfilingEndpoint { ProfilingEndpoint::File(filename) } -unsafe fn try_to_url(slice: CharSlice) -> anyhow::Result { +unsafe fn try_to_url(slice: CharSlice) -> anyhow::Result { let str: &str = slice.try_to_utf8()?; #[cfg(unix)] if let Some(path) = str.strip_prefix("unix://") { @@ -83,7 +80,7 @@ unsafe fn try_to_url(slice: CharSlice) -> anyhow::Result { if let Some(path) = str.strip_prefix("windows:") { return Ok(exporter::named_pipe_path_to_uri(path.as_ref())?); } - Ok(hyper::Uri::from_str(str)?) + Ok(str.parse()?) } pub unsafe fn try_to_endpoint( @@ -293,94 +290,19 @@ pub unsafe extern "C" fn ddog_prof_Exporter_Request_drop(mut request: *mut Handl pub unsafe extern "C" fn ddog_prof_Exporter_send( mut exporter: *mut Handle, mut request: *mut Handle, - mut cancel: *mut Handle, ) -> Result { wrap_with_ffi_result!({ let request = *request.take().context("request")?; let exporter = exporter.to_inner_mut()?; - let cancel = cancel.to_inner_mut().ok(); - let response = exporter.send(request, cancel.as_deref())?; + let response = exporter.send(request)?; anyhow::Ok(HttpStatus(response.status().as_u16())) }) } -/// Can be passed as an argument to send and then be used to asynchronously cancel it from a -/// different thread. -#[no_mangle] -#[must_use] -pub extern "C" fn ddog_CancellationToken_new() -> Handle { - TokioCancellationToken::new().into() -} - -/// A cloned TokioCancellationToken is connected to the TokioCancellationToken it was created from. -/// Either the cloned or the original token can be used to cancel or provided as arguments to send. -/// The useful part is that they have independent lifetimes and can be dropped separately. -/// -/// Thus, it's possible to do something like: -/// ```c -/// cancel_t1 = ddog_CancellationToken_new(); -/// cancel_t2 = ddog_CancellationToken_clone(cancel_t1); -/// -/// // On thread t1: -/// ddog_prof_Exporter_send(..., cancel_t1); -/// ddog_CancellationToken_drop(cancel_t1); -/// -/// // On thread t2: -/// ddog_CancellationToken_cancel(cancel_t2); -/// ddog_CancellationToken_drop(cancel_t2); -/// ``` -/// -/// Without clone, both t1 and t2 would need to synchronize to make sure neither was using the -/// cancel before it could be dropped. With clone, there is no need for such synchronization, both -/// threads have their own cancel and should drop that cancel after they are done with it. -/// -/// # Safety -/// If the `token` is non-null, it must point to a valid object. -#[no_mangle] -#[must_use] -pub unsafe extern "C" fn ddog_CancellationToken_clone( - mut token: *mut Handle, -) -> Handle { - if let Ok(token) = token.to_inner_mut() { - token.clone().into() - } else { - Handle::empty() - } -} - -/// Cancel send that is being called in another thread with the given token. -/// Note that cancellation is a terminal state; cancelling a token more than once does nothing. -/// Returns `true` if token was successfully cancelled. -#[no_mangle] -pub unsafe extern "C" fn ddog_CancellationToken_cancel( - mut cancel: *mut Handle, -) -> bool { - if let Ok(token) = cancel.to_inner_mut() { - let will_cancel = !token.is_cancelled(); - if will_cancel { - token.cancel(); - } - will_cancel - } else { - false - } -} - -/// # Safety -/// The `token` can be null, but non-null values must be created by the Rust -/// Global allocator and must have not been dropped already. 
-#[no_mangle] -pub unsafe extern "C" fn ddog_CancellationToken_drop( - mut token: *mut Handle, -) { - drop(token.take()) -} - #[cfg(test)] mod tests { use super::*; - use http_body_util::BodyExt; use libdd_common::tag; use libdd_common_ffi::Slice; use serde_json::json; @@ -408,50 +330,7 @@ mod tests { fn parsed_event_json(request: libdd_common_ffi::Result>) -> serde_json::Value { // Safety: This is a test let request = unsafe { request.unwrap().take().unwrap() }; - // Really hacky way of getting the event.json file contents, because I didn't want to - // implement a full multipart parser and didn't find a particularly good - // alternative. If you do figure out a better way, there's another copy of this code - // in the profiling tests, please update there too :) - let body = request.body(); - let body_bytes: String = String::from_utf8_lossy( - &futures::executor::block_on(body.collect()) - .unwrap() - .to_bytes(), - ) - .to_string(); - let event_json = body_bytes - .lines() - .skip_while(|line| !line.contains(r#"filename="event.json""#)) - .nth(2) - .unwrap(); - - serde_json::from_str(event_json).unwrap() - } - - #[test] - // This test invokes an external function SecTrustSettingsCopyCertificates - // which Miri cannot evaluate. - #[cfg_attr(miri, ignore)] - fn profile_exporter_new_and_delete() { - let tags = vec![tag!("host", "localhost")].into(); - - let result = unsafe { - ddog_prof_Exporter_new( - profiling_library_name(), - profiling_library_version(), - family(), - Some(&tags), - ddog_prof_Endpoint_agent(endpoint()), - ) - }; - - match result { - Result::Ok(mut exporter) => unsafe { ddog_prof_Exporter_drop(&mut exporter) }, - Result::Err(message) => { - drop(message); - panic!("Should not occur!") - } - } + serde_json::from_str(request.event_json()).unwrap() } #[test] @@ -773,6 +652,30 @@ mod tests { )); } + #[test] + #[cfg_attr(miri, ignore)] + fn profile_exporter_new_and_delete() { + let tags = vec![tag!("host", "localhost")].into(); + + let result = unsafe { + ddog_prof_Exporter_new( + profiling_library_name(), + profiling_library_version(), + family(), + Some(&tags), + ddog_prof_Endpoint_agent(endpoint()), + ) + }; + + match result { + Result::Ok(mut exporter) => unsafe { ddog_prof_Exporter_drop(&mut exporter) }, + Result::Err(message) => { + drop(message); + panic!("Should not occur!") + } + } + } + #[test] fn test_build_failure() { let profile = &mut EncodedProfile::test_instance().unwrap().into(); @@ -796,9 +699,8 @@ mod tests { fn send_fails_with_null() { let exporter = &mut Handle::empty(); let request = &mut Handle::empty(); - let cancel = &mut Handle::empty(); unsafe { - let error = ddog_prof_Exporter_send(exporter, request, cancel) + let error = ddog_prof_Exporter_send(exporter, request) .unwrap_err() .to_string(); assert_eq!( diff --git a/libdd-profiling/Cargo.toml b/libdd-profiling/Cargo.toml index bc8eb70928..108719cd10 100644 --- a/libdd-profiling/Cargo.toml +++ b/libdd-profiling/Cargo.toml @@ -31,24 +31,19 @@ crossbeam-utils = { version = "0.8.21" } libdd-alloc = { version = "1.0.0", path = "../libdd-alloc" } libdd-profiling-protobuf = { version = "1.0.0", path = "../libdd-profiling-protobuf", features = ["prost_impls"] } libdd-common = { version = "1.0.0", path = "../libdd-common" } -futures = { version = "0.3", default-features = false } hashbrown = { version = "0.16", default-features = false } http = "1.0" -hyper = { workspace = true} -http-body-util = "0.1" -hyper-multipart-rfc7578 = "0.9.0" indexmap = "2.11" lz4_flex = { version = "0.9", default-features = 
false, features = ["std", "safe-encode", "frame"] } mime = "0.3.16" parking_lot = { version = "0.12", default-features = false } prost = "0.13.5" +reqwest = { version = "0.12", default-features = false, features = ["blocking", "multipart", "rustls-tls", "json"] } rustc-hash = { version = "1.1", default-features = false } serde = {version = "1.0", features = ["derive"]} serde_json = {version = "1.0"} target-triple = "0.1.4" thiserror = "2" -tokio = {version = "1.23", features = ["rt", "macros"]} -tokio-util = "0.7.1" zstd = { version = "0.13", default-features = false } [dev-dependencies] diff --git a/libdd-profiling/src/exporter/errors.rs b/libdd-profiling/src/exporter/errors.rs deleted file mode 100644 index 063459a94a..0000000000 --- a/libdd-profiling/src/exporter/errors.rs +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -use std::error; -use std::fmt; - -#[derive(Clone, Debug, Eq, PartialEq)] -#[allow(dead_code)] -pub(crate) enum Error { - InvalidUrl, - OperationTimedOut, - UserRequestedCancellation, -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(match self { - Self::InvalidUrl => "invalid url", - Self::OperationTimedOut => "operation timed out", - Self::UserRequestedCancellation => "operation cancelled by user", - }) - } -} - -impl error::Error for Error {} diff --git a/libdd-profiling/src/exporter/mod.rs b/libdd-profiling/src/exporter/mod.rs index 70e6e2d436..91ecfbf5bf 100644 --- a/libdd-profiling/src/exporter/mod.rs +++ b/libdd-profiling/src/exporter/mod.rs @@ -2,25 +2,19 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Context; -use bytes::Bytes; pub use chrono::{DateTime, Utc}; -pub use hyper::Uri; -use hyper_multipart_rfc7578::client::multipart; +pub use http::Uri; pub use libdd_common::tag::Tag; +use reqwest::blocking::multipart; use serde_json::json; use std::borrow::Cow; use std::fmt::Debug; use std::io::{Cursor, Write}; -use std::{future, iter}; -use tokio::runtime::Runtime; -use tokio_util::sync::CancellationToken; +use std::iter; -use libdd_common::{ - azure_app_services, connector, hyper_migration, tag, Endpoint, HttpClient, HttpResponse, -}; +use libdd_common::{azure_app_services, connector, tag, Endpoint}; pub mod config; -mod errors; #[cfg(unix)] pub use connector::uds::{socket_path_from_uri, socket_path_to_uri}; @@ -31,11 +25,8 @@ pub use connector::named_pipe::{named_pipe_path_from_uri, named_pipe_path_to_uri use crate::internal::{EncodedProfile, Profile}; use crate::profiles::{Compressor, DefaultProfileCodec}; -const DURATION_ZERO: std::time::Duration = std::time::Duration::from_millis(0); - pub struct Exporter { - client: HttpClient, - runtime: Runtime, + client: reqwest::blocking::Client, } pub struct Fields { @@ -60,65 +51,46 @@ pub struct File<'a> { #[derive(Debug)] pub struct Request { timeout: Option, - req: hyper_migration::HttpRequest, -} - -impl From for Request { - fn from(req: hyper_migration::HttpRequest) -> Self { - Self { req, timeout: None } - } + req: reqwest::blocking::RequestBuilder, + uri: http::Uri, + headers: http::HeaderMap, + event_json: String, } impl Request { - fn with_timeout(mut self, timeout: std::time::Duration) -> Self { - self.timeout = if timeout != DURATION_ZERO { - Some(timeout) - } else { - None - }; - self - } - pub fn timeout(&self) -> &Option { &self.timeout } - pub fn uri(&self) -> &hyper::Uri { - self.req.uri() + pub fn uri(&self) -> &http::Uri { + &self.uri } - pub 
fn headers(&self) -> &hyper::HeaderMap {
-        self.req.headers()
+    pub fn headers(&self) -> &http::HeaderMap {
+        &self.headers
     }
 
-    pub fn body(self) -> hyper_migration::Body {
-        self.req.into_body()
+    pub fn event_json(&self) -> &str {
+        &self.event_json
     }
 
-    async fn send(
-        self,
-        client: &HttpClient,
-        cancel: Option<&CancellationToken>,
-    ) -> anyhow::Result<HttpResponse> {
-        tokio::select! {
-            _ = async { match cancel {
-                Some(cancellation_token) => cancellation_token.cancelled().await,
-                // If no token is provided, future::pending() provides a no-op future that never resolves
-                None => future::pending().await,
-            }}
-            => Err(crate::exporter::errors::Error::UserRequestedCancellation.into()),
-            result = async {
-                match self.timeout {
-                    Some(t) => {
-                        let res = tokio::time::timeout(t, client.request(self.req)).await;
-                        res
-                        .map_err(|_| anyhow::Error::from(crate::exporter::errors::Error::OperationTimedOut))
-                    },
-                    None => Ok(client.request(self.req).await),
-                }
+    fn send(self) -> anyhow::Result<http::Response<Vec<u8>>> {
+        let mut req = self.req;
+        if let Some(timeout) = self.timeout {
+            req = req.timeout(timeout);
+        }
+        let response = req.send()?;
+        let status = response.status();
+        let headers = response.headers().clone();
+        let bytes = response.bytes()?.to_vec();
+
+        let mut builder = http::Response::builder().status(status);
+        for (key, value) in headers {
+            if let Some(key) = key {
+                builder = builder.header(key, value);
             }
-            => Ok(hyper_migration::into_response(result??)),
         }
+        Ok(builder.body(bytes)?)
     }
 }
 
@@ -147,7 +119,7 @@ impl ProfileExporter {
         V: Into<Cow<'static, str>>,
     {
         Ok(Self {
-            exporter: Exporter::new()?,
+            exporter: Exporter::new(&endpoint)?,
             endpoint,
             family: family.into(),
             profiling_library_name: profiling_library_name.into(),
@@ -190,7 +162,7 @@ impl ProfileExporter {
         internal_metadata: Option<serde_json::Value>,
         info: Option<serde_json::Value>,
     ) -> anyhow::Result<Request> {
-        let mut form = multipart::Form::default();
+        let mut form = multipart::Form::new();
 
         // combine tags and additional_tags
         let mut tags_profiler = String::new();
@@ -263,13 +235,13 @@ impl ProfileExporter {
         })
         .to_string();
 
-        form.add_reader_file_with_mime(
-            // Intake does not look for filename=event.json, it looks for name=event.
+        let event_cursor = Cursor::new(event.clone());
+
+        form = form.part(
             "event",
-            // this one shouldn't be compressed
-            Cursor::new(event),
-            "event.json",
-            mime::APPLICATION_JSON,
+            multipart::Part::reader(event_cursor)
+                .file_name("event.json")
+                .mime_str(mime::APPLICATION_JSON.as_ref())?,
         );
 
         for file in files_to_compress_and_export {
@@ -291,34 +263,78 @@ impl ProfileExporter {
             .context("failed to create compressor")?;
             encoder.write_all(file.bytes)?;
             let encoded = encoder.finish()?;
-            /* The Datadog RFC examples strip off the file extension, but the exact behavior
-             * isn't specified. This does the simple thing of using the filename
-             * without modification for the form name because intake does not care
-             * about these name of the form field for these attachments.
-             */
-            form.add_reader_file(file.name, Cursor::new(encoded), file.name);
+
+            form = form.part(
+                file.name.to_owned(),
+                multipart::Part::reader(Cursor::new(encoded)).file_name(file.name.to_owned()),
+            );
         }
 
         for file in files_to_export_unmodified {
             let encoded = file.bytes.to_vec();
-            /* The Datadog RFC examples strip off the file extension, but the exact behavior
-             * isn't specified. This does the simple thing of using the filename
-             * without modification for the form name because intake does not care
-             * about these name of the form field for these attachments.
-             */
-            form.add_reader_file(file.name, Cursor::new(encoded), file.name)
+            form = form.part(
+                file.name.to_owned(),
+                multipart::Part::reader(Cursor::new(encoded)).file_name(file.name.to_owned()),
+            );
         }
 
         // Add the actual pprof
-        form.add_reader_file(
-            "profile.pprof",
-            Cursor::new(profile.buffer),
+        form = form.part(
             "profile.pprof",
+            multipart::Part::reader(Cursor::new(profile.buffer)).file_name("profile.pprof"),
         );
-        let builder = self
-            .endpoint
-            .to_request_builder(concat!("DDProf/", env!("CARGO_PKG_VERSION")))?
-            .method(http::Method::POST)
+        // Build the request with reqwest, mirroring each header into `headers` so
+        // Request::headers() can still report them.
+        let user_agent = concat!("DDProf/", env!("CARGO_PKG_VERSION"));
+
+        let url_string = if self.endpoint.url.scheme_str() == Some("unix") {
+            // Replace unix://... with http://localhost...
+            let path = self
+                .endpoint
+                .url
+                .path_and_query()
+                .map(|pq| pq.as_str())
+                .unwrap_or("");
+            format!("http://localhost{}", path)
+        } else {
+            self.endpoint.url.to_string()
+        };
+
+        // Keep a parsed http::Uri copy of the target so Request::uri() can expose it
+        // without leaking reqwest types through the public API.
+        let uri: http::Uri = url_string.parse()?;
+
+        let mut builder = self
+            .exporter
+            .client
+            .request(reqwest::Method::POST, url_string);
+
+        let mut headers = http::HeaderMap::new();
+        headers.insert(reqwest::header::USER_AGENT, user_agent.parse()?);
+
+        // Replicate logic from Endpoint::to_request_builder but for reqwest
+        builder = builder.header(reqwest::header::USER_AGENT, user_agent);
+        if let Some(api_key) = &self.endpoint.api_key {
+            builder = builder.header("dd-api-key", api_key.as_ref());
+            headers.insert("dd-api-key", api_key.as_ref().parse()?);
+        }
+        if let Some(token) = &self.endpoint.test_token {
+            builder = builder.header("x-datadog-test-session-token", token.as_ref());
+            headers.insert("x-datadog-test-session-token", token.as_ref().parse()?);
+        }
+        if let Some(container_id) = libdd_common::entity_id::get_container_id() {
+            builder = builder.header("datadog-container-id", container_id);
+            headers.insert("datadog-container-id", container_id.parse()?);
+        }
+        if let Some(entity_id) = libdd_common::entity_id::get_entity_id() {
+            builder = builder.header("datadog-entity-id", entity_id);
+            headers.insert("datadog-entity-id", entity_id.parse()?);
+        }
+        if let Some(external_env) = *libdd_common::entity_id::DD_EXTERNAL_ENV {
+            builder = builder.header("datadog-external-env", external_env);
+            headers.insert("datadog-external-env", external_env.parse()?);
+        }
+
+        builder = builder
             .header("Connection", "close")
             .header("DD-EVP-ORIGIN", self.profiling_library_name.as_ref())
             .header(
                 "DD-EVP-ORIGIN-VERSION",
                 self.profiling_library_version.as_ref(),
             );
 
-        Ok(Request::from(
-            form.set_body::<multipart::Body>(builder)?
-                .map(hyper_migration::Body::boxed),
-        )
-        .with_timeout(std::time::Duration::from_millis(self.endpoint.timeout_ms)))
+        headers.insert("Connection", "close".parse()?);
+        headers.insert(
+            "DD-EVP-ORIGIN",
+            self.profiling_library_name.as_ref().parse()?,
+        );
+        headers.insert(
+            "DD-EVP-ORIGIN-VERSION",
+            self.profiling_library_version.as_ref().parse()?,
+        );
+
+        builder = builder.multipart(form);
+
+        let event_json = event.clone();
+
+        Ok(Request {
+            timeout: Some(std::time::Duration::from_millis(self.endpoint.timeout_ms)),
+            req: builder,
+            uri,
+            headers,
+            event_json,
+        })
     }
 
-    pub fn send(
-        &self,
-        request: Request,
-        cancel: Option<&CancellationToken>,
-    ) -> anyhow::Result<HttpResponse> {
-        self.exporter
-            .runtime
-            .block_on(request.send(&self.exporter.client, cancel))
+    pub fn send(&self, request: Request) -> anyhow::Result<http::Response<Vec<u8>>> {
+        // Cancellation is not supported with the blocking client; only the per-request timeout applies.
+        request.send()
     }
 
     pub fn set_timeout(&mut self, timeout_ms: u64) {
@@ -349,35 +376,23 @@ impl ProfileExporter {
 }
 
 impl Exporter {
-    /// Creates a new Exporter, initializing the TLS stack.
-    pub fn new() -> anyhow::Result<Self> {
-        // Set idle to 0, which prevents the pipe being broken every 2nd request
-        let client = hyper_migration::new_client_periodic();
-        let runtime = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?;
-        Ok(Self { client, runtime })
-    }
+    /// Creates a new Exporter.
+    pub fn new(endpoint: &Endpoint) -> anyhow::Result<Self> {
+        let mut builder = reqwest::blocking::Client::builder();
+
+        // Route unix:// endpoints through a Unix domain socket.
+        if let Some(scheme) = endpoint.url.scheme() {
+            if scheme.as_str() == "unix" {
+                #[cfg(unix)]
+                {
+                    let path = libdd_common::decode_uri_path_in_authority(&endpoint.url)?;
+                    builder = builder.unix_socket(path);
+                }
+            }
+        }
 
-    pub fn send(
-        &self,
-        http_method: http::Method,
-        url: &str,
-        mut headers: hyper::header::HeaderMap,
-        body: &[u8],
-        timeout: std::time::Duration,
-    ) -> anyhow::Result<HttpResponse> {
-        self.runtime.block_on(async {
-            let mut request = hyper::Request::builder()
-                .method(http_method)
-                .uri(url)
-                .body(hyper_migration::Body::from_bytes(Bytes::copy_from_slice(
-                    body,
-                )))?;
-            std::mem::swap(request.headers_mut(), &mut headers);
-
-            let request: Request = request.into();
-            request.with_timeout(timeout).send(&self.client, None).await
+        Ok(Self {
+            client: builder.build()?,
         })
     }
 }
diff --git a/libdd-profiling/tests/form.rs b/libdd-profiling/tests/form.rs
index e13ba371f5..1852c5a01e 100644
--- a/libdd-profiling/tests/form.rs
+++ b/libdd-profiling/tests/form.rs
@@ -36,7 +36,6 @@ fn multipart(
 #[cfg(test)]
 mod tests {
     use crate::multipart;
-    use http_body_util::BodyExt;
     use libdd_common::tag;
     use libdd_profiling::exporter::*;
     use serde_json::json;
@@ -46,24 +45,7 @@ mod tests {
     }
 
     fn parsed_event_json(request: Request) -> serde_json::Value {
-        // Really hacky way of getting the event.json file contents, because I didn't want to
-        // implement a full multipart parser and didn't find a particularly good
-        // alternative. If you do figure out a better way, there's another copy of this code
-        // in the profiling-ffi tests, please update there too :)
-        let body = request.body();
-        let body_bytes: String = String::from_utf8_lossy(
-            &futures::executor::block_on(body.collect())
-                .unwrap()
-                .to_bytes(),
-        )
-        .to_string();
-        let event_json = body_bytes
-            .lines()
-            .skip_while(|line| !line.contains(r#"filename="event.json""#))
-            .nth(2)
-            .unwrap();
-
-        serde_json::from_str(event_json).unwrap()
+        serde_json::from_str(request.event_json()).unwrap()
     }
 
     #[test]