Skip to content

Commit cdad331

Browse files
committed
Added observer to TickedAsyncExecutor
- Contains 4 TaskStates -- Spawn -- Wake -- Tick -- Drop
1 parent b2af6f5 commit cdad331

File tree

2 files changed

+67
-43
lines changed

2 files changed

+67
-43
lines changed

src/task_identifier.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::Arc;
22

33
/// Cheaply clonable TaskIdentifier
4-
#[derive(Clone)]
4+
#[derive(Debug, Clone)]
55
pub enum TaskIdentifier {
66
Literal(&'static str),
77
Arc(Arc<String>),

src/ticked_async_executor.rs

Lines changed: 66 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -8,48 +8,65 @@ use std::{
88

99
use async_task::{Runnable, Task};
1010

11-
use crate::DroppableFuture;
11+
use crate::{DroppableFuture, TaskIdentifier};
12+
13+
#[derive(Debug)]
14+
pub enum TaskState {
15+
Spawn(TaskIdentifier),
16+
Wake(TaskIdentifier),
17+
Tick(TaskIdentifier),
18+
Drop(TaskIdentifier),
19+
}
20+
21+
type Payload = (TaskIdentifier, Runnable);
1222

13-
pub struct TickedAsyncExecutor {
14-
channel: (mpsc::Sender<Runnable>, mpsc::Receiver<Runnable>),
23+
pub struct TickedAsyncExecutor<O> {
24+
channel: (mpsc::Sender<Payload>, mpsc::Receiver<Payload>),
1525
num_woken_tasks: Arc<AtomicUsize>,
1626
num_spawned_tasks: Arc<AtomicUsize>,
27+
observer: O,
1728
}
1829

19-
impl Default for TickedAsyncExecutor {
20-
fn default() -> Self {
21-
Self::new()
22-
}
23-
}
24-
25-
// TODO, Observer: Task spawn/wake/drop events
26-
// TODO, Task Identifier String
27-
impl TickedAsyncExecutor {
28-
pub fn new() -> Self {
30+
impl<O> TickedAsyncExecutor<O>
31+
where
32+
O: Fn(TaskState) + Clone + Send + Sync + 'static,
33+
{
34+
pub fn new(observer: O) -> Self {
2935
Self {
3036
channel: mpsc::channel(),
3137
num_woken_tasks: Arc::new(AtomicUsize::new(0)),
3238
num_spawned_tasks: Arc::new(AtomicUsize::new(0)),
39+
observer,
3340
}
3441
}
3542

36-
pub fn spawn<T>(&self, future: impl Future<Output = T> + Send + 'static) -> Task<T>
43+
pub fn spawn<T>(
44+
&self,
45+
identifier: impl Into<TaskIdentifier>,
46+
future: impl Future<Output = T> + Send + 'static,
47+
) -> Task<T>
3748
where
3849
T: Send + 'static,
3950
{
40-
let future = self.droppable_future(future);
41-
let schedule = self.runnable_schedule_cb();
51+
let identifier = identifier.into();
52+
let future = self.droppable_future(identifier.clone(), future);
53+
let schedule = self.runnable_schedule_cb(identifier);
4254
let (runnable, task) = async_task::spawn(future, schedule);
4355
runnable.schedule();
4456
task
4557
}
4658

47-
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
59+
pub fn spawn_local<T>(
60+
&self,
61+
identifier: impl Into<TaskIdentifier>,
62+
future: impl Future<Output = T> + 'static,
63+
) -> Task<T>
4864
where
4965
T: 'static,
5066
{
51-
let future = self.droppable_future(future);
52-
let schedule = self.runnable_schedule_cb();
67+
let identifier = identifier.into();
68+
let future = self.droppable_future(identifier.clone(), future);
69+
let schedule = self.runnable_schedule_cb(identifier);
5370
let (runnable, task) = async_task::spawn_local(future, schedule);
5471
runnable.schedule();
5572
task
@@ -68,91 +85,98 @@ impl TickedAsyncExecutor {
6885
.1
6986
.try_iter()
7087
.take(num_woken_tasks)
71-
.for_each(|runnable| {
88+
.for_each(|(identifier, runnable)| {
89+
(self.observer)(TaskState::Tick(identifier));
7290
runnable.run();
7391
});
7492
self.num_woken_tasks
7593
.fetch_sub(num_woken_tasks, Ordering::Relaxed);
7694
}
7795

78-
fn droppable_future<F>(&self, future: F) -> DroppableFuture<F, impl Fn()>
96+
fn droppable_future<F>(
97+
&self,
98+
identifier: TaskIdentifier,
99+
future: F,
100+
) -> DroppableFuture<F, impl Fn()>
79101
where
80102
F: Future,
81103
{
104+
let observer = self.observer.clone();
105+
106+
// Spawn Task
82107
self.num_spawned_tasks.fetch_add(1, Ordering::Relaxed);
108+
observer(TaskState::Spawn(identifier.clone()));
109+
110+
// Droppable Future registering on_drop callback
83111
let num_spawned_tasks = self.num_spawned_tasks.clone();
84112
DroppableFuture::new(future, move || {
85113
num_spawned_tasks.fetch_sub(1, Ordering::Relaxed);
114+
observer(TaskState::Drop(identifier.clone()));
86115
})
87116
}
88117

89-
fn runnable_schedule_cb(&self) -> impl Fn(Runnable) {
118+
fn runnable_schedule_cb(&self, identifier: TaskIdentifier) -> impl Fn(Runnable) {
90119
let sender = self.channel.0.clone();
91120
let num_woken_tasks = self.num_woken_tasks.clone();
121+
let observer = self.observer.clone();
92122
move |runnable| {
93-
sender.send(runnable).unwrap_or(());
123+
sender.send((identifier.clone(), runnable)).unwrap_or(());
94124
num_woken_tasks.fetch_add(1, Ordering::Relaxed);
125+
observer(TaskState::Wake(identifier.clone()));
95126
}
96127
}
97128
}
98129

99130
#[cfg(test)]
100131
mod tests {
132+
use tokio::join;
133+
101134
use super::*;
102135

103136
#[test]
104137
fn test_multiple_tasks() {
105-
let executor = TickedAsyncExecutor::new();
138+
let executor = TickedAsyncExecutor::new(|_state| {});
106139
executor
107-
.spawn_local(async move {
108-
println!("A: Start");
140+
.spawn_local("A", async move {
109141
tokio::task::yield_now().await;
110-
println!("A: End");
111142
})
112143
.detach();
113144

114145
executor
115-
.spawn_local(async move {
116-
println!("B: Start");
146+
.spawn_local(format!("B"), async move {
117147
tokio::task::yield_now().await;
118-
println!("B: End");
119148
})
120149
.detach();
121150

122-
// A, B, C: Start
123151
executor.tick();
124152
assert_eq!(executor.num_tasks(), 2);
125153

126-
// A, B, C: End
127154
executor.tick();
128155
assert_eq!(executor.num_tasks(), 0);
129156
}
130157

131158
#[test]
132159
fn test_task_cancellation() {
133-
let executor = TickedAsyncExecutor::new();
134-
let task1 = executor.spawn_local(async move {
160+
let executor = TickedAsyncExecutor::new(|_state| println!("{_state:?}"));
161+
let task1 = executor.spawn_local("A", async move {
135162
loop {
136-
println!("A: Start");
137163
tokio::task::yield_now().await;
138-
println!("A: End");
139164
}
140165
});
141166

142-
let task2 = executor.spawn_local(async move {
167+
let task2 = executor.spawn_local(format!("B"), async move {
143168
loop {
144-
println!("B: Start");
145169
tokio::task::yield_now().await;
146-
println!("B: End");
147170
}
148171
});
149172
assert_eq!(executor.num_tasks(), 2);
150173
executor.tick();
151174

152175
executor
153-
.spawn_local(async move {
154-
task1.cancel().await;
155-
task2.cancel().await;
176+
.spawn_local("CancelTasks", async move {
177+
let (t1, t2) = join!(task1.cancel(), task2.cancel());
178+
assert_eq!(t1, None);
179+
assert_eq!(t2, None);
156180
})
157181
.detach();
158182
assert_eq!(executor.num_tasks(), 3);

0 commit comments

Comments
 (0)