-
Notifications
You must be signed in to change notification settings - Fork 25
feat: operator doppelgänger protection with slot-based detection #692
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 20 commits
dfaf1b4
5e2b95e
fd01236
4237308
be56784
f776d11
eb99a99
3117374
67b7126
4834057
8f98ea0
91836e3
d93ef67
64146a6
e268715
982db2e
892fcf6
8006a91
741c8ad
58170c6
9276f25
632f5d4
db1d9b2
ffc12a2
90b58b1
e0bd8b2
2d1b48c
fc962b0
00c7546
c06e7ce
571c3f8
5a8ba76
433feba
b4212b1
ba86842
de1a339
26ff439
2aa1ffb
56ef9a8
7122d72
885ab36
072924a
716b85c
cd826ae
c56dd46
550aea5
ff3c4a9
0fda8d4
a1a59b3
3ea5c20
ba750d4
42a1756
03f56a0
9ca3edb
fccefd0
d1499db
0d9948a
b2172a6
9b53677
1269e0c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -484,6 +484,43 @@ pub struct Node { | |
| #[clap(long, help = "Disables gossipsub topic scoring.", hide = true)] | ||
| pub disable_gossipsub_topic_scoring: bool, | ||
|
|
||
| // Operator Doppelgänger Protection | ||
| #[clap( | ||
| long, | ||
| help = "Enable operator doppelgänger protection. When enabled, the node will monitor \ | ||
| for messages signed by its operator ID on startup and shut down if a twin \ | ||
| (duplicate operator) is detected. Enabled by default.", | ||
| display_order = 0, | ||
| default_value_t = true, | ||
| help_heading = FLAG_HEADER, | ||
| action = ArgAction::Set | ||
| )] | ||
| pub operator_dg: bool, | ||
|
|
||
| #[clap( | ||
| long, | ||
| value_name = "EPOCHS", | ||
| help = "Number of epochs to wait in monitor mode before starting normal operation. \ | ||
| During this period, the node listens for messages from its own operator ID \ | ||
| to detect if another instance is running.", | ||
| display_order = 0, | ||
| default_value_t = 2, | ||
| requires = "operator_dg" | ||
dknopik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| )] | ||
| pub operator_dg_wait_epochs: u64, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we are not actually waiting anywhere
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's a bit weird that when we use this, we need to set |
||
|
|
||
| #[clap( | ||
| long, | ||
| value_name = "HEIGHTS", | ||
| help = "The freshness threshold for detecting operator twins. Only messages within \ | ||
| this many consensus heights from the maximum observed height are considered \ | ||
| fresh evidence of a twin. This prevents false positives from replayed old messages.", | ||
| display_order = 0, | ||
| default_value_t = 3, | ||
| requires = "operator_dg" | ||
dknopik marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| )] | ||
| pub operator_dg_fresh_k: u64, | ||
diegomrsantos marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| #[clap(flatten)] | ||
| pub logging_flags: FileLoggingFlags, | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,13 +3,14 @@ pub mod config; | |
| mod key; | ||
| mod metrics; | ||
| mod notifier; | ||
| mod operator_doppelganger; | ||
|
|
||
| use std::{ | ||
| fs::File, | ||
| io::Read, | ||
| net::SocketAddr, | ||
| path::Path, | ||
| sync::Arc, | ||
| sync::{Arc, Mutex}, | ||
| time::{Duration, SystemTime, UNIX_EPOCH}, | ||
| }; | ||
|
|
||
|
|
@@ -31,7 +32,7 @@ use eth2::{ | |
| BeaconNodeHttpClient, Timeouts, | ||
| reqwest::{Certificate, ClientBuilder}, | ||
| }; | ||
| use message_receiver::NetworkMessageReceiver; | ||
| use message_receiver::{DoppelgangerChecker, DoppelgangerConfig, NetworkMessageReceiver}; | ||
| use message_sender::{MessageSender, NetworkMessageSender, impostor::ImpostorMessageSender}; | ||
| use message_validator::Validator; | ||
| use network::Network; | ||
|
|
@@ -47,7 +48,7 @@ use task_executor::TaskExecutor; | |
| use tokio::{ | ||
| net::TcpListener, | ||
| select, | ||
| sync::{mpsc, mpsc::unbounded_channel}, | ||
| sync::{mpsc, mpsc::unbounded_channel, oneshot}, | ||
| time::{Instant, interval, sleep}, | ||
| }; | ||
| use tracing::{debug, error, info, warn}; | ||
|
|
@@ -63,7 +64,10 @@ use validator_services::{ | |
| sync_committee_service::SyncCommitteeService, | ||
| }; | ||
|
|
||
| use crate::{key::read_or_generate_private_key, notifier::spawn_notifier}; | ||
| use crate::{ | ||
| key::read_or_generate_private_key, notifier::spawn_notifier, | ||
| operator_doppelganger::OperatorDoppelgangerService, | ||
| }; | ||
|
|
||
| /// Specific timeout constants for HTTP requests involved in different validator duties. | ||
| /// This can help ensure that proper endpoint fallback occurs. | ||
|
|
@@ -83,6 +87,90 @@ const HTTP_DEFAULT_TIMEOUT_QUOTIENT: u32 = 4; | |
|
|
||
| pub struct Client {} | ||
|
|
||
| /// Initialize operator doppelgänger protection if enabled | ||
| /// | ||
| /// Returns `(DoppelgangerConfig, Option<watch::Receiver<bool>>)` where: | ||
| /// - `DoppelgangerConfig` contains the checker callback and shutdown channel for message receiver | ||
| /// - `watch::Receiver<bool>` broadcasts monitoring status (true = monitoring, false = active) | ||
| fn initialize_operator_doppelganger<E: EthSpec>( | ||
| operator_dg: bool, | ||
| operator_dg_wait_epochs: u64, | ||
| operator_dg_fresh_k: u64, | ||
| operator_id: &OwnOperatorId, | ||
| slot_clock: &SystemTimeSlotClock, | ||
| slot_duration: Duration, | ||
| executor: &TaskExecutor, | ||
| ) -> Result< | ||
| ( | ||
| Option<DoppelgangerConfig>, | ||
| Option<tokio::sync::watch::Receiver<bool>>, | ||
| ), | ||
| String, | ||
| > { | ||
| if !operator_dg { | ||
| return Ok((None, None)); | ||
| } | ||
|
|
||
| let own_operator_id = operator_id | ||
| .get() | ||
| .ok_or_else(|| "Operator ID not yet available".to_string())?; | ||
|
||
|
|
||
| let current_epoch = slot_clock | ||
| .now() | ||
| .ok_or_else(|| "Unable to read current slot".to_string())? | ||
| .epoch(E::slots_per_epoch()); | ||
|
|
||
| let (service, is_monitoring_rx) = OperatorDoppelgangerService::<E, _>::new( | ||
| own_operator_id, | ||
| slot_clock.clone(), | ||
| current_epoch, | ||
| operator_dg_wait_epochs, | ||
| operator_dg_fresh_k, | ||
| slot_duration, | ||
| ); | ||
| let doppelganger_service = Arc::new(service); | ||
|
|
||
| // Spawn background task to watch for monitoring period end | ||
| doppelganger_service.clone().spawn_monitor_task(executor); | ||
|
|
||
| // Create shutdown channel | ||
| let (shutdown_tx, shutdown_rx) = oneshot::channel(); | ||
|
|
||
| // Create checker callback | ||
| let service = doppelganger_service.clone(); | ||
| let checker: DoppelgangerChecker = | ||
| Box::new(move |signed_msg, qbft_msg| service.check_message(signed_msg, qbft_msg)); | ||
|
|
||
| // Spawn task to listen for shutdown signal | ||
| let executor_clone = executor.clone(); | ||
| executor.spawn_without_exit( | ||
| async move { | ||
| if shutdown_rx.await.is_ok() { | ||
| error!( | ||
| "Operator doppelgänger detected! Initiating fatal shutdown to prevent equivocation." | ||
| ); | ||
| // Give time for the error log to be flushed | ||
| tokio::time::sleep(Duration::from_millis(100)).await; | ||
| // Trigger executor shutdown with failure reason | ||
| let _ = executor_clone | ||
| .shutdown_sender() | ||
| .try_send(task_executor::ShutdownReason::Failure( | ||
| "Operator doppelgänger detected", | ||
| )); | ||
| } | ||
| }, | ||
| "doppelganger-shutdown", | ||
| ); | ||
diegomrsantos marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| Ok(( | ||
| Some(DoppelgangerConfig { | ||
| checker: Arc::new(checker), | ||
| shutdown_tx: Arc::new(Mutex::new(Some(shutdown_tx))), | ||
| }), | ||
| Some(is_monitoring_rx), | ||
| )) | ||
| } | ||
|
|
||
| impl Client { | ||
| /// Runs the Anchor Client | ||
| pub async fn run<E: EthSpec>(executor: TaskExecutor, config: Config) -> Result<(), String> { | ||
|
|
@@ -466,6 +554,17 @@ impl Client { | |
|
|
||
| let (outcome_tx, outcome_rx) = mpsc::channel::<message_receiver::Outcome>(9000); | ||
|
|
||
| // Initialize operator doppelgänger protection if enabled | ||
| let (doppelganger_config, is_monitoring) = initialize_operator_doppelganger::<E>( | ||
| config.operator_dg && config.impostor.is_none(), | ||
| config.operator_dg_wait_epochs, | ||
| config.operator_dg_fresh_k, | ||
| &operator_id, | ||
| &slot_clock, | ||
| Duration::from_secs(spec.seconds_per_slot), | ||
| &executor, | ||
| )?; | ||
diegomrsantos marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| let message_receiver = NetworkMessageReceiver::new( | ||
| processor_senders.clone(), | ||
| qbft_manager.clone(), | ||
|
|
@@ -474,6 +573,7 @@ impl Client { | |
| is_synced.clone(), | ||
| outcome_tx, | ||
| message_validator, | ||
| doppelganger_config, | ||
| ); | ||
|
|
||
| // Start the p2p network | ||
|
|
@@ -573,6 +673,20 @@ impl Client { | |
| .map_err(|_| "Sync watch channel closed")?; | ||
| info!("Sync complete, starting services..."); | ||
|
|
||
| // Wait for operator doppelgänger monitoring to complete | ||
| if let Some(is_monitoring) = &is_monitoring { | ||
| info!( | ||
| wait_epochs = config.operator_dg_wait_epochs, | ||
| "Waiting for operator doppelgänger monitoring to complete before starting services..." | ||
| ); | ||
| is_monitoring | ||
| .clone() | ||
| .wait_for(|&is_monitoring| !is_monitoring) // Wait until NOT monitoring | ||
| .await | ||
| .map_err(|_| "Monitoring watch channel closed")?; | ||
| info!("Operator doppelgänger monitoring complete, starting services..."); | ||
| } | ||
|
|
||
| let mut block_service_builder = BlockServiceBuilder::new() | ||
| .slot_clock(slot_clock.clone()) | ||
| .validator_store(validator_store.clone()) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| mod service; | ||
| mod state; | ||
|
|
||
| pub use service::OperatorDoppelgangerService; |
Uh oh!
There was an error while loading. Please reload this page.