Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement priority broadcasts for local changes #152

Merged
merged 5 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
/schema
/config.toml
/data
/book
/book
.direnv/**
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ indexmap = { version = "2.1.0", features = ["serde"] }
itertools = { version = "0.10.5" }
metrics = "0.22.0"
metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = ["http-listener"] }
nom = "7.0"
once_cell = "1.17.1"
opentelemetry = { version = "0.20.0", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.13.0" }
Expand Down
14 changes: 8 additions & 6 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use std::{
};

use crate::{
agent::{bi, bootstrap, uni, util, SyncClientError, ANNOUNCE_INTERVAL},
agent::{bi, bootstrap, uni, SyncClientError, ANNOUNCE_INTERVAL},
api::peer::parallel_sync,
change,
transport::Transport,
};
use corro_types::{
Expand Down Expand Up @@ -107,7 +108,7 @@ pub fn spawn_incoming_connection_handlers(

// Spawn handler tasks for this connection
spawn_foca_handler(&agent, &tripwire, &conn);
uni::spawn_unipayload_handler(&tripwire, &conn, agent.clone());
uni::spawn_unipayload_handler(&agent, &bookie, &tripwire, &conn);
bi::spawn_bipayload_handler(&agent, &bookie, &tripwire, &conn);
});
}
Expand Down Expand Up @@ -414,7 +415,7 @@ pub async fn handle_changes(
}

debug!(count = %tmp_count, "spawning processing multiple changes from beginning of loop");
join_set.spawn(util::process_multiple_changes(
join_set.spawn(change::process_multiple_changes(
agent.clone(),
bookie.clone(),
std::mem::take(&mut buf),
Expand Down Expand Up @@ -529,7 +530,7 @@ pub async fn handle_changes(
if count < MIN_CHANGES_CHUNK && !queue.is_empty() && join_set.len() < MAX_CONCURRENT {
// we can process this right away
debug!(%count, "spawning processing multiple changes from max wait interval");
join_set.spawn(util::process_multiple_changes(
join_set.spawn(change::process_multiple_changes(
agent.clone(),
bookie.clone(),
queue.drain(..).collect(),
Expand Down Expand Up @@ -563,7 +564,7 @@ pub async fn handle_changes(
queue.push_back((change, src, Instant::now()));
if count >= MIN_CHANGES_CHUNK {
// drain and process current changes!
if let Err(e) = util::process_multiple_changes(
if let Err(e) = change::process_multiple_changes(
agent.clone(),
bookie.clone(),
queue.drain(..).collect(),
Expand All @@ -579,7 +580,8 @@ pub async fn handle_changes(
}

// process the last changes we got!
if let Err(e) = util::process_multiple_changes(agent, bookie, queue.into_iter().collect()).await
if let Err(e) =
change::process_multiple_changes(agent, bookie, queue.into_iter().collect()).await
{
error!("could not process multiple changes: {e}");
}
Expand Down
3 changes: 1 addition & 2 deletions crates/corro-agent/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod metrics;
mod run_root;
mod setup;
mod uni;
mod util;
pub(crate) mod util;

#[cfg(test)]
mod tests;
Expand All @@ -27,7 +27,6 @@ use uuid::Uuid;
pub use error::{SyncClientError, SyncRecvError};
pub use run_root::start_with_config;
pub use setup::{setup, AgentOptions};
pub use util::process_multiple_changes;

pub const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(300);
pub const COMPACT_BOOKED_INTERVAL: Duration = Duration::from_secs(300);
Expand Down
50 changes: 48 additions & 2 deletions crates/corro-agent/src/agent/uni.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use corro_types::{
agent::Agent,
agent::{Agent, Bookie},
broadcast::{BroadcastV1, ChangeSource, UniPayload, UniPayloadV1},
};
use metrics::counter;
Expand All @@ -11,12 +11,22 @@ use tripwire::Tripwire;

/// Spawn a task that accepts unidirectional broadcast streams, then
/// spawns another task for each incoming stream to handle.
pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, agent: Agent) {
pub fn spawn_unipayload_handler(
agent: &Agent,
bookie: &Bookie,
tripwire: &Tripwire,
conn: &quinn::Connection,
) {
let agent = agent.clone();
let bookie = bookie.clone();
tokio::spawn({
let conn = conn.clone();
let mut tripwire = tripwire.clone();

async move {
loop {
let agent = agent.clone();
let bookie = bookie.clone();
let rx = tokio::select! {
rx_res = conn.accept_uni() => match rx_res {
Ok(rx) => rx,
Expand Down Expand Up @@ -44,6 +54,9 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a
let mut framed = FramedRead::new(rx, LengthDelimitedCodec::new());

loop {
let agent = agent.clone();
let bookie = bookie.clone();

match StreamExt::next(&mut framed).await {
Some(Ok(b)) => {
counter!("corro.peer.stream.bytes.recv.total", "type" => "uni")
Expand All @@ -59,6 +72,39 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a
change,
)),
cluster_id,
priority,
} if priority => {
if cluster_id != agent.cluster_id() {
continue;
}

tokio::spawn(async move {
if let Err(e) =
crate::change::process_multiple_changes(
agent,
bookie,
vec![(
change,
ChangeSource::Broadcast,
std::time::Instant::now(),
)],
)
.await
{
error!(
"process_priority_change failed: {:?}",
e
);
}
});
}
UniPayload::V1 {
data:
UniPayloadV1::Broadcast(BroadcastV1::Change(
change,
)),
cluster_id,
..
} => {
if cluster_id != agent.cluster_id() {
continue;
Expand Down
Loading
Loading