diff --git a/.gitignore b/.gitignore index 17860e72..b1b3e3fe 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ /schema /config.toml /data -/book \ No newline at end of file +/book +.direnv/** diff --git a/Cargo.lock b/Cargo.lock index e6c140da..f81f1f9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -847,6 +847,18 @@ dependencies = [ "uuid", ] +[[package]] +name = "corro-devcluster" +version = "0.1.0" +dependencies = [ + "clap", + "nom", + "rand", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "corro-pg" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 0502be00..d5ca8418 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ indexmap = { version = "2.1.0", features = ["serde"] } itertools = { version = "0.10.5" } metrics = "0.22.0" metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = ["http-listener"] } +nom = "7.0" once_cell = "1.17.1" opentelemetry = { version = "0.20.0", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.13.0" } diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index a4ef7605..89373176 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -10,8 +10,9 @@ use std::{ }; use crate::{ - agent::{bi, bootstrap, uni, util, SyncClientError, ANNOUNCE_INTERVAL}, + agent::{bi, bootstrap, uni, SyncClientError, ANNOUNCE_INTERVAL}, api::peer::parallel_sync, + change, transport::Transport, }; use corro_types::{ @@ -107,7 +108,7 @@ pub fn spawn_incoming_connection_handlers( // Spawn handler tasks for this connection spawn_foca_handler(&agent, &tripwire, &conn); - uni::spawn_unipayload_handler(&tripwire, &conn, agent.clone()); + uni::spawn_unipayload_handler(&agent, &bookie, &tripwire, &conn); bi::spawn_bipayload_handler(&agent, &bookie, &tripwire, &conn); }); } @@ -414,7 +415,7 @@ pub async fn handle_changes( } debug!(count = %tmp_count, "spawning processing multiple changes from beginning of loop"); - join_set.spawn(util::process_multiple_changes( + join_set.spawn(change::process_multiple_changes( agent.clone(), bookie.clone(), std::mem::take(&mut buf), @@ -529,7 +530,7 @@ pub async fn handle_changes( if count < MIN_CHANGES_CHUNK && !queue.is_empty() && join_set.len() < MAX_CONCURRENT { // we can process this right away debug!(%count, "spawning processing multiple changes from max wait interval"); - join_set.spawn(util::process_multiple_changes( + join_set.spawn(change::process_multiple_changes( agent.clone(), bookie.clone(), queue.drain(..).collect(), @@ -563,7 +564,7 @@ pub async fn handle_changes( queue.push_back((change, src, Instant::now())); if count >= MIN_CHANGES_CHUNK { // drain and process current changes! - if let Err(e) = util::process_multiple_changes( + if let Err(e) = change::process_multiple_changes( agent.clone(), bookie.clone(), queue.drain(..).collect(), @@ -579,7 +580,8 @@ pub async fn handle_changes( } // process the last changes we got! 
- if let Err(e) = util::process_multiple_changes(agent, bookie, queue.into_iter().collect()).await + if let Err(e) = + change::process_multiple_changes(agent, bookie, queue.into_iter().collect()).await { error!("could not process multiple changes: {e}"); } diff --git a/crates/corro-agent/src/agent/mod.rs b/crates/corro-agent/src/agent/mod.rs index 9e54cadd..d06bb1b5 100644 --- a/crates/corro-agent/src/agent/mod.rs +++ b/crates/corro-agent/src/agent/mod.rs @@ -12,7 +12,7 @@ mod metrics; mod run_root; mod setup; mod uni; -mod util; +pub(crate) mod util; #[cfg(test)] mod tests; @@ -27,7 +27,6 @@ use uuid::Uuid; pub use error::{SyncClientError, SyncRecvError}; pub use run_root::start_with_config; pub use setup::{setup, AgentOptions}; -pub use util::process_multiple_changes; pub const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(300); pub const COMPACT_BOOKED_INTERVAL: Duration = Duration::from_secs(300); diff --git a/crates/corro-agent/src/agent/uni.rs b/crates/corro-agent/src/agent/uni.rs index 6e9ec874..5044986f 100644 --- a/crates/corro-agent/src/agent/uni.rs +++ b/crates/corro-agent/src/agent/uni.rs @@ -1,5 +1,5 @@ use corro_types::{ - agent::Agent, + agent::{Agent, Bookie}, broadcast::{BroadcastV1, ChangeSource, UniPayload, UniPayloadV1}, }; use metrics::counter; @@ -11,12 +11,22 @@ use tripwire::Tripwire; /// Spawn a task that accepts unidirectional broadcast streams, then /// spawns another task for each incoming stream to handle. -pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, agent: Agent) { +pub fn spawn_unipayload_handler( + agent: &Agent, + bookie: &Bookie, + tripwire: &Tripwire, + conn: &quinn::Connection, +) { + let agent = agent.clone(); + let bookie = bookie.clone(); tokio::spawn({ let conn = conn.clone(); let mut tripwire = tripwire.clone(); + async move { loop { + let agent = agent.clone(); + let bookie = bookie.clone(); let rx = tokio::select! { rx_res = conn.accept_uni() => match rx_res { Ok(rx) => rx, @@ -44,6 +54,9 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a let mut framed = FramedRead::new(rx, LengthDelimitedCodec::new()); loop { + let agent = agent.clone(); + let bookie = bookie.clone(); + match StreamExt::next(&mut framed).await { Some(Ok(b)) => { counter!("corro.peer.stream.bytes.recv.total", "type" => "uni") @@ -59,6 +72,39 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a change, )), cluster_id, + priority, + } if priority => { + if cluster_id != agent.cluster_id() { + continue; + } + + tokio::spawn(async move { + if let Err(e) = + crate::change::process_multiple_changes( + agent, + bookie, + vec![( + change, + ChangeSource::Broadcast, + std::time::Instant::now(), + )], + ) + .await + { + error!( + "process_priority_change failed: {:?}", + e + ); + } + }); + } + UniPayload::V1 { + data: + UniPayloadV1::Broadcast(BroadcastV1::Change( + change, + )), + cluster_id, + .. } => { if cluster_id != agent.cluster_id() { continue; diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index de0ad207..61345c94 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -6,8 +6,7 @@ //! be pulled out of this file in future. 
use std::{ - cmp, - collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{BTreeMap, BTreeSet, HashMap}, convert::Infallible, net::SocketAddr, ops::RangeInclusive, @@ -30,7 +29,7 @@ use corro_types::{ actor::{Actor, ActorId}, agent::{Agent, Bookie, ChangeError, CurrentVersion, KnownDbVersion, PartialVersion}, base::{CrsqlDbVersion, CrsqlSeq, Version}, - broadcast::{ChangeSource, ChangeV1, Changeset, ChangesetParts, FocaInput}, + broadcast::{Changeset, ChangesetParts, FocaInput}, channel::CorroReceiver, config::AuthzConfig, pubsub::SubsManager, @@ -47,8 +46,8 @@ use foca::Member; use futures::{FutureExt, TryFutureExt}; use hyper::{server::conn::AddrIncoming, StatusCode}; use itertools::Itertools; -use metrics::{counter, histogram}; -use rangemap::{RangeInclusiveMap, RangeInclusiveSet}; +use metrics::counter; +use rangemap::RangeInclusiveSet; use rusqlite::{ named_params, params, params_from_iter, Connection, OptionalExtension, ToSql, Transaction, }; @@ -722,51 +721,6 @@ pub async fn process_completed_empties( Ok(()) } -#[tracing::instrument(skip_all, err)] -pub fn process_single_version( - agent: &Agent, - tx: &Transaction, - last_db_version: Option<CrsqlDbVersion>, - change: ChangeV1, -) -> rusqlite::Result<(KnownDbVersion, Changeset)> { - let ChangeV1 { - actor_id, - changeset, - } = change; - - let versions = changeset.versions(); - - let (known, changeset) = if changeset.is_complete() { - let (known, changeset) = process_complete_version( - tx, - actor_id, - last_db_version, - versions, - changeset - .into_parts() - .expect("no changeset parts, this shouldn't be happening!"), - )?; - - if check_buffered_meta_to_clear(tx, actor_id, changeset.versions())? { - if let Err(e) = agent - .tx_clear_buf() - .try_send((actor_id, changeset.versions())) - { - error!("could not schedule buffered meta clear: {e}"); - } - } - - (known, changeset) - } else { - let parts = changeset.into_parts().unwrap(); - let known = process_incomplete_version(tx, actor_id, &parts)?; - - (known, parts.into()) - }; - - Ok((known, changeset)) -} - pub fn store_empty_changeset( conn: &Connection, actor_id: ActorId, @@ -987,270 +941,6 @@ pub async fn process_fully_buffered_changes( Ok(inserted) } -#[tracing::instrument(skip(agent, bookie, changes), err)] -pub async fn process_multiple_changes( - agent: Agent, - bookie: Bookie, - changes: Vec<(ChangeV1, ChangeSource, Instant)>, -) -> Result<(), ChangeError> { - let start = Instant::now(); - counter!("corro.agent.changes.processing.started").increment(changes.len() as u64); - debug!(self_actor_id = %agent.actor_id(), "processing multiple changes, len: {}", changes.iter().map(|(change, _, _)| cmp::max(change.len(), 1)).sum::<usize>()); - - let mut seen = HashSet::new(); - let mut unknown_changes = Vec::with_capacity(changes.len()); - for (change, src, queued_at) in changes { - histogram!("corro.agent.changes.queued.seconds").record(queued_at.elapsed()); - let versions = change.versions(); - let seqs = change.seqs(); - if !seen.insert((change.actor_id, versions, seqs.cloned())) { - continue; - } - if bookie - .write(format!( - "process_multiple_changes(ensure):{}", - change.actor_id.as_simple() - )) - .await - .ensure(change.actor_id) - .read(format!( - "process_multiple_changes(contains?):{}", - change.actor_id.as_simple() - )) - .await - .contains_all(change.versions(), change.seqs()) - { - continue; - } - - unknown_changes.push((change, src)); - } - - unknown_changes.sort_by_key(|(change, _src)| change.actor_id); - - let mut conn = agent.pool().write_normal().await?; - - let
changesets = block_in_place(|| { - let start = Instant::now(); - let tx = conn - .immediate_transaction() - .map_err(|source| ChangeError::Rusqlite { - source, - actor_id: None, - version: None, - })?; - - let mut knowns: BTreeMap<ActorId, Vec<(RangeInclusive<Version>, KnownDbVersion)>> = BTreeMap::new(); - let mut changesets = vec![]; - - let mut last_db_version = None; - - for (actor_id, changes) in unknown_changes - .into_iter() - .group_by(|(change, _src)| change.actor_id) - .into_iter() - { - // get a lock on the actor id's booked writer if we didn't already - { - let booked = { - bookie - .blocking_write(format!( - "process_multiple_changes(for_actor_blocking):{}", - actor_id.as_simple() - )) - .ensure(actor_id) - }; - let booked_write = booked.blocking_write(format!( - "process_multiple_changes(booked writer):{}", - actor_id.as_simple() - )); - - let mut seen = RangeInclusiveMap::new(); - - for (change, src) in changes { - trace!("handling a single changeset: {change:?}"); - let seqs = change.seqs(); - if booked_write.contains_all(change.versions(), change.seqs()) { - trace!( - "previously unknown versions are now deemed known, aborting inserts" - ); - continue; - } - - let versions = change.versions(); - - // check if we've seen this version here... - if versions.clone().all(|version| match seqs { - Some(check_seqs) => match seen.get(&version) { - Some(known) => match known { - KnownDbVersion::Partial(PartialVersion { seqs, .. }) => { - check_seqs.clone().all(|seq| seqs.contains(&seq)) - } - KnownDbVersion::Current { .. } | KnownDbVersion::Cleared => true, - }, - None => false, - }, - None => seen.contains_key(&version), - }) { - continue; - } - - // optimizing this, insert later! - let known = if change.is_complete() && change.is_empty() { - // we never want to block here - if let Err(e) = agent.tx_empty().try_send((actor_id, change.versions())) { - error!("could not send empty changed versions into channel: {e}"); - } - - KnownDbVersion::Cleared - } else { - if let Some(seqs) = change.seqs() { - if seqs.end() < seqs.start() { - warn!(%actor_id, versions = ?change.versions(), "received an invalid change, seqs start is greater than seqs end: {seqs:?}"); - continue; - } - } - - let (known, versions) = match process_single_version( - &agent, - &tx, - last_db_version, - change, - ) { - Ok((known, changeset)) => { - let versions = changeset.versions(); - if let KnownDbVersion::Current(CurrentVersion { - db_version, .. - }) = &known - { - last_db_version = Some(*db_version); - changesets.push((actor_id, changeset, *db_version, src)); - } - (known, versions) - } - Err(e) => { - error!(%actor_id, ?versions, "could not process single change: {e}"); - continue; - } - }; - debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "got known to insert: {known:?}"); - known - }; - - seen.insert(versions.clone(), known.clone()); - knowns.entry(actor_id).or_default().push((versions, known)); - } - } - } - - let mut count = 0; - - for (actor_id, knowns) in knowns.iter_mut() { - debug!(%actor_id, self_actor_id = %agent.actor_id(), "processing {} knowns", knowns.len()); - for (versions, known) in knowns.iter_mut() { - match known { - KnownDbVersion::Partial { ..
} => { - continue; - } - KnownDbVersion::Current(CurrentVersion { - db_version, - last_seq, - ts, - }) => { - count += 1; - let version = versions.start(); - debug!(%actor_id, self_actor_id = %agent.actor_id(), %version, "inserting bookkeeping row db_version: {db_version}, ts: {ts:?}"); - tx.prepare_cached(" - INSERT INTO __corro_bookkeeping ( actor_id, start_version, db_version, last_seq, ts) - VALUES (:actor_id, :start_version, :db_version, :last_seq, :ts);").map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})? - .execute(named_params!{ - ":actor_id": actor_id, - ":start_version": *version, - ":db_version": *db_version, - ":last_seq": *last_seq, - ":ts": *ts - }).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})?; - } - KnownDbVersion::Cleared => { - debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserting CLEARED bookkeeping"); - if let Err(e) = agent.tx_empty().try_send((*actor_id, versions.clone())) { - error!("could not schedule version to be cleared: {e}"); - } - } - } - debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserted bookkeeping row"); - } - } - - debug!("inserted {count} new changesets"); - - tx.commit().map_err(|source| ChangeError::Rusqlite { - source, - actor_id: None, - version: None, - })?; - - for (_, changeset, _, _) in changesets.iter() { - if let Some(ts) = changeset.ts() { - let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration(); - histogram!("corro.agent.changes.commit.lag.seconds").record(dur); - } - } - - debug!("committed {count} changes in {:?}", start.elapsed()); - - for (actor_id, knowns) in knowns { - let booked = { - bookie - .blocking_write(format!( - "process_multiple_changes(for_actor_blocking):{}", - actor_id.as_simple() - )) - .ensure(actor_id) - }; - let mut booked_write = booked.blocking_write(format!( - "process_multiple_changes(booked writer, post commit):{}", - actor_id.as_simple() - )); - - for (versions, known) in knowns { - let version = *versions.start(); - // this merges partial version seqs - if let Some(PartialVersion { seqs, last_seq, .. }) = - booked_write.insert_many(versions, known) - { - let full_seqs_range = CrsqlSeq(0)..=last_seq; - let gaps_count = seqs.gaps(&full_seqs_range).count(); - if gaps_count == 0 { - // if we have no gaps, then we can schedule applying all these changes. - debug!(%actor_id, %version, "we now have all versions, notifying for background jobber to insert buffered changes! 
seqs: {seqs:?}, expected full seqs: {full_seqs_range:?}"); - let tx_apply = agent.tx_apply().clone(); - tokio::spawn(async move { - if let Err(e) = tx_apply.send((actor_id, version)).await { - error!("could not send trigger for applying fully buffered changes later: {e}"); - } - }); - } else { - debug!(%actor_id, %version, "still have {gaps_count} gaps in partially buffered seqs"); - } - } - } - } - - Ok::<_, ChangeError>(changesets) - })?; - - for (_actor_id, changeset, db_version, _src) in changesets { - agent - .subs_manager() - .match_changes(changeset.changes(), db_version); - } - - histogram!("corro.agent.changes.processing.time.seconds").record(start.elapsed()); - - Ok(()) -} - #[tracing::instrument(skip(tx, parts), err)] pub fn process_incomplete_version( tx: &Transaction, diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index b47e35af..3a5f75b2 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1564,10 +1564,7 @@ mod tests { use tokio::sync::mpsc; use tripwire::Tripwire; - use crate::{ - agent::{process_multiple_changes, setup}, - api::public::api_v1_db_schema, - }; + use crate::{agent::setup, api::public::api_v1_db_schema, change::process_multiple_changes}; use super::*; diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index 5f8ed0ab..cb0f1a66 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -1,3 +1,5 @@ +mod send; + use std::{ collections::{hash_map::Entry, HashMap, HashSet}, net::SocketAddr, @@ -11,7 +13,7 @@ use std::{ }; use bincode::DefaultOptions; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Bytes, BytesMut}; use foca::{BincodeCodec, Foca, NoCustomBroadcast, Notification, Timer}; use futures::{ stream::{FusedStream, FuturesUnordered}, @@ -22,7 +24,6 @@ use parking_lot::RwLock; use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng}; use rusqlite::params; use spawn::spawn_counted; -use speedy::Writable; use strum::EnumDiscriminants; use tokio::{ sync::mpsc, @@ -30,18 +31,18 @@ use tokio::{ time::interval, }; use tokio_stream::StreamExt; -use tokio_util::codec::{Encoder, LengthDelimitedCodec}; +use tokio_util::codec::LengthDelimitedCodec; use tracing::{debug, error, log::info, trace, warn}; use tripwire::Tripwire; use corro_types::{ actor::{Actor, ActorId}, agent::Agent, - broadcast::{BroadcastInput, DispatchRuntime, FocaCmd, FocaInput, UniPayload, UniPayloadV1}, + broadcast::{BroadcastInput, DispatchRuntime, FocaCmd, FocaInput}, channel::{bounded, CorroReceiver, CorroSender}, }; -use crate::transport::Transport; +use crate::{broadcast::send::dispatch_broadcast, transport::Transport}; #[derive(Clone)] struct TimerSpawner { @@ -363,8 +364,6 @@ pub fn runtime_loop( }); tokio::spawn(async move { - const BROADCAST_CUTOFF: usize = 64 * 1024; - let mut bcast_codec = LengthDelimitedCodec::new(); let mut bcast_buf = BytesMut::new(); @@ -443,66 +442,21 @@ pub fn runtime_loop( } Branch::Broadcast(input) => { trace!("handling Branch::Broadcast"); - let (bcast, is_local) = match input { - BroadcastInput::Rebroadcast(bcast) => (bcast, false), - BroadcastInput::AddBroadcast(bcast) => (bcast, true), - }; - trace!("adding broadcast: {bcast:?}, local? 
{is_local}"); - - if let Err(e) = (UniPayload::V1 { - data: UniPayloadV1::Broadcast(bcast.clone()), - cluster_id: agent.cluster_id(), - }) - .write_to_stream((&mut ser_buf).writer()) + if dispatch_broadcast( + &agent, + &transport, + input, + &mut bcast_codec, + &mut ser_buf, + &mut single_bcast_buf, + &mut local_bcast_buf, + &mut bcast_buf, + &mut to_broadcast, + ) + .is_none() { - error!("could not encode UniPayload::V1 Broadcast: {e}"); - ser_buf.clear(); continue; } - trace!("ser buf len: {}", ser_buf.len()); - - if is_local { - if let Err(e) = - bcast_codec.encode(ser_buf.split().freeze(), &mut single_bcast_buf) - { - error!("could not encode local broadcast: {e}"); - single_bcast_buf.clear(); - continue; - } - - let payload = single_bcast_buf.split().freeze(); - - local_bcast_buf.extend_from_slice(&payload); - - { - let members = agent.members().read(); - for addr in members.ring0(agent.cluster_id()) { - // this spawns, so we won't be holding onto the read lock for long - tokio::spawn(transmit_broadcast( - payload.clone(), - transport.clone(), - addr, - )); - } - } - - if local_bcast_buf.len() >= BROADCAST_CUTOFF { - to_broadcast.push(PendingBroadcast::new_local( - local_bcast_buf.split().freeze(), - )); - } - } else { - if let Err(e) = bcast_codec.encode(ser_buf.split().freeze(), &mut bcast_buf) - { - error!("could not encode broadcast: {e}"); - bcast_buf.clear(); - continue; - } - - if bcast_buf.len() >= BROADCAST_CUTOFF { - to_broadcast.push(PendingBroadcast::new(bcast_buf.split().freeze())); - } - } } Branch::WokePendingBroadcast(pending) => { trace!("handling Branch::WokePendingBroadcast"); diff --git a/crates/corro-agent/src/broadcast/send.rs b/crates/corro-agent/src/broadcast/send.rs new file mode 100644 index 00000000..b9310e67 --- /dev/null +++ b/crates/corro-agent/src/broadcast/send.rs @@ -0,0 +1,102 @@ +use crate::{broadcast::transmit_broadcast, transport::Transport}; +use bytes::{BufMut, BytesMut}; +use corro_types::{ + agent::Agent, + broadcast::{BroadcastInput, UniPayload, UniPayloadV1}, +}; +use speedy::Writable; +use tokio_util::codec::{Encoder, LengthDelimitedCodec}; +use tracing::{error, info, trace}; + +use super::PendingBroadcast; + +#[inline] +pub fn dispatch_broadcast( + agent: &Agent, + transport: &Transport, + input: BroadcastInput, + bcast_codec: &mut LengthDelimitedCodec, + ser_buf: &mut BytesMut, + single_bcast_buf: &mut BytesMut, + local_bcast_buf: &mut BytesMut, + bcast_buf: &mut BytesMut, + to_broadcast: &mut Vec, +) -> Option<()> { + const BROADCAST_CUTOFF: usize = 64 * 1024; + + let (bcast, is_local) = match input { + BroadcastInput::Rebroadcast(bcast) => (bcast, false), + BroadcastInput::AddBroadcast(bcast) => (bcast, true), + }; + trace!("adding broadcast: {bcast:?}, local? 
{is_local}"); + + // Locally originating broadcasts are given higher priority + if is_local { + // todo: change this to encode a V1 payload before merging + info!("Generating a high priority broadcast!"); + if let Err(e) = (UniPayload::V1 { + data: UniPayloadV1::Broadcast(bcast), + cluster_id: agent.cluster_id(), + priority: true, + }) + .write_to_stream(ser_buf.writer()) + { + error!("could not encode UniPayload::V1 Broadcast: {e}"); + ser_buf.clear(); + return None; + } + + trace!("ser buf len: {}", ser_buf.len()); + + if let Err(e) = bcast_codec.encode(ser_buf.split().freeze(), single_bcast_buf) { + error!("could not encode local broadcast: {e}"); + single_bcast_buf.clear(); + return None; + } + + let payload = single_bcast_buf.split().freeze(); + local_bcast_buf.extend_from_slice(&payload); + + { + let members = agent.members().read(); + for addr in members.ring0(agent.cluster_id()) { + // this spawns, so we won't be holding onto the read lock for long + tokio::spawn(transmit_broadcast(payload.clone(), transport.clone(), addr)); + } + } + + if local_bcast_buf.len() >= BROADCAST_CUTOFF { + to_broadcast.push(PendingBroadcast::new_local( + local_bcast_buf.split().freeze(), + )); + } + } + // Re-broadcasts are given default priority + else { + info!("Generating a regular broadcast!"); + if let Err(e) = (UniPayload::V1 { + data: UniPayloadV1::Broadcast(bcast), + cluster_id: agent.cluster_id(), + priority: false, + }) + .write_to_stream(ser_buf.writer()) + { + error!("could not encode UniPayload::V1 Broadcast: {e}"); + ser_buf.clear(); + return None; + } + trace!("ser buf len: {}", ser_buf.len()); + + if let Err(e) = bcast_codec.encode(ser_buf.split().freeze(), bcast_buf) { + error!("could not encode broadcast: {e}"); + bcast_buf.clear(); + return None; + } + + if bcast_buf.len() >= BROADCAST_CUTOFF { + to_broadcast.push(PendingBroadcast::new(bcast_buf.split().freeze())); + } + } + + Some(()) +} diff --git a/crates/corro-agent/src/change.rs b/crates/corro-agent/src/change.rs new file mode 100644 index 00000000..5a1b4e30 --- /dev/null +++ b/crates/corro-agent/src/change.rs @@ -0,0 +1,372 @@ +use crate::agent::util; +use corro_types::{ + actor::ActorId, + agent::{Agent, Bookie, ChangeError, CurrentVersion, KnownDbVersion, PartialVersion}, + base::{CrsqlDbVersion, CrsqlSeq, Version}, + broadcast::{ChangeSource, ChangeV1, Changeset}, +}; +use itertools::Itertools; +use metrics::{counter, histogram}; +use rangemap::RangeInclusiveMap; +use rusqlite::{named_params, Transaction}; +use std::{ + cmp, + collections::{BTreeMap, HashSet}, + ops::RangeInclusive, + time::Instant, +}; +use tokio::task::block_in_place; +use tracing::{debug, error, trace, warn}; + +/// Filter the incoming change queue by versions and sequences that +/// have duplicates in the queue. Return all unknown changes sorted +/// by the changes ActorId. 
+pub async fn filter_changes( + bookie: &Bookie, + changes: Vec<(ChangeV1, ChangeSource, Instant)>, +) -> Vec<(ChangeV1, ChangeSource)> { + let mut seen = HashSet::new(); + let mut unknown_changes = Vec::with_capacity(changes.len()); + for (change, src, queued_at) in changes { + histogram!("corro.agent.changes.queued.seconds").record(queued_at.elapsed()); + let versions = change.versions(); + let seqs = change.seqs(); + if !seen.insert((change.actor_id, versions, seqs.cloned())) { + continue; + } + if bookie + .write(format!( + "process_multiple_changes(ensure):{}", + change.actor_id.as_simple() + )) + .await + .ensure(change.actor_id) + .read(format!( + "process_multiple_changes(contains?):{}", + change.actor_id.as_simple() + )) + .await + .contains_all(change.versions(), change.seqs()) + { + continue; + } + + unknown_changes.push((change, src)); + } + + unknown_changes.sort_by_key(|(change, _src)| change.actor_id); + + unknown_changes +} + +/// Process a single changeset if it hasn't previously been completed +/// or is empty. Calls process_single_version internally. Return +/// `None` if the changeset was skipped and `Some(KnownDbVersion)` if +/// it was handled. +fn process_changeset( + agent: &Agent, + tx: &Transaction<'_>, + actor_id: ActorId, + change: ChangeV1, + change_src: ChangeSource, + versions: &RangeInclusive<Version>, + last_db_version: &mut Option<CrsqlDbVersion>, + changesets: &mut Vec<(ActorId, Changeset, CrsqlDbVersion, ChangeSource)>, +) -> Option<KnownDbVersion> { + if change.is_complete() && change.is_empty() { + // we never want to block here + if let Err(e) = agent.tx_empty().try_send((actor_id, change.versions())) { + error!("could not send empty changed versions into channel: {e}"); + } + + Some(KnownDbVersion::Cleared) + } else { + if let Some(seqs) = change.seqs() { + if seqs.end() < seqs.start() { + warn!(%actor_id, versions = ?change.versions(), "received an invalid change, seqs start is greater than seqs end: {seqs:?}"); + return None; + } + } + + let (known, versions) = match process_single_version(&agent, &tx, *last_db_version, change) + { + Ok((known, changeset)) => { + let versions = changeset.versions(); + if let KnownDbVersion::Current(CurrentVersion { db_version, .. }) = &known { + *last_db_version = Some(*db_version); + changesets.push((actor_id, changeset, *db_version, change_src)); + } + (known, versions) + } + Err(e) => { + error!(%actor_id, ?versions, "could not process single change: {e}"); + return None; + } + }; + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "got known to insert: {known:?}"); + Some(known) + } +} + +/// Update the bookie table for current versions of some data +fn update_bookkeeping( + agent: &Agent, + tx: &Transaction, + knowns: &mut BTreeMap<ActorId, Vec<(RangeInclusive<Version>, KnownDbVersion)>>, + count: &mut usize, +) -> Result<(), ChangeError> { + for (actor_id, knowns) in knowns.iter_mut() { + debug!(%actor_id, self_actor_id = %agent.actor_id(), "processing {} knowns", knowns.len()); + for (versions, known) in knowns.iter_mut() { + match known { + KnownDbVersion::Partial { .. 
} => { + continue; + } + KnownDbVersion::Current(CurrentVersion { + db_version, + last_seq, + ts, + }) => { + *count += 1; + let version = versions.start(); + debug!(%actor_id, self_actor_id = %agent.actor_id(), %version, "inserting bookkeeping row db_version: {db_version}, ts: {ts:?}"); + tx.prepare_cached(" + INSERT INTO __corro_bookkeeping ( actor_id, start_version, db_version, last_seq, ts) + VALUES (:actor_id, :start_version, :db_version, :last_seq, :ts);" + ).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})? + .execute(named_params!{ + ":actor_id": actor_id, + ":start_version": *version, + ":db_version": *db_version, + ":last_seq": *last_seq, + ":ts": *ts + }).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})?; + } + KnownDbVersion::Cleared => { + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserting CLEARED bookkeeping"); + if let Err(e) = agent.tx_empty().try_send((*actor_id, versions.clone())) { + error!("could not schedule version to be cleared: {e}"); + } + } + } + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserted bookkeeping row"); + } + } + + Ok(()) +} + +#[tracing::instrument(skip(agent, bookie, changes), err)] +pub async fn process_multiple_changes( + agent: Agent, + bookie: Bookie, + changes: Vec<(ChangeV1, ChangeSource, Instant)>, +) -> Result<(), ChangeError> { + let start = Instant::now(); + counter!("corro.agent.changes.processing.started").increment(changes.len() as u64); + debug!(self_actor_id = %agent.actor_id(), "processing multiple changes, len: {}", changes.iter().map(|(change, _, _)| cmp::max(change.len(), 1)).sum::<usize>()); + + let unknown_changes = filter_changes(&bookie, changes).await; + let mut conn = agent.pool().write_normal().await?; + + let applied_changesets = block_in_place(|| { + let start = Instant::now(); + let tx = conn + .immediate_transaction() + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: None, + version: None, + })?; + + let mut knowns: BTreeMap<ActorId, Vec<(RangeInclusive<Version>, KnownDbVersion)>> = BTreeMap::new(); + let mut changesets = vec![]; + + let mut last_db_version = None; + + for (actor_id, changes) in unknown_changes + .into_iter() + .group_by(|(change, _src)| change.actor_id) + .into_iter() + { + // get a lock on the actor id's booked writer if we didn't already + { + let booked = { + bookie + .blocking_write(format!( + "process_multiple_changes(for_actor_blocking):{}", + actor_id.as_simple() + )) + .ensure(actor_id) + }; + let booked_write = booked.blocking_write(format!( + "process_multiple_changes(booked writer):{}", + actor_id.as_simple() + )); + + let mut seen = RangeInclusiveMap::new(); // RangeInclusiveMap<Version, KnownDbVersion> + + for (change, src) in changes { + trace!("handling a single changeset: {change:?}"); + let seqs = change.seqs(); + + // If the full change sequence in a version is already + // known, skip this change + if booked_write.contains_all(change.versions(), change.seqs()) { + trace!( + "previously unknown versions are now deemed known, aborting inserts" + ); + continue; + } + + let versions = change.versions(); + + // check if we've seen this version here... + if versions.clone().all(|version| match seqs { + Some(check_seqs) => match seen.get(&version) { + Some(known) => match known { + KnownDbVersion::Partial(PartialVersion { seqs, .. }) => { + check_seqs.clone().all(|seq| seqs.contains(&seq)) + } + KnownDbVersion::Current { .. 
} | KnownDbVersion::Cleared => true, + }, + None => false, + }, + None => seen.contains_key(&version), + }) { + continue; + } + + // optimizing this, insert later! + if let Some(known) = process_changeset( + &agent, + &tx, + actor_id, + change, + src, + &versions, + &mut last_db_version, + &mut changesets, + ) { + seen.insert(versions.clone(), known.clone()); + knowns.entry(actor_id).or_default().push((versions, known)); + } + } + } + } + + let mut count = 0; + update_bookkeeping(&agent, &tx, &mut knowns, &mut count)?; + debug!("inserted {count} new changesets"); + + tx.commit().map_err(|source| ChangeError::Rusqlite { + source, + actor_id: None, + version: None, + })?; + + for (_, changeset, _, _) in changesets.iter() { + if let Some(ts) = changeset.ts() { + let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration(); + histogram!("corro.agent.changes.commit.lag.seconds").record(dur); + } + } + + debug!("committed {count} changes in {:?}", start.elapsed()); + + for (actor_id, knowns) in knowns { + let booked = { + bookie + .blocking_write(format!( + "process_multiple_changes(for_actor_blocking):{}", + actor_id.as_simple() + )) + .ensure(actor_id) + }; + let mut booked_write = booked.blocking_write(format!( + "process_multiple_changes(booked writer, post commit):{}", + actor_id.as_simple() + )); + + for (versions, known) in knowns { + let version = *versions.start(); + // this merges partial version seqs + if let Some(PartialVersion { seqs, last_seq, .. }) = + booked_write.insert_many(versions, known) + { + let full_seqs_range = CrsqlSeq(0)..=last_seq; + let gaps_count = seqs.gaps(&full_seqs_range).count(); + if gaps_count == 0 { + // if we have no gaps, then we can schedule applying all these changes. + debug!(%actor_id, %version, "we now have all versions, notifying for background jobber to insert buffered changes! seqs: {seqs:?}, expected full seqs: {full_seqs_range:?}"); + let tx_apply = agent.tx_apply().clone(); + tokio::spawn(async move { + if let Err(e) = tx_apply.send((actor_id, version)).await { + error!("could not send trigger for applying fully buffered changes later: {e}"); + } + }); + } else { + debug!(%actor_id, %version, "still have {gaps_count} gaps in partially buffered seqs"); + } + } + } + } + + Ok::<_, ChangeError>(changesets) + })?; + + for (_actor_id, changeset, db_version, _src) in applied_changesets { + agent + .subs_manager() + .match_changes(changeset.changes(), db_version); + } + + histogram!("corro.agent.changes.processing.time.seconds").record(start.elapsed()); + + Ok(()) +} + +#[tracing::instrument(skip_all, err)] +pub fn process_single_version( + agent: &Agent, + trans: &Transaction, + last_db_version: Option<CrsqlDbVersion>, + change: ChangeV1, +) -> rusqlite::Result<(KnownDbVersion, Changeset)> { + let ChangeV1 { + actor_id, + changeset, + } = change; + + let versions = changeset.versions(); + + let (known, changeset) = if changeset.is_complete() { + let (known, changeset) = util::process_complete_version( + trans, + actor_id, + last_db_version, + versions, + changeset + .into_parts() + .expect("no changeset parts, this shouldn't be happening!"), + )?; + + if util::check_buffered_meta_to_clear(trans, actor_id, changeset.versions())? 
{ + if let Err(e) = agent + .tx_clear_buf() + .try_send((actor_id, changeset.versions())) + { + error!("could not schedule buffered meta clear: {e}"); + } + } + + (known, changeset) + } else { + let parts = changeset.into_parts().unwrap(); + let known = util::process_incomplete_version(trans, actor_id, &parts)?; + + (known, parts.into()) + }; + + Ok((known, changeset)) +} diff --git a/crates/corro-agent/src/lib.rs b/crates/corro-agent/src/lib.rs index 26604eab..43aab945 100644 --- a/crates/corro-agent/src/lib.rs +++ b/crates/corro-agent/src/lib.rs @@ -2,5 +2,5 @@ pub mod agent; pub mod api; pub mod broadcast; +pub mod change; pub mod transport; - diff --git a/crates/corro-base-types/src/lib.rs b/crates/corro-base-types/src/lib.rs index 14bb3f84..4ec8040e 100644 --- a/crates/corro-base-types/src/lib.rs +++ b/crates/corro-base-types/src/lib.rs @@ -181,6 +181,9 @@ impl fmt::Display for CrsqlDbVersion { } } +/// A single change in a sequence of changes in a single transaction +/// +/// This type is usually wrapped by a RangeMap/Set #[derive( Copy, Clone, Debug, Default, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, )] diff --git a/crates/corro-devcluster/Cargo.toml b/crates/corro-devcluster/Cargo.toml new file mode 100644 index 00000000..71edfbc7 --- /dev/null +++ b/crates/corro-devcluster/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "corro-devcluster" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { workspace = true, features = [ "derive" ] } +serde_json = { workspace = true } +thiserror = { workspace = true } +rand = { workspace = true } +nom = { workspace = true } +clap = { workspace = true } \ No newline at end of file diff --git a/crates/corro-devcluster/README.md b/crates/corro-devcluster/README.md new file mode 100644 index 00000000..6b6a2626 --- /dev/null +++ b/crates/corro-devcluster/README.md @@ -0,0 +1,11 @@ +# corro-devcluster + +A tool to quickly start a corrosion cluster for development. + +- Build corrosion with nix (keep in mind that changes to the + `Cargo.lock` file need to be checked into git to be visible to a nix + build) +- Provide a topology config (`.txt` is fine, see + [example_topologies](example_topologies/)) +- Provide a state directory. Node state and log output will be kept + in separate directories. diff --git a/crates/corro-devcluster/example_topologies/a_b.txt b/crates/corro-devcluster/example_topologies/a_b.txt new file mode 100644 index 00000000..5d42c59d --- /dev/null +++ b/crates/corro-devcluster/example_topologies/a_b.txt @@ -0,0 +1 @@ +A -> B diff --git a/crates/corro-devcluster/example_topologies/a_b_c_circle.txt b/crates/corro-devcluster/example_topologies/a_b_c_circle.txt new file mode 100644 index 00000000..01b86788 --- /dev/null +++ b/crates/corro-devcluster/example_topologies/a_b_c_circle.txt @@ -0,0 +1,3 @@ +A -> B +B -> C +C -> A diff --git a/crates/corro-devcluster/src/main.rs b/crates/corro-devcluster/src/main.rs new file mode 100644 index 00000000..a9129947 --- /dev/null +++ b/crates/corro-devcluster/src/main.rs @@ -0,0 +1,258 @@ +mod topology; + +use clap::{Parser, Subcommand}; +use std::{ + collections::{BTreeMap, HashMap}, + env, + fs::File, + io::{Read, Write}, + path::PathBuf, + process::Command, + sync::mpsc::{channel, Sender}, + thread, + time::Duration, +}; + +use crate::topology::Simple; + +#[derive(Parser)] +#[clap(version = env!("CARGO_PKG_VERSION"))] +struct Cli { + /// Set the state directory path. 
If not set the environment + /// variable CORRO_DEVCLUSTER_STATE_DIR will be used + #[clap(long = "statedir", short = 'd', global = true)] + state_directory: Option<PathBuf>, + + /// Set the schema directory path. If not set the environment + /// variable CORRO_DEVCLUSTER_SCHEMA_DIR will be used + #[clap(long = "schemadir", short = 's', global = true)] + schema_directory: Option<PathBuf>, + + /// Provide the binary path for corrosion. If none is provided, + /// corrosion will be built with nix (which may take a minute) + #[clap(long = "binpath", short = 'b', global = true)] + binary_path: Option<String>, + + #[command(subcommand)] + command: CliCommand, +} + +#[derive(Subcommand)] +enum CliCommand { + /// Create a simple topology in format `A -> B`, `B -> C`, etc + Simple { + /// Set the topology file path + topology_path: PathBuf, + }, +} + +fn main() { + let cli: Cli = Cli::parse(); + + let state_dir = match cli + .state_directory + .or(env::var("CORRO_DEVCLUSTER_STATE_DIR") + .ok() + .map(|path| PathBuf::new().join(path))) + { + Some(dir) => dir, + None => { + eprintln!("FAILED: either pass `--statedir` or set 'CORRO_DEVCLUSTER_STATE_DIR' environment variable!"); + std::process::exit(1); + } + }; + + let schema_dir = match cli + .schema_directory + .or(env::var("CORRO_DEVCLUSTER_SCHEMA_DIR") + .ok() + .map(|path| PathBuf::new().join(path))) + { + Some(dir) => dir, + None => { + eprintln!("FAILED: either pass `--schemadir` or set 'CORRO_DEVCLUSTER_SCHEMA_DIR' environment variable!"); + std::process::exit(1); + } + }; + + let bin_path = cli + .binary_path + .or_else(|| build_corrosion().map(|h| h.path)) + .expect("failed to determine corrosion binary location!"); + + match cli.command { + CliCommand::Simple { topology_path } => { + let mut topo_config = File::open(topology_path).expect("failed to open topology-file!"); + let mut topo_buffer = String::new(); + topo_config + .read_to_string(&mut topo_buffer) + .expect("failed to read topology-file!"); + + let mut topology = Simple::default(); + topo_buffer.lines().for_each(|line| { + topology + .parse_edge(line) + .expect("Syntax error in topology-file!"); + }); + + run_simple_topology(topology, bin_path, state_dir, schema_dir); + } + } + + // let handle = build_corrosion(env::args().next().map(|s| PathBuf::new().join(s)).unwrap()); + // println!("{:#?}", handle); +} + +fn run_simple_topology(topo: Simple, bin_path: String, state_dir: PathBuf, schema_dir: PathBuf) { + println!("//// Creating topology: \n{:#?}", topo); + let nodes = topo.get_all_nodes(); + + let mut port_map = BTreeMap::default(); + + // First, assign ports to all the nodes + for node_name in &nodes { + // Generate a port in range 1025 - 32768 + let node_port: u16 = 1025 + rand::random::<u16>() % (32 * 1024 - 1025); + port_map.insert(node_name.clone(), node_port); + } + + // Then generate each config with the appropriate bootstrap_set + for node_name in &nodes { + let node_port = port_map.get(node_name).unwrap(); // We just put it there + let node_state = state_dir.join(node_name); + + // Delete / create the node state directory + let _ = std::fs::remove_dir(&node_state); + let _ = std::fs::create_dir_all(&node_state); + + let mut bootstrap_set = vec![]; + for link in topo.inner.get(node_name).unwrap() { + bootstrap_set.push(format!( + "\"[::1]:{}\"", // only connect locally + port_map.get(link).expect("Port for node not set!") + )); + } + + let node_config = generate_config( + node_state.to_str().unwrap(), + schema_dir.to_str().unwrap(), + *node_port, + bootstrap_set, + ); + + println!( + "Generated 
config for node '{}': \n{}", + node_name, node_config + ); + + let mut config_file = File::create(node_state.join("config.toml")) + .expect("failed to create node config file"); + config_file + .write_all(node_config.as_bytes()) + .expect("failed to write node config file"); + } + + let (tx, rx) = channel::<()>(); + + // Spawn nodes without bootstraps first, if they exist. + for (pure_responder, _) in topo.inner.iter().filter(|(_, vec)| vec.is_empty()) { + run_corrosion(tx.clone(), bin_path.clone(), state_dir.join(pure_responder)); + thread::sleep(Duration::from_millis(250)); // give the start thread a bit of time to breathe + } + + for (initiator, _) in topo.inner.iter().filter(|(_, vec)| !vec.is_empty()) { + run_corrosion(tx.clone(), bin_path.clone(), state_dir.join(initiator)); + thread::sleep(Duration::from_millis(250)); // give the start thread a bit of time to breathe + } + + // wait for the threads + while let Ok(()) = rx.recv() {} + Command::new("pkill") + .arg("corrosion") + .output() + .expect("failed to gracefully kill corrosions. They've become sentient!!!"); +} + +fn generate_config( + state_dir: &str, + schema_dir: &str, + port: u16, + bootstrap_set: Vec<String>, +) -> String { + let bootstrap = bootstrap_set.join(","); + format!( + r#"[db] +path = "{state_dir}/corrosion.db" +schema_paths = ["{schema_dir}"] + +[gossip] +addr = "[::]:{port}" +external_addr = "[::1]:{port}" +bootstrap = [{bootstrap}] +plaintext = true + +[api] +addr = "127.0.0.1:{api_port}" + +[admin] +path = "{state_dir}/admin.sock" +"#, + state_dir = state_dir, + schema_dir = schema_dir, + port = port, + // the chances of a collision here are very very small since + // every port is random + api_port = port + 1, + bootstrap = bootstrap + ) +} + +#[derive(Debug)] +struct BinHandle { + path: String, +} + +fn nix_output(vec: &Vec<u8>) -> Vec<HashMap<String, serde_json::Value>> { + serde_json::from_slice(vec).unwrap() +} + +fn run_corrosion(tx: Sender<()>, bin_path: String, state_path: PathBuf) { + let node_log = File::create(state_path.join("node.log")).expect("couldn't create log file"); + let mut cmd = Command::new(bin_path); + + cmd.args([ + "-c", + state_path.join("config.toml").to_str().unwrap(), + "agent", + ]); + + cmd.stdout(node_log); + let mut cmd_handle = cmd.spawn().expect("failed to spawn corrosion!"); + + thread::spawn(move || { + println!("Waiting for node..."); + cmd_handle + .wait() + .expect("corrosion node has encountered an error!"); + tx.send(()).unwrap(); + println!("Node completed") + }); +} + +fn build_corrosion() -> Option<BinHandle> { + println!("Running 'nix build' ..."); + let build_output = Command::new("nix") + .args(["build", "--json"]) + .output() + .ok()?; + + let json = nix_output(&build_output.stdout).remove(0); + + Some(BinHandle { + path: json + .get("outputs")? + .get("out")? 
+ .to_string() + .replace("\"", ""), + }) +} diff --git a/crates/corro-devcluster/src/topology/mod.rs b/crates/corro-devcluster/src/topology/mod.rs new file mode 100644 index 00000000..894fad9a --- /dev/null +++ b/crates/corro-devcluster/src/topology/mod.rs @@ -0,0 +1,52 @@ +use nom::{ + bytes::complete::tag, + character::complete::{alpha1, multispace0}, + sequence::delimited, + IResult, +}; +use std::collections::{BTreeMap, BTreeSet}; + +/// Topology definition as a set of graph edges +#[derive(Debug, Default)] +pub struct Simple { + pub(crate) inner: BTreeMap<String, Vec<String>>, +} + +impl Simple { + /// Parse a topology line in the following format: + /// + /// A -> B + /// B -> C + /// A -> C + /// etc + pub fn parse_edge<'top, 'input>(&mut self, input: &'input str) -> IResult<&'input str, ()> { + let (input, first) = alpha1(input)?; + let (input, _) = delimited(multispace0, tag("->"), multispace0)(input)?; + let (input, second) = alpha1(input)?; + + // Add first -> second edge + self.inner + .entry(first.to_string()) + .or_default() + .push(second.to_string()); + // Add second to the map if it doesn't exist yet but don't + // create the connection edge + self.inner.entry(second.to_string()).or_default(); + + Ok((input, ())) + } + + pub fn get_all_nodes(&self) -> Vec<String> { + self.inner + .keys() + .fold(BTreeSet::new(), |mut acc, key| { + acc.insert(key.clone()); + self.inner.get(key).unwrap().iter().for_each(|value| { + acc.insert(value.clone()); + }); + acc + }) + .into_iter() + .collect() + } +} diff --git a/crates/corro-types/src/broadcast.rs b/crates/corro-types/src/broadcast.rs index 1b4035ac..44c68117 100644 --- a/crates/corro-types/src/broadcast.rs +++ b/crates/corro-types/src/broadcast.rs @@ -33,9 +33,12 @@ pub enum UniPayload { data: UniPayloadV1, #[speedy(default_on_eof)] cluster_id: ClusterId, + #[speedy(default_on_eof)] + priority: bool, }, } +/// Broadcast change payload #[derive(Debug, Clone, Readable, Writable)] pub enum UniPayloadV1 { Broadcast(BroadcastV1), diff --git a/flake.nix b/flake.nix index 6f9f5231..88bb1df5 100644 --- a/flake.nix +++ b/flake.nix @@ -54,10 +54,15 @@ ulimit -n 65536 ''; + # Useful when doing development builds + doCheck = false; + buildType = "debug"; + ## Build environment dependencies nativeBuildInputs = [ pkgs.pkg-config pkgs.mold + pkgs.clang rust-latest.toolchain ];
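For reference, here is one possible way to invoke the new dev-cluster tool, pieced together from the clap definitions and example topologies added in this diff. The directory paths are placeholders, and `--binpath` is optional (omit it to let the tool shell out to `nix build` instead):

    # hypothetical invocation; ./state and ./schema are placeholder directories
    cargo run -p corro-devcluster -- \
        --binpath ./target/debug/corrosion \
        --statedir ./state \
        --schemadir ./schema \
        simple crates/corro-devcluster/example_topologies/a_b_c_circle.txt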