Skip to content

Commit 292a47e

Browse files
refactor(ntx-builder): simplify coordinator-actor messaging with Notify (#1699)
1 parent a7ff19b commit 292a47e

8 files changed

Lines changed: 315 additions & 307 deletions

File tree

crates/ntx-builder/src/actor/mod.rs

Lines changed: 104 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use account_state::TransactionCandidate;
1111
use futures::FutureExt;
1212
use miden_node_proto::clients::{Builder, ValidatorClient};
1313
use miden_node_proto::domain::account::NetworkAccountId;
14-
use miden_node_proto::domain::mempool::MempoolEvent;
1514
use miden_node_utils::ErrorReport;
1615
use miden_node_utils::lru_cache::LruCache;
1716
use miden_protocol::Word;
@@ -20,7 +19,7 @@ use miden_protocol::block::BlockNumber;
2019
use miden_protocol::note::{NoteScript, Nullifier};
2120
use miden_protocol::transaction::TransactionId;
2221
use miden_remote_prover_client::RemoteTransactionProver;
23-
use tokio::sync::{AcquireError, RwLock, Semaphore, mpsc};
22+
use tokio::sync::{AcquireError, Notify, RwLock, Semaphore, mpsc};
2423
use tokio_util::sync::CancellationToken;
2524
use url::Url;
2625

@@ -30,16 +29,19 @@ use crate::builder::ChainState;
3029
use crate::db::Db;
3130
use crate::store::StoreClient;
3231

33-
// ACTOR NOTIFICATION
32+
// ACTOR REQUESTS
3433
// ================================================================================================
3534

36-
/// A notification sent from an account actor to the coordinator.
37-
pub enum ActorNotification {
35+
/// A request sent from an account actor to the coordinator via a shared mpsc channel.
36+
pub enum ActorRequest {
3837
/// One or more notes failed during transaction execution and should have their attempt
39-
/// counters incremented.
38+
/// counters incremented. The actor waits for the coordinator to acknowledge the DB write via
39+
/// the oneshot channel, preventing race conditions where the actor could re-select the same
40+
/// notes before the failure is persisted.
4041
NotesFailed {
4142
nullifiers: Vec<Nullifier>,
4243
block_num: BlockNumber,
44+
ack_tx: tokio::sync::oneshot::Sender<()>,
4345
},
4446
/// A note script was fetched from the remote store and should be persisted to the local DB.
4547
CacheNoteScript { script_root: Word, script: NoteScript },
@@ -50,15 +52,17 @@ pub enum ActorNotification {
5052

5153
/// The reason an actor has shut down.
5254
pub enum ActorShutdownReason {
53-
/// Occurs when an account actor detects failure in the messaging channel used by the
54-
/// coordinator.
55-
EventChannelClosed,
5655
/// Occurs when an account actor detects failure in acquiring the rate-limiting semaphore.
5756
SemaphoreFailed(AcquireError),
5857
/// Occurs when an account actor detects its corresponding cancellation token has been triggered
5958
/// by the coordinator. Cancellation tokens are triggered by the coordinator to initiate
6059
/// graceful shutdown of actors.
6160
Cancelled(NetworkAccountId),
61+
/// Occurs when the actor encounters a database error it cannot recover from.
62+
DbError(NetworkAccountId, miden_node_db::DatabaseError),
63+
/// Occurs when an account actor detects that its account has been removed from the database
64+
/// (e.g. due to a reverted account creation).
65+
AccountRemoved(NetworkAccountId),
6266
}
6367

6468
// ACCOUNT ACTOR CONFIG
@@ -88,8 +92,8 @@ pub struct AccountActorContext {
8892
pub max_note_attempts: usize,
8993
/// Database for persistent state.
9094
pub db: Db,
91-
/// Channel for sending notifications to the coordinator (via the builder event loop).
92-
pub notification_tx: mpsc::Sender<ActorNotification>,
95+
/// Channel for sending requests to the coordinator (via the builder event loop).
96+
pub request_tx: mpsc::Sender<ActorRequest>,
9397
}
9498

9599
// ACCOUNT ORIGIN
@@ -180,7 +184,7 @@ pub struct AccountActor {
180184
store: StoreClient,
181185
db: Db,
182186
mode: ActorMode,
183-
event_rx: mpsc::Receiver<Arc<MempoolEvent>>,
187+
notify: Arc<Notify>,
184188
cancel_token: CancellationToken,
185189
block_producer: BlockProducerClient,
186190
validator: ValidatorClient,
@@ -191,17 +195,16 @@ pub struct AccountActor {
191195
max_notes_per_tx: NonZeroUsize,
192196
/// Maximum number of note execution attempts before dropping a note.
193197
max_note_attempts: usize,
194-
/// Channel for sending notifications to the coordinator.
195-
notification_tx: mpsc::Sender<ActorNotification>,
198+
/// Channel for sending requests to the coordinator.
199+
request_tx: mpsc::Sender<ActorRequest>,
196200
}
197201

198202
impl AccountActor {
199-
/// Constructs a new account actor and corresponding messaging channel with the given
200-
/// configuration.
203+
/// Constructs a new account actor with the given configuration.
201204
pub fn new(
202205
origin: AccountOrigin,
203206
actor_context: &AccountActorContext,
204-
event_rx: mpsc::Receiver<Arc<MempoolEvent>>,
207+
notify: Arc<Notify>,
205208
cancel_token: CancellationToken,
206209
) -> Self {
207210
let block_producer = BlockProducerClient::new(actor_context.block_producer_url.clone());
@@ -218,7 +221,7 @@ impl AccountActor {
218221
store: actor_context.store.clone(),
219222
db: actor_context.db.clone(),
220223
mode: ActorMode::NoViableNotes,
221-
event_rx,
224+
notify,
222225
cancel_token,
223226
block_producer,
224227
validator,
@@ -227,13 +230,13 @@ impl AccountActor {
227230
script_cache: actor_context.script_cache.clone(),
228231
max_notes_per_tx: actor_context.max_notes_per_tx,
229232
max_note_attempts: actor_context.max_note_attempts,
230-
notification_tx: actor_context.notification_tx.clone(),
233+
request_tx: actor_context.request_tx.clone(),
231234
}
232235
}
233236

234237
/// Runs the account actor, processing events and managing state until a reason to shutdown is
235238
/// encountered.
236-
pub async fn run(mut self, semaphore: Arc<Semaphore>) -> ActorShutdownReason {
239+
pub async fn run(mut self, semaphore: Arc<Semaphore>) -> Result<(), ActorShutdownReason> {
237240
let account_id = self.origin.id();
238241

239242
// Determine initial mode by checking DB for available notes.
@@ -242,7 +245,10 @@ impl AccountActor {
242245
.db
243246
.has_available_notes(account_id, block_num, self.max_note_attempts)
244247
.await
245-
.expect("actor should be able to check for available notes");
248+
.map_err(|err| {
249+
tracing::error!(err = err.as_report(), account_id = %account_id, "failed to check for available notes");
250+
ActorShutdownReason::DbError(account_id, err)
251+
})?;
246252

247253
if has_notes {
248254
self.mode = ActorMode::NotesAvailable;
@@ -260,55 +266,50 @@ impl AccountActor {
260266
};
261267
tokio::select! {
262268
_ = self.cancel_token.cancelled() => {
263-
return ActorShutdownReason::Cancelled(account_id);
269+
return Err(ActorShutdownReason::Cancelled(account_id));
264270
}
265-
// Handle mempool events.
266-
event = self.event_rx.recv() => {
267-
let Some(event) = event else {
268-
return ActorShutdownReason::EventChannelClosed;
269-
};
270-
// Re-enable transaction execution if the transaction being waited on has
271-
// been resolved (added to mempool, committed in a block, or reverted).
272-
if let ActorMode::TransactionInflight(awaited_id) = self.mode {
273-
let should_wake = match event.as_ref() {
274-
MempoolEvent::TransactionAdded { id, .. } => *id == awaited_id,
275-
MempoolEvent::BlockCommitted { txs, .. } => {
276-
txs.contains(&awaited_id)
277-
},
278-
MempoolEvent::TransactionsReverted(tx_ids) => {
279-
tx_ids.contains(&awaited_id)
280-
},
281-
};
282-
if should_wake {
271+
// Handle coordinator notifications. On notification, re-evaluate state from DB.
272+
_ = self.notify.notified() => {
273+
match self.mode {
274+
ActorMode::TransactionInflight(awaited_id) => {
275+
// Check DB: is the inflight tx still pending?
276+
let exists = self
277+
.db
278+
.transaction_exists(awaited_id)
279+
.await
280+
.inspect_err(|err| {
281+
tracing::error!(err = err.as_report(), account_id = %account_id, "failed to check transaction status");
282+
})
283+
.map_err(|err| {
284+
ActorShutdownReason::DbError(account_id, err)
285+
})?;
286+
if exists {
287+
self.mode = ActorMode::NotesAvailable;
288+
}
289+
},
290+
_ => {
283291
self.mode = ActorMode::NotesAvailable;
284292
}
285-
} else {
286-
self.mode = ActorMode::NotesAvailable;
287293
}
288294
},
289295
// Execute transactions.
290296
permit = tx_permit_acquisition => {
291-
match permit {
292-
Ok(_permit) => {
293-
// Read the chain state.
294-
let chain_state = self.chain_state.read().await.clone();
295-
296-
// Query DB for latest account and available notes.
297-
let tx_candidate = self.select_candidate_from_db(
298-
account_id,
299-
chain_state,
300-
).await;
301-
302-
if let Some(tx_candidate) = tx_candidate {
303-
self.execute_transactions(account_id, tx_candidate).await;
304-
} else {
305-
// No transactions to execute, wait for events.
306-
self.mode = ActorMode::NoViableNotes;
307-
}
308-
}
309-
Err(err) => {
310-
return ActorShutdownReason::SemaphoreFailed(err);
311-
}
297+
let _permit = permit.map_err(ActorShutdownReason::SemaphoreFailed)?;
298+
299+
// Read the chain state.
300+
let chain_state = self.chain_state.read().await.clone();
301+
302+
// Query DB for latest account and available notes.
303+
let tx_candidate = self.select_candidate_from_db(
304+
account_id,
305+
chain_state,
306+
).await?;
307+
308+
if let Some(tx_candidate) = tx_candidate {
309+
self.execute_transactions(account_id, tx_candidate).await;
310+
} else {
311+
// No transactions to execute, wait for events.
312+
self.mode = ActorMode::NoViableNotes;
312313
}
313314
}
314315
}
@@ -320,30 +321,35 @@ impl AccountActor {
320321
&self,
321322
account_id: NetworkAccountId,
322323
chain_state: ChainState,
323-
) -> Option<TransactionCandidate> {
324+
) -> Result<Option<TransactionCandidate>, ActorShutdownReason> {
324325
let block_num = chain_state.chain_tip_header.block_num();
325326
let max_notes = self.max_notes_per_tx.get();
326327

327328
let (latest_account, notes) = self
328329
.db
329330
.select_candidate(account_id, block_num, self.max_note_attempts)
330331
.await
331-
.expect("actor should be able to query DB for candidate");
332+
.map_err(|err| {
333+
tracing::error!(err = err.as_report(), account_id = %account_id, "failed to query DB for transaction candidate");
334+
ActorShutdownReason::DbError(account_id, err)
335+
})?;
332336

333-
let account = latest_account?;
337+
let Some(account) = latest_account else {
338+
return Err(ActorShutdownReason::AccountRemoved(account_id));
339+
};
334340

335341
let notes: Vec<_> = notes.into_iter().take(max_notes).collect();
336342
if notes.is_empty() {
337-
return None;
343+
return Ok(None);
338344
}
339345

340346
let (chain_tip_header, chain_mmr) = chain_state.into_parts();
341-
Some(TransactionCandidate {
347+
Ok(Some(TransactionCandidate {
342348
account,
343349
notes,
344350
chain_tip_header,
345351
chain_mmr,
346-
})
352+
}))
347353
}
348354

349355
/// Execute a transaction candidate and mark notes as failed as required.
@@ -370,17 +376,13 @@ impl AccountActor {
370376
let notes = tx_candidate.notes.clone();
371377
let execution_result = context.execute_transaction(tx_candidate).await;
372378
match execution_result {
373-
// Execution completed without failed notes.
374-
Ok((tx_id, failed, scripts_to_cache)) if failed.is_empty() => {
375-
self.cache_note_scripts(scripts_to_cache).await;
376-
self.mode = ActorMode::TransactionInflight(tx_id);
377-
},
378-
// Execution completed with some failed notes.
379379
Ok((tx_id, failed, scripts_to_cache)) => {
380380
self.cache_note_scripts(scripts_to_cache).await;
381-
let nullifiers: Vec<_> =
382-
failed.into_iter().map(|note| note.note.nullifier()).collect();
383-
self.mark_notes_failed(&nullifiers, block_num).await;
381+
if !failed.is_empty() {
382+
let nullifiers: Vec<_> =
383+
failed.into_iter().map(|note| note.note.nullifier()).collect();
384+
self.mark_notes_failed(&nullifiers, block_num).await;
385+
}
384386
self.mode = ActorMode::TransactionInflight(tx_id);
385387
},
386388
// Transaction execution failed.
@@ -393,25 +395,39 @@ impl AccountActor {
393395
}
394396
}
395397

396-
/// Sends notifications to the coordinator to cache note scripts fetched from the remote store.
398+
/// Sends requests to the coordinator to cache note scripts fetched from the remote store.
397399
async fn cache_note_scripts(&self, scripts: Vec<(Word, NoteScript)>) {
398400
for (script_root, script) in scripts {
399-
let _ = self
400-
.notification_tx
401-
.send(ActorNotification::CacheNoteScript { script_root, script })
402-
.await;
401+
if self
402+
.request_tx
403+
.send(ActorRequest::CacheNoteScript { script_root, script })
404+
.await
405+
.is_err()
406+
{
407+
break;
408+
}
403409
}
404410
}
405411

406-
/// Sends a notification to the coordinator to mark notes as failed.
412+
/// Sends a request to the coordinator to mark notes as failed and waits for the DB write to
413+
/// complete. This prevents a race condition where the actor could re-select the same notes
414+
/// before the failure counts are updated in the database.
407415
async fn mark_notes_failed(&self, nullifiers: &[Nullifier], block_num: BlockNumber) {
408-
let _ = self
409-
.notification_tx
410-
.send(ActorNotification::NotesFailed {
416+
let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
417+
if self
418+
.request_tx
419+
.send(ActorRequest::NotesFailed {
411420
nullifiers: nullifiers.to_vec(),
412421
block_num,
422+
ack_tx,
413423
})
414-
.await;
424+
.await
425+
.is_err()
426+
{
427+
return;
428+
}
429+
// Wait for the coordinator to confirm the DB write.
430+
let _ = ack_rx.await;
415431
}
416432
}
417433

0 commit comments

Comments
 (0)