3 changes: 3 additions & 0 deletions docs/Distributed.md
@@ -355,6 +355,9 @@ type = "DANGEROUSLY_INSECURE"


```toml
# The maximum number of cores to be used for build jobs.
# If unspecified, slightly higher than the number of CPU cores (including SMT "cores").
#core_count = 16
# This is where client toolchains will be stored.
cache_dir = "/tmp/toolchains"
# The maximum size of the toolchain cache, in bytes.
3 changes: 3 additions & 0 deletions docs/DistributedFreeBSD.md
@@ -35,6 +35,9 @@ Then, a server.conf like the one below is created, making use of the `pot`
builder type (commented out options show defaults):

```toml
# The maximum number of cores to be used for build jobs.
# If unspecified, slightly higher than the number of CPU cores (including SMT "cores").
#core_count = 16
# This is where client toolchains will be stored.
cache_dir = "/tmp/toolchains"
# The maximum size of the toolchain cache, in bytes.
3 changes: 3 additions & 0 deletions docs/DistributedQuickstart.md
@@ -68,6 +68,9 @@ The build server requires [bubblewrap](https://github.com/projectatomic/bubblewr

Create a server.conf file to configure authentication, storage locations, network addresses and the path to bubblewrap. A minimal example looks like:
```toml
# The maximum number of cores to be used for build jobs.
# If unspecified, slightly higher than the number of CPU cores (including SMT "cores").
#core_count = 16
# This is where client toolchains will be stored.
cache_dir = "/tmp/toolchains"
# The maximum size of the toolchain cache, in bytes.
21 changes: 13 additions & 8 deletions src/bin/sccache-dist/main.rs
@@ -14,8 +14,7 @@ use sccache::dist::{
ServerNonce, ServerOutgoing, SubmitToolchainResult, TcCache, Toolchain, ToolchainReader,
UpdateJobStateResult,
};
use sccache::util::daemonize;
use sccache::util::BASE64_URL_SAFE_ENGINE;
use sccache::util::{daemonize, num_cpus, BASE64_URL_SAFE_ENGINE};
use serde::{Deserialize, Serialize};
use std::collections::{btree_map, BTreeMap, HashMap, HashSet};
use std::env;
@@ -134,6 +133,15 @@ fn check_jwt_server_token(
.ok()
}

fn default_core_count_this_machine() -> usize {
let core_count = num_cpus();
// Oversubscribe cores just a little to make up for network and I/O latency. This formula is
// not based on hard data but an extrapolation to high core counts of the conventional wisdom
// that slightly more jobs than cores achieve the shortest compile time. Which is originally
// about local compiles and this is over the network, so be slightly less conservative.
core_count + 1 + core_count / 8
}

fn run(command: Command) -> Result<i32> {
match command {
Command::Auth(AuthSubcommand::Base64 { num_bytes }) => {
@@ -229,6 +237,7 @@ fn run(command: Command) -> Result<i32> {
scheduler_url,
scheduler_auth,
toolchain_cache_size,
core_count,
}) => {
let builder: Box<dyn dist::BuilderIncoming> = match builder {
#[cfg(not(target_os = "freebsd"))]
@@ -293,6 +302,7 @@ fn run(command: Command) -> Result<i32> {
bind_address,
scheduler_url.to_url(),
scheduler_auth,
core_count.unwrap_or(default_core_count_this_machine()),
server,
)
.context("Failed to create sccache HTTP server instance")?;
@@ -403,13 +413,8 @@ impl Default for Scheduler {
}

fn load_weight(job_count: usize, core_count: usize) -> f64 {
// Oversubscribe cores just a little to make up for network and I/O latency. This formula is
// not based on hard data but an extrapolation to high core counts of the conventional wisdom
// that slightly more jobs than cores achieve the shortest compile time. Which is originally
// about local compiles and this is over the network, so be slightly less conservative.
let cores_plus_slack = core_count + 1 + core_count / 8;
// Note >=, not >, because the question is "can we add another job"?
if job_count >= cores_plus_slack {
if job_count >= core_count {
MAX_PER_CORE_LOAD + 1f64 // no new jobs for now
} else {
job_count as f64 / core_count as f64
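For reference, this hunk moves the oversubscription slack out of the scheduler's `load_weight` and onto the server side: the server now reports its configured (or defaulted) `core_count` in the heartbeat, and the scheduler treats that number directly as the job limit. A standalone sketch of the default's arithmetic, illustrative only and not part of the patch:

```rust
// Illustrative sketch of the formula used by `default_core_count_this_machine`,
// evaluated for a few machine sizes. Integer division mirrors the patch.
fn cores_plus_slack(core_count: usize) -> usize {
    core_count + 1 + core_count / 8
}

fn main() {
    for cores in [4, 8, 16, 64] {
        // Prints: 4 -> 5, 8 -> 10, 16 -> 19, 64 -> 73
        println!("{} cores -> {} job slots", cores, cores_plus_slack(cores));
    }
}
```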
5 changes: 5 additions & 0 deletions src/config.rs
@@ -1196,6 +1196,7 @@ pub mod server {
pub bind_address: Option<SocketAddr>,
pub scheduler_url: HTTPUrl,
pub scheduler_auth: SchedulerAuth,
pub core_count: Option<usize>,
#[serde(default = "default_toolchain_cache_size")]
pub toolchain_cache_size: u64,
}
@@ -1589,6 +1590,9 @@ fn server_toml_parse() {
use server::BuilderType;
use server::SchedulerAuth;
const CONFIG_STR: &str = r#"
# The maximum number of cores to be used for build jobs.
# If unspecified, slightly higher than the number of CPU cores (including SMT "cores").
core_count = 2097
# This is where client toolchains will be stored.
cache_dir = "/tmp/toolchains"
# The maximum size of the toolchain cache, in bytes.
@@ -1641,6 +1645,7 @@ fn server_toml_parse() {
token: "my server's token".to_owned()
},
toolchain_cache_size: 10737418240,
core_count: Some(2097),
}
)
}
48 changes: 21 additions & 27 deletions src/dist/http.rs
@@ -873,6 +873,7 @@ mod server {
bind_address: SocketAddr,
scheduler_url: reqwest::Url,
scheduler_auth: String,
core_count: usize,
// HTTPS pieces all the builders will use for connection encryption
cert_digest: Vec<u8>,
cert_pem: Vec<u8>,
@@ -890,6 +891,7 @@
bind_address: Option<SocketAddr>,
scheduler_url: reqwest::Url,
scheduler_auth: String,
core_count: usize,
handler: S,
) -> Result<Self> {
let (cert_digest, cert_pem, privkey_pem) =
@@ -903,6 +905,7 @@
bind_address: bind_address.unwrap_or(public_addr),
scheduler_url,
scheduler_auth,
core_count,
cert_digest,
cert_pem,
privkey_pem,
@@ -913,30 +916,19 @@
}

pub fn start(self) -> Result<Infallible> {
let Self {
bind_address,
scheduler_url,
scheduler_auth,
cert_digest,
cert_pem,
privkey_pem,
jwt_key,
server_nonce,
handler,
} = self;
let heartbeat_req = HeartbeatServerHttpRequest {
num_cpus: num_cpus(),
jwt_key: jwt_key.clone(),
server_nonce,
cert_digest,
cert_pem: cert_pem.clone(),
num_cpus: self.core_count,
jwt_key: self.jwt_key.clone(),
server_nonce: self.server_nonce,
cert_digest: self.cert_digest,
cert_pem: self.cert_pem.clone(),
};
let job_authorizer = JWTJobAuthorizer::new(jwt_key);
let heartbeat_url = urls::scheduler_heartbeat_server(&scheduler_url);
let job_authorizer = JWTJobAuthorizer::new(self.jwt_key);
let heartbeat_url = urls::scheduler_heartbeat_server(&self.scheduler_url);
let requester = ServerRequester {
client: new_reqwest_blocking_client(),
scheduler_url,
scheduler_auth: scheduler_auth.clone(),
scheduler_url: self.scheduler_url,
scheduler_auth: self.scheduler_auth.clone(),
};

// TODO: detect if this panics
@@ -947,7 +939,7 @@
match bincode_req(
client
.post(heartbeat_url.clone())
.bearer_auth(scheduler_auth.clone())
.bearer_auth(self.scheduler_auth.clone())
.bincode(&heartbeat_req)
.expect("failed to serialize heartbeat"),
) {
@@ -967,10 +959,10 @@
}
});

info!("Server listening for clients on {}", bind_address);
info!("Server listening for clients on {}", self.bind_address);
let request_count = atomic::AtomicUsize::new(0);

let server = rouille::Server::new_ssl(bind_address, move |request| {
let server = rouille::Server::new_ssl(self.bind_address, move |request| {
let req_id = request_count.fetch_add(1, atomic::Ordering::SeqCst);
trace!("Req {} ({}): {:?}", req_id, request.remote_addr(), request);
let response = (|| router!(request,
@@ -979,7 +971,7 @@
let toolchain = try_or_400_log!(req_id, bincode_input(request));
trace!("Req {}: assign_job({}): {:?}", req_id, job_id, toolchain);

let res: AssignJobResult = try_or_500_log!(req_id, handler.handle_assign_job(job_id, toolchain));
let res: AssignJobResult = try_or_500_log!(req_id, self.handler.handle_assign_job(job_id, toolchain));
prepare_response(request, &res)
},
(POST) (/api/v1/distserver/submit_toolchain/{job_id: JobId}) => {
@@ -988,7 +980,8 @@

let body = request.data().expect("body was already read in submit_toolchain");
let toolchain_rdr = ToolchainReader(Box::new(body));
let res: SubmitToolchainResult = try_or_500_log!(req_id, handler.handle_submit_toolchain(&requester, job_id, toolchain_rdr));
let res: SubmitToolchainResult = try_or_500_log!(req_id, self.handler.handle_submit_toolchain(
&requester, job_id, toolchain_rdr));
prepare_response(request, &res)
},
(POST) (/api/v1/distserver/run_job/{job_id: JobId}) => {
@@ -1007,7 +1000,8 @@
let inputs_rdr = InputsReader(Box::new(ZlibReadDecoder::new(body)));
let outputs = outputs.into_iter().collect();

let res: RunJobResult = try_or_500_log!(req_id, handler.handle_run_job(&requester, job_id, command, outputs, inputs_rdr));
let res: RunJobResult = try_or_500_log!(req_id, self.handler.handle_run_job(&requester, job_id,
command, outputs, inputs_rdr));
prepare_response(request, &res)
},
_ => {
@@ -1017,7 +1011,7 @@
))();
trace!("Res {}: {:?}", req_id, response);
response
}, cert_pem, privkey_pem).map_err(|e| anyhow!(format!("Failed to start http server for sccache server: {}", e)))?;
}, self.cert_pem, self.privkey_pem).map_err(|e| anyhow!(format!("Failed to start http server for sccache server: {}", e)))?;

// This limit is rouille's default for `start_server_with_pool`, which
// we would use, except that interface doesn't permit any sort of
2 changes: 2 additions & 0 deletions tests/harness/mod.rs
@@ -216,6 +216,7 @@ fn sccache_server_cfg(
token: DIST_SERVER_TOKEN.to_owned(),
},
toolchain_cache_size: TC_CACHE_SIZE,
core_count: None,
}
}

@@ -430,6 +431,7 @@ impl DistSystem {
Some(SocketAddr::from(([0, 0, 0, 0], server_addr.port()))),
self.scheduler_url().to_url(),
token,
4,
Collaborator:
it would be great if the test could verify that, indeed, 4 cores are being used

Contributor Author:
I guess so, but that would add a lot more work than the actual change - and you are seeing some of the first Rust code that I've written here, so I really don't know the tradeoffs etc to make a good decision about how to pull it off.
I have some ideas about scheduling improvements, though absolutely no guarantees that I'll get to them. That would need a corresponding test harness (maybe with a kind of mock compiler that takes a configurable amount of CPU time and memory and writes some kind of tracing output) with one of the more trivial checks being that the max number of jobs is not exceeded.

Contributor Author:
I mean actually verify, not just asking the scheduler what it thinks its limit is, which seems a bit pointless because it "obviously works" (right?)

handler,
)
.unwrap();
Expand Down