Skip to content

Commit c0f1c07

Browse files
committed
Add dev tooling for creating cluster arrangements from a config
1 parent 3894e28 commit c0f1c07

File tree

8 files changed

+487
-30
lines changed

8 files changed

+487
-30
lines changed

Cargo.lock

+146-30
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ indexmap = { version = "2.1.0", features = ["serde"] }
3636
itertools = { version = "0.10.5" }
3737
metrics = "0.22.0"
3838
metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = ["http-listener"] }
39+
nom = "7.0"
3940
once_cell = "1.17.1"
4041
opentelemetry = { version = "0.20.0", features = ["rt-tokio"] }
4142
opentelemetry-otlp = { version = "0.13.0" }
@@ -47,6 +48,7 @@ quinn-proto = "0.10.5"
4748
quinn-plaintext = { version = "0.2.0" }
4849
quoted-string = "0.6.1"
4950
rand = { version = "0.8.5", features = ["small_rng"] }
51+
random_graphs = "0.1"
5052
rangemap = { version = "1.4.0" }
5153
rcgen = { version = "0.11.1", features = ["x509-parser"] }
5254
rhai = { version = "1.15.1", features = ["sync"] }

crates/corro-devcluster/Cargo.toml

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "corro-devcluster"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
serde = { workspace = true, features = [ "derive" ] }
8+
serde_json = { workspace = true }
9+
thiserror = { workspace = true }
10+
rand = { workspace = true }
11+
random_graphs = { workspace = true }
12+
nom = { workspace = true }
13+
clap = { workspace = true }

crates/corro-devcluster/README.md

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# corro-devcluster
2+
3+
A tool to quickly start a corrosion cluster for development.
4+
5+
- Build corrosion with nix (keep in mind that changes to the
6+
`Cargo.lock` file need to be checked into git to be visible to a nix
7+
build)
8+
- Provide a topology config (`.txt` is fine, see
9+
[example_topologies](example_topologies/))
10+
- Provide a state directory. Node state and log output will be kept
11+
in separated directories.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
A -> B
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
A -> B
2+
B -> C
3+
C -> A

crates/corro-devcluster/src/main.rs

