Skip to content

feat: buffered IO implementation #386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions compio-driver/src/buffer_pool/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ impl BufferPool {
}

/// Select an [`OwnedBuffer`] when the op creates.
pub(crate) fn get_buffer(&self, len: usize) -> io::Result<OwnedBuffer> {
#[doc(hidden)]
pub fn get_buffer(&self, len: usize) -> io::Result<OwnedBuffer> {
let buffer = self
.inner
.buffers
Expand All @@ -78,15 +79,17 @@ impl BufferPool {

/// ## Safety
/// * `len` should be valid.
pub(crate) unsafe fn create_proxy(&self, mut slice: OwnedBuffer, len: usize) -> BorrowedBuffer {
#[doc(hidden)]
pub unsafe fn create_proxy(&self, mut slice: OwnedBuffer, len: usize) -> BorrowedBuffer {
unsafe {
slice.set_buf_init(len);
}
BorrowedBuffer::new(slice.into_inner(), self)
}
}

pub(crate) struct OwnedBuffer {
#[doc(hidden)]
pub struct OwnedBuffer {
buffer: ManuallyDrop<Slice<Vec<u8>>>,
pool: ManuallyDrop<Rc<BufferPoolInner>>,
}
Expand Down
3 changes: 2 additions & 1 deletion compio-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ impl Proactor {
///
/// # Safety
///
/// Caller must make sure to release the buffer pool with the correct driver, i.e., the one they created the buffer pool with.
/// Caller must make sure to release the buffer pool with the correct
/// driver, i.e., the one they created the buffer pool with.
pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
self.driver.release_buffer_pool(buffer_pool)
}
Expand Down
25 changes: 23 additions & 2 deletions compio-driver/src/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! The operation itself doesn't perform anything.
//! You need to pass them to [`crate::Proactor`], and poll the driver.

use std::{marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};

use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit};
use socket2::SockAddr;
Expand All @@ -23,7 +23,7 @@ pub use crate::sys::op::{
#[cfg(buf_ring)]
pub use crate::sys::op::{ReadManagedAt, RecvManaged};
use crate::{
OwnedFd, SharedFd,
OwnedFd, SharedFd, TakeBuffer,
sys::{sockaddr_storage, socklen_t},
};

Expand Down Expand Up @@ -99,6 +99,27 @@ impl<T> RecvResultExt for BufResult<usize, (T, sockaddr_storage, socklen_t, usiz
}
}

/// Helper trait for [`ReadManagedAt`] and [`RecvManaged`].
pub trait ResultTakeBuffer {
/// The buffer pool of the op.
type BufferPool;
/// The buffer type of the op.
type Buffer<'a>;

/// Take the buffer from result.
fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>>;
}

impl<T: TakeBuffer> ResultTakeBuffer for (BufResult<usize, T>, u32) {
type Buffer<'a> = T::Buffer<'a>;
type BufferPool = T::BufferPool;

fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
let (BufResult(result, op), flags) = self;
op.take_buffer(pool, result, flags)
}
}

/// Spawn a blocking function in the thread pool.
pub struct Asyncify<F, D> {
pub(crate) f: Option<F>,
Expand Down
37 changes: 34 additions & 3 deletions compio-fs/src/async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use std::os::windows::io::{
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
use compio_driver::{
AsRawFd, SharedFd, ToSharedFd,
op::{BufResultExt, Recv, Send},
op::{BufResultExt, Recv, RecvManaged, ResultTakeBuffer, Send},
};
use compio_io::{AsyncRead, AsyncWrite};
use compio_runtime::Attacher;
use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
use compio_runtime::{Attacher, BorrowedBuffer, BufferPool};
#[cfg(unix)]
use {
compio_buf::{IoVectoredBuf, IoVectoredBufMut},
Expand Down Expand Up @@ -59,6 +59,37 @@ impl<T: AsRawFd + 'static> AsyncRead for AsyncFd<T> {
}
}

impl<T: AsRawFd + 'static> AsyncReadManaged for AsyncFd<T> {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_managed<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
(&*self).read_managed(buffer_pool, len).await
}
}

impl<T: AsRawFd + 'static> AsyncReadManaged for &AsyncFd<T> {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_managed<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
let fd = self.to_shared_fd();
let buffer_pool = buffer_pool.try_inner()?;
let op = RecvManaged::new(fd, buffer_pool, len)?;
compio_runtime::submit_with_flags(op)
.await
.take_buffer(buffer_pool)
}
}

