Skip to content
Merged
179 changes: 140 additions & 39 deletions crates/iceberg/src/puffin/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,28 @@ impl FileMetadata {
pub(crate) const MAGIC_LENGTH: u8 = 4;
pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31];

// We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below.
//
// Footer
// |
// -------------------------------------------------
// | |
// Magic FooterPayload FooterPayloadLength Flags Magic
// | |
// -----------------------------
// |
// FOOTER_STRUCT

/// We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer.
/// The structure of the Footer specification is illustrated below:
///
/// ```text
/// Footer
/// ┌────────────────────┐
/// │ Magic (4 bytes)    │
/// │                    │
/// ├────────────────────┤
/// │ FooterPayload      │
/// │ (PAYLOAD_LENGTH)   │
/// ├────────────────────┤ ◀─┐
/// │ FooterPayloadSize  │   │
/// │ (4 bytes)          │   │
/// ├────────────────────┤
/// │ Flags (4 bytes)    │  FOOTER_STRUCT
/// │                    │
/// ├────────────────────┤   │
/// │ Magic (4 bytes)    │   │
/// │                    │   │
/// └────────────────────┘ ◀─┘
/// ```
const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0;
const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4;
const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET
Expand Down Expand Up @@ -264,18 +274,84 @@ impl FileMetadata {
}

