Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion rust/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ impl FileWriter {

pub async fn write(&mut self, mut buf: Bytes) -> Result<usize> {
let bytes_to_write = buf.len();
// Create a shallow copy of the bytes instance to mutate and track what's been read
while !buf.is_empty() {
let block_writer = self.get_block_writer().await?;

Expand Down
83 changes: 65 additions & 18 deletions rust/src/hdfs/block_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,11 +846,12 @@ pub(crate) struct StripedBlockWriter {
server_defaults: hdfs::FsServerDefaultsProto,
config: Arc<Configuration>,
handle: Handle,
block_writers: Vec<Option<ReplicatedBlockWriter>>,
block_writers: Vec<Option<Result<ReplicatedBlockWriter>>>,
cell_buffer: CellBuffer,
bytes_written: usize,
capacity: usize,
status: hdfs::HdfsFileStatusProto,
data_units: usize,
}

impl StripedBlockWriter {
Expand All @@ -876,15 +877,22 @@ impl StripedBlockWriter {
bytes_written: 0,
capacity: ec_schema.data_units * status.blocksize() as usize,
status: status.clone(),
data_units: ec_schema.data_units,
}
}

fn bytes_remaining(&self) -> usize {
self.capacity - self.bytes_written
}

fn is_full(&self) -> bool {
self.bytes_remaining() == 0
}

async fn write_cells(&mut self) -> Result<()> {
let mut write_futures = vec![];
let mut writer_indices = vec![];

for (index, (data, writer)) in self
.cell_buffer
.encode()
Expand Down Expand Up @@ -915,16 +923,39 @@ impl StripedBlockWriter {
self.handle.clone(),
&self.status,
)
.await?,
.await,
)
}

if writer.as_ref().unwrap().is_err() {
continue;
}

let mut data = data.clone();
write_futures.push(async move { writer.as_mut().unwrap().write(&mut data).await })
writer_indices.push(index);

// We know at this point writer is Some(Ok(_))
let block_writer = writer.as_mut().unwrap().as_mut().unwrap();
write_futures.push(async move { block_writer.write(&mut data).await })
}

for write in join_all(write_futures).await {
write?;
for (write_idx, write) in join_all(write_futures).await.into_iter().enumerate() {
if let Err(e) = write {
let index = writer_indices[write_idx];
warn!("Write failed for block index {}: {:?}", index, e);
self.block_writers[index] = Some(Err(e));
}
}

let failed_writers = self
.block_writers
.iter()
.filter(|writer| writer.as_ref().is_some_and(|w| w.is_err()))
.count();
if failed_writers > (self.block_writers.len() - self.data_units) {
return Err(HdfsError::DataTransferError(
"Insufficient blocks succeeded during write".to_string(),
));
}

Ok(())
Expand Down Expand Up @@ -952,26 +983,42 @@ impl StripedBlockWriter {
self.write_cells().await?;
}

let close_futures = self
.block_writers
.into_iter()
.filter_map(|mut writer| writer.take())
.map(|writer| async move { writer.close().await });
let mut close_futures = vec![];
let mut writer_indices = vec![];
let mut succeeded_blocks = 0usize;

for (index, writer) in self.block_writers.into_iter().enumerate() {
match writer {
Some(Ok(writer)) => {
writer_indices.push(index);
close_futures.push(async move { writer.close().await });
}
Some(Err(_)) => {}
None => {
// If no data was written to a block, treat it as successful
succeeded_blocks += 1;
}
}
}

for close_result in join_all(close_futures).await {
close_result?;
for (close_idx, close_result) in join_all(close_futures).await.into_iter().enumerate() {
if let Err(e) = close_result {
let index = writer_indices[close_idx];
warn!("Write failed for block index {}: {:?}", index, e);
} else {
succeeded_blocks += 1;
}
}

if succeeded_blocks < self.data_units {
return Err(HdfsError::DataTransferError(
"Insufficient blocks succeeded during write".to_string(),
));
}
let mut extended_block = self.block.b;

extended_block.num_bytes = Some(self.bytes_written as u64);

Ok(extended_block)
}

fn is_full(&self) -> bool {
self.block_writers
.iter()
.all(|writer| writer.as_ref().is_some_and(|w| w.is_full()))
}
}
115 changes: 115 additions & 0 deletions rust/tests/test_write_resiliency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,119 @@ mod test {
}
Ok(())
}

