Skip to content

Switch Win32 pipes to PIPE_WAIT with sentinel bufsize #854

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/event/event_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ _dispatch_pipe_monitor_thread(void *context)
char cBuffer[1];
DWORD dwNumberOfBytesTransferred;
OVERLAPPED ov = {0};
// Block on a 0-byte read; this will only resume when data is
// available in the pipe. The pipe must be PIPE_WAIT or this thread
// will spin.
BOOL bSuccess = ReadFile(hPipe, cBuffer, /* nNumberOfBytesToRead */ 0,
&dwNumberOfBytesTransferred, &ov);
DWORD dwBytesAvailable;
Expand Down
59 changes: 43 additions & 16 deletions src/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -1437,20 +1437,20 @@ _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
int result = ioctlsocket((SOCKET)fd, (long)FIONBIO, &value);
(void)dispatch_assume_zero(result);
} else {
// Try to make writing nonblocking, although pipes not coming
// from Foundation.Pipe may not have FILE_WRITE_ATTRIBUTES.
// The _dispatch_pipe_monitor_thread expects pipes to be
// PIPE_WAIT and exploits this assumption by using a blocking
// 0-byte read as a synchronization mechanism.
DWORD dwPipeMode = 0;
if (GetNamedPipeHandleState((HANDLE)fd, &dwPipeMode, NULL,
NULL, NULL, NULL, 0) && !(dwPipeMode & PIPE_NOWAIT)) {
dwPipeMode |= PIPE_NOWAIT;
NULL, NULL, NULL, 0) && !(dwPipeMode & PIPE_WAIT)) {
dwPipeMode |= PIPE_WAIT;
if (!SetNamedPipeHandleState((HANDLE)fd, &dwPipeMode,
NULL, NULL)) {
// We may end up blocking on subsequent writes, but we
// don't have a good alternative.
// The WriteQuotaAvailable from NtQueryInformationFile
// erroneously returns 0 when there is a blocking read
// on the other end of the pipe.
_dispatch_fd_entry_debug("failed to set PIPE_NOWAIT",
// If setting the pipe to PIPE_WAIT fails, the
// monitoring thread will spin constantly, saturating
// a core, which is undesirable but non-fatal.
// The semantics will still be correct in this case.
_dispatch_fd_entry_debug("failed to set PIPE_WAIT",
fd_entry);
}
}
Expand Down Expand Up @@ -2518,13 +2518,40 @@ _dispatch_operation_perform(dispatch_operation_t op)
NTSTATUS status = _dispatch_NtQueryInformationFile(hFile,
&iosb, &fpli, sizeof(fpli), FilePipeLocalInformation);
if (NT_SUCCESS(status)) {
// WriteQuotaAvailable is unreliable in the presence
// of a blocking reader, when it can return zero, so only
// account for it otherwise
if (fpli.WriteQuotaAvailable > 0) {
len = MIN(len, fpli.WriteQuotaAvailable);
// WriteQuotaAvailable is the free space in the output buffer
// that has not already been reserved for reading. In other words,
// WriteQuotaAvailable =
// OutboundQuota - WriteQuotaUsed - QueuedReadSize.
// It is not documented that QueuedReadSize is part of this
// calculation, but this behavior has been observed experimentally.
// Unfortunately, this means that it is not possible to distinguish
// between a full output buffer and a reader blocked waiting for a
// full buffer's worth of data. This is a problem because if the
// output buffer is full and no reader is waiting for data, then
// attempting to write to the buffer of a PIPE_WAIT, non-overlapped
// I/O pipe will block the dispatch queue thread.
//
// In order to work around this idiosyncrasy, we bound the size of
// the write to be OutboundQuota - 1. This affords us a sentinel value
// in WriteQuotaAvailable that can be used to detect if a reader is
// making progress or not.
// WriteQuotaAvailable = 0 => a reader is blocked waiting for data.
// WriteQuotaAvailable = 1 => the pipe has been written to, but no
// reader is making progress.
// When we detect that WriteQuotaAvailable == 1, we write 0 bytes to
// avoid blocking the dispatch queue thread.
if (fpli.WriteQuotaAvailable == 0) {
// This condition can only occur when we have a reader blocked
// waiting for data on the pipe. In this case, write a full
// buffer's worth of data (less one byte to preserve this
// sentinel value of WriteQuotaAvailable == 0).
len = MIN(len, fpli.OutboundQuota - 1);
} else {
// Subtract 1 from WriteQuotaAvailable to ensure we do not fill
// the pipe and preserve the sentinel value of
// WriteQuotaAvailable == 1.
len = MIN(len, fpli.WriteQuotaAvailable - 1);
}
len = MIN(len, fpli.OutboundQuota);
}

OVERLAPPED ovlOverlapped = {};
Expand Down
7 changes: 6 additions & 1 deletion tests/dispatch_io_pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,12 @@ test_dispatch_write(int kind, int delay)
dispatch_group_t g = dispatch_group_create();
dispatch_group_enter(g);

const size_t bufsize = test_get_pipe_buffer_size(kind);
// The libdispatch implementation writes at most bufsize-1 bytes
// before requiring a reader to start making progress. Because
// these tests operate serially, the reader will not make progress
// until the write finishes, and a write of >= bufsize will not
// finish until the reader starts draining the pipe.
const size_t bufsize = test_get_pipe_buffer_size(kind) - 1;

char *buf = calloc(bufsize, 1);
assert(buf);
Expand Down