Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions canister/src/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ pub const EXECUTOR: Executor = Executor;
pub enum ExecutionStatus {
Complete,
MoreWork,
/// The caller couldn't acquire the matching guard because another task
/// is already running. The holder is responsible for rescheduling if it
/// leaves work unfinished.
AlreadyRunning,
}

impl ExecutionStatus {
Expand Down
25 changes: 0 additions & 25 deletions canister/src/guard/mod.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,10 @@
use crate::Task;
use crate::order::TokenId;
use crate::state::with_state_mut;
use candid::Principal;

#[cfg(test)]
mod tests;

#[derive(Eq, PartialEq, Debug)]
pub struct TimerGuard {
task: Task,
}

impl TimerGuard {
pub fn new(task: Task) -> Option<Self> {
with_state_mut(|s| {
if !s.active_tasks_mut().insert(task) {
return None;
}
Some(Self { task })
})
}
}

impl Drop for TimerGuard {
fn drop(&mut self) {
with_state_mut(|s| {
s.active_tasks_mut().remove(&self.task);
});
}
}

/// RAII guard to prevent concurrent deposit/withdraw operations per
/// `(caller, token)`.
#[derive(Eq, PartialEq, Debug)]
Expand Down
40 changes: 18 additions & 22 deletions canister/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ mod tests;

pub const MATCHING_INTERVAL: Duration = Duration::from_mins(1);

#[derive(Copy, Clone, Eq, PartialEq, Debug, Ord, PartialOrd)]
pub enum Task {
ProcessPendingOrders,
}

pub fn add_limit_order(
request: LimitOrderRequest,
runtime: &impl Runtime,
Expand Down Expand Up @@ -108,32 +103,33 @@ pub fn cancel_limit_order(
}

pub fn process_pending_orders(runtime: &impl Runtime) -> execute::ExecutionStatus {
let _guard = match guard::TimerGuard::new(Task::ProcessPendingOrders) {
Some(guard) => guard,
None => return execute::ExecutionStatus::AlreadyRunning,
};

state::with_state_mut(|s| EXECUTOR.run_once(s, runtime))
}

/// Schedule a zero-delay timer to drive matching, unless one is already
/// pending. Collapses a burst of kickoffs — e.g. back-to-back `add_limit_order`
/// calls plus the [`drive_matching`] self-reschedule chain — into a single
/// drive loop instead of one timer per call.
pub fn schedule_matching_timer() {
let should_schedule = state::with_state_mut(|s| s.try_mark_matching_timer_scheduled());
if should_schedule {
ic_cdk_timers::set_timer(Duration::ZERO, async {
drive_matching();
});
}
}

/// Run one chunk of matching/settling and, if more work remains, schedule a
/// zero-delay timer to continue. Intended for IC entry points (the periodic
/// matching timer and the post-`add_limit_order` kickoff) — tests should call
/// [`process_pending_orders`] directly, which is synchronous and timer-free.
pub fn drive_matching() {
// This timer has now fired; clear the flag so the `MoreWork` path below
// re-arms exactly one continuation and a fresh kickoff can schedule again.
state::with_state_mut(|s| s.clear_matching_timer_scheduled());
Comment thread
gregorydemay marked this conversation as resolved.
Outdated
match process_pending_orders(&IC_RUNTIME) {
Comment thread
gregorydemay marked this conversation as resolved.
Comment thread
gregorydemay marked this conversation as resolved.
execute::ExecutionStatus::MoreWork => {
// TODO DEFI-2823: coalesce zero-delay matching timers so a
// burst of `add_limit_order` kickoffs plus this self-reschedule
// chain doesn't queue O(N) redundant timers per burst.
ic_cdk_timers::set_timer(Duration::ZERO, async {
drive_matching();
});
}
// Complete: nothing left to do. AlreadyRunning: the holder will
// reschedule itself if its run left work unfinished, so we don't
// pile on another timer.
execute::ExecutionStatus::Complete | execute::ExecutionStatus::AlreadyRunning => {}
execute::ExecutionStatus::MoreWork => schedule_matching_timer(),
execute::ExecutionStatus::Complete => {}
}
}

Expand Down
11 changes: 3 additions & 8 deletions canister/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@ fn add_limit_order(request: LimitOrderRequest) -> Result<OrderId, AddLimitOrderE
request
);
// Trigger matching immediately, no need to wait for the periodic timer.
// TODO DEFI-2823: coalesce — a burst of order placements currently
// queues one zero-delay timer per call.
ic_cdk_timers::set_timer(std::time::Duration::ZERO, async {
oisy_trade_canister::drive_matching();
});
// Coalesced: a burst of placements schedules a single matching timer.
oisy_trade_canister::schedule_matching_timer();
Ok(order_id)
}

