Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 96 additions & 18 deletions rs/messaging/src/message_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use prometheus::{
Gauge, Histogram, HistogramTimer, HistogramVec, IntCounter, IntCounterVec, IntGauge,
IntGaugeVec,
};
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::convert::{AsRef, TryFrom};
use std::net::{Ipv4Addr, Ipv6Addr};
Expand Down Expand Up @@ -582,14 +583,10 @@ trait BatchProcessor: Send {
}

/// Implementation of [`BatchProcessor`].
struct BatchProcessorImpl<RegistryClient_>
where
RegistryClient_: RegistryClient,
{
struct BatchProcessorImpl<RegistryClient_: RegistryClient> {
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
state_machine: Box<dyn StateMachine>,
registry: Arc<RegistryClient_>,
bitcoin_config: BitcoinConfig,
registry_reader: RegistryReader<RegistryClient_>,
metrics: MessageRoutingMetrics,
log: ReplicaLogger,
#[allow(dead_code)]
Expand Down Expand Up @@ -704,18 +701,68 @@ impl<RegistryClient_: RegistryClient> BatchProcessorImpl<RegistryClient_> {
metrics.clone(),
));

let registry_reader = RegistryReader::new(
registry,
hypervisor_config.bitcoin,
metrics.clone(),
log.clone(),
);

Self {
state_manager,
state_machine,
registry,
bitcoin_config: hypervisor_config.bitcoin,
registry_reader,
metrics,
log,
malicious_flags,
}
}
}

/// The registry-derived values read at a given registry version, cached by
/// [`RegistryReader`] so that repeated reads at the same version are cheap.
struct CachedRegistryData {
registry_version: RegistryVersion,
network_topology: Arc<NetworkTopology>,
own_subnet_info: Arc<OwnSubnetInfo>,
registry_execution_settings: Arc<RegistryExecutionSettings>,
}

/// Reads the registry contents required by `BatchProcessorImpl::process_batch()`.
///
/// Caches the values read at the most recent registry version, so that a
/// subsequent read at the same version returns `Arc` clones of the cached
/// values instead of reading the registry again.
struct RegistryReader<RegistryClient_: RegistryClient> {
registry: Arc<RegistryClient_>,
bitcoin_config: BitcoinConfig,
/// Cache holding the values read at the most recent registry version.
cache: RefCell<Option<CachedRegistryData>>,
metrics: MessageRoutingMetrics,
log: ReplicaLogger,
}

