diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index ee74ae95a5..b56e4d8fd0 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -131,6 +131,7 @@ impl Flag { } /// Metadata about a puffin file. +/// /// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] pub struct FileMetadata { @@ -144,18 +145,28 @@ impl FileMetadata { pub(crate) const MAGIC_LENGTH: u8 = 4; pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31]; - // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below. - // - // Footer - // | - // ------------------------------------------------- - // | | - // Magic FooterPayload FooterPayloadLength Flags Magic - // | | - // ----------------------------- - // | - // FOOTER_STRUCT - + /// We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer. + /// The structure of the Footer specification is illustrated below: + /// + /// ```text + /// Footer + /// ┌────────────────────┐ + /// │ Magic (4 bytes) │ + /// │ │ + /// ├────────────────────┤ + /// │ FooterPayload │ + /// │ (PAYLOAD_LENGTH) │ + /// ├────────────────────┤ ◀─┐ + /// │ FooterPayloadSize │ │ + /// │ (4 bytes) │ │ + /// ├────────────────────┤ + /// │ Flags (4 bytes) │ FOOTER_STRUCT + /// │ │ + /// ├────────────────────┤ │ + /// │ Magic (4 bytes) │ │ + /// │ │ │ + /// └────────────────────┘ ◀─┘ + /// ``` const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0; const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4; const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET @@ -166,6 +177,11 @@ impl FileMetadata { pub(crate) const FOOTER_STRUCT_LENGTH: u8 = FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; + /// Constructs new puffin `FileMetadata` + pub fn new(blobs: Vec, properties: HashMap) -> Self { + Self { blobs, properties } + } + fn check_magic(bytes: &[u8]) -> Result<()> { if bytes == FileMetadata::MAGIC { Ok(()) @@ -285,9 +301,73 @@ impl FileMetadata { let footer_payload_str = FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?; + FileMetadata::from_json_str(&footer_payload_str) } + /// Reads file_metadata in puffin file with a prefetch hint + /// + /// `prefetch_hint` is used to try to fetch the entire footer in one read. If + /// the entire footer isn't fetched in one read the function will call the regular + /// read option. + #[allow(dead_code)] + pub(crate) async fn read_with_prefetch( + input_file: &InputFile, + prefetch_hint: u8, + ) -> Result { + if prefetch_hint > 16 { + let input_file_length = input_file.metadata().await?.size; + let file_read = input_file.reader().await?; + + // Hint cannot be larger than input file + if prefetch_hint as u64 > input_file_length { + return FileMetadata::read(input_file).await; + } + + // Read footer based on prefetchi hint + let start = input_file_length - prefetch_hint as u64; + let end = input_file_length; + let footer_bytes = file_read.read(start..end).await?; + + let payload_length_start = + footer_bytes.len() - (FileMetadata::FOOTER_STRUCT_LENGTH as usize); + let payload_length_end = + payload_length_start + (FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as usize); + let payload_length_bytes = &footer_bytes[payload_length_start..payload_length_end]; + + let mut buf = [0; 4]; + buf.copy_from_slice(payload_length_bytes); + let footer_payload_length = u32::from_le_bytes(buf); + + // If the (footer payload length + FOOTER_STRUCT_LENGTH + MAGIC_LENGTH) is greater + // than the fetched footer then you can have it read regularly from a read with no + // prefetch while passing in the footer_payload_length. + let footer_length = (footer_payload_length as usize) + + FileMetadata::FOOTER_STRUCT_LENGTH as usize + + FileMetadata::MAGIC_LENGTH as usize; + if footer_length > prefetch_hint as usize { + return FileMetadata::read(input_file).await; + } + + // Read footer bytes + let footer_start = footer_bytes.len() - footer_length; + let footer_end = footer_bytes.len(); + let footer_bytes = &footer_bytes[footer_start..footer_end]; + + let magic_length = FileMetadata::MAGIC_LENGTH as usize; + // check first four bytes of footer + FileMetadata::check_magic(&footer_bytes[..magic_length])?; + // check last four bytes of footer + FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?; + + let footer_payload_str = + FileMetadata::extract_footer_payload_as_str(footer_bytes, footer_payload_length)?; + return FileMetadata::from_json_str(&footer_payload_str); + } + + FileMetadata::read(input_file).await + } + #[inline] /// Metadata about blobs in file pub fn blobs(&self) -> &[BlobMetadata] { @@ -800,6 +880,7 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let input_file = input_file_with_payload(&temp_dir, r#""blobs" = []"#).await; + assert_eq!( FileMetadata::read(&input_file).await.unwrap_err().to_string(), "DataInvalid => Given string is not valid JSON, source: invalid type: string \"blobs\", expected struct FileMetadata at line 1 column 7", @@ -809,6 +890,7 @@ mod tests { #[tokio::test] async fn test_read_file_metadata_of_uncompressed_empty_file() { let input_file = java_empty_uncompressed_input_file(); + let file_metadata = FileMetadata::read(&input_file).await.unwrap(); assert_eq!(file_metadata, empty_footer_payload()) } @@ -816,6 +898,7 @@ mod tests { #[tokio::test] async fn test_read_file_metadata_of_uncompressed_metric_data() { let input_file = java_uncompressed_metric_input_file(); + let file_metadata = FileMetadata::read(&input_file).await.unwrap(); assert_eq!(file_metadata, uncompressed_metric_file_metadata()) } @@ -823,7 +906,40 @@ mod tests { #[tokio::test] async fn test_read_file_metadata_of_zstd_compressed_metric_data() { let input_file = java_zstd_compressed_metric_input_file(); - let file_metadata = FileMetadata::read(&input_file).await.unwrap(); + + let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64) + .await + .unwrap(); assert_eq!(file_metadata, zstd_compressed_metric_file_metadata()) } + + #[tokio::test] + async fn test_read_file_metadata_of_empty_file_with_prefetching() { + let input_file = java_empty_uncompressed_input_file(); + let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64) + .await + .unwrap(); + + assert_eq!(file_metadata, empty_footer_payload()); + } + + #[tokio::test] + async fn test_read_file_metadata_of_uncompressed_metric_data_with_prefetching() { + let input_file = java_uncompressed_metric_input_file(); + let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64) + .await + .unwrap(); + + assert_eq!(file_metadata, uncompressed_metric_file_metadata()); + } + + #[tokio::test] + async fn test_read_file_metadata_of_zstd_compressed_metric_data_with_prefetching() { + let input_file = java_zstd_compressed_metric_input_file(); + let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64) + .await + .unwrap(); + + assert_eq!(file_metadata, zstd_compressed_metric_file_metadata()); + } }