Skip to content

Commit 75f97f5

Browse files
review: remove coordinator confirmation for actor shutdown
1 parent 24e1be3 commit 75f97f5

3 files changed

Lines changed: 28 additions & 144 deletions

File tree

crates/ntx-builder/src/actor/mod.rs

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,6 @@ pub enum ActorRequest {
5353
},
5454
/// A note script was fetched from the remote store and should be persisted to the local DB.
5555
CacheNoteScript { script_root: Word, script: NoteScript },
56-
/// The actor has been idle (in `NoViableNotes` mode) for longer than the idle timeout
57-
/// and is requesting to shut down. The builder validates the request against the DB before
58-
/// approving. If approved (ack received), the actor exits. If rejected (`ack_tx` dropped), the
59-
/// actor resumes in `NotesAvailable` mode.
60-
Shutdown {
61-
account_id: NetworkAccountId,
62-
ack_tx: tokio::sync::oneshot::Sender<()>,
63-
},
6456
}
6557

6658
// ACTOR SHUTDOWN REASON
@@ -349,31 +341,14 @@ impl AccountActor {
349341
}
350342
}
351343
}
352-
// Idle timeout: actor has been idle too long, request shutdown.
344+
// Idle timeout: actor has been idle too long, deactivate account.
353345
_ = idle_timeout_sleep => {
354-
match self.initiate_shutdown(account_id).await {
355-
Ok(()) => return ActorShutdownReason::IdleTimeout(account_id),
356-
Err(()) => self.mode = ActorMode::NotesAvailable,
357-
}
346+
return ActorShutdownReason::IdleTimeout(account_id);
358347
}
359348
}
360349
}
361350
}
362351