#[tokio::test]
#[serial]
async fn test_striped_write_failures() -> Result<()> {
let _ = env_logger::builder().is_test(true).try_init();

let dfs_features = HashSet::from([DfsFeatures::HA, DfsFeatures::EC]);
let _dfs = MiniDfs::with_features(&dfs_features);
let client = Client::default();

// A full stripe is 3 MiB, so write two full stripes to test two failures
let bytes_to_write = 6usize * 1024 * 1024;

let mut data = BytesMut::with_capacity(bytes_to_write);
for i in 0..(bytes_to_write / 4) {
data.put_i32(i as i32);
}
let data = data.freeze();

// Test connection failure before writing data with erasure coding
let file = "/ec-3-2/striped_testfile1";
let mut writer = client.create(file, WriteOptions::default()).await?;

WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst);

writer.write(data.clone()).await?;
writer.close().await?;

let reader = client.read(file).await?;
check_file_content(&reader, data.clone()).await?;

// Test two connection failures after data has been written
let file = "/ec-3-2/striped_testfile2";
let mut writer = client.create(file, WriteOptions::default()).await?;

WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst);

writer.write(data.slice(..bytes_to_write / 2)).await?;

// Give a little time for the packets to send
tokio::time::sleep(Duration::from_millis(100)).await;

assert!(!WRITE_CONNECTION_FAULT_INJECTOR.load(Ordering::SeqCst));
WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst);

writer.write(data.slice(bytes_to_write / 2..)).await?;
writer.close().await?;

// Give a little time for the packets to send
tokio::time::sleep(Duration::from_millis(100)).await;

assert!(!WRITE_CONNECTION_FAULT_INJECTOR.load(Ordering::SeqCst));

let reader = client.read(file).await?;
check_file_content(&reader, data.clone()).await?;

// Test failure in from ack status
let file = "/ec-3-2/striped_testfile3";
let mut writer = client.create(file, WriteOptions::default()).await?;

*WRITE_REPLY_FAULT_INJECTOR.lock().unwrap() = Some(2);

writer.write(data.clone()).await?;
writer.close().await?;

let reader = client.read(file).await?;
check_file_content(&reader, data.clone()).await?;

// Test failure in from ack status after some data has been written
let file = "/ec-3-2/striped_testfile4";
let mut writer = client.create(file, WriteOptions::default()).await?;

writer.write(data.slice(..bytes_to_write / 2)).await?;

// Give a little time for the packets to send
tokio::time::sleep(Duration::from_millis(100)).await;

*WRITE_REPLY_FAULT_INJECTOR.lock().unwrap() = Some(2);

writer.write(data.slice(bytes_to_write / 2..)).await?;
writer.close().await?;

let reader = client.read(file).await?;
check_file_content(&reader, data.clone()).await?;

// Test three failures which should cause a write failure
let file = "/ec-3-2/striped_testfile5";
let mut writer = client.create(file, WriteOptions::default()).await?;

WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst);
writer.write(data.slice(..bytes_to_write / 2)).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(!WRITE_CONNECTION_FAULT_INJECTOR.load(Ordering::SeqCst));

WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst);
writer.write(data.slice(..bytes_to_write / 2)).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(!WRITE_CONNECTION_FAULT_INJECTOR.load(Ordering::SeqCst));

// Third write. It won't fail yet because the sending of the data is asynchronous.
// The failure will happen when the client tries to send the next block.
WRITE_CONNECTION_FAULT_INJECTOR.store(true, Ordering::SeqCst);
writer.write(data.slice(..bytes_to_write / 2)).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(!WRITE_CONNECTION_FAULT_INJECTOR.load(Ordering::SeqCst));

assert!(
writer
.write(data.slice(..bytes_to_write / 2))
.await
.is_err()
);

Ok(())
}
}
Loading