From 786e39c6865b7e48e800f68e27fbe6fd2cd01d54 Mon Sep 17 00:00:00 2001
From: Andreas Hartmetz
Date: Sun, 16 Feb 2025 14:44:46 +0100
Subject: [PATCH 1/2] Eliminate a self destructuring

It seems to be a bad deal: it increases the line count and obscures the
origin of values in a pretty long function.
---
 src/dist/http.rs | 43 +++++++++++++++++--------------------------
 1 file changed, 17 insertions(+), 26 deletions(-)

diff --git a/src/dist/http.rs b/src/dist/http.rs
index f8a0e9b7d..aac984992 100644
--- a/src/dist/http.rs
+++ b/src/dist/http.rs
@@ -913,30 +913,19 @@ mod server {
         }

         pub fn start(self) -> Result {
-            let Self {
-                bind_address,
-                scheduler_url,
-                scheduler_auth,
-                cert_digest,
-                cert_pem,
-                privkey_pem,
-                jwt_key,
-                server_nonce,
-                handler,
-            } = self;
             let heartbeat_req = HeartbeatServerHttpRequest {
                 num_cpus: num_cpus(),
-                jwt_key: jwt_key.clone(),
-                server_nonce,
-                cert_digest,
-                cert_pem: cert_pem.clone(),
+                jwt_key: self.jwt_key.clone(),
+                server_nonce: self.server_nonce,
+                cert_digest: self.cert_digest,
+                cert_pem: self.cert_pem.clone(),
             };
-            let job_authorizer = JWTJobAuthorizer::new(jwt_key);
-            let heartbeat_url = urls::scheduler_heartbeat_server(&scheduler_url);
+            let job_authorizer = JWTJobAuthorizer::new(self.jwt_key);
+            let heartbeat_url = urls::scheduler_heartbeat_server(&self.scheduler_url);
             let requester = ServerRequester {
                 client: new_reqwest_blocking_client(),
-                scheduler_url,
-                scheduler_auth: scheduler_auth.clone(),
+                scheduler_url: self.scheduler_url,
+                scheduler_auth: self.scheduler_auth.clone(),
             };

             // TODO: detect if this panics
@@ -947,7 +936,7 @@
                     match bincode_req(
                         client
                             .post(heartbeat_url.clone())
-                            .bearer_auth(scheduler_auth.clone())
+                            .bearer_auth(self.scheduler_auth.clone())
                             .bincode(&heartbeat_req)
                             .expect("failed to serialize heartbeat"),
                     ) {
@@ -967,10 +956,10 @@
                 }
             });

-            info!("Server listening for clients on {}", bind_address);
+            info!("Server listening for clients on {}", self.bind_address);
             let request_count = atomic::AtomicUsize::new(0);

-            let server = rouille::Server::new_ssl(bind_address, move |request| {
+            let server = rouille::Server::new_ssl(self.bind_address, move |request| {
                 let req_id = request_count.fetch_add(1, atomic::Ordering::SeqCst);
                 trace!("Req {} ({}): {:?}", req_id, request.remote_addr(), request);
                 let response = (|| router!(request,
@@ -979,7 +968,7 @@
                         let toolchain = try_or_400_log!(req_id, bincode_input(request));
                         trace!("Req {}: assign_job({}): {:?}", req_id, job_id, toolchain);

-                        let res: AssignJobResult = try_or_500_log!(req_id, handler.handle_assign_job(job_id, toolchain));
+                        let res: AssignJobResult = try_or_500_log!(req_id, self.handler.handle_assign_job(job_id, toolchain));
                         prepare_response(request, &res)
                     },
                     (POST) (/api/v1/distserver/submit_toolchain/{job_id: JobId}) => {
@@ -988,7 +977,8 @@
                         let body = request.data().expect("body was already read in submit_toolchain");
                         let toolchain_rdr = ToolchainReader(Box::new(body));

-                        let res: SubmitToolchainResult = try_or_500_log!(req_id, handler.handle_submit_toolchain(&requester, job_id, toolchain_rdr));
+                        let res: SubmitToolchainResult = try_or_500_log!(req_id, self.handler.handle_submit_toolchain(
+                            &requester, job_id, toolchain_rdr));
                         prepare_response(request, &res)
                     },
                     (POST) (/api/v1/distserver/run_job/{job_id: JobId}) => {
@@ -1007,7 +997,8 @@
                         let inputs_rdr = InputsReader(Box::new(ZlibReadDecoder::new(body)));
                         let outputs = outputs.into_iter().collect();

-                        let res: RunJobResult = try_or_500_log!(req_id, handler.handle_run_job(&requester, job_id, command, outputs, inputs_rdr));
+                        let res: RunJobResult = try_or_500_log!(req_id, self.handler.handle_run_job(&requester, job_id,
+                            command, outputs, inputs_rdr));
                         prepare_response(request, &res)
                     },
                     _ => {
@@ -1017,7 +1008,7 @@
                 ))();
                 trace!("Res {}: {:?}", req_id, response);
                 response
-            }, cert_pem, privkey_pem).map_err(|e| anyhow!(format!("Failed to start http server for sccache server: {}", e)))?;
+            }, self.cert_pem, self.privkey_pem).map_err(|e| anyhow!(format!("Failed to start http server for sccache server: {}", e)))?;

             // This limit is rouille's default for `start_server_with_pool`, which
             // we would use, except that interface doesn't permit any sort of

From 3d47eb4d666eb8d8c5663a2e39aa47053100282c Mon Sep 17 00:00:00 2001
From: Andreas Hartmetz
Date: Mon, 17 Feb 2025 00:29:00 +0100
Subject: [PATCH 2/2] Make reported core count of distributed builder configurable

Also move the slight inflation of the CPU core count ("overcommit" to
make up for various latencies) to the builder, in order to enable
setting an exact maximum number of cores to use which will never be
exceeded.

That introduces a small problem in the scheduling protocol (excess
overcommit if the builder is new and the scheduler is old), which seems
acceptable to me and, in any case, does not occur if builder and
scheduler are of the same version.

As another side effect, the scheduler should no longer report more
running jobs than there are available slots.
---
 docs/Distributed.md           |  3 +++
 docs/DistributedFreeBSD.md    |  3 +++
 docs/DistributedQuickstart.md |  3 +++
 src/bin/sccache-dist/main.rs  | 21 +++++++++++++--------
 src/config.rs                 |  5 +++++
 src/dist/http.rs              |  5 ++++-
 tests/harness/mod.rs          |  2 ++
 7 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/docs/Distributed.md b/docs/Distributed.md
index 9316d0905..d7826d9f0 100644
--- a/docs/Distributed.md
+++ b/docs/Distributed.md
@@ -355,6 +355,9 @@ type = "DANGEROUSLY_INSECURE"


 ```toml
+# The maximum number of cores to be used for build jobs.
+# If unspecified, slightly higher than the number of CPU cores (including SMT "cores").
+#core_count = 16
 # This is where client toolchains will be stored.
 cache_dir = "/tmp/toolchains"
 # The maximum size of the toolchain cache, in bytes.
diff --git a/docs/DistributedFreeBSD.md b/docs/DistributedFreeBSD.md
index 9d9f96013..a9cc8cd88 100644
--- a/docs/DistributedFreeBSD.md
+++ b/docs/DistributedFreeBSD.md
@@ -35,6 +35,9 @@ Then, a server.conf like the one below is created, making use of
 the `pot` builder type (commented out options show defaults):

 ```toml
+# The maximum number of cores to be used for build jobs.
+# If unspecified, slightly higher than the number of CPU cores (including SMT "cores").
+#core_count = 16
 # This is where client toolchains will be stored.
 cache_dir = "/tmp/toolchains"
 # The maximum size of the toolchain cache, in bytes.
diff --git a/docs/DistributedQuickstart.md b/docs/DistributedQuickstart.md
index dbcc8afa3..497a7e69a 100644
--- a/docs/DistributedQuickstart.md
+++ b/docs/DistributedQuickstart.md
@@ -68,6 +68,9 @@ The build server requires [bubblewrap](https://github.com/projectatomic/bubblewr
 Create a server.conf file to configure authentication, storage locations, network addresses and the path to bubblewrap. A minimal example looks like:

 ```toml
+# The maximum number of cores to be used for build jobs.
+# If unspecified, slightly higher than the number of CPU cores (including SMT "cores").
+#core_count = 16
 # This is where client toolchains will be stored.
 cache_dir = "/tmp/toolchains"
 # The maximum size of the toolchain cache, in bytes.
diff --git a/src/bin/sccache-dist/main.rs b/src/bin/sccache-dist/main.rs
index 087e06451..9aae36ae9 100644
--- a/src/bin/sccache-dist/main.rs
+++ b/src/bin/sccache-dist/main.rs
@@ -14,8 +14,7 @@ use sccache::dist::{
     ServerNonce, ServerOutgoing, SubmitToolchainResult, TcCache, Toolchain, ToolchainReader,
     UpdateJobStateResult,
 };
-use sccache::util::daemonize;
-use sccache::util::BASE64_URL_SAFE_ENGINE;
+use sccache::util::{daemonize, num_cpus, BASE64_URL_SAFE_ENGINE};
 use serde::{Deserialize, Serialize};
 use std::collections::{btree_map, BTreeMap, HashMap, HashSet};
 use std::env;
@@ -134,6 +133,15 @@ fn check_jwt_server_token(
     .ok()
 }

+fn default_core_count_this_machine() -> usize {
+    let core_count = num_cpus();
+    // Oversubscribe cores just a little to make up for network and I/O latency. This formula is
+    // not based on hard data but an extrapolation to high core counts of the conventional wisdom
+    // that slightly more jobs than cores achieve the shortest compile time. Which is originally
+    // about local compiles and this is over the network, so be slightly less conservative.
+    core_count + 1 + core_count / 8
+}
+
 fn run(command: Command) -> Result {
     match command {
         Command::Auth(AuthSubcommand::Base64 { num_bytes }) => {
@@ -229,6 +237,7 @@ fn run(command: Command) -> Result {
             scheduler_url,
             scheduler_auth,
             toolchain_cache_size,
+            core_count,
         }) => {
             let builder: Box = match builder {
                 #[cfg(not(target_os = "freebsd"))]
@@ -293,6 +302,7 @@ fn run(command: Command) -> Result {
                 bind_address,
                 scheduler_url.to_url(),
                 scheduler_auth,
+                core_count.unwrap_or(default_core_count_this_machine()),
                 server,
             )
             .context("Failed to create sccache HTTP server instance")?;
@@ -403,13 +413,8 @@ impl Default for Scheduler {
 }

 fn load_weight(job_count: usize, core_count: usize) -> f64 {
-    // Oversubscribe cores just a little to make up for network and I/O latency. This formula is
-    // not based on hard data but an extrapolation to high core counts of the conventional wisdom
-    // that slightly more jobs than cores achieve the shortest compile time. Which is originally
-    // about local compiles and this is over the network, so be slightly less conservative.
-    let cores_plus_slack = core_count + 1 + core_count / 8;
     // Note >=, not >, because the question is "can we add another job"?
-    if job_count >= cores_plus_slack {
+    if job_count >= core_count {
         MAX_PER_CORE_LOAD + 1f64 // no new jobs for now
     } else {
         job_count as f64 / core_count as f64
diff --git a/src/config.rs b/src/config.rs
index 6e7d4eeb3..409e54600 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1196,6 +1196,7 @@ pub mod server {
         pub bind_address: Option,
         pub scheduler_url: HTTPUrl,
         pub scheduler_auth: SchedulerAuth,
+        pub core_count: Option,
         #[serde(default = "default_toolchain_cache_size")]
         pub toolchain_cache_size: u64,
     }
@@ -1589,6 +1590,9 @@ fn server_toml_parse() {
     use server::BuilderType;
     use server::SchedulerAuth;
     const CONFIG_STR: &str = r#"
+    # The maximum number of cores to be used for build jobs.
+    # If unspecified, slightly higher than the number of CPU cores (including SMT "cores").
+    core_count = 2097
     # This is where client toolchains will be stored.
     cache_dir = "/tmp/toolchains"
     # The maximum size of the toolchain cache, in bytes.
@@ -1641,6 +1645,7 @@ fn server_toml_parse() {
                 token: "my server's token".to_owned()
             },
             toolchain_cache_size: 10737418240,
+            core_count: Some(2097),
         }
     )
 }
diff --git a/src/dist/http.rs b/src/dist/http.rs
index aac984992..2bcda05ee 100644
--- a/src/dist/http.rs
+++ b/src/dist/http.rs
@@ -873,6 +873,7 @@ mod server {
         bind_address: SocketAddr,
         scheduler_url: reqwest::Url,
         scheduler_auth: String,
+        core_count: usize,
         // HTTPS pieces all the builders will use for connection encryption
         cert_digest: Vec,
         cert_pem: Vec,
@@ -890,6 +891,7 @@ mod server {
             bind_address: Option,
             scheduler_url: reqwest::Url,
             scheduler_auth: String,
+            core_count: usize,
             handler: S,
         ) -> Result {
             let (cert_digest, cert_pem, privkey_pem) =
@@ -903,6 +905,7 @@ mod server {
                 bind_address: bind_address.unwrap_or(public_addr),
                 scheduler_url,
                 scheduler_auth,
+                core_count,
                 cert_digest,
                 cert_pem,
                 privkey_pem,
@@ -914,7 +917,7 @@ mod server {

         pub fn start(self) -> Result {
             let heartbeat_req = HeartbeatServerHttpRequest {
-                num_cpus: num_cpus(),
+                num_cpus: self.core_count,
                 jwt_key: self.jwt_key.clone(),
                 server_nonce: self.server_nonce,
                 cert_digest: self.cert_digest,
diff --git a/tests/harness/mod.rs b/tests/harness/mod.rs
index d148793f2..2265a3478 100644
--- a/tests/harness/mod.rs
+++ b/tests/harness/mod.rs
@@ -216,6 +216,7 @@ fn sccache_server_cfg(
             token: DIST_SERVER_TOKEN.to_owned(),
         },
         toolchain_cache_size: TC_CACHE_SIZE,
+        core_count: None,
     }
 }

@@ -430,6 +431,7 @@ impl DistSystem {
             Some(SocketAddr::from(([0, 0, 0, 0], server_addr.port()))),
             self.scheduler_url().to_url(),
             token,
+            4,
             handler,
         )
         .unwrap();
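
Editor's note, for illustration only and not part of either patch: a minimal standalone sketch of the overcommit arithmetic that patch 2 moves from the scheduler's load_weight() into the builder's default_core_count_this_machine(). The real function takes no arguments and reads the core count via num_cpus(); the parameter here is an assumption made purely so the sketch can print a few example values.

// Illustrative sketch of the builder-side overcommit formula from patch 2.
// The real default_core_count_this_machine() in src/bin/sccache-dist/main.rs
// takes no arguments and calls num_cpus(); it is parameterized here only so
// that example values can be printed.
fn default_core_count_this_machine(core_count: usize) -> usize {
    // Oversubscribe cores slightly to make up for network and I/O latency.
    core_count + 1 + core_count / 8
}

fn main() {
    for cores in [4, 8, 16, 64] {
        println!(
            "{:>2} CPU cores -> {:>2} job slots reported by default",
            cores,
            default_core_count_this_machine(cores)
        );
    }
    // Prints 4 -> 5, 8 -> 10, 16 -> 19, 64 -> 73. With `core_count = 16` set
    // in server.conf, exactly 16 slots are reported instead, and since the
    // scheduler no longer adds slack of its own, that limit is not exceeded.
}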