Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,49 +33,57 @@ pub struct Args {

/// enable IPv6 on the server (on most hosts, this will allow both IPv4 and IPv6,
/// but it might limit to just IPv6 on some)
#[arg(short = '6', long)]
#[arg(short = '6', long, conflicts_with = "client")]
pub version6: bool,

/// limit the number of concurrent clients that can be processed by a server;
/// any over this count will be immediately disconnected
#[arg(long, value_name = "number", default_value = "0")]
#[arg(long, value_name = "number", default_value = "0", conflicts_with = "client")]
pub client_limit: usize,

/// run in client mode; value is the server's address
#[arg(short, long, value_name = "host", conflicts_with = "server")]
pub client: Option<std::net::IpAddr>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get what you're going for here, but this breaks compatibility with hostname-based resolution.

Not everything will, or should, be specified as an IP address.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A string?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, specifically a hostname as defined in https://datatracker.ietf.org/doc/html/rfc952 or an IP address, v4 or v6, per https://datatracker.ietf.org/doc/html/rfc1123#page-13, but I'm perfectly content for it to just be a string and for the user to figure it out if they enter something nonsensical, rather than having the tool try to hold their hand on something basic like this.


/// run in reverse-mode (server sends, client receives)
#[arg(short = 'R', long)]
#[arg(short = 'R', long, conflicts_with = "server")]
pub reverse: bool,

/// the format in which to deplay information (json, megabit/sec, megabyte/sec)
#[arg(short, long, value_enum, value_name = "format", default_value = "megabit")]
#[arg(
short,
long,
value_enum,
value_name = "format",
default_value = "megabit",
conflicts_with = "server"
)]
pub format: Format,

/// use UDP rather than TCP
#[arg(short, long)]
#[arg(short, long, conflicts_with = "server")]
pub udp: bool,

/// target bandwidth in bytes/sec; this value is applied to each stream,
/// with a default target of 1 megabit/second for all protocols (note: megabit, not mebibit);
/// the suffixes kKmMgG can also be used for xbit and xbyte, respectively
#[arg(short, long, default_value = "125000", value_name = "bytes/sec")]
#[arg(short, long, default_value = "125000", value_name = "bytes/sec", conflicts_with = "server")]
pub bandwidth: String,

/// the time in seconds for which to transmit
#[arg(short, long, default_value = "10.0", value_name = "seconds")]
#[arg(short, long, default_value = "10.0", value_name = "seconds", conflicts_with = "server")]
pub time: f64,

/// the interval at which to send batches of data, in seconds, between [0.0 and 1.0);
/// this is used to evenly spread packets out over time
#[arg(long, default_value = "0.05", value_name = "seconds")]
#[arg(long, default_value = "0.05", value_name = "seconds", conflicts_with = "server")]
pub send_interval: f64,