/// Returns the file metadata about a Puffin file
pub(crate) async fn read(input_file: &InputFile) -> Result<FileMetadata> {
///
/// `prefetch_hint` is the number of bytes, counted back from the end of the file, to fetch
/// in a single read in the hope that they contain the entire footer.
///
/// The regular path (`read_no_prefetch`) is used instead when:
/// - the hint is `None` or is 16 or smaller,
/// - the hint is larger than the file,
/// - fewer bytes than a footer struct were fetched, or
/// - the footer turns out to be larger than the fetched bytes; in that case the payload
///   length parsed from the prefetched bytes is passed along.
///
/// # Errors
///
/// Propagates any error returned while reading the file's metadata or bytes, while checking
/// the footer magic, while extracting the footer payload as a string, or while parsing that
/// string as JSON.
pub(crate) async fn read(
    input_file: &InputFile,
    prefetch_hint: Option<u8>,
) -> Result<FileMetadata> {
    // Hints of 16 bytes or fewer are treated the same as no hint at all.
    let hint = match prefetch_hint.filter(|&h| h > 16) {
        Some(hint) => hint as u64,
        None => return FileMetadata::read_no_prefetch(input_file, None).await,
    };

    // Hint cannot be larger than input file. Check this before opening a reader so that
    // no reader is created just to be thrown away on the fallback path.
    let input_file_length = input_file.metadata().await?.size;
    if hint > input_file_length {
        return FileMetadata::read_no_prefetch(input_file, None).await;
    }

    // Read the tail of the file based on the prefetch hint
    let file_read = input_file.reader().await?;
    let prefetched = file_read
        .read(input_file_length - hint..input_file_length)
        .await?;

    let magic_length = FileMetadata::MAGIC_LENGTH as usize;
    let struct_length = FileMetadata::FOOTER_STRUCT_LENGTH as usize;

    // All offsets below are derived from the number of bytes actually returned rather than
    // from the hint, so a short read can never cause an underflow or out-of-range slice.
    let fetched_length = prefetched.len();
    if fetched_length < struct_length {
        return FileMetadata::read_no_prefetch(input_file, None).await;
    }

    // The payload length is the first field of the footer struct (little-endian u32).
    let payload_length_start = fetched_length - struct_length;
    let payload_length_end =
        payload_length_start + (FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as usize);
    let mut buf = [0; 4];
    buf.copy_from_slice(&prefetched[payload_length_start..payload_length_end]);
    let footer_payload_length = u32::from_le_bytes(buf);

    // If (footer payload length + FOOTER_STRUCT_LENGTH + MAGIC_LENGTH) does not fit in the
    // fetched bytes, fall back to a read with no prefetch, passing in the payload length
    // that has already been parsed. An arithmetic overflow is handled the same way.
    let footer_length = (footer_payload_length as usize)
        .checked_add(struct_length + magic_length)
        .filter(|&length| length <= fetched_length);
    let footer_length = match footer_length {
        Some(length) => length,
        None => {
            return FileMetadata::read_no_prefetch(input_file, Some(footer_payload_length))
                .await;
        }
    };

    // Narrow the prefetched bytes down to exactly the footer
    let footer_bytes = &prefetched[fetched_length - footer_length..];

    // check first four bytes of footer
    FileMetadata::check_magic(&footer_bytes[..magic_length])?;
    // check last four bytes of footer
    FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;

    let footer_payload_str =
        FileMetadata::extract_footer_payload_as_str(footer_bytes, footer_payload_length)?;
    FileMetadata::from_json_str(&footer_payload_str)
}

async fn read_no_prefetch(
input_file: &InputFile,
footer_payload_length: Option<u32>,
) -> Result<FileMetadata> {
let input_file_length = input_file.metadata().await?.size;
let file_read = input_file.reader().await?;

let first_four_bytes = file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?;
FileMetadata::check_magic(&first_four_bytes)?;

let input_file_length = input_file.metadata().await?.size;
let footer_payload_length =
FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?;
let payload_length = if let Some(length) = footer_payload_length {
length
} else {
FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?
};

let footer_bytes =
FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length)
.await?;
FileMetadata::read_footer_bytes(&file_read, input_file_length, payload_length).await?;

let magic_length = FileMetadata::MAGIC_LENGTH as usize;
// check first four bytes of footer
Expand All @@ -284,7 +360,8 @@ impl FileMetadata {
FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;

let footer_payload_str =
FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?;
FileMetadata::extract_footer_payload_as_str(&footer_bytes, payload_length)?;

FileMetadata::from_json_str(&footer_payload_str)
}

Expand Down Expand Up @@ -363,7 +440,7 @@ mod tests {
let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

assert_eq!(
FileMetadata::read(&input_file)
FileMetadata::read(&input_file, None)
.await
.unwrap_err()
.to_string(),
Expand All @@ -386,7 +463,7 @@ mod tests {
let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

assert_eq!(
FileMetadata::read(&input_file)
FileMetadata::read(&input_file, None)
.await
.unwrap_err()
.to_string(),
Expand All @@ -409,7 +486,7 @@ mod tests {
let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

assert_eq!(
FileMetadata::read(&input_file)
FileMetadata::read(&input_file, None)
.await
.unwrap_err()
.to_string(),
Expand All @@ -434,7 +511,7 @@ mod tests {
let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

assert_eq!(
FileMetadata::read(&input_file)
FileMetadata::read(&input_file, None)
.await
.unwrap_err()
.to_string(),
Expand All @@ -459,7 +536,7 @@ mod tests {
let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

assert_eq!(
FileMetadata::read(&input_file)
FileMetadata::read(&input_file, None)
.await
.unwrap_err()
.to_string(),
Expand All @@ -482,7 +559,7 @@ mod tests {
let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

assert_eq!(
FileMetadata::read(&input_file)
FileMetadata::read(&input_file, None)
.await
.unwrap_err()
.to_string(),
Expand All @@ -505,7 +582,7 @@ mod tests {
let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

assert_eq!(
FileMetadata::read(&input_file)
FileMetadata::read(&input_file, None)
.await
.unwrap_err()
.to_string(),
Expand All @@ -531,7 +608,7 @@ mod tests {
let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

assert_eq!(
FileMetadata::read(&input_file).await.unwrap_err().to_string(),
FileMetadata::read(&input_file, None).await.unwrap_err().to_string(),
"DataInvalid => Footer is not a valid UTF-8 string, source: invalid utf-8 sequence of 1 bytes from index 1",
)
}
Expand All @@ -551,7 +628,7 @@ mod tests {
let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

assert_eq!(
FileMetadata::read(&input_file).await.unwrap(),
FileMetadata::read(&input_file, None).await.unwrap(),
FileMetadata {
blobs: vec![],
properties: HashMap::new(),
Expand All @@ -575,7 +652,7 @@ mod tests {
.await;

assert_eq!(
FileMetadata::read(&input_file).await.unwrap(),
FileMetadata::read(&input_file, None).await.unwrap(),
FileMetadata {
blobs: vec![],
properties: {
Expand Down Expand Up @@ -604,7 +681,7 @@ mod tests {
.await;

assert_eq!(
FileMetadata::read(&input_file).await.unwrap(),
FileMetadata::read(&input_file, None).await.unwrap(),
FileMetadata {
blobs: vec![],
properties: {
Expand All @@ -630,7 +707,7 @@ mod tests {
.await;

assert_eq!(
FileMetadata::read(&input_file).await.unwrap_err().to_string(),
FileMetadata::read(&input_file, None).await.unwrap_err().to_string(),
format!(
"DataInvalid => Given string is not valid JSON, source: missing field `blobs` at line 3 column 13"
),
Expand All @@ -650,7 +727,7 @@ mod tests {
.await;

assert_eq!(
FileMetadata::read(&input_file).await.unwrap_err().to_string(),
FileMetadata::read(&input_file, None).await.unwrap_err().to_string(),
format!("DataInvalid => Given string is not valid JSON, source: invalid type: map, expected a sequence at line 2 column 26"),
)
}
Expand Down Expand Up @@ -685,7 +762,7 @@ mod tests {
.await;

assert_eq!(
FileMetadata::read(&input_file).await.unwrap(),
FileMetadata::read(&input_file, None).await.unwrap(),
FileMetadata {
blobs: vec![
BlobMetadata {
Expand Down Expand Up @@ -739,7 +816,7 @@ mod tests {
.await;

assert_eq!(
FileMetadata::read(&input_file).await.unwrap(),
FileMetadata::read(&input_file, None).await.unwrap(),
FileMetadata {
blobs: vec![BlobMetadata {
r#type: "type-a".to_string(),
Expand Down Expand Up @@ -787,7 +864,7 @@ mod tests {
.await;

assert_eq!(
FileMetadata::read(&input_file).await.unwrap_err().to_string(),
FileMetadata::read(&input_file, None).await.unwrap_err().to_string(),
format!(
"DataInvalid => Given string is not valid JSON, source: invalid value: integer `{}`, expected i32 at line 5 column 51",
out_of_i32_range_number
Expand All @@ -801,29 +878,53 @@ mod tests {

let input_file = input_file_with_payload(&temp_dir, r#""blobs" = []"#).await;
assert_eq!(
FileMetadata::read(&input_file).await.unwrap_err().to_string(),
FileMetadata::read(&input_file, None).await.unwrap_err().to_string(),
"DataInvalid => Given string is not valid JSON, source: invalid type: string \"blobs\", expected struct FileMetadata at line 1 column 7",
)
}

#[tokio::test]
async fn test_read_file_metadata_of_uncompressed_empty_file() {
let input_file = java_empty_uncompressed_input_file();
let file_metadata = FileMetadata::read(&input_file).await.unwrap();
let file_metadata = FileMetadata::read(&input_file, None).await.unwrap();
assert_eq!(file_metadata, empty_footer_payload())
}

#[tokio::test]
async fn test_read_file_metadata_of_uncompressed_metric_data() {
let input_file = java_uncompressed_metric_input_file();
let file_metadata = FileMetadata::read(&input_file).await.unwrap();
let file_metadata = FileMetadata::read(&input_file, None).await.unwrap();
assert_eq!(file_metadata, uncompressed_metric_file_metadata())
}

#[tokio::test]
async fn test_read_file_metadata_of_zstd_compressed_metric_data() {
let input_file = java_zstd_compressed_metric_input_file();
let file_metadata = FileMetadata::read(&input_file).await.unwrap();
let file_metadata = FileMetadata::read(&input_file, None).await.unwrap();
assert_eq!(file_metadata, zstd_compressed_metric_file_metadata())
}

#[tokio::test]
async fn test_read_file_metadata_of_empty_file_with_prefetching() {
    // Reading with a 64-byte prefetch hint must yield the empty footer payload.
    let puffin_file = java_empty_uncompressed_input_file();

    let actual = FileMetadata::read(&puffin_file, Some(64))
        .await
        .unwrap();

    assert_eq!(actual, empty_footer_payload());
}

#[tokio::test]
async fn test_read_file_metadata_of_uncompressed_metric_data_with_prefetching() {
    // Reading with a 64-byte prefetch hint must yield the uncompressed metric metadata.
    let puffin_file = java_uncompressed_metric_input_file();

    let actual = FileMetadata::read(&puffin_file, Some(64))
        .await
        .unwrap();

    assert_eq!(actual, uncompressed_metric_file_metadata());
}

#[tokio::test]
async fn test_read_file_metadata_of_zstd_compressed_metric_data_with_prefetching() {
    // Reading with a 64-byte prefetch hint must yield the zstd-compressed metric metadata.
    let puffin_file = java_zstd_compressed_metric_input_file();

    let actual = FileMetadata::read(&puffin_file, Some(64))
        .await
        .unwrap();

    assert_eq!(actual, zstd_compressed_metric_file_metadata());
}
}
8 changes: 4 additions & 4 deletions crates/iceberg/src/puffin/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ impl PuffinReader {
}

/// Returns file metadata
pub async fn file_metadata(&self) -> Result<&FileMetadata> {
pub async fn file_metadata(&self, prefetch_hint: Option<u8>) -> Result<&FileMetadata> {
self.file_metadata
.get_or_try_init(|| FileMetadata::read(&self.input_file))
.get_or_try_init(|| FileMetadata::read(&self.input_file, prefetch_hint))
.await
}

Expand Down Expand Up @@ -78,7 +78,7 @@ mod tests {
let input_file = java_uncompressed_metric_input_file();
let puffin_reader = PuffinReader::new(input_file);

let file_metadata = puffin_reader.file_metadata().await.unwrap().clone();
let file_metadata = puffin_reader.file_metadata(None).await.unwrap().clone();
assert_eq!(file_metadata, uncompressed_metric_file_metadata());

assert_eq!(
Expand All @@ -103,7 +103,7 @@ mod tests {
let input_file = java_zstd_compressed_metric_input_file();
let puffin_reader = PuffinReader::new(input_file);

let file_metadata = puffin_reader.file_metadata().await.unwrap().clone();
let file_metadata = puffin_reader.file_metadata(None).await.unwrap().clone();
assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());

assert_eq!(
Expand Down
Loading
Loading