From e328d422131790282e57564bd9e44bde3bbbcf66 Mon Sep 17 00:00:00 2001 From: Arthur Silva Date: Fri, 11 Nov 2022 15:18:29 +0100 Subject: [PATCH 1/4] Run python plot script with poetry --- .gitignore | 1 + crossbeam-channel/benchmarks/README.md | 3 +-- crossbeam-channel/benchmarks/pyproject.toml | 13 +++++++++++++ crossbeam-channel/benchmarks/run.sh | 9 ++++++++- 4 files changed, 23 insertions(+), 3 deletions(-) create mode 100644 crossbeam-channel/benchmarks/pyproject.toml diff --git a/.gitignore b/.gitignore index 6a56a4f7d..60e69f406 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ /crossbeam-channel/benchmarks/*.png target/ Cargo.lock +poetry.lock diff --git a/crossbeam-channel/benchmarks/README.md b/crossbeam-channel/benchmarks/README.md index c5aa3439f..29a2d1cd4 100644 --- a/crossbeam-channel/benchmarks/README.md +++ b/crossbeam-channel/benchmarks/README.md @@ -27,8 +27,7 @@ Dependencies: - Rust (nightly) - Go - Bash -- Python 2 -- Matplotlib +- Poetry (https://python-poetry.org/) ### Results diff --git a/crossbeam-channel/benchmarks/pyproject.toml b/crossbeam-channel/benchmarks/pyproject.toml new file mode 100644 index 000000000..b4b63ed08 --- /dev/null +++ b/crossbeam-channel/benchmarks/pyproject.toml @@ -0,0 +1,13 @@ +[tool.poetry] +name = "benchmarks" +version = "0.1.0" +description = "" +authors = [] + +[tool.poetry.dependencies] +python = "^3.8" +matplotlib = "3.6" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/crossbeam-channel/benchmarks/run.sh b/crossbeam-channel/benchmarks/run.sh index 099b0e4ef..b525e5d6c 100755 --- a/crossbeam-channel/benchmarks/run.sh +++ b/crossbeam-channel/benchmarks/run.sh @@ -9,4 +9,11 @@ cargo run --release --bin futures-channel | tee futures-channel.txt cargo run --release --bin mpsc | tee mpsc.txt go run go.go | tee go.txt -./plot.py ./*.txt +if ! command -v poetry +then + echo "poetry (https://python-poetry.org) is required, exiting..." + exit 1 +fi + +poetry install +poetry run python plot.py ./*.txt From a85b1ca4f10bef969934f245f66d78b87c7a1fd3 Mon Sep 17 00:00:00 2001 From: Arthur Silva Date: Fri, 11 Nov 2022 15:21:48 +0100 Subject: [PATCH 2/4] Improvements to crossbeam channel /benches benchmark --- crossbeam-channel/benches/crossbeam.rs | 359 +++++++++++++++---------- 1 file changed, 212 insertions(+), 147 deletions(-) diff --git a/crossbeam-channel/benches/crossbeam.rs b/crossbeam-channel/benches/crossbeam.rs index 1c0522294..cfcfdbb3f 100644 --- a/crossbeam-channel/benches/crossbeam.rs +++ b/crossbeam-channel/benches/crossbeam.rs @@ -6,39 +6,87 @@ use crossbeam_channel::{bounded, unbounded}; use crossbeam_utils::thread::scope; use test::Bencher; -const TOTAL_STEPS: usize = 40_000; +#[derive(Default)] +struct Msg([u64; 4]); + +struct Config { + steps: usize, + threads: usize, + spin: usize, +} + +impl Config { + fn from_env() -> Self { + let steps = std::env::var("STEPS") + .unwrap_or("40000".to_string()) + .parse() + .unwrap(); + let threads = std::env::var("THREADS") + .unwrap_or(std::thread::available_parallelism().unwrap().to_string()) + .parse() + .unwrap(); + let spin = std::env::var("SPIN") + .unwrap_or("0".to_string()) + .parse() + .unwrap(); + Self { + steps, + threads, + spin, + } + } + + #[inline(always)] + fn consume_msg(&self, msg: Msg) { + test::black_box(msg); + for i in 0..self.spin { + test::black_box(i); + } + } + + #[inline(always)] + fn produce_msg(&self) -> Msg { + for i in 0..self.spin { + test::black_box(i); + } + Msg::default() + } +} mod unbounded { use super::*; #[bench] fn create(b: &mut Bencher) { - b.iter(unbounded::); + b.iter(unbounded::); } #[bench] fn oneshot(b: &mut Bencher) { + let config = Config::from_env(); b.iter(|| { - let (s, r) = unbounded::(); - s.send(0).unwrap(); - r.recv().unwrap(); + let (s, r) = unbounded::(); + s.send(config.produce_msg()).unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); }); } #[bench] fn inout(b: &mut Bencher) { - let (s, r) = unbounded::(); + let config = Config::from_env(); + let (s, r) = unbounded::(); b.iter(|| { - s.send(0).unwrap(); - r.recv().unwrap(); + s.send(config.produce_msg()).unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); }); } #[bench] fn par_inout(b: &mut Bencher) { - let threads = num_cpus::get(); - let steps = TOTAL_STEPS / threads; - let (s, r) = unbounded::(); + let config = Config::from_env(); + let threads = config.threads; + let steps = config.steps / config.threads; + let (s, r) = unbounded::(); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); @@ -46,9 +94,9 @@ mod unbounded { for _ in 0..threads { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); - r.recv().unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } s2.send(()).unwrap(); } @@ -57,7 +105,7 @@ mod unbounded { b.iter(|| { for _ in 0..threads { - s1.send(()).unwrap(); + s1.send(config.produce_msg()).unwrap(); } for _ in 0..threads { r2.recv().unwrap(); @@ -70,16 +118,17 @@ mod unbounded { #[bench] fn spsc(b: &mut Bencher) { - let steps = TOTAL_STEPS; - let (s, r) = unbounded::(); + let config = Config::from_env(); + let steps = config.steps; + let (s, r) = unbounded::(); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); } s2.send(()).unwrap(); } @@ -88,7 +137,7 @@ mod unbounded { b.iter(|| { s1.send(()).unwrap(); for _ in 0..steps { - r.recv().unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } r2.recv().unwrap(); }); @@ -99,18 +148,19 @@ mod unbounded { #[bench] fn spmc(b: &mut Bencher) { - let threads = num_cpus::get() - 1; - let steps = TOTAL_STEPS / threads; - let (s, r) = unbounded::(); + let config = Config::from_env(); + let consum_threads = config.threads - 1; + let steps = config.steps / consum_threads; + let (s, r) = unbounded::(); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { - for _ in 0..threads { + for _ in 0..consum_threads { scope.spawn(|_| { while r1.recv().is_ok() { for _ in 0..steps { - r.recv().unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } s2.send(()).unwrap(); } @@ -118,13 +168,13 @@ mod unbounded { } b.iter(|| { - for _ in 0..threads { + for _ in 0..consum_threads { s1.send(()).unwrap(); } - for i in 0..steps * threads { - s.send(i as i32).unwrap(); + for _ in 0..steps * consum_threads { + s.send(config.produce_msg()).unwrap(); } - for _ in 0..threads { + for _ in 0..consum_threads { r2.recv().unwrap(); } }); @@ -135,18 +185,19 @@ mod unbounded { #[bench] fn mpsc(b: &mut Bencher) { - let threads = num_cpus::get() - 1; - let steps = TOTAL_STEPS / threads; - let (s, r) = unbounded::(); + let config = Config::from_env(); + let prod_threads = config.threads - 1; + let steps = config.steps / prod_threads; + let (s, r) = unbounded::(); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { - for _ in 0..threads { + for _ in 0..prod_threads { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); } s2.send(()).unwrap(); } @@ -154,13 +205,13 @@ mod unbounded { } b.iter(|| { - for _ in 0..threads { + for _ in 0..prod_threads { s1.send(()).unwrap(); } - for _ in 0..steps * threads { - r.recv().unwrap(); + for _ in 0..steps * prod_threads { + r.recv().map(|m| config.consume_msg(m)).unwrap(); } - for _ in 0..threads { + for _ in 0..prod_threads { r2.recv().unwrap(); } }); @@ -171,9 +222,10 @@ mod unbounded { #[bench] fn mpmc(b: &mut Bencher) { - let threads = num_cpus::get(); - let steps = TOTAL_STEPS / threads; - let (s, r) = unbounded::(); + let config = Config::from_env(); + let threads = config.threads; + let steps = config.steps / config.threads; + let (s, r) = unbounded::(); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); @@ -181,8 +233,8 @@ mod unbounded { for _ in 0..threads / 2 { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); } s2.send(()).unwrap(); } @@ -192,7 +244,7 @@ mod unbounded { scope.spawn(|_| { while r1.recv().is_ok() { for _ in 0..steps { - r.recv().unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } s2.send(()).unwrap(); } @@ -218,16 +270,17 @@ mod bounded_n { #[bench] fn spsc(b: &mut Bencher) { - let steps = TOTAL_STEPS; - let (s, r) = bounded::(steps); + let config = Config::from_env(); + let steps = config.steps; + let (s, r) = bounded::(steps); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); } s2.send(()).unwrap(); } @@ -236,7 +289,7 @@ mod bounded_n { b.iter(|| { s1.send(()).unwrap(); for _ in 0..steps { - r.recv().unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } r2.recv().unwrap(); }); @@ -247,18 +300,19 @@ mod bounded_n { #[bench] fn spmc(b: &mut Bencher) { - let threads = num_cpus::get() - 1; - let steps = TOTAL_STEPS / threads; - let (s, r) = bounded::(steps * threads); + let config = Config::from_env(); + let consum_threads = config.threads - 1; + let steps = config.steps / consum_threads; + let (s, r) = bounded::(steps * consum_threads); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { - for _ in 0..threads { + for _ in 0..consum_threads { scope.spawn(|_| { while r1.recv().is_ok() { for _ in 0..steps { - r.recv().unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } s2.send(()).unwrap(); } @@ -266,13 +320,13 @@ mod bounded_n { } b.iter(|| { - for _ in 0..threads { + for _ in 0..consum_threads { s1.send(()).unwrap(); } - for i in 0..steps * threads { - s.send(i as i32).unwrap(); + for _ in 0..steps * consum_threads { + s.send(config.produce_msg()).unwrap(); } - for _ in 0..threads { + for _ in 0..consum_threads { r2.recv().unwrap(); } }); @@ -283,18 +337,19 @@ mod bounded_n { #[bench] fn mpsc(b: &mut Bencher) { - let threads = num_cpus::get() - 1; - let steps = TOTAL_STEPS / threads; - let (s, r) = bounded::(steps * threads); + let config = Config::from_env(); + let prod_threads = config.threads - 1; + let steps = config.steps / prod_threads; + let (s, r) = bounded::(steps * prod_threads); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { - for _ in 0..threads { + for _ in 0..prod_threads { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); } s2.send(()).unwrap(); } @@ -302,13 +357,13 @@ mod bounded_n { } b.iter(|| { - for _ in 0..threads { + for _ in 0..prod_threads { s1.send(()).unwrap(); } - for _ in 0..steps * threads { - r.recv().unwrap(); + for _ in 0..steps * prod_threads { + r.recv().map(|m| config.consume_msg(m)).unwrap(); } - for _ in 0..threads { + for _ in 0..prod_threads { r2.recv().unwrap(); } }); @@ -319,9 +374,10 @@ mod bounded_n { #[bench] fn par_inout(b: &mut Bencher) { - let threads = num_cpus::get(); - let steps = TOTAL_STEPS / threads; - let (s, r) = bounded::(threads); + let config = Config::from_env(); + let threads = config.threads; + let steps = config.steps / config.threads; + let (s, r) = bounded::(threads); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); @@ -329,9 +385,9 @@ mod bounded_n { for _ in 0..threads { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); - r.recv().unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } s2.send(()).unwrap(); } @@ -353,10 +409,10 @@ mod bounded_n { #[bench] fn mpmc(b: &mut Bencher) { - let threads = num_cpus::get(); - assert_eq!(threads % 2, 0); - let steps = TOTAL_STEPS / threads; - let (s, r) = bounded::(steps * threads); + let config = Config::from_env(); + let threads = config.threads; + let steps = config.steps / config.threads; + let (s, r) = bounded::(steps * threads); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); @@ -364,8 +420,8 @@ mod bounded_n { for _ in 0..threads / 2 { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); } s2.send(()).unwrap(); } @@ -375,7 +431,7 @@ mod bounded_n { scope.spawn(|_| { while r1.recv().is_ok() { for _ in 0..steps { - r.recv().unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } s2.send(()).unwrap(); } @@ -401,30 +457,32 @@ mod bounded_1 { #[bench] fn create(b: &mut Bencher) { - b.iter(|| bounded::(1)); + b.iter(|| bounded::(1)); } #[bench] fn oneshot(b: &mut Bencher) { + let config = Config::from_env(); b.iter(|| { - let (s, r) = bounded::(1); - s.send(0).unwrap(); - r.recv().unwrap(); + let (s, r) = bounded::(1); + s.send(config.produce_msg()).unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); }); } #[bench] fn spsc(b: &mut Bencher) { - let steps = TOTAL_STEPS; - let (s, r) = bounded::(1); + let config = Config::from_env(); + let steps = config.steps; + let (s, r) = bounded::(1); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); } s2.send(()).unwrap(); } @@ -433,7 +491,7 @@ mod bounded_1 { b.iter(|| { s1.send(()).unwrap(); for _ in 0..steps { - r.recv().unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } r2.recv().unwrap(); }); @@ -444,18 +502,19 @@ mod bounded_1 { #[bench] fn spmc(b: &mut Bencher) { - let threads = num_cpus::get() - 1; - let steps = TOTAL_STEPS / threads; - let (s, r) = bounded::(1); + let config = Config::from_env(); + let consum_threads = config.threads - 1; + let steps = config.steps / consum_threads; + let (s, r) = bounded::(1); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { - for _ in 0..threads { + for _ in 0..consum_threads { scope.spawn(|_| { while r1.recv().is_ok() { for _ in 0..steps { - r.recv().unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } s2.send(()).unwrap(); } @@ -463,13 +522,13 @@ mod bounded_1 { } b.iter(|| { - for _ in 0..threads { + for _ in 0..consum_threads { s1.send(()).unwrap(); } - for i in 0..steps * threads { - s.send(i as i32).unwrap(); + for _ in 0..steps * consum_threads { + s.send(config.produce_msg()).unwrap(); } - for _ in 0..threads { + for _ in 0..consum_threads { r2.recv().unwrap(); } }); @@ -480,18 +539,19 @@ mod bounded_1 { #[bench] fn mpsc(b: &mut Bencher) { - let threads = num_cpus::get() - 1; - let steps = TOTAL_STEPS / threads; - let (s, r) = bounded::(1); + let config = Config::from_env(); + let prod_threads = config.threads - 1; + let steps = config.steps / prod_threads; + let (s, r) = bounded::(1); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { - for _ in 0..threads { + for _ in 0..prod_threads { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); } s2.send(()).unwrap(); } @@ -499,13 +559,13 @@ mod bounded_1 { } b.iter(|| { - for _ in 0..threads { + for _ in 0..prod_threads { s1.send(()).unwrap(); } - for _ in 0..steps * threads { - r.recv().unwrap(); + for _ in 0..steps * prod_threads { + r.recv().map(|m| config.consume_msg(m)).unwrap(); } - for _ in 0..threads { + for _ in 0..prod_threads { r2.recv().unwrap(); } }); @@ -516,9 +576,10 @@ mod bounded_1 { #[bench] fn mpmc(b: &mut Bencher) { - let threads = num_cpus::get(); - let steps = TOTAL_STEPS / threads; - let (s, r) = bounded::(1); + let config = Config::from_env(); + let threads = config.threads; + let steps = config.steps / config.threads; + let (s, r) = bounded::(1); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); @@ -526,8 +587,8 @@ mod bounded_1 { for _ in 0..threads / 2 { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); } s2.send(()).unwrap(); } @@ -537,7 +598,7 @@ mod bounded_1 { scope.spawn(|_| { while r1.recv().is_ok() { for _ in 0..steps { - r.recv().unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } s2.send(()).unwrap(); } @@ -563,21 +624,22 @@ mod bounded_0 { #[bench] fn create(b: &mut Bencher) { - b.iter(|| bounded::(0)); + b.iter(|| bounded::(0)); } #[bench] fn spsc(b: &mut Bencher) { - let steps = TOTAL_STEPS; - let (s, r) = bounded::(0); + let config = Config::from_env(); + let steps = config.steps; + let (s, r) = bounded::(0); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); } s2.send(()).unwrap(); } @@ -586,7 +648,7 @@ mod bounded_0 { b.iter(|| { s1.send(()).unwrap(); for _ in 0..steps { - r.recv().unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } r2.recv().unwrap(); }); @@ -597,18 +659,19 @@ mod bounded_0 { #[bench] fn spmc(b: &mut Bencher) { - let threads = num_cpus::get() - 1; - let steps = TOTAL_STEPS / threads; - let (s, r) = bounded::(0); + let config = Config::from_env(); + let consum_threads = config.threads - 1; + let steps = config.steps / consum_threads; + let (s, r) = bounded::(0); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { - for _ in 0..threads { + for _ in 0..consum_threads { scope.spawn(|_| { while r1.recv().is_ok() { for _ in 0..steps { - r.recv().unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } s2.send(()).unwrap(); } @@ -616,13 +679,13 @@ mod bounded_0 { } b.iter(|| { - for _ in 0..threads { + for _ in 0..consum_threads { s1.send(()).unwrap(); } - for i in 0..steps * threads { - s.send(i as i32).unwrap(); + for _ in 0..steps * consum_threads { + s.send(config.produce_msg()).unwrap(); } - for _ in 0..threads { + for _ in 0..consum_threads { r2.recv().unwrap(); } }); @@ -633,18 +696,19 @@ mod bounded_0 { #[bench] fn mpsc(b: &mut Bencher) { - let threads = num_cpus::get() - 1; - let steps = TOTAL_STEPS / threads; - let (s, r) = bounded::(0); + let config = Config::from_env(); + let prod_threads = config.threads - 1; + let steps = config.steps / prod_threads; + let (s, r) = bounded::(0); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { - for _ in 0..threads { + for _ in 0..prod_threads { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); } s2.send(()).unwrap(); } @@ -652,13 +716,13 @@ mod bounded_0 { } b.iter(|| { - for _ in 0..threads { + for _ in 0..prod_threads { s1.send(()).unwrap(); } - for _ in 0..steps * threads { - r.recv().unwrap(); + for _ in 0..steps * prod_threads { + r.recv().map(|m| config.consume_msg(m)).unwrap(); } - for _ in 0..threads { + for _ in 0..prod_threads { r2.recv().unwrap(); } }); @@ -669,9 +733,10 @@ mod bounded_0 { #[bench] fn mpmc(b: &mut Bencher) { - let threads = num_cpus::get(); - let steps = TOTAL_STEPS / threads; - let (s, r) = bounded::(0); + let config = Config::from_env(); + let threads = config.threads; + let steps = config.steps / config.threads; + let (s, r) = bounded::(0); let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); @@ -679,8 +744,8 @@ mod bounded_0 { for _ in 0..threads / 2 { scope.spawn(|_| { while r1.recv().is_ok() { - for i in 0..steps { - s.send(i as i32).unwrap(); + for _ in 0..steps { + s.send(config.produce_msg()).unwrap(); } s2.send(()).unwrap(); } @@ -690,7 +755,7 @@ mod bounded_0 { scope.spawn(|_| { while r1.recv().is_ok() { for _ in 0..steps { - r.recv().unwrap(); + r.recv().map(|m| config.consume_msg(m)).unwrap(); } s2.send(()).unwrap(); } From 7b02169a09067790b4ea77a5e784daf5ebf4e56b Mon Sep 17 00:00:00 2001 From: Arthur Silva Date: Sun, 13 Nov 2022 11:01:54 +0100 Subject: [PATCH 3/4] Add comments to backoff.rs --- crossbeam-utils/src/backoff.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crossbeam-utils/src/backoff.rs b/crossbeam-utils/src/backoff.rs index 9e256aaf2..6f47fee92 100644 --- a/crossbeam-utils/src/backoff.rs +++ b/crossbeam-utils/src/backoff.rs @@ -2,7 +2,9 @@ use crate::primitive::sync::atomic; use core::cell::Cell; use core::fmt; +/// Backoff will call yield_now between steps 0..=SPIN_LIMIT const SPIN_LIMIT: u32 = 6; +/// Backoff will call yield_now between steps SPIN_LIMIT+1..=YIELD_LIMIT const YIELD_LIMIT: u32 = 10; /// Performs exponential backoff in spin loops. From d9b88fcdb2a8447efd84bd4db2ea696294b9469f Mon Sep 17 00:00:00 2001 From: Arthur Silva Date: Mon, 14 Nov 2022 11:10:29 +0100 Subject: [PATCH 4/4] make linter happy --- crossbeam-channel/benchmarks/Cargo.toml | 1 + crossbeam-utils/src/sync/wait_group.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/crossbeam-channel/benchmarks/Cargo.toml b/crossbeam-channel/benchmarks/Cargo.toml index d42c645ce..172797449 100644 --- a/crossbeam-channel/benchmarks/Cargo.toml +++ b/crossbeam-channel/benchmarks/Cargo.toml @@ -9,6 +9,7 @@ atomicring = "1.1.1" bus = "2.0.1" chan = "0.1.23" crossbeam = { path = "../.." } +# crossbeam-channel = "*" crossbeam-channel = { path = ".." } crossbeam-deque = { path = "../../crossbeam-deque" } flume = "0.10" diff --git a/crossbeam-utils/src/sync/wait_group.rs b/crossbeam-utils/src/sync/wait_group.rs index 19d607415..37e74bb06 100644 --- a/crossbeam-utils/src/sync/wait_group.rs +++ b/crossbeam-utils/src/sync/wait_group.rs @@ -139,7 +139,7 @@ impl Clone for WaitGroup { impl fmt::Debug for WaitGroup { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let count: &usize = &*self.inner.count.lock().unwrap(); + let count: &usize = &self.inner.count.lock().unwrap(); f.debug_struct("WaitGroup").field("count", count).finish() } }