Skip to content

datastore: apply schema changes immediately to committed state. #2685

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/bench/benches/special.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ fn serialize_benchmarks<
Arc::new(table_schema),
spacetimedb_table::indexes::SquashedOffset::COMMITTED_STATE,
);
let pool = PagePool::default();
let pool = PagePool::new_for_test();
let mut blob_store = spacetimedb_table::blob_store::HashMapBlobStore::default();

let ptrs = data_pv
Expand Down
191 changes: 106 additions & 85 deletions crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@ use super::{
datastore::Result,
delete_table::DeleteTable,
sequence::{Sequence, SequencesState},
state_view::{IterByColRangeTx, StateView},
tx_state::{IndexIdMap, RemovedIndexIdSet, TxState},
state_view::{IterByColRangeTx, IterTx, ScanIterByColRangeTx, StateView},
tx_state::{IndexIdMap, PendingSchemaChange, TxState},
IterByColEqTx,
};
use crate::{
db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx},
error::IndexError,
};
use crate::{
db::{
datastore::{
Expand All @@ -25,7 +21,7 @@ use crate::{
},
db_metrics::DB_METRICS,
},
error::TableError,
error::{IndexError, TableError},
execution_context::ExecutionContext,
};
use anyhow::anyhow;
Expand Down Expand Up @@ -116,14 +112,14 @@ impl StateView for CommittedState {
cols: ColList,
range: R,
) -> Result<Self::IterByColRange<'_, R>> {
// TODO: Why does this unconditionally return a `Scan` iter,
// instead of trying to return a `CommittedIndex` iter?
// Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen
Ok(IterByColRangeTx::Scan(ScanIterByColRangeTx::new(
self.iter(table_id)?,
cols,
range,
)))
match self.index_seek(table_id, &cols, &range) {
Some(iter) => Ok(IterByColRangeTx::Index(iter)),
None => Ok(IterByColRangeTx::Scan(ScanIterByColRangeTx::new(
self.iter(table_id)?,
cols,
range,
))),
}
}

