From 5ec11c8a2f3466e17ab03e34f16fc3a495f15ba6 Mon Sep 17 00:00:00 2001 From: Finn Bear Date: Mon, 10 Jul 2023 14:32:54 -0700 Subject: [PATCH] Add reconfiguration (#3) * Refactor transport. * Reconfiguration. * Add timeouts to tests. --- Makefile | 3 + README.md | 2 +- src/bin/maelstrom.rs | 24 +-- src/ir/client.rs | 204 ++++++++++++----------- src/ir/membership.rs | 102 ++++++------ src/ir/message.rs | 85 +++++----- src/ir/mod.rs | 10 +- src/ir/op.rs | 2 +- src/ir/replica.rs | 317 ++++++++++++++++++++++-------------- src/ir/tests/lock_server.rs | 129 +++++++++++---- src/ir/view.rs | 17 +- src/lib.rs | 2 +- src/tapir/client.rs | 12 +- src/tapir/key_value.rs | 2 +- src/tapir/replica.rs | 26 ++- src/tapir/shard_client.rs | 16 +- src/tapir/tests/kv.rs | 105 +++++++----- src/transport/channel.rs | 51 +++--- src/transport/mod.rs | 15 +- 19 files changed, 657 insertions(+), 467 deletions(-) diff --git a/Makefile b/Makefile index cc37989..d7f7b0c 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,9 @@ test: clear && cargo test --release -- # --nocapture +lock_server_stress_test: + clear && cargo test --release -- lock_server_loop --nocapture --include-ignored + coordinator_failure_stress_test_3: clear && cargo test --release -- coordinator_recovery_3_loop --nocapture --include-ignored diff --git a/README.md b/README.md index d1710e1..ecdee1f 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Rust implementation of [TAPIR](https://syslab.cs.washington.edu/papers/tapir-tr- - [x] Consensus - [x] View changes - [x] Recovery - - [ ] ~~Epoch changes~~ + - [x] Membership change - [ ] Real network transport - [ ] TAPIR - [x] Get diff --git a/src/bin/maelstrom.rs b/src/bin/maelstrom.rs index 2c13e65..65ff15a 100644 --- a/src/bin/maelstrom.rs +++ b/src/bin/maelstrom.rs @@ -19,14 +19,12 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Mutex}; use std::time::Duration; -use tapirs::{ - IrMembership, IrMessage, IrReplica, IrReplicaIndex, TapirClient, TapirReplica, Transport, -}; +use tapirs::{IrMembership, IrMessage, IrReplica, TapirClient, TapirReplica, Transport}; use tokio::spawn; type K = String; type V = String; -type Message = IrMessage>; +type Message = IrMessage, Maelstrom>; #[derive(Default)] struct KvNode { @@ -64,7 +62,7 @@ struct Inner { net: ProcNet, } -#[derive(Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] enum IdEnum { Replica(usize), App(usize), @@ -99,9 +97,8 @@ impl FromStr for IdEnum { } } -impl Transport for Maelstrom { +impl Transport> for Maelstrom { type Address = IdEnum; - type Message = Message; type Sleep = tokio::time::Sleep; fn address(&self) -> Self::Address { @@ -129,10 +126,10 @@ impl Transport for Maelstrom { tokio::time::sleep(duration) } - fn send + Send + std::fmt::Debug>( + fn send, Self>> + Send + std::fmt::Debug>( &self, address: Self::Address, - message: impl Into + std::fmt::Debug, + message: impl Into, Self>> + std::fmt::Debug, ) -> impl futures::Future + Send + 'static { let id = self.id; let (sender, mut receiver) = tokio::sync::oneshot::channel(); @@ -171,7 +168,11 @@ impl Transport for Maelstrom { } } - fn do_send(&self, address: Self::Address, message: impl Into + std::fmt::Debug) { + fn do_send( + &self, + address: Self::Address, + message: impl Into, Self>> + std::fmt::Debug, + ) { let message = Wrapper { message: message.into(), do_reply_to: None, @@ -216,8 +217,7 @@ impl Process for KvNode { self.inner = Some(( 
transport.clone(), match id { - IdEnum::Replica(index) => KvNodeInner::Replica(Arc::new(IrReplica::new( - IrReplicaIndex(index), + IdEnum::Replica(_) => KvNodeInner::Replica(Arc::new(IrReplica::new( membership, TapirReplica::new(true), transport, diff --git a/src/ir/client.rs b/src/ir/client.rs index 98cde05..5665e35 100644 --- a/src/ir/client.rs +++ b/src/ir/client.rs @@ -1,7 +1,7 @@ use super::{ Confirm, DoViewChange, FinalizeConsensus, FinalizeInconsistent, Membership, MembershipSize, - Message, OpId, ProposeConsensus, ProposeInconsistent, ReplicaIndex, ReplicaUpcalls, - ReplyConsensus, ReplyInconsistent, ReplyUnlogged, RequestUnlogged, ViewNumber, + Message, OpId, ProposeConsensus, ProposeInconsistent, ReplicaUpcalls, ReplyConsensus, + ReplyInconsistent, ReplyUnlogged, RequestUnlogged, View, ViewNumber, }; use crate::{ util::{join, Join}, @@ -39,13 +39,13 @@ impl Debug for Id { } /// IR client, capable of invoking operations on an IR replica group. -pub struct Client { +pub struct Client> { id: Id, - inner: Arc>, + inner: Arc>, _spooky: PhantomData, } -impl Clone for Client { +impl> Clone for Client { fn clone(&self) -> Self { Self { id: self.id, @@ -55,18 +55,17 @@ impl Clone for Client { } } -struct Inner { +struct Inner> { transport: T, - sync: Mutex>, + sync: Mutex>, } -struct SyncInner { +struct SyncInner> { operation_counter: u64, - membership: Membership, - recent: ViewNumber, + view: View, } -impl SyncInner { +impl> SyncInner { fn next_number(&mut self) -> u64 { let ret = self.operation_counter; self.operation_counter += 1; @@ -74,16 +73,18 @@ impl SyncInner { } } -impl>> Client { - pub fn new(membership: Membership, transport: T) -> Self { +impl> Client { + pub fn new(membership: Membership, transport: T) -> Self { Self { id: Id::new(), inner: Arc::new(Inner { transport, sync: Mutex::new(SyncInner { operation_counter: 0, - membership, - recent: ViewNumber(0), + view: View { + membership, + number: ViewNumber(0), + }, }), }), _spooky: PhantomData, @@ -98,24 +99,34 @@ impl>> Client { &self.inner.transport } - fn notify_old_views( + /// Updates own view and those of lagging replicas. + /// + /// `Index`'s in `views` must correspond to `sync.view`. 
+ fn update_view<'a>( transport: &T, - sync: &mut SyncInner, - views: impl IntoIterator + Clone, + sync: &mut SyncInner, + views: impl IntoIterator)> + Clone, ) { - if let Some(latest_view) = views.clone().into_iter().map(|(_, n)| n).max() { - sync.recent = sync.recent.max(latest_view); - for (index, view_number) in views { - if view_number < latest_view { + if let Some(latest_view) = views + .clone() + .into_iter() + .map(|(_, n)| n) + .max_by_key(|v| v.number) + { + for (address, view) in views { + if view.number < latest_view.number { transport.do_send( - sync.membership.get(index).unwrap(), - Message::::DoViewChange(DoViewChange { - view_number: latest_view, + address, + Message::::DoViewChange(DoViewChange { + view: latest_view.clone(), addendum: None, }), ) } } + if latest_view.number > sync.view.number { + sync.view = latest_view.clone(); + } } } @@ -124,7 +135,7 @@ impl>> Client { let inner = Arc::clone(&self.inner); let (index, count) = { let sync = inner.sync.lock().unwrap(); - let count = sync.membership.len(); + let count = sync.view.membership.len(); (thread_rng().gen_range(0..count), count) }; @@ -136,13 +147,15 @@ impl>> Client { { let sync = inner.sync.lock().unwrap(); let address = sync + .view .membership - .get(ReplicaIndex((index + futures.len()) % count)) + .get((index + futures.len()) % count) .unwrap(); drop(sync); - let future = inner - .transport - .send::>(address, RequestUnlogged { op: op.clone() }); + let future = inner.transport.send::>( + address, + RequestUnlogged { op: op.clone() }, + ); futures.push(future); } @@ -157,7 +170,9 @@ impl>> Client { } }; let mut sync = inner.sync.lock().unwrap(); - sync.recent = sync.recent.max(response.view_number); + if response.view.number > sync.view.number { + sync.view = response.view; + } return response.result; } } @@ -170,18 +185,21 @@ impl>> Client { &self, op: U::UO, ) -> ( - Join>>, + Join>>, MembershipSize, ) { let sync = self.inner.sync.lock().unwrap(); - let membership_size = sync.membership.size(); + let membership_size = sync.view.membership.size(); - let future = join(sync.membership.iter().map(|(index, address)| { + let future = join(sync.view.membership.iter().map(|address| { ( - index, + address, self.inner .transport - .send::>(address, RequestUnlogged { op: op.clone() }), + .send::>( + address, + RequestUnlogged { op: op.clone() }, + ), ) })); @@ -201,29 +219,28 @@ impl>> Client { } }; - fn has_ancient(results: &HashMap) -> bool { + fn has_ancient(results: &HashMap>) -> bool { results.values().any(|v| v.state.is_none()) } - fn has_quorum( + fn has_quorum( membership: MembershipSize, - results: &HashMap, + results: &HashMap>, check_views: bool, ) -> bool { - let threshold = membership.f_plus_one(); if check_views { for result in results.values() { let matching = results .values() - .filter(|other| other.view_number == result.view_number) + .filter(|other| other.view.number == result.view.number) .count(); - if matching >= threshold { + if matching >= result.view.membership.size().f_plus_one() { return true; } } false } else { - results.len() >= threshold + results.len() >= membership.f_plus_one() } } @@ -231,17 +248,17 @@ impl>> Client { loop { let (membership_size, future) = { let sync = inner.sync.lock().unwrap(); - let membership_size = sync.membership.size(); + let membership_size = sync.view.membership.size(); - let future = join(sync.membership.iter().map(|(index, address)| { + let future = join(sync.view.membership.iter().map(|address| { ( - index, - inner.transport.send::( + address, + 
inner.transport.send::>( address, ProposeInconsistent { op_id, op: op.clone(), - recent: sync.recent, + recent: sync.view.number, }, ), ) @@ -249,27 +266,31 @@ impl>> Client { (membership_size, future) }; - let mut timeout = std::pin::pin!(T::sleep(Duration::from_millis(250))); + let mut soft_timeout = std::pin::pin!(T::sleep(Duration::from_millis(250))); + + // E.g. the replica group got smaller and we can't get a response from a majority of the old size. + let mut hard_timeout = std::pin::pin!(T::sleep(Duration::from_millis(5000))); let results = future .until( - move |results: &HashMap, + move |results: &HashMap>, cx: &mut Context<'_>| { has_ancient(results) || has_quorum( membership_size, results, - timeout.as_mut().poll(cx).is_ready(), + soft_timeout.as_mut().poll(cx).is_ready(), ) + || hard_timeout.as_mut().poll(cx).is_ready() }, ) .await; let mut sync = inner.sync.lock().unwrap(); - Self::notify_old_views( + Self::update_view( &inner.transport, &mut *sync, - results.iter().map(|(i, r)| (*i, r.view_number)), + results.iter().map(|(i, r)| (*i, &r.view)), ); if has_ancient(&results) || !has_quorum(membership_size, &results, true) { @@ -277,7 +298,7 @@ impl>> Client { } // println!("finalizing to membership: {:?}", sync.membership); - for (_, address) in &sync.membership { + for address in &sync.view.membership { inner .transport .do_send(address, FinalizeInconsistent { op_id }); @@ -293,22 +314,22 @@ impl>> Client { op: U::CO, decide: impl Fn(HashMap, MembershipSize) -> U::CR + Send, ) -> impl Future + Send { - fn has_ancient(replies: &HashMap>) -> bool { + fn has_ancient(replies: &HashMap>) -> bool { replies.values().any(|v| v.result_state.is_none()) } - fn get_finalized(replies: &HashMap>) -> Option<&R> { + fn get_finalized(replies: &HashMap>) -> Option<&R> { replies .values() .find(|r| r.result_state.as_ref().unwrap().1.is_finalized()) .map(|r| &r.result_state.as_ref().unwrap().0) } - fn get_quorum( + fn get_quorum( membership: MembershipSize, - replies: &HashMap>, + replies: &HashMap>, matching_results: bool, - ) -> Option<(ViewNumber, &R)> { + ) -> Option<(&View, &R)> { let required = if matching_results { membership.three_over_two_f_plus_one() } else { @@ -318,17 +339,14 @@ impl>> Client { let matches = replies .values() .filter(|other| { - other.view_number == candidate.view_number + other.view.number == candidate.view.number && (!matching_results || other.result_state.as_ref().unwrap().0 == candidate.result_state.as_ref().unwrap().0) }) .count(); if matches >= required { - return Some(( - candidate.view_number, - &candidate.result_state.as_ref().unwrap().0, - )); + return Some((&candidate.view, &candidate.result_state.as_ref().unwrap().0)); } } None @@ -343,17 +361,17 @@ impl>> Client { let mut sync = inner.sync.lock().unwrap(); let number = sync.next_number(); let op_id = OpId { client_id, number }; - let membership_size = sync.membership.size(); + let membership_size = sync.view.membership.size(); - let future = join(sync.membership.iter().map(|(index, address)| { + let future = join(sync.view.membership.iter().map(|address| { ( - index, - inner.transport.send::>( + address, + inner.transport.send::>( address, ProposeConsensus { op_id, op: op.clone(), - recent: sync.recent, + recent: sync.view.number, }, ), ) @@ -366,7 +384,7 @@ impl>> Client { let results = future .until( - move |results: &HashMap>, + move |results: &HashMap>, cx: &mut Context<'_>| { has_ancient(results) || get_finalized(results).is_some() @@ -381,18 +399,14 @@ impl>> Client { ) .await; - fn get_quorum_view( 
- membership: MembershipSize, - results: &HashMap, - ) -> Option { - let threshold = membership.f_plus_one(); + fn get_quorum_view(results: &HashMap>) -> Option<&View> { for result in results.values() { let matching = results .values() - .filter(|other| other.view_number == result.view_number) + .filter(|other| other.view.number == result.view.number) .count(); - if matching >= threshold { - return Some(result.view_number); + if matching >= result.view.membership.size().f_plus_one() { + return Some(&result.view); } } None @@ -401,10 +415,10 @@ impl>> Client { let next = { let mut sync = inner.sync.lock().unwrap(); - Self::notify_old_views( + Self::update_view( &inner.transport, &mut *sync, - results.iter().map(|(i, r)| (*i, r.view_number)), + results.iter().map(|(i, r)| (*i, &r.view)), ); if has_ancient(&results) { @@ -412,13 +426,13 @@ impl>> Client { continue; } - let membership_size = sync.membership.size(); + let membership_size = sync.view.membership.size(); let finalized = get_finalized(&results); //println!("checking quorum: {}", finalized.is_some()); if finalized.is_none() && let Some((_, result)) = get_quorum(membership_size, &results, true) { // Fast path. // eprintln!("doing fast path"); - for (_, address) in &sync.membership { + for address in &sync.view.membership { inner.transport.do_send( address, FinalizeConsensus { @@ -432,18 +446,17 @@ impl>> Client { if let Some((result, reply_consensus_view)) = finalized.map(|f| (f.clone(), None)).or_else(|| { - let view_number = - get_quorum(membership_size, &results, false).map(|(v, _)| v); - view_number.map(|view_number| { + let view = get_quorum(membership_size, &results, false).map(|(v, _)| v); + view.cloned().map(|view| { let results = results .into_values() - .filter(|rc| rc.view_number == view_number) + .filter(|rc| rc.view.number == view.number) .map(|rc| rc.result_state.unwrap().0); let mut totals = HashMap::new(); for result in results { *totals.entry(result).or_default() += 1; } - (decide(totals, membership_size), Some(view_number)) + (decide(totals, view.membership.size()), Some(view.number)) }) }) { @@ -454,10 +467,10 @@ impl>> Client { reply_consensus_view.is_none() ); */ - let future = join(sync.membership.iter().map(|(index, address)| { + let future = join(sync.view.membership.iter().map(|address| { ( - index, - inner.transport.send::( + address, + inner.transport.send::>( address, FinalizeConsensus { op_id, @@ -479,8 +492,9 @@ impl>> Client { let mut hard_timeout = std::pin::pin!(T::sleep(Duration::from_millis(500))); let results = future .until( - |results: &HashMap, cx: &mut Context<'_>| { - get_quorum_view(membership_size, results).is_some() + |results: &HashMap>, + cx: &mut Context<'_>| { + get_quorum_view(results).is_some() || (soft_timeout.as_mut().poll(cx).is_ready() && results.len() >= membership_size.f_plus_one()) || hard_timeout.as_mut().poll(cx).is_ready() @@ -488,12 +502,12 @@ impl>> Client { ) .await; let mut sync = inner.sync.lock().unwrap(); - Self::notify_old_views( + Self::update_view( &inner.transport, &mut *sync, - results.iter().map(|(i, r)| (*i, r.view_number)), + results.iter().map(|(i, r)| (*i, &r.view)), ); - if let Some(quorum_view) = get_quorum_view(membership_size, &results) && reply_consensus_view.map(|reply_consensus_view| quorum_view == reply_consensus_view).unwrap_or(true) { + if let Some(quorum_view) = get_quorum_view(&results) && reply_consensus_view.map(|reply_consensus_view| quorum_view.number == reply_consensus_view).unwrap_or(true) { return result; } } diff --git 
a/src/ir/membership.rs b/src/ir/membership.rs index 667f273..bf94e13 100644 --- a/src/ir/membership.rs +++ b/src/ir/membership.rs @@ -1,104 +1,94 @@ +use serde::{Deserialize, Serialize}; use std::fmt::Debug; -use crate::transport::Transport; - -use super::ReplicaIndex; - /// Internally stores 'f' #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] pub struct Size(usize); /// Stores the address of replica group members. -#[derive(Clone)] -pub struct Membership { - members: Vec, +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Membership { + members: Vec, } -impl Debug for Membership { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Membership") - .field("members", &self.members) - .finish() +impl Membership { + pub fn size(&self) -> Size { + Size(self.members.len() / 2) } -} -impl Membership { - /// Must have an odd number of replicas. - pub fn new(members: Vec) -> Self { - assert_eq!(members.len() % 2, 1); - Self { members } + #[allow(clippy::len_without_is_empty)] + pub fn len(&self) -> usize { + self.members.len() } +} - pub fn get(&self, index: ReplicaIndex) -> Option { - self.members.get(index.0).cloned() +impl Membership { + /// # Panics + /// + /// If `members` is empty or contains duplicates. + pub fn new(members: Vec) -> Self { + assert!(!members.is_empty()); + assert!(members + .iter() + .all(|a| members.iter().filter(|a2| a == *a2).count() == 1)); + Self { members } } - pub fn size(&self) -> Size { - Size((self.members.len() - 1) / 2) + pub fn get(&self, index: usize) -> Option { + self.members.get(index).cloned() } - #[allow(clippy::len_without_is_empty)] - pub fn len(&self) -> usize { - self.members.len() + pub fn contains(&self, address: A) -> bool { + self.members.contains(&address) } - pub fn get_index(&self, address: T::Address) -> Option { - self.members - .iter() - .position(|a| *a == address) - .map(ReplicaIndex) + pub fn get_index(&self, address: A) -> Option { + self.members.iter().position(|a| *a == address) } #[allow(clippy::type_complexity)] - pub fn iter( - &self, - ) -> std::iter::Map< - std::iter::Enumerate>, - for<'a> fn((usize, &'a T::Address)) -> (ReplicaIndex, T::Address), - > { + pub fn iter(&self) -> std::iter::Copied> { self.into_iter() } } -impl IntoIterator for Membership { - type Item = (ReplicaIndex, T::Address); - type IntoIter = std::iter::Map< - std::iter::Enumerate>, - fn((usize, T::Address)) -> Self::Item, - >; +impl IntoIterator for Membership { + type Item = A; + type IntoIter = std::vec::IntoIter; fn into_iter(self) -> Self::IntoIter { - self.members - .into_iter() - .enumerate() - .map(|(i, a)| (ReplicaIndex(i), a)) + self.members.into_iter() } } -impl<'a, T: Transport> IntoIterator for &'a Membership { - type Item = (ReplicaIndex, T::Address); - type IntoIter = std::iter::Map< - std::iter::Enumerate>, - for<'b> fn((usize, &'b T::Address)) -> Self::Item, - >; +impl<'a, A: Copy> IntoIterator for &'a Membership { + type Item = A; + type IntoIter = std::iter::Copied>; fn into_iter(self) -> Self::IntoIter { - self.members - .iter() - .enumerate() - .map(|(i, a)| (ReplicaIndex(i), *a)) + self.members.iter().copied() } } impl Size { + /// One node fewer than a majority. + /// + /// With an odd number of replicas, this is the maximum + /// number of nodes that can fail while preserving liveness. + /// /// In a replica group of size 3, this is 1. pub fn f(&self) -> usize { self.0 } + /// A majority of nodes. + /// /// In a replica group of size 3, this is 2. 
pub fn f_plus_one(&self) -> usize { self.f() + 1 } + /// Minimum number of nodes that guarantees a majority of + /// all possible majorities of nodes. + /// /// In a replica group of size 3, this is 3. pub fn three_over_two_f_plus_one(&self) -> usize { (self.f() * 3).div_ceil(2) + 1 diff --git a/src/ir/message.rs b/src/ir/message.rs index c1658f2..97de165 100644 --- a/src/ir/message.rs +++ b/src/ir/message.rs @@ -1,34 +1,36 @@ -use super::{ - record::RecordImpl, OpId, RecordEntryState, ReplicaIndex, ReplicaUpcalls, ViewNumber, -}; +use super::{record::RecordImpl, OpId, RecordEntryState, ReplicaUpcalls, View, ViewNumber}; +use crate::Transport; use serde::{Deserialize, Serialize}; use std::fmt::Debug; -pub type Message = MessageImpl< +pub type Message = MessageImpl< ::UO, ::UR, ::IO, ::CO, ::CR, + >::Address, >; #[derive(Clone, derive_more::From, derive_more::TryInto, Serialize, Deserialize)] -pub enum MessageImpl { +pub enum MessageImpl { RequestUnlogged(RequestUnlogged), - ReplyUnlogged(ReplyUnlogged), + ReplyUnlogged(ReplyUnlogged), ProposeInconsistent(ProposeInconsistent), ProposeConsensus(ProposeConsensus), - ReplyInconsistent(ReplyInconsistent), - ReplyConsensus(ReplyConsensus), + ReplyInconsistent(ReplyInconsistent), + ReplyConsensus(ReplyConsensus), FinalizeInconsistent(FinalizeInconsistent), FinalizeConsensus(FinalizeConsensus), - Confirm(Confirm), - DoViewChange(DoViewChange), - StartView(StartView), + Confirm(Confirm), + DoViewChange(DoViewChange), + StartView(StartView), + AddMember(AddMember), + RemoveMember(RemoveMember), } -impl Debug - for MessageImpl +impl Debug + for MessageImpl { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -43,6 +45,8 @@ impl Debug Self::Confirm(r) => Debug::fmt(r, f), Self::DoViewChange(r) => Debug::fmt(r, f), Self::StartView(r) => Debug::fmt(r, f), + Self::AddMember(r) => Debug::fmt(r, f), + Self::RemoveMember(r) => Debug::fmt(r, f), } } } @@ -53,11 +57,9 @@ pub struct RequestUnlogged { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ReplyUnlogged { +pub struct ReplyUnlogged { pub result: UR, - /// Current view number, for priming the - /// client's ability to send `recent`. - pub view_number: ViewNumber, + pub view: View, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -81,18 +83,18 @@ pub struct ProposeConsensus { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ReplyInconsistent { +pub struct ReplyInconsistent { pub op_id: OpId, - pub view_number: ViewNumber, + pub view: View, /// If `None`, the request couldn't be processed because /// `recent` wasn't recent. pub state: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ReplyConsensus { +pub struct ReplyConsensus { pub op_id: OpId, - pub view_number: ViewNumber, + pub view: View, /// If `None`, the request couldn't be processed because /// `recent` wasn't recent. pub result_state: Option<(CR, RecordEntryState)>, @@ -110,34 +112,31 @@ pub struct FinalizeConsensus { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Confirm { +pub struct Confirm { pub op_id: OpId, - pub view_number: ViewNumber, + pub view: View, } /// Informs a replica about a new view. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DoViewChange { - /// View number to change to. - pub view_number: ViewNumber, +pub struct DoViewChange { + /// View to change to. + pub view: View, /// Is `Some` when sent from replica to new leader. 
- pub addendum: Option>, + pub addendum: Option>, } #[derive(Clone, Serialize, Deserialize)] -pub struct ViewChangeAddendum { - /// Sender replica's index. - pub replica_index: ReplicaIndex, +pub struct ViewChangeAddendum { /// Sender replica's record. pub record: RecordImpl, /// Latest view in which sender replica had a normal state. - pub latest_normal_view: ViewNumber, + pub latest_normal_view: View, } -impl Debug for ViewChangeAddendum { +impl Debug for ViewChangeAddendum { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Addendum") - .field("replica_index", &self.replica_index) .field("latest_normal_view", &self.latest_normal_view) .finish_non_exhaustive() } @@ -145,17 +144,27 @@ impl Debug for ViewChangeAddendum { /// From leader to inform a replica that a new view has begun. #[derive(Clone, Serialize, Deserialize)] -pub struct StartView { +pub struct StartView { /// Leader's merged record. pub record: RecordImpl, - /// New view number. - pub view_number: ViewNumber, + /// New view. + pub view: View, } -impl Debug for StartView { +impl Debug for StartView { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("StartView") - .field("view_number", &self.view_number) + .field("view", &self.view) .finish_non_exhaustive() } } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AddMember { + pub address: A, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct RemoveMember { + pub address: A, +} diff --git a/src/ir/mod.rs b/src/ir/mod.rs index a5b504c..357d4e2 100644 --- a/src/ir/mod.rs +++ b/src/ir/mod.rs @@ -12,16 +12,14 @@ mod tests; pub use client::{Client, Id as ClientId}; pub use membership::{Membership, Size as MembershipSize}; pub use message::{ - Confirm, DoViewChange, FinalizeConsensus, FinalizeInconsistent, Message, ProposeConsensus, - ProposeInconsistent, ReplyConsensus, ReplyInconsistent, ReplyUnlogged, RequestUnlogged, - StartView, ViewChangeAddendum, + AddMember, Confirm, DoViewChange, FinalizeConsensus, FinalizeInconsistent, Message, + ProposeConsensus, ProposeInconsistent, RemoveMember, ReplyConsensus, ReplyInconsistent, + ReplyUnlogged, RequestUnlogged, StartView, ViewChangeAddendum, }; pub use op::Id as OpId; pub use record::{ ConsensusEntry as RecordConsensusEntry, Consistency, InconsistentEntry as RecordInconsistentEntry, Record, State as RecordEntryState, }; -pub use replica::{ - Index as ReplicaIndex, Replica, Status as ReplicaStatus, Upcalls as ReplicaUpcalls, -}; +pub use replica::{Replica, Status as ReplicaStatus, Upcalls as ReplicaUpcalls}; pub use view::{Number as ViewNumber, View}; diff --git a/src/ir/op.rs b/src/ir/op.rs index 8d47802..9dc1a3f 100644 --- a/src/ir/op.rs +++ b/src/ir/op.rs @@ -1,5 +1,5 @@ -use serde::{Deserialize, Serialize}; use super::ClientId; +use serde::{Deserialize, Serialize}; use std::fmt::Debug; #[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] diff --git a/src/ir/replica.rs b/src/ir/replica.rs index 01e2be6..eb8ff4c 100644 --- a/src/ir/replica.rs +++ b/src/ir/replica.rs @@ -1,8 +1,8 @@ use super::{ - message::ViewChangeAddendum, Confirm, DoViewChange, FinalizeConsensus, FinalizeInconsistent, - Membership, Message, OpId, ProposeConsensus, ProposeInconsistent, Record, RecordConsensusEntry, - RecordEntryState, RecordInconsistentEntry, ReplyConsensus, ReplyInconsistent, ReplyUnlogged, - RequestUnlogged, StartView, View, ViewNumber, + message::ViewChangeAddendum, AddMember, Confirm, DoViewChange, FinalizeConsensus, + 
FinalizeInconsistent, Membership, Message, OpId, ProposeConsensus, ProposeInconsistent, Record, + RecordConsensusEntry, RecordEntryState, RecordInconsistentEntry, RemoveMember, ReplyConsensus, + ReplyInconsistent, ReplyUnlogged, RequestUnlogged, StartView, View, ViewNumber, }; use crate::{Transport, TransportMessage}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -14,15 +14,6 @@ use std::{ time::{Duration, Instant}, }; -#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] -pub struct Index(pub usize); - -impl Debug for Index { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "R({})", self.0) - } -} - #[derive(Debug)] pub enum Status { Normal, @@ -72,22 +63,17 @@ pub trait Upcalls: Sized + Send + Serialize + DeserializeOwned + 'static { d: HashMap, u: Vec<(OpId, Self::CO, Self::CR)>, ) -> HashMap; - fn tick>>( - &mut self, - membership: &Membership, - transport: &T, - ) { + fn tick>(&mut self, membership: &Membership, transport: &T) { let _ = (membership, transport); // No-op. } } -pub struct Replica>> { - index: Index, +pub struct Replica> { inner: Arc>, } -impl>> Debug for Replica { +impl> Debug for Replica { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut s = f.debug_struct("Replica"); if let Ok(sync) = self.inner.sync.try_lock() { @@ -101,44 +87,44 @@ impl>> Debug for Replica { } } -struct Inner>> { +struct Inner> { transport: T, sync: Mutex>, } -struct Sync>> { +struct Sync> { status: Status, - view: View, - latest_normal_view: ViewNumber, + view: View, + latest_normal_view: View, changed_view_recently: bool, upcalls: U, record: Record, - outstanding_do_view_changes: HashMap>, + outstanding_do_view_changes: HashMap>, /// Last time received message from each peer replica. 
- peer_liveness: HashMap, + peer_liveness: HashMap, } #[derive(Serialize, Deserialize)] -struct PersistentViewInfo { - view: ViewNumber, - latest_normal_view: ViewNumber, +struct PersistentViewInfo { + view: View, + latest_normal_view: View, } -impl>> Replica { +impl> Replica { const VIEW_CHANGE_INTERVAL: Duration = Duration::from_secs(4); - pub fn new(index: Index, membership: Membership, upcalls: U, transport: T) -> Self { + pub fn new(membership: Membership, upcalls: U, transport: T) -> Self { + let view = View { + membership, + number: ViewNumber(0), + }; let ret = Self { - index, inner: Arc::new(Inner { transport, sync: Mutex::new(Sync { status: Status::Normal, - view: View { - membership, - number: ViewNumber(0), - }, - latest_normal_view: ViewNumber(0), + latest_normal_view: view.clone(), + view, changed_view_recently: true, upcalls, record: Record::::default(), @@ -152,20 +138,20 @@ impl>> Replica { if let Some(persistent) = ret .inner .transport - .persisted::(&ret.view_info_key()) + .persisted::>(&ret.view_info_key()) { sync.status = Status::Recovering; - sync.view.number = persistent.view; + sync.view = persistent.view; sync.latest_normal_view = persistent.latest_normal_view; sync.view.number.0 += 1; - if sync.view.leader_index() == ret.index { + if sync.view.leader() == ret.inner.transport.address() { sync.view.number.0 += 1; } ret.persist_view_info(&*sync); - Self::broadcast_do_view_change(ret.index, &ret.inner.transport, &mut *sync); + Self::broadcast_do_view_change(&ret.inner.transport, &mut *sync); } else { ret.persist_view_info(&*sync); } @@ -175,8 +161,16 @@ impl>> Replica { ret } + pub fn transport(&self) -> &T { + &self.inner.transport + } + + pub fn address(&self) -> T::Address { + self.inner.transport.address() + } + fn view_info_key(&self) -> String { - format!("ir_replica_{}", self.index.0) + format!("ir_replica_{}", self.inner.transport.address()) } fn persist_view_info(&self, sync: &Sync) { @@ -186,14 +180,13 @@ impl>> Replica { self.inner.transport.persist( &self.view_info_key(), Some(&PersistentViewInfo { - view: sync.view.number, - latest_normal_view: sync.latest_normal_view, + view: sync.view.clone(), + latest_normal_view: sync.latest_normal_view.clone(), }), ); } fn tick(&self) { - let my_index = self.index; let inner = Arc::downgrade(&self.inner); tokio::spawn(async move { loop { @@ -203,9 +196,15 @@ impl>> Replica { break; }; let mut sync = inner.sync.lock().unwrap(); + let sync = &mut *sync; + + sync.peer_liveness + .retain(|a, _| sync.view.membership.contains(*a)); + if sync.changed_view_recently { sync.changed_view_recently = false; - } else if sync + } + /* else if sync .peer_liveness .get(&Index( ((sync.view.number.0 + 1) % sync.view.membership.len() as u64) as usize, @@ -214,18 +213,20 @@ impl>> Replica { .unwrap_or(false) { // skip this view change. 
- } else { + } */ + else { if sync.status.is_normal() { sync.status = Status::ViewChanging; } sync.view.number.0 += 1; eprintln!( - "{my_index:?} timeout sending do view change {}", + "{:?} timeout sending do view change {}", + inner.transport.address(), sync.view.number.0 ); - Self::broadcast_do_view_change(my_index, &inner.transport, &mut *sync); + Self::broadcast_do_view_change(&inner.transport, &mut *sync); } } }); @@ -248,53 +249,59 @@ impl>> Replica { }); } - fn broadcast_do_view_change(my_index: Index, transport: &T, sync: &mut Sync) { + fn broadcast_do_view_change(transport: &T, sync: &mut Sync) { sync.changed_view_recently = true; - for (index, address) in &sync.view.membership { - if index == my_index { + let destinations = sync + .view + .membership + .iter() + .chain(sync.latest_normal_view.membership.iter()) + .collect::>(); + + for address in destinations { + if address == transport.address() { continue; } transport.do_send( address, - Message::::DoViewChange(DoViewChange { - view_number: sync.view.number, - addendum: (index == sync.view.leader_index()).then(|| ViewChangeAddendum { + Message::::DoViewChange(DoViewChange { + view: sync.view.clone(), + addendum: (address == sync.view.leader()).then(|| ViewChangeAddendum { record: sync.record.clone(), - replica_index: my_index, - latest_normal_view: sync.latest_normal_view, + latest_normal_view: sync.latest_normal_view.clone(), }), }), ) } } - pub fn receive(&self, address: T::Address, message: Message) -> Option> { + pub fn receive(&self, address: T::Address, message: Message) -> Option> { let mut sync = self.inner.sync.lock().unwrap(); let sync = &mut *sync; - if let Some(index) = sync.view.membership.get_index(address) { - sync.peer_liveness.insert(index, Instant::now()); + if sync.view.membership.get_index(address).is_some() { + sync.peer_liveness.insert(address, Instant::now()); } match message { - Message::::RequestUnlogged(RequestUnlogged { op }) => { + Message::::RequestUnlogged(RequestUnlogged { op }) => { if sync.status.is_normal() { let result = sync.upcalls.exec_unlogged(op); - return Some(Message::::ReplyUnlogged(ReplyUnlogged { + return Some(Message::::ReplyUnlogged(ReplyUnlogged { result, - view_number: sync.view.number, + view: sync.view.clone(), })); } else { - eprintln!("{:?} abnormal", self.index); + eprintln!("{:?} abnormal", self.inner.transport.address()); } } - Message::::ProposeInconsistent(ProposeInconsistent { op_id, op, recent }) => { + Message::::ProposeInconsistent(ProposeInconsistent { op_id, op, recent }) => { if sync.status.is_normal() { if !recent.is_recent_relative_to(sync.view.number) { eprintln!("ancient relative to {:?}", sync.view.number); - return Some(Message::::ReplyInconsistent(ReplyInconsistent { + return Some(Message::::ReplyInconsistent(ReplyInconsistent { op_id, - view_number: sync.view.number, + view: sync.view.clone(), state: None, })); } @@ -312,20 +319,20 @@ impl>> Replica { } }; - return Some(Message::::ReplyInconsistent(ReplyInconsistent { + return Some(Message::::ReplyInconsistent(ReplyInconsistent { op_id, - view_number: sync.view.number, + view: sync.view.clone(), state: Some(state), })); } } - Message::::ProposeConsensus(ProposeConsensus { op_id, op, recent }) => { + Message::::ProposeConsensus(ProposeConsensus { op_id, op, recent }) => { if sync.status.is_normal() { if !recent.is_recent_relative_to(sync.view.number) { eprintln!("ancient relative to {:?}", sync.view.number); - return Some(Message::::ReplyConsensus(ReplyConsensus { + return 
Some(Message::::ReplyConsensus(ReplyConsensus { op_id, - view_number: sync.view.number, + view: sync.view.clone(), result_state: None, })); } @@ -345,22 +352,22 @@ impl>> Replica { } }; - return Some(Message::::ReplyConsensus(ReplyConsensus { + return Some(Message::::ReplyConsensus(ReplyConsensus { op_id, - view_number: sync.view.number, + view: sync.view.clone(), result_state: Some((result, state)), })); } else { - eprintln!("{:?} abnormal", self.index); + eprintln!("{:?} abnormal", self.inner.transport.address()); } } - Message::::FinalizeInconsistent(FinalizeInconsistent { op_id }) => { + Message::::FinalizeInconsistent(FinalizeInconsistent { op_id }) => { if sync.status.is_normal() && let Some(entry) = sync.record.inconsistent.get_mut(&op_id) && entry.state.is_tentative() { entry.state = RecordEntryState::Finalized(sync.view.number); sync.upcalls.exec_inconsistent(&entry.op); } } - Message::::FinalizeConsensus(FinalizeConsensus { op_id, result }) => { + Message::::FinalizeConsensus(FinalizeConsensus { op_id, result }) => { if sync.status.is_normal() { if let Some(entry) = sync.record.consensus.get_mut(&op_id) { // Don't allow a late `FinalizeConsensus` to overwrite @@ -376,25 +383,25 @@ impl>> Replica { // Send `Confirm` regardless; the view number gives the // client enough information to retry if needed. - return Some(Message::::Confirm(Confirm { + return Some(Message::::Confirm(Confirm { op_id, - view_number: sync.view.number, + view: sync.view.clone(), })); } } } - Message::::DoViewChange(msg) => { - if msg.view_number > sync.view.number - || (msg.view_number == sync.view.number && sync.status.is_view_changing()) + Message::::DoViewChange(msg) => { + //eprintln!("{:?} receiving dvt {:?} and have {:?} / {:?}", self.inner.transport.address(), msg.view.number, sync.view.number, sync.status); + if msg.view.number > sync.view.number + || (msg.view.number == sync.view.number && sync.status.is_view_changing()) { - if msg.view_number > sync.view.number { - sync.view.number = msg.view_number; + if msg.view.number > sync.view.number { + sync.view = msg.view.clone(); if sync.status.is_normal() { sync.status = Status::ViewChanging; } self.persist_view_info(&*sync); Self::broadcast_do_view_change( - self.index, &self.inner.transport, &mut *sync, ); @@ -408,59 +415,63 @@ impl>> Replica { ); */ - if self.index == sync.view.leader_index() && let Some(addendum) = msg.addendum.as_ref() { - let msg_view_number = msg.view_number; - match sync.outstanding_do_view_changes.entry(addendum.replica_index) { + if self.inner.transport.address() == sync.view.leader() && msg.addendum.is_some() { + let msg_view_number = msg.view.number; + match sync.outstanding_do_view_changes.entry(address) { Entry::Vacant(vacant) => { vacant.insert(msg); } Entry::Occupied(mut occupied) => { - if msg.view_number < occupied.get().view_number { + if msg.view.number < occupied.get().view.number { return None; } occupied.insert(msg); } } - let threshold = sync.view.membership.size().f(); + let my_address = self.inner.transport.address(); + let synthetic = DoViewChange{ view: sync.view.clone(), addendum: Some(ViewChangeAddendum { record: sync.record.clone(), latest_normal_view: sync.latest_normal_view.clone() }) }; let matching = sync .outstanding_do_view_changes - .values() - .filter(|other| other.view_number == sync.view.number); + .iter() + .chain(std::iter::once((&my_address, &synthetic))) + .filter(|(address, other)| + sync.latest_normal_view.membership.contains(**address) + && other.view.number == sync.view.number + ); - if 
matching.clone().count() >= threshold { - eprintln!("DOING VIEW CHANGE"); + if matching.clone().count() >= sync.latest_normal_view.membership.size().f_plus_one() { + eprintln!("{:?} DOING VIEW CHANGE", self.inner.transport.address()); { - let latest_normal_view = sync.latest_normal_view.max( + let latest_normal_view = matching .clone() - .map(|r| { - r.addendum.as_ref().unwrap().latest_normal_view + .map(|(_, r)| { + &r.addendum.as_ref().unwrap().latest_normal_view }) - .max() - .unwrap(), - ); - let mut latest_records = matching + .chain(std::iter::once(&sync.latest_normal_view)) + .max_by_key(|v| v.number) + .unwrap() + ; + let latest_records = matching .clone() - .filter(|r| { - r.addendum.as_ref().unwrap().latest_normal_view - == latest_normal_view + .filter(|(_, r)| { + r.addendum.as_ref().unwrap().latest_normal_view.number + == latest_normal_view.number }) - .map(|r| r.addendum.as_ref().unwrap().record.clone()) + .map(|(_, r)| r.addendum.as_ref().unwrap().record.clone()) .collect::>(); - if sync.latest_normal_view == latest_normal_view { - latest_records.push(sync.record.clone()); - } + eprintln!( "have {} latest ({:?})", latest_records.len(), sync .outstanding_do_view_changes .iter() - .map(|(i, dvt)| (*i, dvt.view_number, dvt.addendum.as_ref().unwrap().latest_normal_view)) + .map(|(a, dvt)| (*a, dvt.view.number, dvt.addendum.as_ref().unwrap().latest_normal_view.number)) .chain( std::iter::once( - (self.index, sync.view.number, sync.latest_normal_view) + (self.inner.transport.address(), sync.view.number, sync.latest_normal_view.number) ) ) .collect::>() @@ -538,7 +549,7 @@ impl>> Replica { .count(); if matches - >= sync.view.membership.size().f_over_two_plus_one() + >= sync.latest_normal_view.membership.size().f_over_two_plus_one() { majority_result_in_d = Some(entry.result.clone()); break; @@ -586,17 +597,27 @@ impl>> Replica { sync.changed_view_recently = true; sync.status = Status::Normal; sync.view.number = msg_view_number; - sync.latest_normal_view = msg_view_number; + + let destinations = sync + .view + .membership + .iter() + .chain(sync.latest_normal_view.membership.iter()) + .collect::>(); + + sync.latest_normal_view.number = msg_view_number; + sync.latest_normal_view.membership = sync.view.membership.clone(); self.persist_view_info(&*sync); - for (index, address) in &sync.view.membership { - if index == self.index { + + for address in destinations { + if address == self.inner.transport.address() { continue; } self.inner.transport.do_send( address, - Message::::StartView(StartView { + Message::::StartView(StartView { record: sync.record.clone(), - view_number: sync.view.number, + view: sync.view.clone(), }), ); } @@ -608,22 +629,70 @@ impl>> Replica { } } } - Message::::StartView(StartView { + Message::::StartView(StartView { record: new_record, - view_number, + view, }) => { - if view_number > sync.view.number - || (view_number == sync.view.number || !sync.status.is_normal()) + if view.number > sync.view.number + || (view.number == sync.view.number || !sync.status.is_normal()) { - eprintln!("{:?} starting view {view_number:?}", self.index); + eprintln!("{:?} starting view {view:?}", self.inner.transport.address()); sync.upcalls.sync(&sync.record, &new_record); sync.record = new_record; sync.status = Status::Normal; - sync.view.number = view_number; - sync.latest_normal_view = view_number; + sync.view = view.clone(); + sync.latest_normal_view = view; self.persist_view_info(&*sync); } } + Message::::AddMember(AddMember{address}) => { + println!("{:?} recv add member 
{address:?}", self.inner.transport.address()); + if sync.status.is_normal() && sync.view.membership.get_index(address).is_none() { + if !sync.view.membership.contains(self.inner.transport.address()) { + // TODO: Expand coverage. + return None; + } + println!("{:?} acting on add member {address:?}", self.inner.transport.address()); + + sync.status = Status::ViewChanging; + sync.view.number.0 += 3; + + // Add the node. + sync.view.membership = Membership::new( + sync.view.membership + .iter() + .chain(std::iter::once(address)) + .collect() + ); + self.persist_view_info(&*sync); + + // Election. + Self::broadcast_do_view_change(&self.inner.transport, sync); + } + } + Message::::RemoveMember(RemoveMember{address}) => { + println!("{:?} recv remove member {address:?}", self.inner.transport.address()); + if sync.status.is_normal() && sync.view.membership.get_index(address).is_some() && sync.view.membership.len() > 1 && address != self.inner.transport.address() { + if !sync.view.membership.contains(self.inner.transport.address()) { + return None; + } + println!("{:?} acting on remove member {address:?}", self.inner.transport.address()); + sync.status = Status::ViewChanging; + sync.view.number.0 += 3; + + // Remove the node. + sync.view.membership = Membership::new( + sync.view.membership + .iter() + .filter(|a| *a != address) + .collect() + ); + self.persist_view_info(&*sync); + + // Election. + Self::broadcast_do_view_change(&self.inner.transport, sync); + } + } _ => { eprintln!("unexpected message"); } diff --git a/src/ir/tests/lock_server.rs b/src/ir/tests/lock_server.rs index 7277d53..7e7b0e2 100644 --- a/src/ir/tests/lock_server.rs +++ b/src/ir/tests/lock_server.rs @@ -1,8 +1,8 @@ use crate::{ ChannelRegistry, ChannelTransport, IrClient, IrClientId, IrMembership, IrMembershipSize, - IrMessage, IrOpId, IrRecord, IrReplica, IrReplicaIndex, - IrReplicaUpcalls, Transport, + IrOpId, IrRecord, IrReplica, IrReplicaUpcalls, Transport, }; +use rand::{seq::IteratorRandom, thread_rng, Rng}; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, HashSet}, @@ -10,24 +10,56 @@ use std::{ time::Duration, }; +#[tokio::test] +async fn lock_server_1() { + timeout_lock_server(1).await; +} + +#[tokio::test] +async fn lock_server_2() { + timeout_lock_server(2).await; +} + #[tokio::test] async fn lock_server_3() { - lock_server(3).await; + timeout_lock_server(3).await; +} + +#[tokio::test] +async fn lock_server_4() { + timeout_lock_server(4).await; } #[tokio::test] async fn lock_server_5() { - lock_server(5).await; + timeout_lock_server(5).await; } #[tokio::test] async fn lock_server_7() { - lock_server(7).await; + timeout_lock_server(7).await; } #[tokio::test] async fn lock_server_9() { - lock_server(9).await; + timeout_lock_server(9).await; +} + +#[ignore] +#[tokio::test] +async fn lock_server_loop() { + loop { + timeout_lock_server(3).await; + } +} + +async fn timeout_lock_server(num_replicas: usize) { + tokio::time::timeout( + Duration::from_secs((num_replicas as u64 + 10) * 10), + lock_server(num_replicas), + ) + .await + .unwrap(); } async fn lock_server(num_replicas: usize) { @@ -45,8 +77,6 @@ async fn lock_server(num_replicas: usize) { No, } - type Message = IrMessage; - #[derive(Serialize, Deserialize)] struct Upcalls { locked: Option, @@ -63,11 +93,13 @@ async fn lock_server(num_replicas: usize) { let _ = op; unreachable!(); } + fn exec_inconsistent(&mut self, op: &Self::IO) { if Some(op.0) == self.locked { self.locked = None; } } + fn exec_consensus(&mut self, op: &Self::CO) -> Self::CR 
{ if self.locked.is_none() || self.locked == Some(op.0) { self.locked = Some(op.0); @@ -76,6 +108,7 @@ async fn lock_server(num_replicas: usize) { LockResult::No } } + fn sync(&mut self, _: &IrRecord, record: &IrRecord) { self.locked = None; @@ -99,6 +132,7 @@ async fn lock_server(num_replicas: usize) { } } } + fn merge( &mut self, d: HashMap, @@ -111,7 +145,7 @@ async fn lock_server(num_replicas: usize) { results.insert( *op_id, - if successful && self.locked.is_none() { + if successful && (self.locked.is_none() || self.locked == Some(request.0)) { self.locked = Some(request.0); LockResult::Ok } else { @@ -120,8 +154,8 @@ async fn lock_server(num_replicas: usize) { ); } - for (op_id, _, _) in &u { - results.insert(*op_id, LockResult::No); + for (op_id, op, _) in &u { + results.insert(*op_id, self.exec_consensus(op)); } results @@ -132,35 +166,34 @@ async fn lock_server(num_replicas: usize) { let membership = IrMembership::new((0..num_replicas).collect::>()); fn create_replica( - index: IrReplicaIndex, - registry: &ChannelRegistry, - membership: &IrMembership>, - ) -> Arc>> { + registry: &ChannelRegistry, + membership: &IrMembership, + ) -> Arc>> { Arc::new_cyclic( - |weak: &std::sync::Weak>>| { + |weak: &std::sync::Weak>>| { let weak = weak.clone(); let channel = registry.channel(move |from, message| weak.upgrade()?.receive(from, message)); let upcalls = Upcalls { locked: None }; - IrReplica::new(index, membership.clone(), upcalls, channel) + IrReplica::new(membership.clone(), upcalls, channel) }, ) } - let replicas = (0..num_replicas) - .map(|i| create_replica(IrReplicaIndex(i), ®istry, &membership)) + let mut replicas = std::iter::repeat_with(|| create_replica(®istry, &membership)) + .take(num_replicas) .collect::>(); fn create_client( - registry: &ChannelRegistry, - membership: &IrMembership>, - ) -> Arc>> { + registry: &ChannelRegistry, + membership: &IrMembership, + ) -> Arc>> { let channel = registry.channel(move |_, _| unreachable!()); Arc::new(IrClient::new(membership.clone(), channel)) } - let clients = (0..2) - .map(|_| create_client(®istry, &membership)) + let clients = std::iter::repeat_with(|| create_client(®istry, &membership)) + .take(2) .collect::>(); let decide_lock = |results: HashMap, membership: IrMembershipSize| { @@ -172,27 +205,65 @@ async fn lock_server(num_replicas: usize) { } }; - for _ in 0..2 { + fn add_replica( + replicas: &mut Vec>>>, + registry: &ChannelRegistry, + membership: &IrMembership, + ) { + let new = create_replica(®istry, &membership); + for d in &*replicas { + new.transport().do_send( + d.address(), + crate::ir::AddMember { + address: new.address(), + }, + ); + } + replicas.push(new); + } + + for i in 0..8 { assert_eq!( clients[0] .invoke_consensus(Lock(clients[0].id()), &decide_lock) .await, - LockResult::Ok + LockResult::Ok, + "{i}" ); + assert_eq!( clients[1] .invoke_consensus(Lock(clients[1].id()), &decide_lock) .await, - LockResult::No + LockResult::No, + "{i}" ); + + for _ in 0..2 { + if thread_rng().gen() { + let to_remove = replicas + .iter() + .map(|r| r.address()) + .choose(&mut thread_rng()) + .unwrap(); + for r in replicas.iter() { + clients[0] + .transport() + .do_send(r.address(), crate::ir::RemoveMember { address: to_remove }); + } + } + if thread_rng().gen() { + add_replica(&mut replicas, ®istry, &membership); + } + } } clients[0] .invoke_inconsistent(Unlock(clients[0].id())) .await; - for _ in 0..10 { - ChannelTransport::::sleep(Duration::from_secs(5)).await; + for _ in 0..(replicas.len() + 1) * 20 { + 
ChannelTransport::::sleep(Duration::from_secs(5)).await; eprintln!("@@@@@ INVOKE {replicas:?}"); if clients[1] diff --git a/src/ir/view.rs b/src/ir/view.rs index 958b9c3..fb5b2ee 100644 --- a/src/ir/view.rs +++ b/src/ir/view.rs @@ -1,5 +1,4 @@ -use super::{Membership, ReplicaIndex}; -use crate::transport::Transport; +use super::Membership; use serde::{Deserialize, Serialize}; use std::fmt::Debug; @@ -17,13 +16,17 @@ impl Debug for Number { write!(f, "V({})", self.0) } } -pub struct View { - pub membership: Membership, + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct View { + pub membership: Membership, pub number: Number, } -impl View { - pub fn leader_index(&self) -> ReplicaIndex { - ReplicaIndex((self.number.0 % self.membership.len() as u64) as usize) +impl View { + pub fn leader(&self) -> A { + self.membership + .get((self.number.0 % self.membership.len() as u64) as usize) + .unwrap() } } diff --git a/src/lib.rs b/src/lib.rs index c5d4b54..1b3cce3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ pub use ir::{ Client as IrClient, ClientId as IrClientId, Membership as IrMembership, MembershipSize as IrMembershipSize, Message as IrMessage, OpId as IrOpId, Record as IrRecord, RecordConsensusEntry as IrRecordConsensusEntry, Replica as IrReplica, - ReplicaIndex as IrReplicaIndex, ReplicaUpcalls as IrReplicaUpcalls, + ReplicaUpcalls as IrReplicaUpcalls, }; pub use mvcc::Store as MvccStore; pub use occ::{ diff --git a/src/tapir/client.rs b/src/tapir/client.rs index 20f73f6..9605735 100644 --- a/src/tapir/client.rs +++ b/src/tapir/client.rs @@ -1,5 +1,5 @@ use super::{Key, Replica, ShardClient, ShardTransaction, Timestamp, Value}; -use crate::{IrMembership, IrMessage, OccPrepareResult, OccTransactionId, Transport}; +use crate::{IrMembership, OccPrepareResult, OccTransactionId, Transport}; use rand::{thread_rng, Rng}; use std::{ future::Future, @@ -8,7 +8,7 @@ use std::{ }; use tokio::select; -pub struct Client { +pub struct Client>> { /// TODO: Add multiple shards. inner: ShardClient, #[allow(unused)] @@ -16,15 +16,15 @@ pub struct Client { next_transaction_number: AtomicU64, } -pub struct Transaction { +pub struct Transaction>> { #[allow(unused)] id: OccTransactionId, // TODO: Multiple shards. 
inner: ShardTransaction, } -impl>>> Client { - pub fn new(membership: IrMembership, transport: T) -> Self { +impl>> Client { + pub fn new(membership: IrMembership, transport: T) -> Self { Self { inner: ShardClient::new(membership, transport.clone()), transport, @@ -44,7 +44,7 @@ impl>>> Client< } } -impl>>> Transaction { +impl>> Transaction { pub fn get(&self, key: K) -> impl Future> { self.inner.get(key) } diff --git a/src/tapir/key_value.rs b/src/tapir/key_value.rs index f73b852..f9bc5ce 100644 --- a/src/tapir/key_value.rs +++ b/src/tapir/key_value.rs @@ -1,5 +1,5 @@ -use std::{fmt::Debug, hash::Hash}; use serde::{de::DeserializeOwned, Serialize}; +use std::{fmt::Debug, hash::Hash}; pub trait Key: Debug + Clone + Ord + Hash + Send + Sync + Serialize + DeserializeOwned + 'static diff --git a/src/tapir/replica.rs b/src/tapir/replica.rs index 47e02b4..354f487 100644 --- a/src/tapir/replica.rs +++ b/src/tapir/replica.rs @@ -2,8 +2,8 @@ use super::{Key, Timestamp, Value, CO, CR, IO, UO, UR}; use crate::ir::ReplyUnlogged; use crate::util::vectorize; use crate::{ - IrClient, IrMembership, IrMembershipSize, IrMessage, IrOpId, IrRecord, IrReplicaIndex, - IrReplicaUpcalls, OccPrepareResult, OccStore, OccTransaction, OccTransactionId, Transport, + IrClient, IrMembership, IrMembershipSize, IrOpId, IrRecord, IrReplicaUpcalls, OccPrepareResult, + OccStore, OccTransaction, OccTransactionId, Transport, }; use serde::{Deserialize, Serialize}; use std::task::Context; @@ -52,11 +52,11 @@ impl Replica { } } - fn recover_coordination>>( + fn recover_coordination>( transaction_id: OccTransactionId, transaction: OccTransaction, commit: Timestamp, - membership: IrMembership, + membership: IrMembership, transport: T, ) -> impl Future { eprintln!("trying to recover {transaction_id:?}"); @@ -110,11 +110,11 @@ impl Replica { return; } - fn decide( - results: &HashMap>>, + fn decide( + results: &HashMap, A>>, membership: IrMembershipSize, ) -> Option> { - let highest_view = results.values().map(|r| r.view_number).max()?; + let highest_view = results.values().map(|r| r.view.number).max()?; Some( if results .values() @@ -124,7 +124,7 @@ impl Replica { } else if results .values() .filter(|r| { - r.view_number == highest_view + r.view.number == highest_view && matches!(r.result, UR::CheckPrepare(OccPrepareResult::Ok)) }) .count() @@ -134,7 +134,7 @@ impl Replica { } else if results .values() .filter(|r| { - r.view_number == highest_view + r.view.number == highest_view && matches!(r.result, UR::CheckPrepare(OccPrepareResult::TooLate)) }) .count() @@ -156,7 +156,7 @@ impl Replica { let mut timeout = std::pin::pin!(T::sleep(Duration::from_millis(1000))); let results = future .until( - |results: &HashMap>>, + |results: &HashMap, T::Address>>, cx: &mut Context<'_>| { decide(results, membership).is_some() || timeout.as_mut().poll(cx).is_ready() @@ -574,11 +574,7 @@ impl IrReplicaUpcalls for Replica { ret } - fn tick>>( - &mut self, - membership: &IrMembership, - transport: &T, - ) { + fn tick>(&mut self, membership: &IrMembership, transport: &T) { eprintln!( "there are {} prepared transactions", self.inner.prepared.len() diff --git a/src/tapir/shard_client.rs b/src/tapir/shard_client.rs index e720fb8..8b1038f 100644 --- a/src/tapir/shard_client.rs +++ b/src/tapir/shard_client.rs @@ -1,7 +1,7 @@ use super::{Key, Replica, Timestamp, Value, CO, CR, IO, UO, UR}; use crate::{ - transport::Transport, IrClient, IrClientId, IrMembership, IrMessage, OccPrepareResult, - OccTransaction, OccTransactionId, + transport::Transport, 
IrClient, IrClientId, IrMembership, OccPrepareResult, OccTransaction, + OccTransactionId, }; use std::{ collections::HashMap, @@ -9,12 +9,12 @@ use std::{ sync::{Arc, Mutex}, }; -pub struct ShardClient { +pub struct ShardClient>> { inner: IrClient, T>, } -impl>>> ShardClient { - pub fn new(membership: IrMembership, transport: T) -> Self { +impl>> ShardClient { + pub fn new(membership: IrMembership, transport: T) -> Self { Self { inner: IrClient::new(membership, transport), } @@ -30,12 +30,12 @@ impl>>> ShardCl } } -pub struct ShardTransaction { +pub struct ShardTransaction>> { pub client: IrClient, T>, inner: Arc>>, } -impl Clone for ShardTransaction { +impl>> Clone for ShardTransaction { fn clone(&self) -> Self { Self { client: self.client.clone(), @@ -50,7 +50,7 @@ struct Inner { read_cache: HashMap>, } -impl>>> ShardTransaction { +impl>> ShardTransaction { fn new(client: IrClient, T>, id: OccTransactionId) -> Self { Self { client, diff --git a/src/tapir/tests/kv.rs b/src/tapir/tests/kv.rs index 9ccca79..2a222c7 100644 --- a/src/tapir/tests/kv.rs +++ b/src/tapir/tests/kv.rs @@ -3,8 +3,8 @@ use rand::{thread_rng, Rng}; use tokio::time::timeout; use crate::{ - ChannelRegistry, ChannelTransport, IrMembership, IrMessage, IrReplica, IrReplicaIndex, - TapirClient, TapirReplica, TapirTimestamp, Transport as _, + ChannelRegistry, ChannelTransport, IrMembership, IrReplica, TapirClient, TapirReplica, + TapirTimestamp, Transport as _, }; use std::{ sync::{ @@ -16,16 +16,15 @@ use std::{ type K = i64; type V = i64; -type Message = IrMessage>; -type Transport = ChannelTransport; +type Transport = ChannelTransport>; fn build_kv( linearizable: bool, num_replicas: usize, num_clients: usize, ) -> ( - Vec, ChannelTransport>>>, - Vec>>>, + Vec, ChannelTransport>>>>, + Vec>>>>, ) { println!("---------------------------"); println!(" linearizable={linearizable} num_replicas={num_replicas}"); @@ -35,36 +34,37 @@ fn build_kv( let membership = IrMembership::new((0..num_replicas).collect::>()); fn create_replica( - index: IrReplicaIndex, - registry: &ChannelRegistry, - membership: &IrMembership>, + registry: &ChannelRegistry>, + membership: &IrMembership, linearizable: bool, - ) -> Arc, ChannelTransport>> { + ) -> Arc, ChannelTransport>>> { Arc::new_cyclic( - |weak: &std::sync::Weak, ChannelTransport>>| { + |weak: &std::sync::Weak< + IrReplica, ChannelTransport>>, + >| { let weak = weak.clone(); let channel = registry.channel(move |from, message| weak.upgrade()?.receive(from, message)); let upcalls = TapirReplica::new(linearizable); - IrReplica::new(index, membership.clone(), upcalls, channel) + IrReplica::new(membership.clone(), upcalls, channel) }, ) } - let replicas = (0..num_replicas) - .map(|i| create_replica(IrReplicaIndex(i), ®istry, &membership, linearizable)) + let replicas = std::iter::repeat_with(|| create_replica(®istry, &membership, linearizable)) + .take(num_replicas) .collect::>(); fn create_client( - registry: &ChannelRegistry, - membership: &IrMembership>, - ) -> Arc>> { + registry: &ChannelRegistry>, + membership: &IrMembership, + ) -> Arc>>> { let channel = registry.channel(move |_, _| unreachable!()); Arc::new(TapirClient::new(membership.clone(), channel)) } - let clients = (0..num_clients) - .map(|_| create_client(®istry, &membership)) + let clients = std::iter::repeat_with(|| create_client(®istry, &membership)) + .take(num_clients) .collect::>(); (replicas, clients) @@ -88,7 +88,12 @@ async fn fuzz_rwr_7() { async fn fuzz_rwr(replicas: usize) { for _ in 0..16 { for linearizable in [false, 
true] { - rwr(linearizable, replicas).await; + timeout( + Duration::from_secs((replicas as u64 + 5) * 10), + rwr(linearizable, replicas), + ) + .await + .unwrap(); } } } @@ -129,12 +134,21 @@ async fn rwr(linearizable: bool, num_replicas: usize) { #[tokio::test] async fn increment_sequential_3() { - increment_sequential(3).await; + increment_sequential_timeout(3).await; } #[tokio::test] async fn increment_sequential_7() { - increment_sequential(7).await; + increment_sequential_timeout(7).await; +} + +async fn increment_sequential_timeout(num_replicas: usize) { + timeout( + Duration::from_secs((num_replicas as u64 + 10) * 10), + increment_sequential(num_replicas), + ) + .await + .unwrap(); } async fn increment_sequential(num_replicas: usize) { @@ -160,12 +174,21 @@ async fn increment_sequential(num_replicas: usize) { #[tokio::test] async fn increment_parallel_3() { - increment_parallel(3).await; + increment_parallel_timeout(3).await; } #[tokio::test] async fn increment_parallel_7() { - increment_parallel(7).await; + increment_parallel_timeout(7).await; +} + +async fn increment_parallel_timeout(num_replicas: usize) { + timeout( + Duration::from_secs((num_replicas as u64 + 10) * 10), + increment_parallel(num_replicas), + ) + .await + .unwrap(); } async fn increment_parallel(num_replicas: usize) { @@ -205,6 +228,11 @@ async fn throughput_3_lin() { async fn throughput(linearizable: bool, num_replicas: usize, num_clients: usize) { let local = tokio::task::LocalSet::new(); + local.spawn_local(async move { + tokio::time::sleep(Duration::from_secs(60)).await; + panic!("timeout"); + }); + // Run the local task set. local .run_until(async move { @@ -270,41 +298,40 @@ async fn throughput(linearizable: bool, num_replicas: usize, num_clients: usize) #[tokio::test] async fn coordinator_recovery_3_loop() { loop { - timeout(Duration::from_secs(120), coordinator_recovery(3)) - .await - .unwrap(); + timeout_coordinator_recovery(3).await; } } #[tokio::test] async fn coordinator_recovery_3() { - timeout(Duration::from_secs(120), coordinator_recovery(3)) - .await - .unwrap(); + timeout_coordinator_recovery(3).await; } #[tokio::test] async fn coordinator_recovery_5() { - timeout(Duration::from_secs(180), coordinator_recovery(5)) - .await - .unwrap(); + timeout_coordinator_recovery(5).await; } #[ignore] #[tokio::test] async fn coordinator_recovery_7_loop() { loop { - timeout(Duration::from_secs(240), coordinator_recovery(7)) - .await - .unwrap(); + timeout_coordinator_recovery(7).await; } } #[tokio::test] async fn coordinator_recovery_7() { - timeout(Duration::from_secs(240), coordinator_recovery(7)) - .await - .unwrap(); + timeout_coordinator_recovery(7).await; +} + +async fn timeout_coordinator_recovery(num_replicas: usize) { + timeout( + Duration::from_secs((num_replicas as u64 + 10) * 20), + coordinator_recovery(num_replicas), + ) + .await + .unwrap(); } async fn coordinator_recovery(num_replicas: usize) { diff --git a/src/transport/channel.rs b/src/transport/channel.rs index fe28314..42db063 100644 --- a/src/transport/channel.rs +++ b/src/transport/channel.rs @@ -1,4 +1,5 @@ -use super::{Message, Transport}; +use super::Transport; +use crate::{IrMessage, IrReplicaUpcalls}; use rand::{thread_rng, Rng}; use serde::{de::DeserializeOwned, Serialize}; use std::{ @@ -12,11 +13,11 @@ use std::{ const LOG: bool = true; -pub struct Registry { - inner: Arc>>, +pub struct Registry { + inner: Arc>>, } -impl Default for Registry { +impl Default for Registry { fn default() -> Self { Self { inner: Default::default(), @@ 
-24,12 +25,18 @@ impl Default for Registry { } } -struct Inner { +struct Inner { #[allow(clippy::type_complexity)] - callbacks: Vec Option + Send + Sync>>, + callbacks: Vec< + Arc< + dyn Fn(usize, IrMessage>) -> Option>> + + Send + + Sync, + >, + >, } -impl Default for Inner { +impl Default for Inner { fn default() -> Self { Self { callbacks: Vec::new(), @@ -37,11 +44,14 @@ impl Default for Inner { } } -impl Registry { +impl Registry { pub fn channel( &self, - callback: impl Fn(usize, M) -> Option + Send + Sync + 'static, - ) -> Channel { + callback: impl Fn(usize, IrMessage>) -> Option>> + + Send + + Sync + + 'static, + ) -> Channel { let mut inner = self.inner.write().unwrap(); let address = inner.callbacks.len(); inner.callbacks.push(Arc::new(callback)); @@ -53,13 +63,13 @@ impl Registry { } } -pub struct Channel { +pub struct Channel { address: usize, persistent: Arc>>, - inner: Arc>>, + inner: Arc>>, } -impl Clone for Channel { +impl Clone for Channel { fn clone(&self) -> Self { Self { address: self.address, @@ -69,7 +79,7 @@ impl Clone for Channel { } } -impl Channel { +impl Channel { #[allow(unused_variables)] fn should_drop(from: usize, to: usize) -> bool { //return false; @@ -78,17 +88,16 @@ impl Channel { rand::thread_rng().gen_bool(1.0 / 5.0) } - fn random_delay(range: Range) -> ::Sleep { + fn random_delay(range: Range) -> >::Sleep { Self::sleep(std::time::Duration::from_millis( thread_rng().gen_range(range), )) } } -impl Transport for Channel { +impl Transport for Channel { type Address = usize; type Sleep = tokio::time::Sleep; - type Message = M; fn address(&self) -> Self::Address { self.address @@ -114,7 +123,7 @@ impl Transport for Channel { let mut persistent = self.persistent.lock().unwrap(); if let Some(value) = value { let string = serde_json::to_string(&value).unwrap(); - let display = if string.len() > 100 { + let display = if string.len() > 200 { let with_bc = bitcode::serialize(&value).unwrap(); format!("<{} bytes ({} with bitcode)>", string.len(), with_bc.len()) } else { @@ -136,10 +145,10 @@ impl Transport for Channel { .and_then(|value| serde_json::from_str(value).ok()) } - fn send + Send + Debug>( + fn send> + Send + Debug>( &self, address: Self::Address, - message: impl Into + Debug, + message: impl Into> + Debug, ) -> impl Future + 'static { let from: usize = self.address; if LOG { @@ -179,7 +188,7 @@ impl Transport for Channel { } } - fn do_send(&self, address: Self::Address, message: impl Into + Debug) { + fn do_send(&self, address: Self::Address, message: impl Into> + Debug) { let from = self.address; let should_drop = Self::should_drop(self.address, address); if LOG { diff --git a/src/transport/mod.rs b/src/transport/mod.rs index e228552..e00aea5 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -1,19 +1,20 @@ +use crate::{IrMessage, IrReplicaUpcalls}; pub use channel::{Channel, Registry as ChannelRegistry}; pub use message::Message; use serde::{de::DeserializeOwned, Serialize}; use std::{ - fmt::Debug, + fmt::{Debug, Display}, future::Future, + hash::Hash, time::{Duration, SystemTime}, }; mod channel; mod message; -pub trait Transport: Clone + Send + Sync + 'static { - type Address: Copy + Eq + Debug + Send + 'static; +pub trait Transport: Clone + Send + Sync + 'static { + type Address: Copy + Eq + Hash + Debug + Display + Send + Serialize + DeserializeOwned + 'static; type Sleep: Future + Send; - type Message: Message; /// Get own address. 
fn address(&self) -> Self::Address; @@ -44,12 +45,12 @@ pub trait Transport: Clone + Send + Sync + 'static { fn persisted(&self, key: &str) -> Option; /// Send/retry, ignoring any errors, until there is a reply. - fn send + Send + Debug>( + fn send> + Send + Debug>( &self, address: Self::Address, - message: impl Into + Debug, + message: impl Into> + Debug, ) -> impl Future + Send + 'static; /// Send once and don't wait for a reply. - fn do_send(&self, address: Self::Address, message: impl Into + Debug); + fn do_send(&self, address: Self::Address, message: impl Into> + Debug); }
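
Note (illustration, not part of the patch): the core of the transport refactor above is removing the transport's associated `Message` type and instead parameterizing `Transport` by the replica upcalls, so the message type follows from the upcalls rather than being fixed per transport. Because the generic parameters are partly lost in this rendering of the patch, the sketch below is a minimal, self-contained stand-in for that design; none of the names (`Upcalls`, `Msg`, `Loopback`, `Kv`) are the crate's real items.

use std::fmt::Debug;

/// Stand-in for `IrReplicaUpcalls`: the upcalls decide what messages carry.
trait Upcalls: 'static {
    type Op: Debug + Clone + Send;
    type Reply: Debug + Clone + Send;
}

/// Stand-in for `IrMessage<U, T>`: derived from the upcalls, not the transport.
enum Msg<U: Upcalls> {
    Propose(U::Op),
    Reply(U::Reply),
}

/// Before the refactor (roughly): `trait Transport { type Message; ... }`.
/// After: the upcalls are a trait parameter, so one transport type can be
/// instantiated for any replica's message set.
trait Transport<U: Upcalls>: Clone + 'static {
    type Address: Copy + Eq + Debug;

    fn address(&self) -> Self::Address;

    /// Fire-and-forget send, mirroring the shape of `do_send` in the patch.
    fn do_send(&self, address: Self::Address, message: Msg<U>);
}

/// A toy loopback transport that only logs what it would send.
#[derive(Clone)]
struct Loopback {
    address: usize,
}

impl<U: Upcalls> Transport<U> for Loopback {
    type Address = usize;

    fn address(&self) -> Self::Address {
        self.address
    }

    fn do_send(&self, address: Self::Address, message: Msg<U>) {
        // `U::Op` and `U::Reply` are `Debug` via the `Upcalls` bounds.
        let kind = match &message {
            Msg::Propose(op) => format!("propose {op:?}"),
            Msg::Reply(r) => format!("reply {r:?}"),
        };
        println!("{} -> {}: {}", self.address, address, kind);
    }
}

/// Toy upcalls for a key-value replica.
struct Kv;
impl Upcalls for Kv {
    type Op = String;
    type Reply = Option<String>;
}

fn main() {
    let t = Loopback { address: 0 };
    // The message type is fixed by the upcalls parameter `Kv`, not by `Loopback`.
    <Loopback as Transport<Kv>>::do_send(&t, 1, Msg::Propose("put x=1".to_string()));
}

The payoff, visible in the kv.rs test changes above, is that callers no longer declare a per-instantiation `type Message = IrMessage<...>` alias: the channel transport is written directly against the replica's upcalls type and the message type follows from it.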