diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 8586fd9cd36..48481c2e1dc 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -14,9 +14,8 @@ use crate::rpc::{ RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC, }; use crate::types::{ - attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding, - GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS, - BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, + all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash, + GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, }; use crate::EnrExt; use crate::Eth2Enr; @@ -280,14 +279,39 @@ impl Network { // Set up a scoring update interval let update_gossipsub_scores = tokio::time::interval(params.decay_interval); - let max_topics = ctx.chain_spec.attestation_subnet_count as usize - + SYNC_COMMITTEE_SUBNET_COUNT as usize - + ctx.chain_spec.blob_sidecar_subnet_count_max() as usize - + ctx.chain_spec.data_column_sidecar_subnet_count as usize - + BASE_CORE_TOPICS.len() - + ALTAIR_CORE_TOPICS.len() - + CAPELLA_CORE_TOPICS.len() // 0 core deneb and electra topics - + LIGHT_CLIENT_GOSSIP_TOPICS.len(); + let current_and_future_forks = ForkName::list_all().into_iter().filter_map(|fork| { + if fork >= ctx.fork_context.current_fork() { + ctx.fork_context + .to_context_bytes(fork) + .map(|fork_digest| (fork, fork_digest)) + } else { + None + } + }); + + let all_topics_for_forks = current_and_future_forks + .map(|(fork, fork_digest)| { + all_topics_at_fork::(fork, &ctx.chain_spec) + .into_iter() + .map(|topic| { + Topic::new(GossipTopic::new( + topic, + GossipEncoding::default(), + fork_digest, + )) + .into() + }) + .collect::>() + }) + .collect::>(); + + // For simplicity find the fork with the most individual topics and assume all forks + // have the same topic count + let max_topics_at_any_fork = all_topics_for_forks + .iter() + .map(|topics| topics.len()) + .max() + .expect("each fork has at least 5 hardcoded core topics"); let possible_fork_digests = ctx.fork_context.all_fork_digests(); let filter = gossipsub::MaxCountSubscriptionFilter { @@ -297,9 +321,9 @@ impl Network { SYNC_COMMITTEE_SUBNET_COUNT, ), // during a fork we subscribe to both the old and new topics - max_subscribed_topics: max_topics * 4, + max_subscribed_topics: max_topics_at_any_fork * 4, // 424 in theory = (64 attestation + 4 sync committee + 7 core topics + 9 blob topics + 128 column topics) * 2 - max_subscriptions_per_request: max_topics * 2, + max_subscriptions_per_request: max_topics_at_any_fork * 2, }; // If metrics are enabled for libp2p build the configuration @@ -332,17 +356,9 @@ impl Network { // If we are using metrics, then register which topics we want to make sure to keep // track of if ctx.libp2p_registry.is_some() { - let topics_to_keep_metrics_for = attestation_sync_committee_topics::() - .map(|gossip_kind| { - Topic::from(GossipTopic::new( - gossip_kind, - GossipEncoding::default(), - enr_fork_id.fork_digest, - )) - .into() - }) - .collect::>(); - gossipsub.register_topics_for_metrics(topics_to_keep_metrics_for); + for topics in all_topics_for_forks { + gossipsub.register_topics_for_metrics(topics); + } } (gossipsub, update_gossipsub_scores) @@ -700,38 +716,26 @@ impl Network { /// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`. pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) { - // Subscribe to existing topics with new fork digest + // Re-subscribe to non-core topics with the new fork digest let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone(); for mut topic in subscriptions.into_iter() { - topic.fork_digest = new_fork_digest; - self.subscribe(topic); + if is_fork_non_core_topic(&topic, new_fork) { + topic.fork_digest = new_fork_digest; + self.subscribe(topic); + } } // Subscribe to core topics for the new fork - for kind in fork_core_topics::( - &new_fork, - &self.fork_context.spec, + for kind in core_topics_to_subscribe::( + new_fork, &self.network_globals.as_topic_config(), + &self.fork_context.spec, ) { let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest); self.subscribe(topic); } - // TODO(das): unsubscribe from blob topics at the Fulu fork - - // Register the new topics for metrics - let topics_to_keep_metrics_for = attestation_sync_committee_topics::() - .map(|gossip_kind| { - Topic::from(GossipTopic::new( - gossip_kind, - GossipEncoding::default(), - new_fork_digest, - )) - .into() - }) - .collect::>(); - self.gossipsub_mut() - .register_topics_for_metrics(topics_to_keep_metrics_for); + // Already registered all possible gossipsub topics for metrics } /// Unsubscribe from all topics that doesn't have the given fork_digest diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 2800b75133b..d243c68c0f5 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -187,6 +187,8 @@ impl NetworkGlobals { /// Returns the TopicConfig to compute the set of Gossip topics for a given fork pub fn as_topic_config(&self) -> TopicConfig { TopicConfig { + enable_light_client_server: self.config.enable_light_client_server, + subscribe_all_subnets: self.config.subscribe_all_subnets, subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets, sampling_subnets: &self.sampling_subnets, } diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index 58ba7588b98..db92f05b8fd 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -16,7 +16,6 @@ pub use pubsub::{PubsubMessage, SnappyTransform}; pub use subnet::{Subnet, SubnetDiscovery}; pub use sync_state::{BackFillState, SyncState}; pub use topics::{ - attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics, - subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig, - ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, + all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash, + GossipEncoding, GossipKind, GossipTopic, TopicConfig, }; diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 171dab09a35..56b97303d3e 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -25,104 +25,110 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change"; pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; -pub const BASE_CORE_TOPICS: [GossipKind; 5] = [ - GossipKind::BeaconBlock, - GossipKind::BeaconAggregateAndProof, - GossipKind::VoluntaryExit, - GossipKind::ProposerSlashing, - GossipKind::AttesterSlashing, -]; - -pub const ALTAIR_CORE_TOPICS: [GossipKind; 1] = [GossipKind::SignedContributionAndProof]; - -pub const CAPELLA_CORE_TOPICS: [GossipKind; 1] = [GossipKind::BlsToExecutionChange]; - -pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [ - GossipKind::LightClientFinalityUpdate, - GossipKind::LightClientOptimisticUpdate, -]; - #[derive(Debug)] pub struct TopicConfig<'a> { + pub enable_light_client_server: bool, + pub subscribe_all_subnets: bool, pub subscribe_all_data_column_subnets: bool, pub sampling_subnets: &'a HashSet, } -/// Returns the core topics associated with each fork that are new to the previous fork -pub fn fork_core_topics( - fork_name: &ForkName, +/// Returns all the topics the node should subscribe at `fork_name` +pub fn core_topics_to_subscribe( + fork_name: ForkName, + opts: &TopicConfig, spec: &ChainSpec, - topic_config: &TopicConfig, ) -> Vec { - match fork_name { - ForkName::Base => BASE_CORE_TOPICS.to_vec(), - ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(), - ForkName::Bellatrix => vec![], - ForkName::Capella => CAPELLA_CORE_TOPICS.to_vec(), - ForkName::Deneb => { - // All of deneb blob topics are core topics - let mut deneb_blob_topics = Vec::new(); - for i in 0..spec.blob_sidecar_subnet_count(ForkName::Deneb) { - deneb_blob_topics.push(GossipKind::BlobSidecar(i)); - } - deneb_blob_topics + let mut topics = vec![ + GossipKind::BeaconBlock, + GossipKind::BeaconAggregateAndProof, + GossipKind::VoluntaryExit, + GossipKind::ProposerSlashing, + GossipKind::AttesterSlashing, + ]; + + if opts.subscribe_all_subnets { + for i in 0..spec.attestation_subnet_count { + topics.push(GossipKind::Attestation(i.into())); } - ForkName::Electra => { - // All of electra blob topics are core topics - let mut electra_blob_topics = Vec::new(); - for i in 0..spec.blob_sidecar_subnet_count(ForkName::Electra) { - electra_blob_topics.push(GossipKind::BlobSidecar(i)); + } + + if fork_name.altair_enabled() { + topics.push(GossipKind::SignedContributionAndProof); + + if opts.subscribe_all_subnets { + for i in 0..E::SyncCommitteeSubnetCount::to_u64() { + topics.push(GossipKind::SyncCommitteeMessage(i.into())); } - electra_blob_topics } - ForkName::Fulu => { - let mut topics = vec![]; - if topic_config.subscribe_all_data_column_subnets { - for column_subnet in 0..spec.data_column_sidecar_subnet_count { - topics.push(GossipKind::DataColumnSidecar(DataColumnSubnetId::new( - column_subnet, - ))); - } - } else { - for column_subnet in topic_config.sampling_subnets { - topics.push(GossipKind::DataColumnSidecar(*column_subnet)); - } + + if opts.enable_light_client_server { + topics.push(GossipKind::LightClientFinalityUpdate); + topics.push(GossipKind::LightClientOptimisticUpdate); + } + } + + if fork_name.capella_enabled() { + topics.push(GossipKind::BlsToExecutionChange); + } + + if fork_name.deneb_enabled() && !fork_name.fulu_enabled() { + // All of deneb blob topics are core topics + for i in 0..spec.blob_sidecar_subnet_count(fork_name) { + topics.push(GossipKind::BlobSidecar(i)); + } + } + + if fork_name.fulu_enabled() { + if opts.subscribe_all_data_column_subnets { + for i in 0..spec.data_column_sidecar_subnet_count { + topics.push(GossipKind::DataColumnSidecar(i.into())); + } + } else { + for subnet in opts.sampling_subnets { + topics.push(GossipKind::DataColumnSidecar(*subnet)); } - topics } } -} -/// Returns all the attestation and sync committee topics, for a given fork. -pub fn attestation_sync_committee_topics() -> impl Iterator { - (0..E::SubnetBitfieldLength::to_usize()) - .map(|subnet_id| GossipKind::Attestation(SubnetId::new(subnet_id as u64))) - .chain( - (0..E::SyncCommitteeSubnetCount::to_usize()).map(|sync_committee_id| { - GossipKind::SyncCommitteeMessage(SyncSubnetId::new(sync_committee_id as u64)) - }), - ) + topics } -/// Returns all the topics that we need to subscribe to for a given fork -/// including topics from older forks and new topics for the current fork. -pub fn core_topics_to_subscribe( - mut current_fork: ForkName, - spec: &ChainSpec, - topic_config: &TopicConfig, -) -> Vec { - let mut topics = fork_core_topics::(¤t_fork, spec, topic_config); - while let Some(previous_fork) = current_fork.previous_fork() { - let previous_fork_topics = fork_core_topics::(&previous_fork, spec, topic_config); - topics.extend(previous_fork_topics); - current_fork = previous_fork; +/// Returns true if a given non-core `GossipTopic` MAY be subscribe at this fork. +/// +/// For example: the `Attestation` topic is not subscribed as a core topic if +/// subscribe_all_subnets = false` but we may subscribe to it outside of a fork +/// boundary if the node is an aggregator. +pub fn is_fork_non_core_topic(topic: &GossipTopic, _fork_name: ForkName) -> bool { + match topic.kind() { + // Node may be aggregator of attestation and sync_committee_message topics for all known + // forks + GossipKind::Attestation(_) | GossipKind::SyncCommitteeMessage(_) => true, + // All these topics are core-only + GossipKind::BeaconBlock + | GossipKind::BeaconAggregateAndProof + | GossipKind::BlobSidecar(_) + | GossipKind::DataColumnSidecar(_) + | GossipKind::VoluntaryExit + | GossipKind::ProposerSlashing + | GossipKind::AttesterSlashing + | GossipKind::SignedContributionAndProof + | GossipKind::BlsToExecutionChange + | GossipKind::LightClientFinalityUpdate + | GossipKind::LightClientOptimisticUpdate => false, } - // Remove duplicates - topics - .into_iter() - .collect::>() - .into_iter() - .collect() +} + +pub fn all_topics_at_fork(fork: ForkName, spec: &ChainSpec) -> Vec { + // Compute the worst case of all forks + let sampling_subnets = HashSet::from_iter(spec.all_data_column_sidecar_subnets()); + let opts = TopicConfig { + enable_light_client_server: true, + subscribe_all_subnets: true, + subscribe_all_data_column_subnets: true, + sampling_subnets: &sampling_subnets, + }; + core_topics_to_subscribe::(fork, &opts, spec) } /// A gossipsub topic which encapsulates the type of messages that should be sent and received over @@ -368,10 +374,9 @@ fn subnet_topic_index(topic: &str) -> Option { #[cfg(test)] mod tests { - use types::MainnetEthSpec; - use super::GossipKind::*; use super::*; + use types::{Epoch, MainnetEthSpec as E}; const GOOD_FORK_DIGEST: &str = "e1925f3b"; const BAD_PREFIX: &str = "tezos"; @@ -496,31 +501,94 @@ mod tests { assert_eq!("attester_slashing", AttesterSlashing.as_ref()); } - #[test] - fn test_core_topics_to_subscribe() { - type E = MainnetEthSpec; - let spec = E::default_spec(); - let mut all_topics = Vec::new(); - let topic_config = TopicConfig { + fn get_spec() -> ChainSpec { + let mut spec = E::default_spec(); + spec.altair_fork_epoch = Some(Epoch::new(1)); + spec.bellatrix_fork_epoch = Some(Epoch::new(2)); + spec.capella_fork_epoch = Some(Epoch::new(3)); + spec.deneb_fork_epoch = Some(Epoch::new(4)); + spec.electra_fork_epoch = Some(Epoch::new(5)); + spec.fulu_fork_epoch = Some(Epoch::new(6)); + spec + } + + fn get_sampling_subnets() -> HashSet { + HashSet::new() + } + + fn get_topic_config(sampling_subnets: &HashSet) -> TopicConfig { + TopicConfig { + enable_light_client_server: false, + subscribe_all_subnets: false, subscribe_all_data_column_subnets: false, - sampling_subnets: &HashSet::from_iter([1, 2].map(DataColumnSubnetId::new)), - }; - let mut fulu_core_topics = fork_core_topics::(&ForkName::Fulu, &spec, &topic_config); - let mut electra_core_topics = - fork_core_topics::(&ForkName::Electra, &spec, &topic_config); - let mut deneb_core_topics = fork_core_topics::(&ForkName::Deneb, &spec, &topic_config); - all_topics.append(&mut fulu_core_topics); - all_topics.append(&mut electra_core_topics); - all_topics.append(&mut deneb_core_topics); - all_topics.extend(CAPELLA_CORE_TOPICS); - all_topics.extend(ALTAIR_CORE_TOPICS); - all_topics.extend(BASE_CORE_TOPICS); + sampling_subnets, + } + } + + #[test] + fn base_topics_are_always_active() { + let spec = get_spec(); + let s = get_sampling_subnets(); + let topic_config = get_topic_config(&s); + for fork in ForkName::list_all() { + assert!(core_topics_to_subscribe::(fork, &topic_config, &spec,) + .contains(&GossipKind::BeaconBlock)); + } + } + #[test] + fn blobs_are_not_subscribed_in_peerdas() { + let spec = get_spec(); + let s = get_sampling_subnets(); + let topic_config = get_topic_config(&s); + assert!( + !core_topics_to_subscribe::(ForkName::Fulu, &topic_config, &spec,) + .contains(&GossipKind::BlobSidecar(0)) + ); + } + + #[test] + fn columns_are_subscribed_in_peerdas() { + let spec = get_spec(); + let s = get_sampling_subnets(); + let mut topic_config = get_topic_config(&s); + topic_config.subscribe_all_data_column_subnets = true; + assert!( + core_topics_to_subscribe::(ForkName::Fulu, &topic_config, &spec) + .contains(&GossipKind::DataColumnSidecar(0.into())) + ); + } + + #[test] + fn test_core_topics_to_subscribe() { + let spec = get_spec(); + let s = HashSet::from_iter([1, 2].map(DataColumnSubnetId::new)); + let mut topic_config = get_topic_config(&s); + topic_config.enable_light_client_server = true; let latest_fork = *ForkName::list_all().last().unwrap(); - let core_topics = core_topics_to_subscribe::(latest_fork, &spec, &topic_config); + let topics = core_topics_to_subscribe::(latest_fork, &topic_config, &spec); + + let mut expected_topics = vec![ + GossipKind::BeaconBlock, + GossipKind::BeaconAggregateAndProof, + GossipKind::VoluntaryExit, + GossipKind::ProposerSlashing, + GossipKind::AttesterSlashing, + GossipKind::SignedContributionAndProof, + GossipKind::LightClientFinalityUpdate, + GossipKind::LightClientOptimisticUpdate, + GossipKind::BlsToExecutionChange, + ]; + for subnet in s { + expected_topics.push(GossipKind::DataColumnSidecar(subnet)); + } // Need to check all the topics exist in an order independent manner - for topic in all_topics { - assert!(core_topics.contains(&topic)); + for expected_topic in expected_topics { + assert!( + topics.contains(&expected_topic), + "Should contain {:?}", + expected_topic + ); } } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 1b2a681c644..e1ef57c6ceb 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -181,8 +181,6 @@ pub struct NetworkService { next_fork_subscriptions: Pin>>, /// A delay that expires when we need to unsubscribe from old fork topics. next_unsubscribe: Pin>>, - /// Subscribe to all the subnets once synced. - subscribe_all_subnets: bool, /// Shutdown beacon node after sync is complete. shutdown_after_sync: bool, /// Whether metrics are enabled or not. @@ -191,8 +189,6 @@ pub struct NetworkService { metrics_update: tokio::time::Interval, /// gossipsub_parameter_update timer gossipsub_parameter_update: tokio::time::Interval, - /// enable_light_client_server indicator - enable_light_client_server: bool, /// The logger for the network service. fork_context: Arc, log: slog::Logger, @@ -347,14 +343,12 @@ impl NetworkService { next_fork_update, next_fork_subscriptions, next_unsubscribe, - subscribe_all_subnets: config.subscribe_all_subnets, shutdown_after_sync: config.shutdown_after_sync, metrics_enabled: config.metrics_enabled, metrics_update, gossipsub_parameter_update, fork_context, log: network_log, - enable_light_client_server: config.enable_light_client_server, }; Ok((network_service, network_globals, network_senders)) @@ -713,8 +707,8 @@ impl NetworkService { let mut subscribed_topics: Vec = vec![]; for topic_kind in core_topics_to_subscribe::( self.fork_context.current_fork(), - &self.fork_context.spec, &self.network_globals.as_topic_config(), + &self.fork_context.spec, ) { for fork_digest in self.required_gossip_fork_digests() { let topic = GossipTopic::new( @@ -730,57 +724,18 @@ impl NetworkService { } } - if self.enable_light_client_server { - for light_client_topic_kind in - lighthouse_network::types::LIGHT_CLIENT_GOSSIP_TOPICS.iter() - { - for fork_digest in self.required_gossip_fork_digests() { - let light_client_topic = GossipTopic::new( - light_client_topic_kind.clone(), - GossipEncoding::default(), - fork_digest, - ); - if self.libp2p.subscribe(light_client_topic.clone()) { - subscribed_topics.push(light_client_topic); - } else { - warn!(self.log, "Could not subscribe to topic"; "topic" => %light_client_topic); - } - } - } - } - // If we are to subscribe to all subnets we do it here - if self.subscribe_all_subnets { + if self.network_globals.config.subscribe_all_subnets { for subnet_id in 0..<::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() { let subnet = Subnet::Attestation(SubnetId::new(subnet_id)); // Update the ENR bitfield self.libp2p.update_enr_subnet(subnet, true); - for fork_digest in self.required_gossip_fork_digests() { - let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); - if self.libp2p.subscribe(topic.clone()) { - subscribed_topics.push(topic); - } else { - warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); - } - } } let subnet_max = <::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64(); for subnet_id in 0..subnet_max { let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id)); // Update the ENR bitfield self.libp2p.update_enr_subnet(subnet, true); - for fork_digest in self.required_gossip_fork_digests() { - let topic = GossipTopic::new( - subnet.into(), - GossipEncoding::default(), - fork_digest, - ); - if self.libp2p.subscribe(topic.clone()) { - subscribed_topics.push(topic); - } else { - warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); - } - } } } @@ -909,8 +864,8 @@ impl NetworkService { fn subscribed_core_topics(&self) -> bool { let core_topics = core_topics_to_subscribe::( self.fork_context.current_fork(), - &self.fork_context.spec, &self.network_globals.as_topic_config(), + &self.fork_context.spec, ); let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics); let subscriptions = self.network_globals.gossipsub_subscriptions.read(); diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 230805e86c6..1650001db61 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -712,6 +712,10 @@ impl ChainSpec { } } + pub fn all_data_column_sidecar_subnets(&self) -> impl Iterator { + (0..self.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new) + } + /// Returns a `ChainSpec` compatible with the Ethereum Foundation specification. pub fn mainnet() -> Self { Self {