Skip to content

Commit

Permalink
Make signer lockless: substitute RwLock with ArcSwap
Browse files Browse the repository at this point in the history
  • Loading branch information
povi committed Apr 8, 2024
1 parent 56cac49 commit 287eb79
Show file tree
Hide file tree
Showing 16 changed files with 331 additions and 273 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 13 additions & 8 deletions grandine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ struct Context {
storage_config: StorageConfig,
command: Option<GrandineCommand>,
builder_config: Option<BuilderConfig>,
signer: Signer,
signer: Arc<Signer>,
slasher_config: Option<SlasherConfig>,
state_slot: Option<Slot>,
eth1_auth: Arc<Auth>,
Expand Down Expand Up @@ -142,7 +142,7 @@ impl Context {
storage_config,
command,
builder_config,
mut signer,
signer,
slasher_config,
state_slot,
eth1_auth,
Expand All @@ -153,10 +153,15 @@ impl Context {
} = self;

// Load keys early so we can validate `eth1_rpc_urls`.
signer.load_keys_from_web3signer().await?;
signer.load_keys_from_web3signer().await;

let signer_snapshot = signer.load();

if eth1_rpc_urls.is_empty() {
ensure!(signer.no_keys(), Error::MissingEth1RpcUrlsWithValidators);
ensure!(
signer_snapshot.no_keys(),
Error::MissingEth1RpcUrlsWithValidators,
);
}

let default_deposit_tree = predefined_network.map(PredefinedNetwork::genesis_deposit_tree);
Expand Down Expand Up @@ -194,7 +199,7 @@ impl Context {
let eth1_chain = Eth1Chain::new(
chain_config.clone_arc(),
eth1_config.clone_arc(),
signer.client().clone(),
signer_snapshot.client().clone(),
eth1_database,
eth1_api_to_metrics_tx.clone(),
metrics_config.metrics.clone(),
Expand All @@ -206,7 +211,7 @@ impl Context {
&chain_config,
genesis_state_file,
predefined_network,
signer.client(),
signer_snapshot.client(),
storage_config
.directories
.store_directory
Expand Down Expand Up @@ -423,12 +428,12 @@ fn try_main() -> Result<()> {
None => ValidatorKeyCache::default(),
};

let signer = Signer::new(
let signer = Arc::new(Signer::new(
validators.normalize(cache.as_mut(), &keystore_storage)?,
client,
web3signer_config,
metrics.clone(),
);
));

if let Some(cache) = cache {
if let Err(error) = cache.save() {
Expand Down
18 changes: 12 additions & 6 deletions http_api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use slashing_protection::{SlashingProtector, DEFAULT_SLASHING_PROTECTION_HISTORY
use snapshot_test_utils::Case;
use std_ext::ArcExt as _;
use tap::Pipe as _;
use tokio::{runtime::Builder, sync::RwLock};
use tokio::runtime::Builder;
use types::{
combined::{BeaconState, SignedBeaconBlock},
config::Config as ChainConfig,
Expand Down Expand Up @@ -214,18 +214,24 @@ impl<P: Preset> Context<P> {
let execution_service =
ExecutionService::new(eth1_api, controller.clone_arc(), execution_service_rx);

let signer = Signer::new(validator_keys, client, Web3SignerConfig::default(), None);
let validator_keys = Arc::new(signer.keys().copied().collect());
let signer = Arc::new(Signer::new(
validator_keys,
client,
Web3SignerConfig::default(),
None,
));

let signer_snapshot = signer.load();

let validator_keys = Arc::new(signer_snapshot.keys().copied().collect());

let mut slashing_protector =
SlashingProtector::in_memory(DEFAULT_SLASHING_PROTECTION_HISTORY_LIMIT)?;

slashing_protector.register_validators(signer.keys().copied())?;
slashing_protector.register_validators(signer_snapshot.keys().copied())?;

let slashing_protector = Arc::new(Mutex::new(slashing_protector));

let signer = Arc::new(RwLock::new(signer));

let validator_config = Arc::new(ValidatorConfig::default());

let keymanager = Arc::new(KeyManager::new_in_memory(
Expand Down
79 changes: 43 additions & 36 deletions keymanager/src/keystores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use signer::{KeyOrigin, Signer};
use slashing_protection::{interchange_format::InterchangeFormat, SlashingProtector};
use std_ext::ArcExt as _;
use tap::{Pipe as _, TryConv as _};
use tokio::sync::RwLock;
use types::phase0::primitives::H256;
use uuid::Uuid;
use validator_key_cache::ValidatorKeyCache;
Expand Down Expand Up @@ -56,7 +55,7 @@ pub struct ValidatingPubkey {
}

pub struct KeystoreManager {
signer: Arc<RwLock<Signer>>,
signer: Arc<Signer>,
slashing_protector: Arc<Mutex<SlashingProtector>>,
genesis_validators_root: H256,
storage: Mutex<Option<ValidatorKeyCache>>,
Expand All @@ -66,7 +65,7 @@ pub struct KeystoreManager {
impl KeystoreManager {
#[must_use]
pub fn new_in_memory(
signer: Arc<RwLock<Signer>>,
signer: Arc<Signer>,
slashing_protector: Arc<Mutex<SlashingProtector>>,
genesis_validators_root: H256,
) -> Self {
Expand All @@ -80,7 +79,7 @@ impl KeystoreManager {
}

pub fn new_persistent(
signer: Arc<RwLock<Signer>>,
signer: Arc<Signer>,
slashing_protector: Arc<Mutex<SlashingProtector>>,
genesis_validators_root: H256,
validator_directory: PathBuf,
Expand Down Expand Up @@ -115,24 +114,22 @@ impl KeystoreManager {
self.persistence_config
.validate_storage_password_presence()?;

let signer_keys = self
.signer
.read()
.await
.keys_with_origin()
.collect::<HashMap<_, _>>();
let mut deleted_keys = vec![];
let mut delete_results = vec![];

self.signer.update(|snapshot| {
let mut snapshot = snapshot.as_ref().clone();

let signer_keys = snapshot.keys_with_origin().collect::<HashMap<_, _>>();

let statuses = {
let mut signer = self.signer.write().await;
deleted_keys.clear();
delete_results.clear();

pubkeys
.iter()
.copied()
.map(|pubkey| match signer_keys.get(&pubkey) {
for pubkey in pubkeys.iter().copied() {
let result = match signer_keys.get(&pubkey) {
Some(origin) => match origin {
KeyOrigin::KeymanagerAPI => {
signer.delete_key(pubkey);
snapshot.delete_key(pubkey);
deleted_keys.push(pubkey);
Status::Deleted.into()
}
Expand All @@ -141,9 +138,13 @@ impl KeystoreManager {
}
},
None => Error::NotFound.into(),
})
.collect()
};
};

delete_results.push(result);
}

snapshot
});

if !deleted_keys.is_empty() {
let mut key_storage = self.key_storage_mut().await?;
Expand All @@ -157,7 +158,7 @@ impl KeystoreManager {
.await
.build_interchange_data_for_validators(self.genesis_validators_root, pubkeys)?;

Ok((statuses, serde_json::to_string(&slashing_protection)?))
Ok((delete_results, serde_json::to_string(&slashing_protection)?))
}

pub async fn import(
Expand Down Expand Up @@ -223,7 +224,14 @@ impl KeystoreManager {
.lock()
.await
.register_validators(imported_keys.iter().map(|(pubkey, _)| *pubkey))?;
self.signer.write().await.append_keys(imported_keys);

self.signer.update(|snapshot| {
let mut snapshot = snapshot.as_ref().clone();

snapshot.append_keys(imported_keys.clone());

snapshot
});
}

Ok(statuses)
Expand All @@ -250,10 +258,9 @@ impl KeystoreManager {
Ok(())
}

pub async fn list_validating_pubkeys(&self) -> Vec<ValidatingPubkey> {
pub fn list_validating_pubkeys(&self) -> Vec<ValidatingPubkey> {
self.signer
.read()
.await
.load()
.keys_with_origin()
.map(|(pubkey, origin)| ValidatingPubkey {
validating_pubkey: pubkey,
Expand Down Expand Up @@ -549,13 +556,13 @@ mod tests {

fn build_keystore_manager(
storage_dir: Option<PathBuf>,
) -> Result<(KeystoreManager, Arc<RwLock<Signer>>)> {
let signer = Arc::new(RwLock::new(Signer::new(
) -> Result<(KeystoreManager, Arc<Signer>)> {
let signer = Arc::new(Signer::new(
vec![],
Client::new(),
Web3SignerConfig::default(),
None,
)));
));
let slashing_protector = Arc::new(Mutex::new(SlashingProtector::in_memory(
DEFAULT_SLASHING_PROTECTION_HISTORY_LIMIT,
)?));
Expand Down Expand Up @@ -593,7 +600,7 @@ mod tests {
.tempdir()?;
let (manager, signer) = build_keystore_manager(Some(storage_tempdir.path().to_path_buf()))?;

assert!(manager.list_validating_pubkeys().await.is_empty());
assert!(manager.list_validating_pubkeys().is_empty());

let normalized_password = eip_2335::normalize_password(KEYSTORE_PASSWORD)?;
let expected_pubkey = PublicKeyBytes::from(PUBKEY_BYTES);
Expand All @@ -617,15 +624,15 @@ mod tests {
);

assert_eq!(
manager.list_validating_pubkeys().await,
manager.list_validating_pubkeys(),
vec![ValidatingPubkey {
validating_pubkey: expected_pubkey,
readonly: false
}],
);

assert_eq!(
signer.read().await.keys().copied().collect_vec(),
signer.load().keys().copied().collect_vec(),
vec![expected_pubkey],
);

Expand All @@ -648,7 +655,7 @@ mod tests {
);

assert_eq!(
manager.list_validating_pubkeys().await,
manager.list_validating_pubkeys(),
vec![ValidatingPubkey {
validating_pubkey: expected_pubkey,
readonly: false
Expand Down Expand Up @@ -727,7 +734,7 @@ mod tests {
async fn test_keystore_import_load_and_delete_with_in_memory_storage() -> Result<()> {
let (manager, signer) = build_keystore_manager(None)?;

assert!(manager.list_validating_pubkeys().await.is_empty());
assert!(manager.list_validating_pubkeys().is_empty());

let normalized_password = eip_2335::normalize_password(KEYSTORE_PASSWORD)?;
let expected_pubkey = PublicKeyBytes::from(PUBKEY_BYTES);
Expand All @@ -751,15 +758,15 @@ mod tests {
);

assert_eq!(
manager.list_validating_pubkeys().await,
manager.list_validating_pubkeys(),
vec![ValidatingPubkey {
validating_pubkey: expected_pubkey,
readonly: false
}],
);

assert_eq!(
signer.read().await.keys().copied().collect_vec(),
signer.load().keys().copied().collect_vec(),
vec![expected_pubkey],
);

Expand All @@ -782,7 +789,7 @@ mod tests {
);

assert_eq!(
manager.list_validating_pubkeys().await,
manager.list_validating_pubkeys(),
vec![ValidatingPubkey {
validating_pubkey: expected_pubkey,
readonly: false
Expand Down
5 changes: 2 additions & 3 deletions keymanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use futures::lock::Mutex;
use signer::Signer;
use slashing_protection::SlashingProtector;
use std_ext::ArcExt as _;
use tokio::sync::RwLock;
use types::phase0::primitives::{ExecutionAddress, H256};

use crate::{keystores::KeystoreManager, remote_keys::RemoteKeyManager};
Expand All @@ -32,7 +31,7 @@ pub struct KeyManager {

impl KeyManager {
pub fn new_in_memory(
signer: Arc<RwLock<Signer>>,
signer: Arc<Signer>,
slashing_protector: Arc<Mutex<SlashingProtector>>,
genesis_validators_root: H256,
default_fee_recipient: ExecutionAddress,
Expand All @@ -59,7 +58,7 @@ impl KeyManager {
}

pub fn new_persistent(
signer: Arc<RwLock<Signer>>,
signer: Arc<Signer>,
slashing_protector: Arc<Mutex<SlashingProtector>>,
genesis_validators_root: H256,
validator_directory: PathBuf,
Expand Down
Loading

0 comments on commit 287eb79

Please sign in to comment.