impl<RegistryClient_: RegistryClient> RegistryReader<RegistryClient_> {
fn new(
registry: Arc<RegistryClient_>,
bitcoin_config: BitcoinConfig,
metrics: MessageRoutingMetrics,
log: ReplicaLogger,
) -> Self {
Self {
registry,
bitcoin_config,
cache: RefCell::new(None),
metrics,
log,
}
}

/// Reads registry contents required by `BatchProcessorImpl::process_batch()`.
///
/// Returns `Arc` clones of the cached values if they were previously read at
/// the same `registry_version`; otherwise reads the registry, caches the
/// results and returns them.
//
/// # Warning
/// If the registry is unavailable, this method loops until it becomes
Expand All @@ -724,10 +771,25 @@ impl<RegistryClient_: RegistryClient> BatchProcessorImpl<RegistryClient_> {
&self,
registry_version: RegistryVersion,
own_subnet_id: SubnetId,
) -> (NetworkTopology, OwnSubnetInfo, RegistryExecutionSettings) {
loop {
) -> (
Arc<NetworkTopology>,
Arc<OwnSubnetInfo>,
Arc<RegistryExecutionSettings>,
) {
// Return the cached values if they were read at the requested version.
if let Some(cached) = self.cache.borrow().as_ref()
&& cached.registry_version == registry_version
{
return (
Arc::clone(&cached.network_topology),
Arc::clone(&cached.own_subnet_info),
Arc::clone(&cached.registry_execution_settings),
);
}

let (network_topology, own_subnet_info, registry_execution_settings) = loop {
match self.try_to_read_registry(registry_version, own_subnet_id) {
Ok(result) => return result,
Ok(result) => break result,
Err(ReadRegistryError::Persistent(error_message)) => {
// Increment the critical error counter in case of a persistent error.
self.metrics.critical_error_failed_to_read_registry.inc();
Expand All @@ -749,11 +811,28 @@ impl<RegistryClient_: RegistryClient> BatchProcessorImpl<RegistryClient_> {
}
}
sleep(std::time::Duration::from_millis(100));
}
};

let network_topology = Arc::new(network_topology);
let own_subnet_info = Arc::new(own_subnet_info);
let registry_execution_settings = Arc::new(registry_execution_settings);

*self.cache.borrow_mut() = Some(CachedRegistryData {
registry_version,
network_topology: Arc::clone(&network_topology),
own_subnet_info: Arc::clone(&own_subnet_info),
registry_execution_settings: Arc::clone(&registry_execution_settings),
});

(
network_topology,
own_subnet_info,
registry_execution_settings,
)
}

/// Loads the `NetworkTopology`, `SubnetFeatures`, execution settings and
/// own subnet node public keys from the registry.
/// Loads the `NetworkTopology`, `OwnSubnetInfo` and execution settings from the
/// registry.
///
/// All of the above are required for deterministic processing, so if any
/// entry is missing or cannot be decoded; or reading the registry fails; the
Expand Down Expand Up @@ -1373,11 +1452,10 @@ impl<RegistryClient_: RegistryClient> BatchProcessor for BatchProcessorImpl<Regi
CertificationScope::Metadata
};

// TODO (MR-29) Cache network topology and subnet_features; and populate only
// if version referenced in batch changes.
let registry_version = batch.registry_version;
let (network_topology, own_subnet_info, registry_execution_settings) =
self.read_registry(registry_version, state.metadata.own_subnet_id);
let (network_topology, own_subnet_info, registry_execution_settings) = self
.registry_reader
.read_registry(registry_version, state.metadata.own_subnet_id);

self.metrics.blocks_proposed_total.inc();
self.metrics
Expand Down
121 changes: 109 additions & 12 deletions rs/messaging/src/message_routing/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,12 +620,12 @@ impl StateMachine for FakeStateMachine {
&self,
mut state: ReplicatedState,
_batch: Batch,
network_topology: NetworkTopology,
own_subnet_info: OwnSubnetInfo,
network_topology: Arc<NetworkTopology>,
own_subnet_info: Arc<OwnSubnetInfo>,
registry_settings: &RegistryExecutionSettings,
) -> ReplicatedState {
state.metadata.network_topology = Arc::new(network_topology);
state.metadata.own_subnet_info = Arc::new(own_subnet_info);
state.metadata.network_topology = network_topology;
state.metadata.own_subnet_info = own_subnet_info;
state.put_canister_state(
CanisterStateBuilder::new()
.with_canister_id(canister_test_id(1))
Expand Down Expand Up @@ -668,8 +668,12 @@ fn make_batch_processor<RegistryClient_: RegistryClient + 'static>(
let batch_processor = BatchProcessorImpl {
state_manager: state_manager.clone(),
state_machine: Box::new(FakeStateMachine(registry_settings.clone())),
registry,
bitcoin_config: BitcoinConfig::default(),
registry_reader: RegistryReader::new(
registry,
BitcoinConfig::default(),
metrics.clone(),
log.clone(),
),
metrics: metrics.clone(),
log,
malicious_flags: MaliciousFlags::default(),
Expand All @@ -685,7 +689,9 @@ fn try_to_read_registry(
own_subnet_id: SubnetId,
) -> Result<(NetworkTopology, OwnSubnetInfo, RegistryExecutionSettings), ReadRegistryError> {
let (batch_processor, _, _, _) = make_batch_processor(registry.clone(), log);
batch_processor.try_to_read_registry(registry.get_latest_version(), own_subnet_id)
batch_processor
.registry_reader
.try_to_read_registry(registry.get_latest_version(), own_subnet_id)
}

/// Tests that `BatchProcessorImpl::try_to_read_registry()` returns `Ok(_)`; and checks that the
Expand Down Expand Up @@ -873,6 +879,7 @@ fn try_read_registry_succeeds_with_fully_specified_registry_records() {
let (batch_processor, metrics, state_manager, registry_settings) =
make_batch_processor(fixture.registry.clone(), log);
let (network_topology, own_subnet_info, registry_execution_settings) = batch_processor
.registry_reader
.try_to_read_registry(fixture.registry.get_latest_version(), own_subnet_id)
.unwrap();
let OwnSubnetInfo {
Expand Down Expand Up @@ -1095,6 +1102,7 @@ fn try_read_registry_succeeds_with_minimal_registry_records() {
let (batch_processor, metrics, _, _) =
make_batch_processor(fixture.registry.clone(), log.clone());
let result = batch_processor
.registry_reader
.try_to_read_registry(fixture.registry.get_latest_version(), own_subnet_id);
assert_matches!(result, Ok(_));

Expand Down Expand Up @@ -1156,6 +1164,81 @@ fn try_read_registry_succeeds_with_minimal_registry_records() {
});
}

/// Tests that `RegistryReader::read_registry()` caches the values read at the most
/// recent registry version: a second read at the same version returns `Arc` clones
/// of the cached values, while a read at a different version reads the registry anew.
#[test]
fn read_registry_caches_values_by_registry_version() {
with_test_replica_logger(|log| {
use Integrity::*;

let own_subnet_id = subnet_test_id(13);
let own_subnet_record = SubnetRecord {
max_number_of_canisters: 784,
..Default::default()
};
let own_transcript = dummy_transcript_for_tests();
let nns_subnet_id = subnet_test_id(42);

let input = TestRecords {
subnet_ids: Valid([own_subnet_id]),
subnet_records: [Valid(&own_subnet_record)],
ni_dkg_transcripts: [Valid(Some(&own_transcript))],
nns_subnet_id: Valid(nns_subnet_id),
chain_key_enabled_subnets: &BTreeMap::default(),
provisional_whitelist: Missing,
routing_table: Missing,
canister_migrations: Missing,
node_public_keys: &BTreeMap::default(),
api_boundary_node_records: &BTreeMap::default(),
node_records: &BTreeMap::default(),
};

let fixture = RegistryFixture::new();
fixture.write_test_records(&input).unwrap();
let version_1 = fixture.registry.get_latest_version();

let (batch_processor, _, _, _) =
make_batch_processor(fixture.registry.clone(), log.clone());

// Two reads at the same registry version return `Arc` clones of the same
// (cached) values.
let (nt_1, osi_1, res_1) = batch_processor
.registry_reader
.read_registry(version_1, own_subnet_id);
let (nt_2, osi_2, res_2) = batch_processor
.registry_reader
.read_registry(version_1, own_subnet_id);
assert!(Arc::ptr_eq(&nt_1, &nt_2));
assert!(Arc::ptr_eq(&osi_1, &osi_2));
assert!(Arc::ptr_eq(&res_1, &res_2));

// Writing the same records again bumps the registry to a new version.
fixture.write_test_records(&input).unwrap();
let version_2 = fixture.registry.get_latest_version();
assert_ne!(version_1, version_2);

// A read at a different version bypasses the cache and returns freshly read
// values: distinct `Arc` instances, but equal in content.
let (nt_3, osi_3, res_3) = batch_processor
.registry_reader
.read_registry(version_2, own_subnet_id);
assert!(!Arc::ptr_eq(&nt_1, &nt_3));
assert!(!Arc::ptr_eq(&osi_1, &osi_3));
assert!(!Arc::ptr_eq(&res_1, &res_3));
assert_eq!(nt_1, nt_3);
assert_eq!(osi_1, osi_3);

// The freshly read values are now the cached ones.
let (nt_4, osi_4, res_4) = batch_processor
.registry_reader
.read_registry(version_2, own_subnet_id);
assert!(Arc::ptr_eq(&nt_3, &nt_4));
assert!(Arc::ptr_eq(&osi_3, &osi_4));
assert!(Arc::ptr_eq(&res_3, &res_4));
});
}

/// Tests that `BatchProcessorImpl::try_to_read_registry()` returns `Err(_)` if records in the
/// registry hold corrupted data. Corrupted data is simulated by writing a string of prime numbers
/// (almost anything would do) rather than an actual struct into registry.
Expand Down Expand Up @@ -1416,6 +1499,7 @@ fn try_read_registry_can_skip_missing_or_invalid_node_public_keys() {
let (batch_processor, metrics, _, _) =
make_batch_processor(fixture.registry.clone(), log.clone());
let res = batch_processor
.registry_reader
.try_to_read_registry(fixture.registry.get_latest_version(), own_subnet_id);
assert_matches!(res, Ok(_));

Expand Down Expand Up @@ -1562,6 +1646,7 @@ fn try_read_registry_can_skip_missing_or_invalid_fields_of_api_boundary_nodes()
let (batch_processor, metrics, _, _) =
make_batch_processor(fixture.registry.clone(), log.clone());
let res = batch_processor
.registry_reader
.try_to_read_registry(fixture.registry.get_latest_version(), own_subnet_id);
assert_matches!(res, Ok(_));

Expand Down Expand Up @@ -1639,6 +1724,7 @@ fn try_read_registry_succeeds_and_populates_subnet_admins() {
let (batch_processor, _, _, _) =
make_batch_processor(fixture.registry.clone(), log.clone());
let network_topology = batch_processor
.registry_reader
.try_to_read_registry(fixture.registry.get_latest_version(), own_subnet_id)
.unwrap()
.0;
Expand Down Expand Up @@ -1718,6 +1804,7 @@ fn try_read_registry_succeeds_and_resets_subnet_admins() {
let (batch_processor, metrics, _, _) =
make_batch_processor(fixture.registry.clone(), log.clone());
let network_topology = batch_processor
.registry_reader
.try_to_read_registry(fixture.registry.get_latest_version(), own_subnet_id)
.unwrap()
.0;
Expand Down Expand Up @@ -2091,13 +2178,17 @@ fn check_critical_error_counter_is_not_incremented_for_transient_error() {

// Try reading the registry at the next version; should return `Err(_)`.
assert_matches!(
batch_processor.try_to_read_registry(next_registry_version, own_subnet_id),
batch_processor
.registry_reader
.try_to_read_registry(next_registry_version, own_subnet_id),
Err(ReadRegistryError::Transient(_))
);
// Write minimal records to the registry, reading the registry should now work.
fixture.write_test_records(&minimal_input).unwrap();
assert_matches!(
batch_processor.try_to_read_registry(next_registry_version, own_subnet_id),
batch_processor
.registry_reader
.try_to_read_registry(next_registry_version, own_subnet_id),
Ok(_)
);

Expand All @@ -2108,7 +2199,9 @@ fn check_critical_error_counter_is_not_incremented_for_transient_error() {
// Spawn a thread trying to read from the registry at the next version; this will fail
// until we update the registry.
let handle = std::thread::spawn(move || {
batch_processor.read_registry(next_registry_version, own_subnet_id)
batch_processor
.registry_reader
.read_registry(next_registry_version, own_subnet_id)
});
// Wait 150ms, then update the registry and join the thread above.
std::thread::sleep(Duration::from_millis(150));
Expand Down Expand Up @@ -2146,11 +2239,15 @@ fn reading_mainnet_registry_succeeds() {

let (batch_processor, _, _, _) = make_batch_processor(registry, log);
assert_matches!(
batch_processor.try_to_read_registry(registry_version, mainnet_nns_subnet()),
batch_processor
.registry_reader
.try_to_read_registry(registry_version, mainnet_nns_subnet()),
Ok(_)
);
assert_matches!(
batch_processor.try_to_read_registry(registry_version, mainnet_app_subnet()),
batch_processor
.registry_reader
.try_to_read_registry(registry_version, mainnet_app_subnet()),
Ok(_)
);
});
Expand Down
Loading
Loading