Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 179 additions & 19 deletions consensus/core/src/commit_finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,23 +345,26 @@ impl CommitFinalizer {
}
}

/// To save bandwidth, blocks do not include explicit accept votes on transactions.
/// Reject votes are included only the first time the block containing the voted-on
/// transaction is linked in a block. Other first time linked transactions, when
/// not rejected, are assumed to be accepted. This vote compression rule must also be
/// applied during vote aggregation.
/// Updates the set of origin descendants, by appending blocks from the last commit to
/// origin descendants of previous linked blocks from the same origin.
///
/// Transactions in a block can only be voted on by its immediate descendants.
/// A block is an **immediate descendant** if it can only link directly to the voted-on
/// block, without any intermediate blocks from its own authority. Votes from
/// non-immediate descendants are ignored.
/// The purpose of maintaining the origin descendants per block is to save bandwidth by avoiding to explicitly
/// list all accept votes on transactions in blocks.
/// Instead when an ancestor block Ba is first included by a proposed block Bp, reject votes for transactions in Ba
/// are explicitly listed (if they exist). The rest of non-rejected transactions in Ba are assumed to be accepted by Bp.
/// This vote compression rule must be applied during vote aggregation as well.
///
/// This rule implies the following optimization is possible: after collecting votes from a block,
/// we can skip collecting votes from its **origin descendants** (descendant blocks from the
/// same authority), because their votes would be ignored anyway.
/// The above rule is equivalent to saying that transactions in a block can only be voted on by its immediate descendants.
/// A block Bp is an **immediate descendant** of Ba, if any directed path from Bp to Ba does not contain a block from Bp's own authority.
///
/// This function updates the set of origin descendants for all pending blocks using blocks
/// from the last commit.
/// This rule implies the following optimization is possible: after collecting votes for Ba from block Bp,
/// we can skip collecting votes from Bp's **origin descendants** (descendant blocks from the
/// same authority), because they cannot vote on Ba anyway.
///
/// This vote compression rule is easy to implement when proposing blocks. Reject votes can be gathered against
/// all the newly included ancestors of the proposed block. But vote decompression is trickier to get right.
/// One edge case is when a block may not be an immediate descendant, because of GC. In this case votes from the
/// block should not be counted.
fn append_origin_descendants_from_last_commit(&mut self) {
let commit_state = self
.pending_commits
Expand Down Expand Up @@ -468,7 +471,7 @@ impl CommitFinalizer {
"CommitFinalizer::try_indirect_finalize_pending_transactions_in_first_commit",
);

let pending_blocks: Vec<(BlockRef, BTreeSet<TransactionIndex>)> = self.pending_commits[0]
let pending_blocks: Vec<_> = self.pending_commits[0]
.pending_transactions
.iter()
.map(|(k, v)| (*k, v.clone()))
Expand Down Expand Up @@ -605,15 +608,40 @@ impl CommitFinalizer {
continue;
}
let curr_block_state = blocks_map.get(&curr_block_ref).unwrap_or_else(|| panic!("Block {curr_block_ref} is either incorrectly gc'ed or failed to be recovered after crash.")).read();
// Ignore info from the block if its direct ancestor has been processed.
// The first ancestor of current block should have the same origin / author as the current block.
// If it is not found in the blocks map but have round higher than the pending block, it might have
// voted on the pending block but have been GC'ed.
// Because the GC'ed block might have voted on the pending block and rejected some of the pending transactions,
// we cannot assume current block is voting to accept transactions from the pending block.
let curr_origin_ancestor_ref = curr_block_state.block.ancestors().first().unwrap();
let skip_votes = curr_block_ref.author == curr_origin_ancestor_ref.author
&& pending_block_ref.round < curr_origin_ancestor_ref.round
&& !blocks_map.contains_key(curr_origin_ancestor_ref);
// Skip counting votes from the block if it has been marked to be ignored.
if ignored.insert(curr_block_ref) {
// Skip collecting votes from origin descendants of current block.
// Votes from origin descendants of current block do not count for this transactions.
// Votes from origin descendants of current block do not count for these transactions.
// Consider this case: block B is an origin descendant of block A (from the same authority),
// and both blocks A and B link to another block C.
// Only B's implicit and explicit transaction votes on C are considered.
// None of A's implicit or explicit transaction votes on C should be considered.
//
// See append_origin_descendants_from_last_commit() for more details.
ignored.extend(curr_block_state.origin_descendants.iter());
// Skip counting votes from current block if the votes on pending block could have been casted by an earlier block
// from the same origin as the current block.
// Note: if the current block casts reject votes for the pending block, it can be assumed that accept votes
// are also casted by the current block. But we choose to skip counting accept votes in this edge case for simplicity.
if skip_votes {
let hostname = &context.committee.authority(curr_block_ref.author).hostname;
context
.metrics
.node_metrics
.finalizer_skipped_voting_blocks
.with_label_values(&[hostname])
.inc();
continue;
}
// Get reject votes from current block to the pending block.
let curr_block_reject_votes = curr_block_state
.reject_votes
Expand All @@ -638,7 +666,7 @@ impl CommitFinalizer {
for index in newly_finalized {
accept_votes.remove(&index);
}
// End traversing if all blocks and requested transactions have reached quorum.
// End traversal if all blocks and requested transactions have reached quorum.
if accept_votes.is_empty() {
break;
}
Expand Down Expand Up @@ -769,6 +797,7 @@ struct BlockState {
// Reject votes casted by this block, and by linked ancestors from the same authority.
reject_votes: BTreeMap<BlockRef, BTreeSet<TransactionIndex>>,
// Other committed blocks that are origin descendants of this block.
// See the comment above append_origin_descendants_from_last_commit() for more details.
origin_descendants: Vec<BlockRef>,
}

Expand Down Expand Up @@ -821,7 +850,10 @@ mod tests {
}

fn create_commit_finalizer_fixture() -> Fixture {
let (context, _keys) = Context::new_for_test(4);
let (mut context, _keys) = Context::new_for_test(4);
context
.protocol_config
.set_consensus_gc_depth_for_testing(5);
let context = Arc::new(context);
let dag_state = Arc::new(RwLock::new(DagState::new(
context.clone(),
Expand Down Expand Up @@ -1186,6 +1218,134 @@ mod tests {
assert!(fixture.commit_finalizer.is_empty());
}

// Test indirect finalization when transaction is rejected due to GC.
#[tokio::test]
async fn test_indirect_reject_with_gc() {
let mut fixture = create_commit_finalizer_fixture();
assert_eq!(fixture.context.protocol_config.consensus_gc_depth(), 5);

// Create round 1 blocks with 10 transactions each.
let mut dag_builder = DagBuilder::new(fixture.context.clone());
dag_builder
.layer(1)
.num_transactions(10)
.build()
.persist_layers(fixture.dag_state.clone());
let round_1_blocks = dag_builder.all_blocks();
fixture
.transaction_certifier
.add_voted_blocks(round_1_blocks.iter().map(|b| (b.clone(), vec![])).collect());

// Select B1(3) to have a rejected transaction.
let block_with_rejected_txn = round_1_blocks[3].clone();
// How transactions in this block will be voted:
// Txn 1 (GC reject): 1 reject vote at round 2. But the txn will get rejected because there are only
// 2 accept votes.

// Create round 2 blocks, with B2(1) rejecting transaction 1 from B1(3).
// Note that 3 blocks link to B1(3) without rejecting transaction 1.
let ancestors: Vec<BlockRef> = round_1_blocks.iter().map(|b| b.reference()).collect();
let round_2_blocks = vec![
create_block(2, 0, ancestors.clone(), 0, vec![]),
create_block(
2,
1,
ancestors.clone(),
0,
vec![BlockTransactionVotes {
block_ref: block_with_rejected_txn.reference(),
rejects: vec![1],
}],
),
create_block(2, 2, ancestors.clone(), 0, vec![]),
create_block(2, 3, ancestors.clone(), 0, vec![]),
];
fixture.add_blocks(round_2_blocks.clone());

// Create round 3-6 blocks without creating or linking to an authority 2 block.
// The goal is to GC B2(2).
let mut last_round_blocks: Vec<VerifiedBlock> = round_2_blocks
.iter()
.enumerate()
.filter_map(|(i, b)| if i != 2 { Some(b.clone()) } else { None })
.collect();
for r in 3..=6 {
let ancestors: Vec<BlockRef> =
last_round_blocks.iter().map(|b| b.reference()).collect();
last_round_blocks = [0, 1, 3]
.map(|i| create_block(r, i, ancestors.clone(), 0, vec![]))
.to_vec();
fixture.add_blocks(last_round_blocks.clone());
}

// Create round 7-10 blocks and add a leader from authority 0 of each round.
let mut leaders = vec![];
for r in 7..=10 {
let mut ancestors: Vec<BlockRef> =
last_round_blocks.iter().map(|b| b.reference()).collect();
last_round_blocks = (0..4)
.map(|i| {
if r == 7 && i == 2 {
// Link to the GC'ed block B2(2).
ancestors.push(round_2_blocks[2].reference());
}
create_block(r, i, ancestors.clone(), 0, vec![])
})
.collect();
leaders.push(last_round_blocks[0].clone());
fixture.add_blocks(last_round_blocks.clone());
}

// Create CommittedSubDag from leaders.
assert_eq!(leaders.len(), 4);
let committed_sub_dags = fixture.linearizer.handle_commit(leaders);
assert_eq!(committed_sub_dags.len(), 4);

// Ensure 1 reject vote is contained in B2(1) in commit 0.
assert!(committed_sub_dags[0].blocks.contains(&round_2_blocks[1]));
// Ensure B2(2) is GC'ed.
for commit in committed_sub_dags.iter() {
assert!(!commit.blocks.contains(&round_2_blocks[2]));
}

// Buffering the initial 3 commits should not finalize.
for commit in committed_sub_dags.iter().take(3) {
assert!(commit.decided_with_local_blocks);
let finalized_commits = fixture
.commit_finalizer
.process_commit(commit.clone())
.await;
assert_eq!(finalized_commits.len(), 0);
}

// Buffering the 4th commit should finalize all commits.
let finalized_commits = fixture
.commit_finalizer
.process_commit(committed_sub_dags[3].clone())
.await;
assert_eq!(finalized_commits.len(), 4);

// Check rejected transactions.
// B1(3) txn 1 gets rejected, even though there are has 3 blocks links to B1(3) without rejecting txn 1.
// This is because there are only 2 accept votes for this transaction, which is less than the quorum threshold.
let rejected_transactions = finalized_commits[0].rejected_transactions_by_block.clone();
assert_eq!(rejected_transactions.len(), 1);
assert_eq!(
rejected_transactions
.get(&block_with_rejected_txn.reference())
.unwrap(),
&vec![1]
);

// Other commits should have no rejected transactions.
for commit in finalized_commits.iter().skip(1) {
assert!(commit.rejected_transactions_by_block.is_empty());
}

// CommitFinalizer should be empty.
assert!(fixture.commit_finalizer.is_empty());
}

#[tokio::test]
async fn test_finalize_remote_commits_with_reject_votes() {
let mut fixture: Fixture = create_commit_finalizer_fixture();
Expand Down
7 changes: 7 additions & 0 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ pub(crate) struct NodeMetrics {
pub(crate) finalizer_transaction_status: IntCounterVec,
pub(crate) finalizer_reject_votes: IntCounterVec,
pub(crate) finalizer_output_commits: IntCounterVec,
pub(crate) finalizer_skipped_voting_blocks: IntCounterVec,
pub(crate) uptime: Histogram,
}

Expand Down Expand Up @@ -917,6 +918,12 @@ impl NodeMetrics {
&["type"],
registry
).unwrap(),
finalizer_skipped_voting_blocks: register_int_counter_vec_with_registry!(
"finalizer_skipped_voting_blocks",
"Number of blocks skipped from voting due to potentially not being an immediate descendant.",
&["authority"],
registry
).unwrap(),
uptime: register_histogram_with_registry!(
"uptime",
"Total node uptime",
Expand Down
Loading