Skip to content

Commit 5a6e453

Browse files
committed
Add priority broadcast type and insert handler
1 parent 29241d1 commit 5a6e453

File tree

7 files changed

+257
-101
lines changed

7 files changed

+257
-101
lines changed

crates/corro-agent/src/agent/handlers.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub fn spawn_incoming_connection_handlers(
108108

109109
// Spawn handler tasks for this connection
110110
spawn_foca_handler(&agent, &tripwire, &conn);
111-
uni::spawn_unipayload_handler(&tripwire, &conn, agent.tx_changes().clone());
111+
uni::spawn_unipayload_handler(&agent, &bookie, &tripwire, &conn, agent.tx_changes().clone());
112112
bi::spawn_bipayload_handler(&agent, &bookie, &tripwire, &conn);
113113
});
114114
}

crates/corro-agent/src/agent/uni.rs

+28
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use corro_types::{
2+
agent::{Agent, Bookie},
23
broadcast::{BroadcastV1, ChangeSource, ChangeV1, UniPayload, UniPayloadV1},
34
channel::CorroSender,
45
};
@@ -12,15 +13,22 @@ use tripwire::Tripwire;
1213
/// Spawn a task that accepts unidirectional broadcast streams, then
1314
/// spawns another task for each incoming stream to handle.
1415
pub fn spawn_unipayload_handler(
16+
agent: &Agent,
17+
bookie: &Bookie,
1518
tripwire: &Tripwire,
1619
conn: &quinn::Connection,
1720
changes_tx: CorroSender<(ChangeV1, ChangeSource)>,
1821
) {
22+
let agent = agent.clone();
23+
let bookie = bookie.clone();
1924
tokio::spawn({
2025
let conn = conn.clone();
2126
let mut tripwire = tripwire.clone();
27+
2228
async move {
2329
loop {
30+
let agent = agent.clone();
31+
let bookie = bookie.clone();
2432
let rx = tokio::select! {
2533
rx_res = conn.accept_uni() => match rx_res {
2634
Ok(rx) => rx,
@@ -48,6 +56,9 @@ pub fn spawn_unipayload_handler(
4856
let mut framed = FramedRead::new(rx, LengthDelimitedCodec::new());
4957

5058
loop {
59+
let agent = agent.clone();
60+
let bookie = bookie.clone();
61+
5162
match StreamExt::next(&mut framed).await {
5263
Some(Ok(b)) => {
5364
counter!("corro.peer.stream.bytes.recv.total", "type" => "uni")
@@ -70,6 +81,23 @@ pub fn spawn_unipayload_handler(
7081
return;
7182
}
7283
}
84+
UniPayload::V1(
85+
UniPayloadV1::PriorityBroadcast(
86+
BroadcastV1::Change(change),
87+
),
88+
) => {
89+
// fixme: queue with priority?
90+
tokio::spawn(async move {
91+
crate::change::process_priority_change(
92+
agent,
93+
bookie,
94+
change,
95+
ChangeSource::PriorityBroadcast,
96+
)
97+
.await
98+
.unwrap();
99+
});
100+
}
73101
}
74102
}
75103
Err(e) => {

crates/corro-agent/src/api/peer.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -1549,10 +1549,7 @@ mod tests {
15491549
use tokio::sync::mpsc;
15501550
use tripwire::Tripwire;
15511551

1552-
use crate::{
1553-
agent::{process_multiple_changes, setup},
1554-
api::public::api_v1_db_schema,
1555-
};
1552+
use crate::{agent::setup, api::public::api_v1_db_schema, change::process_multiple_changes};
15561553

15571554
use super::*;
15581555

0 commit comments

Comments
 (0)