Skip to content

feat: improve lookup table overhead for cloning pipeline #457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jul 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit. Hold shift + click to select a range.
e32a1d9
feat: make lookup table creation async in account cloner
thlorenz Jul 14, 2025
900fc2d
feat: add ensure_pubkeys_table method and refcount queries
thlorenz Jul 14, 2025
f926854
feat: committor service handles table creation failure, converting co…
thlorenz Jul 14, 2025
05861fa
chore: addendum to previous for commits using buffer
thlorenz Jul 14, 2025
080b447
ci: temporarily add CI running ix tests on this branch
thlorenz Jul 14, 2025
2d7d140
chore: log ix test name on success and failures
thlorenz Jul 15, 2025
727254c
chore: adding safety multiplier to table mania CU budgets
thlorenz Jul 15, 2025
6e4296b
Merge branch 'thlorenz/committor-increase-compute-budget' into thlore…
thlorenz Jul 15, 2025
4b674b4
chore: remove special handling of lookup table creation error
thlorenz Jul 15, 2025
075fa0d
chore: add convenience tasks to Makefile to isolate ix tests
thlorenz Jul 15, 2025
ac78e86
chore: properly cleanup when ledger restore single airdrop test fails
thlorenz Jul 15, 2025
bccd780
chore: remove tmp CI workflow
thlorenz Jul 15, 2025
dc2599b
Merge branch 'dev' into thlorenz/committor-improve-table-speed
thlorenz Jul 15, 2025
f007d4f
chore: removing obsolete CouldNotCreateLookupTable error and commit s…
thlorenz Jul 15, 2025
dd6bf69
chore: address - use relaxed ordering in get_refcount method - taco-paco
thlorenz Jul 17, 2025
5dc7a33
chore: optimize lock acquisition in ensure_pubkeys_table - taco-paco
thlorenz Jul 17, 2025
5ed50bd
Merge branch 'dev' into thlorenz/committor-improve-table-speed
thlorenz Jul 17, 2025
17d728c
fix: clippy warning
thlorenz Jul 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 23 additions & 26 deletions magicblock-account-cloner/src/remote_account_cloner_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,43 +710,40 @@ where

// Allow the committer service to reserve pubkeys in lookup tables
// that could be needed when we commit this account
// NOTE: we start reserving pubkeys so the transaction can complete while we
// clone the account.
let reserve_pubkeys_handle = if let Some(committor) =
self.changeset_committor.as_ref()
{
if let Some(committor) = self.changeset_committor.as_ref() {
let committor = Arc::clone(committor);
let pubkey = *pubkey;
let owner = delegation_record.owner;
Some(tokio::spawn(map_committor_request_result(
committor.reserve_pubkeys_for_committee(pubkey, owner),
committor,
)))
} else {
None
};
tokio::spawn(async move {
match map_committor_request_result(
committor
.reserve_pubkeys_for_committee(pubkey, owner),
committor,
)
.await
{
Ok(initiated) => {
trace!(
"Reserving lookup keys for {pubkey} took {:?}",
initiated.elapsed()
);
}
Err(err) => {
error!("Failed to reserve lookup keys for {pubkey}: {err:?}");
}
};
});
}

let sig = self.do_clone_delegated_account(
self.do_clone_delegated_account(
pubkey,
// TODO(GabrielePicco): Avoid cloning
&Account {
lamports: delegation_record.lamports,
..account.clone()
},
delegation_record,
)?;

if let Some(handle) = reserve_pubkeys_handle {
let initiated = handle.await.map_err(|err| {
AccountClonerError::JoinError(format!("{err} {err:?}"))
})??;
trace!(
"Reserving lookup keys for {pubkey} took {:?}",
initiated.elapsed()
);
}

sig
)?
}
};
// Return the result
Expand Down
7 changes: 3 additions & 4 deletions magicblock-accounts-db/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use lmdb::{
Cursor, DatabaseFlags, Environment, RwTransaction, Transaction, WriteFlags,
};
use log::warn;
use magicblock_config::AccountsDbConfig;
use solana_pubkey::Pubkey;
use table::Table;
use utils::*;
Expand Down Expand Up @@ -131,12 +130,12 @@ impl AccountsDbIndex {
};
let offset =
// SAFETY:
// The accounts index stores two u32 values (offset and blocks)
// The accounts index stores two u32 values (offset and blocks)
// serialized into 8 byte long slice. Here we are interested only in the first 4 bytes
// (offset). The memory used by lmdb to store the serialization might not be u32
// aligned, so we make use of `read_unaligned`.
// aligned, so we make use of `read_unaligned`.
//
// We read the data stored by the corresponding put in `insert_account`,
// We read the data stored by the corresponding put in `insert_account`,
// thus it should be of valid length and contain valid value
unsafe { (offset.as_ptr() as *const u32).read_unaligned() };
Ok(offset)
Expand Down
41 changes: 21 additions & 20 deletions magicblock-committor-service/src/commit/commit_using_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl CommittorProcessor {
(tm, keys)
});

