Skip to content

Commit 1976f89

Browse files
authored
Delete old consensus handler (#24107)
- Rewrite tests (with extensive help from Claude) that depend on old consensus handler - delete old consensus handler
1 parent 8462695 commit 1976f89

13 files changed

+976
-2875
lines changed

crates/sui-core/src/authority/authority_per_epoch_store.rs

Lines changed: 24 additions & 1536 deletions
Large diffs are not rendered by default.

crates/sui-core/src/authority/authority_test_utils.rs

Lines changed: 64 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,21 @@
22
// Copyright (c) Mysten Labs, Inc.
33
// SPDX-License-Identifier: Apache-2.0
44

5-
use crate::checkpoints::CheckpointServiceNoop;
6-
use crate::consensus_handler::SequencedConsensusTransaction;
7-
85
use core::default::Default;
96
use fastcrypto::hash::MultisetHash;
107
use fastcrypto::traits::KeyPair;
118
use sui_protocol_config::Chain;
129
use sui_types::base_types::FullObjectRef;
1310
use sui_types::crypto::{AccountKeyPair, AuthorityKeyPair};
14-
use sui_types::messages_consensus::ConsensusTransaction;
1511
use sui_types::utils::to_sender_signed_transaction;
1612

17-
use super::shared_object_version_manager::{AssignedTxAndVersions, AssignedVersions};
13+
use super::shared_object_version_manager::AssignedVersions;
1814
use super::test_authority_builder::TestAuthorityBuilder;
1915
use super::*;
2016

17+
#[cfg(test)]
18+
use super::shared_object_version_manager::{AssignedTxAndVersions, Schedulable};
19+
2120
pub async fn send_and_confirm_transaction(
2221
authority: &AuthorityState,
2322
transaction: Transaction,
@@ -427,27 +426,20 @@ pub async fn send_consensus(
427426
authority: &AuthorityState,
428427
cert: &VerifiedCertificate,
429428
) -> AssignedVersions {
430-
let transaction = SequencedConsensusTransaction::new_test(
431-
ConsensusTransaction::new_certificate_message(&authority.name, cert.clone().into_inner()),
432-
);
433-
434-
let (_, assigned_versions) = authority
429+
let assigned_versions = authority
435430
.epoch_store_for_testing()
436-
.process_consensus_transactions_for_tests(
437-
vec![transaction],
438-
&Arc::new(CheckpointServiceNoop {}),
431+
.assign_shared_object_versions_for_tests(
439432
authority.get_object_cache_reader().as_ref(),
440-
&authority.metrics,
441-
true,
433+
&vec![VerifiedExecutableTransaction::new_from_certificate(
434+
cert.clone(),
435+
)],
442436
)
443-
.await
444437
.unwrap();
445438

446439
let assigned_versions = assigned_versions
447-
.0
448-
.into_iter()
449-
.next()
450-
.map(|(_, v)| v)
440+
.into_map()
441+
.get(&cert.key())
442+
.cloned()
451443
.unwrap_or_else(|| AssignedVersions::new(vec![], None));
452444

453445
let certs = vec![(
@@ -466,53 +458,70 @@ pub async fn send_consensus_no_execution(
466458
authority: &AuthorityState,
467459
cert: &VerifiedCertificate,
468460
) -> AssignedVersions {
469-
let transaction = SequencedConsensusTransaction::new_test(
470-
ConsensusTransaction::new_certificate_message(&authority.name, cert.clone().into_inner()),
471-
);
472-
473-
// Call process_consensus_transaction() instead of handle_consensus_transaction(), to avoid actually executing cert.
461+
// Use the simpler assign_shared_object_versions_for_tests API to avoid actually executing cert.
474462
// This allows testing cert execution independently.
475-
let (_, assigned_versions) = authority
463+
let assigned_versions = authority
476464
.epoch_store_for_testing()
477-
.process_consensus_transactions_for_tests(
478-
vec![transaction],
479-
&Arc::new(CheckpointServiceNoop {}),
465+
.assign_shared_object_versions_for_tests(
480466
authority.get_object_cache_reader().as_ref(),
481-
&authority.metrics,
482-
true,
467+
&vec![VerifiedExecutableTransaction::new_from_certificate(
468+
cert.clone(),
469+
)],
483470
)
484-
.await
485471
.unwrap();
486-
assert_eq!(assigned_versions.0.len(), 1);
487-
assigned_versions.0.into_iter().next().unwrap().1
472+
473+
assigned_versions
474+
.into_map()
475+
.get(&cert.key())
476+
.cloned()
477+
.unwrap_or_else(|| AssignedVersions::new(vec![], None))
488478
}
489479

490-
pub async fn send_batch_consensus_no_execution(
480+
#[cfg(test)]
481+
pub async fn send_batch_consensus_no_execution<C>(
491482
authority: &AuthorityState,
492483
certificates: &[VerifiedCertificate],
493-
skip_consensus_commit_prologue_in_test: bool,
494-
) -> (Vec<Schedulable>, AssignedTxAndVersions) {
495-
let transactions = certificates
484+
consensus_handler: &mut crate::consensus_handler::ConsensusHandler<C>,
485+
captured_transactions: &crate::consensus_test_utils::CapturedTransactions,
486+
) -> (Vec<Schedulable>, AssignedTxAndVersions)
487+
where
488+
C: crate::checkpoints::CheckpointServiceNotify + Send + Sync + 'static,
489+
{
490+
use crate::consensus_test_utils::TestConsensusCommit;
491+
use sui_types::messages_consensus::ConsensusTransaction;
492+
493+
let consensus_transactions: Vec<ConsensusTransaction> = certificates
496494
.iter()
497495
.map(|cert| {
498-
SequencedConsensusTransaction::new_test(ConsensusTransaction::new_certificate_message(
499-
&authority.name,
500-
cert.clone().into_inner(),
501-
))
496+
ConsensusTransaction::new_certificate_message(&authority.name, cert.clone().into())
502497
})
503498
.collect();
504499

505-
// Call process_consensus_transaction() instead of handle_consensus_transaction(), to avoid actually executing cert.
506-
// This allows testing cert execution independently.
507-
authority
508-
.epoch_store_for_testing()
509-
.process_consensus_transactions_for_tests(
510-
transactions,
511-
&Arc::new(CheckpointServiceNoop {}),
512-
authority.get_object_cache_reader().as_ref(),
513-
&authority.metrics,
514-
skip_consensus_commit_prologue_in_test,
515-
)
516-
.await
517-
.unwrap()
500+
// Determine appropriate round and timestamp
501+
let epoch_store = authority.epoch_store_for_testing();
502+
let round = epoch_store.get_highest_pending_checkpoint_height() + 1;
503+
let timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
504+
let sub_dag_index = 0;
505+
506+
let commit =
507+
TestConsensusCommit::new(consensus_transactions, round, timestamp_ms, sub_dag_index);
508+
509+
consensus_handler
510+
.handle_consensus_commit_for_test(commit)
511+
.await;
512+
513+
// Wait for captured transactions to be available
514+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
515+
516+
let (scheduled_txns, assigned_tx_and_versions) = {
517+
let mut captured = captured_transactions.lock();
518+
assert!(
519+
!captured.is_empty(),
520+
"Expected transactions to be scheduled"
521+
);
522+
let (scheduled_txns, assigned_tx_and_versions, _) = captured.remove(0);
523+
(scheduled_txns, assigned_tx_and_versions)
524+
};
525+
526+
(scheduled_txns, assigned_tx_and_versions)
518527
}

crates/sui-core/src/authority/consensus_quarantine.rs

Lines changed: 2 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use crate::authority::authority_per_epoch_store::{
66
};
77
use crate::authority::transaction_deferral::DeferralKey;
88
use crate::checkpoints::BuilderCheckpointSummary;
9-
use crate::consensus_handler::SequencedConsensusTransactionKind;
109
use crate::epoch::randomness::SINGLETON_KEY;
1110
use dashmap::DashMap;
1211
use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
@@ -23,9 +22,7 @@ use sui_types::crypto::RandomnessRound;
2322
use sui_types::error::SuiResult;
2423
use sui_types::execution::ExecutionTimeObservationKey;
2524
use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
26-
use sui_types::messages_consensus::{
27-
AuthorityIndex, ConsensusTransaction, ConsensusTransactionKind,
28-
};
25+
use sui_types::messages_consensus::AuthorityIndex;
2926
use sui_types::{
3027
base_types::{ConsensusObjectSequenceKey, ObjectID},
3128
digests::TransactionDigest,
@@ -193,14 +190,6 @@ impl ConsensusCommitOutput {
193190
}
194191

195192
pub fn defer_transactions(
196-
&mut self,
197-
key: DeferralKey,
198-
transactions: Vec<VerifiedSequencedConsensusTransaction>,
199-
) {
200-
self.deferred_txns.push((key, transactions));
201-
}
202-
203-
pub fn defer_transactions_v2(
204193
&mut self,
205194
key: DeferralKey,
206195
transactions: Vec<VerifiedExecutableTransaction>,
@@ -301,14 +290,11 @@ impl ConsensusCommitOutput {
301290
batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
302291
}
303292

304-
// TODO(consensus-handler-rewrite): delete the old structures once commit handler rewrite is complete
305-
batch.delete_batch(&tables.deferred_transactions, &self.deleted_deferred_txns)?;
306293
batch.delete_batch(
307294
&tables.deferred_transactions_v2,
308295
&self.deleted_deferred_txns,
309296
)?;
310297

311-
batch.insert_batch(&tables.deferred_transactions, self.deferred_txns)?;
312298
batch.insert_batch(
313299
&tables.deferred_transactions_v2,
314300
self.deferred_txns_v2.into_iter().map(|(key, txs)| {
@@ -402,10 +388,6 @@ impl ConsensusCommitOutput {
402388
pub(crate) struct ConsensusOutputCache {
403389
// deferred transactions is only used by consensus handler so there should never be lock contention
404390
// - hence no need for a DashMap.
405-
// TODO(consensus-handler-rewrite): remove this once we no longer need to support the old consensus handler
406-
pub(crate) deferred_transactions:
407-
Mutex<BTreeMap<DeferralKey, Vec<VerifiedSequencedConsensusTransaction>>>,
408-
409391
pub(crate) deferred_transactions_v2:
410392
Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransaction>>>,
411393

@@ -423,10 +405,6 @@ impl ConsensusOutputCache {
423405
epoch_start_configuration: &EpochStartConfiguration,
424406
tables: &AuthorityEpochTables,
425407
) -> Self {
426-
let deferred_transactions = tables
427-
.get_all_deferred_transactions()
428-
.expect("load deferred transactions cannot fail");
429-
430408
let deferred_transactions_v2 = tables
431409
.get_all_deferred_transactions_v2()
432410
.expect("load deferred transactions cannot fail");
@@ -439,7 +417,6 @@ impl ConsensusOutputCache {
439417
let executed_in_epoch_cache_capacity = 50_000;
440418

441419
Self {
442-
deferred_transactions: Mutex::new(deferred_transactions),
443420
deferred_transactions_v2: Mutex::new(deferred_transactions_v2),
444421
user_signatures_for_checkpoints: Default::default(),
445422
executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
@@ -915,94 +892,7 @@ impl ConsensusOutputQuarantine {
915892
.next()
916893
}
917894

918-
pub(super) fn load_initial_object_debts(
919-
&self,
920-
epoch_store: &AuthorityPerEpochStore,
921-
current_round: Round,
922-
for_randomness: bool,
923-
transactions: &[VerifiedSequencedConsensusTransaction],
924-
) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
925-
let protocol_config = epoch_store.protocol_config();
926-
let tables = epoch_store.tables()?;
927-
let default_per_commit_budget = protocol_config
928-
.max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
929-
.unwrap_or(0);
930-
let (hash_table, db_table, per_commit_budget) = if for_randomness {
931-
(
932-
&self.congestion_control_randomness_object_debts,
933-
&tables.congestion_control_randomness_object_debts,
934-
protocol_config
935-
.max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
936-
.unwrap_or(default_per_commit_budget),
937-
)
938-
} else {
939-
(
940-
&self.congestion_control_object_debts,
941-
&tables.congestion_control_object_debts,
942-
default_per_commit_budget,
943-
)
944-
};
945-
let mut shared_input_object_ids: Vec<_> = transactions
946-
.iter()
947-
.filter_map(|tx| {
948-
match &tx.0.transaction {
949-
SequencedConsensusTransactionKind::External(ConsensusTransaction {
950-
kind: ConsensusTransactionKind::CertifiedTransaction(tx),
951-
..
952-
}) => Some(itertools::Either::Left(
953-
tx.shared_input_objects().map(|obj| obj.id),
954-
)),
955-
SequencedConsensusTransactionKind::External(ConsensusTransaction {
956-
kind: ConsensusTransactionKind::UserTransaction(tx),
957-
..
958-
// Bug fix that required a protocol flag.
959-
}) if protocol_config.use_mfp_txns_in_load_initial_object_debts() => Some(
960-
itertools::Either::Right(tx.shared_input_objects().map(|obj| obj.id)),
961-
),
962-
_ => None,
963-
}
964-
})
965-
.flatten()
966-
.collect();
967-
shared_input_object_ids.sort();
968-
shared_input_object_ids.dedup();
969-
970-
let results = do_fallback_lookup(
971-
&shared_input_object_ids,
972-
|object_id| {
973-
if let Some(debt) = hash_table.get(object_id) {
974-
CacheResult::Hit(Some(debt.into_v1()))
975-
} else {
976-
CacheResult::Miss
977-
}
978-
},
979-
|object_ids| {
980-
db_table
981-
.multi_get(object_ids)
982-
.expect("db error")
983-
.into_iter()
984-
.map(|debt| debt.map(|debt| debt.into_v1()))
985-
.collect()
986-
},
987-
);
988-
989-
Ok(results
990-
.into_iter()
991-
.zip(shared_input_object_ids)
992-
.filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
993-
.map(move |((round, debt), object_id)| {
994-
// Stored debts already account for the budget of the round in which
995-
// they were accumulated. Application of budget from future rounds to
996-
// the debt is handled here.
997-
assert!(current_round > round);
998-
let num_rounds = current_round - round - 1;
999-
let debt = debt.saturating_sub(per_commit_budget * num_rounds);
1000-
(object_id, debt)
1001-
}))
1002-
}
1003-
1004-
// TODO: Remove the above version and rename this without _v2
1005-
pub(crate) fn load_initial_object_debts_v2(
895+
pub(crate) fn load_initial_object_debts(
1006896
&self,
1007897
epoch_store: &AuthorityPerEpochStore,
1008898
current_round: Round,

0 commit comments

Comments
 (0)