@@ -63,7 +63,9 @@ impl<J: 'static + AsyncJob, T: Copy + Send + 'static>
63
63
}
64
64
}
65
65
66
- /// spawns `task` if nothing is running currently, otherwise schedules as `next` overwriting if `next` was set before
66
+ /// spawns `task` if nothing is running currently,
67
+ /// otherwise schedules as `next` overwriting if `next` was set before.
68
+ /// return `true` if the new task gets started right away.
67
69
pub fn spawn ( & mut self , task : J ) -> bool {
68
70
self . schedule_next ( task) ;
69
71
self . check_for_job ( )
@@ -129,23 +131,35 @@ mod test {
129
131
use crossbeam_channel:: unbounded;
130
132
use pretty_assertions:: assert_eq;
131
133
use std:: {
132
- sync:: atomic:: AtomicU32 , thread:: sleep, time:: Duration ,
134
+ sync:: atomic:: { AtomicBool , AtomicU32 , Ordering } ,
135
+ thread,
136
+ time:: Duration ,
133
137
} ;
134
138
135
139
#[ derive( Clone ) ]
136
140
struct TestJob {
137
141
v : Arc < AtomicU32 > ,
142
+ finish : Arc < AtomicBool > ,
138
143
value_to_add : u32 ,
139
144
}
140
145
141
146
impl AsyncJob for TestJob {
142
147
fn run ( & mut self ) {
143
- sleep ( Duration :: from_millis ( 100 ) ) ;
148
+ self . finish . store ( false , Ordering :: Relaxed ) ;
144
149
145
- self . v . fetch_add (
146
- self . value_to_add ,
147
- std:: sync:: atomic:: Ordering :: Relaxed ,
148
- ) ;
150
+ println ! ( "[job] wait" ) ;
151
+
152
+ while self . finish . load ( Ordering :: Relaxed ) {
153
+ std:: thread:: yield_now ( ) ;
154
+ }
155
+
156
+ println ! ( "[job] sleep" ) ;
157
+
158
+ thread:: sleep ( Duration :: from_millis ( 100 ) ) ;
159
+
160
+ println ! ( "[job] done sleeping" ) ;
161
+
162
+ self . v . fetch_add ( self . value_to_add , Ordering :: Relaxed ) ;
149
163
}
150
164
}
151
165
@@ -160,15 +174,20 @@ mod test {
160
174
161
175
let task = TestJob {
162
176
v : Arc :: new ( AtomicU32 :: new ( 1 ) ) ,
177
+ finish : Arc :: new ( AtomicBool :: new ( false ) ) ,
163
178
value_to_add : 1 ,
164
179
} ;
165
180
166
181
assert ! ( job. spawn( task. clone( ) ) ) ;
167
- sleep ( Duration :: from_millis ( 1 ) ) ;
182
+ thread:: sleep ( Duration :: from_millis ( 10 ) ) ;
183
+
168
184
for _ in 0 ..5 {
185
+ println ! ( "spawn" ) ;
169
186
assert ! ( !job. spawn( task. clone( ) ) ) ;
170
187
}
171
188
189
+ task. finish . store ( true , Ordering :: Relaxed ) ;
190
+
172
191
let _foo = receiver. recv ( ) . unwrap ( ) ;
173
192
let _foo = receiver. recv ( ) . unwrap ( ) ;
174
193
assert ! ( receiver. is_empty( ) ) ;
@@ -179,6 +198,12 @@ mod test {
179
198
) ;
180
199
}
181
200
201
+ fn wait_for_job ( job : & AsyncSingleJob < TestJob , Notificaton > ) {
202
+ while job. is_pending ( ) {
203
+ thread:: sleep ( Duration :: from_millis ( 10 ) ) ;
204
+ }
205
+ }
206
+
182
207
#[ test]
183
208
fn test_cancel ( ) {
184
209
let ( sender, receiver) = unbounded ( ) ;
@@ -188,17 +213,26 @@ mod test {
188
213
189
214
let task = TestJob {
190
215
v : Arc :: new ( AtomicU32 :: new ( 1 ) ) ,
216
+ finish : Arc :: new ( AtomicBool :: new ( false ) ) ,
191
217
value_to_add : 1 ,
192
218
} ;
193
219
194
220
assert ! ( job. spawn( task. clone( ) ) ) ;
195
- sleep ( Duration :: from_millis ( 1 ) ) ;
221
+ task. finish . store ( true , Ordering :: Relaxed ) ;
222
+ thread:: sleep ( Duration :: from_millis ( 10 ) ) ;
196
223
197
224
for _ in 0 ..5 {
225
+ println ! ( "spawn" ) ;
198
226
assert ! ( !job. spawn( task. clone( ) ) ) ;
199
227
}
228
+
229
+ println ! ( "cancel" ) ;
200
230
assert ! ( job. cancel( ) ) ;
201
231
232
+ task. finish . store ( true , Ordering :: Relaxed ) ;
233
+
234
+ wait_for_job ( & job) ;
235
+
202
236
let _foo = receiver. recv ( ) . unwrap ( ) ;
203
237
204
238
assert_eq ! (
0 commit comments