Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
dfaf1b4
feat: add CLI config for operator doppelgänger protection
diegomrsantos Oct 15, 2025
5e2b95e
feat: implement operator doppelgänger detection service
diegomrsantos Oct 15, 2025
fd01236
feat: integrate operator doppelgänger detection with message receiver
diegomrsantos Oct 15, 2025
4237308
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 15, 2025
be56784
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 16, 2025
f776d11
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 17, 2025
eb99a99
fix: pass current epoch explicitly instead of using unwrap_or_else
diegomrsantos Oct 17, 2025
3117374
fix: handle slot clock read failure in doppelgänger check
diegomrsantos Oct 17, 2025
67b7126
refactor: simplify doppelgänger state management with Mutex
diegomrsantos Oct 17, 2025
4834057
chore: change stale message log from warn to debug
diegomrsantos Oct 17, 2025
8f98ea0
refactor: remove redundant enabled field from doppelgänger service
diegomrsantos Oct 20, 2025
91836e3
test: add comprehensive tests for doppelgänger service
diegomrsantos Oct 20, 2025
d93ef67
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 20, 2025
64146a6
refactor: extract operator doppelgänger initialization and add monito…
diegomrsantos Oct 20, 2025
e268715
refactor: apply best practices to operator doppelgänger feature
diegomrsantos Oct 20, 2025
982db2e
refactor: parameterize slot duration in operator doppelganger service
diegomrsantos Oct 20, 2025
892fcf6
refactor: remove update_and_check_freshness in favor of separate oper…
diegomrsantos Oct 20, 2025
8006a91
test: remove redundant operator doppelganger tests
diegomrsantos Oct 21, 2025
741c8ad
test: remove redundant initial state test
diegomrsantos Oct 21, 2025
58170c6
lint
diegomrsantos Oct 21, 2025
9276f25
refactor: extract operator_doppelganger to separate crate
diegomrsantos Oct 21, 2025
632f5d4
refactor: remove blocking wait for operator doppelganger monitoring
diegomrsantos Oct 21, 2025
db1d9b2
refactor: simplify operator doppelganger by removing intermediary cha…
diegomrsantos Oct 21, 2025
ffc12a2
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 22, 2025
90b58b1
docs: add architectural principles to CLAUDE.md
diegomrsantos Oct 22, 2025
e0bd8b2
refactor: replace height-based with grace period approach for operato…
diegomrsantos Oct 22, 2025
2d1b48c
style: apply cargo fmt
diegomrsantos Oct 22, 2025
fc962b0
feat: expand doppelgänger detection to all operator-signed messages
diegomrsantos Oct 22, 2025
00c7546
chore: simplify redundant grace period comments
diegomrsantos Oct 22, 2025
c06e7ce
refactor: simplify operator doppelgänger state management
diegomrsantos Oct 22, 2025
571c3f8
refactor: replace epoch-based monitoring with single sleep timer
diegomrsantos Oct 22, 2025
5a8ba76
refactor: remove unnecessary create_operator_doppelganger wrapper
diegomrsantos Oct 22, 2025
433feba
refactor: remove unnecessary generics from OperatorDoppelgangerService
diegomrsantos Oct 22, 2025
b4212b1
test: add async timer tests for operator doppelgänger monitoring
diegomrsantos Oct 22, 2025
ba86842
refactor: convert all detection logic tests to use async timers
diegomrsantos Oct 22, 2025
de1a339
refactor: simplify async timer tests by using single yield
diegomrsantos Oct 22, 2025
26ff439
refactor: replace boolean flags with explicit state enum
diegomrsantos Oct 22, 2025
2aa1ffb
refactor: move DoppelgangerState to private implementation
diegomrsantos Oct 22, 2025
56ef9a8
chore: remove unused dependencies from operator_doppelganger
diegomrsantos Oct 22, 2025
7122d72
perf: use RwLock for read-optimized state access
diegomrsantos Oct 22, 2025
885ab36
feat: block all outgoing messages during doppelgänger monitoring
diegomrsantos Oct 22, 2025
072924a
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 23, 2025
716b85c
docs: update CLAUDE.md with session learnings
diegomrsantos Oct 24, 2025
cd826ae
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 24, 2025
c56dd46
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 24, 2025
550aea5
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Oct 25, 2025
ff3c4a9
refactor: align operator doppelgänger grace period with message TTL w…
diegomrsantos Oct 25, 2025
0fda8d4
fix: block outgoing messages during entire doppelgänger protection wi…
diegomrsantos Oct 27, 2025
a1a59b3
docs: update operator doppelgänger CLI help text
diegomrsantos Oct 27, 2025
3ea5c20
refactor: replace grace period with slot-based operator doppelgänger …
diegomrsantos Oct 29, 2025
ba750d4
refactor: pass ValidatedSSVMessage to eliminate redundant SSZ decoding
diegomrsantos Oct 30, 2025
42a1756
refactor: remove unnecessary start_operator_doppelganger wrapper
diegomrsantos Oct 30, 2025
03f56a0
refactor: simplify doppelgänger protection with blocking monitoring
diegomrsantos Oct 30, 2025
9ca3edb
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Nov 3, 2025
fccefd0
remove unused dep
diegomrsantos Nov 3, 2025
d1499db
refactor: notifier with layered state architecture
diegomrsantos Nov 4, 2025
0d9948a
Merge branch 'unstable' into feat/operator-doppelganger-protection
diegomrsantos Nov 5, 2025
b2172a6
refactor: store operator_id in OperatorState enum to eliminate unsafe…
diegomrsantos Nov 5, 2025
9b53677
refactor: simplify startup_slot initialization
diegomrsantos Nov 5, 2025
1269e0c
feat: disable operator doppelgänger protection by default
diegomrsantos Nov 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions anchor/client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,43 @@ pub struct Node {
#[clap(long, help = "Disables gossipsub topic scoring.", hide = true)]
pub disable_gossipsub_topic_scoring: bool,

// Operator Doppelgänger Protection
#[clap(
long,
help = "Enable operator doppelgänger protection. When enabled, the node will monitor \
for messages signed by its operator ID on startup and shut down if a twin \
(duplicate operator) is detected. Enabled by default.",
display_order = 0,
default_value_t = true,
help_heading = FLAG_HEADER,
action = ArgAction::Set
)]
pub operator_dg: bool,

