Skip to content

Commit 770e1e5

Browse files
committed
Fix custdoy context initialisation race condition.
1 parent 93b8f46 commit 770e1e5

File tree

7 files changed

+64
-49
lines changed

7 files changed

+64
-49
lines changed

beacon_node/beacon_chain/src/builder.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use std::time::Duration;
4040
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
4141
use task_executor::{ShutdownReason, TaskExecutor};
4242
use tracing::{debug, error, info};
43+
use types::data_column_custody_group::{CustodyIndex, get_custody_groups_ordered};
4344
use types::{
4445
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec,
4546
FixedBytesExtended, Hash256, Signature, SignedBeaconBlock, Slot,
@@ -102,6 +103,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
102103
task_executor: Option<TaskExecutor>,
103104
validator_monitor_config: Option<ValidatorMonitorConfig>,
104105
node_custody_type: NodeCustodyType,
106+
node_id: Option<[u8; 32]>,
105107
rng: Option<Box<dyn RngCore + Send>>,
106108
}
107109

@@ -141,6 +143,7 @@ where
141143
task_executor: None,
142144
validator_monitor_config: None,
143145
node_custody_type: NodeCustodyType::Fullnode,
146+
node_id: None,
144147
rng: None,
145148
}
146149
}
@@ -647,6 +650,11 @@ where
647650
self
648651
}
649652

653+
pub fn node_id(mut self, node_id: [u8; 32]) -> Self {
654+
self.node_id = Some(node_id);
655+
self
656+
}
657+
650658
/// Sets the `BeaconChain` event handler backend.
651659
///
652660
/// For example, provide `ServerSentEventHandler` as a `handler`.
@@ -740,6 +748,7 @@ where
740748
.genesis_state_root
741749
.ok_or("Cannot build without a genesis state root")?;
742750
let validator_monitor_config = self.validator_monitor_config.unwrap_or_default();
751+
let node_id = self.node_id.ok_or("Cannot build without a node id")?;
743752
let rng = self.rng.ok_or("Cannot build without an RNG")?;
744753
let beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>> = <_>::default();
745754

@@ -929,6 +938,10 @@ where
929938
}
930939
};
931940

941+
let all_custody_groups_ordered =
942+
get_custody_groups_ordered(node_id, self.spec.number_of_custody_groups, &self.spec)
943+
.map_err(|e| format!("Failed to compute custody groups: {:?}", e))?;
944+
932945
// Load the persisted custody context from the db and initialize
933946
// the context for this run
934947
let (custody_context, cgc_changed_opt) = if let Some(custody) =
@@ -942,11 +955,16 @@ where
942955
custody,
943956
self.node_custody_type,
944957
head_epoch,
958+
all_custody_groups_ordered,
945959
&self.spec,
946960
)
947961
} else {
948962
(
949-
CustodyContext::new(self.node_custody_type, &self.spec),
963+
CustodyContext::new(
964+
self.node_custody_type,
965+
all_custody_groups_ordered,
966+
&self.spec,
967+
),
950968
None,
951969
)
952970
};

beacon_node/beacon_chain/src/custody_context.rs

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ pub struct CustodyContext<E: EthSpec> {
250250
validator_registrations: RwLock<ValidatorRegistrations>,
251251
/// Stores an immutable, ordered list of all custody columns as determined by the node's NodeID
252252
/// on startup.
253-
all_custody_columns_ordered: OnceLock<Box<[ColumnIndex]>>,
253+
all_custody_columns_ordered: Vec<ColumnIndex>,
254254
_phantom_data: PhantomData<E>,
255255
}
256256

