From dabe035ac48066f2e78d4c95e3546c3e8d9c9b45 Mon Sep 17 00:00:00 2001 From: Aaron Todd Date: Fri, 20 Dec 2024 17:14:18 +0000 Subject: [PATCH 1/3] use official protobuf types for serde --- runners/s3-benchrunner-rust/Cargo.lock | 291 +++++++++++++++--- runners/s3-benchrunner-rust/Cargo.toml | 10 +- runners/s3-benchrunner-rust/src/telemetry.rs | 1 - .../src/telemetry/common.rs | 275 ----------------- .../src/telemetry/trace/exporter.rs | 20 +- .../src/telemetry/trace/mod.rs | 2 - .../src/telemetry/trace/transform.rs | 241 --------------- 7 files changed, 275 insertions(+), 565 deletions(-) delete mode 100644 runners/s3-benchrunner-rust/src/telemetry/common.rs delete mode 100644 runners/s3-benchrunner-rust/src/telemetry/trace/transform.rs diff --git a/runners/s3-benchrunner-rust/Cargo.lock b/runners/s3-benchrunner-rust/Cargo.lock index a8e36896..101423e3 100644 --- a/runners/s3-benchrunner-rust/Cargo.lock +++ b/runners/s3-benchrunner-rust/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -114,6 +114,28 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-task" version = "4.7.1" @@ -145,9 +167,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-config" -version = "1.5.10" +version = "1.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924" +checksum = "a5d1c2c88936a73c699225d0bc00684a534166b0cebc2659c3cdf08de8edc64c" dependencies = [ "aws-credential-types", "aws-runtime", @@ -213,9 +235,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.4.3" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" +checksum = "300a12520b4e6d08b73f77680f12c16e8ae43250d55100e0b2be46d78da16a48" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -240,11 +262,11 @@ dependencies = [ [[package]] name = "aws-s3-transfer-manager" version = "0.1.0" -source = "git+https://github.com/awslabs/aws-s3-transfer-manager-rs.git?rev=06c087a5d53676bb048f6c512b8eb1fda63f03d5#06c087a5d53676bb048f6c512b8eb1fda63f03d5" dependencies = [ "async-channel", "async-trait", "aws-config", + "aws-runtime", "aws-sdk-s3", "aws-smithy-async", "aws-smithy-experimental", @@ -258,16 +280,16 @@ dependencies = [ "path-clean", "pin-project-lite", "tokio", - "tower 0.5.1", + "tower 0.5.2", "tracing", "walkdir", ] [[package]] name = "aws-sdk-s3" -version = "1.64.0" +version = "1.66.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35fe5e7f71b1cc6274e905d3bcc7daf94099ac2d4cba83447ffb959b5b27b3c1" +checksum = "154488d16ab0d627d15ab2832b57e68a16684c8c902f14cb8a75ec933fc94852" dependencies = [ "aws-credential-types", "aws-runtime", @@ -299,9 +321,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.49.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09677244a9da92172c8dc60109b4a9658597d4d298b188dd0018b6a66b410ca4" +checksum = "74995133da38f109a0eb8e8c886f9e80c713b6e9f2e6e5a6a1ba4450ce2ffc46" dependencies = [ "aws-credential-types", "aws-runtime", @@ -321,9 +343,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.50.0" +version = "1.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fea2f3a8bb3bd10932ae7ad59cc59f65f270fc9183a7e91f501dc5efbef7ee" +checksum = "e7062a779685cbf3b2401eb36151e2c6589fd5f3569b8a6bc2d199e5aaa1d059" dependencies = [ "aws-credential-types", "aws-runtime", @@ -343,9 +365,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.50.0" +version = "1.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ada54e5f26ac246dc79727def52f7f8ed38915cb47781e2a72213957dc3a7d5" +checksum = "299dae7b1dc0ee50434453fa5a229dc4b22bd3ee50409ff16becf1f7346e0193" dependencies = [ "aws-credential-types", "aws-runtime", @@ -366,9 +388,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.5" +version = "1.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5619742a0d8f253be760bfbb8e8e8368c69e3587e4637af5754e488a611499b1" +checksum = "7d3820e0c08d0737872ff3c7c1f21ebbb6693d832312d6152bf18ef50a5471c2" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -395,9 +417,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c" +checksum = "8aa8ff1492fd9fb99ae28e8467af0dbbb7c31512b16fabf1a0f10d7bb6ef78bb" dependencies = [ "futures-util", "pin-project-lite", @@ -482,9 +504,9 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.60.7" +version = "0.61.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" +checksum = "ee4e69cc50921eb913c6b662f8d909131bb3e6ad6cb6090d3a39b66fc5c52095" dependencies = [ "aws-smithy-types", ] @@ -501,9 +523,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.3" +version = "1.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be28bd063fa91fd871d131fc8b68d7cd4c5fa0869bea68daca50dcb1cbd76be2" +checksum = "431a10d0e07e09091284ef04453dae4069283aa108d209974d67e77ae1caa658" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -545,9 +567,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.9" +version = "1.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510" +checksum = "8ecbf4d5dfb169812e2b240a4350f15ad3c6b03a54074e5712818801615f2dc5" dependencies = [ "base64-simd", "bytes", @@ -592,6 +614,53 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -619,6 +688,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64-simd" version = "0.8.0" @@ -1227,7 +1302,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap", + "indexmap 2.7.0", "slab", "tokio", "tokio-util", @@ -1246,13 +1321,19 @@ dependencies = [ "futures-core", "futures-sink", "http 1.2.0", - "indexmap", + "indexmap 2.7.0", "slab", "tokio", "tokio-util", "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.15.2" @@ -1409,6 +1490,7 @@ dependencies = [ "http 1.2.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -1450,6 +1532,19 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.5.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.10" @@ -1631,6 +1726,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.7.0" @@ -1638,7 +1743,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1733,7 +1838,7 @@ version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" dependencies = [ - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1745,6 +1850,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -1761,6 +1872,12 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1866,6 +1983,20 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry-proto" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34" +dependencies = [ + "hex", + "opentelemetry", + "opentelemetry_sdk", + "prost", + "serde", + "tonic", +] + [[package]] name = "opentelemetry-semantic-conventions" version = "0.26.0" @@ -1966,6 +2097,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.15" @@ -2033,6 +2184,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c0fef6c4230e4ccf618a35c59d7ede15dea37de8427500f50aff708806e42ec" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "157c5a9d7ea5c2ed2d9fb8f495b64759f7816c7eaea54ba3978f0d63000162e3" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "quote" version = "1.0.37" @@ -2238,7 +2412,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64", + "base64 0.21.7", ] [[package]] @@ -2269,6 +2443,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" + [[package]] name = "ryu" version = "1.0.18" @@ -2283,13 +2463,13 @@ dependencies = [ "async-trait", "aws-config", "aws-s3-transfer-manager", - "aws-sdk-s3", "bytes", "chrono", "clap", "fastrand", "futures-util", "opentelemetry", + "opentelemetry-proto", "opentelemetry-semantic-conventions", "opentelemetry-stdout", "opentelemetry_sdk", @@ -2547,9 +2727,9 @@ dependencies = [ [[package]] name = "sync_wrapper" -version = "0.1.2" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" [[package]] name = "synstructure" @@ -2705,12 +2885,51 @@ dependencies = [ "tokio", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2 0.4.7", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.1", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -2718,9 +2937,9 @@ dependencies = [ [[package]] name = "tower" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", @@ -3033,7 +3252,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/runners/s3-benchrunner-rust/Cargo.toml b/runners/s3-benchrunner-rust/Cargo.toml index f4e70b34..b4cf3f70 100644 --- a/runners/s3-benchrunner-rust/Cargo.toml +++ b/runners/s3-benchrunner-rust/Cargo.toml @@ -6,8 +6,8 @@ edition = "2021" [dependencies] # Swap which line is commented-out to use GitHub or local aws-s3-transfer-manager -aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "06c087a5d53676bb048f6c512b8eb1fda63f03d5" } -# aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" } +#aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "06c087a5d53676bb048f6c512b8eb1fda63f03d5" } +aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" } tracing-opentelemetry = "0.27" opentelemetry = { version = "0.26", features = ["trace"] } @@ -15,13 +15,13 @@ opentelemetry_sdk = { version = "0.26", default-features = false, features = [ "trace", "rt-tokio", ] } +opentelemetry-proto = "0.26" opentelemetry-stdout = { version = "0.26", features = ["trace"] } opentelemetry-semantic-conventions = "0.26" anyhow = "1.0.86" async-trait = "0.1.81" -aws-config = "1.5.4" -aws-sdk-s3 = "1.41.0" +aws-config = "1.5.11" bytes = "1" chrono = "0.4.38" clap = { version = "4.5.9", features = ["derive"] } @@ -31,6 +31,6 @@ ordered-float = "4.3.0" serde = { version = "1.0.204", features = ["derive"] } serde_json = "1.0.120" thiserror = "1.0.62" -tokio = { version = "1.40.0", features = ["io-util"] } +tokio = { version = "1.42.0", features = ["io-util"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } diff --git a/runners/s3-benchrunner-rust/src/telemetry.rs b/runners/s3-benchrunner-rust/src/telemetry.rs index b37155c0..4429dd0c 100644 --- a/runners/s3-benchrunner-rust/src/telemetry.rs +++ b/runners/s3-benchrunner-rust/src/telemetry.rs @@ -12,7 +12,6 @@ use std::env; use crate::Result; -pub mod common; pub mod trace; // Create OTEL Resource (the entity that produces telemetry) diff --git a/runners/s3-benchrunner-rust/src/telemetry/common.rs b/runners/s3-benchrunner-rust/src/telemetry/common.rs deleted file mode 100644 index bfa86f79..00000000 --- a/runners/s3-benchrunner-rust/src/telemetry/common.rs +++ /dev/null @@ -1,275 +0,0 @@ -//! This file is for mapping from `opentelemetry_sdk` structs, to serde-serializable -//! structs that match the OpenTelemetry Protocol (OTLP) format. -//! See: https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/common/v1/common.proto -//! -//! Code adapted from: https://github.com/open-telemetry/opentelemetry-rust/blob/3193320fa6dc17e89a7bed6090000aef781ac29c/opentelemetry-stdout/src/common.rs - -use std::{ - borrow::Cow, - collections::BTreeMap, - hash::{Hash, Hasher}, - time::{SystemTime, UNIX_EPOCH}, -}; - -use chrono::{LocalResult, TimeZone, Utc}; -use ordered_float::OrderedFloat; -use serde::{Serialize, Serializer}; - -#[derive(Debug, Serialize, Clone, Hash, Eq, PartialEq)] -pub(crate) struct AttributeSet(pub BTreeMap); - -impl From<&opentelemetry_sdk::Resource> for AttributeSet { - fn from(value: &opentelemetry_sdk::Resource) -> Self { - AttributeSet( - value - .iter() - .map(|(key, value)| (Key::from(key.clone()), Value::from(value.clone()))) - .collect(), - ) - } -} - -#[derive(Debug, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -pub(crate) struct Resource { - attributes: Vec, - #[serde(skip_serializing_if = "is_zero")] - dropped_attributes_count: u64, -} - -fn is_zero(v: &u64) -> bool { - *v == 0 -} - -impl From<&opentelemetry_sdk::Resource> for Resource { - fn from(value: &opentelemetry_sdk::Resource) -> Self { - Resource { - attributes: value - .iter() - .map(|(key, value)| KeyValue { - key: key.clone().into(), - value: value.clone().into(), - }) - .collect(), - dropped_attributes_count: 0, - } - } -} - -#[derive(Debug, Clone, Serialize, Hash, PartialEq, Eq, Ord, PartialOrd)] -pub(crate) struct Key(Cow<'static, str>); - -impl From> for Key { - fn from(value: Cow<'static, str>) -> Self { - Key(value) - } -} - -impl From for Key { - fn from(value: opentelemetry::Key) -> Self { - Key(value.as_str().to_string().into()) - } -} - -#[derive(Debug, Serialize, Clone)] -#[allow(dead_code, clippy::enum_variant_names)] // we want to emphasize the *Values are collection -pub(crate) enum Value { - #[serde(rename = "boolValue")] - Bool(bool), - #[serde(rename = "intValue")] - Int(i64), - #[serde(rename = "doubleValue")] - Double(f64), - #[serde(rename = "stringValue")] - String(String), - #[serde(rename = "arrayValue")] - Array(Vec), - #[serde(rename = "kvListValue")] - KeyValues(Vec), - #[serde(rename = "bytesValue")] - BytesValue(Vec), -} - -impl PartialEq for Value { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (Value::Bool(b), Value::Bool(ob)) => b.eq(ob), - (Value::Int(i), Value::Int(oi)) => i.eq(oi), - (Value::Double(f), Value::Double(of)) => OrderedFloat(*f).eq(&OrderedFloat(*of)), - (Value::String(s), Value::String(os)) => s.eq(os), - (Value::Array(a), Value::Array(oa)) => a.eq(oa), - (Value::KeyValues(kv), Value::KeyValues(okv)) => kv.eq(okv), - (Value::BytesValue(b), Value::BytesValue(ob)) => b.eq(ob), - (Value::Bool(_), _) => false, - (Value::Int(_), _) => false, - (Value::Double(_), _) => false, - (Value::String(_), _) => false, - (Value::Array(_), _) => false, - (Value::KeyValues(_), _) => false, - (Value::BytesValue(_), _) => false, - } - } -} - -impl Eq for Value {} - -impl Hash for Value { - fn hash(&self, state: &mut H) { - match &self { - Value::Bool(b) => b.hash(state), - Value::Int(i) => i.hash(state), - Value::Double(f) => OrderedFloat(*f).hash(state), - Value::String(s) => s.hash(state), - Value::Array(a) => a.iter().for_each(|v| v.hash(state)), - Value::KeyValues(kv) => kv.iter().for_each(|kv| { - kv.key.hash(state); - kv.value.hash(state); - }), - Value::BytesValue(b) => b.hash(state), - } - } -} - -impl From for Value { - fn from(value: opentelemetry::Value) -> Self { - match value { - opentelemetry::Value::Bool(b) => Value::Bool(b), - opentelemetry::Value::I64(i) => Value::Int(i), - opentelemetry::Value::F64(f) => Value::Double(f), - opentelemetry::Value::String(s) => Value::String(s.into()), - opentelemetry::Value::Array(a) => match a { - opentelemetry::Array::Bool(b) => { - Value::Array(b.into_iter().map(Value::Bool).collect()) - } - opentelemetry::Array::I64(i) => { - Value::Array(i.into_iter().map(Value::Int).collect()) - } - opentelemetry::Array::F64(f) => { - Value::Array(f.into_iter().map(Value::Double).collect()) - } - opentelemetry::Array::String(s) => { - Value::Array(s.into_iter().map(|s| Value::String(s.into())).collect()) - } - }, - } - } -} - -#[derive(Debug, Serialize, PartialEq, Clone)] -#[serde(rename_all = "camelCase")] -pub(crate) struct KeyValue { - key: Key, - value: Value, -} - -impl From for KeyValue { - fn from(value: opentelemetry::KeyValue) -> Self { - KeyValue { - key: value.key.into(), - value: value.value.into(), - } - } -} - -impl From<&opentelemetry::KeyValue> for KeyValue { - fn from(value: &opentelemetry::KeyValue) -> Self { - KeyValue { - key: value.key.clone().into(), - value: value.value.clone().into(), - } - } -} - -impl From<(opentelemetry::Key, opentelemetry::Value)> for KeyValue { - fn from((key, value): (opentelemetry::Key, opentelemetry::Value)) -> Self { - KeyValue { - key: key.into(), - value: value.into(), - } - } -} - -#[derive(Debug, Clone, Serialize, PartialEq)] -#[serde(rename_all = "camelCase")] -pub(crate) struct Scope { - #[serde(skip_serializing_if = "str::is_empty")] - name: Cow<'static, str>, - #[serde(skip_serializing_if = "Option::is_none")] - version: Option>, - #[serde(skip_serializing_if = "Vec::is_empty")] - attributes: Vec, - #[serde(skip_serializing_if = "is_zero")] - dropped_attributes_count: u64, -} - -impl From for Scope { - fn from(value: opentelemetry_sdk::Scope) -> Self { - Scope { - name: value.name, - version: value.version, - attributes: value.attributes.into_iter().map(Into::into).collect(), - dropped_attributes_count: 0, - } - } -} - -pub(crate) fn as_human_readable(time: &SystemTime, serializer: S) -> Result -where - S: Serializer, -{ - let duration_since_epoch = time.duration_since(UNIX_EPOCH).unwrap_or_default(); - - match Utc.timestamp_opt( - duration_since_epoch.as_secs() as i64, - duration_since_epoch.subsec_nanos(), - ) { - LocalResult::Single(datetime) => serializer.serialize_str( - datetime - .format("%Y-%m-%d %H:%M:%S.%3f") - .to_string() - .as_ref(), - ), - _ => Err(serde::ser::Error::custom("Invalid Timestamp.")), - } -} - -#[allow(dead_code)] -// Used for serde serialization. Not used in traces. -pub(crate) fn as_opt_human_readable( - time: &Option, - serializer: S, -) -> Result -where - S: Serializer, -{ - match time { - None => serializer.serialize_none(), - Some(time) => as_human_readable(time, serializer), - } -} - -pub(crate) fn as_unix_nano(time: &SystemTime, serializer: S) -> Result -where - S: Serializer, -{ - let nanos = time - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_nanos(); - - serializer.serialize_u128(nanos) -} - -#[allow(dead_code)] -pub(crate) fn as_opt_unix_nano( - time: &Option, - serializer: S, -) -> Result -where - S: Serializer, -{ - match time { - None => serializer.serialize_none(), - Some(time) => as_unix_nano(time, serializer), - } -} diff --git a/runners/s3-benchrunner-rust/src/telemetry/trace/exporter.rs b/runners/s3-benchrunner-rust/src/telemetry/trace/exporter.rs index e5cdd583..291be9b9 100644 --- a/runners/s3-benchrunner-rust/src/telemetry/trace/exporter.rs +++ b/runners/s3-benchrunner-rust/src/telemetry/trace/exporter.rs @@ -10,7 +10,10 @@ use std::{ sync::{Arc, Mutex}, }; -use crate::telemetry::trace::transform::SpanData; +use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; +use opentelemetry_proto::{ + tonic::trace::v1::TracesData, transform::trace::tonic::group_spans_by_resource_and_scope, +}; use opentelemetry_sdk::resource::Resource; /// Magic number based on: In Oct 2024, downloading 1 30GiB file generated 11,000+ batches per run. @@ -41,7 +44,7 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter { // Queue batch, along with the current resource let batch = SdkSpanDataBatch { resource: self.resource.clone(), - batch: batch, + batch, }; self.queued_batches.lock().unwrap().push(batch); @@ -78,14 +81,21 @@ impl SpanExporter { new_queue }; - // Transform sdk spans into serde spans - let span_data = SpanData::new(queued_batches); + let resource_spans: Vec<_> = queued_batches + .into_iter() + .flat_map(|batch| { + let resource = ResourceAttributesWithSchema::from(&batch.resource); + group_spans_by_resource_and_scope(batch.batch, &resource) + }) + .collect(); + + let trace_data = TracesData { resource_spans }; // Write to file let file = File::create_new(path).with_context(|| format!("Failed opening trace file: {path}"))?; let writer = BufWriter::new(file); - serde_json::to_writer(writer, &span_data) + serde_json::to_writer(writer, &trace_data) .with_context(|| format!("Failed writing json to: {path}")) } } diff --git a/runners/s3-benchrunner-rust/src/telemetry/trace/mod.rs b/runners/s3-benchrunner-rust/src/telemetry/trace/mod.rs index 418c5c29..cf224ca5 100644 --- a/runners/s3-benchrunner-rust/src/telemetry/trace/mod.rs +++ b/runners/s3-benchrunner-rust/src/telemetry/trace/mod.rs @@ -1,5 +1,3 @@ mod exporter; -mod transform; pub use exporter::*; -pub use transform::*; diff --git a/runners/s3-benchrunner-rust/src/telemetry/trace/transform.rs b/runners/s3-benchrunner-rust/src/telemetry/trace/transform.rs deleted file mode 100644 index a7e57905..00000000 --- a/runners/s3-benchrunner-rust/src/telemetry/trace/transform.rs +++ /dev/null @@ -1,241 +0,0 @@ -//! This file is for mapping from `opentelemetry_sdk` structs, to serde-serializable -//! structs that match the OpenTelemetry Protocol (OTLP) format. -//! See: https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto -//! -//! Code adapted from: https://github.com/open-telemetry/opentelemetry-rust/blob/3193320fa6dc17e89a7bed6090000aef781ac29c/opentelemetry-stdout/src/trace/exporter.rs - -use crate::telemetry::common::{ - as_human_readable, as_unix_nano, AttributeSet, KeyValue, Resource, Scope, -}; -use serde::{Serialize, Serializer}; -use std::{borrow::Cow, collections::HashMap, time::SystemTime}; - -use super::SdkSpanDataBatch; - -/// Transformed trace data that can be serialized -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct SpanData { - resource_spans: Vec, -} - -impl SpanData { - pub(crate) fn new(sdk_span_batches: Vec) -> Self { - let mut resource_spans = HashMap::::new(); - for sdk_span_batch in sdk_span_batches { - let sdk_spans = sdk_span_batch.batch; - let sdk_resource = &sdk_span_batch.resource; - - for sdk_span in sdk_spans { - let scope = sdk_span.instrumentation_lib.clone().into(); - - let rs = resource_spans - .entry(sdk_resource.into()) - .or_insert_with(move || ResourceSpans { - resource: sdk_resource.into(), - scope_spans: Vec::with_capacity(1), - schema_url: sdk_resource.schema_url().map(|s| s.to_string().into()), - }); - - match rs.scope_spans.iter_mut().find(|ss| ss.scope == scope) { - Some(ss) => ss.spans.push(sdk_span.into()), - None => rs.scope_spans.push(ScopeSpans { - scope, - schema_url: sdk_span.instrumentation_lib.schema_url.clone(), - spans: vec![sdk_span.into()], - }), - }; - } - } - - SpanData { - resource_spans: resource_spans.into_values().collect(), - } - } -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct ResourceSpans { - resource: Resource, - scope_spans: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - schema_url: Option>, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct ScopeSpans { - scope: Scope, - spans: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - schema_url: Option>, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct Span { - trace_id: String, - span_id: String, - #[serde(skip_serializing_if = "Option::is_none")] - trace_state: Option, - parent_span_id: String, - name: Cow<'static, str>, - kind: SpanKind, - #[serde(serialize_with = "as_unix_nano")] - start_time_unix_nano: SystemTime, - #[serde(serialize_with = "as_human_readable")] - start_time: SystemTime, - #[serde(serialize_with = "as_unix_nano")] - end_time_unix_nano: SystemTime, - #[serde(serialize_with = "as_human_readable")] - end_time: SystemTime, - attributes: Vec, - dropped_attributes_count: u32, - #[serde(skip_serializing_if = "Vec::is_empty")] - events: Vec, - dropped_events_count: u32, - flags: u32, - #[serde(skip_serializing_if = "Vec::is_empty")] - links: Vec, - dropped_links_count: u32, - status: Status, -} - -impl From for Span { - fn from(value: opentelemetry_sdk::export::trace::SpanData) -> Self { - Span { - trace_id: value.span_context.trace_id().to_string(), - span_id: value.span_context.span_id().to_string(), - trace_state: Some(value.span_context.trace_state().header()).filter(|s| !s.is_empty()), - parent_span_id: Some(value.parent_span_id.to_string()) - .filter(|s| s != "0") - .unwrap_or_default(), - name: value.name, - kind: value.span_kind.into(), - start_time_unix_nano: value.start_time, - start_time: value.start_time, - end_time_unix_nano: value.end_time, - end_time: value.end_time, - dropped_attributes_count: value.dropped_attributes_count, - attributes: value.attributes.into_iter().map(Into::into).collect(), - dropped_events_count: value.events.dropped_count, - flags: value.span_context.trace_flags().to_u8() as u32, - events: value.events.into_iter().map(Into::into).collect(), - dropped_links_count: value.links.dropped_count, - links: value.links.iter().map(Into::into).collect(), - status: value.status.into(), - } - } -} - -#[derive(Debug, Clone, Copy)] -enum SpanKind { - #[allow(dead_code)] - Unspecified = 0, - Internal = 1, - Server = 2, - Client = 3, - Producer = 4, - Consumer = 5, -} - -impl Serialize for SpanKind { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_u8(*self as u32 as u8) - } -} - -impl From for SpanKind { - fn from(value: opentelemetry::trace::SpanKind) -> Self { - match value { - opentelemetry::trace::SpanKind::Client => SpanKind::Client, - opentelemetry::trace::SpanKind::Server => SpanKind::Server, - opentelemetry::trace::SpanKind::Producer => SpanKind::Producer, - opentelemetry::trace::SpanKind::Consumer => SpanKind::Consumer, - opentelemetry::trace::SpanKind::Internal => SpanKind::Internal, - } - } -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct Event { - name: Cow<'static, str>, - attributes: Vec, - dropped_attributes_count: u32, - #[serde(serialize_with = "as_unix_nano")] - time_unix_nano: SystemTime, - #[serde(serialize_with = "as_human_readable")] - time: SystemTime, -} - -impl From for Event { - fn from(value: opentelemetry::trace::Event) -> Self { - Event { - name: value.name, - attributes: value.attributes.into_iter().map(Into::into).collect(), - dropped_attributes_count: value.dropped_attributes_count, - time_unix_nano: value.timestamp, - time: value.timestamp, - } - } -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct Link { - trace_id: String, - span_id: String, - #[serde(skip_serializing_if = "Option::is_none")] - trace_state: Option, - attributes: Vec, - dropped_attributes_count: u32, -} - -impl From<&opentelemetry::trace::Link> for Link { - fn from(value: &opentelemetry::trace::Link) -> Self { - Link { - trace_id: value.span_context.trace_id().to_string(), - span_id: value.span_context.span_id().to_string(), - trace_state: Some(value.span_context.trace_state().header()).filter(|s| !s.is_empty()), - attributes: value.attributes.iter().map(Into::into).collect(), - dropped_attributes_count: value.dropped_attributes_count, - } - } -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct Status { - #[serde(skip_serializing_if = "Option::is_none")] - message: Option>, - #[serde(skip_serializing_if = "is_zero")] - code: u32, -} - -fn is_zero(v: &u32) -> bool { - *v == 0 -} - -impl From for Status { - fn from(value: opentelemetry::trace::Status) -> Self { - match value { - opentelemetry::trace::Status::Unset => Status { - message: None, - code: 0, - }, - opentelemetry::trace::Status::Error { description } => Status { - message: Some(description), - code: 2, - }, - opentelemetry::trace::Status::Ok => Status { - message: None, - code: 1, - }, - } - } -} From 81b6cfce2a0f391d2a0bf50a79951958bc43dbca Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Fri, 20 Dec 2024 14:24:12 -0500 Subject: [PATCH 2/3] add otlp upload to runner --- runners/s3-benchrunner-rust/Cargo.lock | 34 +++++-- runners/s3-benchrunner-rust/Cargo.toml | 8 +- runners/s3-benchrunner-rust/src/main.rs | 121 ++++++++++++++++++++---- 3 files changed, 135 insertions(+), 28 deletions(-) diff --git a/runners/s3-benchrunner-rust/Cargo.lock b/runners/s3-benchrunner-rust/Cargo.lock index 101423e3..d9c4ef5f 100644 --- a/runners/s3-benchrunner-rust/Cargo.lock +++ b/runners/s3-benchrunner-rust/Cargo.lock @@ -262,11 +262,11 @@ dependencies = [ [[package]] name = "aws-s3-transfer-manager" version = "0.1.0" +source = "git+https://github.com/awslabs/aws-s3-transfer-manager-rs.git?rev=06c087a5d53676bb048f6c512b8eb1fda63f03d5#06c087a5d53676bb048f6c512b8eb1fda63f03d5" dependencies = [ "async-channel", "async-trait", "aws-config", - "aws-runtime", "aws-sdk-s3", "aws-smithy-async", "aws-smithy-experimental", @@ -842,9 +842,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.21" +version = "4.5.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" +checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84" dependencies = [ "clap_builder", "clap_derive", @@ -852,9 +852,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.21" +version = "4.5.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" +checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" dependencies = [ "anstream", "anstyle", @@ -876,9 +876,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" [[package]] name = "cmake" @@ -1983,6 +1983,24 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry-otlp" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd" +dependencies = [ + "async-trait", + "futures-core", + "http 1.2.0", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "thiserror", + "tokio", + "tonic", +] + [[package]] name = "opentelemetry-proto" version = "0.26.1" @@ -2469,6 +2487,7 @@ dependencies = [ "fastrand", "futures-util", "opentelemetry", + "opentelemetry-otlp", "opentelemetry-proto", "opentelemetry-semantic-conventions", "opentelemetry-stdout", @@ -2478,6 +2497,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", + "tonic", "tracing", "tracing-opentelemetry", "tracing-subscriber", diff --git a/runners/s3-benchrunner-rust/Cargo.toml b/runners/s3-benchrunner-rust/Cargo.toml index b4cf3f70..3e272ff5 100644 --- a/runners/s3-benchrunner-rust/Cargo.toml +++ b/runners/s3-benchrunner-rust/Cargo.toml @@ -6,8 +6,8 @@ edition = "2021" [dependencies] # Swap which line is commented-out to use GitHub or local aws-s3-transfer-manager -#aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "06c087a5d53676bb048f6c512b8eb1fda63f03d5" } -aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" } +aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "06c087a5d53676bb048f6c512b8eb1fda63f03d5" } +#aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" } tracing-opentelemetry = "0.27" opentelemetry = { version = "0.26", features = ["trace"] } @@ -16,15 +16,17 @@ opentelemetry_sdk = { version = "0.26", default-features = false, features = [ "rt-tokio", ] } opentelemetry-proto = "0.26" +opentelemetry-otlp = "0.26" opentelemetry-stdout = { version = "0.26", features = ["trace"] } opentelemetry-semantic-conventions = "0.26" +tonic = "0.12.3" anyhow = "1.0.86" async-trait = "0.1.81" aws-config = "1.5.11" bytes = "1" chrono = "0.4.38" -clap = { version = "4.5.9", features = ["derive"] } +clap = { version = "4.5.23", features = ["derive"] } fastrand = "=2.1.0" futures-util = "0.3" ordered-float = "4.3.0" diff --git a/runners/s3-benchrunner-rust/src/main.rs b/runners/s3-benchrunner-rust/src/main.rs index 67cd02b3..f9414cb7 100644 --- a/runners/s3-benchrunner-rust/src/main.rs +++ b/runners/s3-benchrunner-rust/src/main.rs @@ -1,15 +1,44 @@ -use clap::{Parser, ValueEnum}; -use std::process::exit; +use clap::{Parser, Subcommand, ValueEnum}; +use std::fs::File; use std::time::Instant; +use std::{path::Path, process::exit}; use tracing::{info_span, Instrument}; +use opentelemetry_proto::tonic::{ + collector::trace::v1::{trace_service_client::TraceServiceClient, ExportTraceServiceRequest}, + trace::v1::TracesData, +}; +use tonic::transport::Channel; + use s3_benchrunner_rust::{ bytes_to_gigabits, prepare_run, telemetry, BenchmarkConfig, Result, RunBenchmark, SkipBenchmarkError, TransferManagerRunner, }; + +#[derive(Parser, Debug)] +struct SimpleCli { + #[command(flatten)] + run_args: RunArgs, +} + #[derive(Parser, Debug)] -#[command()] -struct Args { +struct ExtendedCli { + #[command(subcommand)] + command: Command, + #[command(flatten)] + run_args: Option, +} + +#[derive(Subcommand, Debug)] +enum Command { + RunBenchmark(RunArgs), + UploadOtlp(UploadOtlpArgs), +} + +#[derive(Debug, clap::Args)] +#[command(args_conflicts_with_subcommands = true)] +#[command(flatten_help = true)] +struct RunArgs { #[arg(value_enum, help = "ID of S3 library to use")] s3_client: S3ClientId, #[arg(help = "Path to workload file (e.g. download-1GiB.run.json)")] @@ -29,6 +58,17 @@ struct Args { disable_directory: bool, } +#[derive(Debug, clap::Args)] +#[command(flatten_help = true)] +struct UploadOtlpArgs { + /// OLTP endpoint to export data to + #[arg(long, default_value = "http://localhost:4317")] + oltp_endpoint: String, + + /// Path to the trace file (in opentelemetry-proto JSON format) to upload + json_file: String, +} + #[derive(ValueEnum, Clone, Debug)] enum S3ClientId { #[clap(name = "sdk-rust-tm", help = "use aws-s3-transfer-manager crate")] @@ -39,24 +79,36 @@ enum S3ClientId { } #[tokio::main] -async fn main() { - let args = Args::parse(); - - let result = execute(&args).await; - if let Err(e) = result { - match e.downcast_ref::() { - None => { - panic!("{e:?}"); - } - Some(msg) => { - eprintln!("Skipping benchmark - {msg}"); - exit(123); +async fn main() -> Result<()> { + let command = SimpleCli::try_parse() + .map(|cli| Command::RunBenchmark(cli.run_args)) + .unwrap_or_else(|_| ExtendedCli::parse().command); + + // let cli = Cli::parse(); + // let command = cli.command.unwrap_or(Command::RunBenchmark(cli.run_args)); + + match command { + Command::RunBenchmark(args) => { + let result = execute(&args).await; + if let Err(e) = result { + match e.downcast_ref::() { + None => { + panic!("{e:?}"); + } + Some(msg) => { + eprintln!("Skipping benchmark - {msg}"); + exit(123); + } + } } } + Command::UploadOtlp(args) => upload_otlp(args).await?, } + + Ok(()) } -async fn execute(args: &Args) -> Result<()> { +async fn execute(args: &RunArgs) -> Result<()> { let mut telemetry = if args.telemetry { // If emitting telemetry, set that up as tracing_subscriber. Some(telemetry::init_tracing_subscriber().unwrap()) @@ -119,7 +171,7 @@ async fn execute(args: &Args) -> Result<()> { Ok(()) } -async fn new_runner(args: &Args) -> Result> { +async fn new_runner(args: &RunArgs) -> Result> { let config = BenchmarkConfig::new( &args.workload, &args.bucket, @@ -150,3 +202,36 @@ fn trace_file_name( let run_start = run_start.format("%Y%m%dT%H%M%SZ").to_string(); format!("trace_{run_start}_{workload}_run{run_num:02}.json") } + +async fn upload_otlp(args: UploadOtlpArgs) -> Result<()> { + let path = Path::new(&args.json_file); + let f = File::open(path)?; + let trace_data = read_spans_from_json(f)?; + println!("loaded {} spans", trace_data.resource_spans.len()); + + let endpoint = Channel::from_shared(args.oltp_endpoint)?; + let channel = endpoint.connect_lazy(); + let mut client = TraceServiceClient::new(channel); + + let requests: Vec<_> = trace_data + .resource_spans + .chunks(4_096) + .map(|batch| ExportTraceServiceRequest { + resource_spans: batch.to_vec(), + }) + .collect(); + + for request in requests { + let resp = client.export(request).await?; + println!("export response: {:?}", resp); + } + + Ok(()) +} + +// read a file contains ResourceSpans in json format +pub fn read_spans_from_json(file: File) -> Result { + let reader = std::io::BufReader::new(file); + let trace_data: TracesData = serde_json::from_reader(reader)?; + Ok(trace_data) +} From 508f630890a5279ab6df7a8ee7d8866ea272529a Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Fri, 20 Dec 2024 14:28:32 -0500 Subject: [PATCH 3/3] lint --- runners/s3-benchrunner-rust/src/main.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/runners/s3-benchrunner-rust/src/main.rs b/runners/s3-benchrunner-rust/src/main.rs index f9414cb7..cfefaa4d 100644 --- a/runners/s3-benchrunner-rust/src/main.rs +++ b/runners/s3-benchrunner-rust/src/main.rs @@ -84,9 +84,6 @@ async fn main() -> Result<()> { .map(|cli| Command::RunBenchmark(cli.run_args)) .unwrap_or_else(|_| ExtendedCli::parse().command); - // let cli = Cli::parse(); - // let command = cli.command.unwrap_or(Command::RunBenchmark(cli.run_args)); - match command { Command::RunBenchmark(args) => { let result = execute(&args).await;