Skip to content

Commit

Permalink
Finish integrating maelstrom.
Browse files Browse the repository at this point in the history
  • Loading branch information
finnbear committed Jun 16, 2023
1 parent 857f2c6 commit d55914d
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 163 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ Cargo.lock
/target

flamegraph.svg
/store
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ test:

bench:
clear && cargo test throughput_3_ser --release -- --nocapture

maelstrom:
cargo build --release --features maelstrom --bin maelstrom
maelstrom test -w lin-kv --bin target/release/maelstrom --latency 10 --rate 100 --time-limit 15 --concurrency 10
407 changes: 257 additions & 150 deletions src/bin/maelstrom.rs

Large diffs are not rendered by default.

24 changes: 21 additions & 3 deletions src/ir/record.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use serde::{Deserialize, Serialize};

use super::OpId;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::{collections::HashMap, fmt::Debug};

#[derive(Copy, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -61,7 +60,7 @@ pub struct Entry<O, R> {
pub state: State,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub struct Record<O, R> {
pub entries: HashMap<OpId, Entry<O, R>>,
}
Expand All @@ -73,3 +72,22 @@ impl<O, R> Default for Record<O, R> {
}
}
}

impl<O: Serialize, R: Serialize> Serialize for Record<O, R> {
fn serialize<'a, S>(&self, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let container: Vec<_> = self.entries.iter().collect();
serde::Serialize::serialize(&container, ser)
}
}

impl<'de, O: Deserialize<'de>, R: Deserialize<'de>> Deserialize<'de> for Record<O, R> {
fn deserialize<D: Deserializer<'de>>(des: D) -> Result<Self, D::Error> {
let container: Vec<(OpId, Entry<O, R>)> = serde::Deserialize::deserialize(des)?;
Ok(Self {
entries: HashMap::from_iter(container.into_iter()),
})
}
}
16 changes: 11 additions & 5 deletions src/ir/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl<U: Upcalls, T: Transport<Message = Message<U::Op, U::Result>>> Replica<U, T
}
sync.view.number.0 += 1;

println!(
eprintln!(
"{my_index:?} timeout sending do view change {}",
sync.view.number.0
);
Expand Down Expand Up @@ -319,6 +319,12 @@ impl<U: Upcalls, T: Transport<Message = Message<U::Op, U::Result>>> Replica<U, T
);
}

eprintln!(
"index = {:?} , leader index = {:?}",
self.index,
sync.view.leader_index()
);

if self.index == sync.view.leader_index() && let Some(addendum) = msg.addendum.as_ref() {
let msg_view_number = msg.view_number;
match sync.outstanding_do_view_changes.entry(addendum.replica_index) {
Expand All @@ -341,7 +347,7 @@ impl<U: Upcalls, T: Transport<Message = Message<U::Op, U::Result>>> Replica<U, T
.filter(|other| other.view_number == do_view_change.view_number);

if matching.clone().count() >= threshold {
println!("DOING VIEW CHANGE");
eprintln!("DOING VIEW CHANGE");
{
let latest_normal_view = sync.latest_normal_view.max(
matching
Expand All @@ -363,7 +369,7 @@ impl<U: Upcalls, T: Transport<Message = Message<U::Op, U::Result>>> Replica<U, T
if sync.latest_normal_view == latest_normal_view {
latest_records.push(sync.record.clone());
}
println!("have {} latest", latest_records.len());
eprintln!("have {} latest", latest_records.len());

#[allow(non_snake_case)]
let mut R = Record::default();
Expand Down Expand Up @@ -492,7 +498,7 @@ impl<U: Upcalls, T: Transport<Message = Message<U::Op, U::Result>>> Replica<U, T
if view_number > sync.view.number
|| (view_number == sync.view.number || !sync.status.is_normal())
{
println!("{:?} starting view {view_number:?}", self.index);
eprintln!("{:?} starting view {view_number:?}", self.index);
sync.upcalls.sync(&sync.record, &new_record);
sync.record = new_record;
sync.status = Status::Normal;
Expand All @@ -502,7 +508,7 @@ impl<U: Upcalls, T: Transport<Message = Message<U::Op, U::Result>>> Replica<U, T
}
}
_ => {
println!("unexpected message");
eprintln!("unexpected message");
}
}
None
Expand Down
2 changes: 1 addition & 1 deletion src/ir/tests/lock_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ async fn lock_server(num_replicas: usize) {
for i in 0..5 {
ChannelTransport::<Message>::sleep(Duration::from_secs(8)).await;

println!("@@@@@ INVOKE {replicas:?}");
eprintln!("@@@@@ INVOKE {replicas:?}");
if clients[1]
.invoke_consensus(Op::Lock(clients[1].id()), &decide_lock)
.await
Expand Down
2 changes: 1 addition & 1 deletion src/tapir/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl<
inner.end(timestamp, ok).await;

if ok && remaining_tries != 3 {
println!("Retry actually worked!");
eprintln!("Retry actually worked!");
}

return Some(timestamp).filter(|_| ok);
Expand Down
2 changes: 1 addition & 1 deletion src/tapir/tests/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async fn increment_sequential(num_replicas: usize) {
Transport::sleep(Duration::from_millis(1000)).await;
}

println!("committed = {committed}");
eprintln!("committed = {committed}");
assert!(committed > 0);
}

Expand Down
11 changes: 10 additions & 1 deletion src/transport/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ impl<M: Message> Transport for Channel<M> {
self.address
}

fn time(&self) -> u64 {
use rand::Rng;
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64
+ rand::thread_rng().gen_range(0..10 * 1000 * 1000)
}

fn sleep(duration: Duration) -> Self::Sleep {
tokio::time::sleep(duration / 10)
}
Expand Down Expand Up @@ -147,7 +156,7 @@ impl<M: Message> Transport for Channel<M> {
}
}
} else {
println!("unknown address {address:?}");
eprintln!("unknown address {address:?}");
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub trait Transport: Clone + Send + Sync + 'static {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64
+ rand::thread_rng().gen_range(0..10 * 1000 * 1000)
}

/// Sleep for duration.
Expand Down

0 comments on commit d55914d

Please sign in to comment.