diff --git a/src/event/event_windows.c b/src/event/event_windows.c index 94674a3bf..31613ca00 100644 --- a/src/event/event_windows.c +++ b/src/event/event_windows.c @@ -277,6 +277,9 @@ _dispatch_pipe_monitor_thread(void *context) char cBuffer[1]; DWORD dwNumberOfBytesTransferred; OVERLAPPED ov = {0}; + // Block on a 0-byte read; this will only resume when data is + // available in the pipe. The pipe must be PIPE_WAIT or this thread + // will spin. BOOL bSuccess = ReadFile(hPipe, cBuffer, /* nNumberOfBytesToRead */ 0, &dwNumberOfBytesTransferred, &ov); DWORD dwBytesAvailable; diff --git a/src/io.c b/src/io.c index e31e28c82..0efb849ea 100644 --- a/src/io.c +++ b/src/io.c @@ -1437,20 +1437,20 @@ _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash) int result = ioctlsocket((SOCKET)fd, (long)FIONBIO, &value); (void)dispatch_assume_zero(result); } else { - // Try to make writing nonblocking, although pipes not coming - // from Foundation.Pipe may not have FILE_WRITE_ATTRIBUTES. + // The _dispatch_pipe_monitor_thread expects pipes to be + // PIPE_WAIT and exploits this assumption by using a blocking + // 0-byte read as a synchronization mechanism. DWORD dwPipeMode = 0; if (GetNamedPipeHandleState((HANDLE)fd, &dwPipeMode, NULL, - NULL, NULL, NULL, 0) && !(dwPipeMode & PIPE_NOWAIT)) { - dwPipeMode |= PIPE_NOWAIT; + NULL, NULL, NULL, 0) && !(dwPipeMode & PIPE_WAIT)) { + dwPipeMode |= PIPE_WAIT; if (!SetNamedPipeHandleState((HANDLE)fd, &dwPipeMode, NULL, NULL)) { - // We may end up blocking on subsequent writes, but we - // don't have a good alternative. - // The WriteQuotaAvailable from NtQueryInformationFile - // erroneously returns 0 when there is a blocking read - // on the other end of the pipe. - _dispatch_fd_entry_debug("failed to set PIPE_NOWAIT", + // If setting the pipe to PIPE_WAIT fails, the + // monitoring thread will spin constantly, saturating + // a core, which is undesirable but non-fatal. + // The semantics will still be correct in this case. + _dispatch_fd_entry_debug("failed to set PIPE_WAIT", fd_entry); } } @@ -2518,13 +2518,40 @@ _dispatch_operation_perform(dispatch_operation_t op) NTSTATUS status = _dispatch_NtQueryInformationFile(hFile, &iosb, &fpli, sizeof(fpli), FilePipeLocalInformation); if (NT_SUCCESS(status)) { - // WriteQuotaAvailable is unreliable in the presence - // of a blocking reader, when it can return zero, so only - // account for it otherwise - if (fpli.WriteQuotaAvailable > 0) { - len = MIN(len, fpli.WriteQuotaAvailable); + // WriteQuotaAvailable is the free space in the output buffer + // that has not already been reserved for reading. In other words, + // WriteQuotaAvailable = + // OutboundQuota - WriteQuotaUsed - QueuedReadSize. + // It is not documented that QueuedReadSize is part of this + // calculation, but this behavior has been observed experimentally. + // Unfortunately, this means that it is not possible to distinguish + // between a full output buffer and a reader blocked waiting for a + // full buffer's worth of data. This is a problem because if the + // output buffer is full and no reader is waiting for data, then + // attempting to write to the buffer of a PIPE_WAIT, non- + // overlapped I/O pipe will block the dispatch queue thread. + // + // In order to work around this idiosyncrasy, we bound the size of + // the write to be OutboundQuota - 1. This affords us a sentinel value + // in WriteQuotaAvailable that can be used to detect if a reader is + // making progress or not. + // WriteQuotaAvailable = 0 => a reader is blocked waiting for data. + // WriteQuotaAvailable = 1 => the pipe has been written to, but no + // reader is making progress. + // When we detect that WriteQuotaAvailable == 1, we write 0 bytes to + // avoid blocking the dispatch queue thread. + if (fpli.WriteQuotaAvailable == 0) { + // This condition can only occur when we have a reader blocked + // waiting for data on the pipe. In this case, write a full + // buffer's worth of data (less one byte to preserve this + // sentinel value of WriteQuotaAvailable == 0). + len = MIN(len, fpli.OutboundQuota - 1); + } else { + // Subtract 1 from WriteQuotaAvailable to ensure we do not fill + // the pipe and preserve the sentinel value of + // WriteQuotaAvailable == 1. + len = MIN(len, fpli.WriteQuotaAvailable - 1); } - len = MIN(len, fpli.OutboundQuota); } OVERLAPPED ovlOverlapped = {}; diff --git a/tests/dispatch_io_pipe.c b/tests/dispatch_io_pipe.c index f94438483..50ddc8a87 100644 --- a/tests/dispatch_io_pipe.c +++ b/tests/dispatch_io_pipe.c @@ -404,7 +404,12 @@ test_dispatch_write(int kind, int delay) dispatch_group_t g = dispatch_group_create(); dispatch_group_enter(g); - const size_t bufsize = test_get_pipe_buffer_size(kind); + // The libdispatch implementation writes at most bufsize-1 bytes + // before requiring a reader to start making progress. Because + // these tests operate serially, the reader will not make progress + // until the write finishes, and a write of >= bufsize will not + // finish until the reader starts draining the pipe. + const size_t bufsize = test_get_pipe_buffer_size(kind) - 1; char *buf = calloc(bufsize, 1); assert(buf);