@@ -259,15 +259,22 @@ impl<E: EthSpec> CustodyContext<E> {
259259
/// exists.
260260
///
261261
/// The `node_custody_type` value is based on current cli parameters.
262-
pub fn new(node_custody_type: NodeCustodyType, spec: &ChainSpec) -> Self {
262+
pub fn new(
263+
node_custody_type: NodeCustodyType,
264+
all_custody_groups_ordered: Vec<CustodyIndex>,
265+
spec: &ChainSpec,
266+
) -> Self {
263267
let cgc_override = node_custody_type.get_custody_count_override(spec);
264268
// If there's no override, we initialise `validator_custody_count` to 0. This has been the
265269
// existing behaviour and we maintain this for now to avoid a semantic schema change until
266270
// a later release.
271+
// FIXME: remove unwrap
272+
let all_custody_columns_ordered =
273+
Self::compute_ordered_data_columns(all_custody_groups_ordered, spec).unwrap();
267274
Self {
268275
validator_custody_count: AtomicU64::new(cgc_override.unwrap_or(0)),
269276
validator_registrations: RwLock::new(ValidatorRegistrations::new(cgc_override)),
270-
all_custody_columns_ordered: OnceLock::new(),
277+
all_custody_columns_ordered,
271278
_phantom_data: PhantomData,
272279
}
273280
}
@@ -290,6 +297,7 @@ impl<E: EthSpec> CustodyContext<E> {
290297
ssz_context: CustodyContextSsz,
291298
node_custody_type: NodeCustodyType,
292299
head_epoch: Epoch,
300+
all_custody_groups_ordered: Vec<CustodyIndex>,
293301
spec: &ChainSpec,
294302
) -> (Self, Option<CustodyCountChanged>) {
295303
let CustodyContextSsz {
@@ -347,6 +355,10 @@ impl<E: EthSpec> CustodyContext<E> {
347355
}
348356
}
349357

358+
// FIXME: remove unwrap
359+
let all_custody_columns_ordered =
360+
Self::compute_ordered_data_columns(all_custody_groups_ordered, spec).unwrap();
361+
350362
let custody_context = CustodyContext {
351363
validator_custody_count: AtomicU64::new(validator_custody_at_head),
352364
validator_registrations: RwLock::new(ValidatorRegistrations {
@@ -355,7 +367,7 @@ impl<E: EthSpec> CustodyContext<E> {
355367
.into_iter()
356368
.collect(),
357369
}),
358-
all_custody_columns_ordered: OnceLock::new(),
370+
all_custody_columns_ordered,
359371
_phantom_data: PhantomData,
360372
};
361373

@@ -370,22 +382,17 @@ impl<E: EthSpec> CustodyContext<E> {
370382
///
371383
/// # Returns
372384
/// Ok(()) if initialization succeeds, Err with description string if it fails
373-
pub fn init_ordered_data_columns_from_custody_groups(
374-
&self,
385+
fn compute_ordered_data_columns(
375386
all_custody_groups_ordered: Vec<CustodyIndex>,
376387
spec: &ChainSpec,
377-
) -> Result<(), String> {
388+
) -> Result<Vec<ColumnIndex>, String> {
378389
let mut ordered_custody_columns = vec![];
379390
for custody_index in all_custody_groups_ordered {
380391
let columns = compute_columns_for_custody_group::<E>(custody_index, spec)
381392
.map_err(|e| format!("Failed to compute columns for custody group {e:?}"))?;
382393
ordered_custody_columns.extend(columns);
383394
}
384-
self.all_custody_columns_ordered
385-
.set(ordered_custody_columns.into_boxed_slice())
386-
.map_err(|_| {
387-
"Failed to initialise CustodyContext with computed custody columns".to_string()
388-
})
395+
Ok(ordered_custody_columns)
389396
}
390397

391398
/// Register a new validator index and updates the list of validators if required.
@@ -497,11 +504,7 @@ impl<E: EthSpec> CustodyContext<E> {
497504
/// A slice of ordered column indices that should be sampled for this epoch based on the node's custody configuration
498505
pub fn sampling_columns_for_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> &[ColumnIndex] {
499506
let num_of_columns_to_sample = self.num_of_data_columns_to_sample(epoch, spec);
500-
let all_columns_ordered = self
501-
.all_custody_columns_ordered
502-
.get()
503-
.expect("all_custody_columns_ordered should be initialized");
504-
&all_columns_ordered[..num_of_columns_to_sample]
507+
&self.all_custody_columns_ordered[..num_of_columns_to_sample]
505508
}
506509

507510
/// Returns the ordered list of column indices that the node is assigned to custody
@@ -528,12 +531,7 @@ impl<E: EthSpec> CustodyContext<E> {
528531
self.custody_group_count_at_head(spec) as usize
529532
};
530533

531-
let all_columns_ordered = self
532-
.all_custody_columns_ordered
533-
.get()
534-
.expect("all_custody_columns_ordered should be initialized");
535-
536-
&all_columns_ordered[..custody_group_count]
534+
&self.all_custody_columns_ordered[..custody_group_count]
537535
}
538536

539537
/// The node has completed backfill for this epoch. Update the internal records so the function

beacon_node/client/src/builder.rs

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::Client;
22
use crate::compute_light_client_updates::{
33
LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY, compute_light_client_updates,
44
};
5-
use crate::config::{ClientGenesis, Config as ClientConfig};
5+
use crate::config::{ClientGenesis, Config as ClientConfig, Config};
66
use crate::notifier::spawn_notifier;
77
use beacon_chain::attestation_simulator::start_attestation_simulator_service;
88
use beacon_chain::data_availability_checker::start_availability_cache_maintenance_service;
@@ -28,6 +28,8 @@ use execution_layer::ExecutionLayer;
2828
use execution_layer::test_utils::generate_genesis_header;
2929
use futures::channel::mpsc::Receiver;
3030
use genesis::{DEFAULT_ETH1_BLOCK_HASH, interop_genesis_state};
31+
use lighthouse_network::discv5::enr::NodeId;
32+
use lighthouse_network::identity::Keypair;
3133
use lighthouse_network::{NetworkGlobals, prometheus_client::registry::Registry};
3234
use monitoring_api::{MonitoringHttpClient, ProcessType};
3335
use network::{NetworkConfig, NetworkSenders, NetworkService};
@@ -42,7 +44,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
4244
use store::database::interface::BeaconNodeBackend;
4345
use timer::spawn_timer;
4446
use tracing::{debug, info, warn};
45-
use types::data_column_custody_group::get_custody_groups_ordered;
47+
use types::data_column_custody_group::{CustodyIndex, get_custody_groups_ordered};
4648
use types::{
4749
BeaconState, BlobSidecarList, ChainSpec, EthSpec, ExecutionBlockHash, Hash256,
4850
SignedBeaconBlock, test_utils::generate_deterministic_keypairs,
@@ -154,6 +156,7 @@ where
154156
mut self,
155157
client_genesis: ClientGenesis,
156158
config: ClientConfig,
159+
local_keypair: Keypair,
157160
) -> Result<Self, String> {
158161
let store = self.store.clone();
159162
let chain_spec = self.chain_spec.clone();
@@ -203,6 +206,7 @@ where
203206
.event_handler(event_handler)
204207
.execution_layer(execution_layer)
205208
.node_custody_type(config.chain.node_custody_type)
209+
.node_id(NodeId::from(local_keypair.public()).raw())
206210
.validator_monitor_config(config.validator_monitor.clone())
207211
.rng(Box::new(
208212
StdRng::try_from_rng(&mut OsRng)
@@ -453,7 +457,11 @@ where
453457
}
454458

455459
/// Starts the networking stack.
456-
pub async fn network(mut self, config: Arc<NetworkConfig>) -> Result<Self, String> {
460+
pub async fn network(
461+
mut self,
462+
config: Arc<NetworkConfig>,
463+
local_keypair: Keypair,
464+
) -> Result<Self, String> {
457465
let beacon_chain = self
458466
.beacon_chain
459467
.clone()
@@ -481,12 +489,11 @@ where
481489
context.executor,
482490
libp2p_registry.as_mut(),
483491
beacon_processor_channels.beacon_processor_tx.clone(),
492+
local_keypair,
484493
)
485494
.await
486495
.map_err(|e| format!("Failed to start network: {:?}", e))?;
487496

488-
init_custody_context(beacon_chain, &network_globals)?;
489-
490497
self.network_globals = Some(network_globals);
491498
self.network_senders = Some(network_senders);
492499
self.libp2p_registry = libp2p_registry;
@@ -788,21 +795,6 @@ where
788795
}
789796
}
790797

791-
fn init_custody_context<T: BeaconChainTypes>(
792-
chain: Arc<BeaconChain<T>>,
793-
network_globals: &NetworkGlobals<T::EthSpec>,
794-
) -> Result<(), String> {
795-
let node_id = network_globals.local_enr().node_id().raw();
796-
let spec = &chain.spec;
797-
let custody_groups_ordered =
798-
get_custody_groups_ordered(node_id, spec.number_of_custody_groups, spec)
799-
.map_err(|e| format!("Failed to compute custody groups: {:?}", e))?;
800-
chain
801-
.data_availability_checker
802-
.custody_context()
803-
.init_ordered_data_columns_from_custody_groups(custody_groups_ordered, spec)
804-
}
805-
806798
impl<TSlotClock, E, THotStore, TColdStore>
807799
ClientBuilder<Witness<TSlotClock, E, THotStore, TColdStore>>
808800
where

beacon_node/lighthouse_network/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ pub use discovery::Eth2Enr;
109109
pub use discv5;
110110
pub use gossipsub::{IdentTopic, MessageAcceptance, MessageId, Topic, TopicHash};
111111
pub use libp2p;
112-
pub use libp2p::{Multiaddr, multiaddr};
112+
pub use libp2p::{Multiaddr, identity, multiaddr};
113113
pub use libp2p::{PeerId, Swarm, core::ConnectedPoint};
114114
pub use peer_manager::{
115115
ConnectionDirection, PeerConnectionStatus, PeerInfo, PeerManager, SyncInfo, SyncStatus,

beacon_node/lighthouse_network/src/service/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use gossipsub::{
2626
TopicScoreParams,
2727
};
2828
use gossipsub_scoring_parameters::{PeerScoreSettings, lighthouse_gossip_thresholds};
29+
use libp2p::identity::Keypair;
2930
use libp2p::multiaddr::{self, Multiaddr, Protocol as MProtocol};
3031
use libp2p::swarm::behaviour::toggle::Toggle;
3132
use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent};
@@ -171,11 +172,10 @@ impl<E: EthSpec> Network<E> {
171172
executor: task_executor::TaskExecutor,
172173
mut ctx: ServiceContext<'_>,
173174
custody_group_count: u64,
175+
local_keypair: Keypair,
174176
) -> Result<(Self, Arc<NetworkGlobals<E>>), String> {
175177
let config = ctx.config.clone();
176178
trace!("Libp2p Service starting");
177-
// initialise the node's ID
178-
let local_keypair = utils::load_private_key(&config);
179179

180180
// Trusted peers will also be marked as explicit in GossipSub.
181181
// Cfr. https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#explicit-peering-agreements

beacon_node/network/src/service.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use futures::future::OptionFuture;
1212
use futures::prelude::*;
1313

1414
use lighthouse_network::Enr;
15+
use lighthouse_network::identity::Keypair;
1516
use lighthouse_network::rpc::InboundRequestId;
1617
use lighthouse_network::rpc::RequestType;
1718
use lighthouse_network::rpc::methods::RpcResponse;
@@ -212,6 +213,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
212213
executor: task_executor::TaskExecutor,
213214
libp2p_registry: Option<&'_ mut Registry>,
214215
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
216+
local_keypair: Keypair,
215217
) -> Result<
216218
(
217219
NetworkService<T>,
@@ -284,6 +286,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
284286
.data_availability_checker
285287
.custody_context()
286288
.custody_group_count_at_head(&beacon_chain.spec),
289+
local_keypair,
287290
)
288291
.await?;
289292

@@ -366,13 +369,15 @@ impl<T: BeaconChainTypes> NetworkService<T> {
366369
executor: task_executor::TaskExecutor,
367370
libp2p_registry: Option<&'_ mut Registry>,
368371
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
372+
local_keypair: Keypair,
369373
) -> Result<(Arc<NetworkGlobals<T::EthSpec>>, NetworkSenders<T::EthSpec>), String> {
370374
let (network_service, network_globals, network_senders) = Self::build(
371375
beacon_chain,
372376
config,
373377
executor.clone(),
374378
libp2p_registry,
375379
beacon_processor_send,
380+
local_keypair,
376381
)
377382
.await?;
378383

beacon_node/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub use client::{Client, ClientBuilder, ClientConfig, ClientGenesis};
99
pub use config::{get_config, get_data_dir, set_network_config};
1010
use environment::RuntimeContext;
1111
pub use eth2_config::Eth2Config;
12+
use lighthouse_network::load_private_key;
1213
use slasher::{DatabaseBackendOverride, Slasher};
1314
use std::ops::{Deref, DerefMut};
1415
use std::sync::Arc;
@@ -120,8 +121,9 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
120121
builder
121122
};
122123

124+
let local_keypair = load_private_key(&client_config.network);
123125
let builder = builder
124-
.beacon_chain_builder(client_genesis, client_config.clone())
126+
.beacon_chain_builder(client_genesis, client_config.clone(), local_keypair)
125127
.await?;
126128
info!("Block production enabled");
127129

@@ -133,7 +135,7 @@ impl<E: EthSpec> ProductionBeaconNode<E> {
133135

134136
builder
135137
.build_beacon_chain()?
136-
.network(Arc::new(client_config.network))
138+
.network(Arc::new(client_config.network), local_keypair)
137139
.await?
138140
.notifier()?
139141
.http_metrics_config(client_config.http_metrics.clone())

0 commit comments

Comments
 (0)