#[clap(
long,
value_name = "EPOCHS",
help = "Number of epochs to wait in monitor mode before starting normal operation. \
During this period, the node listens for messages from its own operator ID \
to detect if another instance is running.",
display_order = 0,
default_value_t = 2,
requires = "operator_dg"
)]
pub operator_dg_wait_epochs: u64,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not actually waiting anywhere, so describing this as a number of epochs "to wait" is misleading.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit weird that, in order to use this option, we also need to pass `--operator-dg=true` explicitly.


#[clap(
long,
value_name = "HEIGHTS",
help = "The freshness threshold for detecting operator twins. Only messages within \
this many consensus heights from the maximum observed height are considered \
fresh evidence of a twin. This prevents false positives from replayed old messages.",
display_order = 0,
default_value_t = 3,
requires = "operator_dg"
)]
pub operator_dg_fresh_k: u64,

#[clap(flatten)]
pub logging_flags: FileLoggingFlags,
}
14 changes: 14 additions & 0 deletions anchor/client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ pub struct Config {
pub prefer_builder_proposals: bool,
/// Controls whether the latency measurement service is enabled
pub disable_latency_measurement_service: bool,
/// Enable operator doppelgänger protection
pub operator_dg: bool,
/// Number of epochs to wait in monitor mode
pub operator_dg_wait_epochs: u64,
/// Freshness threshold (K) for detecting operator twins
pub operator_dg_fresh_k: u64,
}

