From d55914d11eb46032fd462e7bf75b79c03adfae78 Mon Sep 17 00:00:00 2001 From: Finn Bear Date: Fri, 16 Jun 2023 16:31:47 -0700 Subject: [PATCH] Finish integrating maelstrom. --- .gitignore | 1 + Makefile | 4 + src/bin/maelstrom.rs | 407 +++++++++++++++++++++++------------- src/ir/record.rs | 24 ++- src/ir/replica.rs | 16 +- src/ir/tests/lock_server.rs | 2 +- src/tapir/client.rs | 2 +- src/tapir/tests/kv.rs | 2 +- src/transport/channel.rs | 11 +- src/transport/mod.rs | 1 - 10 files changed, 307 insertions(+), 163 deletions(-) diff --git a/.gitignore b/.gitignore index 740add9..4cfdb29 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ Cargo.lock /target flamegraph.svg +/store diff --git a/Makefile b/Makefile index 04f7865..17c4afd 100644 --- a/Makefile +++ b/Makefile @@ -3,3 +3,7 @@ test: bench: clear && cargo test throughput_3_ser --release -- --nocapture + +maelstrom: + cargo build --release --features maelstrom --bin maelstrom + maelstrom test -w lin-kv --bin target/release/maelstrom --latency 10 --rate 100 --time-limit 15 --concurrency 10 diff --git a/src/bin/maelstrom.rs b/src/bin/maelstrom.rs index 2935e93..ca3ef8f 100644 --- a/src/bin/maelstrom.rs +++ b/src/bin/maelstrom.rs @@ -36,6 +36,7 @@ struct KvNode { inner: Option<(Maelstrom, KvNodeInner)>, } +#[derive(Clone)] enum KvNodeInner { Replica(Arc, Maelstrom>>), App(Arc>), @@ -56,7 +57,8 @@ struct Maelstrom { #[derive(Clone, Serialize, Deserialize, Debug)] struct Wrapper { message: Message, - reply: Option, + is_reply_to: Option, + do_reply_to: Option, } struct Inner { @@ -75,9 +77,9 @@ enum IdEnum { impl Display for IdEnum { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Replica(n) => write!(f, "n{n}"), - Self::App(n) => write!(f, "n{}", n + 3), - Self::Client(n) => write!(f, "c{n}"), + Self::Replica(n) => write!(f, "n{}", n + 1), + Self::App(n) => write!(f, "n{}", n + 3 + 1), + Self::Client(n) => write!(f, "c{}", n + 1), } } } @@ -86,21 +88,20 @@ impl FromStr for IdEnum { type Err = (); fn from_str(s: &str) -> Result { Ok(if let Some(n) = s.strip_prefix("n") { - let n = usize::from_str(n).map_err(|_| ())?; + let n = usize::from_str(n).map_err(|_| ())? - 1; if n < 3 { Self::Replica(n) } else { - Self::Client(n - 3) + Self::App(n - 3) } } else { let n = s.strip_prefix("c").ok_or(())?; - let n = usize::from_str(n).map_err(|_| ())?; + let n = usize::from_str(n).map_err(|_| ())? - 1; Self::Client(n) }) } } -// Msg impl Transport for Maelstrom { type Address = IdEnum; type Message = Message; @@ -133,16 +134,22 @@ impl Transport for Maelstrom { self.inner.requests.lock().unwrap().insert(reply, sender); let message = Wrapper { message: message.into(), - reply: Some(reply), + is_reply_to: None, + do_reply_to: Some(reply), }; + eprintln!("{id} sending {message:?} to {address}"); let inner = Arc::clone(&self.inner); async move { loop { - let _ = inner.net.txq.try_send(Msg { - src: id.to_string(), - dest: address.to_string(), - body: Body::Application(message.clone()), - }); + let _ = inner + .net + .txq + .send(Msg { + src: id.to_string(), + dest: address.to_string(), + body: Body::Application(message.clone()), + }) + .await; let sleep = Self::sleep(Duration::from_secs(1)); @@ -161,13 +168,20 @@ impl Transport for Maelstrom { fn do_send(&self, address: Self::Address, message: impl Into + std::fmt::Debug) { let message = Wrapper { message: message.into(), - reply: None, + do_reply_to: None, + is_reply_to: None, }; - let _ = self.inner.net.txq.try_send(Msg { - src: self.id.to_string(), - dest: address.to_string(), - body: Body::Application(message), - }); + //eprintln!("{} do-sending {message:?} to {address}", self.id); + let _ = self + .inner + .net + .txq + .send_blocking(Msg { + src: self.id.to_string(), + dest: address.to_string(), + body: Body::Application(message), + }) + .unwrap(); } } @@ -204,7 +218,7 @@ impl Process for KvNode { IdEnum::App(_) => { KvNodeInner::App(Arc::new(TapirClient::new(membership, transport))) } - _ => panic!(), + id => panic!("{id}"), }, )); } @@ -212,42 +226,127 @@ impl Process for KvNode { async fn run(&self) -> Status { let (transport, inner) = self.inner.as_ref().unwrap(); loop { + eprintln!("RECEIVING"); match transport.inner.net.rxq.recv().await { Ok(Msg { src, body, .. }) => { - match body { - Body::Application(app) => { - if let Some(reply) = app.reply { - let mut requests = transport.inner.requests.lock().unwrap(); - if let Some(sender) = requests.remove(&reply) { - let _ = sender.send(app.message); - } - } else { - if let KvNodeInner::Replica(replica) = &inner { - replica.receive(src.parse::().unwrap(), app.message); + eprintln!("received {body:?} from {src}"); + let transport = transport.clone(); + let inner = inner.clone(); + tokio::spawn(async move { + match body { + Body::Application(app) => { + if let Some(reply) = app.is_reply_to { + let mut requests = transport.inner.requests.lock().unwrap(); + if let Some(sender) = requests.remove(&reply) { + eprintln!("is reply"); + let _ = sender.send(app.message); + } else { + eprintln!("duplicate reply"); + } } else { - unreachable!(); - }; + if let KvNodeInner::Replica(replica) = &inner { + if let Some(response) = replica + .receive(src.parse::().unwrap(), app.message) + { + let response = Msg { + src: transport.id.to_string(), + dest: src.clone(), + body: Body::Application(Wrapper { + message: response, + do_reply_to: None, + is_reply_to: app.do_reply_to, + }), + }; + eprintln!("sending response {response:?}"); + let _ = transport + .inner + .net + .txq + .send(response) + .await + .unwrap(); + } else { + eprintln!("NO RESPONSE"); + } + } else { + unreachable!(); + }; + } } - } - Body::Workload(work) => { - if let KvNodeInner::App(app) = &inner { - let txn = app.begin(); - match work { - LinKv::Cas { - msg_id, - key, - from, - to, - } => { - let key = serde_json::to_string(&key).unwrap(); - let old = txn - .get(key.clone()) - .await - .map(|s| serde_json::from_str(&s).unwrap()); - if old == Some(from) { - let to = serde_json::to_string(&to).unwrap(); - txn.put(key, Some(to)); + Body::Workload(work) => { + if let KvNodeInner::App(app) = &inner { + let txn = app.begin(); + match work { + LinKv::Cas { + msg_id, + key, + from, + to, + } => { + let key = serde_json::to_string(&key).unwrap(); + let old = txn + .get(key.clone()) + .await + .map(|s| serde_json::from_str(&s).unwrap()); + let swap = old == Some(from); + if swap { + let to = serde_json::to_string(&to).unwrap(); + txn.put(key, Some(to)); + } + if txn.commit().await.is_some() { + if old.is_none() { + let _ = transport + .inner + .net + .txq + .send(Msg { + src: transport.id.to_string(), + dest: src, + body: Body::Error(Error { + in_reply_to: msg_id, + text: String::from( + "cas key not found", + ), + code: 20, + }), + }) + .await; + } else if swap { + let _ = transport + .inner + .net + .txq + .send(Msg { + src: transport.id.to_string(), + dest: src, + body: Body::Workload(LinKv::CasOk { + in_reply_to: msg_id, + msg_id: Some( + transport.next_msg_id(), + ), + }), + }) + .await; + } else { + let _ = transport + .inner + .net + .txq + .send(Msg { + src: transport.id.to_string(), + dest: src, + body: Body::Error(Error { + in_reply_to: msg_id, + text: String::from( + "cas precondition failed", + ), + code: 22, + }), + }) + .await; + } + } else { let _ = transport .inner .net @@ -255,12 +354,55 @@ impl Process for KvNode { .send(Msg { src: transport.id.to_string(), dest: src, - body: Body::Workload(LinKv::CasOk { + body: Body::Error(Error { in_reply_to: msg_id, - msg_id: Some(transport.next_msg_id()), + text: String::from("cas txn conflict"), + code: 30, }), }) .await; + } + } + LinKv::Read { msg_id, key } => { + let key = serde_json::to_string(&key).unwrap(); + let old = txn.get(key.clone()).await.map(|s| { + serde_json::from_str::(&s) + .unwrap() + }); + if txn.commit().await.is_some() { + if let Some(old) = old { + let _ = transport + .inner + .net + .txq + .send(Msg { + src: transport.id.to_string(), + dest: src, + body: Body::Workload(LinKv::ReadOk { + in_reply_to: msg_id, + msg_id: Some( + transport.next_msg_id(), + ), + value: old, + }), + }) + .await; + } else { + let _ = transport + .inner + .net + .txq + .send(Msg { + src: transport.id.to_string(), + dest: src, + body: Body::Error(Error { + in_reply_to: msg_id, + text: String::from("not found"), + code: 20, + }), + }) + .await; + } } else { let _ = transport .inner @@ -271,111 +413,76 @@ impl Process for KvNode { dest: src, body: Body::Error(Error { in_reply_to: msg_id, - text: String::from("CaS fail"), - code: 13, + text: String::from("read txn conflict"), + code: 30, }), }) .await; } - } else { - let _ = transport - .inner - .net - .txq - .send(Msg { - src: transport.id.to_string(), - dest: src, - body: Body::Error(Error { - in_reply_to: msg_id, - text: String::from("CaS fail"), - code: 13, - }), - }) - .await; - } - } - LinKv::Read { msg_id, key } => { - let key = serde_json::to_string(&key).unwrap(); - let old = txn.get(key.clone()).await.map(|s| { - serde_json::from_str::(&s).unwrap() - }); - if let Some(old) = old && txn.commit().await.is_some() { - let _ = transport - .inner - .net - .txq - .send(Msg { - src: transport.id.to_string(), - dest: src, - body: Body::Workload(LinKv::ReadOk { - in_reply_to: msg_id, - msg_id: Some(transport.next_msg_id()), - value: old - }), - }) - .await; - } else { - let _ = transport - .inner - .net - .txq - .send(Msg { - src: transport.id.to_string(), - dest: src, - body: Body::Error(Error { - in_reply_to: msg_id, - text: String::from("read fail"), - code: 13, - }), - }) - .await; } - } - LinKv::Write { msg_id, key, value } => { - let key = serde_json::to_string(&key).unwrap(); - let value = serde_json::to_string(&value).unwrap(); - txn.put(key, Some(value)); - if txn.commit().await.is_some() { - let _ = transport - .inner - .net - .txq - .send(Msg { - src: transport.id.to_string(), - dest: src, - body: Body::Workload(LinKv::CasOk { - in_reply_to: msg_id, - msg_id: Some(transport.next_msg_id()), - }), - }) - .await; - } else { - let _ = transport - .inner - .net - .txq - .send(Msg { - src: transport.id.to_string(), - dest: src, - body: Body::Error(Error { - in_reply_to: msg_id, - text: String::from("write fail"), - code: 13, - }), - }) - .await; + LinKv::Write { msg_id, key, value } => { + let key = serde_json::to_string(&key).unwrap(); + let value = serde_json::to_string(&value).unwrap(); + txn.put(key, Some(value)); + if txn.commit().await.is_some() { + let _ = transport + .inner + .net + .txq + .send(Msg { + src: transport.id.to_string(), + dest: src, + body: Body::Workload(LinKv::WriteOk { + in_reply_to: msg_id, + }), + }) + .await; + } else { + let _ = transport + .inner + .net + .txq + .send(Msg { + src: transport.id.to_string(), + dest: src, + body: Body::Error(Error { + in_reply_to: msg_id, + text: String::from("read txn conflict"), + code: 30, + }), + }) + .await; + } } + _ => unreachable!(), } - _ => unreachable!(), - } - } else { - // Ignore. - }; + } else { + // Proxy... + eprintln!("Proxying..."); + let _ = transport + .inner + .net + .txq + .send(Msg { + src: src.clone(), + dest: IdEnum::App({ + let n = thread_rng().gen::(); + n % 2 + }) + .to_string(), + body: Body::Workload(work), + }) + .await; + }; + } + body => unreachable!("{body:?}"), } - body => unreachable!("{body:?}"), - } + }); } - Err(_) => return Ok(()), // Runtime is shutting down. + Err(_) => { + eprintln!("shutting down recv"); + return Ok(()); + } // Runtime is shutting down. }; } } diff --git a/src/ir/record.rs b/src/ir/record.rs index 63682fc..24e02b2 100644 --- a/src/ir/record.rs +++ b/src/ir/record.rs @@ -1,6 +1,5 @@ -use serde::{Deserialize, Serialize}; - use super::OpId; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::{collections::HashMap, fmt::Debug}; #[derive(Copy, Clone, Serialize, Deserialize)] @@ -61,7 +60,7 @@ pub struct Entry { pub state: State, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] pub struct Record { pub entries: HashMap>, } @@ -73,3 +72,22 @@ impl Default for Record { } } } + +impl Serialize for Record { + fn serialize<'a, S>(&self, ser: S) -> Result + where + S: Serializer, + { + let container: Vec<_> = self.entries.iter().collect(); + serde::Serialize::serialize(&container, ser) + } +} + +impl<'de, O: Deserialize<'de>, R: Deserialize<'de>> Deserialize<'de> for Record { + fn deserialize>(des: D) -> Result { + let container: Vec<(OpId, Entry)> = serde::Deserialize::deserialize(des)?; + Ok(Self { + entries: HashMap::from_iter(container.into_iter()), + }) + } +} diff --git a/src/ir/replica.rs b/src/ir/replica.rs index 32cf7bb..e80a4e6 100644 --- a/src/ir/replica.rs +++ b/src/ir/replica.rs @@ -182,7 +182,7 @@ impl>> Replica>> Replica>> Replica= threshold { - println!("DOING VIEW CHANGE"); + eprintln!("DOING VIEW CHANGE"); { let latest_normal_view = sync.latest_normal_view.max( matching @@ -363,7 +369,7 @@ impl>> Replica>> Replica sync.view.number || (view_number == sync.view.number || !sync.status.is_normal()) { - println!("{:?} starting view {view_number:?}", self.index); + eprintln!("{:?} starting view {view_number:?}", self.index); sync.upcalls.sync(&sync.record, &new_record); sync.record = new_record; sync.status = Status::Normal; @@ -502,7 +508,7 @@ impl>> Replica { - println!("unexpected message"); + eprintln!("unexpected message"); } } None diff --git a/src/ir/tests/lock_server.rs b/src/ir/tests/lock_server.rs index e3ea3bc..046782c 100644 --- a/src/ir/tests/lock_server.rs +++ b/src/ir/tests/lock_server.rs @@ -205,7 +205,7 @@ async fn lock_server(num_replicas: usize) { for i in 0..5 { ChannelTransport::::sleep(Duration::from_secs(8)).await; - println!("@@@@@ INVOKE {replicas:?}"); + eprintln!("@@@@@ INVOKE {replicas:?}"); if clients[1] .invoke_consensus(Op::Lock(clients[1].id()), &decide_lock) .await diff --git a/src/tapir/client.rs b/src/tapir/client.rs index 9048ae5..093bfaf 100644 --- a/src/tapir/client.rs +++ b/src/tapir/client.rs @@ -89,7 +89,7 @@ impl< inner.end(timestamp, ok).await; if ok && remaining_tries != 3 { - println!("Retry actually worked!"); + eprintln!("Retry actually worked!"); } return Some(timestamp).filter(|_| ok); diff --git a/src/tapir/tests/kv.rs b/src/tapir/tests/kv.rs index b72a4b8..7146523 100644 --- a/src/tapir/tests/kv.rs +++ b/src/tapir/tests/kv.rs @@ -156,7 +156,7 @@ async fn increment_sequential(num_replicas: usize) { Transport::sleep(Duration::from_millis(1000)).await; } - println!("committed = {committed}"); + eprintln!("committed = {committed}"); assert!(committed > 0); } diff --git a/src/transport/channel.rs b/src/transport/channel.rs index 3102300..196dad8 100644 --- a/src/transport/channel.rs +++ b/src/transport/channel.rs @@ -92,6 +92,15 @@ impl Transport for Channel { self.address } + fn time(&self) -> u64 { + use rand::Rng; + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as u64 + + rand::thread_rng().gen_range(0..10 * 1000 * 1000) + } + fn sleep(duration: Duration) -> Self::Sleep { tokio::time::sleep(duration / 10) } @@ -147,7 +156,7 @@ impl Transport for Channel { } } } else { - println!("unknown address {address:?}"); + eprintln!("unknown address {address:?}"); } } } diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 8f3a501..4d6fc74 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -25,7 +25,6 @@ pub trait Transport: Clone + Send + Sync + 'static { .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_nanos() as u64 - + rand::thread_rng().gen_range(0..10 * 1000 * 1000) } /// Sleep for duration.