Skip to content

Commit

Permalink
Add reconfiguration (#3)
Browse files Browse the repository at this point in the history
* Refactor transport.

* Reconfiguration.

* Add timeouts to tests.
  • Loading branch information
finnbear authored Jul 10, 2023
1 parent 7dcd8c1 commit 5ec11c8
Show file tree
Hide file tree
Showing 19 changed files with 657 additions and 467 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
test:
clear && cargo test --release -- # --nocapture

lock_server_stress_test:
clear && cargo test --release -- lock_server_loop --nocapture --include-ignored

coordinator_failure_stress_test_3:
clear && cargo test --release -- coordinator_recovery_3_loop --nocapture --include-ignored

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Rust implementation of [TAPIR](https://syslab.cs.washington.edu/papers/tapir-tr-
- [x] Consensus
- [x] View changes
- [x] Recovery
- [ ] ~~Epoch changes~~
- [x] Membership change
- [ ] Real network transport
- [ ] TAPIR
- [x] Get
Expand Down
24 changes: 12 additions & 12 deletions src/bin/maelstrom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tapirs::{
IrMembership, IrMessage, IrReplica, IrReplicaIndex, TapirClient, TapirReplica, Transport,
};
use tapirs::{IrMembership, IrMessage, IrReplica, TapirClient, TapirReplica, Transport};
use tokio::spawn;

type K = String;
type V = String;
type Message = IrMessage<TapirReplica<K, V>>;
type Message = IrMessage<TapirReplica<K, V>, Maelstrom>;

#[derive(Default)]
struct KvNode {
Expand Down Expand Up @@ -64,7 +62,7 @@ struct Inner {
net: ProcNet<LinKv, Wrapper>,
}

#[derive(Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq)]
#[derive(Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
enum IdEnum {
Replica(usize),
App(usize),
Expand Down Expand Up @@ -99,9 +97,8 @@ impl FromStr for IdEnum {
}
}

impl Transport for Maelstrom {
impl Transport<TapirReplica<K, V>> for Maelstrom {
type Address = IdEnum;
type Message = Message;
type Sleep = tokio::time::Sleep;

fn address(&self) -> Self::Address {
Expand Down Expand Up @@ -129,10 +126,10 @@ impl Transport for Maelstrom {
tokio::time::sleep(duration)
}

fn send<R: TryFrom<Self::Message> + Send + std::fmt::Debug>(
fn send<R: TryFrom<IrMessage<TapirReplica<K, V>, Self>> + Send + std::fmt::Debug>(
&self,
address: Self::Address,
message: impl Into<Self::Message> + std::fmt::Debug,
message: impl Into<IrMessage<TapirReplica<K, V>, Self>> + std::fmt::Debug,
) -> impl futures::Future<Output = R> + Send + 'static {
let id = self.id;
let (sender, mut receiver) = tokio::sync::oneshot::channel();
Expand Down Expand Up @@ -171,7 +168,11 @@ impl Transport for Maelstrom {
}
}

fn do_send(&self, address: Self::Address, message: impl Into<Self::Message> + std::fmt::Debug) {
fn do_send(
&self,
address: Self::Address,
message: impl Into<IrMessage<TapirReplica<K, V>, Self>> + std::fmt::Debug,
) {
let message = Wrapper {
message: message.into(),
do_reply_to: None,
Expand Down Expand Up @@ -216,8 +217,7 @@ impl Process<LinKv, Wrapper> for KvNode {
self.inner = Some((
transport.clone(),
match id {
IdEnum::Replica(index) => KvNodeInner::Replica(Arc::new(IrReplica::new(
IrReplicaIndex(index),
IdEnum::Replica(_) => KvNodeInner::Replica(Arc::new(IrReplica::new(
membership,
TapirReplica::new(true),
transport,
Expand Down
Loading

0 comments on commit 5ec11c8

Please sign in to comment.