From 55d1e754b4ba3952a23874e29cf8565a079d88e2 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 3 Feb 2025 03:07:39 -0300 Subject: [PATCH] Subscribe to PeerDAS topics on Fulu fork (#6849) `TODO(das)` now that PeerDAS is scheduled in a hard fork we can subscribe to its topics on the fork activation. In current stable we subscribe to PeerDAS topics as soon as the node starts if PeerDAS is scheduled. This PR adds another todo to unsubscribe to blob topics at the fork. This other PR included solution for that, but I can include it in a separate PR - https://github.com/sigp/lighthouse/pull/5899/files Include PeerDAS topics as part of Fulu fork in `fork_core_topics`. --- .../lighthouse_network/src/service/mod.rs | 8 +++- .../lighthouse_network/src/types/globals.rs | 9 ++++ .../lighthouse_network/src/types/mod.rs | 4 +- .../lighthouse_network/src/types/topics.rs | 47 ++++++++++++++++--- beacon_node/network/src/service.rs | 44 ++--------------- 5 files changed, 62 insertions(+), 50 deletions(-) diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 354def79b03..8586fd9cd36 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -708,11 +708,17 @@ impl Network { } // Subscribe to core topics for the new fork - for kind in fork_core_topics::(&new_fork, &self.fork_context.spec) { + for kind in fork_core_topics::( + &new_fork, + &self.fork_context.spec, + &self.network_globals.as_topic_config(), + ) { let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest); self.subscribe(topic); } + // TODO(das): unsubscribe from blob topics at the Fulu fork + // Register the new topics for metrics let topics_to_keep_metrics_for = attestation_sync_committee_topics::() .map(|gossip_kind| { diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index c9e84e2dd11..2800b75133b 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -1,4 +1,5 @@ //! A collection of variables that are accessible outside of the network thread itself. +use super::TopicConfig; use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV3}; use crate::types::{BackFillState, SyncState}; @@ -183,6 +184,14 @@ impl NetworkGlobals { .collect::>() } + /// Returns the TopicConfig to compute the set of Gossip topics for a given fork + pub fn as_topic_config(&self) -> TopicConfig { + TopicConfig { + subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets, + sampling_subnets: &self.sampling_subnets, + } + } + /// TESTING ONLY. Build a dummy NetworkGlobals instance. pub fn new_test_globals( trusted_peers: Vec, diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index a1eedaef746..58ba7588b98 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -17,6 +17,6 @@ pub use subnet::{Subnet, SubnetDiscovery}; pub use sync_state::{BackFillState, SyncState}; pub use topics::{ attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics, - subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, ALTAIR_CORE_TOPICS, - BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, + subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig, + ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, }; diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 2c79f934237..171dab09a35 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -1,5 +1,6 @@ use gossipsub::{IdentTopic as Topic, TopicHash}; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use strum::AsRefStr; use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned}; @@ -41,8 +42,18 @@ pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [ GossipKind::LightClientOptimisticUpdate, ]; +#[derive(Debug)] +pub struct TopicConfig<'a> { + pub subscribe_all_data_column_subnets: bool, + pub sampling_subnets: &'a HashSet, +} + /// Returns the core topics associated with each fork that are new to the previous fork -pub fn fork_core_topics(fork_name: &ForkName, spec: &ChainSpec) -> Vec { +pub fn fork_core_topics( + fork_name: &ForkName, + spec: &ChainSpec, + topic_config: &TopicConfig, +) -> Vec { match fork_name { ForkName::Base => BASE_CORE_TOPICS.to_vec(), ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(), @@ -64,7 +75,21 @@ pub fn fork_core_topics(fork_name: &ForkName, spec: &ChainSpec) -> V } electra_blob_topics } - ForkName::Fulu => vec![], + ForkName::Fulu => { + let mut topics = vec![]; + if topic_config.subscribe_all_data_column_subnets { + for column_subnet in 0..spec.data_column_sidecar_subnet_count { + topics.push(GossipKind::DataColumnSidecar(DataColumnSubnetId::new( + column_subnet, + ))); + } + } else { + for column_subnet in topic_config.sampling_subnets { + topics.push(GossipKind::DataColumnSidecar(*column_subnet)); + } + } + topics + } } } @@ -84,10 +109,11 @@ pub fn attestation_sync_committee_topics() -> impl Iterator( mut current_fork: ForkName, spec: &ChainSpec, + topic_config: &TopicConfig, ) -> Vec { - let mut topics = fork_core_topics::(¤t_fork, spec); + let mut topics = fork_core_topics::(¤t_fork, spec, topic_config); while let Some(previous_fork) = current_fork.previous_fork() { - let previous_fork_topics = fork_core_topics::(&previous_fork, spec); + let previous_fork_topics = fork_core_topics::(&previous_fork, spec, topic_config); topics.extend(previous_fork_topics); current_fork = previous_fork; } @@ -475,8 +501,15 @@ mod tests { type E = MainnetEthSpec; let spec = E::default_spec(); let mut all_topics = Vec::new(); - let mut electra_core_topics = fork_core_topics::(&ForkName::Electra, &spec); - let mut deneb_core_topics = fork_core_topics::(&ForkName::Deneb, &spec); + let topic_config = TopicConfig { + subscribe_all_data_column_subnets: false, + sampling_subnets: &HashSet::from_iter([1, 2].map(DataColumnSubnetId::new)), + }; + let mut fulu_core_topics = fork_core_topics::(&ForkName::Fulu, &spec, &topic_config); + let mut electra_core_topics = + fork_core_topics::(&ForkName::Electra, &spec, &topic_config); + let mut deneb_core_topics = fork_core_topics::(&ForkName::Deneb, &spec, &topic_config); + all_topics.append(&mut fulu_core_topics); all_topics.append(&mut electra_core_topics); all_topics.append(&mut deneb_core_topics); all_topics.extend(CAPELLA_CORE_TOPICS); @@ -484,7 +517,7 @@ mod tests { all_topics.extend(BASE_CORE_TOPICS); let latest_fork = *ForkName::list_all().last().unwrap(); - let core_topics = core_topics_to_subscribe::(latest_fork, &spec); + let core_topics = core_topics_to_subscribe::(latest_fork, &spec, &topic_config); // Need to check all the topics exist in an order independent manner for topic in all_topics { assert!(core_topics.contains(&topic)); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 49f73bf9c8d..1b2a681c644 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -33,8 +33,8 @@ use task_executor::ShutdownReason; use tokio::sync::mpsc; use tokio::time::Sleep; use types::{ - ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, - SyncSubnetId, Unsigned, ValidatorSubscription, + ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, + Unsigned, ValidatorSubscription, }; mod tests; @@ -181,8 +181,6 @@ pub struct NetworkService { next_fork_subscriptions: Pin>>, /// A delay that expires when we need to unsubscribe from old fork topics. next_unsubscribe: Pin>>, - /// Subscribe to all the data column subnets. - subscribe_all_data_column_subnets: bool, /// Subscribe to all the subnets once synced. subscribe_all_subnets: bool, /// Shutdown beacon node after sync is complete. @@ -349,7 +347,6 @@ impl NetworkService { next_fork_update, next_fork_subscriptions, next_unsubscribe, - subscribe_all_data_column_subnets: config.subscribe_all_data_column_subnets, subscribe_all_subnets: config.subscribe_all_subnets, shutdown_after_sync: config.shutdown_after_sync, metrics_enabled: config.metrics_enabled, @@ -717,6 +714,7 @@ impl NetworkService { for topic_kind in core_topics_to_subscribe::( self.fork_context.current_fork(), &self.fork_context.spec, + &self.network_globals.as_topic_config(), ) { for fork_digest in self.required_gossip_fork_digests() { let topic = GossipTopic::new( @@ -751,10 +749,6 @@ impl NetworkService { } } - if self.fork_context.spec.is_peer_das_scheduled() { - self.subscribe_to_peer_das_topics(&mut subscribed_topics); - } - // If we are to subscribe to all subnets we do it here if self.subscribe_all_subnets { for subnet_id in 0..<::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() { @@ -801,37 +795,6 @@ impl NetworkService { } } - /// Keeping these separate from core topics because it has custom logic: - /// 1. Data column subscription logic depends on subscription configuration. - /// 2. Data column topic subscriptions will be dynamic based on validator balances due to - /// validator custody. - /// - /// TODO(das): The downside with not including it in core fork topic is - we subscribe to - /// PeerDAS topics on startup if Fulu is scheduled, rather than waiting until the fork. - /// If this is an issue we could potentially consider adding the logic to - /// `network.subscribe_new_fork_topics()`. - fn subscribe_to_peer_das_topics(&mut self, subscribed_topics: &mut Vec) { - let column_subnets_to_subscribe = if self.subscribe_all_data_column_subnets { - &(0..self.fork_context.spec.data_column_sidecar_subnet_count) - .map(DataColumnSubnetId::new) - .collect() - } else { - &self.network_globals.sampling_subnets - }; - - for column_subnet in column_subnets_to_subscribe.iter() { - for fork_digest in self.required_gossip_fork_digests() { - let gossip_kind = Subnet::DataColumn(*column_subnet).into(); - let topic = GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest); - if self.libp2p.subscribe(topic.clone()) { - subscribed_topics.push(topic); - } else { - warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); - } - } - } - } - /// Handle a message sent to the network service. async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) { match msg { @@ -947,6 +910,7 @@ impl NetworkService { let core_topics = core_topics_to_subscribe::( self.fork_context.current_fork(), &self.fork_context.spec, + &self.network_globals.as_topic_config(), ); let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics); let subscriptions = self.network_globals.gossipsub_subscriptions.read();