|
| 1 | +use std::{io, pin::Pin}; |
| 2 | + |
| 3 | +use bytes::Bytes; |
| 4 | +use tokio::io::{AsyncWrite, AsyncWriteExt}; |
| 5 | + |
| 6 | +use crate::{init_maybeuninit_io_slices_mut, ReusableIoSlices}; |
| 7 | + |
| 8 | +/// * `buffer` - must not contain empty `Bytes`s. |
| 9 | +#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))] |
| 10 | +pub async fn write_all_bytes( |
| 11 | + mut writer: Pin<&mut (dyn AsyncWrite + Send)>, |
| 12 | + buffer: &mut Vec<Bytes>, |
| 13 | + reusable_io_slices: &mut ReusableIoSlices, |
| 14 | +) -> io::Result<()> { |
| 15 | + static EMPTY_BYTES: Bytes = Bytes::from_static(b""); |
| 16 | + |
| 17 | + // `buffer` does not contain any empty `Bytes`s, so: |
| 18 | + // - We can check for `io::ErrorKind::WriteZero` error easily |
| 19 | + // - It won't occupy precise slot in `reusable_io_slices` so that |
| 20 | + // we can group as many non-zero IoSlice in one write. |
| 21 | + // - Avoid conserion from/to `VecDeque` unless necessary, |
| 22 | + // which might allocate. |
| 23 | + // - Simplify the loop below. |
| 24 | + |
| 25 | + if buffer.is_empty() { |
| 26 | + return Ok(()); |
| 27 | + } |
| 28 | + |
| 29 | + let mut start = 0; |
| 30 | + |
| 31 | + // do-while style loop, because on the first iteration |
| 32 | + // start < buffer.len() |
| 33 | + 'outer: loop { |
| 34 | + let uninit_io_slices = reusable_io_slices.get_mut(); |
| 35 | + |
| 36 | + // start < buffer.len() |
| 37 | + // io_slices.is_empty() == false |
| 38 | + let io_slices = init_maybeuninit_io_slices_mut( |
| 39 | + uninit_io_slices, |
| 40 | + buffer[start..].iter().map(|bytes| io::IoSlice::new(bytes)), |
| 41 | + ); |
| 42 | + |
| 43 | + let mut n = writer.write_vectored(io_slices).await?; |
| 44 | + |
| 45 | + if n == 0 { |
| 46 | + return Err(io::Error::from(io::ErrorKind::WriteZero)); |
| 47 | + } |
| 48 | + |
| 49 | + // On first iteration, start < buffer.len() |
| 50 | + while n >= buffer[start].len() { |
| 51 | + n -= buffer[start].len(); |
| 52 | + // Release `Bytes` so that the memory they occupied |
| 53 | + // can be reused in `BytesMut`. |
| 54 | + buffer[start] = EMPTY_BYTES.clone(); |
| 55 | + start += 1; |
| 56 | + |
| 57 | + if start == buffer.len() { |
| 58 | + debug_assert_eq!(n, 0); |
| 59 | + break 'outer; |
| 60 | + } |
| 61 | + } |
| 62 | + |
| 63 | + // start < buffer.len(), |
| 64 | + // n < buffer[start].len() |
| 65 | + buffer[start] = buffer[start].slice(n..); |
| 66 | + } |
| 67 | + |
| 68 | + buffer.clear(); |
| 69 | + |
| 70 | + Ok(()) |
| 71 | +} |
0 commit comments