From 0f3ba6fcc308ddc59458cd684896be76fe368b8b Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Fri, 2 Feb 2024 12:44:47 +0100 Subject: [PATCH 1/5] Move change processing functions to a separate module --- crates/corro-agent/src/agent/handlers.rs | 12 +- crates/corro-agent/src/agent/mod.rs | 3 +- crates/corro-agent/src/agent/util.rs | 318 +--------------------- crates/corro-agent/src/change.rs | 327 +++++++++++++++++++++++ crates/corro-agent/src/lib.rs | 1 + 5 files changed, 340 insertions(+), 321 deletions(-) create mode 100644 crates/corro-agent/src/change.rs diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index a4ef7605..915d6d41 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -10,8 +10,9 @@ use std::{ }; use crate::{ - agent::{bi, bootstrap, uni, util, SyncClientError, ANNOUNCE_INTERVAL}, + agent::{bi, bootstrap, uni, SyncClientError, ANNOUNCE_INTERVAL}, api::peer::parallel_sync, + change, transport::Transport, }; use corro_types::{ @@ -414,7 +415,7 @@ pub async fn handle_changes( } debug!(count = %tmp_count, "spawning processing multiple changes from beginning of loop"); - join_set.spawn(util::process_multiple_changes( + join_set.spawn(change::process_multiple_changes( agent.clone(), bookie.clone(), std::mem::take(&mut buf), @@ -529,7 +530,7 @@ pub async fn handle_changes( if count < MIN_CHANGES_CHUNK && !queue.is_empty() && join_set.len() < MAX_CONCURRENT { // we can process this right away debug!(%count, "spawning processing multiple changes from max wait interval"); - join_set.spawn(util::process_multiple_changes( + join_set.spawn(change::process_multiple_changes( agent.clone(), bookie.clone(), queue.drain(..).collect(), @@ -563,7 +564,7 @@ pub async fn handle_changes( queue.push_back((change, src, Instant::now())); if count >= MIN_CHANGES_CHUNK { // drain and process current changes! 
- if let Err(e) = util::process_multiple_changes( + if let Err(e) = change::process_multiple_changes( agent.clone(), bookie.clone(), queue.drain(..).collect(), @@ -579,7 +580,8 @@ pub async fn handle_changes( } // process the last changes we got! - if let Err(e) = util::process_multiple_changes(agent, bookie, queue.into_iter().collect()).await + if let Err(e) = + change::process_multiple_changes(agent, bookie, queue.into_iter().collect()).await { error!("could not process multiple changes: {e}"); } diff --git a/crates/corro-agent/src/agent/mod.rs b/crates/corro-agent/src/agent/mod.rs index 9e54cadd..d06bb1b5 100644 --- a/crates/corro-agent/src/agent/mod.rs +++ b/crates/corro-agent/src/agent/mod.rs @@ -12,7 +12,7 @@ mod metrics; mod run_root; mod setup; mod uni; -mod util; +pub(crate) mod util; #[cfg(test)] mod tests; @@ -27,7 +27,6 @@ use uuid::Uuid; pub use error::{SyncClientError, SyncRecvError}; pub use run_root::start_with_config; pub use setup::{setup, AgentOptions}; -pub use util::process_multiple_changes; pub const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(300); pub const COMPACT_BOOKED_INTERVAL: Duration = Duration::from_secs(300); diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index de0ad207..61345c94 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -6,8 +6,7 @@ //! be pulled out of this file in future. 
use std::{ - cmp, - collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{BTreeMap, BTreeSet, HashMap}, convert::Infallible, net::SocketAddr, ops::RangeInclusive, @@ -30,7 +29,7 @@ use corro_types::{ actor::{Actor, ActorId}, agent::{Agent, Bookie, ChangeError, CurrentVersion, KnownDbVersion, PartialVersion}, base::{CrsqlDbVersion, CrsqlSeq, Version}, - broadcast::{ChangeSource, ChangeV1, Changeset, ChangesetParts, FocaInput}, + broadcast::{Changeset, ChangesetParts, FocaInput}, channel::CorroReceiver, config::AuthzConfig, pubsub::SubsManager, @@ -47,8 +46,8 @@ use foca::Member; use futures::{FutureExt, TryFutureExt}; use hyper::{server::conn::AddrIncoming, StatusCode}; use itertools::Itertools; -use metrics::{counter, histogram}; -use rangemap::{RangeInclusiveMap, RangeInclusiveSet}; +use metrics::counter; +use rangemap::RangeInclusiveSet; use rusqlite::{ named_params, params, params_from_iter, Connection, OptionalExtension, ToSql, Transaction, }; @@ -722,51 +721,6 @@ pub async fn process_completed_empties( Ok(()) } -#[tracing::instrument(skip_all, err)] -pub fn process_single_version( - agent: &Agent, - tx: &Transaction, - last_db_version: Option, - change: ChangeV1, -) -> rusqlite::Result<(KnownDbVersion, Changeset)> { - let ChangeV1 { - actor_id, - changeset, - } = change; - - let versions = changeset.versions(); - - let (known, changeset) = if changeset.is_complete() { - let (known, changeset) = process_complete_version( - tx, - actor_id, - last_db_version, - versions, - changeset - .into_parts() - .expect("no changeset parts, this shouldn't be happening!"), - )?; - - if check_buffered_meta_to_clear(tx, actor_id, changeset.versions())? 
{ - if let Err(e) = agent - .tx_clear_buf() - .try_send((actor_id, changeset.versions())) - { - error!("could not schedule buffered meta clear: {e}"); - } - } - - (known, changeset) - } else { - let parts = changeset.into_parts().unwrap(); - let known = process_incomplete_version(tx, actor_id, &parts)?; - - (known, parts.into()) - }; - - Ok((known, changeset)) -} - pub fn store_empty_changeset( conn: &Connection, actor_id: ActorId, @@ -987,270 +941,6 @@ pub async fn process_fully_buffered_changes( Ok(inserted) } -#[tracing::instrument(skip(agent, bookie, changes), err)] -pub async fn process_multiple_changes( - agent: Agent, - bookie: Bookie, - changes: Vec<(ChangeV1, ChangeSource, Instant)>, -) -> Result<(), ChangeError> { - let start = Instant::now(); - counter!("corro.agent.changes.processing.started").increment(changes.len() as u64); - debug!(self_actor_id = %agent.actor_id(), "processing multiple changes, len: {}", changes.iter().map(|(change, _, _)| cmp::max(change.len(), 1)).sum::()); - - let mut seen = HashSet::new(); - let mut unknown_changes = Vec::with_capacity(changes.len()); - for (change, src, queued_at) in changes { - histogram!("corro.agent.changes.queued.seconds").record(queued_at.elapsed()); - let versions = change.versions(); - let seqs = change.seqs(); - if !seen.insert((change.actor_id, versions, seqs.cloned())) { - continue; - } - if bookie - .write(format!( - "process_multiple_changes(ensure):{}", - change.actor_id.as_simple() - )) - .await - .ensure(change.actor_id) - .read(format!( - "process_multiple_changes(contains?):{}", - change.actor_id.as_simple() - )) - .await - .contains_all(change.versions(), change.seqs()) - { - continue; - } - - unknown_changes.push((change, src)); - } - - unknown_changes.sort_by_key(|(change, _src)| change.actor_id); - - let mut conn = agent.pool().write_normal().await?; - - let changesets = block_in_place(|| { - let start = Instant::now(); - let tx = conn - .immediate_transaction() - .map_err(|source| 
ChangeError::Rusqlite { - source, - actor_id: None, - version: None, - })?; - - let mut knowns: BTreeMap> = BTreeMap::new(); - let mut changesets = vec![]; - - let mut last_db_version = None; - - for (actor_id, changes) in unknown_changes - .into_iter() - .group_by(|(change, _src)| change.actor_id) - .into_iter() - { - // get a lock on the actor id's booked writer if we didn't already - { - let booked = { - bookie - .blocking_write(format!( - "process_multiple_changes(for_actor_blocking):{}", - actor_id.as_simple() - )) - .ensure(actor_id) - }; - let booked_write = booked.blocking_write(format!( - "process_multiple_changes(booked writer):{}", - actor_id.as_simple() - )); - - let mut seen = RangeInclusiveMap::new(); - - for (change, src) in changes { - trace!("handling a single changeset: {change:?}"); - let seqs = change.seqs(); - if booked_write.contains_all(change.versions(), change.seqs()) { - trace!( - "previously unknown versions are now deemed known, aborting inserts" - ); - continue; - } - - let versions = change.versions(); - - // check if we've seen this version here... - if versions.clone().all(|version| match seqs { - Some(check_seqs) => match seen.get(&version) { - Some(known) => match known { - KnownDbVersion::Partial(PartialVersion { seqs, .. }) => { - check_seqs.clone().all(|seq| seqs.contains(&seq)) - } - KnownDbVersion::Current { .. } | KnownDbVersion::Cleared => true, - }, - None => false, - }, - None => seen.contains_key(&version), - }) { - continue; - } - - // optimizing this, insert later! 
- let known = if change.is_complete() && change.is_empty() { - // we never want to block here - if let Err(e) = agent.tx_empty().try_send((actor_id, change.versions())) { - error!("could not send empty changed versions into channel: {e}"); - } - - KnownDbVersion::Cleared - } else { - if let Some(seqs) = change.seqs() { - if seqs.end() < seqs.start() { - warn!(%actor_id, versions = ?change.versions(), "received an invalid change, seqs start is greater than seqs end: {seqs:?}"); - continue; - } - } - - let (known, versions) = match process_single_version( - &agent, - &tx, - last_db_version, - change, - ) { - Ok((known, changeset)) => { - let versions = changeset.versions(); - if let KnownDbVersion::Current(CurrentVersion { - db_version, .. - }) = &known - { - last_db_version = Some(*db_version); - changesets.push((actor_id, changeset, *db_version, src)); - } - (known, versions) - } - Err(e) => { - error!(%actor_id, ?versions, "could not process single change: {e}"); - continue; - } - }; - debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "got known to insert: {known:?}"); - known - }; - - seen.insert(versions.clone(), known.clone()); - knowns.entry(actor_id).or_default().push((versions, known)); - } - } - } - - let mut count = 0; - - for (actor_id, knowns) in knowns.iter_mut() { - debug!(%actor_id, self_actor_id = %agent.actor_id(), "processing {} knowns", knowns.len()); - for (versions, known) in knowns.iter_mut() { - match known { - KnownDbVersion::Partial { .. 
} => { - continue; - } - KnownDbVersion::Current(CurrentVersion { - db_version, - last_seq, - ts, - }) => { - count += 1; - let version = versions.start(); - debug!(%actor_id, self_actor_id = %agent.actor_id(), %version, "inserting bookkeeping row db_version: {db_version}, ts: {ts:?}"); - tx.prepare_cached(" - INSERT INTO __corro_bookkeeping ( actor_id, start_version, db_version, last_seq, ts) - VALUES (:actor_id, :start_version, :db_version, :last_seq, :ts);").map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})? - .execute(named_params!{ - ":actor_id": actor_id, - ":start_version": *version, - ":db_version": *db_version, - ":last_seq": *last_seq, - ":ts": *ts - }).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})?; - } - KnownDbVersion::Cleared => { - debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserting CLEARED bookkeeping"); - if let Err(e) = agent.tx_empty().try_send((*actor_id, versions.clone())) { - error!("could not schedule version to be cleared: {e}"); - } - } - } - debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserted bookkeeping row"); - } - } - - debug!("inserted {count} new changesets"); - - tx.commit().map_err(|source| ChangeError::Rusqlite { - source, - actor_id: None, - version: None, - })?; - - for (_, changeset, _, _) in changesets.iter() { - if let Some(ts) = changeset.ts() { - let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration(); - histogram!("corro.agent.changes.commit.lag.seconds").record(dur); - } - } - - debug!("committed {count} changes in {:?}", start.elapsed()); - - for (actor_id, knowns) in knowns { - let booked = { - bookie - .blocking_write(format!( - "process_multiple_changes(for_actor_blocking):{}", - actor_id.as_simple() - )) - .ensure(actor_id) - }; - let mut booked_write = booked.blocking_write(format!( - "process_multiple_changes(booked writer, post commit):{}", - 
actor_id.as_simple() - )); - - for (versions, known) in knowns { - let version = *versions.start(); - // this merges partial version seqs - if let Some(PartialVersion { seqs, last_seq, .. }) = - booked_write.insert_many(versions, known) - { - let full_seqs_range = CrsqlSeq(0)..=last_seq; - let gaps_count = seqs.gaps(&full_seqs_range).count(); - if gaps_count == 0 { - // if we have no gaps, then we can schedule applying all these changes. - debug!(%actor_id, %version, "we now have all versions, notifying for background jobber to insert buffered changes! seqs: {seqs:?}, expected full seqs: {full_seqs_range:?}"); - let tx_apply = agent.tx_apply().clone(); - tokio::spawn(async move { - if let Err(e) = tx_apply.send((actor_id, version)).await { - error!("could not send trigger for applying fully buffered changes later: {e}"); - } - }); - } else { - debug!(%actor_id, %version, "still have {gaps_count} gaps in partially buffered seqs"); - } - } - } - } - - Ok::<_, ChangeError>(changesets) - })?; - - for (_actor_id, changeset, db_version, _src) in changesets { - agent - .subs_manager() - .match_changes(changeset.changes(), db_version); - } - - histogram!("corro.agent.changes.processing.time.seconds").record(start.elapsed()); - - Ok(()) -} - #[tracing::instrument(skip(tx, parts), err)] pub fn process_incomplete_version( tx: &Transaction, diff --git a/crates/corro-agent/src/change.rs b/crates/corro-agent/src/change.rs new file mode 100644 index 00000000..c5dcfeab --- /dev/null +++ b/crates/corro-agent/src/change.rs @@ -0,0 +1,327 @@ +use crate::agent::util; +use corro_types::{ + actor::ActorId, + agent::{Agent, Bookie, ChangeError, CurrentVersion, KnownDbVersion, PartialVersion}, + base::{CrsqlSeq, CrsqlDbVersion}, + broadcast::{ChangeSource, ChangeV1, Changeset}, +}; +use itertools::Itertools; +use metrics::{counter, histogram}; +use rangemap::RangeInclusiveMap; +use rusqlite::{named_params, Transaction}; +use std::{ + cmp, + collections::{BTreeMap, HashSet}, + 
time::Instant, +}; +use tokio::task::block_in_place; +use tracing::{debug, error, trace, warn}; + +#[tracing::instrument(skip(agent, bookie, changes), err)] +pub async fn process_multiple_changes( + agent: Agent, + bookie: Bookie, + changes: Vec<(ChangeV1, ChangeSource, Instant)>, +) -> Result<(), ChangeError> { + let start = Instant::now(); + counter!("corro.agent.changes.processing.started").increment(changes.len() as u64); + debug!(self_actor_id = %agent.actor_id(), "processing multiple changes, len: {}", changes.iter().map(|(change, _, _)| cmp::max(change.len(), 1)).sum::<usize>()); + + let mut seen = HashSet::new(); + let mut unknown_changes = Vec::with_capacity(changes.len()); + for (change, src, queued_at) in changes { + histogram!("corro.agent.changes.queued.seconds").record(queued_at.elapsed()); + let versions = change.versions(); + let seqs = change.seqs(); + if !seen.insert((change.actor_id, versions, seqs.cloned())) { + continue; + } + if bookie + .write(format!( + "process_multiple_changes(ensure):{}", + change.actor_id.as_simple() + )) + .await + .ensure(change.actor_id) + .read(format!( + "process_multiple_changes(contains?):{}", + change.actor_id.as_simple() + )) + .await + .contains_all(change.versions(), change.seqs()) + { + continue; + } + + unknown_changes.push((change, src)); + } + + unknown_changes.sort_by_key(|(change, _src)| change.actor_id); + + let mut conn = agent.pool().write_normal().await?; + + let changesets = block_in_place(|| { + let start = Instant::now(); + let tx = conn + .immediate_transaction() + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: None, + version: None, + })?; + + let mut knowns: BTreeMap<ActorId, Vec<_>> = BTreeMap::new(); + let mut changesets = vec![]; + + let mut last_db_version = None; + + for (actor_id, changes) in unknown_changes + .into_iter() + .group_by(|(change, _src)| change.actor_id) + .into_iter() + { + // get a lock on the actor id's booked writer if we didn't already + { + let booked = { + bookie +
.blocking_write(format!( + "process_multiple_changes(for_actor_blocking):{}", + actor_id.as_simple() + )) + .ensure(actor_id) + }; + let booked_write = booked.blocking_write(format!( + "process_multiple_changes(booked writer):{}", + actor_id.as_simple() + )); + + let mut seen = RangeInclusiveMap::new(); + + for (change, src) in changes { + trace!("handling a single changeset: {change:?}"); + let seqs = change.seqs(); + if booked_write.contains_all(change.versions(), change.seqs()) { + trace!( + "previously unknown versions are now deemed known, aborting inserts" + ); + continue; + } + + let versions = change.versions(); + + // check if we've seen this version here... + if versions.clone().all(|version| match seqs { + Some(check_seqs) => match seen.get(&version) { + Some(known) => match known { + KnownDbVersion::Partial(PartialVersion { seqs, .. }) => { + check_seqs.clone().all(|seq| seqs.contains(&seq)) + } + KnownDbVersion::Current { .. } | KnownDbVersion::Cleared => true, + }, + None => false, + }, + None => seen.contains_key(&version), + }) { + continue; + } + + // optimizing this, insert later! + let known = if change.is_complete() && change.is_empty() { + // we never want to block here + if let Err(e) = agent.tx_empty().try_send((actor_id, change.versions())) { + error!("could not send empty changed versions into channel: {e}"); + } + + KnownDbVersion::Cleared + } else { + if let Some(seqs) = change.seqs() { + if seqs.end() < seqs.start() { + warn!(%actor_id, versions = ?change.versions(), "received an invalid change, seqs start is greater than seqs end: {seqs:?}"); + continue; + } + } + + let (known, versions) = match process_single_version( + &agent, + &tx, + last_db_version, + change, + ) { + Ok((known, changeset)) => { + let versions = changeset.versions(); + if let KnownDbVersion::Current(CurrentVersion { + db_version, .. 
+ }) = &known + { + last_db_version = Some(*db_version); + changesets.push((actor_id, changeset, *db_version, src)); + } + (known, versions) + } + Err(e) => { + error!(%actor_id, ?versions, "could not process single change: {e}"); + continue; + } + }; + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "got known to insert: {known:?}"); + known + }; + + seen.insert(versions.clone(), known.clone()); + knowns.entry(actor_id).or_default().push((versions, known)); + } + } + } + + let mut count = 0; + + for (actor_id, knowns) in knowns.iter_mut() { + debug!(%actor_id, self_actor_id = %agent.actor_id(), "processing {} knowns", knowns.len()); + for (versions, known) in knowns.iter_mut() { + match known { + KnownDbVersion::Partial { .. } => { + continue; + } + KnownDbVersion::Current(CurrentVersion { + db_version, + last_seq, + ts, + }) => { + count += 1; + let version = versions.start(); + debug!(%actor_id, self_actor_id = %agent.actor_id(), %version, "inserting bookkeeping row db_version: {db_version}, ts: {ts:?}"); + tx.prepare_cached(" + INSERT INTO __corro_bookkeeping ( actor_id, start_version, db_version, last_seq, ts) + VALUES (:actor_id, :start_version, :db_version, :last_seq, :ts);").map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})? 
+ .execute(named_params!{ + ":actor_id": actor_id, + ":start_version": *version, + ":db_version": *db_version, + ":last_seq": *last_seq, + ":ts": *ts + }).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})?; + } + KnownDbVersion::Cleared => { + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserting CLEARED bookkeeping"); + if let Err(e) = agent.tx_empty().try_send((*actor_id, versions.clone())) { + error!("could not schedule version to be cleared: {e}"); + } + } + } + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserted bookkeeping row"); + } + } + + debug!("inserted {count} new changesets"); + + tx.commit().map_err(|source| ChangeError::Rusqlite { + source, + actor_id: None, + version: None, + })?; + + for (_, changeset, _, _) in changesets.iter() { + if let Some(ts) = changeset.ts() { + let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration(); + histogram!("corro.agent.changes.commit.lag.seconds").record(dur); + } + } + + debug!("committed {count} changes in {:?}", start.elapsed()); + + for (actor_id, knowns) in knowns { + let booked = { + bookie + .blocking_write(format!( + "process_multiple_changes(for_actor_blocking):{}", + actor_id.as_simple() + )) + .ensure(actor_id) + }; + let mut booked_write = booked.blocking_write(format!( + "process_multiple_changes(booked writer, post commit):{}", + actor_id.as_simple() + )); + + for (versions, known) in knowns { + let version = *versions.start(); + // this merges partial version seqs + if let Some(PartialVersion { seqs, last_seq, .. }) = + booked_write.insert_many(versions, known) + { + let full_seqs_range = CrsqlSeq(0)..=last_seq; + let gaps_count = seqs.gaps(&full_seqs_range).count(); + if gaps_count == 0 { + // if we have no gaps, then we can schedule applying all these changes. + debug!(%actor_id, %version, "we now have all versions, notifying for background jobber to insert buffered changes! 
seqs: {seqs:?}, expected full seqs: {full_seqs_range:?}"); + let tx_apply = agent.tx_apply().clone(); + tokio::spawn(async move { + if let Err(e) = tx_apply.send((actor_id, version)).await { + error!("could not send trigger for applying fully buffered changes later: {e}"); + } + }); + } else { + debug!(%actor_id, %version, "still have {gaps_count} gaps in partially buffered seqs"); + } + } + } + } + + Ok::<_, ChangeError>(changesets) + })?; + + for (_actor_id, changeset, db_version, _src) in changesets { + agent + .subs_manager() + .match_changes(changeset.changes(), db_version); + } + + histogram!("corro.agent.changes.processing.time.seconds").record(start.elapsed()); + + Ok(()) +} + +#[tracing::instrument(skip_all, err)] +pub fn process_single_version( + agent: &Agent, + trans: &Transaction, + last_db_version: Option<CrsqlDbVersion>, + change: ChangeV1, +) -> rusqlite::Result<(KnownDbVersion, Changeset)> { + let ChangeV1 { + actor_id, + changeset, + } = change; + + let versions = changeset.versions(); + + let (known, changeset) = if changeset.is_complete() { + let (known, changeset) = util::process_complete_version( + trans, + actor_id, + last_db_version, + versions, + changeset + .into_parts() + .expect("no changeset parts, this shouldn't be happening!"), + )?; + + if util::check_buffered_meta_to_clear(trans, actor_id, changeset.versions())?
{ + if let Err(e) = agent + .tx_clear_buf() + .try_send((actor_id, changeset.versions())) + { + error!("could not schedule buffered meta clear: {e}"); + } + } + + (known, changeset) + } else { + let parts = changeset.into_parts().unwrap(); + let known = util::process_incomplete_version(trans, actor_id, &parts)?; + + (known, parts.into()) + }; + + Ok((known, changeset)) +} diff --git a/crates/corro-agent/src/lib.rs b/crates/corro-agent/src/lib.rs index 26604eab..973cd557 100644 --- a/crates/corro-agent/src/lib.rs +++ b/crates/corro-agent/src/lib.rs @@ -3,4 +3,5 @@ pub mod agent; pub mod api; pub mod broadcast; pub mod transport; +pub mod change; From d8d57a36a16ae170dcafe6c3cf0ca06a1245a9db Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Tue, 6 Feb 2024 14:35:15 +0100 Subject: [PATCH 2/5] Exclude direnv from git --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 17860e72..b1b3e3fe 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ /schema /config.toml /data -/book \ No newline at end of file +/book +.direnv/** From 1df5346cb92e78e7126f516dd56ced9c576986a9 Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Tue, 6 Feb 2024 14:35:39 +0100 Subject: [PATCH 3/5] Add dev tooling for creating cluster arrangements from a config --- Cargo.lock | 169 ++++++++++-- Cargo.toml | 2 + crates/corro-devcluster/Cargo.toml | 13 + crates/corro-devcluster/README.md | 11 + .../example_topologies/a_b.txt | 1 + .../example_topologies/a_b_c_circle.txt | 3 + crates/corro-devcluster/src/main.rs | 258 ++++++++++++++++++ crates/corro-devcluster/src/topology/mod.rs | 52 ++++ 8 files changed, 479 insertions(+), 30 deletions(-) create mode 100644 crates/corro-devcluster/Cargo.toml create mode 100644 crates/corro-devcluster/README.md create mode 100644 crates/corro-devcluster/example_topologies/a_b.txt create mode 100644 crates/corro-devcluster/example_topologies/a_b_c_circle.txt create mode 100644 
crates/corro-devcluster/src/main.rs create mode 100644 crates/corro-devcluster/src/topology/mod.rs diff --git a/Cargo.lock b/Cargo.lock index e6c140da..f7e3d638 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,7 +23,7 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ - "getrandom", + "getrandom 0.2.9", "once_cell", "version_check", ] @@ -36,7 +36,7 @@ checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" dependencies = [ "cfg-if", "const-random", - "getrandom", + "getrandom 0.2.9", "once_cell", "version_check", "zerocopy", @@ -290,7 +290,7 @@ dependencies = [ name = "backoff" version = "0.1.0" dependencies = [ - "rand", + "rand 0.8.5", ] [[package]] @@ -664,7 +664,7 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d7d6ab3c3a2282db210df5f02c4dab6e0a7057af0fb7ebd4070f30fe05c0ddb" dependencies = [ - "getrandom", + "getrandom 0.2.9", "once_cell", "proc-macro-hack", "tiny-keccak", @@ -751,7 +751,7 @@ dependencies = [ "http-body", "hyper", "indexmap 2.1.0", - "itertools", + "itertools 0.10.5", "metrics", "opentelemetry", "parking_lot", @@ -759,7 +759,7 @@ dependencies = [ "quinn-plaintext", "quinn-proto", "quoted-string", - "rand", + "rand 0.8.5", "rangemap", "rusqlite", "rustls", @@ -847,6 +847,19 @@ dependencies = [ "uuid", ] +[[package]] +name = "corro-devcluster" +version = "0.1.0" +dependencies = [ + "clap", + "nom", + "rand 0.8.5", + "random_graphs", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "corro-pg" version = "0.1.0" @@ -859,7 +872,7 @@ dependencies = [ "fallible-iterator 0.3.0", "futures", "hex", - "itertools", + "itertools 0.10.5", "metrics", "pgwire", "postgres-types", @@ -953,12 +966,12 @@ dependencies = [ "futures", "hex", "indexmap 2.1.0", - "itertools", + "itertools 0.10.5", "metrics", "once_cell", "opentelemetry", "parking_lot", 
- "rand", + "rand 0.8.5", "rangemap", "rcgen", "rusqlite", @@ -1495,6 +1508,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" +[[package]] +name = "fixedbitset" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" + [[package]] name = "fnv" version = "1.0.7" @@ -1510,7 +1529,7 @@ dependencies = [ "anyhow", "bincode", "bytes", - "rand", + "rand 0.8.5", "serde", "tracing", ] @@ -1632,6 +1651,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.9.0+wasi-snapshot-preview1", +] + [[package]] name = "getrandom" version = "0.2.9" @@ -1640,7 +1670,7 @@ checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", ] [[package]] @@ -2057,6 +2087,15 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "616cde7c720bb2bb5824a224687d8f77bfd38922027f01d825cd7453be5099fb" +[[package]] +name = "itertools" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "284f18f85651fe11e8a991b2adb42cb078325c996ed026d994719efcfca1d54b" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.10.5" @@ -2385,7 +2424,7 @@ checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.48.0", ] @@ -2601,7 +2640,7 @@ dependencies = [ "opentelemetry_api", "ordered-float", "percent-encoding", - "rand", + "rand 0.8.5", "regex", 
"serde_json", "thiserror", @@ -2675,6 +2714,16 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +[[package]] +name = "petgraph" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "467d164a6de56270bd7c4d070df81d07beace25012d5103ced4e9ff08d6afdb7" +dependencies = [ + "fixedbitset", + "indexmap 1.9.3", +] + [[package]] name = "pgwire" version = "0.16.1" @@ -2692,7 +2741,7 @@ dependencies = [ "log", "md5", "postgres-types", - "rand", + "rand 0.8.5", "ring", "stringprep", "thiserror", @@ -2729,7 +2778,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1181c94580fa345f50f19d738aaa39c0ed30a600d95cb2d3e23f94266f14fbf" dependencies = [ "phf_shared", - "rand", + "rand 0.8.5", ] [[package]] @@ -2799,7 +2848,7 @@ dependencies = [ "hmac", "md-5", "memchr", - "rand", + "rand 0.8.5", "sha2", "stringprep", ] @@ -2831,7 +2880,7 @@ checksum = "09963355b9f467184c04017ced4a2ba2d75cbcb4e7462690d388233253d4b1a9" dependencies = [ "anstyle", "difflib", - "itertools", + "itertools 0.10.5", "predicates-core", ] @@ -2917,7 +2966,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2", "quote", "syn 1.0.109", @@ -2933,7 +2982,7 @@ dependencies = [ "libc", "once_cell", "raw-cpuid", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "web-sys", "winapi", ] @@ -2979,7 +3028,7 @@ version = "0.10.5" source = "git+https://github.com/jeromegn/quinn?rev=108f25a6#108f25a6d45ce0c41acf2d87f8d0b2d35fedfbaa" dependencies = [ "bytes", - "rand", + "rand 0.8.5", "ring", "rustc-hash", "rustls", @@ -3018,6 +3067,19 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"5a206a30ce37189d1340e7da2ee0b4d65e342590af676541c23a4f3959ba272e" +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom 0.1.16", + "libc", + "rand_chacha 0.2.2", + "rand_core 0.5.1", + "rand_hc", +] + [[package]] name = "rand" version = "0.8.5" @@ -3025,8 +3087,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core 0.5.1", ] [[package]] @@ -3036,7 +3108,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom 0.1.16", ] [[package]] @@ -3045,7 +3126,29 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.9", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core 0.5.1", +] + +[[package]] +name = "random_graphs" +version = "0.1.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ace5d2f73bca32195a82a8bf345f274ce4712046de915163c0d8702d9e13fe" +dependencies = [ + "itertools 0.9.0", + "num-integer", + "petgraph", + "rand 0.7.3", + "thiserror", ] [[package]] @@ -4029,7 +4132,7 @@ dependencies = [ "pin-project-lite", "postgres-protocol", "postgres-types", - "rand", + "rand 0.8.5", "socket2 0.5.5", "tokio", "tokio-util", @@ -4147,7 +4250,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -4327,7 +4430,7 @@ dependencies = [ "idna 0.2.3", "ipnet", "lazy_static", - "rand", + "rand 0.8.5", "smallvec", "thiserror", "tinyvec", @@ -4374,7 +4477,7 @@ dependencies = [ "http", "httparse", "log", - "rand", + "rand 0.8.5", "sha1", "thiserror", "url", @@ -4397,7 +4500,7 @@ dependencies = [ "humantime", "lazy_static", "log", - "rand", + "rand 0.8.5", "serde", "spin 0.9.8", ] @@ -4495,7 +4598,7 @@ version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b55a3fef2a1e3b3a00ce878640918820d3c51081576ac657d23af9fc7928fdb" dependencies = [ - "getrandom", + "getrandom 0.2.9", "serde", ] @@ -4546,6 +4649,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 0502be00..711ba7ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ indexmap = { version = "2.1.0", features = ["serde"] } itertools = { version = "0.10.5" } metrics = "0.22.0" metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = ["http-listener"] } +nom = "7.0" once_cell = "1.17.1" opentelemetry = { version = "0.20.0", features = ["rt-tokio"] } opentelemetry-otlp = { 
version = "0.13.0" } @@ -47,6 +48,7 @@ quinn-proto = "0.10.5" quinn-plaintext = { version = "0.2.0" } quoted-string = "0.6.1" rand = { version = "0.8.5", features = ["small_rng"] } +random_graphs = "0.1" rangemap = { version = "1.4.0" } rcgen = { version = "0.11.1", features = ["x509-parser"] } rhai = { version = "1.15.1", features = ["sync"] } diff --git a/crates/corro-devcluster/Cargo.toml b/crates/corro-devcluster/Cargo.toml new file mode 100644 index 00000000..83f71b37 --- /dev/null +++ b/crates/corro-devcluster/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "corro-devcluster" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { workspace = true, features = [ "derive" ] } +serde_json = { workspace = true } +thiserror = { workspace = true } +rand = { workspace = true } +random_graphs = { workspace = true } +nom = { workspace = true } +clap = { workspace = true } \ No newline at end of file diff --git a/crates/corro-devcluster/README.md b/crates/corro-devcluster/README.md new file mode 100644 index 00000000..6b6a2626 --- /dev/null +++ b/crates/corro-devcluster/README.md @@ -0,0 +1,11 @@ +# corro-devcluster + +A tool to quickly start a corrosion cluster for development. + +- Build corrosion with nix (keep in mind that changes to the + `Cargo.lock` file need to be checked into git to be visible to a nix + build) +- Provide a topology config (`.txt` is fine, see + [example_topologies](example_topologies/)) +- Provide a state directory. Node state and log output will be kept + in separated directories. 
diff --git a/crates/corro-devcluster/example_topologies/a_b.txt b/crates/corro-devcluster/example_topologies/a_b.txt new file mode 100644 index 00000000..5d42c59d --- /dev/null +++ b/crates/corro-devcluster/example_topologies/a_b.txt @@ -0,0 +1 @@ +A -> B diff --git a/crates/corro-devcluster/example_topologies/a_b_c_circle.txt b/crates/corro-devcluster/example_topologies/a_b_c_circle.txt new file mode 100644 index 00000000..01b86788 --- /dev/null +++ b/crates/corro-devcluster/example_topologies/a_b_c_circle.txt @@ -0,0 +1,3 @@ +A -> B +B -> C +C -> A diff --git a/crates/corro-devcluster/src/main.rs b/crates/corro-devcluster/src/main.rs new file mode 100644 index 00000000..a9129947 --- /dev/null +++ b/crates/corro-devcluster/src/main.rs @@ -0,0 +1,258 @@ +mod topology; + +use clap::{Parser, Subcommand}; +use std::{ + collections::{BTreeMap, HashMap}, + env, + fs::File, + io::{Read, Write}, + path::PathBuf, + process::Command, + sync::mpsc::{channel, Sender}, + thread, + time::Duration, +}; + +use crate::topology::Simple; + +#[derive(Parser)] +#[clap(version = env!("CARGO_PKG_VERSION"))] +struct Cli { + /// Set the state directory path. If not set the environment + /// variable CORRO_DEVCLUSTER_STATE_DIR will be used + #[clap(long = "statedir", short = 'd', global = true)] + state_directory: Option, + + /// Set the state directory path. If not set the environment + /// variable CORRO_DEVCLUSTER_SCHEMA_DIR will be used + #[clap(long = "schemadir", short = 's', global = true)] + schema_directory: Option, + + /// Provide the binary path for corrosion. 
If none is provided, + /// corrosion will be built with nix (which may take a minute) + #[clap(long = "binpath", short = 'b', global = true)] + binary_path: Option, + + #[command(subcommand)] + command: CliCommand, +} + +#[derive(Subcommand)] +enum CliCommand { + /// Create a simple topology in format `A -> B`, `B -> C`, etc + Simple { + /// Set the topology file path + topology_path: PathBuf, + }, +} + +fn main() { + let cli: Cli = Cli::parse(); + + let state_dir = match cli + .state_directory + .or(env::var("CORRO_DEVCLUSTER_STATE_DIR") + .ok() + .map(|path| PathBuf::new().join(path))) + { + Some(dir) => dir, + None => { + eprintln!("FAILED: either pass `--statedir` or set 'CORRO_DEVCLUSTER_STATE_DIR' environment variable!"); + std::process::exit(1); + } + }; + + let schema_dir = match cli + .schema_directory + .or(env::var("CORRO_DEVCLUSTER_SCHEMA_DIR") + .ok() + .map(|path| PathBuf::new().join(path))) + { + Some(dir) => dir, + None => { + eprintln!("FAILED: either pass `--statedir` or set 'CORRO_DEVCLUSTER_STATE_DIR' environment variable!"); + std::process::exit(1); + } + }; + + let bin_path = cli + .binary_path + .or_else(|| build_corrosion().map(|h| h.path)) + .expect("failed to determine corrosion binary location!"); + + match cli.command { + CliCommand::Simple { topology_path } => { + let mut topo_config = File::open(topology_path).expect("failed to open topology-file!"); + let mut topo_buffer = String::new(); + topo_config + .read_to_string(&mut topo_buffer) + .expect("failed to read topology-file!"); + + let mut topology = Simple::default(); + topo_buffer.lines().for_each(|line| { + topology + .parse_edge(line) + .expect("Syntax error in topology-file!"); + }); + + run_simple_topology(topology, bin_path, state_dir, schema_dir); + } + } + + // let handle = build_corrosion(env::args().next().map(|s| PathBuf::new().join(s)).unwrap()); + // println!("{:#?}", handle); +} + +fn run_simple_topology(topo: Simple, bin_path: String, state_dir: PathBuf, schema_dir: 
PathBuf) { + println!("//// Creating topology: \n{:#?}", topo); + let nodes = topo.get_all_nodes(); + + let mut port_map = BTreeMap::default(); + + // First go assign ports to all the nodes + for node_name in &nodes { + // Generate a port in range 1025 - 32768 + let node_port: u16 = 1025 + rand::random::() % (32 * 1024) - 1025; + port_map.insert(node_name.clone(), node_port); + } + + // Then generate each config with the appropriate bootstrap_set + for node_name in &nodes { + let node_port = port_map.get(node_name).unwrap(); // We just put it there + let node_state = state_dir.join(node_name); + + // Delete / create the node state directory + let _ = std::fs::remove_dir(&node_state); + let _ = std::fs::create_dir_all(&node_state); + + let mut bootstrap_set = vec![]; + for link in topo.inner.get(node_name).unwrap() { + bootstrap_set.push(format!( + "\"[::1]:{}\"", // only connect locally + port_map.get(link).expect("Port for node not set!") + )); + } + + let node_config = generate_config( + node_state.to_str().unwrap(), + schema_dir.to_str().unwrap(), + *node_port, + bootstrap_set, + ); + + println!( + "Generated config for node '{}': \n{}", + node_name, node_config + ); + + let mut config_file = File::create(node_state.join("config.toml")) + .expect("failed to create node config file"); + config_file + .write_all(node_config.as_bytes()) + .expect("failed to write node config file"); + } + + let (tx, rx) = channel::<()>(); + + // Spawn nodes those without bootstraps first if they exist. 
+ for (pure_responder, _) in topo.inner.iter().filter(|(_, vec)| vec.is_empty()) { + run_corrosion(tx.clone(), bin_path.clone(), state_dir.join(pure_responder)); + thread::sleep(Duration::from_millis(250)); // give the start thread a bit of time to breathe + } + + for (initiator, _) in topo.inner.iter().filter(|(_, vec)| !vec.is_empty()) { + run_corrosion(tx.clone(), bin_path.clone(), state_dir.join(initiator)); + thread::sleep(Duration::from_millis(250)); // give the start thread a bit of time to breathe + } + + // wait for the threads + while let Ok(()) = rx.recv() {} + Command::new("pkill") + .arg("corrosion") + .output() + .expect("failed to gracefully kill corrosions. They've become sentient!!!"); +} + +fn generate_config( + state_dir: &str, + schema_dir: &str, + port: u16, + bootstrap_set: Vec, +) -> String { + let bootstrap = bootstrap_set.join(","); + format!( + r#"[db] +path = "{state_dir}/corrosion.db" +schema_paths = ["{schema_dir}"] + +[gossip] +addr = "[::]:{port}" +external_addr = "[::1]:{port}" +bootstrap = [{bootstrap}] +plaintext = true + +[api] +addr = "127.0.0.1:{api_port}" + +[admin] +path = "{state_dir}/admin.sock" +"#, + state_dir = state_dir, + schema_dir = schema_dir, + port = port, + // the chances of a collision here are very very small since + // every port is random + api_port = port + 1, + bootstrap = bootstrap + ) +} + +#[derive(Debug)] +struct BinHandle { + path: String, +} + +fn nix_output(vec: &Vec) -> Vec> { + serde_json::from_slice(vec).unwrap() +} + +fn run_corrosion(tx: Sender<()>, bin_path: String, state_path: PathBuf) { + let node_log = File::create(state_path.join("node.log")).expect("couldn't create log file"); + let mut cmd = Command::new(bin_path); + + cmd.args([ + "-c", + state_path.join("config.toml").to_str().unwrap(), + "agent", + ]); + + cmd.stdout(node_log); + let mut cmd_handle = cmd.spawn().expect("failed to spawn corrosion!"); + + thread::spawn(move || { + println!("Waiting for node..."); + cmd_handle + .wait() + 
.expect("corrosion node has encountered an error!"); + tx.send(()).unwrap(); + println!("Node completed") + }); +} + +fn build_corrosion() -> Option { + println!("Running 'nix build' ..."); + let build_output = Command::new("nix") + .args(["build", "--json"]) + .output() + .ok()?; + + let json = nix_output(&build_output.stdout).remove(0); + + Some(BinHandle { + path: json + .get("outputs")? + .get("out")? + .to_string() + .replace("\"", ""), + }) +} diff --git a/crates/corro-devcluster/src/topology/mod.rs b/crates/corro-devcluster/src/topology/mod.rs new file mode 100644 index 00000000..894fad9a --- /dev/null +++ b/crates/corro-devcluster/src/topology/mod.rs @@ -0,0 +1,52 @@ +use nom::{ + bytes::complete::tag, + character::complete::{alpha1, multispace0}, + sequence::delimited, + IResult, +}; +use std::collections::{BTreeMap, BTreeSet}; + +/// Topology definition as a set of graph edges +#[derive(Debug, Default)] +pub struct Simple { + pub(crate) inner: BTreeMap>, +} + +impl Simple { + /// Parse a topology line in the following format: + /// + /// A -> B + /// B -> C + /// A -> C + /// etc + pub fn parse_edge<'top, 'input>(&mut self, input: &'input str) -> IResult<&'input str, ()> { + let (input, first) = alpha1(input)?; + let (input, _) = delimited(multispace0, tag("->"), multispace0)(input)?; + let (input, second) = alpha1(input)?; + + // Add first -> second edge + self.inner + .entry(first.to_string()) + .or_default() + .push(second.to_string()); + // Add second to the map if it doesn't exist yet but don't + // create the connection edge + self.inner.entry(second.to_string()).or_default(); + + Ok((input, ())) + } + + pub fn get_all_nodes(&self) -> Vec { + self.inner + .keys() + .fold(BTreeSet::new(), |mut acc, key| { + acc.insert(key.clone()); + self.inner.get(key).unwrap().iter().for_each(|value| { + acc.insert(value.clone()); + }); + acc + }) + .into_iter() + .collect() + } +} From fd79cf12d5b13f48f37120894072a3425b548fb9 Mon Sep 17 00:00:00 2001 From: 
Katharina Fey Date: Wed, 7 Feb 2024 17:05:18 +0100 Subject: [PATCH 4/5] Fix nix builds and speed up compilation for development --- flake.nix | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flake.nix b/flake.nix index 6f9f5231..477eb46c 100644 --- a/flake.nix +++ b/flake.nix @@ -54,10 +54,15 @@ ulimit -n 65536 ''; + # Useful when doing development builds + doCheck = false; + buildType = "debug"; + ## Build environment dependencies nativeBuildInputs = [ pkgs.pkg-config pkgs.mold + pkgs.clang rust-latest.toolchain ]; From 3db082f9df4097fc99e467aa070fe7715b57fb39 Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Thu, 8 Feb 2024 14:17:38 +0100 Subject: [PATCH 5/5] Implement priority broadcasts to ring0 nodes --- Cargo.lock | 159 +++------------- Cargo.toml | 1 - crates/corro-agent/src/agent/handlers.rs | 2 +- crates/corro-agent/src/agent/uni.rs | 50 ++++- crates/corro-agent/src/api/peer.rs | 5 +- crates/corro-agent/src/broadcast/mod.rs | 82 ++------ crates/corro-agent/src/broadcast/send.rs | 102 ++++++++++ crates/corro-agent/src/change.rs | 233 ++++++++++++++--------- crates/corro-agent/src/lib.rs | 3 +- crates/corro-base-types/src/lib.rs | 3 + crates/corro-devcluster/Cargo.toml | 1 - crates/corro-types/src/broadcast.rs | 3 + flake.nix | 2 +- 13 files changed, 348 insertions(+), 298 deletions(-) create mode 100644 crates/corro-agent/src/broadcast/send.rs diff --git a/Cargo.lock b/Cargo.lock index f7e3d638..f81f1f9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,7 +23,7 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ - "getrandom 0.2.9", + "getrandom", "once_cell", "version_check", ] @@ -36,7 +36,7 @@ checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" dependencies = [ "cfg-if", "const-random", - "getrandom 0.2.9", + "getrandom", "once_cell", "version_check", "zerocopy", @@ -290,7 +290,7 @@ dependencies = 
[ name = "backoff" version = "0.1.0" dependencies = [ - "rand 0.8.5", + "rand", ] [[package]] @@ -664,7 +664,7 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d7d6ab3c3a2282db210df5f02c4dab6e0a7057af0fb7ebd4070f30fe05c0ddb" dependencies = [ - "getrandom 0.2.9", + "getrandom", "once_cell", "proc-macro-hack", "tiny-keccak", @@ -751,7 +751,7 @@ dependencies = [ "http-body", "hyper", "indexmap 2.1.0", - "itertools 0.10.5", + "itertools", "metrics", "opentelemetry", "parking_lot", @@ -759,7 +759,7 @@ dependencies = [ "quinn-plaintext", "quinn-proto", "quoted-string", - "rand 0.8.5", + "rand", "rangemap", "rusqlite", "rustls", @@ -853,8 +853,7 @@ version = "0.1.0" dependencies = [ "clap", "nom", - "rand 0.8.5", - "random_graphs", + "rand", "serde", "serde_json", "thiserror", @@ -872,7 +871,7 @@ dependencies = [ "fallible-iterator 0.3.0", "futures", "hex", - "itertools 0.10.5", + "itertools", "metrics", "pgwire", "postgres-types", @@ -966,12 +965,12 @@ dependencies = [ "futures", "hex", "indexmap 2.1.0", - "itertools 0.10.5", + "itertools", "metrics", "once_cell", "opentelemetry", "parking_lot", - "rand 0.8.5", + "rand", "rangemap", "rcgen", "rusqlite", @@ -1508,12 +1507,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" -[[package]] -name = "fixedbitset" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" - [[package]] name = "fnv" version = "1.0.7" @@ -1529,7 +1522,7 @@ dependencies = [ "anyhow", "bincode", "bytes", - "rand 0.8.5", + "rand", "serde", "tracing", ] @@ -1651,17 +1644,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "getrandom" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" -dependencies = [ - "cfg-if", - "libc", - "wasi 0.9.0+wasi-snapshot-preview1", -] - [[package]] name = "getrandom" version = "0.2.9" @@ -1670,7 +1652,7 @@ checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" dependencies = [ "cfg-if", "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", ] [[package]] @@ -2087,15 +2069,6 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "616cde7c720bb2bb5824a224687d8f77bfd38922027f01d825cd7453be5099fb" -[[package]] -name = "itertools" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "284f18f85651fe11e8a991b2adb42cb078325c996ed026d994719efcfca1d54b" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.10.5" @@ -2424,7 +2397,7 @@ checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "log", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "windows-sys 0.48.0", ] @@ -2640,7 +2613,7 @@ dependencies = [ "opentelemetry_api", "ordered-float", "percent-encoding", - "rand 0.8.5", + "rand", "regex", "serde_json", "thiserror", @@ -2714,16 +2687,6 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" -[[package]] -name = "petgraph" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "467d164a6de56270bd7c4d070df81d07beace25012d5103ced4e9ff08d6afdb7" -dependencies = [ - "fixedbitset", - "indexmap 1.9.3", -] - [[package]] name = "pgwire" version = "0.16.1" @@ -2741,7 +2704,7 @@ dependencies = [ "log", "md5", "postgres-types", - "rand 0.8.5", + "rand", "ring", "stringprep", "thiserror", @@ -2778,7 +2741,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"b1181c94580fa345f50f19d738aaa39c0ed30a600d95cb2d3e23f94266f14fbf" dependencies = [ "phf_shared", - "rand 0.8.5", + "rand", ] [[package]] @@ -2848,7 +2811,7 @@ dependencies = [ "hmac", "md-5", "memchr", - "rand 0.8.5", + "rand", "sha2", "stringprep", ] @@ -2880,7 +2843,7 @@ checksum = "09963355b9f467184c04017ced4a2ba2d75cbcb4e7462690d388233253d4b1a9" dependencies = [ "anstyle", "difflib", - "itertools 0.10.5", + "itertools", "predicates-core", ] @@ -2966,7 +2929,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools", "proc-macro2", "quote", "syn 1.0.109", @@ -2982,7 +2945,7 @@ dependencies = [ "libc", "once_cell", "raw-cpuid", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "web-sys", "winapi", ] @@ -3028,7 +2991,7 @@ version = "0.10.5" source = "git+https://github.com/jeromegn/quinn?rev=108f25a6#108f25a6d45ce0c41acf2d87f8d0b2d35fedfbaa" dependencies = [ "bytes", - "rand 0.8.5", + "rand", "ring", "rustc-hash", "rustls", @@ -3067,19 +3030,6 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a206a30ce37189d1340e7da2ee0b4d65e342590af676541c23a4f3959ba272e" -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom 0.1.16", - "libc", - "rand_chacha 0.2.2", - "rand_core 0.5.1", - "rand_hc", -] - [[package]] name = "rand" version = "0.8.5" @@ -3087,18 +3037,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.4", -] - -[[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core 0.5.1", + "rand_chacha", + "rand_core", ] [[package]] @@ -3108,16 +3048,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core 0.6.4", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom 0.1.16", + "rand_core", ] [[package]] @@ -3126,29 +3057,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.9", -] - -[[package]] -name = "rand_hc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core 0.5.1", -] - -[[package]] -name = "random_graphs" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7ace5d2f73bca32195a82a8bf345f274ce4712046de915163c0d8702d9e13fe" -dependencies = [ - "itertools 0.9.0", - "num-integer", - "petgraph", - "rand 0.7.3", - "thiserror", + "getrandom", ] [[package]] @@ -4132,7 +4041,7 @@ dependencies = [ "pin-project-lite", "postgres-protocol", "postgres-types", - "rand 0.8.5", + "rand", "socket2 0.5.5", "tokio", "tokio-util", @@ -4250,7 +4159,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand 0.8.5", + "rand", "slab", "tokio", "tokio-util", @@ -4430,7 +4339,7 @@ dependencies = [ "idna 0.2.3", "ipnet", "lazy_static", - "rand 0.8.5", + "rand", "smallvec", "thiserror", "tinyvec", @@ -4477,7 +4386,7 @@ dependencies = [ "http", "httparse", "log", - "rand 0.8.5", + "rand", 
"sha1", "thiserror", "url", @@ -4500,7 +4409,7 @@ dependencies = [ "humantime", "lazy_static", "log", - "rand 0.8.5", + "rand", "serde", "spin 0.9.8", ] @@ -4598,7 +4507,7 @@ version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b55a3fef2a1e3b3a00ce878640918820d3c51081576ac657d23af9fc7928fdb" dependencies = [ - "getrandom 0.2.9", + "getrandom", "serde", ] @@ -4649,12 +4558,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 711ba7ec..d5ca8418 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,6 @@ quinn-proto = "0.10.5" quinn-plaintext = { version = "0.2.0" } quoted-string = "0.6.1" rand = { version = "0.8.5", features = ["small_rng"] } -random_graphs = "0.1" rangemap = { version = "1.4.0" } rcgen = { version = "0.11.1", features = ["x509-parser"] } rhai = { version = "1.15.1", features = ["sync"] } diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 915d6d41..89373176 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -108,7 +108,7 @@ pub fn spawn_incoming_connection_handlers( // Spawn handler tasks for this connection spawn_foca_handler(&agent, &tripwire, &conn); - uni::spawn_unipayload_handler(&tripwire, &conn, agent.clone()); + uni::spawn_unipayload_handler(&agent, &bookie, &tripwire, &conn); bi::spawn_bipayload_handler(&agent, &bookie, &tripwire, &conn); }); } diff --git a/crates/corro-agent/src/agent/uni.rs b/crates/corro-agent/src/agent/uni.rs index 6e9ec874..5044986f 100644 --- a/crates/corro-agent/src/agent/uni.rs +++ b/crates/corro-agent/src/agent/uni.rs @@ -1,5 +1,5 @@ use corro_types::{ - 
agent::Agent, + agent::{Agent, Bookie}, broadcast::{BroadcastV1, ChangeSource, UniPayload, UniPayloadV1}, }; use metrics::counter; @@ -11,12 +11,22 @@ use tripwire::Tripwire; /// Spawn a task that accepts unidirectional broadcast streams, then /// spawns another task for each incoming stream to handle. -pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, agent: Agent) { +pub fn spawn_unipayload_handler( + agent: &Agent, + bookie: &Bookie, + tripwire: &Tripwire, + conn: &quinn::Connection, +) { + let agent = agent.clone(); + let bookie = bookie.clone(); tokio::spawn({ let conn = conn.clone(); let mut tripwire = tripwire.clone(); + async move { loop { + let agent = agent.clone(); + let bookie = bookie.clone(); let rx = tokio::select! { rx_res = conn.accept_uni() => match rx_res { Ok(rx) => rx, @@ -44,6 +54,9 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a let mut framed = FramedRead::new(rx, LengthDelimitedCodec::new()); loop { + let agent = agent.clone(); + let bookie = bookie.clone(); + match StreamExt::next(&mut framed).await { Some(Ok(b)) => { counter!("corro.peer.stream.bytes.recv.total", "type" => "uni") @@ -59,6 +72,39 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a change, )), cluster_id, + priority, + } if priority => { + if cluster_id != agent.cluster_id() { + continue; + } + + tokio::spawn(async move { + if let Err(e) = + crate::change::process_multiple_changes( + agent, + bookie, + vec![( + change, + ChangeSource::Broadcast, + std::time::Instant::now(), + )], + ) + .await + { + error!( + "process_priority_change failed: {:?}", + e + ); + } + }); + } + UniPayload::V1 { + data: + UniPayloadV1::Broadcast(BroadcastV1::Change( + change, + )), + cluster_id, + .. 
} => { if cluster_id != agent.cluster_id() { continue; diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index b47e35af..3a5f75b2 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1564,10 +1564,7 @@ mod tests { use tokio::sync::mpsc; use tripwire::Tripwire; - use crate::{ - agent::{process_multiple_changes, setup}, - api::public::api_v1_db_schema, - }; + use crate::{agent::setup, api::public::api_v1_db_schema, change::process_multiple_changes}; use super::*; diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index 5f8ed0ab..cb0f1a66 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -1,3 +1,5 @@ +mod send; + use std::{ collections::{hash_map::Entry, HashMap, HashSet}, net::SocketAddr, @@ -11,7 +13,7 @@ use std::{ }; use bincode::DefaultOptions; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Bytes, BytesMut}; use foca::{BincodeCodec, Foca, NoCustomBroadcast, Notification, Timer}; use futures::{ stream::{FusedStream, FuturesUnordered}, @@ -22,7 +24,6 @@ use parking_lot::RwLock; use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng}; use rusqlite::params; use spawn::spawn_counted; -use speedy::Writable; use strum::EnumDiscriminants; use tokio::{ sync::mpsc, @@ -30,18 +31,18 @@ use tokio::{ time::interval, }; use tokio_stream::StreamExt; -use tokio_util::codec::{Encoder, LengthDelimitedCodec}; +use tokio_util::codec::LengthDelimitedCodec; use tracing::{debug, error, log::info, trace, warn}; use tripwire::Tripwire; use corro_types::{ actor::{Actor, ActorId}, agent::Agent, - broadcast::{BroadcastInput, DispatchRuntime, FocaCmd, FocaInput, UniPayload, UniPayloadV1}, + broadcast::{BroadcastInput, DispatchRuntime, FocaCmd, FocaInput}, channel::{bounded, CorroReceiver, CorroSender}, }; -use crate::transport::Transport; +use crate::{broadcast::send::dispatch_broadcast, transport::Transport}; 
#[derive(Clone)] struct TimerSpawner { @@ -363,8 +364,6 @@ pub fn runtime_loop( }); tokio::spawn(async move { - const BROADCAST_CUTOFF: usize = 64 * 1024; - let mut bcast_codec = LengthDelimitedCodec::new(); let mut bcast_buf = BytesMut::new(); @@ -443,66 +442,21 @@ pub fn runtime_loop( } Branch::Broadcast(input) => { trace!("handling Branch::Broadcast"); - let (bcast, is_local) = match input { - BroadcastInput::Rebroadcast(bcast) => (bcast, false), - BroadcastInput::AddBroadcast(bcast) => (bcast, true), - }; - trace!("adding broadcast: {bcast:?}, local? {is_local}"); - - if let Err(e) = (UniPayload::V1 { - data: UniPayloadV1::Broadcast(bcast.clone()), - cluster_id: agent.cluster_id(), - }) - .write_to_stream((&mut ser_buf).writer()) + if dispatch_broadcast( + &agent, + &transport, + input, + &mut bcast_codec, + &mut ser_buf, + &mut single_bcast_buf, + &mut local_bcast_buf, + &mut bcast_buf, + &mut to_broadcast, + ) + .is_none() { - error!("could not encode UniPayload::V1 Broadcast: {e}"); - ser_buf.clear(); continue; } - trace!("ser buf len: {}", ser_buf.len()); - - if is_local { - if let Err(e) = - bcast_codec.encode(ser_buf.split().freeze(), &mut single_bcast_buf) - { - error!("could not encode local broadcast: {e}"); - single_bcast_buf.clear(); - continue; - } - - let payload = single_bcast_buf.split().freeze(); - - local_bcast_buf.extend_from_slice(&payload); - - { - let members = agent.members().read(); - for addr in members.ring0(agent.cluster_id()) { - // this spawns, so we won't be holding onto the read lock for long - tokio::spawn(transmit_broadcast( - payload.clone(), - transport.clone(), - addr, - )); - } - } - - if local_bcast_buf.len() >= BROADCAST_CUTOFF { - to_broadcast.push(PendingBroadcast::new_local( - local_bcast_buf.split().freeze(), - )); - } - } else { - if let Err(e) = bcast_codec.encode(ser_buf.split().freeze(), &mut bcast_buf) - { - error!("could not encode broadcast: {e}"); - bcast_buf.clear(); - continue; - } - - if bcast_buf.len() >= 
BROADCAST_CUTOFF { - to_broadcast.push(PendingBroadcast::new(bcast_buf.split().freeze())); - } - } } Branch::WokePendingBroadcast(pending) => { trace!("handling Branch::WokePendingBroadcast"); diff --git a/crates/corro-agent/src/broadcast/send.rs b/crates/corro-agent/src/broadcast/send.rs new file mode 100644 index 00000000..b9310e67 --- /dev/null +++ b/crates/corro-agent/src/broadcast/send.rs @@ -0,0 +1,102 @@ +use crate::{broadcast::transmit_broadcast, transport::Transport}; +use bytes::{BufMut, BytesMut}; +use corro_types::{ + agent::Agent, + broadcast::{BroadcastInput, UniPayload, UniPayloadV1}, +}; +use speedy::Writable; +use tokio_util::codec::{Encoder, LengthDelimitedCodec}; +use tracing::{error, info, trace}; + +use super::PendingBroadcast; + +#[inline] +pub fn dispatch_broadcast( + agent: &Agent, + transport: &Transport, + input: BroadcastInput, + bcast_codec: &mut LengthDelimitedCodec, + ser_buf: &mut BytesMut, + single_bcast_buf: &mut BytesMut, + local_bcast_buf: &mut BytesMut, + bcast_buf: &mut BytesMut, + to_broadcast: &mut Vec, +) -> Option<()> { + const BROADCAST_CUTOFF: usize = 64 * 1024; + + let (bcast, is_local) = match input { + BroadcastInput::Rebroadcast(bcast) => (bcast, false), + BroadcastInput::AddBroadcast(bcast) => (bcast, true), + }; + trace!("adding broadcast: {bcast:?}, local? 
{is_local}"); + + // Locally originating broadcasts are given higher priority + if is_local { + // todo: change this to encode a V1 payload before merging + info!("Generating a high priority broadcast!"); + if let Err(e) = (UniPayload::V1 { + data: UniPayloadV1::Broadcast(bcast), + cluster_id: agent.cluster_id(), + priority: true, + }) + .write_to_stream(ser_buf.writer()) + { + error!("could not encode UniPayload::V1 Broadcast: {e}"); + ser_buf.clear(); + return None; + } + + trace!("ser buf len: {}", ser_buf.len()); + + if let Err(e) = bcast_codec.encode(ser_buf.split().freeze(), single_bcast_buf) { + error!("could not encode local broadcast: {e}"); + single_bcast_buf.clear(); + return None; + } + + let payload = single_bcast_buf.split().freeze(); + local_bcast_buf.extend_from_slice(&payload); + + { + let members = agent.members().read(); + for addr in members.ring0(agent.cluster_id()) { + // this spawns, so we won't be holding onto the read lock for long + tokio::spawn(transmit_broadcast(payload.clone(), transport.clone(), addr)); + } + } + + if local_bcast_buf.len() >= BROADCAST_CUTOFF { + to_broadcast.push(PendingBroadcast::new_local( + local_bcast_buf.split().freeze(), + )); + } + } + // Re-broadcasts are given default priority + else { + info!("Generating a regular broadcast!"); + if let Err(e) = (UniPayload::V1 { + data: UniPayloadV1::Broadcast(bcast), + cluster_id: agent.cluster_id(), + priority: false, + }) + .write_to_stream(ser_buf.writer()) + { + error!("could not encode UniPayload::V1 Broadcast: {e}"); + ser_buf.clear(); + return None; + } + trace!("ser buf len: {}", ser_buf.len()); + + if let Err(e) = bcast_codec.encode(ser_buf.split().freeze(), bcast_buf) { + error!("could not encode broadcast: {e}"); + bcast_buf.clear(); + return None; + } + + if bcast_buf.len() >= BROADCAST_CUTOFF { + to_broadcast.push(PendingBroadcast::new(bcast_buf.split().freeze())); + } + } + + Some(()) +} diff --git a/crates/corro-agent/src/change.rs 
b/crates/corro-agent/src/change.rs index c5dcfeab..5a1b4e30 100644 --- a/crates/corro-agent/src/change.rs +++ b/crates/corro-agent/src/change.rs @@ -2,7 +2,7 @@ use crate::agent::util; use corro_types::{ actor::ActorId, agent::{Agent, Bookie, ChangeError, CurrentVersion, KnownDbVersion, PartialVersion}, - base::{CrsqlSeq, CrsqlDbVersion}, + base::{CrsqlDbVersion, CrsqlSeq, Version}, broadcast::{ChangeSource, ChangeV1, Changeset}, }; use itertools::Itertools; @@ -12,21 +12,19 @@ use rusqlite::{named_params, Transaction}; use std::{ cmp, collections::{BTreeMap, HashSet}, + ops::RangeInclusive, time::Instant, }; use tokio::task::block_in_place; use tracing::{debug, error, trace, warn}; -#[tracing::instrument(skip(agent, bookie, changes), err)] -pub async fn process_multiple_changes( - agent: Agent, - bookie: Bookie, +/// Filter the incoming change queue by versions and sequences that +/// have duplicates in the queue. Return all unknown changes sorted +/// by the changes ActorId. +pub async fn filter_changes( + bookie: &Bookie, changes: Vec<(ChangeV1, ChangeSource, Instant)>, -) -> Result<(), ChangeError> { - let start = Instant::now(); - counter!("corro.agent.changes.processing.started").increment(changes.len() as u64); - debug!(self_actor_id = %agent.actor_id(), "processing multiple changes, len: {}", changes.iter().map(|(change, _, _)| cmp::max(change.len(), 1)).sum::()); - +) -> Vec<(ChangeV1, ChangeSource)> { let mut seen = HashSet::new(); let mut unknown_changes = Vec::with_capacity(changes.len()); for (change, src, queued_at) in changes { @@ -58,9 +56,120 @@ pub async fn process_multiple_changes( unknown_changes.sort_by_key(|(change, _src)| change.actor_id); + unknown_changes +} + +/// Process a single changeset if it hasn't previously been completed +/// or is empty. Calls process_single_version internally. 
Return +/// `None` if the changeset was skipped and `Some(KnownDbVersion)` if +/// it was handled +fn process_changeset( + agent: &Agent, + tx: &Transaction<'_>, + actor_id: ActorId, + change: ChangeV1, + change_src: ChangeSource, + versions: &RangeInclusive, + last_db_version: &mut Option, + changesets: &mut Vec<(ActorId, Changeset, CrsqlDbVersion, ChangeSource)>, +) -> Option { + if change.is_complete() && change.is_empty() { + // we never want to block here + if let Err(e) = agent.tx_empty().try_send((actor_id, change.versions())) { + error!("could not send empty changed versions into channel: {e}"); + } + + Some(KnownDbVersion::Cleared) + } else { + if let Some(seqs) = change.seqs() { + if seqs.end() < seqs.start() { + warn!(%actor_id, versions = ?change.versions(), "received an invalid change, seqs start is greater than seqs end: {seqs:?}"); + return None; + } + } + + let (known, versions) = match process_single_version(&agent, &tx, *last_db_version, change) + { + Ok((known, changeset)) => { + let versions = changeset.versions(); + if let KnownDbVersion::Current(CurrentVersion { db_version, .. }) = &known { + *last_db_version = Some(*db_version); + changesets.push((actor_id, changeset, *db_version, change_src)); + } + (known, versions) + } + Err(e) => { + error!(%actor_id, ?versions, "could not process single change: {e}"); + return None; + } + }; + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "got known to insert: {known:?}"); + Some(known) + } +} + +/// Update the bookie table for current versions of some data +fn update_bookkeeping( + agent: &Agent, + tx: &Transaction, + knowns: &mut BTreeMap, KnownDbVersion)>>, + count: &mut usize, +) -> Result<(), ChangeError> { + for (actor_id, knowns) in knowns.iter_mut() { + debug!(%actor_id, self_actor_id = %agent.actor_id(), "processing {} knowns", knowns.len()); + for (versions, known) in knowns.iter_mut() { + match known { + KnownDbVersion::Partial { .. 
} => { + continue; + } + KnownDbVersion::Current(CurrentVersion { + db_version, + last_seq, + ts, + }) => { + *count += 1; + let version = versions.start(); + debug!(%actor_id, self_actor_id = %agent.actor_id(), %version, "inserting bookkeeping row db_version: {db_version}, ts: {ts:?}"); + tx.prepare_cached(" + INSERT INTO __corro_bookkeeping ( actor_id, start_version, db_version, last_seq, ts) + VALUES (:actor_id, :start_version, :db_version, :last_seq, :ts);" + ).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})? + .execute(named_params!{ + ":actor_id": actor_id, + ":start_version": *version, + ":db_version": *db_version, + ":last_seq": *last_seq, + ":ts": *ts + }).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})?; + } + KnownDbVersion::Cleared => { + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserting CLEARED bookkeeping"); + if let Err(e) = agent.tx_empty().try_send((*actor_id, versions.clone())) { + error!("could not schedule version to be cleared: {e}"); + } + } + } + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserted bookkeeping row"); + } + } + + Ok(()) +} + +#[tracing::instrument(skip(agent, bookie, changes), err)] +pub async fn process_multiple_changes( + agent: Agent, + bookie: Bookie, + changes: Vec<(ChangeV1, ChangeSource, Instant)>, +) -> Result<(), ChangeError> { + let start = Instant::now(); + counter!("corro.agent.changes.processing.started").increment(changes.len() as u64); + debug!(self_actor_id = %agent.actor_id(), "processing multiple changes, len: {}", changes.iter().map(|(change, _, _)| cmp::max(change.len(), 1)).sum::()); + + let unknown_changes = filter_changes(&bookie, changes).await; let mut conn = agent.pool().write_normal().await?; - let changesets = block_in_place(|| { + let applied_changesets = block_in_place(|| { let start = Instant::now(); let tx = conn .immediate_transaction() @@ -95,11 
+204,14 @@ pub async fn process_multiple_changes( actor_id.as_simple() )); - let mut seen = RangeInclusiveMap::new(); + let mut seen = RangeInclusiveMap::new(); // RangeInclusiveMap for (change, src) in changes { trace!("handling a single changeset: {change:?}"); let seqs = change.seqs(); + + // If full change sequence in a version is already + // known, skip this change if booked_write.contains_all(change.versions(), change.seqs()) { trace!( "previously unknown versions are now deemed known, aborting inserts" @@ -126,92 +238,25 @@ pub async fn process_multiple_changes( } // optimizing this, insert later! - let known = if change.is_complete() && change.is_empty() { - // we never want to block here - if let Err(e) = agent.tx_empty().try_send((actor_id, change.versions())) { - error!("could not send empty changed versions into channel: {e}"); - } - - KnownDbVersion::Cleared - } else { - if let Some(seqs) = change.seqs() { - if seqs.end() < seqs.start() { - warn!(%actor_id, versions = ?change.versions(), "received an invalid change, seqs start is greater than seqs end: {seqs:?}"); - continue; - } - } - - let (known, versions) = match process_single_version( - &agent, - &tx, - last_db_version, - change, - ) { - Ok((known, changeset)) => { - let versions = changeset.versions(); - if let KnownDbVersion::Current(CurrentVersion { - db_version, .. 
- }) = &known - { - last_db_version = Some(*db_version); - changesets.push((actor_id, changeset, *db_version, src)); - } - (known, versions) - } - Err(e) => { - error!(%actor_id, ?versions, "could not process single change: {e}"); - continue; - } - }; - debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "got known to insert: {known:?}"); - known - }; - - seen.insert(versions.clone(), known.clone()); - knowns.entry(actor_id).or_default().push((versions, known)); - } - } - } - - let mut count = 0; - - for (actor_id, knowns) in knowns.iter_mut() { - debug!(%actor_id, self_actor_id = %agent.actor_id(), "processing {} knowns", knowns.len()); - for (versions, known) in knowns.iter_mut() { - match known { - KnownDbVersion::Partial { .. } => { - continue; - } - KnownDbVersion::Current(CurrentVersion { - db_version, - last_seq, - ts, - }) => { - count += 1; - let version = versions.start(); - debug!(%actor_id, self_actor_id = %agent.actor_id(), %version, "inserting bookkeeping row db_version: {db_version}, ts: {ts:?}"); - tx.prepare_cached(" - INSERT INTO __corro_bookkeeping ( actor_id, start_version, db_version, last_seq, ts) - VALUES (:actor_id, :start_version, :db_version, :last_seq, :ts);").map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})? 
- .execute(named_params!{ - ":actor_id": actor_id, - ":start_version": *version, - ":db_version": *db_version, - ":last_seq": *last_seq, - ":ts": *ts - }).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})?; - } - KnownDbVersion::Cleared => { - debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserting CLEARED bookkeeping"); - if let Err(e) = agent.tx_empty().try_send((*actor_id, versions.clone())) { - error!("could not schedule version to be cleared: {e}"); - } + if let Some(known) = process_changeset( + &agent, + &tx, + actor_id, + change, + src, + &versions, + &mut last_db_version, + &mut changesets, + ) { + seen.insert(versions.clone(), known.clone()); + knowns.entry(actor_id).or_default().push((versions, known)); } } - debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserted bookkeeping row"); } } + let mut count = 0; + update_bookkeeping(&agent, &tx, &mut knowns, &mut count)?; debug!("inserted {count} new changesets"); tx.commit().map_err(|source| ChangeError::Rusqlite { @@ -270,7 +315,7 @@ pub async fn process_multiple_changes( Ok::<_, ChangeError>(changesets) })?; - for (_actor_id, changeset, db_version, _src) in changesets { + for (_actor_id, changeset, db_version, _src) in applied_changesets { agent .subs_manager() .match_changes(changeset.changes(), db_version); diff --git a/crates/corro-agent/src/lib.rs b/crates/corro-agent/src/lib.rs index 973cd557..43aab945 100644 --- a/crates/corro-agent/src/lib.rs +++ b/crates/corro-agent/src/lib.rs @@ -2,6 +2,5 @@ pub mod agent; pub mod api; pub mod broadcast; -pub mod transport; pub mod change; - +pub mod transport; diff --git a/crates/corro-base-types/src/lib.rs b/crates/corro-base-types/src/lib.rs index 14bb3f84..4ec8040e 100644 --- a/crates/corro-base-types/src/lib.rs +++ b/crates/corro-base-types/src/lib.rs @@ -181,6 +181,9 @@ impl fmt::Display for CrsqlDbVersion { } } +/// A single change in a sequence of changes in a single 
transaction +/// +/// This type is usually wrapped by a RangeMap/Set #[derive( Copy, Clone, Debug, Default, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, )] diff --git a/crates/corro-devcluster/Cargo.toml b/crates/corro-devcluster/Cargo.toml index 83f71b37..71edfbc7 100644 --- a/crates/corro-devcluster/Cargo.toml +++ b/crates/corro-devcluster/Cargo.toml @@ -8,6 +8,5 @@ serde = { workspace = true, features = [ "derive" ] } serde_json = { workspace = true } thiserror = { workspace = true } rand = { workspace = true } -random_graphs = { workspace = true } nom = { workspace = true } clap = { workspace = true } \ No newline at end of file diff --git a/crates/corro-types/src/broadcast.rs b/crates/corro-types/src/broadcast.rs index 1b4035ac..44c68117 100644 --- a/crates/corro-types/src/broadcast.rs +++ b/crates/corro-types/src/broadcast.rs @@ -33,9 +33,12 @@ pub enum UniPayload { data: UniPayloadV1, #[speedy(default_on_eof)] cluster_id: ClusterId, + #[speedy(default_on_eof)] + priority: bool, }, } +/// Broadcast change payload #[derive(Debug, Clone, Readable, Writable)] pub enum UniPayloadV1 { Broadcast(BroadcastV1), diff --git a/flake.nix b/flake.nix index 477eb46c..88bb1df5 100644 --- a/flake.nix +++ b/flake.nix @@ -57,7 +57,7 @@ # Useful when doing development builds doCheck = false; buildType = "debug"; - + ## Build environment dependencies nativeBuildInputs = [ pkgs.pkg-config