diff --git a/magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs b/magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs index 6fce85908..f1bc8f7fa 100644 --- a/magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs +++ b/magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs @@ -294,7 +294,7 @@ where intent: &ScheduledBaseIntentWrapper, result: &IntentExecutorResult, ) { - const EXECUTION_TIME_THRESHOLD: f64 = 2.0; + const EXECUTION_TIME_THRESHOLD: f64 = 5.0; let intent_execution_secs = execution_time.as_secs_f64(); metrics::observe_committor_intent_execution_time_histogram( @@ -309,12 +309,8 @@ where // Loki alerts if intent_execution_secs >= EXECUTION_TIME_THRESHOLD { info!( - "Intent took too long to execute: {}s. {}", - intent_execution_secs, - result - .as_ref() - .map(|_| "succeeded".to_string()) - .unwrap_or_else(|err| format!("{err:?}")) + "Intent took too long to execute: {}s. {:?}", + intent_execution_secs, result ); } else { trace!("Seconds took to execute intent: {}", intent_execution_secs); diff --git a/magicblock-committor-service/src/intent_executor/mod.rs b/magicblock-committor-service/src/intent_executor/mod.rs index fa85cfee1..464d8cdf6 100644 --- a/magicblock-committor-service/src/intent_executor/mod.rs +++ b/magicblock-committor-service/src/intent_executor/mod.rs @@ -102,7 +102,7 @@ pub struct IntentExecutorImpl { impl IntentExecutorImpl where - T: TransactionPreparator, + T: TransactionPreparator + Clone, F: TaskInfoFetcher, { pub fn new( @@ -253,7 +253,6 @@ where } /// Starting execution from single stage - // TODO(edwin): introduce recursion stop value in case of some bug? pub async fn single_stage_execution_flow( &self, base_intent: ScheduledBaseIntent, @@ -264,21 +263,7 @@ where let res = SingleStageExecutor::new(self) .execute(base_intent, transaction_strategy, &mut junk, persister) .await; - - // Cleanup after intent - // Note: in some cases it maybe critical to execute cleanup synchronously - // Example: if commit nonces were invalid during execution - // next intent could use wrongly initiated buffers by current intent - let cleanup_futs = junk.iter().map(|to_cleanup| { - self.transaction_preparator.cleanup_for_strategy( - &self.authority, - &to_cleanup.optimized_tasks, - &to_cleanup.lookup_tables_keys, - ) - }); - if let Err(err) = try_join_all(cleanup_futs).await { - error!("Failed to cleanup after intent: {}", err); - } + self.spawn_cleanup_task(junk); res } @@ -300,21 +285,7 @@ where persister, ) .await; - - // Cleanup after intent - // Note: in some cases it maybe critical to execute cleanup synchronously - // Example: if commit nonces were invalid during execution - // next intent could use wrongly initiated buffers by current intent - let cleanup_futs = junk.iter().map(|to_cleanup| { - self.transaction_preparator.cleanup_for_strategy( - &self.authority, - &to_cleanup.optimized_tasks, - &to_cleanup.lookup_tables_keys, - ) - }); - if let Err(err) = try_join_all(cleanup_futs).await { - error!("Failed to cleanup after intent: {}", err); - } + self.spawn_cleanup_task(junk); res } @@ -678,6 +649,31 @@ where .await } + /// Cleanup after intent + /// Note: in some cases it maybe critical to execute cleanup synchronously + /// Example: if commit nonces were invalid during execution + /// next intent could use wrongly initiated buffers by current intent + /// We assume that this case is highly unlikely since it would mean: + /// user redelegates amd reaches current commit id faster than we execute transactions below + fn spawn_cleanup_task(&self, junk: Vec) { + let authority = self.authority.insecure_clone(); + let transaction_preparator = self.transaction_preparator.clone(); + let cleanup_fut = async move { + let cleanup_futs = junk.iter().map(|to_cleanup| { + transaction_preparator.cleanup_for_strategy( + &authority, + &to_cleanup.optimized_tasks, + &to_cleanup.lookup_tables_keys, + ) + }); + + if let Err(err) = try_join_all(cleanup_futs).await { + error!("Failed to cleanup after intent: {}", err); + } + }; + tokio::spawn(cleanup_fut); + } + async fn intent_metrics( rpc_client: MagicblockRpcClient, execution_outcome: ExecutionOutput, @@ -741,7 +737,7 @@ where #[async_trait] impl IntentExecutor for IntentExecutorImpl where - T: TransactionPreparator, + T: TransactionPreparator + Clone, C: TaskInfoFetcher, { /// Executes Message on Base layer diff --git a/magicblock-committor-service/src/intent_executor/single_stage_executor.rs b/magicblock-committor-service/src/intent_executor/single_stage_executor.rs index ad42448c6..4a752edb9 100644 --- a/magicblock-committor-service/src/intent_executor/single_stage_executor.rs +++ b/magicblock-committor-service/src/intent_executor/single_stage_executor.rs @@ -24,7 +24,7 @@ pub struct SingleStageExecutor<'a, T, F> { impl<'a, T, F> SingleStageExecutor<'a, T, F> where - T: TransactionPreparator, + T: TransactionPreparator + Clone, F: TaskInfoFetcher, { pub fn new(executor: &'a IntentExecutorImpl) -> Self { diff --git a/magicblock-committor-service/src/intent_executor/two_stage_executor.rs b/magicblock-committor-service/src/intent_executor/two_stage_executor.rs index 227a67221..64751fef2 100644 --- a/magicblock-committor-service/src/intent_executor/two_stage_executor.rs +++ b/magicblock-committor-service/src/intent_executor/two_stage_executor.rs @@ -23,7 +23,7 @@ pub struct TwoStageExecutor<'a, T, F> { impl<'a, T, F> TwoStageExecutor<'a, T, F> where - T: TransactionPreparator, + T: TransactionPreparator + Clone, F: TaskInfoFetcher, { pub fn new(executor: &'a IntentExecutorImpl) -> Self { diff --git a/magicblock-committor-service/src/tasks/args_task.rs b/magicblock-committor-service/src/tasks/args_task.rs index 301db3b63..fd41a0db4 100644 --- a/magicblock-committor-service/src/tasks/args_task.rs +++ b/magicblock-committor-service/src/tasks/args_task.rs @@ -1,4 +1,5 @@ use dlp::args::{CallHandlerArgs, CommitStateArgs}; +use magicblock_metrics::metrics::LabelValue; use solana_pubkey::Pubkey; use solana_sdk::instruction::{AccountMeta, Instruction}; @@ -165,3 +166,14 @@ impl BaseTask for ArgsTask { commit_task.commit_id = commit_id; } } + +impl LabelValue for ArgsTask { + fn value(&self) -> &str { + match self.task_type { + ArgsTaskType::Commit(_) => "args_commit", + ArgsTaskType::BaseAction(_) => "args_action", + ArgsTaskType::Finalize(_) => "args_finalize", + ArgsTaskType::Undelegate(_) => "args_undelegate", + } + } +} diff --git a/magicblock-committor-service/src/tasks/buffer_task.rs b/magicblock-committor-service/src/tasks/buffer_task.rs index dbb23a9b5..853c0dcd9 100644 --- a/magicblock-committor-service/src/tasks/buffer_task.rs +++ b/magicblock-committor-service/src/tasks/buffer_task.rs @@ -1,5 +1,6 @@ use dlp::args::CommitStateFromBufferArgs; use magicblock_committor_program::Chunks; +use magicblock_metrics::metrics::LabelValue; use solana_pubkey::Pubkey; use solana_sdk::instruction::Instruction; @@ -140,3 +141,11 @@ impl BaseTask for BufferTask { self.preparation_state = Self::preparation_required(&self.task_type) } } + +impl LabelValue for BufferTask { + fn value(&self) -> &str { + match self.task_type { + BufferTaskType::Commit(_) => "buffer_commit", + } + } +} diff --git a/magicblock-committor-service/src/tasks/mod.rs b/magicblock-committor-service/src/tasks/mod.rs index a31e52c4a..3841cab6a 100644 --- a/magicblock-committor-service/src/tasks/mod.rs +++ b/magicblock-committor-service/src/tasks/mod.rs @@ -10,6 +10,7 @@ use magicblock_committor_program::{ }, pdas, ChangesetChunks, Chunks, }; +use magicblock_metrics::metrics::LabelValue; use magicblock_program::magic_scheduled_base_intent::{ BaseAction, CommittedAccount, }; @@ -50,7 +51,7 @@ pub enum TaskStrategy { } /// A trait representing a task that can be executed on Base layer -pub trait BaseTask: Send + Sync + DynClone { +pub trait BaseTask: Send + Sync + DynClone + LabelValue { /// Gets all pubkeys that involved in Task's instruction fn involved_accounts(&self, validator: &Pubkey) -> Vec { self.instruction(validator) diff --git a/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs b/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs index 13001df0e..94da08628 100644 --- a/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs +++ b/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs @@ -6,6 +6,7 @@ use log::{error, info}; use magicblock_committor_program::{ instruction_chunks::chunk_realloc_ixs, Chunks, }; +use magicblock_metrics::metrics; use magicblock_rpc_client::{ utils::{ decide_rpc_error_flow, map_magicblock_client_error, @@ -66,13 +67,30 @@ impl DeliveryPreparator { persister: &Option

, ) -> DeliveryPreparatorResult> { let preparation_futures = - strategy.optimized_tasks.iter_mut().map(|task| { - self.prepare_task_handling_errors(authority, task, persister) + strategy.optimized_tasks.iter_mut().map(|task| async move { + let timer = + metrics::observe_committor_intent_task_preparation_time( + task.as_ref(), + ); + let res = self + .prepare_task_handling_errors(authority, task, persister) + .await; + timer.stop_and_record(); + + res }); let task_preparations = join_all(preparation_futures); - let alts_preparations = - self.prepare_lookup_tables(authority, &strategy.lookup_tables_keys); + let alts_preparations = async { + let timer = + metrics::observe_committor_intent_alt_preparation_time(); + let res = self + .prepare_lookup_tables(authority, &strategy.lookup_tables_keys) + .await; + timer.stop_and_record(); + + res + }; let (res1, res2) = join(task_preparations, alts_preparations).await; res1.into_iter() @@ -215,11 +233,16 @@ impl DeliveryPreparator { }) .collect::>(); - // Initialization & reallocs - for instructions in preparation_instructions { - self.send_ixs_with_retry(&instructions, authority, 5) - .await?; - } + // Initialization + self.send_ixs_with_retry(&preparation_instructions[0], authority, 5) + .await?; + + // Reallocs can be performed in parallel + let preparation_futs = + preparation_instructions.iter().skip(1).map(|instructions| { + self.send_ixs_with_retry(instructions, authority, 5) + }); + try_join_all(preparation_futs).await?; Ok(()) } diff --git a/magicblock-metrics/src/metrics/mod.rs b/magicblock-metrics/src/metrics/mod.rs index 96171bcdd..6dcddcefc 100644 --- a/magicblock-metrics/src/metrics/mod.rs +++ b/magicblock-metrics/src/metrics/mod.rs @@ -192,6 +192,10 @@ lazy_static::lazy_static! { // ----------------- // CommittorService // ----------------- + static ref COMMITTOR_INTENTS_COUNT: IntCounter = IntCounter::new( + "committor_intents_count", "Total number of scheduled committor intents" + ).unwrap(); + static ref COMMITTOR_INTENTS_BACKLOG_COUNT: IntGauge = IntGauge::new( "committor_intent_backlog_count", "Number of intents in backlog", ).unwrap(); @@ -219,6 +223,28 @@ lazy_static::lazy_static! { static ref COMMITTOR_INTENT_CU_USAGE: IntGauge = IntGauge::new( "committor_intent_cu_usage", "Compute units used for Intent" ).unwrap(); + + + static ref COMMITTOR_INTENT_TASK_PREPARATION_TIME: HistogramVec = HistogramVec::new( + HistogramOpts::new( + "committor_intent_task_preparation_time", + "Time in seconds spent on task preparation" + ) + .buckets( + vec![0.1, 1.0, 2.0, 3.0, 5.0] + ), + &["task_type"], + ).unwrap(); + + static ref COMMITTOR_INTENT_ALT_PREPARATION_TIME: Histogram = Histogram::with_opts( + HistogramOpts::new( + "committor_intent_alt_preparation_time", + "Time in seconds spent on ALTs preparation" + ) + .buckets( + vec![1.0, 3.0, 5.0, 10.0, 15.0, 17.0, 20.0] + ), + ).unwrap(); } pub(crate) fn register() { @@ -252,11 +278,14 @@ pub(crate) fn register() { register!(MONITORED_ACCOUNTS_GAUGE); register!(SUBSCRIPTIONS_COUNT_GAUGE); register!(EVICTED_ACCOUNTS_COUNT); + register!(COMMITTOR_INTENTS_COUNT); register!(COMMITTOR_INTENTS_BACKLOG_COUNT); register!(COMMITTOR_FAILED_INTENTS_COUNT); register!(COMMITTOR_EXECUTORS_BUSY_COUNT); register!(COMMITTOR_INTENT_EXECUTION_TIME_HISTOGRAM); register!(COMMITTOR_INTENT_CU_USAGE); + register!(COMMITTOR_INTENT_TASK_PREPARATION_TIME); + register!(COMMITTOR_INTENT_ALT_PREPARATION_TIME); register!(ENSURE_ACCOUNTS_TIME); register!(RPC_REQUEST_HANDLING_TIME); register!(TRANSACTION_PROCESSING_TIME); @@ -363,6 +392,14 @@ pub fn inc_evicted_accounts_count() { EVICTED_ACCOUNTS_COUNT.inc(); } +pub fn inc_committor_intents_count() { + COMMITTOR_INTENTS_COUNT.inc() +} + +pub fn inc_committor_intents_count_by(by: u64) { + COMMITTOR_INTENTS_COUNT.inc_by(by) +} + pub fn set_committor_intents_backlog_count(value: i64) { COMMITTOR_INTENTS_BACKLOG_COUNT.set(value) } @@ -393,3 +430,17 @@ pub fn observe_committor_intent_execution_time_histogram( pub fn set_commmittor_intent_cu_usage(value: i64) { COMMITTOR_INTENT_CU_USAGE.set(value) } + +pub fn observe_committor_intent_task_preparation_time< + L: LabelValue + ?Sized, +>( + task_type: &L, +) -> HistogramTimer { + COMMITTOR_INTENT_TASK_PREPARATION_TIME + .with_label_values(&[task_type.value()]) + .start_timer() +} + +pub fn observe_committor_intent_alt_preparation_time() -> HistogramTimer { + COMMITTOR_INTENT_ALT_PREPARATION_TIME.start_timer() +} diff --git a/test-integration/test-committor-service/tests/test_intent_executor.rs b/test-integration/test-committor-service/tests/test_intent_executor.rs index 8e904e902..cd301c764 100644 --- a/test-integration/test-committor-service/tests/test_intent_executor.rs +++ b/test-integration/test-committor-service/tests/test_intent_executor.rs @@ -316,6 +316,8 @@ async fn test_commit_id_error_recovery() { ) }) .collect(); + + tokio::time::sleep(Duration::from_secs(1)).await; verify( &fixture.table_mania, fixture.rpc_client.get_inner(), @@ -369,6 +371,8 @@ async fn test_action_error_recovery() { &[committed_account], ) .await; + + tokio::time::sleep(Duration::from_secs(1)).await; verify_table_mania_released( &fixture.table_mania, &pre_test_tablemania_state, @@ -426,6 +430,8 @@ async fn test_commit_id_and_action_errors_recovery() { &[committed_account], ) .await; + + tokio::time::sleep(Duration::from_secs(1)).await; verify_table_mania_released( &fixture.table_mania, &pre_test_tablemania_state, @@ -498,6 +504,7 @@ async fn test_cpi_limits_error_recovery() { } )); + tokio::time::sleep(Duration::from_secs(1)).await; let commit_ids_by_pk: HashMap<_, _> = committed_accounts .iter() .map(|el| { @@ -610,6 +617,7 @@ async fn test_commit_id_actions_cpi_limit_errors_recovery() { } )); + tokio::time::sleep(Duration::from_secs(1)).await; let commit_ids_by_pk: HashMap<_, _> = committed_accounts .iter() .map(|el| {