/// length of the buffer to exchange; for TCP, this defaults to 32 kibibytes; for UDP, it's 1024 bytes
#[arg(
short,
long,
conflicts_with = "server",
default_value = "32768",
default_value_if("udp", "true", Some("1024")),
value_name = "bytes"
Expand All @@ -85,27 +93,27 @@ pub struct Args {
/// send buffer, in bytes (only supported on some platforms;
/// if set too small, a 'resource unavailable' error may occur;
/// affects TCP window-size)
#[arg(long, default_value = "0", value_name = "bytes")]
#[arg(long, default_value = "0", value_name = "bytes", conflicts_with = "server")]
pub send_buffer: usize,

/// receive buffer, in bytes (only supported on some platforms;
/// if set too small, a 'resource unavailable' error may occur; affects TCP window-size)
#[arg(long, default_value = "0", value_name = "bytes")]
#[arg(long, default_value = "0", value_name = "bytes", conflicts_with = "server")]
pub receive_buffer: usize,

/// the number of parallel data-streams to use
#[arg(short = 'P', long, value_name = "number", default_value = "1")]
#[arg(short = 'P', long, value_name = "number", default_value = "1", conflicts_with = "server")]
pub parallel: usize,

/// omit a number of seconds from the start of calculations,
/// primarily to avoid including TCP ramp-up in averages;
/// using this option may result in disagreement between bytes sent and received,
/// since data can be in-flight across time-boundaries
#[arg(short, long, default_value = "0", value_name = "seconds")]
#[arg(short, long, default_value = "0", value_name = "seconds", conflicts_with = "server")]
pub omit: usize,

/// use no-delay mode for TCP tests, disabling Nagle's Algorithm
#[arg(short = 'N', long)]
#[arg(short = 'N', long, conflicts_with = "server")]
pub no_delay: bool,

/// an optional pool of IPv4 TCP ports over which data will be accepted;
Expand Down
8 changes: 7 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,10 @@ pub mod server;
pub(crate) mod stream;
pub(crate) mod utils;

pub(crate) type BoxResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;
pub type BoxResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;

/// a global token generator
pub(crate) fn get_global_token() -> mio::Token {
mio::Token(TOKEN_SEED.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1)
}
static TOKEN_SEED: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
18 changes: 5 additions & 13 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
* along with rperf. If not, see <https://www.gnu.org/licenses/>.
*/

use rperf::{args, client, server};
use rperf::{args, client, server, BoxResult};

fn main() {
fn main() -> BoxResult<()> {
use clap::Parser;
let args = args::Args::parse();

Expand All @@ -45,12 +45,8 @@ fn main() {
.expect("unable to set SIGINT handler");

log::debug!("beginning normal operation...");
let service = server::serve(&args);
server::serve(&args)?;
exiting.join().expect("unable to join SIGINT handler thread");
if service.is_err() {
log::error!("unable to run server: {}", service.unwrap_err());
std::process::exit(4);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exit-code is important to help determine whether an environmental error has occurred. I must ask that it not be suppressed -- the changes here would replace it with 1.

}
} else if args.client.is_some() {
log::debug!("registering SIGINT handler...");
ctrlc2::set_handler(move || {
Expand All @@ -65,15 +61,11 @@ fn main() {
.expect("unable to set SIGINT handler");

log::debug!("connecting to server...");
let execution = client::execute(&args);
if execution.is_err() {
log::error!("unable to run client: {}", execution.unwrap_err());
std::process::exit(4);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exit-code is also important.

}
client::execute(&args)?;
} else {
use clap::CommandFactory;
let mut cmd = args::Args::command();
cmd.print_help().unwrap();
std::process::exit(2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this one, too. 2 is commonly used to indicate that a configuration error occurred and this exit-code is part of rperf's established process interface.

}
Ok(())
}
46 changes: 28 additions & 18 deletions src/protocol/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::io::{self, Read, Write};
use std::time::Duration;

use mio::net::TcpStream;
use mio::{Events, Interest, Poll, Token};
use mio::{Events, Interest, Poll};

use crate::BoxResult;

Expand Down Expand Up @@ -51,14 +51,17 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()

/// receives the length-count of a pending message over a client-server communications stream
fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, results_handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult<u16> {
let mio_token = Token(0);
let mio_token = crate::get_global_token();
let mut poll = Poll::new()?;
poll.registry().register(stream, mio_token, Interest::READABLE)?;
let mut events = Events::with_capacity(1); //only interacting with one stream

let mut length_bytes_read = 0;
let mut length_spec: [u8; 2] = [0; 2];
while alive_check() {
let result: BoxResult<u16> = 'exiting: loop {
if !alive_check() {
break 'exiting Ok(0);
}
//waiting to find out how long the next message is
results_handler()?; //send any outstanding results between cycles
poll.poll(&mut events, Some(POLL_TIMEOUT))?;
Expand All @@ -72,31 +75,33 @@ fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, results_han
break;
}
Err(e) => {
return Err(Box::new(e));
break 'exiting Err(Box::new(e));
}
};

if size == 0 {
if alive_check() {
return Err(Box::new(simple_error::simple_error!("connection lost")));
break 'exiting Err(Box::new(simple_error::simple_error!("connection lost")));
} else {
//shutting down; a disconnect is expected
return Err(Box::new(simple_error::simple_error!("local shutdown requested")));
break 'exiting Err(Box::new(simple_error::simple_error!("local shutdown requested")));
}
}

length_bytes_read += size;
if length_bytes_read == 2 {
let length = u16::from_be_bytes(length_spec);
log::debug!("received length-spec of {} from {}", length, stream.peer_addr()?);
return Ok(length);
break 'exiting Ok(length);
} else {
log::debug!("received partial length-spec from {}", stream.peer_addr()?);
}
}
}
}
Err(Box::new(simple_error::simple_error!("system shutting down")))
};
poll.registry().deregister(stream)?;
result
// Err(Box::new(simple_error::simple_error!("system shutting down")))
}
/// receives the data-value of a pending message over a client-server communications stream
fn receive_payload(
Expand All @@ -105,14 +110,17 @@ fn receive_payload(
results_handler: &mut dyn FnMut() -> BoxResult<()>,
length: u16,
) -> BoxResult<serde_json::Value> {
let mio_token = Token(0);
let mio_token = crate::get_global_token();
let mut poll = Poll::new()?;
poll.registry().register(stream, mio_token, Interest::READABLE)?;
let mut events = Events::with_capacity(1); //only interacting with one stream

let mut bytes_read = 0;
let mut buffer = vec![0_u8; length.into()];
while alive_check() {
let result: BoxResult<serde_json::Value> = 'exiting: loop {
if !alive_check() {
break 'exiting Ok(serde_json::from_slice(&buffer[0..0])?);
}
//waiting to receive the payload
results_handler()?; //send any outstanding results between cycles
poll.poll(&mut events, Some(POLL_TIMEOUT))?;
Expand All @@ -126,16 +134,16 @@ fn receive_payload(
break;
}
Err(e) => {
return Err(Box::new(e));
break 'exiting Err(Box::new(e));
}
};

if size == 0 {
if alive_check() {
return Err(Box::new(simple_error::simple_error!("connection lost")));
break 'exiting Err(Box::new(simple_error::simple_error!("connection lost")));
} else {
// shutting down; a disconnect is expected
return Err(Box::new(simple_error::simple_error!("local shutdown requested")));
break 'exiting Err(Box::new(simple_error::simple_error!("local shutdown requested")));
}
}

Expand All @@ -144,19 +152,21 @@ fn receive_payload(
match serde_json::from_slice(&buffer) {
Ok(v) => {
log::debug!("received {:?} from {}", v, stream.peer_addr()?);
return Ok(v);
break 'exiting Ok(v);
}
Err(e) => {
return Err(Box::new(e));
break 'exiting Err(Box::new(e));
}
}
} else {
log::debug!("received partial payload from {}", stream.peer_addr()?);
}
}
}
}
Err(Box::new(simple_error::simple_error!("system shutting down")))
};
poll.registry().deregister(stream)?;
result
// Err(Box::new(simple_error::simple_error!("system shutting down")))
}
/// handles the full process of retrieving a message from a client-server communications stream
pub fn receive(
Expand Down
6 changes: 3 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::thread;
use std::time::Duration;

use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interest, Poll, Token};
use mio::{Events, Interest, Poll};

use crate::args::Args;
use crate::protocol::communication::{receive, send, KEEPALIVE_DURATION};
Expand Down Expand Up @@ -328,15 +328,15 @@ pub fn serve(args: &Args) -> BoxResult<()> {
TcpListener::bind(SocketAddr::new(args.bind, port)).unwrap_or_else(|_| panic!("failed to bind TCP socket, port {}", port));
log::info!("server listening on {}", listener.local_addr()?);

let mio_token = Token(0);
let mio_token = crate::get_global_token();
let mut poll = Poll::new()?;
poll.registry().register(&mut listener, mio_token, Interest::READABLE)?;
let mut events = Events::with_capacity(32);

while is_alive() {
if let Err(err) = poll.poll(&mut events, Some(POLL_TIMEOUT)) {
if err.kind() == std::io::ErrorKind::Interrupted {
log::debug!("Poll interrupted: \"{err}\", ignored, continue polling");
log::debug!("Poll interrupted: \"{err}\"");
continue;
}
log::error!("Poll error: {}", err);
Expand Down
Loading