Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ where
intent: &ScheduledBaseIntentWrapper,
result: &IntentExecutorResult<ExecutionOutput>,
) {
const EXECUTION_TIME_THRESHOLD: f64 = 2.0;
const EXECUTION_TIME_THRESHOLD: f64 = 5.0;

let intent_execution_secs = execution_time.as_secs_f64();
metrics::observe_committor_intent_execution_time_histogram(
Expand All @@ -309,12 +309,8 @@ where
// Loki alerts
if intent_execution_secs >= EXECUTION_TIME_THRESHOLD {
info!(
"Intent took too long to execute: {}s. {}",
intent_execution_secs,
result
.as_ref()
.map(|_| "succeeded".to_string())
.unwrap_or_else(|err| format!("{err:?}"))
"Intent took too long to execute: {}s. {:?}",
intent_execution_secs, result
);
} else {
trace!("Seconds took to execute intent: {}", intent_execution_secs);
Expand Down
62 changes: 29 additions & 33 deletions magicblock-committor-service/src/intent_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub struct IntentExecutorImpl<T, F> {

impl<T, F> IntentExecutorImpl<T, F>
where
T: TransactionPreparator,
T: TransactionPreparator + Clone,
F: TaskInfoFetcher,
{
pub fn new(
Expand Down Expand Up @@ -253,7 +253,6 @@ where
}

/// Starting execution from single stage
// TODO(edwin): introduce recursion stop value in case of some bug?
pub async fn single_stage_execution_flow<P: IntentPersister>(
&self,
base_intent: ScheduledBaseIntent,
Expand All @@ -264,21 +263,7 @@ where
let res = SingleStageExecutor::new(self)
.execute(base_intent, transaction_strategy, &mut junk, persister)
.await;

// Cleanup after intent
// Note: in some cases it maybe critical to execute cleanup synchronously
// Example: if commit nonces were invalid during execution
// next intent could use wrongly initiated buffers by current intent
let cleanup_futs = junk.iter().map(|to_cleanup| {
self.transaction_preparator.cleanup_for_strategy(
&self.authority,
&to_cleanup.optimized_tasks,
&to_cleanup.lookup_tables_keys,
)
});
if let Err(err) = try_join_all(cleanup_futs).await {
error!("Failed to cleanup after intent: {}", err);
}
self.spawn_cleanup_task(junk);

res
}
Expand All @@ -300,21 +285,7 @@ where
persister,
)
.await;

// Cleanup after intent
// Note: in some cases it maybe critical to execute cleanup synchronously
// Example: if commit nonces were invalid during execution
// next intent could use wrongly initiated buffers by current intent
let cleanup_futs = junk.iter().map(|to_cleanup| {
self.transaction_preparator.cleanup_for_strategy(
&self.authority,
&to_cleanup.optimized_tasks,
&to_cleanup.lookup_tables_keys,
)
});
if let Err(err) = try_join_all(cleanup_futs).await {
error!("Failed to cleanup after intent: {}", err);
}
self.spawn_cleanup_task(junk);

res
}
Expand Down Expand Up @@ -678,6 +649,31 @@ where
.await
}

/// Schedules cleanup of everything a finished intent left behind.
///
/// The cleanup runs on a separately spawned tokio task whose handle is
/// discarded, so the caller does not wait for it to finish.
///
/// Note: in some cases it may be critical to execute cleanup synchronously.
/// Example: if commit nonces were invalid during execution, the next intent
/// could use buffers wrongly initiated by the current intent.
/// We assume that this case is highly unlikely since it would mean:
/// user redelegates and reaches current commit id faster than we execute
/// the transactions below.
///
/// NOTE(review): `try_join_all` resolves with the first error it sees, so a
/// single failing cleanup may leave the remaining ones unfinished — confirm
/// this is acceptable.
fn spawn_cleanup_task(&self, junk: Vec<TransactionStrategy>) {
    // The spawned future must own its data, hence the clones.
    let transaction_preparator = self.transaction_preparator.clone();
    let authority = self.authority.insecure_clone();

    tokio::spawn(async move {
        // One cleanup future per strategy; they are driven concurrently.
        let pending_cleanups = junk.iter().map(|strategy| {
            transaction_preparator.cleanup_for_strategy(
                &authority,
                &strategy.optimized_tasks,
                &strategy.lookup_tables_keys,
            )
        });

        // Failures are only logged: nobody awaits this task's result.
        if let Err(err) = try_join_all(pending_cleanups).await {
            error!("Failed to cleanup after intent: {}", err);
        }
    });
}

async fn intent_metrics(
rpc_client: MagicblockRpcClient,
execution_outcome: ExecutionOutput,
Expand Down Expand Up @@ -741,7 +737,7 @@ where
#[async_trait]
impl<T, C> IntentExecutor for IntentExecutorImpl<T, C>
where
T: TransactionPreparator,
T: TransactionPreparator + Clone,
C: TaskInfoFetcher,
{
/// Executes Message on Base layer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct SingleStageExecutor<'a, T, F> {

impl<'a, T, F> SingleStageExecutor<'a, T, F>
where
T: TransactionPreparator,
T: TransactionPreparator + Clone,
F: TaskInfoFetcher,
{
pub fn new(executor: &'a IntentExecutorImpl<T, F>) -> Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct TwoStageExecutor<'a, T, F> {

impl<'a, T, F> TwoStageExecutor<'a, T, F>
where
T: TransactionPreparator,
T: TransactionPreparator + Clone,
F: TaskInfoFetcher,
{
pub fn new(executor: &'a IntentExecutorImpl<T, F>) -> Self {
Expand Down
12 changes: 12 additions & 0 deletions magicblock-committor-service/src/tasks/args_task.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use dlp::args::{CallHandlerArgs, CommitStateArgs};
use magicblock_metrics::metrics::LabelValue;
use solana_pubkey::Pubkey;
use solana_sdk::instruction::{AccountMeta, Instruction};

Expand Down Expand Up @@ -165,3 +166,14 @@ impl BaseTask for ArgsTask {
commit_task.commit_id = commit_id;
}
}

impl LabelValue for ArgsTask {
    /// Returns the fixed metrics label that names this task's variant.
    ///
    /// Every `ArgsTaskType` variant maps to its own `args_*` label; the
    /// payload carried by the variant is ignored.
    fn value(&self) -> &str {
        match &self.task_type {
            ArgsTaskType::Commit(..) => "args_commit",
            ArgsTaskType::BaseAction(..) => "args_action",
            ArgsTaskType::Finalize(..) => "args_finalize",
            ArgsTaskType::Undelegate(..) => "args_undelegate",
        }
    }
}
9 changes: 9 additions & 0 deletions magicblock-committor-service/src/tasks/buffer_task.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use dlp::args::CommitStateFromBufferArgs;
use magicblock_committor_program::Chunks;
use magicblock_metrics::metrics::LabelValue;
use solana_pubkey::Pubkey;
use solana_sdk::instruction::Instruction;

Expand Down Expand Up @@ -140,3 +141,11 @@ impl BaseTask for BufferTask {
self.preparation_state = Self::preparation_required(&self.task_type)
}
}

impl LabelValue for BufferTask {
    /// Returns the fixed metrics label that names this task's variant.
    ///
    /// The match is kept exhaustive (no catch-all arm) so that adding a new
    /// `BufferTaskType` variant forces a label to be chosen for it.
    fn value(&self) -> &str {
        match &self.task_type {
            BufferTaskType::Commit(..) => "buffer_commit",
        }
    }
}
3 changes: 2 additions & 1 deletion magicblock-committor-service/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use magicblock_committor_program::{
},
pdas, ChangesetChunks, Chunks,
};
use magicblock_metrics::metrics::LabelValue;
use magicblock_program::magic_scheduled_base_intent::{
BaseAction, CommittedAccount,
};
Expand Down Expand Up @@ -50,7 +51,7 @@ pub enum TaskStrategy {
}

/// A trait representing a task that can be executed on Base layer
pub trait BaseTask: Send + Sync + DynClone {
pub trait BaseTask: Send + Sync + DynClone + LabelValue {
/// Gets all pubkeys that are involved in the Task's instruction
fn involved_accounts(&self, validator: &Pubkey) -> Vec<Pubkey> {
self.instruction(validator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use log::{error, info};
use magicblock_committor_program::{
instruction_chunks::chunk_realloc_ixs, Chunks,
};
use magicblock_metrics::metrics;
use magicblock_rpc_client::{
utils::{
decide_rpc_error_flow, map_magicblock_client_error,
Expand Down Expand Up @@ -66,13 +67,30 @@ impl DeliveryPreparator {
persister: &Option<P>,
) -> DeliveryPreparatorResult<Vec<AddressLookupTableAccount>> {
let preparation_futures =
strategy.optimized_tasks.iter_mut().map(|task| {
self.prepare_task_handling_errors(authority, task, persister)
strategy.optimized_tasks.iter_mut().map(|task| async move {
let timer =
metrics::observe_committor_intent_task_preparation_time(
task.as_ref(),
);
let res = self
.prepare_task_handling_errors(authority, task, persister)
.await;
timer.stop_and_record();

res
});

let task_preparations = join_all(preparation_futures);
let alts_preparations =
self.prepare_lookup_tables(authority, &strategy.lookup_tables_keys);
let alts_preparations = async {
let timer =
metrics::observe_committor_intent_alt_preparation_time();
let res = self
.prepare_lookup_tables(authority, &strategy.lookup_tables_keys)
.await;
timer.stop_and_record();

res
};

let (res1, res2) = join(task_preparations, alts_preparations).await;
res1.into_iter()
Expand Down Expand Up @@ -215,11 +233,16 @@ impl DeliveryPreparator {
})
.collect::<Vec<_>>();

// Initialization & reallocs
for instructions in preparation_instructions {
self.send_ixs_with_retry(&instructions, authority, 5)
.await?;
}
// Initialization
self.send_ixs_with_retry(&preparation_instructions[0], authority, 5)
.await?;

// Reallocs can be performed in parallel
let preparation_futs =
preparation_instructions.iter().skip(1).map(|instructions| {
self.send_ixs_with_retry(instructions, authority, 5)
});
try_join_all(preparation_futs).await?;

Ok(())
}
Expand Down
51 changes: 51 additions & 0 deletions magicblock-metrics/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ lazy_static::lazy_static! {
// -----------------
// CommittorService
// -----------------
static ref COMMITTOR_INTENTS_COUNT: IntCounter = IntCounter::new(
"committor_intents_count", "Total number of scheduled committor intents"
).unwrap();

static ref COMMITTOR_INTENTS_BACKLOG_COUNT: IntGauge = IntGauge::new(
"committor_intent_backlog_count", "Number of intents in backlog",
).unwrap();
Expand Down Expand Up @@ -219,6 +223,28 @@ lazy_static::lazy_static! {
static ref COMMITTOR_INTENT_CU_USAGE: IntGauge = IntGauge::new(
"committor_intent_cu_usage", "Compute units used for Intent"
).unwrap();


static ref COMMITTOR_INTENT_TASK_PREPARATION_TIME: HistogramVec = HistogramVec::new(
HistogramOpts::new(
"committor_intent_task_preparation_time",
"Time in seconds spent on task preparation"
)
.buckets(
vec![0.1, 1.0, 2.0, 3.0, 5.0]
),
&["task_type"],
).unwrap();

static ref COMMITTOR_INTENT_ALT_PREPARATION_TIME: Histogram = Histogram::with_opts(
HistogramOpts::new(
"committor_intent_alt_preparation_time",
"Time in seconds spent on ALTs preparation"
)
.buckets(
vec![1.0, 3.0, 5.0, 10.0, 15.0, 17.0, 20.0]
),
).unwrap();
}

pub(crate) fn register() {
Expand Down Expand Up @@ -252,11 +278,14 @@ pub(crate) fn register() {
register!(MONITORED_ACCOUNTS_GAUGE);
register!(SUBSCRIPTIONS_COUNT_GAUGE);
register!(EVICTED_ACCOUNTS_COUNT);
register!(COMMITTOR_INTENTS_COUNT);
register!(COMMITTOR_INTENTS_BACKLOG_COUNT);
register!(COMMITTOR_FAILED_INTENTS_COUNT);
register!(COMMITTOR_EXECUTORS_BUSY_COUNT);
register!(COMMITTOR_INTENT_EXECUTION_TIME_HISTOGRAM);
register!(COMMITTOR_INTENT_CU_USAGE);
register!(COMMITTOR_INTENT_TASK_PREPARATION_TIME);
register!(COMMITTOR_INTENT_ALT_PREPARATION_TIME);
register!(ENSURE_ACCOUNTS_TIME);
register!(RPC_REQUEST_HANDLING_TIME);
register!(TRANSACTION_PROCESSING_TIME);
Expand Down Expand Up @@ -363,6 +392,14 @@ pub fn inc_evicted_accounts_count() {
EVICTED_ACCOUNTS_COUNT.inc();
}

/// Bumps the `committor_intents_count` counter (total number of scheduled
/// committor intents) by one.
pub fn inc_committor_intents_count() {
    COMMITTOR_INTENTS_COUNT.inc();
}

/// Adds `by` to the `committor_intents_count` counter (total number of
/// scheduled committor intents).
pub fn inc_committor_intents_count_by(by: u64) {
    COMMITTOR_INTENTS_COUNT.inc_by(by);
}

/// Overwrites the `committor_intent_backlog_count` gauge (number of intents
/// in backlog) with `value`.
pub fn set_committor_intents_backlog_count(value: i64) {
    COMMITTOR_INTENTS_BACKLOG_COUNT.set(value);
}
Expand Down Expand Up @@ -393,3 +430,17 @@ pub fn observe_committor_intent_execution_time_histogram(
/// Overwrites the `committor_intent_cu_usage` gauge (compute units used for
/// an intent) with `value`.
pub fn set_commmittor_intent_cu_usage(value: i64) {
    COMMITTOR_INTENT_CU_USAGE.set(value);
}

/// Starts a timer on the `committor_intent_task_preparation_time` histogram.
///
/// The observation is recorded under the `task_type` label, whose value is
/// taken from `task_type.value()`. The returned [`HistogramTimer`] is handed
/// back to the caller, who decides when the measurement ends (e.g. via
/// `stop_and_record`).
pub fn observe_committor_intent_task_preparation_time<L>(
    task_type: &L,
) -> HistogramTimer
where
    L: LabelValue + ?Sized,
{
    let labelled_histogram = COMMITTOR_INTENT_TASK_PREPARATION_TIME
        .with_label_values(&[task_type.value()]);
    labelled_histogram.start_timer()
}

pub fn observe_committor_intent_alt_preparation_time() -> HistogramTimer {
COMMITTOR_INTENT_ALT_PREPARATION_TIME.start_timer()
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ async fn test_commit_id_error_recovery() {
)
})
.collect();

tokio::time::sleep(Duration::from_secs(1)).await;
verify(
&fixture.table_mania,
fixture.rpc_client.get_inner(),
Expand Down Expand Up @@ -369,6 +371,8 @@ async fn test_action_error_recovery() {
&[committed_account],
)
.await;

tokio::time::sleep(Duration::from_secs(1)).await;
verify_table_mania_released(
&fixture.table_mania,
&pre_test_tablemania_state,
Expand Down Expand Up @@ -426,6 +430,8 @@ async fn test_commit_id_and_action_errors_recovery() {
&[committed_account],
)
.await;

tokio::time::sleep(Duration::from_secs(1)).await;
verify_table_mania_released(
&fixture.table_mania,
&pre_test_tablemania_state,
Expand Down Expand Up @@ -498,6 +504,7 @@ async fn test_cpi_limits_error_recovery() {
}
));

tokio::time::sleep(Duration::from_secs(1)).await;
let commit_ids_by_pk: HashMap<_, _> = committed_accounts
.iter()
.map(|el| {
Expand Down Expand Up @@ -610,6 +617,7 @@ async fn test_commit_id_actions_cpi_limit_errors_recovery() {
}
));

tokio::time::sleep(Duration::from_secs(1)).await;
let commit_ids_by_pk: HashMap<_, _> = committed_accounts
.iter()
.map(|el| {
Expand Down