Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions clients/cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions clients/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ postcard = "1.0.10"
prost = "0.13"
prost-types = "0.13.5"
rand = "0.8"
rayon = "1"
rand_core = "0.6"
ratatui = "0.29.0"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
Expand Down
7 changes: 5 additions & 2 deletions clients/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ enum Command {
/// Serialized inputs blob
#[arg(long)]
inputs: String,
/// Number of Rayon threads for this subprocess (0 = use Rayon default)
#[arg(long, default_value_t = 0)]
num_threads: usize,
},
}

Expand Down Expand Up @@ -218,9 +221,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
let orchestrator = Box::new(OrchestratorClient::new(environment));
register_node(node_id, &config_path, orchestrator).await
}
Command::ProveFibSubprocess { inputs } => {
Command::ProveFibSubprocess { inputs, num_threads } => {
let inputs: (u32, u32, u32) = serde_json::from_str(&inputs)?;
match ProvingEngine::prove_fib_subprocess(&inputs) {
match ProvingEngine::prove_fib_subprocess(&inputs, num_threads) {
Ok(proof) => {
let bytes = to_allocvec(&proof)?;
let mut out = std::io::stdout().lock();
Expand Down
10 changes: 9 additions & 1 deletion clients/cli/src/prover/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ impl ProvingEngine {
}

/// Subprocess entrypoint: generate proof without verification
pub fn prove_fib_subprocess(inputs: &(u32, u32, u32)) -> Result<Proof, ProverError> {
pub fn prove_fib_subprocess(inputs: &(u32, u32, u32), num_threads: usize) -> Result<Proof, ProverError> {
if num_threads > 0 {
let _ = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build_global();
}
let prover = Self::create_fib_prover()?;
let (view, proof) = prover
.prove_with_input::<(), (u32, u32, u32)>(&(), inputs)
Expand All @@ -53,13 +58,16 @@ impl ProvingEngine {
task: &Task,
environment: &Environment,
client_id: &str,
num_threads: usize,
) -> Result<Proof, ProverError> {
// Spawn a subprocess for proof generation to isolate memory usage
let exe_path = env::current_exe()?;
let mut cmd = tokio::process::Command::new(exe_path);
cmd.arg("prove-fib-subprocess")
.arg("--inputs")
.arg(serde_json::to_string(inputs)?)
.arg("--num-threads")
.arg(num_threads.to_string())
.stdout(Stdio::piped())
.stderr(Stdio::inherit());

Expand Down
1 change: 1 addition & 0 deletions clients/cli/src/prover/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl ProvingPipeline {
&task_ref,
&environment_ref,
&client_id_ref,
num_workers,
)
.await?;

Expand Down
83 changes: 81 additions & 2 deletions clients/cli/src/session/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ pub async fn setup_session(
let max_workers = ((total_cores as f64 * 0.75).ceil() as usize).max(1);
let mut num_workers: usize = max_threads.unwrap_or(1).clamp(1, max_workers as u32) as usize;

// Check memory and clamp threads if max-threads was explicitly set OR check-memory flag is set
if max_threads.is_some() || check_mem {
// Check memory and clamp threads only when --check-memory is explicitly requested.
// When the user explicitly sets --max-threads, trust their hardware knowledge.
if check_mem {
let memory_clamped_workers = clamp_threads_by_memory(num_workers);
if memory_clamped_workers < num_workers {
crate::print_cmd_warn!(
Expand Down Expand Up @@ -166,3 +167,81 @@ pub async fn setup_session(
num_workers,
})
}

#[cfg(test)]
mod tests {
use super::*;

fn compute_num_workers(max_threads: Option<u32>, total_cores: usize) -> usize {
let max_workers = ((total_cores as f64 * 0.75).ceil() as usize).max(1);
max_threads.unwrap_or(1).clamp(1, max_workers as u32) as usize
}

#[test]
fn num_workers_defaults_to_1_without_flag() {
let workers = compute_num_workers(None, 8);
assert_eq!(workers, 1);
}

#[test]
fn num_workers_uses_max_threads_when_set() {
// 8 cores → max_workers = ceil(6) = 6; --max-threads 4 should give 4
let workers = compute_num_workers(Some(4), 8);
assert_eq!(workers, 4);
}

#[test]
fn num_workers_clamps_to_core_limit() {
// 2 cores → max_workers = ceil(1.5) = 2; --max-threads 8 should be clamped to 2
let workers = compute_num_workers(Some(8), 2);
assert_eq!(workers, 2);
}

#[test]
fn num_workers_minimum_is_1() {
// --max-threads 0 would be clamped up to 1 by clamp(1, …)
let workers = compute_num_workers(Some(0), 8);
assert_eq!(workers, 1);
}

#[test]
fn memory_clamp_not_applied_when_check_mem_false() {
// Simulate a machine with very little "memory" — previously this would reduce workers to 1
// when max_threads.is_some(). Now it must NOT clamp unless check_mem=true.
let requested = 4usize;
// With check_mem=false the result stays at requested (no memory check runs).
// We verify by re-running the guard logic: clamp_threads_by_memory would return 1 on a
// tiny memory budget, but the guard is now gated on check_mem only.
let check_mem = false;
let max_threads: Option<u32> = Some(4);

// The guard in setup_session is: if check_mem { ... }
// Since check_mem=false, clamping never runs regardless of max_threads.
let result = if check_mem {
clamp_threads_by_memory(requested)
} else {
requested
};

assert_eq!(result, requested,
"num_workers should not be memory-clamped when --check-memory is not set (got {result}, max_threads={max_threads:?})");
}

#[test]
fn memory_clamp_applied_when_check_mem_true_and_overcommitted() {
// When check_mem=true and the request exceeds available memory, clamping should fire.
// clamp_threads_by_memory reads real system memory, so we only verify the path is taken
// and the result is ≥ 1.
let requested = usize::MAX; // absurdly large
let check_mem = true;

let result = if check_mem {
clamp_threads_by_memory(requested)
} else {
requested
};

assert!(result >= 1, "clamp_threads_by_memory must always return at least 1");
assert!(result < usize::MAX, "overcommitted request must be clamped");
}
}
60 changes: 60 additions & 0 deletions clients/cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ fn config_file_path(dir: &tempfile::TempDir) -> PathBuf {

const BINARY_NAME: &str = "nexus-network";

/// Run the binary with given args and return stdout + stderr combined.
fn run_bin(args: &[&str]) -> assert_cmd::assert::Assert {
Command::cargo_bin(BINARY_NAME).unwrap().args(args).assert()
}

#[test]
/// Help command should display usage information.
fn cli_help_displays_usage() {
Expand Down Expand Up @@ -49,6 +54,61 @@ fn register_user_command_creates_config_file() {
assert!(config_path.exists());
}

#[test]
/// --max-threads flag appears in `start --help`.
fn start_help_shows_max_threads() {
run_bin(&["start", "--help"])
.success()
.stdout(contains("--max-threads"));
}

#[test]
/// --max-threads is accepted by clap; an unrecognized value would produce "error: invalid value".
/// Here we confirm that passing a numeric value doesn't produce a clap argument error.
/// (The command will still fail at runtime due to missing config, which is expected.)
fn start_max_threads_flag_is_parsed() {
// We expect a runtime failure (missing config / version check), NOT a clap "unrecognized
// argument" or "invalid value" error. The absence of those clap-level errors confirms the
// flag was wired up correctly.
let output = Command::cargo_bin(BINARY_NAME)
.unwrap()
.args(["start", "--max-threads", "4"])
.output()
.unwrap();

let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
!stderr.contains("unrecognized argument") && !stderr.contains("invalid value for '--max-threads'"),
"clap rejected --max-threads: {stderr}"
);
}

#[test]
/// The hidden `prove-fib-subprocess` subcommand accepts --num-threads.
/// Passing an invalid --inputs value triggers a JSON parse error (not an "unrecognized argument"
/// error), which confirms the flag was wired up correctly.
fn subprocess_num_threads_flag_is_parsed() {
let output = Command::cargo_bin(BINARY_NAME)
.unwrap()
.args([
"prove-fib-subprocess",
"--inputs",
"not-valid-json",
"--num-threads",
"4",
])
.output()
.unwrap();

let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
!stderr.contains("unrecognized argument") && !stderr.contains("invalid value for '--num-threads'"),
"clap rejected --num-threads: {stderr}"
);
// The process must have exited non-zero (invalid inputs)
assert!(!output.status.success(), "expected non-zero exit for invalid inputs");
}

#[test]
/// Logout command should delete an existing config file.
fn logout_deletes_config_file() {
Expand Down