+259
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
mod topology;
2+
3+
use clap::{Parser, Subcommand};
4+
use serde::Deserialize;
5+
use std::{
6+
collections::{BTreeMap, HashMap},
7+
env,
8+
fs::File,
9+
io::{Read, Stdout, Write},
10+
path::{Path, PathBuf},
11+
process::{Command, Stdio},
12+
sync::mpsc::{channel, RecvError, Sender},
13+
thread,
14+
time::Duration,
15+
};
16+
17+
use crate::topology::Simple;
18+
19+
#[derive(Parser)]
20+
#[clap(version = env!("CARGO_PKG_VERSION"))]
21+
struct Cli {
22+
/// Set the state directory path. If not set the environment
23+
/// variable CORRO_DEVCLUSTER_STATE_DIR will be used
24+
#[clap(long = "statedir", short = 'd', global = true)]
25+
state_directory: Option<PathBuf>,
26+
27+
/// Set the state directory path. If not set the environment
28+
/// variable CORRO_DEVCLUSTER_SCHEMA_DIR will be used
29+
#[clap(long = "schemadir", short = 's', global = true)]
30+
schema_directory: Option<PathBuf>,
31+
32+
/// Provide the binary path for corrosion. If none is provided,
33+
/// corrosion will be built with nix (which may take a minute)
34+
#[clap(long = "binpath", short = 'b', global = true)]
35+
binary_path: Option<String>,
36+
37+
#[command(subcommand)]
38+
command: CliCommand,
39+
}
40+
41+
#[derive(Subcommand)]
42+
enum CliCommand {
43+
/// Create a simple topology in format `A -> B`, `B -> C`, etc
44+
Simple {
45+
/// Set the topology file path
46+
topology_path: PathBuf,
47+
},
48+
}
49+
50+
fn main() {
51+
let cli: Cli = Cli::parse();
52+
53+
let state_dir = match cli
54+
.state_directory
55+
.or(env::var("CORRO_DEVCLUSTER_STATE_DIR")
56+
.ok()
57+
.map(|path| PathBuf::new().join(path)))
58+
{
59+
Some(dir) => dir,
60+
None => {
61+
eprintln!("FAILED: either pass `--statedir` or set 'CORRO_DEVCLUSTER_STATE_DIR' environment variable!");
62+
std::process::exit(1);
63+
}
64+
};
65+
66+
let schema_dir = match cli
67+
.schema_directory
68+
.or(env::var("CORRO_DEVCLUSTER_SCHEMA_DIR")
69+
.ok()
70+
.map(|path| PathBuf::new().join(path)))
71+
{
72+
Some(dir) => dir,
73+
None => {
74+
eprintln!("FAILED: either pass `--statedir` or set 'CORRO_DEVCLUSTER_STATE_DIR' environment variable!");
75+
std::process::exit(1);
76+
}
77+
};
78+
79+
let bin_path = cli
80+
.binary_path
81+
.or_else(|| build_corrosion().map(|h| h.path))
82+
.expect("failed to determine corrosion binary location!");
83+
84+
match cli.command {
85+
CliCommand::Simple { topology_path } => {
86+
let mut topo_config = File::open(topology_path).expect("failed to open topology-file!");
87+
let mut topo_buffer = String::new();
88+
topo_config
89+
.read_to_string(&mut topo_buffer)
90+
.expect("failed to read topology-file!");
91+
92+
let mut topology = Simple::default();
93+
topo_buffer.lines().for_each(|line| {
94+
topology
95+
.parse_edge(line)
96+
.expect("Syntax error in topology-file!");
97+
});
98+
99+
run_simple_topology(topology, bin_path, state_dir, schema_dir);
100+
}
101+
}
102+
103+
// let handle = build_corrosion(env::args().next().map(|s| PathBuf::new().join(s)).unwrap());
104+
// println!("{:#?}", handle);
105+
}
106+
107+
fn run_simple_topology(topo: Simple, bin_path: String, state_dir: PathBuf, schema_dir: PathBuf) {
108+
println!("//// Creating topology: \n{:#?}", topo);
109+
let nodes = topo.get_all_nodes();
110+
111+
let mut port_map = BTreeMap::default();
112+
113+
// First go assign ports to all the nodes
114+
for node_name in &nodes {
115+
// Generate a port in range 1025 - 32768
116+
let node_port: u16 = 1025 + rand::random::<u16>() % (32 * 1024) - 1025;
117+
port_map.insert(node_name.clone(), node_port);
118+
}
119+
120+
// Then generate each config with the appropriate bootstrap_set
121+
for node_name in &nodes {
122+
let node_port = port_map.get(node_name).unwrap(); // We just put it there
123+
let node_state = state_dir.join(node_name);
124+
125+
// Delete / create the node state directory
126+
let _ = std::fs::remove_dir(&node_state);
127+
let _ = std::fs::create_dir_all(&node_state);
128+
129+
let mut bootstrap_set = vec![];
130+
for link in topo.inner.get(node_name).unwrap() {
131+
bootstrap_set.push(format!(
132+
"\"[::1]:{}\"", // only connect locally
133+
port_map.get(link).expect("Port for node not set!")
134+
));
135+
}
136+
137+
let node_config = generate_config(
138+
node_state.to_str().unwrap(),
139+
schema_dir.to_str().unwrap(),
140+
*node_port,
141+
bootstrap_set,
142+
);
143+
144+
println!(
145+
"Generated config for node '{}': \n{}",
146+
node_name, node_config
147+
);
148+
149+
let mut config_file = File::create(node_state.join("config.toml"))
150+
.expect("failed to create node config file");
151+
config_file
152+
.write_all(node_config.as_bytes())
153+
.expect("failed to write node config file");
154+
}
155+
156+
let (tx, rx) = channel::<()>();
157+
158+
// Spawn nodes those without bootstraps first if they exist.
159+
for (pure_responder, _) in topo.inner.iter().filter(|(_, vec)| vec.is_empty()) {
160+
run_corrosion(tx.clone(), bin_path.clone(), state_dir.join(pure_responder));
161+
thread::sleep(Duration::from_millis(250)); // give the start thread a bit of time to breathe
162+
}
163+
164+
for (initiator, _) in topo.inner.iter().filter(|(_, vec)| !vec.is_empty()) {
165+
run_corrosion(tx.clone(), bin_path.clone(), state_dir.join(initiator));
166+
thread::sleep(Duration::from_millis(250)); // give the start thread a bit of time to breathe
167+
}
168+
169+
// wait for the threads
170+
while let Ok(()) = rx.recv() {}
171+
Command::new("pkill")
172+
.arg("corrosion")
173+
.output()
174+
.expect("failed to gracefully kill corrosions. They've become sentient!!!");
175+
}
176+
177+
fn generate_config(
178+
state_dir: &str,
179+
schema_dir: &str,
180+
port: u16,
181+
bootstrap_set: Vec<String>,
182+
) -> String {
183+
let bootstrap = bootstrap_set.join(",");
184+
format!(
185+
r#"[db]
186+
path = "{state_dir}/corrosion.db"
187+
schema_paths = ["{schema_dir}"]
188+
189+
[gossip]
190+
addr = "[::]:{port}"
191+
external_addr = "[::1]:{port}"
192+
bootstrap = [{bootstrap}]
193+
plaintext = true
194+
195+
[api]
196+
addr = "127.0.0.1:{api_port}"
197+
198+
[admin]
199+
path = "{state_dir}/admin.sock"
200+
"#,
201+
state_dir = state_dir,
202+
schema_dir = schema_dir,
203+
port = port,
204+
// the chances of a collision here are very very small since
205+
// every port is random
206+
api_port = port + 1,
207+
bootstrap = bootstrap
208+
)
209+
}
210+
211+
#[derive(Debug)]
212+
struct BinHandle {
213+
path: String,
214+
}
215+
216+
fn nix_output(vec: &Vec<u8>) -> Vec<HashMap<String, serde_json::Value>> {
217+
serde_json::from_slice(vec).unwrap()
218+
}
219+
220+
fn run_corrosion(tx: Sender<()>, bin_path: String, state_path: PathBuf) {
221+
let node_log = File::create(state_path.join("node.log")).expect("couldn't create log file");
222+
let mut cmd = Command::new(bin_path);
223+
224+
cmd.args([
225+
"-c",
226+
state_path.join("config.toml").to_str().unwrap(),
227+
"agent",
228+
]);
229+
230+
cmd.stdout(node_log);
231+
let mut cmd_handle = cmd.spawn().expect("failed to spawn corrosion!");
232+
233+
thread::spawn(move || {
234+
println!("Waiting for node...");
235+
cmd_handle
236+
.wait()
237+
.expect("corrosion node has encountered an error!");
238+
tx.send(()).unwrap();
239+
println!("Node completed")
240+
});
241+
}
242+
243+
fn build_corrosion() -> Option<BinHandle> {
244+
println!("Running 'nix build' ...");
245+
let build_output = Command::new("nix")
246+
.args(["build", "--json"])
247+
.output()
248+
.ok()?;
249+
250+
let json = nix_output(&build_output.stdout).remove(0);
251+
252+
Some(BinHandle {
253+
path: json
254+
.get("outputs")?
255+
.get("out")?
256+
.to_string()
257+
.replace("\"", ""),
258+
})
259+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use nom::{
2+
bytes::complete::tag,
3+
character::complete::{alpha1, multispace0},
4+
sequence::delimited,
5+
IResult,
6+
};
7+
use std::collections::{BTreeMap, BTreeSet};
8+
9+
/// Topology definition as a set of graph edges
10+
#[derive(Debug, Default)]
11+
pub struct Simple {
12+
pub(crate) inner: BTreeMap<String, Vec<String>>,
13+
}
14+
15+
impl Simple {
16+
/// Parse a topology line in the following format:
17+
///
18+
/// A -> B
19+
/// B -> C
20+
/// A -> C
21+
/// etc
22+
pub fn parse_edge<'top, 'input>(&mut self, input: &'input str) -> IResult<&'input str, ()> {
23+
let (input, first) = alpha1(input)?;
24+
let (input, _) = delimited(multispace0, tag("->"), multispace0)(input)?;
25+
let (input, second) = alpha1(input)?;
26+
27+
// Add first -> second edge
28+
self.inner
29+
.entry(first.to_string())
30+
.or_default()
31+
.push(second.to_string());
32+
// Add second to the map if it doesn't exist yet but don't
33+
// create the connection edge
34+
self.inner.entry(second.to_string()).or_default();
35+
36+
Ok((input, ()))
37+
}
38+
39+
pub fn get_all_nodes(&self) -> Vec<String> {
40+
self.inner
41+
.keys()
42+
.fold(BTreeSet::new(), |mut acc, key| {
43+
acc.insert(key.clone());
44+
self.inner.get(key).unwrap().iter().for_each(|value| {
45+
acc.insert(value.clone());
46+
});
47+
acc
48+
})
49+
.into_iter()
50+
.collect()
51+
}
52+
}

0 commit comments

Comments
 (0)