Skip to content

Commit 742c1e7

Browse files
committed
add a check after writing && fix a latent bug when writing with retries
1 parent 01165c2 commit 742c1e7

File tree

3 files changed

+34
-6
lines changed

3 files changed

+34
-6
lines changed

core/src/layers/complete.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
368368

369369
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
370370
let (rp, w) = self.inner.write(path, args.clone()).await?;
371-
let w = CompleteWriter::new(w);
371+
let w = CompleteWriter::new(w, args.append());
372372
Ok((rp, w))
373373
}
374374

@@ -400,9 +400,10 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
400400
}
401401

402402
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
403+
let append = args.append();
403404
self.inner
404405
.blocking_write(path, args)
405-
.map(|(rp, w)| (rp, CompleteWriter::new(w)))
406+
.map(|(rp, w)| (rp, CompleteWriter::new(w, append)))
406407
}
407408

408409
fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
@@ -487,16 +488,38 @@ impl<R: oio::BlockingRead> oio::BlockingRead for CompleteReader<R> {
487488

488489
pub struct CompleteWriter<W> {
489490
inner: Option<W>,
491+
append: bool,
490492
size: u64,
491493
}
492494

493495
impl<W> CompleteWriter<W> {
494-
pub fn new(inner: W) -> CompleteWriter<W> {
496+
pub fn new(inner: W, append: bool) -> CompleteWriter<W> {
495497
CompleteWriter {
496498
inner: Some(inner),
499+
append,
497500
size: 0,
498501
}
499502
}
503+
504+
fn check(&self, content_length: u64) -> Result<()> {
505+
if self.append || content_length == 0 {
506+
return Ok(());
507+
}
508+
509+
match self.size.cmp(&content_length) {
510+
Ordering::Equal => Ok(()),
511+
Ordering::Less => Err(
512+
Error::new(ErrorKind::Unexpected, "writer got too little data")
513+
.with_context("expect", content_length)
514+
.with_context("actual", self.size),
515+
),
516+
Ordering::Greater => Err(
517+
Error::new(ErrorKind::Unexpected, "writer got too much data")
518+
.with_context("expect", content_length)
519+
.with_context("actual", self.size),
520+
),
521+
}
522+
}
500523
}
501524

502525
/// Check if the writer has been closed or aborted while debug_assertions
@@ -534,6 +557,7 @@ where
534557
// we must return `Err` before setting inner to None; otherwise,
535558
// we won't be able to retry `close` in `RetryLayer`.
536559
let mut ret = w.close().await?;
560+
self.check(ret.content_length())?;
537561
if ret.content_length() == 0 {
538562
ret = ret.with_content_length(self.size);
539563
}
@@ -576,6 +600,7 @@ where
576600
})?;
577601

578602
let mut ret = w.close()?;
603+
self.check(ret.content_length())?;
579604
if ret.content_length() == 0 {
580605
ret = ret.with_content_length(self.size);
581606
}

core/src/raw/oio/write/block_write.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,9 @@ where
201201
None => (0, Buffer::new()),
202202
};
203203

204+
let meta = self.w.write_once(size as u64, body).await?;
204205
self.cache = None;
205-
return self.w.write_once(size as u64, body).await;
206+
return Ok(meta);
206207
}
207208

208209
if let Some(cache) = self.cache.clone() {

core/src/raw/oio/write/multipart_write.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,11 @@ where
250250
None => (0, Buffer::new()),
251251
};
252252

253-
self.cache = None;
254253
// Call write_once if there is no upload_id.
255-
return self.w.write_once(size as u64, body).await;
254+
let meta = self.w.write_once(size as u64, body).await?;
255+
// make sure to clear the cache only after write_once succeeds; otherwise, retries may fail.
256+
self.cache = None;
257+
return Ok(meta);
256258
}
257259
};
258260

0 commit comments

Comments
 (0)