fn iter_by_col_eq<'a, 'r>(
Expand Down Expand Up @@ -377,7 +373,7 @@ impl CommittedState {
seq.value = seq.allocated() + 1;
}

sequence_state.insert(seq.id(), seq);
sequence_state.insert(seq);
}
Ok(())
}
Expand All @@ -403,7 +399,7 @@ impl CommittedState {
for index_row in rows {
let index_id = index_row.index_id;
let table_id = index_row.table_id;
let Some((table, blob_store)) = self.get_table_and_blob_store(table_id) else {
let (Some(table), blob_store, index_id_map) = self.get_table_and_blob_store_mut(table_id) else {
panic!("Cannot create index for table which doesn't exist in committed state");
};
let algo: IndexAlgorithm = index_row.index_algorithm.into();
Expand All @@ -413,7 +409,7 @@ impl CommittedState {
let index = table.new_index(&algo, is_unique)?;
// SAFETY: `index` was derived from `table`.
unsafe { table.insert_index(blob_store, index_id, index) };
self.index_id_map.insert(index_id, table_id);
index_id_map.insert(index_id, table_id);
}
Ok(())
}
Expand Down Expand Up @@ -538,9 +534,6 @@ impl CommittedState {
pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
let mut tx_data = TxData::default();

// First, merge index id fast-lookup map changes and delete indices.
self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref());

// First, apply deletes. This will free up space in the committed tables.
self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables);

Expand All @@ -560,7 +553,7 @@ impl CommittedState {

fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: BTreeMap<TableId, DeleteTable>) {
for (table_id, row_ptrs) in delete_tables {
if let Some((table, blob_store)) = self.get_table_and_blob_store(table_id) {
if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
let mut deletes = Vec::with_capacity(row_ptrs.len());

// Note: we maintain the invariant that the delete_tables
Expand Down Expand Up @@ -622,19 +615,7 @@ impl CommittedState {
tx_data.set_inserts_for_table(table_id, table_name, inserts.into());
}

let (schema, indexes, pages) = tx_table.consume_for_merge();

// Add all newly created indexes to the committed state.
for (index_id, mut index) in indexes {
if !commit_table.indexes.contains_key(&index_id) {
index.clear();
// SAFETY: `tx_table` is derived from `commit_table`,
// so they have the same row type.
// This entails that all indices in `tx_table`
// were constructed with the same row type/layout as `commit_table`.
unsafe { commit_table.insert_index(commit_blob_store, index_id, index) };
}
}
let (schema, _indexes, pages) = tx_table.consume_for_merge();

// The schema may have been modified in the transaction.
// Update this last to placate borrowck and avoid a clone.
Expand All @@ -646,43 +627,104 @@ impl CommittedState {
}
}

fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: Option<&RemovedIndexIdSet>) {
// Remove indices that tx-state removed.
// It's not necessarily the case that the index already existed in the committed state.
for (index_id, table_id) in index_id_map_removals
.into_iter()
.flatten()
.filter_map(|index_id| self.index_id_map.remove(index_id).map(|x| (*index_id, x)))
{
assert!(self
.tables
.get_mut(&table_id)
.expect("table to delete index from should exist")
.delete_index(&self.blob_store, index_id));
/// Rolls back the changes immediately made to the committed state during a transaction.
pub(super) fn rollback(&mut self, seq_state: &mut SequencesState, tx_state: TxState) {
// Roll back the changes in the reverse order in which they were made
// so that e.g., the last change is undone first.
for change in tx_state.pending_schema_changes.into_iter().rev() {
self.rollback_pending_schema_change(seq_state, change);
}
}

fn rollback_pending_schema_change(
&mut self,
seq_state: &mut SequencesState,
change: PendingSchemaChange,
) -> Option<()> {
use PendingSchemaChange::*;
match change {
// An index was removed. Add it back.
IndexRemoved(table_id, index_id, table_index, index_schema) => {
let table = self.tables.get_mut(&table_id)?;
// SAFETY: `table_index` was derived from `table`.
unsafe { table.add_index(index_id, table_index) };
table.with_mut_schema(|s| s.update_index(index_schema));
self.index_id_map.insert(index_id, table_id);
}
// An index was added. Remove it.
IndexAdded(table_id, index_id, pointer_map) => {
let table = self.tables.get_mut(&table_id)?;
table.delete_index(&self.blob_store, index_id, pointer_map);
table.with_mut_schema(|s| s.remove_index(index_id));
self.index_id_map.remove(&index_id);
}
// A table was removed. Add it back.
TableRemoved(table_id, table) => {
// We don't need to deal with sub-components.
// That is, we don't need to add back indices and such.
// Instead, there will be separate pending schema changes like `IndexRemoved`.
self.tables.insert(table_id, table);
}
// A table was added. Remove it.
TableAdded(table_id) => {
// We don't need to deal with sub-components.
// That is, we don't need to remove indices and such.
// Instead, there will be separate pending schema changes like `IndexAdded`.
self.tables.remove(&table_id);
}
// A table's access was changed. Change back to the old one.
TableAlterAccess(table_id, access) => {
let table = self.tables.get_mut(&table_id)?;
table.with_mut_schema(|s| s.table_access = access);
}
// A constraint was removed. Add it back.
ConstraintRemoved(table_id, constraint_schema) => {
let table = self.tables.get_mut(&table_id)?;
table.with_mut_schema(|s| s.update_constraint(constraint_schema));
}
// A constraint was added. Remove it.
ConstraintAdded(table_id, constraint_id) => {
let table = self.tables.get_mut(&table_id)?;
table.with_mut_schema(|s| s.remove_constraint(constraint_id));
}
// A sequence was removed. Add it back.
SequenceRemoved(table_id, seq, schema) => {
let table = self.tables.get_mut(&table_id)?;
table.with_mut_schema(|s| s.update_sequence(schema));
seq_state.insert(seq);
}
// A sequence was added. Remove it.
SequenceAdded(table_id, sequence_id) => {
let table = self.tables.get_mut(&table_id)?;
table.with_mut_schema(|s| s.remove_sequence(sequence_id));
seq_state.remove(sequence_id);
}
}

// Add the ones tx-state added.
self.index_id_map.extend(index_id_map);
Some(())
}

pub(super) fn get_table(&self, table_id: TableId) -> Option<&Table> {
self.tables.get(&table_id)
}

pub(super) fn get_table_mut(&mut self, table_id: TableId) -> (Option<&mut Table>, &PagePool) {
(self.tables.get_mut(&table_id), &self.page_pool)
}

pub fn get_table_and_blob_store_immutable(&self, table_id: TableId) -> Option<(&Table, &dyn BlobStore)> {
self.tables
.get(&table_id)
.map(|tbl| (tbl, &self.blob_store as &dyn BlobStore))
#[allow(clippy::unnecessary_lazy_evaluations)]
pub fn get_table_and_blob_store(&self, table_id: TableId) -> Result<CommitTableForInsertion<'_>> {
let table = self
.get_table(table_id)
.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
Ok((table, &self.blob_store as &dyn BlobStore, &self.index_id_map))
}

pub(super) fn get_table_and_blob_store(&mut self, table_id: TableId) -> Option<(&mut Table, &mut dyn BlobStore)> {
self.tables
.get_mut(&table_id)
.map(|tbl| (tbl, &mut self.blob_store as &mut dyn BlobStore))
pub(super) fn get_table_and_blob_store_mut(
&mut self,
table_id: TableId,
) -> (Option<&mut Table>, &mut dyn BlobStore, &mut IndexIdMap) {
(
self.tables.get_mut(&table_id),
&mut self.blob_store as &mut dyn BlobStore,
&mut self.index_id_map,
)
}

fn make_table(schema: Arc<TableSchema>) -> Table {
Expand All @@ -703,7 +745,7 @@ impl CommittedState {
.entry(table_id)
.or_insert_with(|| Self::make_table(schema.clone()));
let blob_store = &mut self.blob_store;
let pool = &mut self.page_pool;
let pool = &self.page_pool;
(table, blob_store, pool)
}

Expand Down Expand Up @@ -741,25 +783,4 @@ impl CommittedState {
}
}

pub struct CommittedIndexIterWithDeletedMutTx<'a> {
committed_rows: IndexScanRangeIter<'a>,
del_table: &'a DeleteTable,
}

impl<'a> CommittedIndexIterWithDeletedMutTx<'a> {
pub(super) fn new(committed_rows: IndexScanRangeIter<'a>, del_table: &'a DeleteTable) -> Self {
Self {
committed_rows,
del_table,
}
}
}

impl<'a> Iterator for CommittedIndexIterWithDeletedMutTx<'a> {
type Item = RowRef<'a>;

fn next(&mut self) -> Option<Self::Item> {
self.committed_rows
.find(|row_ref| !self.del_table.contains(row_ref.pointer()))
}
}
pub(super) type CommitTableForInsertion<'a> = (&'a Table, &'a dyn BlobStore, &'a IndexIdMap);
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,8 @@ impl MutTxDatastore for Locking {
index_id: IndexId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>, UpdateFlags)> {
tx.update(table_id, index_id, row)
let (gens, row_ref, update_flags) = tx.update(table_id, index_id, row)?;
Ok((gens, row_ref.collapse(), update_flags))
}

fn metadata_mut_tx(&self, tx: &Self::MutTx) -> Result<Option<Metadata>> {
Expand Down Expand Up @@ -1157,7 +1158,7 @@ mod tests {
}

fn get_datastore() -> Result<Locking> {
Locking::bootstrap(Identity::ZERO, <_>::default())
Locking::bootstrap(Identity::ZERO, PagePool::new_for_test())
}

fn col(col: u16) -> ColList {
Expand Down Expand Up @@ -2219,7 +2220,7 @@ mod tests {
// the delete should first mark the committed row as deleted in the delete tables,
// and then it should remove it from the delete tables upon insertion,
// rather than actually inserting it in the tx state.
// So the second transaction should be observationally a no-op.s
// So the second transaction should be observationally a no-op.
// There was a bug in the datastore that did not respect this in the presence of a unique index.
let (deleted_1, tx_data_1) = update(&datastore)?;
let (deleted_2, tx_data_2) = update(&datastore)?;
Expand Down
Loading
Loading