Expand Down Expand Up @@ -191,9 +188,7 @@ fn resume_trading(pairs: Option<Vec<TradingPair>>) -> Result<(), UnauthorizedErr
oisy_trade_canister::resume_trading(pairs, &oisy_trade_canister::IC_RUNTIME)?;
// Re-arm matching immediately so orders that piled up while halted match now,
// without waiting for the periodic timer. Mirrors the add_limit_order kickoff.
ic_cdk_timers::set_timer(std::time::Duration::ZERO, async {
oisy_trade_canister::drive_matching();
});
oisy_trade_canister::schedule_matching_timer();
Ok(())
}

Expand Down
33 changes: 20 additions & 13 deletions canister/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ pub use snapshot::StateSnapshot;
mod tests;

use crate::Runtime;
use crate::Task;
use crate::Timestamp;
use crate::balance::{Balance, TokenBalance};
use crate::order::{
Expand Down Expand Up @@ -91,7 +90,7 @@ pub struct State<MH: Memory, MB: Memory> {
/// for cancels) and drained by the paired `SettlingEvent` dispatch in
/// [`Self::record_settling_event`].
pending_settling_events: VecDeque<event::SettlingEvent>,
active_tasks: BTreeSet<Task>,
matching_timer_scheduled: bool,
/// Per-`(caller, token)` guard set for in-flight deposit/withdraw
/// operations. Entries live only for the duration of a single async
/// request and are reset on upgrade.
Expand All @@ -118,7 +117,7 @@ impl<MH: Memory, MB: Memory> State<MH, MB> {
user_registry,
balances,
order_history,
active_tasks: BTreeSet::default(),
matching_timer_scheduled: false,
ledger_fee_cache: BTreeMap::default(),
pending_settling_events: VecDeque::default(),
in_flight_user_ops: BTreeSet::default(),
Expand Down Expand Up @@ -832,13 +831,21 @@ impl<MH: Memory, MB: Memory> State<MH, MB> {
}
}

/// Set of currently active tasks to avoid parallel execution.
pub fn active_tasks_mut(&mut self) -> &mut BTreeSet<Task> {
&mut self.active_tasks
/// Marks that a zero-delay matching timer is scheduled, returning `true`
/// only when none was already pending. A burst of callers therefore yields
/// a single scheduled timer; the rest observe `false` and skip scheduling.
pub fn try_mark_matching_timer_scheduled(&mut self) -> bool {
if self.matching_timer_scheduled {
false
} else {
self.matching_timer_scheduled = true;
true
}
}

pub fn active_tasks(&self) -> &BTreeSet<Task> {
&self.active_tasks
/// Clears the matching-timer-scheduled flag once the timer has fired.
pub fn clear_matching_timer_scheduled(&mut self) {
self.matching_timer_scheduled = false;
}

pub fn in_flight_user_ops_mut(&mut self) -> &mut BTreeSet<(Principal, TokenId)> {
Expand Down Expand Up @@ -1032,7 +1039,7 @@ impl Clone for State<ic_stable_structures::VectorMemory, ic_stable_structures::V
order_books,
user_registry,
balances,
active_tasks,
matching_timer_scheduled,
ledger_fee_cache,
order_history,
pending_settling_events,
Expand All @@ -1048,7 +1055,7 @@ impl Clone for State<ic_stable_structures::VectorMemory, ic_stable_structures::V
order_books: order_books.clone(),
user_registry: user_registry.clone(),
balances: balances.clone(),
active_tasks: active_tasks.clone(),
matching_timer_scheduled: *matching_timer_scheduled,
ledger_fee_cache: ledger_fee_cache.clone(),
order_history: order_history.clone(),
pending_settling_events: pending_settling_events.clone(),
Expand All @@ -1070,7 +1077,7 @@ impl PartialEq for State<ic_stable_structures::VectorMemory, ic_stable_structure
order_books,
user_registry,
balances,
active_tasks,
matching_timer_scheduled,
ledger_fee_cache,
order_history,
pending_settling_events,
Expand All @@ -1086,7 +1093,7 @@ impl PartialEq for State<ic_stable_structures::VectorMemory, ic_stable_structure
order_books: other_order_books,
user_registry: other_user_registry,
balances: other_balances,
active_tasks: other_active_tasks,
matching_timer_scheduled: other_matching_timer_scheduled,
ledger_fee_cache: other_ledger_fee_cache,
order_history: other_order_history,
pending_settling_events: other_pending_settling_events,
Expand All @@ -1101,7 +1108,7 @@ impl PartialEq for State<ic_stable_structures::VectorMemory, ic_stable_structure
&& order_books == other_order_books
&& user_registry == other_user_registry
&& balances == other_balances
&& active_tasks == other_active_tasks
&& matching_timer_scheduled == other_matching_timer_scheduled
&& ledger_fee_cache == other_ledger_fee_cache
&& order_history == other_order_history
&& pending_settling_events == other_pending_settling_events
Expand Down
4 changes: 2 additions & 2 deletions canister/src/state/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl StateSnapshot {
// ignored: live in stable memory,
order_history: _,
// ignored: timers are reset upon upgrades
active_tasks: _,
matching_timer_scheduled: _,
ledger_fee_cache,
pending_settling_events,
// ignored: per-request guard set, reset upon upgrades
Expand Down Expand Up @@ -246,7 +246,7 @@ impl StateSnapshot {
user_registry,
balances,
order_history,
active_tasks: Default::default(),
matching_timer_scheduled: false,
ledger_fee_cache,
pending_settling_events,
in_flight_user_ops: Default::default(),
Expand Down
16 changes: 7 additions & 9 deletions canister/src/state/snapshot/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,17 +371,15 @@ fn should_roundtrip_notional_bounds_through_snapshot() {
assert_eq!(book.max_notional(), max_notional);
}

/// Transient guard sets (`active_tasks`, `in_flight_user_ops`) are
/// intentionally excluded from the snapshot and reset to empty on restore.
/// Transient runtime state (`matching_timer_scheduled`, `in_flight_user_ops`)
/// is intentionally excluded from the snapshot and reset on restore.
#[test]
fn should_drop_transient_guard_sets_on_roundtrip() {
let mut state = fresh_state();
let user = Principal::from_slice(&[0x01]);
let token = crate::order::TokenId::new(Principal::from_slice(&[0xAA]));

state
.active_tasks_mut()
.insert(crate::Task::ProcessPendingOrders);
assert!(state.try_mark_matching_timer_scheduled());
state.in_flight_user_ops_mut().insert((user, token));

let snapshot = StateSnapshot::from_state(&state);
Expand All @@ -395,8 +393,8 @@ fn should_drop_transient_guard_sets_on_roundtrip() {
);

assert!(
restored.active_tasks().is_empty(),
"active_tasks must be empty after restore"
!restored.matching_timer_scheduled,
"matching_timer_scheduled must be reset after restore"
);
assert!(
restored.in_flight_user_ops().is_empty(),
Expand All @@ -406,11 +404,11 @@ fn should_drop_transient_guard_sets_on_roundtrip() {
assert_eq!(
state,
State {
active_tasks: state.active_tasks().clone(),
matching_timer_scheduled: state.matching_timer_scheduled,
in_flight_user_ops: state.in_flight_user_ops().clone(),
..restored
},
"Except for transient guard sets, restored state must be equal to original"
"Except for transient runtime state, restored state must be equal to original"
);
}

Expand Down
35 changes: 20 additions & 15 deletions canister/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2094,26 +2094,31 @@ mod get_fee_balances {
}
}

mod process_pending_orders {
use crate::execute::ExecutionStatus;
use crate::test_fixtures::init_state_with_order_book;
use crate::test_fixtures::mocks::mock_runtime_for;
use candid::Principal;
mod matching_timer_coalescing {
use crate::state;
use crate::test_fixtures::state_vmem;

#[test]
fn should_return_already_running_when_guard_is_held() {
init_state_with_order_book();
// Simulate a concurrent matching task holding the guard.
crate::state::with_state_mut(|s| {
fn should_schedule_a_single_timer_until_it_fires() {
state::init_state(state_vmem());
state::with_state_mut(|s| {
assert!(
s.active_tasks_mut()
.insert(crate::Task::ProcessPendingOrders)
s.try_mark_matching_timer_scheduled(),
"first kickoff schedules a timer"
);
assert!(
!s.try_mark_matching_timer_scheduled(),
"a burst of further kickoffs must not schedule more timers"
);
});

let status = crate::process_pending_orders(&mock_runtime_for(Principal::anonymous()));

assert_eq!(status, ExecutionStatus::AlreadyRunning);
// The timer fires and clears the flag, allowing the next kickoff
// (or the `MoreWork` self-reschedule) to arm a fresh timer.
s.clear_matching_timer_scheduled();
assert!(
s.try_mark_matching_timer_scheduled(),
"a kickoff after the timer fired schedules again"
);
});
}
}

Expand Down
Loading
Loading