Skip to content

Commit a505cda

Browse files
committed
TQ: Integrate protocol with NodeTask
`NodeTask` now uses the `trust_quorum_protocol::Node` and `trust_quorum_protocol::NodeCtx` to send and receive trust quorum messages. An API to drive this was added to the `NodeTaskHandle`. The majority of code in this PR is tests using the API. A follow-up will deal with saving persistent state to a Ledger.
1 parent 809559c commit a505cda

File tree

7 files changed

+1046
-60
lines changed

7 files changed

+1046
-60
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

trust-quorum/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ dropshot.workspace = true
4848
omicron-test-utils.workspace = true
4949
proptest.workspace = true
5050
serde_json.workspace = true
51+
sled-hardware-types.workspace = true
5152
test-strategy.workspace = true
53+
trust-quorum-protocol = { workspace = true, features = ["testing"] }
5254
trust-quorum-test-utils.workspace = true
5355
sprockets-tls-test-utils.workspace = true

trust-quorum/protocol/src/coordinator_state.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,10 @@ impl CoordinatorState {
236236
&self.op
237237
}
238238

239+
pub fn config(&self) -> &Configuration {
240+
&self.configuration
241+
}
242+
239243
/// Send any required messages as a reconfiguration coordinator
240244
///
241245
/// This varies depending upon the current `CoordinatorState`.

trust-quorum/protocol/src/crypto.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ impl Clone for ReconstructedRackSecret {
130130
}
131131
}
132132