impl Config {
Expand Down Expand Up @@ -115,6 +121,9 @@ impl Config {
prefer_builder_proposals: false,
gas_limit: 36_000_000,
disable_latency_measurement_service: false,
operator_dg: true,
operator_dg_wait_epochs: 2,
operator_dg_fresh_k: 3,
}
}
}
Expand Down Expand Up @@ -243,6 +252,11 @@ pub fn from_cli(cli_args: &Node, global_config: GlobalConfig) -> Result<Config,
config.impostor = cli_args.impostor.map(OperatorId);
config.disable_latency_measurement_service = cli_args.disable_latency_measurement_service;

// Operator doppelgänger protection
config.operator_dg = cli_args.operator_dg;
config.operator_dg_wait_epochs = cli_args.operator_dg_wait_epochs;
config.operator_dg_fresh_k = cli_args.operator_dg_fresh_k;

// Performance options
if let Some(max_workers) = cli_args.max_workers {
config.processor.max_workers = max_workers;
Expand Down
64 changes: 60 additions & 4 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ pub mod config;
mod key;
mod metrics;
mod notifier;
mod operator_doppelganger;

use std::{
fs::File,
io::Read,
net::SocketAddr,
path::Path,
sync::Arc,
sync::{Arc, Mutex},
time::{Duration, SystemTime, UNIX_EPOCH},
};

Expand All @@ -31,7 +32,7 @@ use eth2::{
BeaconNodeHttpClient, Timeouts,
reqwest::{Certificate, ClientBuilder},
};
use message_receiver::NetworkMessageReceiver;
use message_receiver::{DoppelgangerChecker, DoppelgangerConfig, NetworkMessageReceiver};
use message_sender::{MessageSender, NetworkMessageSender, impostor::ImpostorMessageSender};
use message_validator::Validator;
use network::Network;
Expand All @@ -47,7 +48,7 @@ use task_executor::TaskExecutor;
use tokio::{
net::TcpListener,
select,
sync::{mpsc, mpsc::unbounded_channel},
sync::{mpsc, mpsc::unbounded_channel, oneshot},
time::{Instant, interval, sleep},
};
use tracing::{debug, error, info, warn};
Expand All @@ -63,7 +64,10 @@ use validator_services::{
sync_committee_service::SyncCommitteeService,
};

use crate::{key::read_or_generate_private_key, notifier::spawn_notifier};
use crate::{
key::read_or_generate_private_key, notifier::spawn_notifier,
operator_doppelganger::OperatorDoppelgangerService,
};

/// Specific timeout constants for HTTP requests involved in different validator duties.
/// This can help ensure that proper endpoint fallback occurs.
Expand Down Expand Up @@ -466,6 +470,57 @@ impl Client {

let (outcome_tx, outcome_rx) = mpsc::channel::<message_receiver::Outcome>(9000);

// Initialize operator doppelgänger protection if enabled
let doppelganger_config = if config.operator_dg && config.impostor.is_none() {
let own_operator_id = operator_id
.get()
.ok_or_else(|| "Operator ID not yet available".to_string())?;

let doppelganger_service = Arc::new(OperatorDoppelgangerService::<E, _>::new(
own_operator_id,
slot_clock.clone(),
config.operator_dg_wait_epochs,
config.operator_dg_fresh_k,
true, // enabled
));

// Create shutdown channel
let (shutdown_tx, shutdown_rx) = oneshot::channel();

// Create checker callback
let service = doppelganger_service.clone();
let checker: DoppelgangerChecker =
Box::new(move |signed_msg, qbft_msg| service.check_message(signed_msg, qbft_msg));

// Spawn task to listen for shutdown signal
let executor_clone = executor.clone();
executor.spawn_without_exit(
async move {
if shutdown_rx.await.is_ok() {
error!(
"Operator doppelgänger detected! Initiating fatal shutdown to prevent equivocation."
);
// Give time for the error log to be flushed
tokio::time::sleep(Duration::from_millis(100)).await;
// Trigger executor shutdown with failure reason
let _ = executor_clone
.shutdown_sender()
.try_send(task_executor::ShutdownReason::Failure(
"Operator doppelgänger detected",
));
}
},
"doppelganger-shutdown",
);

Some(DoppelgangerConfig {
checker: Arc::new(checker),
shutdown_tx: Arc::new(Mutex::new(Some(shutdown_tx))),
})
} else {
None
};

let message_receiver = NetworkMessageReceiver::new(
processor_senders.clone(),
qbft_manager.clone(),
Expand All @@ -474,6 +529,7 @@ impl Client {
is_synced.clone(),
outcome_tx,
message_validator,
doppelganger_config,
);

// Start the p2p network
Expand Down
4 changes: 4 additions & 0 deletions anchor/client/src/operator_doppelganger/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod service;
mod state;

pub use service::OperatorDoppelgangerService;
156 changes: 156 additions & 0 deletions anchor/client/src/operator_doppelganger/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
use std::{marker::PhantomData, sync::Arc};

use parking_lot::RwLock;
use slot_clock::SlotClock;
use ssv_types::{
OperatorId, consensus::QbftMessage, message::SignedSSVMessage, msgid::DutyExecutor,
};
use tracing::{error, info, warn};
use types::EthSpec;

use super::state::{DoppelgangerMode, DoppelgangerState};

/// Service for detecting operator doppelgängers (duplicate instances).
///
/// The service inspects incoming messages via `check_message` and reports
/// whether a message is evidence that another instance is signing with our
/// own operator ID. It is generic over the Ethereum spec `E` (used only for
/// `slots_per_epoch`) and over the slot clock implementation `S`.
pub struct OperatorDoppelgangerService<E: EthSpec, S: SlotClock> {
    /// Our operator ID to watch for; compared against the signer of
    /// single-signer messages.
    own_operator_id: OperatorId,
    /// Current detection state (mode, observed heights), shared behind a
    /// read-write lock so it can be updated through `&self`.
    state: Arc<RwLock<DoppelgangerState>>,
    /// Slot clock used to derive the current epoch for mode updates.
    slot_clock: S,
    /// Enabled flag; when `false`, `check_message` always returns `false`
    /// and `is_monitoring` always reports `false`.
    enabled: bool,
    /// Phantom data for EthSpec; `E` is only needed at the type level.
    _phantom: PhantomData<E>,
}

impl<E: EthSpec, S: SlotClock> OperatorDoppelgangerService<E, S> {
    /// Create a new operator doppelgänger service.
    ///
    /// The starting epoch is derived from `slot_clock`. If the clock cannot be
    /// read, epoch 0 is used as a fallback and (when `enabled`) a warning is
    /// logged so the degraded start-up is visible to the operator.
    ///
    /// * `own_operator_id` - the operator ID whose messages we watch for.
    /// * `slot_clock` - clock used to derive the current epoch.
    /// * `wait_epochs` - forwarded to `DoppelgangerState::new`.
    /// * `fresh_k` - forwarded to `DoppelgangerState::new`.
    /// * `enabled` - when `false`, `check_message` never reports a twin.
    pub fn new(
        own_operator_id: OperatorId,
        slot_clock: S,
        wait_epochs: u64,
        fresh_k: u64,
        enabled: bool,
    ) -> Self {
        let current_epoch = match slot_clock.now() {
            Some(slot) => slot.epoch(E::slots_per_epoch()),
            None => {
                // Do not swallow the clock failure silently: the fallback epoch
                // influences how the initial state is set up.
                if enabled {
                    warn!(
                        operator_id = *own_operator_id,
                        "Failed to read slot clock; falling back to epoch 0 as operator \
                         doppelgänger start epoch"
                    );
                }
                types::Epoch::new(0)
            }
        };

        let state = Arc::new(RwLock::new(DoppelgangerState::new(
            current_epoch,
            wait_epochs,
            fresh_k,
        )));

        if enabled {
            info!(
                operator_id = *own_operator_id,
                current_epoch = current_epoch.as_u64(),
                wait_epochs,
                fresh_k,
                "Operator doppelgänger protection enabled, entering monitor mode"
            );
        } else {
            info!("Operator doppelgänger protection disabled");
        }

        Self {
            own_operator_id,
            state,
            slot_clock,
            enabled,
            _phantom: PhantomData,
        }
    }

    /// Check if a message indicates a potential doppelgänger.
    ///
    /// Returns `true` if a twin is detected (should trigger shutdown), i.e. the
    /// service is enabled and monitoring, the message is a committee message
    /// signed by exactly one operator, that operator is us, and the state
    /// considers the message height fresh. Returns `false` otherwise.
    ///
    /// All state reads and writes for one message happen under a single write
    /// guard, so the mode check, height update and freshness check are atomic
    /// with respect to concurrent callers.
    pub fn check_message(
        &self,
        signed_message: &SignedSSVMessage,
        qbft_message: &QbftMessage,
    ) -> bool {
        if !self.enabled {
            return false;
        }

        // Read the clock before taking the lock to keep the critical section short.
        let current_epoch = self
            .slot_clock
            .now()
            .map(|slot| slot.epoch(E::slots_per_epoch()));

        let mut state = self.state.write();

        // Update mode based on current epoch; if the clock is unreadable the
        // previous mode is kept.
        if let Some(epoch) = current_epoch {
            state.update_mode(epoch);
        }

        // Only check in monitor mode
        if !state.is_monitoring() {
            return false;
        }

        // Extract committee ID from message
        let committee_id = match signed_message.ssv_message().msg_id().duty_executor() {
            Some(DutyExecutor::Committee(committee_id)) => committee_id,
            _ => return false, // Not a committee message
        };

        // Every committee message seen while monitoring contributes to the
        // maximum observed height, regardless of who signed it.
        state.update_max_height(committee_id, qbft_message.height);

        // Check if this is a single-signer message with our operator ID
        let operator_ids = signed_message.operator_ids();
        if operator_ids.len() != 1 {
            // Not a single-signer message (could be aggregate/decided)
            return false;
        }

        if operator_ids[0] != self.own_operator_id {
            // Not signed by us
            return false;
        }

        let is_fresh = state.is_fresh(committee_id, qbft_message.height);
        // Release the lock before logging.
        drop(state);

        if !is_fresh {
            // Stale message, likely a replay - not evidence of a twin
            warn!(
                operator_id = *self.own_operator_id,
                committee = ?committee_id,
                height = qbft_message.height,
                "Received stale message with our operator ID (likely replay), ignoring"
            );
            return false;
        }

        // Fresh single-signer message with our operator ID = twin detected!
        error!(
            operator_id = *self.own_operator_id,
            committee = ?committee_id,
            height = qbft_message.height,
            round = qbft_message.round,
            message_type = ?qbft_message.qbft_message_type,
            "OPERATOR DOPPELGÄNGER DETECTED: Received fresh message signed with our operator ID. \
             Another instance of this operator is running. Shutting down to prevent equivocation."
        );

        true
    }

    /// Get the current mode as recorded in the shared state.
    // NOTE(review): not called from the code visible here; presumably kept for
    // tests or diagnostics, hence the dead_code allowance - confirm.
    #[allow(dead_code)]
    pub fn mode(&self) -> DoppelgangerMode {
        self.state.read().mode()
    }

    /// Check if we're still in monitor mode.
    ///
    /// Always `false` when the service was constructed with `enabled = false`.
    // NOTE(review): not called from the code visible here; presumably kept for
    // tests or diagnostics, hence the dead_code allowance - confirm.
    #[allow(dead_code)]
    pub fn is_monitoring(&self) -> bool {
        self.enabled && self.state.read().is_monitoring()
    }
}
Loading