Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 280 additions & 41 deletions runners/s3-benchrunner-rust/Cargo.lock

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions runners/s3-benchrunner-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,32 @@ edition = "2021"

# Swap which line is commented-out to use GitHub or local aws-s3-transfer-manager
aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "06c087a5d53676bb048f6c512b8eb1fda63f03d5" }
# aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" }
#aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" }

tracing-opentelemetry = "0.27"
opentelemetry = { version = "0.26", features = ["trace"] }
opentelemetry_sdk = { version = "0.26", default-features = false, features = [
"trace",
"rt-tokio",
] }
opentelemetry-proto = "0.26"
opentelemetry-otlp = "0.26"
opentelemetry-stdout = { version = "0.26", features = ["trace"] }
Copy link
Contributor

@graebm graebm Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't use this. My bad for leaving it in

Suggested change
opentelemetry-stdout = { version = "0.26", features = ["trace"] }

opentelemetry-semantic-conventions = "0.26"
tonic = "0.12.3"

anyhow = "1.0.86"
async-trait = "0.1.81"
aws-config = "1.5.4"
aws-sdk-s3 = "1.41.0"
aws-config = "1.5.11"
bytes = "1"
chrono = "0.4.38"
clap = { version = "4.5.9", features = ["derive"] }
clap = { version = "4.5.23", features = ["derive"] }
fastrand = "=2.1.0"
futures-util = "0.3"
ordered-float = "4.3.0"
serde = { version = "1.0.204", features = ["derive"] }
serde_json = "1.0.120"
thiserror = "1.0.62"
tokio = { version = "1.40.0", features = ["io-util"] }
tokio = { version = "1.42.0", features = ["io-util"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
118 changes: 100 additions & 18 deletions runners/s3-benchrunner-rust/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,44 @@
use clap::{Parser, ValueEnum};
use std::process::exit;
use clap::{Parser, Subcommand, ValueEnum};
use std::fs::File;
use std::time::Instant;
use std::{path::Path, process::exit};
use tracing::{info_span, Instrument};

use opentelemetry_proto::tonic::{
collector::trace::v1::{trace_service_client::TraceServiceClient, ExportTraceServiceRequest},
trace::v1::TracesData,
};
use tonic::transport::Channel;

use s3_benchrunner_rust::{
bytes_to_gigabits, prepare_run, telemetry, BenchmarkConfig, Result, RunBenchmark,
SkipBenchmarkError, TransferManagerRunner,
};

// CLI shape with no subcommand: only the benchmark-run arguments.
// `main` tries this parser first and maps a successful parse to `Command::RunBenchmark`.
// (Plain `//` comments are used on purpose: clap derive turns `///` doc comments into help text.)
#[derive(Parser, Debug)]
struct SimpleCli {
// Flattened so the fields of `RunArgs` are parsed directly as this command's own arguments.
#[command(flatten)]
run_args: RunArgs,
}

#[derive(Parser, Debug)]
#[command()]
struct Args {
struct ExtendedCli {
#[command(subcommand)]
command: Command,
#[command(flatten)]
run_args: Option<RunArgs>,
}

#[derive(Subcommand, Debug)]
enum Command {
Comment on lines +32 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, in the scripts that run all the benchmarks across the different language runners, it's assumed all runners take the same exact command line arguments (that's why the essential args are passed by position, so that it's easier to parse, regardless of language). See: https://github.com/awslabs/aws-crt-s3-benchmarks/?tab=readme-ov-file#run-a-benchmark

RUNNER_CMD S3_CLIENT WORKLOAD BUCKET REGION TARGET_THROUGHPUT

Fortunately RUNNER_CMD can be a list of strings (because it takes a lot of strings to launch a Java application), so we can make this work:

Edit this line:

return [str(runner_src/'target/release/s3-benchrunner-rust')]

to be like return [str(runner_src/'target/release/s3-benchrunner-rust'), 'run-benchmark']

RunBenchmark(RunArgs),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial/debatable: if we stick with this, I'd prefer a shorter name, since this is what we're doing 99% of the time

Suggested change
RunBenchmark(RunArgs),
Run(RunArgs),

UploadOtlp(UploadOtlpArgs),
}

#[derive(Debug, clap::Args)]
#[command(args_conflicts_with_subcommands = true)]
#[command(flatten_help = true)]
struct RunArgs {
#[arg(value_enum, help = "ID of S3 library to use")]
s3_client: S3ClientId,
#[arg(help = "Path to workload file (e.g. download-1GiB.run.json)")]
Expand All @@ -29,6 +58,17 @@ struct Args {
disable_directory: bool,
}

#[derive(Debug, clap::Args)]
#[command(flatten_help = true)]
struct UploadOtlpArgs {
Comment on lines +61 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't seem like this upload-otlp mode actually shares any code with the benchmark runner?

It seems simpler to just build this as a separate utility, rather than complicate the benchmark runner, unless you foresee a lot of shared functionality in the future?

/// OLTP endpoint to export data to
#[arg(long, default_value = "http://localhost:4317")]
oltp_endpoint: String,

/// Path to the trace file (in opentelemetry-proto JSON format) to upload
json_file: String,
}

#[derive(ValueEnum, Clone, Debug)]
enum S3ClientId {
#[clap(name = "sdk-rust-tm", help = "use aws-s3-transfer-manager crate")]
Expand All @@ -39,24 +79,33 @@ enum S3ClientId {
}

#[tokio::main]
async fn main() {
let args = Args::parse();

let result = execute(&args).await;
if let Err(e) = result {
match e.downcast_ref::<SkipBenchmarkError>() {
None => {
panic!("{e:?}");
}
Some(msg) => {
eprintln!("Skipping benchmark - {msg}");
exit(123);
async fn main() -> Result<()> {
    // Try the subcommand-less form first; if it does not parse, let the
    // subcommand-based parser handle the arguments (including printing any
    // usage error and exiting).
    let command = match SimpleCli::try_parse() {
        Ok(cli) => Command::RunBenchmark(cli.run_args),
        Err(_) => ExtendedCli::parse().command,
    };

    match command {
        Command::UploadOtlp(args) => upload_otlp(args).await?,
        Command::RunBenchmark(args) => {
            if let Err(e) = execute(&args).await {
                // A skipped benchmark is reported on stderr with a dedicated
                // exit code; every other failure panics with the debug output.
                if let Some(msg) = e.downcast_ref::<SkipBenchmarkError>() {
                    eprintln!("Skipping benchmark - {msg}");
                    exit(123);
                }
                panic!("{e:?}");
            }
        }
    }

    Ok(())
}

async fn execute(args: &Args) -> Result<()> {
async fn execute(args: &RunArgs) -> Result<()> {
let mut telemetry = if args.telemetry {
// If emitting telemetry, set that up as tracing_subscriber.
Some(telemetry::init_tracing_subscriber().unwrap())
Expand Down Expand Up @@ -119,7 +168,7 @@ async fn execute(args: &Args) -> Result<()> {
Ok(())
}

async fn new_runner(args: &Args) -> Result<Box<dyn RunBenchmark>> {
async fn new_runner(args: &RunArgs) -> Result<Box<dyn RunBenchmark>> {
let config = BenchmarkConfig::new(
&args.workload,
&args.bucket,
Expand Down Expand Up @@ -150,3 +199,36 @@ fn trace_file_name(
let run_start = run_start.format("%Y%m%dT%H%M%SZ").to_string();
format!("trace_{run_start}_{workload}_run{run_num:02}.json")
}

/// Uploads a previously recorded trace file to an OTLP collector.
///
/// The file at `args.json_file` is opened and deserialized with
/// [`read_spans_from_json`]. Its `resource_spans` are then sent to
/// `args.oltp_endpoint` in batches of at most `BATCH_SIZE` entries, one
/// `export` call per batch, awaited sequentially. Each response is printed to
/// stdout.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or parsed, if the endpoint
/// string is rejected by `Channel::from_shared`, or if an export call fails
/// (in which case the remaining batches are not sent).
async fn upload_otlp(args: UploadOtlpArgs) -> Result<()> {
    /// Maximum number of `ResourceSpans` entries placed in one export request.
    const BATCH_SIZE: usize = 4_096;

    let path = Path::new(&args.json_file);
    let trace_data = read_spans_from_json(File::open(path)?)?;
    // NOTE: this is the number of top-level `ResourceSpans` entries, which is
    // what the batching below operates on.
    println!("loaded {} spans", trace_data.resource_spans.len());

    // NOTE(review): per tonic's docs `connect_lazy` defers the actual
    // connection until first use, so connection failures surface from `export`.
    let channel = Channel::from_shared(args.oltp_endpoint)?.connect_lazy();
    let mut client = TraceServiceClient::new(channel);

    // Consume the parsed entries and build one request at a time instead of
    // cloning every batch into a Vec of requests up front; this avoids holding
    // the whole trace in memory twice.
    let mut remaining = trace_data.resource_spans.into_iter();
    loop {
        let batch: Vec<_> = remaining.by_ref().take(BATCH_SIZE).collect();
        if batch.is_empty() {
            break;
        }

        let request = ExportTraceServiceRequest {
            resource_spans: batch,
        };
        let resp = client.export(request).await?;
        println!("export response: {:?}", resp);
    }

    Ok(())
}

/// Reads `TracesData` from a file containing JSON.
///
/// The file is wrapped in a buffered reader and deserialized with
/// `serde_json`.
///
/// # Errors
///
/// Returns an error if reading from `file` fails or its contents do not
/// deserialize into `TracesData`.
pub fn read_spans_from_json(file: File) -> Result<TracesData> {
    let buffered = std::io::BufReader::new(file);
    serde_json::from_reader::<_, TracesData>(buffered).map_err(Into::into)
}
1 change: 0 additions & 1 deletion runners/s3-benchrunner-rust/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use std::env;

use crate::Result;

pub mod common;
pub mod trace;

// Create OTEL Resource (the entity that produces telemetry)
Expand Down
Loading