Add sharding (#4)

* Very broken sharding (WIP).

* Less broken sharding (WIP).

* Client can make sense of multi-shard prepare results.

* Revise README.

* Sharded coordinator recovery.

* Simple sharding test.
finnbear authored Jul 13, 2023
1 parent 5ec11c8 commit c6c3865
Showing 16 changed files with 661 additions and 308 deletions.
17 changes: 12 additions & 5 deletions README.md
@@ -21,14 +21,15 @@ Rust implementation of [TAPIR](https://syslab.cs.washington.edu/papers/tapir-tr-
 - [x] IR sync & merge
 - [x] Prepare retries
 - [x] Coordinator recovery
-- [ ] Sharding
+- [x] Sharding
 - [ ] Persistent storage (e.g. `sled`)
 - [ ] Pessimistic read only transactions
 - [ ] Snapshot read
 - [ ] Planned extensions
 - [x] Delete key operation
 - [ ] Garbage collection
-- [ ] Quorum range scan
+- [ ] Range scan
+- [ ] Automatic shard balancing
 - [ ] Disaster recovery
 - [ ] Testing
 - [x] IR lock server (very simple)
 - [x] TAPIR-KV (simple)
@@ -39,6 +40,12 @@ Rust implementation of [TAPIR](https://syslab.cs.washington.edu/papers/tapir-tr-
 - [ ] Reduce allocations
 - [ ] Reduce temporary unavailability
 
-## Acknowledgement
+## Acknowledgements
 
-Thank you to the TAPIR authors for answering my questions about the paper!
+Thanks to [James Wilcox](https://jamesrwilcox.com) for assigning TAPIR as a reading.
+
+Thanks to [the TAPIR authors](https://github.com/UWSysLab/tapir#contact-and-questions) for answering questions about
+the paper!
+
+Thanks to [Kyle](https://aphyr.com) at [Jepsen](https://jepsen.io) for clarifying the relative
+strength of isolation levels.

25 changes: 20 additions & 5 deletions src/bin/maelstrom.rs
@@ -19,7 +19,9 @@ use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering::SeqCst;
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
-use tapirs::{IrMembership, IrMessage, IrReplica, TapirClient, TapirReplica, Transport};
+use tapirs::{
+    IrMembership, IrMessage, IrReplica, TapirClient, TapirReplica, TapirTransport, Transport,
+};
 use tokio::spawn;
 
 type K = String;
@@ -193,6 +195,20 @@ impl Transport<TapirReplica<K, V>> for Maelstrom {
     }
 }
 
+impl TapirTransport<K, V> for Maelstrom {
+    fn shard_addresses(
+        &self,
+        shard: tapirs::ShardNumber,
+    ) -> impl futures::Future<Output = IrMembership<Self::Address>> + Send + 'static {
+        assert_eq!(shard.0, 0);
+        std::future::ready(IrMembership::new(vec![
+            IdEnum::Replica(0),
+            IdEnum::Replica(1),
+            IdEnum::Replica(2),
+        ]))
+    }
+}
+
 #[async_trait]
 impl Process<LinKv, Wrapper> for KvNode {
     fn init(
@@ -219,12 +235,11 @@ impl Process<LinKv, Wrapper> for KvNode {
         match id {
             IdEnum::Replica(_) => KvNodeInner::Replica(Arc::new(IrReplica::new(
                 membership,
-                TapirReplica::new(true),
+                TapirReplica::new(tapirs::ShardNumber(0), true),
                 transport,
+                Some(TapirReplica::tick),
             ))),
-            IdEnum::App(_) => {
-                KvNodeInner::App(Arc::new(TapirClient::new(membership, transport)))
-            }
+            IdEnum::App(_) => KvNodeInner::App(Arc::new(TapirClient::new(transport))),
             id => panic!("{id}"),
         },
     ));

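The Maelstrom harness stays effectively single-shard: `shard_addresses` asserts `shard.0 == 0` and resolves every lookup to the same three replicas. For contrast, a sketch of what a genuinely sharded transport could look like. `ShardedMaelstrom` and its `shards` table are hypothetical (nothing like them exists in this commit), and a matching `Transport` impl is assumed so that `Self::Address` resolves:

```rust
// Hypothetical sketch, not part of this commit.
struct ShardedMaelstrom {
    // One replica group per shard, indexed by shard number (assumed field).
    shards: Vec<Vec<IdEnum>>,
    // ...plus whatever `Maelstrom` already carries.
}

impl TapirTransport<K, V> for ShardedMaelstrom {
    fn shard_addresses(
        &self,
        shard: tapirs::ShardNumber,
    ) -> impl futures::Future<Output = IrMembership<Self::Address>> + Send + 'static {
        // Resolve this shard's own replica group instead of asserting
        // that only shard 0 exists.
        let group = self.shards[shard.0 as usize].clone();
        std::future::ready(IrMembership::new(group))
    }
}
```
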
6 changes: 5 additions & 1 deletion src/ir/client.rs
@@ -27,7 +27,7 @@ use tokio::select;
 pub struct Id(pub u64);
 
 impl Id {
-    fn new() -> Self {
+    pub fn new() -> Self {
         Self(thread_rng().gen())
     }
 }
@@ -95,6 +95,10 @@ impl<U: ReplicaUpcalls, T: Transport<U>> Client<U, T> {
         self.id
     }
 
+    pub fn set_id(&mut self, id: Id) {
+        self.id = id;
+    }
+
     pub fn transport(&self) -> &T {
         &self.inner.transport
     }

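Both changes widen the `Id` API for use outside `ir::client`: `Id::new` becomes public, and `set_id` lets an existing client adopt an identity rather than keep the random one it was created with. Per the commit message this feeds sharded coordinator recovery; a sketch of the inferred usage (the takeover scenario is illustrative, not code from this repo):

```rust
// Illustrative only: a backup coordinator resuming under a failed
// coordinator's client id, so replicas attribute its retried operations
// to the same client. Types and bounds match the impl above.
fn resume_as<U: ReplicaUpcalls, T: Transport<U>>(client: &mut Client<U, T>, failed: Id) {
    client.set_id(failed);
}
```
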
33 changes: 20 additions & 13 deletions src/ir/replica.rs
@@ -63,10 +63,6 @@ pub trait Upcalls: Sized + Send + Serialize + DeserializeOwned + 'static {
         d: HashMap<OpId, (Self::CO, Self::CR)>,
         u: Vec<(OpId, Self::CO, Self::CR)>,
     ) -> HashMap<OpId, Self::CR>;
-    fn tick<T: Transport<Self>>(&mut self, membership: &Membership<T::Address>, transport: &T) {
-        let _ = (membership, transport);
-        // No-op.
-    }
 }
 
 pub struct Replica<U: Upcalls, T: Transport<U>> {
@@ -89,10 +85,11 @@ impl<U: Upcalls, T: Transport<U>> Debug for Replica<U, T> {
 
 struct Inner<U: Upcalls, T: Transport<U>> {
     transport: T,
-    sync: Mutex<Sync<U, T>>,
+    app_tick: Option<fn(&U, &T, &Membership<T::Address>)>,
+    sync: Mutex<SyncInner<U, T>>,
 }
 
-struct Sync<U: Upcalls, T: Transport<U>> {
+struct SyncInner<U: Upcalls, T: Transport<U>> {
     status: Status,
     view: View<T::Address>,
     latest_normal_view: View<T::Address>,
@@ -113,15 +110,21 @@ struct PersistentViewInfo<A> {
 impl<U: Upcalls, T: Transport<U>> Replica<U, T> {
     const VIEW_CHANGE_INTERVAL: Duration = Duration::from_secs(4);
 
-    pub fn new(membership: Membership<T::Address>, upcalls: U, transport: T) -> Self {
+    pub fn new(
+        membership: Membership<T::Address>,
+        upcalls: U,
+        transport: T,
+        app_tick: Option<fn(&U, &T, &Membership<T::Address>)>,
+    ) -> Self {
         let view = View {
             membership,
             number: ViewNumber(0),
         };
         let ret = Self {
             inner: Arc::new(Inner {
                 transport,
-                sync: Mutex::new(Sync {
+                app_tick,
+                sync: Mutex::new(SyncInner {
                     status: Status::Normal,
                     latest_normal_view: view.clone(),
                     view,
@@ -173,7 +176,7 @@ impl<U: Upcalls, T: Transport<U>> Replica<U, T> {
         format!("ir_replica_{}", self.inner.transport.address())
     }
 
-    fn persist_view_info(&self, sync: &Sync<U, T>) {
+    fn persist_view_info(&self, sync: &SyncInner<U, T>) {
         if sync.view.membership.len() == 1 {
             return;
         }
@@ -244,12 +247,16 @@ impl<U: Upcalls, T: Transport<U>> Replica<U, T> {
                 };
                 let mut sync = inner.sync.lock().unwrap();
                 let sync = &mut *sync;
-                sync.upcalls.tick(&sync.view.membership, &transport);
+                if let Some(tick) = inner.app_tick.as_ref() {
+                    tick(&sync.upcalls, &transport, &sync.view.membership);
+                } else {
+                    break;
+                }
             }
         });
     }
 
     fn broadcast_do_view_change(transport: &T, sync: &mut SyncInner<U, T>) {
         sync.changed_view_recently = true;
         let destinations = sync
             .view
@@ -461,7 +468,7 @@
             })
             .map(|(_, r)| r.addendum.as_ref().unwrap().record.clone())
             .collect::<Vec<_>>();
 
         eprintln!(
             "have {} latest ({:?})",
             latest_records.len(),
@@ -608,7 +615,7 @@
         sync.latest_normal_view.number = msg_view_number;
         sync.latest_normal_view.membership = sync.view.membership.clone();
         self.persist_view_info(&*sync);
 
         for address in destinations {
             if address == self.inner.transport.address() {
                 continue;

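Taken together, these hunks move the application tick from a defaulted `Upcalls` trait method, which every implementation silently inherited as a no-op, to an `Option` passed at construction; a replica built with `None` now breaks out of its tick loop instead of waking up to do nothing. Both call shapes appear elsewhere in this commit (the bindings below stand in for the values in scope there):

```rust
// From maelstrom.rs: a TAPIR replica keeps its periodic work by passing
// the tick function explicitly.
let kv_replica = IrReplica::new(
    membership,
    TapirReplica::new(tapirs::ShardNumber(0), true),
    transport,
    Some(TapirReplica::tick),
);
// From the lock-server test: no app-level tick, so the tick loop exits.
let lock_replica = IrReplica::new(membership.clone(), upcalls, channel, None);
```
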
2 changes: 1 addition & 1 deletion src/ir/tests/lock_server.rs
@@ -175,7 +175,7 @@ async fn lock_server(num_replicas: usize) {
             let channel =
                 registry.channel(move |from, message| weak.upgrade()?.receive(from, message));
             let upcalls = Upcalls { locked: None };
-            IrReplica::new(membership.clone(), upcalls, channel)
+            IrReplica::new(membership.clone(), upcalls, channel, None)
         },
     )
 }

10 changes: 7 additions & 3 deletions src/lib.rs
@@ -23,6 +23,10 @@ pub use occ::{
     PrepareResult as OccPrepareResult, Store as OccStore, Timestamp as OccTimestamp,
     Transaction as OccTransaction, TransactionId as OccTransactionId,
 };
-pub use tapir::{Client as TapirClient, Replica as TapirReplica, Timestamp as TapirTimestamp};
-pub use transport::{Channel as ChannelTransport, ChannelRegistry};
-pub use transport::{Message as TransportMessage, Transport};
+pub use tapir::{
+    Client as TapirClient, Replica as TapirReplica, ShardNumber, Timestamp as TapirTimestamp,
+};
+pub use transport::{
+    Channel as ChannelTransport, ChannelRegistry, Message as TransportMessage, TapirTransport,
+    Transport,
+};

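`ShardNumber` itself is defined outside the hunks shown on this page, but its uses pin it down fairly well: `tapirs::ShardNumber(0)` constructs it with a public tuple field, `shard.0` reads that field, `Store` serializes it via serde, and it is copied freely. A plausible shape, with the integer width a guess:

```rust
use serde::{Deserialize, Serialize};

// Presumed definition (sketch); the real field width may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ShardNumber(pub u64);
```
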
54 changes: 36 additions & 18 deletions src/occ/store.rs
@@ -1,6 +1,6 @@
 use super::{Timestamp, Transaction, TransactionId};
 use crate::{
-    tapir::{Key, Value},
+    tapir::{Key, ShardNumber, Value},
     util::{vectorize, vectorize_btree},
     MvccStore,
 };
@@ -15,6 +15,7 @@ use std::{
 
 #[derive(Serialize, Deserialize)]
 pub struct Store<K, V, TS> {
+    shard: ShardNumber,
     linearizable: bool,
     #[serde(bound(
         serialize = "K: Serialize, V: Serialize, TS: Serialize",
@@ -99,11 +100,28 @@ impl<TS: Timestamp> PrepareResult<TS> {
     pub fn is_fail(&self) -> bool {
         matches!(self, Self::Fail)
     }
+
+    pub fn is_abstain(&self) -> bool {
+        matches!(self, Self::Abstain)
+    }
+
+    pub fn is_retry(&self) -> bool {
+        matches!(self, Self::Retry { .. })
+    }
+
+    pub fn is_too_late(&self) -> bool {
+        matches!(self, Self::TooLate)
+    }
+
+    pub fn is_too_old(&self) -> bool {
+        matches!(self, Self::TooOld)
+    }
 }
 
 impl<K: Key, V: Value, TS> Store<K, V, TS> {
-    pub fn new(linearizable: bool) -> Self {
+    pub fn new(shard: ShardNumber, linearizable: bool) -> Self {
         Self {
+            shard,
             linearizable,
             inner: Default::default(),
             prepared: Default::default(),
@@ -174,18 +192,18 @@ impl<K: Key, V: Value, TS: Timestamp> Store<K, V, TS> {
 
     fn occ_check(&self, transaction: &Transaction<K, V, TS>, commit: TS) -> PrepareResult<TS> {
         // Check for conflicts with the read set.
-        for (key, read) in &transaction.read_set {
-            if *read > commit {
+        for (key, read) in transaction.shard_read_set(self.shard) {
+            if read > commit {
                 debug_assert!(false, "client picked too low commit timestamp for read");
                 return PrepareResult::Retry {
                     proposed: read.time(),
                 };
             }
 
             // If we don't have this key then no conflicts for read.
-            let (beginning, end) = self.inner.get_range(key, *read);
+            let (beginning, end) = self.inner.get_range(key, read);
 
-            if beginning == *read {
+            if beginning == read {
                 if let Some(end) = end && (self.linearizable || commit > end) {
                     // Read value is now invalid (not the latest version), so
                     // the prepare isn't linearizable and may not be serializable.
@@ -204,7 +222,7 @@ impl<K: Key, V: Value, TS: Timestamp> Store<K, V, TS> {
                     if self.linearizable {
                         Bound::Unbounded
                     } else {
-                        Bound::Excluded(*read)
+                        Bound::Excluded(read)
                     },
                     Bound::Excluded(commit),
                 ))
@@ -218,7 +236,7 @@ impl<K: Key, V: Value, TS: Timestamp> Store<K, V, TS> {
         }
 
         // Check for conflicts with the write set.
-        for key in transaction.write_set.keys() {
+        for (key, _) in transaction.shard_write_set(self.shard) {
             {
                 let (_, timestamp) = self.inner.get(key);
                 // If the last commited write is after the write...
@@ -262,13 +280,13 @@ impl<K: Key, V: Value, TS: Timestamp> Store<K, V, TS> {
         PrepareResult::Ok
     }
 
-    pub fn commit(&mut self, id: TransactionId, transaction: Transaction<K, V, TS>, commit: TS) {
-        for (key, read) in transaction.read_set {
+    pub fn commit(&mut self, id: TransactionId, transaction: &Transaction<K, V, TS>, commit: TS) {
+        for (key, read) in transaction.shard_read_set(self.shard) {
             self.inner.commit_get(key.clone(), read, commit);
         }
 
-        for (key, value) in transaction.write_set {
-            self.inner.put(key, value, commit);
+        for (key, value) in transaction.shard_write_set(self.shard) {
+            self.inner.put(key.clone(), value.clone(), commit);
         }
 
         // Note: Transaction may not be in the prepared list of this particular replica, and that's okay.
@@ -307,13 +325,13 @@ impl<K: Key, V: Value, TS: Timestamp> Store<K, V, TS> {
     }
 
     fn add_prepared_inner(&mut self, transaction: Transaction<K, V, TS>, commit: TS) {
-        for key in transaction.read_set.keys() {
+        for (key, _) in transaction.shard_read_set(self.shard) {
             self.prepared_reads
                 .entry(key.clone())
                 .or_default()
                 .insert(commit, ());
         }
-        for key in transaction.write_set.keys() {
+        for (key, _) in transaction.shard_write_set(self.shard) {
             self.prepared_writes
                 .entry(key.clone())
                 .or_default()
@@ -332,16 +350,16 @@ impl<K: Key, V: Value, TS: Timestamp> Store<K, V, TS> {
     }
 
     fn remove_prepared_inner(&mut self, transaction: Transaction<K, V, TS>, commit: TS) {
-        for key in transaction.read_set.into_keys() {
-            if let Entry::Occupied(mut occupied) = self.prepared_reads.entry(key) {
+        for (key, _) in transaction.shard_read_set(self.shard) {
+            if let Entry::Occupied(mut occupied) = self.prepared_reads.entry(key.clone()) {
                 occupied.get_mut().remove(&commit);
                 if occupied.get().is_empty() {
                     occupied.remove();
                 }
             }
         }
-        for key in transaction.write_set.into_keys() {
-            if let Entry::Occupied(mut occupied) = self.prepared_writes.entry(key) {
+        for (key, _) in transaction.shard_write_set(self.shard) {
+            if let Entry::Occupied(mut occupied) = self.prepared_writes.entry(key.clone()) {
                 occupied.get_mut().remove(&commit);
                 if occupied.get().is_empty() {
                     occupied.remove();

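`occ_check`, `commit`, and the prepared-set bookkeeping now iterate `shard_read_set`/`shard_write_set` rather than the raw `read_set`/`write_set`, so a replica only validates and applies the slice of a cross-shard transaction that its own shard owns. The helpers are defined in a file not shown on this page; a hedged sketch of the expected shape, assuming hash-based key routing and `Copy` timestamps (neither is confirmed by the diff):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Hypothetical key-to-shard routing; the commit's real mapping is not
// visible here. `ShardNumber` as sketched earlier (Copy + PartialEq).
fn shard_for<K: Hash>(key: &K, num_shards: u64) -> ShardNumber {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    ShardNumber(hasher.finish() % num_shards)
}

// Plausible shape of the read-set helper: keep only keys owned by
// `shard`, yielding owned timestamps (the call sites compare
// `read > commit` with no dereference). The container type is an assumption.
fn shard_read_set<'a, K: Hash, TS: Copy>(
    read_set: &'a HashMap<K, TS>,
    shard: ShardNumber,
    num_shards: u64,
) -> impl Iterator<Item = (&'a K, TS)> + 'a {
    read_set
        .iter()
        .filter(move |&(key, _)| shard_for(key, num_shards) == shard)
        .map(|(key, ts)| (key, *ts))
}
// `shard_write_set` would filter the write set identically, yielding
// (&K, &V) pairs (the call sites clone both key and value).
```
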
(Diffs for the remaining nine changed files are not shown.)
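Among the files not shown is the client side of "Client can make sense of multi-shard prepare results." The `is_*` helpers added to `PrepareResult` in src/occ/store.rs are the visible half of that change. A hedged sketch of how a coordinator might fold per-shard prepare results into one outcome, following TAPIR's usual rules; it assumes `Timestamp` exposes an associated `Time` type (suggested by `proposed: read.time()` above), and the repo's actual merge logic may differ:

```rust
// Sketch only; not the repo's client code.
fn merge_prepare_results<TS: Timestamp>(per_shard: &[PrepareResult<TS>]) -> PrepareResult<TS>
where
    TS::Time: Ord + Copy, // assumption: the type behind `read.time()`
{
    // Any definitive rejection from one shard aborts the whole transaction.
    if per_shard
        .iter()
        .any(|r| r.is_fail() || r.is_too_late() || r.is_too_old())
    {
        return PrepareResult::Fail;
    }
    // If any shard proposed a later timestamp, retry at the largest
    // proposal so that every shard can accept it.
    if let Some(proposed) = per_shard
        .iter()
        .filter_map(|r| match r {
            PrepareResult::Retry { proposed } => Some(*proposed),
            _ => None,
        })
        .max()
    {
        return PrepareResult::Retry { proposed };
    }
    // An abstention leaves the outcome undecided; the coordinator retries.
    if per_shard.iter().any(|r| r.is_abstain()) {
        return PrepareResult::Abstain;
    }
    PrepareResult::Ok
}
```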
