Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,501 changes: 752 additions & 749 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ validator_services = { git = "https://github.com/sigp/lighthouse", rev = "b59feb
validator_store = { git = "https://github.com/sigp/lighthouse", rev = "b59feb04" }
workspace_members = { git = "https://github.com/sigp/lighthouse", rev = "b59feb04" }

alloy = { version = "1.0.22", features = [
alloy = { version = "1.0.42", features = [
"sol-types",
"transports",
"json",
Expand Down
15 changes: 15 additions & 0 deletions anchor/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ struct SingleState {
clusters: HashSet<ClusterId>,
/// Nonce of the owner account
nonces: HashMap<Address, u16>,
/// Monotonically increasing OperatorId count. None indicates a migrated database.
max_operator_id_seen: Option<u64>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -214,6 +216,19 @@ impl NetworkDatabase {
Ok(())
}

/// Update the largest seen OperatorId in the database
pub fn set_max_operator_id_seen(
&self,
operator_id: u64,
tx: &Transaction<'_>,
) -> Result<(), DatabaseError> {
tx.prepare_cached(sql_operations::SET_MAX_OPERATOR_ID_SEEN)?
.execute(params![operator_id])?;
self.modify_state(|state| state.single_state.max_operator_id_seen = Some(operator_id));

Ok(())
}

// Open an existing database at the given `path`, or create one if none exists.
fn open_or_create(path: &Path, domain: DomainType) -> Result<Pool, DatabaseError> {
schema::ensure_up_to_date(path, domain)?;
Expand Down
2 changes: 2 additions & 0 deletions anchor/database/src/sql_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
pub const INSERT_METADATA: &str = r#"INSERT INTO metadata (domain_type) VALUES (?1)"#;
pub const GET_METADATA: &str = r#"SELECT schema_version, domain_type FROM metadata"#;
pub const GET_LEGACY_BLOCK: &str = r#"SELECT * FROM block"#;
pub const GET_MAX_OPERATOR_ID_SEEN: &str = r#"SELECT max_operator_id_seen FROM metadata"#;
pub const SET_MAX_OPERATOR_ID_SEEN: &str = r#"UPDATE metadata SET max_operator_id_seen = ?1"#;

// Operator
pub const INSERT_OPERATOR: &str = r#"
Expand Down
14 changes: 14 additions & 0 deletions anchor/database/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ impl NetworkState {
// Get the last processed block from the database
let last_processed_block = Self::get_last_processed_block_from_db(&conn)?;

// Get number of joined operators from the database
let max_operator_id_seen = Self::get_max_operator_id_seen_from_db(&conn)?;

// Without an ID, we have no idea who we are. Check to see if an operator with our public
// key is stored the database. If it does not exist, that means the operator still
// has to be registered with the network contract or that we have not seen the
Expand Down Expand Up @@ -71,6 +74,7 @@ impl NetworkState {
.map(|m| m.keys().copied().collect())
.unwrap_or_default(),
nonces,
max_operator_id_seen,
};

// Populate all multi-index maps in a single pass through clusters
Expand Down Expand Up @@ -135,6 +139,12 @@ impl NetworkState {
.map_err(DatabaseError::from)
}

fn get_max_operator_id_seen_from_db(conn: &PoolConn) -> Result<Option<u64>, DatabaseError> {
conn.prepare_cached(sql_operations::GET_MAX_OPERATOR_ID_SEEN)?
.query_row(params![], |row| row.get(0))
.map_err(DatabaseError::from)
}

// Check to see if an operator with the public key already exists in the database
fn does_self_exist(
conn: &PoolConn,
Expand Down Expand Up @@ -341,6 +351,10 @@ impl NetworkState {
self.single_state.last_processed_block
}

pub fn get_max_operator_id_seen(&self) -> Option<u64> {
self.single_state.max_operator_id_seen
}

pub fn get_committee_info_by_committee_id(
&self,
committee_id: &CommitteeId,
Expand Down
3 changes: 2 additions & 1 deletion anchor/database/src/table_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
CREATE TABLE metadata (
schema_version INTEGER NOT NULL DEFAULT 1,
domain_type INTEGER NOT NULL,
block_number INTEGER NOT NULL DEFAULT 0 CHECK (block_number >= 0)
block_number INTEGER NOT NULL DEFAULT 0 CHECK (block_number >= 0),
max_operator_id_seen INTEGER
);
CREATE TRIGGER unique_metadata
BEFORE INSERT ON metadata
Expand Down
16 changes: 16 additions & 0 deletions anchor/eth/src/event_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,22 @@ impl EventProcessor {
)));
}

let max_seen = self.db.state().get_max_operator_id_seen();

// Only check for missing operators if we have a previous max (not a migrated database)
if let Some(max_seen) = max_seen
&& max_seen != operatorId - 1
{
return Err(ExecutionError::InvalidEvent(format!(
"Missing OperatorAdded events: database has only seen up to id {max_seen}, \
but got operator {operator_id}."
)));
}

self.db
.set_max_operator_id_seen(operatorId, tx)
.map_err(|e| ExecutionError::Database(e.to_string()))?;

let data = publicKey.as_ref();

// If the data is 704 bytes, remove the ssv encoding. Else, just parse the key
Expand Down
11 changes: 11 additions & 0 deletions anchor/eth/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,17 @@ impl SsvEventSyncer {
start_block = end_block + 1;
}
info!("Historical sync completed");

if self
.event_processor
.db
.state()
.get_all_operators()
.is_empty()
{
warn!("No OperatorAdded events found in historical sync, there is likely a sync error");
}

Ok(())
}

Expand Down