diff --git a/rust/src/file.rs b/rust/src/file.rs index bf6e00c2..32db5e87 100644 --- a/rust/src/file.rs +++ b/rust/src/file.rs @@ -264,7 +264,6 @@ impl FileWriter { pub async fn write(&mut self, mut buf: Bytes) -> Result { let bytes_to_write = buf.len(); - // Create a shallow copy of the bytes instance to mutate and track what's been read while !buf.is_empty() { let block_writer = self.get_block_writer().await?; diff --git a/rust/src/hdfs/block_writer.rs b/rust/src/hdfs/block_writer.rs index 7468a10e..0014dc89 100644 --- a/rust/src/hdfs/block_writer.rs +++ b/rust/src/hdfs/block_writer.rs @@ -846,11 +846,12 @@ pub(crate) struct StripedBlockWriter { server_defaults: hdfs::FsServerDefaultsProto, config: Arc, handle: Handle, - block_writers: Vec>, + block_writers: Vec>>, cell_buffer: CellBuffer, bytes_written: usize, capacity: usize, status: hdfs::HdfsFileStatusProto, + data_units: usize, } impl StripedBlockWriter { @@ -876,6 +877,7 @@ impl StripedBlockWriter { bytes_written: 0, capacity: ec_schema.data_units * status.blocksize() as usize, status: status.clone(), + data_units: ec_schema.data_units, } } @@ -883,8 +885,14 @@ impl StripedBlockWriter { self.capacity - self.bytes_written } + fn is_full(&self) -> bool { + self.bytes_remaining() == 0 + } + async fn write_cells(&mut self) -> Result<()> { let mut write_futures = vec![]; + let mut writer_indices = vec![]; + for (index, (data, writer)) in self .cell_buffer .encode() @@ -915,16 +923,39 @@ impl StripedBlockWriter { self.handle.clone(), &self.status, ) - .await?, + .await, ) } + if writer.as_ref().unwrap().is_err() { + continue; + } + let mut data = data.clone(); - write_futures.push(async move { writer.as_mut().unwrap().write(&mut data).await }) + writer_indices.push(index); + + // We know at this point writer is Some(Ok(_)) + let block_writer = writer.as_mut().unwrap().as_mut().unwrap(); + write_futures.push(async move { block_writer.write(&mut data).await }) } - for write in join_all(write_futures).await { - 
write?; + for (write_idx, write) in join_all(write_futures).await.into_iter().enumerate() { + if let Err(e) = write { + let index = writer_indices[write_idx]; + warn!("Write failed for block index {}: {:?}", index, e); + self.block_writers[index] = Some(Err(e)); + } + } + + let failed_writers = self + .block_writers + .iter() + .filter(|writer| writer.as_ref().is_some_and(|w| w.is_err())) + .count(); + if failed_writers > (self.block_writers.len() - self.data_units) { + return Err(HdfsError::DataTransferError( + "Insufficient blocks succeeded during write".to_string(), + )); } Ok(()) @@ -952,26 +983,42 @@ impl StripedBlockWriter { self.write_cells().await?; } - let close_futures = self - .block_writers - .into_iter() - .filter_map(|mut writer| writer.take()) - .map(|writer| async move { writer.close().await }); + let mut close_futures = vec![]; + let mut writer_indices = vec![]; + let mut succeeded_blocks = 0usize; + + for (index, writer) in self.block_writers.into_iter().enumerate() { + match writer { + Some(Ok(writer)) => { + writer_indices.push(index); + close_futures.push(async move { writer.close().await }); + } + Some(Err(_)) => {} + None => { + // If no data was written to a block, treat it as successful + succeeded_blocks += 1; + } + } + } - for close_result in join_all(close_futures).await { - close_result?; + for (close_idx, close_result) in join_all(close_futures).await.into_iter().enumerate() { + if let Err(e) = close_result { + let index = writer_indices[close_idx]; + warn!("Write failed for block index {}: {:?}", index, e); + } else { + succeeded_blocks += 1; + } } + if succeeded_blocks < self.data_units { + return Err(HdfsError::DataTransferError( + "Insufficient blocks succeeded during write".to_string(), + )); + } let mut extended_block = self.block.b; extended_block.num_bytes = Some(self.bytes_written as u64); Ok(extended_block) } - - fn is_full(&self) -> bool { - self.block_writers - .iter() - .all(|writer| writer.as_ref().is_some_and(|w| 
w.is_full())) - } } diff --git a/rust/tests/test_write_resiliency.rs b/rust/tests/test_write_resiliency.rs index e3095490..1781ef95 100644 --- a/rust/tests/test_write_resiliency.rs +++ b/rust/tests/test_write_resiliency.rs @@ -250,4 +250,119 @@ mod test { } Ok(()) } + + #[tokio::test] + #[serial] + async fn test_striped_write_failures() -> Result<()> { + let _ = env_logger::builder().is_test(true).try_init(); + + let dfs_features = HashSet::from([DfsFeatures::HA, DfsFeatures::EC]); + let _dfs = MiniDfs::with_features(&dfs_features); + let client = Client::default(); + + // A full stripe is 3 MiB, so write two full stripes to test two failures + let bytes_to_write = 6usize * 1024 * 1024; + + let mut data = BytesMut::with_capacity(bytes_to_write); + for i in 0..(bytes_to_write / 4) { + data.put_i32(i as i32); + } + let data = data.freeze(); + + // Test connection failure before writing data with erasure coding + let file = "/ec-3-2/striped_testfile1"; + let mut writer = client.create(file, WriteOptions::default()).await?; + + WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst); + + writer.write(data.clone()).await?; + writer.close().await?; + + let reader = client.read(file).await?; + check_file_content(&reader, data.clone()).await?; + + // Test two connection failures after data has been written + let file = "/ec-3-2/striped_testfile2"; + let mut writer = client.create(file, WriteOptions::default()).await?; + + WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst); + + writer.write(data.slice(..bytes_to_write / 2)).await?; + + // Give a little time for the packets to send + tokio::time::sleep(Duration::from_millis(100)).await; + + assert!(!WRITE_CONNECTION_FAULT_INJECTOR.load(Ordering::SeqCst)); + WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst); + + writer.write(data.slice(bytes_to_write / 2..)).await?; + writer.close().await?; + + // Give a little time for the packets to send + tokio::time::sleep(Duration::from_millis(100)).await; + 
+ assert!(!WRITE_CONNECTION_FAULT_INJECTOR.load(Ordering::SeqCst)); + + let reader = client.read(file).await?; + check_file_content(&reader, data.clone()).await?; + + // Test failure from ack status + let file = "/ec-3-2/striped_testfile3"; + let mut writer = client.create(file, WriteOptions::default()).await?; + + *WRITE_REPLY_FAULT_INJECTOR.lock().unwrap() = Some(2); + + writer.write(data.clone()).await?; + writer.close().await?; + + let reader = client.read(file).await?; + check_file_content(&reader, data.clone()).await?; + + // Test failure from ack status after some data has been written + let file = "/ec-3-2/striped_testfile4"; + let mut writer = client.create(file, WriteOptions::default()).await?; + + writer.write(data.slice(..bytes_to_write / 2)).await?; + + // Give a little time for the packets to send + tokio::time::sleep(Duration::from_millis(100)).await; + + *WRITE_REPLY_FAULT_INJECTOR.lock().unwrap() = Some(2); + + writer.write(data.slice(bytes_to_write / 2..)).await?; + writer.close().await?; + + let reader = client.read(file).await?; + check_file_content(&reader, data.clone()).await?; + + // Test three failures which should cause a write failure + let file = "/ec-3-2/striped_testfile5"; + let mut writer = client.create(file, WriteOptions::default()).await?; + + WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst); + writer.write(data.slice(..bytes_to_write / 2)).await?; + tokio::time::sleep(Duration::from_millis(100)).await; + assert!(!WRITE_CONNECTION_FAULT_INJECTOR.load(Ordering::SeqCst)); + + WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst); + writer.write(data.slice(..bytes_to_write / 2)).await?; + tokio::time::sleep(Duration::from_millis(100)).await; + assert!(!WRITE_CONNECTION_FAULT_INJECTOR.load(Ordering::SeqCst)); + + // Third write. It won't fail yet because the sending of the data is asynchronous. + // The failure will happen when the client tries to send the next block.
+ WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst); + writer.write(data.slice(..bytes_to_write / 2)).await?; + tokio::time::sleep(Duration::from_millis(100)).await; + assert!(!WRITE_CONNECTION_FAULT_INJECTOR.load(Ordering::SeqCst)); + + assert!( + writer + .write(data.slice(..bytes_to_write / 2)) + .await + .is_err() + ); + + Ok(()) + } }