From a3a0145ad2a17115541078b3753eb23d9f270170 Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Tue, 11 Feb 2025 17:48:17 +0100 Subject: [PATCH] add cancel_one --- io/darwin.zig | 24 ++++++++++++++++++--- io/linux.zig | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++- io/test.zig | 59 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 4 deletions(-) diff --git a/io/darwin.zig b/io/darwin.zig index da7aa29..5a36546 100644 --- a/io/darwin.zig +++ b/io/darwin.zig @@ -292,14 +292,32 @@ pub const IO = struct { } } - pub fn cancel(_: *IO, _: *Completion) void { + pub const CancelOneError = error{ NotFound, ExpirationInProgress } || posix.UnexpectedError; + + pub fn cancel_one( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: CancelOneError!void, + ) void, + completion: *Completion, + cancel_completion: *Completion, + ) void { + _ = self; + _ = context; + _ = callback; + _ = completion; + _ = cancel_completion; // TODO implement cancellation w/ kqueue. - log.debug("cancel implementation is missing on macOS", .{}); + log.debug("cancel_one implementation is missing on macOS", .{}); } pub fn cancel_all(_: *IO) void { // TODO Cancel in-flight async IO and wait for all completions. - log.debug("cancel all implementation is missing on macOS", .{}); + log.debug("cancel_all implementation is missing on macOS", .{}); } pub const AcceptError = posix.AcceptError || posix.SetSockOptError; diff --git a/io/linux.zig b/io/linux.zig index 0ca0f80..44c0205 100644 --- a/io/linux.zig +++ b/io/linux.zig @@ -331,7 +331,7 @@ pub const IO = struct { assert(self.ios_in_kernel == 0); } - pub fn cancel(self: *IO, target: *Completion) void { + fn cancel(self: *IO, target: *Completion) void { self.cancel_completion = .{ .io = self, .context = self, @@ -376,6 +376,42 @@ pub const IO = struct { }; } + pub const CancelOneError = error{ NotFound, ExpirationInProgress } || posix.UnexpectedError; + + pub fn cancel_one( + self: *IO, + comptime Context: type, + context: Context, + comptime callback: fn ( + context: Context, + completion: *Completion, + result: CancelOneError!void, + ) void, + completion: *Completion, + cancel_one_completion: *Completion, + ) void { + completion.* = .{ + .io = self, + .context = context, + .callback = struct { + fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void { + callback( + @as(Context, @ptrFromInt(@intFromPtr(ctx))), + comp, + @as(*const CancelOneError!void, @ptrFromInt(@intFromPtr(res))).*, + ); + } + }.wrapper, + .operation = .{ + .cancel_one = .{ + .c = @intFromPtr(cancel_one_completion), + }, + }, + }; + + self.enqueue(completion); + } + /// This struct holds the data needed for a single io_uring operation pub const Completion = struct { io: *IO, @@ -395,6 +431,9 @@ pub const IO = struct { fn prep(completion: *Completion, sqe: *io_uring_sqe) void { switch (completion.operation) { + .cancel_one => |op| { + sqe.prep_cancel(op.c, 0); + }, .cancel => |op| { sqe.prep_cancel(@intFromPtr(op.target), 0); }, @@ -465,6 +504,22 @@ pub const IO = struct { fn complete(completion: *Completion) void { switch (completion.operation) { + .cancel_one => { + const result: CancelOneError!void = blk: { + if (completion.result < 0) { + const err = switch (@as(posix.E, @enumFromInt(-completion.result))) { + .SUCCESS => {}, + .NOENT => error.NotFound, + .ALREADY => error.ExpirationInProgress, + else => |errno| posix.unexpectedErrno(errno), + }; + break :blk err; + } else { + break :blk; + } + }; + completion.callback(completion.context, completion, &result); + }, .cancel => { const result: CancelError!void = result: { if (completion.result < 0) { @@ -797,6 +852,9 @@ pub const IO = struct { /// This union encodes the set of operations supported as well as their arguments. const Operation = union(enum) { + cancel_one: struct { + c: u64, + }, cancel: struct { target: *Completion, }, diff --git a/io/test.zig b/io/test.zig index f7d87fd..e8b7cb9 100644 --- a/io/test.zig +++ b/io/test.zig @@ -900,3 +900,62 @@ test "cancel_all" { } }.run_test(); } + +test "cancel_one" { + try struct { + const Context = @This(); + + io: IO, + timeout_res: IO.TimeoutError!void = undefined, + timeout_done: bool = false, + cancel_done: bool = false, + + fn run_test() !void { + var self: Context = .{ + .io = try IO.init(32, 0), + }; + defer self.io.deinit(); + + var completion: IO.Completion = undefined; + self.io.timeout( + *Context, + &self, + timeout_callback, + &completion, + 100 * std.time.ns_per_ms, + ); + + var cancel_one_completion: IO.Completion = undefined; + self.io.cancel_one( + *Context, + &self, + cancel_one_callback, + &cancel_one_completion, + &completion, + ); + while (!self.cancel_done and !self.timeout_done) try self.io.tick(); + + try testing.expectEqual(true, self.timeout_done); + try testing.expectEqual(true, self.cancel_done); + try testing.expectError(IO.TimeoutError.Canceled, self.timeout_res); + } + + fn timeout_callback( + self: *Context, + _: *IO.Completion, + result: IO.TimeoutError!void, + ) void { + self.timeout_res = result; + self.timeout_done = true; + } + + fn cancel_one_callback( + self: *Context, + _: *IO.Completion, + result: IO.CancelOneError!void, + ) void { + result catch |err| std.debug.panic("cancel one error: {}", .{err}); + self.cancel_done = true; + } + }.run_test(); +}