Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions lib/runtime/src/utils/tasks/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,15 +493,25 @@ impl<T> TaskHandle<T> {
/// This token is a child of the tracker's cancellation token and can be used
/// to cancel just this individual task without affecting other tasks.
///
/// Note: For regular tasks spawned with `spawn()`, the cancellation token is created
/// but the task must explicitly check it to respond to cancellation. Use `spawn_cancellable()`
/// for tasks that need automatic cancellation support.
///
/// # Example
/// ```rust
/// # use dynamo_runtime::utils::tasks::tracker::*;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let tracker = TaskTracker::new(UnlimitedScheduler::new(), LogOnlyPolicy::new())?;
/// let handle = tracker.spawn(async {
/// tokio::time::sleep(std::time::Duration::from_secs(10)).await;
/// Ok("completed")
/// let handle = tracker.spawn_cancellable(|cancel_token| async move {
/// tokio::select! {
/// _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
/// CancellableTaskResult::Ok("completed")
/// }
/// _ = cancel_token.cancelled() => {
/// CancellableTaskResult::Cancelled
/// }
/// }
/// });
///
/// // Cancel this specific task
Expand Down Expand Up @@ -2708,11 +2718,11 @@ impl TaskTrackerInner {
debug!("Executing task with acquired resources");
match &mut current_executable {
CurrentExecutable::TaskExecutor(executor) => {
executor.execute(inner.cancel_token.child_token()).await
executor.execute(task_cancellation_token.clone()).await
}
CurrentExecutable::Continuation(continuation) => {
// Execute continuation and handle type erasure
match continuation.execute(inner.cancel_token.child_token()).await {
match continuation.execute(task_cancellation_token.clone()).await {
TaskExecutionResult::Success(result) => {
// Try to downcast the result to the expected type T
if let Ok(typed_result) = result.downcast::<T>() {
Expand Down
Loading