133-
#[cfg(test)]
133+
#[cfg(any(test, feature = "testing"))]
134134
impl PartialEq for ReconstructedRackSecret {
135135
fn eq(&self, other: &Self) -> bool {
136136
self.expose_secret().ct_eq(other.expose_secret()).into()

trust-quorum/protocol/src/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,14 @@ pub use coordinator_state::{
3737
};
3838
pub use rack_secret_loader::{LoadRackSecretError, RackSecretLoaderDiff};
3939
pub use validators::{
40-
ValidatedLrtqUpgradeMsgDiff, ValidatedReconfigureMsgDiff,
40+
LrtqUpgradeError, ReconfigurationError, ValidatedLrtqUpgradeMsgDiff,
41+
ValidatedReconfigureMsgDiff,
4142
};
4243

4344
pub use alarm::Alarm;
44-
pub use crypto::RackSecret;
45+
pub use crypto::{RackSecret, ReconstructedRackSecret};
4546
pub use messages::*;
46-
pub use node::{Node, NodeDiff};
47+
pub use node::{CommitError, Node, NodeDiff, PrepareAndCommitError};
4748
// public only for docs.
4849
pub use node_ctx::NodeHandlerCtx;
4950
pub use node_ctx::{NodeCallerCtx, NodeCommonCtx, NodeCtx, NodeCtxDiff};

trust-quorum/src/connection_manager.rs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
//! A mechanism for maintaining a full mesh of trust quorum node connections
66
77
use crate::established_conn::EstablishedConn;
8-
use trust_quorum_protocol::{BaseboardId, PeerMsg};
8+
use trust_quorum_protocol::{BaseboardId, Envelope, PeerMsg};
99
// TODO: Move or copy this to this crate?
1010
use bootstore::schemes::v0::NetworkConfig;
1111
use camino::Utf8PathBuf;
@@ -47,7 +47,6 @@ pub enum AcceptError {
4747
/// Messages sent from the main task to the connection managing tasks
4848
#[derive(Debug)]
4949
pub enum MainToConnMsg {
50-
#[expect(unused)]
5150
Msg(WireMsg),
5251
}
5352

@@ -100,7 +99,6 @@ pub enum ConnToMainMsgInner {
10099
addr: SocketAddrV6,
101100
peer_id: BaseboardId,
102101
},
103-
#[expect(unused)]
104102
Received {
105103
from: BaseboardId,
106104
msg: PeerMsg,
@@ -117,7 +115,6 @@ pub enum ConnToMainMsgInner {
117115

118116
pub struct TaskHandle {
119117
pub abort_handle: AbortHandle,
120-
#[expect(unused)]
121118
pub tx: mpsc::Sender<MainToConnMsg>,
122119
pub conn_type: ConnectionType,
123120
}
@@ -134,6 +131,10 @@ impl TaskHandle {
134131
pub fn abort(&self) {
135132
self.abort_handle.abort()
136133
}
134+
135+
pub async fn send(&self, msg: PeerMsg) {
136+
let _ = self.tx.send(MainToConnMsg::Msg(WireMsg::Tq(msg))).await;
137+
}
137138
}
138139

139140
impl BiHashItem for TaskHandle {
@@ -175,6 +176,10 @@ impl EstablishedTaskHandle {
175176
pub fn abort(&self) {
176177
self.task_handle.abort();
177178
}
179+
180+
pub async fn send(&self, msg: PeerMsg) {
181+
let _ = self.task_handle.send(msg).await;
182+
}
178183
}
179184

180185
impl TriHashItem for EstablishedTaskHandle {
@@ -372,6 +377,14 @@ impl ConnMgr {
372377
self.listen_addr
373378
}
374379

380+
pub async fn send(&self, envelope: Envelope) {
381+
let Envelope { to, msg, .. } = envelope;
382+
info!(self.log, "Sending {msg:?}"; "peer_id" => %to);
383+
if let Some(handle) = self.established.get1(&to) {
384+
handle.send(msg).await;
385+
}
386+
}
387+
375388
/// Perform any polling related operations that the connection
376389
/// manager must perform concurrently.
377390
pub async fn step(
@@ -573,13 +586,15 @@ impl ConnMgr {
573586
/// easiest way to achieve this is to only connect to peers with addresses
574587
/// that sort less than our own and tear down any connections that no longer
575588
/// exist in `addrs`.
589+
///
590+
/// Return the `BaseboardId` of all peers that have been disconnected.
576591
pub async fn update_bootstrap_connections(
577592
&mut self,
578593
addrs: BTreeSet<SocketAddrV6>,
579594
corpus: Vec<Utf8PathBuf>,
580-
) {
595+
) -> BTreeSet<BaseboardId> {
581596
if self.bootstrap_addrs == addrs {
582-
return;
597+
return BTreeSet::new();
583598
}
584599

585600
// We don't try to compare addresses from accepted nodes. If DDMD
@@ -607,9 +622,13 @@ impl ConnMgr {
607622
self.connect_client(corpus.clone(), addr).await;
608623
}
609624

625+
let mut disconnected_peers = BTreeSet::new();
610626
for addr in to_disconnect {
611-
self.disconnect_client(addr).await;
627+
if let Some(peer_id) = self.disconnect_client(addr).await {
628+
disconnected_peers.insert(peer_id);
629+
}
612630
}
631+
disconnected_peers
613632
}
614633

615634
/// Spawn a task to establish a sprockets connection for the given address
@@ -688,7 +707,13 @@ impl ConnMgr {
688707
///
689708
/// We don't tear down server connections this way as we don't know their
690709
/// listen port, just the ephemeral port.
691-
async fn disconnect_client(&mut self, addr: SocketAddrV6) {
710+
///
711+
/// Return the `BaseboardId` of the peer if an established connection is
712+
/// torn down.
713+
async fn disconnect_client(
714+
&mut self,
715+
addr: SocketAddrV6,
716+
) -> Option<BaseboardId> {
692717
if let Some(handle) = self.connecting.remove2(&addr) {
693718
// The connection has not yet completed its handshake
694719
info!(
@@ -697,6 +722,7 @@ impl ConnMgr {
697722
"remote_addr" => %addr
698723
);
699724
handle.abort();
725+
None
700726
} else {
701727
if let Some(handle) = self.established.remove3(&addr) {
702728
info!(
@@ -706,6 +732,9 @@ impl ConnMgr {
706732
"peer_id" => %handle.baseboard_id
707733
);
708734
handle.abort();
735+
Some(handle.baseboard_id)
736+
} else {
737+
None
709738
}
710739
}
711740
}

0 commit comments

Comments
 (0)