impl<T: AsRawFd + 'static> AsyncRead for &AsyncFd<T> {
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
let fd = self.inner.to_shared_fd();
Expand Down
25 changes: 22 additions & 3 deletions compio-fs/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
use compio_driver::op::FileStat;
use compio_driver::{
ToSharedFd, impl_raw_fd,
op::{BufResultExt, CloseFile, ReadAt, Sync, WriteAt},
op::{BufResultExt, CloseFile, ReadAt, ReadManagedAt, ResultTakeBuffer, Sync, WriteAt},
};
use compio_io::{AsyncReadAt, AsyncWriteAt};
use compio_runtime::Attacher;
use compio_io::{AsyncReadAt, AsyncReadManagedAt, AsyncWriteAt};
use compio_runtime::{Attacher, BorrowedBuffer, BufferPool};
#[cfg(all(unix, not(solarish)))]
use {
compio_buf::{IoVectoredBuf, IoVectoredBufMut},
Expand Down Expand Up @@ -169,6 +169,25 @@ impl AsyncReadAt for File {
}
}

impl AsyncReadManagedAt for File {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_managed_at<'a>(
&self,
pos: u64,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
let fd = self.inner.to_shared_fd();
let buffer_pool = buffer_pool.try_inner()?;
let op = ReadManagedAt::new(fd, pos, buffer_pool, len)?;
compio_runtime::submit_with_flags(op)
.await
.take_buffer(buffer_pool)
}
}

impl AsyncWriteAt for File {
#[inline]
async fn write_at<T: IoBuf>(&mut self, buf: T, pos: u64) -> BufResult<usize, T> {
Expand Down
59 changes: 58 additions & 1 deletion compio-fs/src/named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use std::{ffi::OsStr, io, os::windows::io::FromRawHandle, ptr::null};

use compio_buf::{BufResult, IoBuf, IoBufMut};
use compio_driver::{AsRawFd, RawFd, ToSharedFd, impl_raw_fd, op::ConnectNamedPipe, syscall};
use compio_io::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt};
use compio_io::{
AsyncRead, AsyncReadAt, AsyncReadManaged, AsyncReadManagedAt, AsyncWrite, AsyncWriteAt,
};
use compio_runtime::{BorrowedBuffer, BufferPool};
use widestring::U16CString;
use windows_sys::Win32::{
Storage::FileSystem::{
Expand Down Expand Up @@ -192,6 +195,33 @@ impl AsyncRead for &NamedPipeServer {
}
}

impl AsyncReadManaged for NamedPipeServer {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_managed<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
(&*self).read_managed(buffer_pool, len).await
}
}

impl AsyncReadManaged for &NamedPipeServer {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_managed<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
// The position is ignored.
(&self.handle).read_managed(buffer_pool, len).await
}
}

impl AsyncWrite for NamedPipeServer {
#[inline]
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
Expand Down Expand Up @@ -312,6 +342,33 @@ impl AsyncRead for &NamedPipeClient {
}
}

impl AsyncReadManaged for NamedPipeClient {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_managed<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
(&*self).read_managed(buffer_pool, len).await
}
}

impl AsyncReadManaged for &NamedPipeClient {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_managed<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
// The position is ignored.
self.handle.read_managed_at(0, buffer_pool, len).await
}
}

impl AsyncWrite for NamedPipeClient {
#[inline]
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
Expand Down
36 changes: 34 additions & 2 deletions compio-fs/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ use std::{
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
use compio_driver::{
AsRawFd, ToSharedFd, impl_raw_fd,
op::{BufResultExt, Recv, RecvVectored, Send, SendVectored},
op::{BufResultExt, Recv, RecvManaged, RecvVectored, ResultTakeBuffer, Send, SendVectored},
syscall,
};
use compio_io::{AsyncRead, AsyncWrite};
use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
use compio_runtime::{BorrowedBuffer, BufferPool};

use crate::File;

Expand Down Expand Up @@ -502,6 +503,37 @@ impl AsyncRead for &Receiver {
}
}

impl AsyncReadManaged for Receiver {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_managed<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
(&*self).read_managed(buffer_pool, len).await
}
}

impl AsyncReadManaged for &Receiver {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_managed<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
let fd = self.to_shared_fd();
let buffer_pool = buffer_pool.try_inner()?;
let op = RecvManaged::new(fd, buffer_pool, len)?;
compio_runtime::submit_with_flags(op)
.await
.take_buffer(buffer_pool)
}
}

impl_raw_fd!(Receiver, std::fs::File, file, file);

/// Checks if file is a FIFO
Expand Down
29 changes: 28 additions & 1 deletion compio-fs/src/stdio/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::io;

use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
use compio_driver::{AsRawFd, RawFd};
use compio_io::{AsyncRead, AsyncWrite};
use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
use compio_runtime::{BorrowedBuffer, BufferPool};

#[cfg(doc)]
use super::{stderr, stdin, stdout};
Expand Down Expand Up @@ -41,6 +42,32 @@ impl AsyncRead for &Stdin {
}
}

impl AsyncReadManaged for Stdin {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_managed<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
(&*self).read_managed(buffer_pool, len).await
}
}

impl AsyncReadManaged for &Stdin {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_managed<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
(&self.0).read_managed(buffer_pool, len).await
}
}

impl AsRawFd for Stdin {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
Expand Down
Loading
Loading