Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry logic to image layer fetching and decompression #291

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 97 additions & 50 deletions src/tardev-snapshotter/src/snapshotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,63 +887,110 @@ impl TarDevSnapshotter {

let upstream_name = base_name.with_extension(layer_type);

info!("Fetching {} layer image to {:?}", layer_type, upstream_name);
self.get_layer_image(&upstream_name, digest_str).await?;

// Process the layer
let generated_root_hash = tokio::task::spawn_blocking(move || -> Result<_> {
if layer_type == TAR_EXTENSION {
info!("Renaming {:?} to {:?}", &upstream_name, &base_name);
std::fs::rename(&upstream_name, &base_name)?;
}
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(layer_type == TAR_GZ_EXTENSION)
.open(&base_name)?;
if layer_type == TAR_GZ_EXTENSION {
info!("Decompressing {:?} to {:?}", &upstream_name, &base_name);
let compressed = fs::File::open(&upstream_name).map_err(|e| {
let file_error = format!(
"Failed to open file {:?} for decompression: {:?}",
&upstream_name, e
);
error!("{}", file_error);
anyhow::anyhow!(file_error)
})?;

let mut gz_decoder = flate2::read::GzDecoder::new(compressed);

if let Err(e) = std::io::copy(&mut gz_decoder, &mut file) {
let copy_error = format!("failed to copy payload from gz decoder {:?}", e);
error!("{}", copy_error);
return Err(anyhow::anyhow!(copy_error));
const MAX_RETRIES: usize = 3;
let mut retries = 0;

while retries < MAX_RETRIES {
jiria marked this conversation as resolved.
Show resolved Hide resolved
jiria marked this conversation as resolved.
Show resolved Hide resolved
info!("Fetching {} layer image to {:?}", layer_type, upstream_name);
if let Err(download_err) = self.get_layer_image(&upstream_name, digest_str).await {
retries += 1;
error!(
"Failed to fetch layer image (attempt {}/{}): {:?}",
retries,
MAX_RETRIES,
download_err
);
if retries >= MAX_RETRIES {
return Err(Status::unknown(format!(
"Failed to fetch layer image after {} attempts: {:?}",
MAX_RETRIES, download_err
)));
}

warn!("Retrying layer image download...");
continue; // Retry fetching the layer image
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, but would it make sense to sleep here for a bit? Presumably we will run against the deadline that containerd has, so cannot sleep for too long.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree. It would make sense to sleep a bit if it's truely a network issue. I think the specific timeout is managed by the client (k8s) instead of containerd itself. Given that I set sleep time to be 500ms for now.

}

trace!("Appending index to {:?}", &base_name);
file.rewind().context("failed to rewind the file handle")?;
tarindex::append_index(&mut file).context("failed to append tar index")?;
// Process the layer
let process_result = tokio::task::spawn_blocking({
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the download itself be part of the spawn_blocking block?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved download itself to be part of the new function to be run inside the while loop.

let upstream_name = upstream_name.clone();
let base_name = base_name.clone();
let salt = salt.clone();

trace!("Appending dm-verity tree to {:?}", &base_name);
let root_hash = verity::append_tree::<Sha256>(&mut file, &salt)
.context("failed to append verity tree")?;
move || -> Result<_> {
if layer_type == TAR_EXTENSION {
info!("Renaming {:?} to {:?}", &upstream_name, &base_name);
std::fs::rename(&upstream_name, &base_name)?;
}
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(layer_type == TAR_GZ_EXTENSION)
.open(&base_name)?;
if layer_type == TAR_GZ_EXTENSION {
info!("Decompressing {:?} to {:?}", &upstream_name, &base_name);
let compressed = fs::File::open(&upstream_name).map_err(|e| {
let file_error = format!(
"Failed to open file {:?} for decompression: {:?}",
&upstream_name, e
);
error!("{}", file_error);
anyhow::anyhow!(file_error)
})?;

let mut gz_decoder = flate2::read::GzDecoder::new(compressed);

if let Err(e) = std::io::copy(&mut gz_decoder, &mut file) {
let copy_error = format!("failed to copy payload from gz decoder {:?}", e);
error!("{}", copy_error);
return Err(anyhow::anyhow!(copy_error));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the error we hit; should we trigger retry with a new download here as well? Or what are we doing to resolve it?

failed to extract image layer: failed to copy payload from gz decoder Error { kind: UnexpectedEof, message: \"failed to fill whole buffer\" }: unknown

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, failing here will trigger a new download.

}
}

trace!("Root hash for {:?} is {:x}", &base_name, root_hash);
Ok(root_hash)
})
.await
.map_err(|e| Status::unknown(format!("error in worker task: {e}")))?
.map_err(|e| Status::unknown(format!("failed to extract image layer: {e}")))?;
trace!("Appending index to {:?}", &base_name);
file.rewind().context("failed to rewind the file handle")?;
tarindex::append_index(&mut file).context("failed to append tar index")?;

let generated_root_hash = format!("{:x}", generated_root_hash);
trace!("Appending dm-verity tree to {:?}", &base_name);
let root_hash = verity::append_tree::<Sha256>(&mut file, &salt)
.context("failed to append verity tree")?;

if root_hash != generated_root_hash {
return Err(Status::internal(format!(
"root hash mismatch: expected {}, got {}",
root_hash, generated_root_hash
)));
trace!("Root hash for {:?} is {:x}", &base_name, root_hash);
Ok(root_hash)
}
})
.await
.map_err(|e| Status::unknown(format!("error in worker task: {e}")))?;

match process_result {
Ok(generated_root_hash) => {
let generated_root_hash = format!("{:x}", generated_root_hash);
if root_hash != generated_root_hash {
return Err(Status::internal(format!(
"root hash mismatch: expected {}, got {}",
root_hash, generated_root_hash
)));
}
break; // succeeded; exit retry loop
}
Err(process_err) => {
retries += 1;
error!(
"Failed to process layer (attempt {}/{}): {:?}",
retries,
MAX_RETRIES,
process_err
);
if retries >= MAX_RETRIES {
return Err(Status::unknown(format!(
"Failed to process layer after {} attempts: {:?}",
MAX_RETRIES, process_err
)));
}
warn!("Retrying layer processing...");
}
}
}

// Store a label with the root hash so that we can recall it later when mounting.
Expand Down
Loading