363-
/// Sends a shutdown request to the builder and waits for acknowledgment.
364-
///
365-
/// Returns `Ok(())` if the builder approved the shutdown (actor should exit).
366-
/// Returns `Err(())` if the builder rejected the shutdown or the channel was dropped
367-
/// (actor should resume as `NotesAvailable`).
368-
async fn initiate_shutdown(&self, account_id: NetworkAccountId) -> Result<(), ()> {
369-
let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
370-
self.request_tx
371-
.send(ActorRequest::Shutdown { account_id, ack_tx })
372-
.await
373-
.map_err(|_| ())?;
374-
ack_rx.await.map_err(|_| ())
375-
}
376-
377352
/// Selects a transaction candidate by querying the DB.
378353
async fn select_candidate_from_db(
379354
&self,

crates/ntx-builder/src/builder.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,12 @@ impl NetworkTransactionBuilder {
112112
// Main event loop.
113113
loop {
114114
tokio::select! {
115-
// Handle actor result.
115+
// Handle actor result. If a timed-out actor needs respawning, do so.
116116
result = self.coordinator.next() => {
117-
result?;
117+
if let Some(account_id) = result? {
118+
self.coordinator
119+
.spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
120+
}
118121
},
119122
// Handle mempool events.
120123
event = self.mempool_events.next() => {
@@ -264,17 +267,6 @@ impl NetworkTransactionBuilder {
264267
tracing::error!(err = %err, "failed to cache note script");
265268
}
266269
},
267-
ActorRequest::Shutdown { account_id, ack_tx } => {
268-
let block_num = self.chain_state.read().await.chain_tip_header.block_num();
269-
self.coordinator
270-
.handle_shutdown_request(
271-
account_id,
272-
block_num,
273-
self.config.max_note_attempts,
274-
ack_tx,
275-
)
276-
.await;
277-
},
278270
}
279271
}
280272

crates/ntx-builder/src/coordinator.rs

Lines changed: 21 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use miden_node_proto::domain::account::NetworkAccountId;
77
use miden_node_proto::domain::mempool::MempoolEvent;
88
use miden_node_proto::domain::note::{NetworkNote, SingleTargetNetworkNote};
99
use miden_protocol::account::delta::AccountUpdateDetails;
10-
use miden_protocol::block::BlockNumber;
1110
use tokio::sync::{Notify, Semaphore};
1211
use tokio::task::JoinSet;
1312
use tokio_util::sync::CancellationToken;
@@ -58,10 +57,9 @@ impl ActorHandle {
5857
/// - Prevents resource exhaustion by limiting simultaneous transaction processing.
5958
///
6059
/// ## Actor Lifecycle
61-
/// - Actors that have been idle for longer than the idle timeout request shutdown from the
62-
/// coordinator.
63-
/// - The coordinator validates shutdown requests against the DB: if notes are still available for
64-
/// the account, the request is rejected and the actor resumes processing.
60+
/// - Actors that have been idle for longer than the idle timeout deactivate themselves.
61+
/// - When an actor deactivates, the coordinator checks if a notification arrived just as the actor
62+
/// timed out. If so, the actor is respawned immediately.
6563
/// - Deactivated actors are re-spawned when [`Coordinator::send_targeted`] detects notes targeting
6664
/// an account without an active actor.
6765
///
@@ -160,29 +158,42 @@ impl Coordinator {
160158
///
161159
/// If no actors are currently running, this method will wait indefinitely until
162160
/// new actors are spawned. This prevents busy-waiting when the coordinator is idle.
163-
pub async fn next(&mut self) -> anyhow::Result<()> {
161+
///
162+
/// Returns `Some(account_id)` if a timed-out actor should be respawned (because a
163+
/// notification arrived just as it timed out), or `None` otherwise.
164+
pub async fn next(&mut self) -> anyhow::Result<Option<NetworkAccountId>> {
164165
let actor_result = self.actor_join_set.join_next().await;
165166
match actor_result {
166167
Some(Ok(shutdown_reason)) => match shutdown_reason {
167168
ActorShutdownReason::Cancelled(account_id) => {
168169
// Do not remove the actor from the registry, as it may be re-spawned.
169170
// The coordinator should always remove actors immediately after cancellation.
170171
tracing::info!(account_id = %account_id, "Account actor cancelled");
171-
Ok(())
172+
Ok(None)
172173
},
173174
ActorShutdownReason::SemaphoreFailed(err) => Err(err).context("semaphore failed"),
174175
ActorShutdownReason::DbError(account_id) => {
175176
tracing::error!(account_id = %account_id, "Account actor shut down due to DB error");
176-
Ok(())
177+
Ok(None)
177178
},
178179
ActorShutdownReason::IdleTimeout(account_id) => {
179180
tracing::info!(account_id = %account_id, "Account actor shut down due to idle timeout");
180-
Ok(())
181+
182+
// Remove the actor from the registry, but check if a notification arrived
183+
// just as the actor timed out. If so, the caller should respawn it.
184+
let should_respawn =
185+
self.actor_registry.remove(&account_id).is_some_and(|handle| {
186+
let notified = handle.notify.notified();
187+
tokio::pin!(notified);
188+
notified.enable()
189+
});
190+
191+
Ok(should_respawn.then_some(account_id))
181192
},
182193
},
183194
Some(Err(err)) => {
184195
tracing::error!(err = %err, "actor task failed");
185-
Ok(())
196+
Ok(None)
186197
},
187198
None => {
188199
// There are no actors to wait for. Wait indefinitely until actors are spawned.
@@ -284,57 +295,18 @@ impl Coordinator {
284295
}
285296
}
286297

287-
/// Handles a shutdown request from an actor that has been idle for longer than the idle
288-
/// timeout.
289-
///
290-
/// Validates the request by checking the DB for available notes. If notes are available, the
291-
/// shutdown is rejected by dropping `ack_tx` (the actor detects the `RecvError` and resumes).
292-
/// If no notes are available, the actor is deregistered and the ack is sent, allowing the
293-
/// actor to exit gracefully.
294-
pub async fn handle_shutdown_request(
295-
&mut self,
296-
account_id: NetworkAccountId,
297-
block_num: BlockNumber,
298-
max_note_attempts: usize,
299-
ack_tx: tokio::sync::oneshot::Sender<()>,
300-
) {
301-
let has_notes = self
302-
.db
303-
.has_available_notes(account_id, block_num, max_note_attempts)
304-
.await
305-
.unwrap_or(false);
306-
307-
if has_notes {
308-
// Reject: drop ack_tx -> actor detects RecvError, resumes.
309-
tracing::debug!(
310-
%account_id,
311-
"Rejected actor shutdown: notes available in DB"
312-
);
313-
} else {
314-
self.actor_registry.remove(&account_id);
315-
let _ = ack_tx.send(());
316-
}
317-
}
318-
319298
/// Cancels an actor by its account ID.
320299
pub fn cancel_actor(&mut self, account_id: &NetworkAccountId) {
321300
if let Some(handle) = self.actor_registry.remove(account_id) {
322301
handle.cancel_token.cancel();
323302
}
324303
}
325-
326-
/// Returns `true` if an actor is registered for the given account ID.
327-
#[cfg(test)]
328-
pub fn has_actor(&self, account_id: &NetworkAccountId) -> bool {
329-
self.actor_registry.contains_key(account_id)
330-
}
331304
}
332305

333306
#[cfg(test)]
334307
mod tests {
335308
use miden_node_proto::domain::mempool::MempoolEvent;
336309
use miden_node_proto::domain::note::NetworkNote;
337-
use miden_protocol::block::BlockNumber;
338310

339311
use super::*;
340312
use crate::db::Db;
@@ -355,61 +327,6 @@ mod tests {
355327
.insert(account_id, ActorHandle::new(notify, cancel_token));
356328
}
357329

358-
// HANDLE SHUTDOWN REQUEST TESTS
359-
// ============================================================================================
360-
361-
#[tokio::test]
362-
async fn shutdown_approved_when_no_notes() {
363-
let (mut coordinator, _dir) = test_coordinator().await;
364-
let account_id = mock_network_account_id();
365-
366-
register_dummy_actor(&mut coordinator, account_id);
367-
assert!(coordinator.has_actor(&account_id));
368-
369-
let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
370-
let block_num = BlockNumber::from(1u32);
371-
let max_note_attempts = 30;
372-
373-
coordinator
374-
.handle_shutdown_request(account_id, block_num, max_note_attempts, ack_tx)
375-
.await;
376-
377-
// Ack should be received (shutdown approved).
378-
assert!(ack_rx.await.is_ok());
379-
// Actor should be deregistered.
380-
assert!(!coordinator.has_actor(&account_id));
381-
}
382-
383-
#[tokio::test]
384-
async fn shutdown_rejected_when_notes_available() {
385-
let (mut coordinator, _dir) = test_coordinator().await;
386-
let account_id = mock_network_account_id();
387-
388-
// Insert a committed note for this account.
389-
let note = mock_single_target_note(account_id, 10);
390-
coordinator
391-
.db
392-
.sync_account_from_store(account_id, mock_account(account_id), vec![note])
393-
.await
394-
.unwrap();
395-
396-
register_dummy_actor(&mut coordinator, account_id);
397-
assert!(coordinator.has_actor(&account_id));
398-
399-
let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
400-
let block_num = BlockNumber::from(1u32);
401-
let max_note_attempts = 30;
402-
403-
coordinator
404-
.handle_shutdown_request(account_id, block_num, max_note_attempts, ack_tx)
405-
.await;
406-
407-
// Ack_tx should have been dropped (shutdown rejected).
408-
assert!(ack_rx.await.is_err());
409-
// Actor should still be registered.
410-
assert!(coordinator.has_actor(&account_id));
411-
}
412-
413330
// SEND TARGETED TESTS
414331
// ============================================================================================
415332

0 commit comments

Comments
 (0)