let strategy = CommitStrategy::args(use_lookup);
let compute_budget_ixs = me
.compute_budget_config
.args_process_budget()
Expand All @@ -119,7 +120,7 @@ impl CommittorProcessor {
Ok(sig) => sig,
Err(err) => {
error!("Failed to commit changeset with {} accounts using args: {:?}", committees.len(), err);
let strategy = CommitStrategy::args(use_lookup);

let sigs = err.signature().map(|sig| CommitSignatures {
process_signature: sig,
finalize_signature: None,
Expand All @@ -128,11 +129,7 @@ impl CommittorProcessor {
return commit_infos
.into_iter()
.map(|x| {
CommitStage::FailedProcess((
x,
strategy,
sigs.as_ref().cloned(),
))
CommitStage::FailedProcess((x, strategy, sigs.clone()))
})
.collect();
}
Expand Down Expand Up @@ -165,17 +162,19 @@ impl CommittorProcessor {
"Failed to finalize changeset using args: {:?}",
err
);

let sigs = CommitSignatures {
process_signature: process_sig,
finalize_signature: err.signature(),
undelegate_signature: None,
};
return commit_infos
.into_iter()
.map(|x| {
CommitStage::FailedFinalize((
x,
CommitStrategy::args(use_lookup),
CommitSignatures {
process_signature: process_sig,
finalize_signature: err.signature(),
undelegate_signature: None,
},
strategy,
sigs.clone(),
))
})
.collect();
Expand Down Expand Up @@ -219,7 +218,7 @@ impl CommittorProcessor {
.map(|x| {
CommitStage::FailedUndelegate((
x,
CommitStrategy::args(use_lookup),
strategy,
CommitSignatures {
process_signature: process_sig,
finalize_signature: finalize_sig,
Expand Down Expand Up @@ -254,17 +253,19 @@ impl CommittorProcessor {
"Failed to undelegate accounts via transaction '{}': {:?}",
err, err
);
let sigs = CommitSignatures {
process_signature: process_sig,
finalize_signature: finalize_sig,
undelegate_signature: err.signature(),
};

return commit_infos
.into_iter()
.map(|x| {
CommitStage::FailedUndelegate((
x,
CommitStrategy::args(use_lookup),
CommitSignatures {
process_signature: process_sig,
finalize_signature: finalize_sig,
undelegate_signature: err.signature(),
},
strategy,
sigs.clone(),
))
})
.collect();
Expand All @@ -282,7 +283,7 @@ impl CommittorProcessor {
.map(|x| {
CommitStage::Succeeded((
x,
CommitStrategy::args(use_lookup),
strategy,
CommitSignatures {
process_signature: process_sig,
finalize_signature: finalize_sig,
Expand Down
102 changes: 58 additions & 44 deletions magicblock-committor-service/src/commit/commit_using_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@ use magicblock_rpc_client::{
MagicBlockSendTransactionConfig,
};
use solana_pubkey::Pubkey;
use solana_sdk::{hash::Hash, instruction::Instruction, signer::Signer};
use solana_sdk::{
hash::Hash, instruction::Instruction, signature::Signature, signer::Signer,
};
use tokio::task::JoinSet;

use super::{
common::send_and_confirm,
common::{get_accounts_to_undelegate, send_and_confirm},
process_buffers::{
chunked_ixs_to_process_commitables_and_close_pdas,
ChunkedIxsToProcessCommitablesAndClosePdasResult,
},
CommittorProcessor,
};
use crate::{
commit::common::get_accounts_to_undelegate,
commit_stage::CommitSignatures,
error::{CommitAccountError, CommitAccountResult},
finalize::{
Expand Down Expand Up @@ -275,22 +276,34 @@ impl CommittorProcessor {
.await;
commit_stages.extend(failed_finalize.into_iter().flat_map(
|(sig, infos)| {
fn get_sigs_for_bundle(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds more lines of code than before + extra indirection

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the refactored version does add more lines, it significantly improves code maintainability (isolating functionality is good ;))

bundle_id: u64,
processed_signatures: &HashMap<u64, Signature>,
finalized_sig: Option<Signature>,
) -> CommitSignatures {
CommitSignatures {
// SAFETY: signatures for all bundles of succeeded process transactions
// have been added above
process_signature: *processed_signatures
.get(&bundle_id)
.unwrap(),
finalize_signature: finalized_sig,
undelegate_signature: None,
}
}

infos
.into_iter()
.map(|x| {
let bundle_id = x.bundle_id();
CommitStage::FailedFinalize((
x,
commit_strategy,
CommitSignatures {
// SAFETY: signatures for all bundles of succeeded process transactions
// have been added above
process_signature: *processed_signatures
.get(&bundle_id)
.unwrap(),
finalize_signature: sig,
undelegate_signature: None,
},
get_sigs_for_bundle(
bundle_id,
&processed_signatures,
sig,
),
))
})
.collect::<Vec<_>>()
Expand Down Expand Up @@ -359,6 +372,25 @@ impl CommittorProcessor {
finalize_and_undelegate.len(),
"BUG: same amount of accounts to undelegate as to finalize and undelegate"
);
fn get_sigs_for_bundle(
bundle_id: u64,
processed_signatures: &HashMap<u64, Signature>,
finalized_signatures: &HashMap<u64, Signature>,
undelegated_sig: Option<Signature>,
) -> CommitSignatures {
CommitSignatures {
// SAFETY: signatures for all bundles of succeeded process transactions
// have been added above
process_signature: *processed_signatures
.get(&bundle_id)
.unwrap(),
finalize_signature: finalized_signatures
.get(&bundle_id)
.cloned(),
undelegate_signature: undelegated_sig,
}
}

let undelegate_ixs = match undelegate_commitables_ixs(
&processor.magicblock_rpc_client,
processor.authority.pubkey(),
Expand All @@ -378,19 +410,12 @@ impl CommittorProcessor {
CommitStage::FailedUndelegate((
x.clone(),
CommitStrategy::args(use_lookup),
CommitSignatures {
// SAFETY: signatures for all bundles of succeeded process transactions
// have been added above
process_signature:
*processed_signatures
.get(&bundle_id)
.unwrap(),
finalize_signature:
finalized_signatures
.get(&bundle_id)
.cloned(),
undelegate_signature: err.signature(),
},
get_sigs_for_bundle(
bundle_id,
&processed_signatures,
&finalized_signatures,
err.signature(),
),
))
}),
);
Expand Down Expand Up @@ -425,19 +450,12 @@ impl CommittorProcessor {
CommitStage::FailedUndelegate((
x,
commit_strategy,
CommitSignatures {
// SAFETY: signatures for all bundles of succeeded process transactions
// have been added above
process_signature:
*processed_signatures
.get(&bundle_id)
.unwrap(),
finalize_signature:
finalized_signatures
.get(&bundle_id)
.cloned(),
undelegate_signature: sig,
},
get_sigs_for_bundle(
bundle_id,
&processed_signatures,
&finalized_signatures,
sig,
),
))
})
.collect::<Vec<_>>()
Expand Down Expand Up @@ -1009,18 +1027,14 @@ impl CommittorProcessor {
rpc_client,
authority,
[write_budget_ixs, vec![ix]].concat(),
format!("write chunk for offset {}", chunk.offset),
format!("write chunk ({} bytes)", chunk_bytes),
Some(latest_blockhash),
// NOTE: We could use `processed` here and wait to get the processed status at
// least which would make things a bit slower.
// However that way we would avoid sending unnecessary transactions potentially
// since we may not see some written chunks yet when we get the chunks account.
MagicBlockSendTransactionConfig::ensure_processed(),
None,
)
.await
.inspect_err(|err| {
error!("{:?}", err);
warn!("{:?}", err);
})
});
}
Expand Down
Loading