Skip to content

Commit

Permalink
Subscribe to PeerDAS topics on Fulu fork (#6849)
Browse files Browse the repository at this point in the history
`TODO(das)` now that PeerDAS is scheduled in a hard fork we can subscribe to its topics on the fork activation. In current stable we subscribe to PeerDAS topics as soon as the node starts if PeerDAS is scheduled.

This PR adds another todo to unsubscribe to blob topics at the fork. This other PR included solution for that, but I can include it in a separate PR
- https://github.com/sigp/lighthouse/pull/5899/files


  Include PeerDAS topics as part of Fulu fork in `fork_core_topics`.
  • Loading branch information
dapplion authored Feb 3, 2025
1 parent a088b0b commit 55d1e75
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 50 deletions.
8 changes: 7 additions & 1 deletion beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,11 +708,17 @@ impl<E: EthSpec> Network<E> {
}

// Subscribe to core topics for the new fork
for kind in fork_core_topics::<E>(&new_fork, &self.fork_context.spec) {
for kind in fork_core_topics::<E>(
&new_fork,
&self.fork_context.spec,
&self.network_globals.as_topic_config(),
) {
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
self.subscribe(topic);
}

// TODO(das): unsubscribe from blob topics at the Fulu fork

// Register the new topics for metrics
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
.map(|gossip_kind| {
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! A collection of variables that are accessible outside of the network thread itself.
use super::TopicConfig;
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV3};
use crate::types::{BackFillState, SyncState};
Expand Down Expand Up @@ -183,6 +184,14 @@ impl<E: EthSpec> NetworkGlobals<E> {
.collect::<Vec<_>>()
}

/// Returns the TopicConfig to compute the set of Gossip topics for a given fork
pub fn as_topic_config(&self) -> TopicConfig {
TopicConfig {
subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets,
sampling_subnets: &self.sampling_subnets,
}
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(
trusted_peers: Vec<PeerId>,
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ pub use subnet::{Subnet, SubnetDiscovery};
pub use sync_state::{BackFillState, SyncState};
pub use topics::{
attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics,
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, ALTAIR_CORE_TOPICS,
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig,
ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
};
47 changes: 40 additions & 7 deletions beacon_node/lighthouse_network/src/types/topics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use gossipsub::{IdentTopic as Topic, TopicHash};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use strum::AsRefStr;
use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned};

Expand Down Expand Up @@ -41,8 +42,18 @@ pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [
GossipKind::LightClientOptimisticUpdate,
];

#[derive(Debug)]
pub struct TopicConfig<'a> {
pub subscribe_all_data_column_subnets: bool,
pub sampling_subnets: &'a HashSet<DataColumnSubnetId>,
}

/// Returns the core topics associated with each fork that are new to the previous fork
pub fn fork_core_topics<E: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> Vec<GossipKind> {
pub fn fork_core_topics<E: EthSpec>(
fork_name: &ForkName,
spec: &ChainSpec,
topic_config: &TopicConfig,
) -> Vec<GossipKind> {
match fork_name {
ForkName::Base => BASE_CORE_TOPICS.to_vec(),
ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(),
Expand All @@ -64,7 +75,21 @@ pub fn fork_core_topics<E: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> V
}
electra_blob_topics
}
ForkName::Fulu => vec![],
ForkName::Fulu => {
let mut topics = vec![];
if topic_config.subscribe_all_data_column_subnets {
for column_subnet in 0..spec.data_column_sidecar_subnet_count {
topics.push(GossipKind::DataColumnSidecar(DataColumnSubnetId::new(
column_subnet,
)));
}
} else {
for column_subnet in topic_config.sampling_subnets {
topics.push(GossipKind::DataColumnSidecar(*column_subnet));
}
}
topics
}
}
}

Expand All @@ -84,10 +109,11 @@ pub fn attestation_sync_committee_topics<E: EthSpec>() -> impl Iterator<Item = G
pub fn core_topics_to_subscribe<E: EthSpec>(
mut current_fork: ForkName,
spec: &ChainSpec,
topic_config: &TopicConfig,
) -> Vec<GossipKind> {
let mut topics = fork_core_topics::<E>(&current_fork, spec);
let mut topics = fork_core_topics::<E>(&current_fork, spec, topic_config);
while let Some(previous_fork) = current_fork.previous_fork() {
let previous_fork_topics = fork_core_topics::<E>(&previous_fork, spec);
let previous_fork_topics = fork_core_topics::<E>(&previous_fork, spec, topic_config);
topics.extend(previous_fork_topics);
current_fork = previous_fork;
}
Expand Down Expand Up @@ -475,16 +501,23 @@ mod tests {
type E = MainnetEthSpec;
let spec = E::default_spec();
let mut all_topics = Vec::new();
let mut electra_core_topics = fork_core_topics::<E>(&ForkName::Electra, &spec);
let mut deneb_core_topics = fork_core_topics::<E>(&ForkName::Deneb, &spec);
let topic_config = TopicConfig {
subscribe_all_data_column_subnets: false,
sampling_subnets: &HashSet::from_iter([1, 2].map(DataColumnSubnetId::new)),
};
let mut fulu_core_topics = fork_core_topics::<E>(&ForkName::Fulu, &spec, &topic_config);
let mut electra_core_topics =
fork_core_topics::<E>(&ForkName::Electra, &spec, &topic_config);
let mut deneb_core_topics = fork_core_topics::<E>(&ForkName::Deneb, &spec, &topic_config);
all_topics.append(&mut fulu_core_topics);
all_topics.append(&mut electra_core_topics);
all_topics.append(&mut deneb_core_topics);
all_topics.extend(CAPELLA_CORE_TOPICS);
all_topics.extend(ALTAIR_CORE_TOPICS);
all_topics.extend(BASE_CORE_TOPICS);

let latest_fork = *ForkName::list_all().last().unwrap();
let core_topics = core_topics_to_subscribe::<E>(latest_fork, &spec);
let core_topics = core_topics_to_subscribe::<E>(latest_fork, &spec, &topic_config);
// Need to check all the topics exist in an order independent manner
for topic in all_topics {
assert!(core_topics.contains(&topic));
Expand Down
44 changes: 4 additions & 40 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use task_executor::ShutdownReason;
use tokio::sync::mpsc;
use tokio::time::Sleep;
use types::{
ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription,
SyncSubnetId, Unsigned, ValidatorSubscription,
ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
Unsigned, ValidatorSubscription,
};

mod tests;
Expand Down Expand Up @@ -181,8 +181,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old fork topics.
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
/// Subscribe to all the data column subnets.
subscribe_all_data_column_subnets: bool,
/// Subscribe to all the subnets once synced.
subscribe_all_subnets: bool,
/// Shutdown beacon node after sync is complete.
Expand Down Expand Up @@ -349,7 +347,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
next_fork_update,
next_fork_subscriptions,
next_unsubscribe,
subscribe_all_data_column_subnets: config.subscribe_all_data_column_subnets,
subscribe_all_subnets: config.subscribe_all_subnets,
shutdown_after_sync: config.shutdown_after_sync,
metrics_enabled: config.metrics_enabled,
Expand Down Expand Up @@ -717,6 +714,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.fork_context.spec,
&self.network_globals.as_topic_config(),
) {
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(
Expand Down Expand Up @@ -751,10 +749,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}

if self.fork_context.spec.is_peer_das_scheduled() {
self.subscribe_to_peer_das_topics(&mut subscribed_topics);
}

// If we are to subscribe to all subnets we do it here
if self.subscribe_all_subnets {
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
Expand Down Expand Up @@ -801,37 +795,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}

/// Keeping these separate from core topics because it has custom logic:
/// 1. Data column subscription logic depends on subscription configuration.
/// 2. Data column topic subscriptions will be dynamic based on validator balances due to
/// validator custody.
///
/// TODO(das): The downside with not including it in core fork topic is - we subscribe to
/// PeerDAS topics on startup if Fulu is scheduled, rather than waiting until the fork.
/// If this is an issue we could potentially consider adding the logic to
/// `network.subscribe_new_fork_topics()`.
fn subscribe_to_peer_das_topics(&mut self, subscribed_topics: &mut Vec<GossipTopic>) {
let column_subnets_to_subscribe = if self.subscribe_all_data_column_subnets {
&(0..self.fork_context.spec.data_column_sidecar_subnet_count)
.map(DataColumnSubnetId::new)
.collect()
} else {
&self.network_globals.sampling_subnets
};

for column_subnet in column_subnets_to_subscribe.iter() {
for fork_digest in self.required_gossip_fork_digests() {
let gossip_kind = Subnet::DataColumn(*column_subnet).into();
let topic = GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
}

/// Handle a message sent to the network service.
async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) {
match msg {
Expand Down Expand Up @@ -947,6 +910,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.fork_context.spec,
&self.network_globals.as_topic_config(),
);
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
let subscriptions = self.network_globals.gossipsub_subscriptions.read();
Expand Down

0 comments on commit 55d1e75

Please sign in to comment.