From d9f5ffd635b10758a7d35357027f44f261c29e54 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Wed, 9 Apr 2025 07:48:29 -0400 Subject: [PATCH] Make Puffin APIs public --- crates/iceberg/src/lib.rs | 2 +- crates/iceberg/src/puffin/blob.rs | 48 ++++++++++++++--- crates/iceberg/src/puffin/metadata.rs | 78 ++++++++++++++++++++++----- crates/iceberg/src/puffin/mod.rs | 12 +++-- crates/iceberg/src/puffin/reader.rs | 8 +-- crates/iceberg/src/puffin/writer.rs | 12 ++--- 6 files changed, 123 insertions(+), 37 deletions(-) diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 80444df4a4..278fe18f74 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -88,4 +88,4 @@ mod utils; pub mod writer; mod delete_vector; -mod puffin; +pub mod puffin; diff --git a/crates/iceberg/src/puffin/blob.rs b/crates/iceberg/src/puffin/blob.rs index a08fd9417d..53c4963e92 100644 --- a/crates/iceberg/src/puffin/blob.rs +++ b/crates/iceberg/src/puffin/blob.rs @@ -18,21 +18,53 @@ use std::collections::HashMap; /// A serialized form of a "compact" Theta sketch produced by the Apache DataSketches library. -pub(crate) const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1"; +pub const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1"; /// The blob #[derive(Debug, PartialEq, Clone)] -pub(crate) struct Blob { - /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types +pub struct Blob { pub(crate) r#type: String, - /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. 
pub(crate) fields: Vec<i32>, - /// ID of the Iceberg table's snapshot the blob was computed from pub(crate) snapshot_id: i64, - /// Sequence number of the Iceberg table's snapshot the blob was computed from pub(crate) sequence_number: i64, - /// The uncompressed blob data pub(crate) data: Vec<u8>, - /// Arbitrary meta-information about the blob pub(crate) properties: HashMap<String, String>, } + +impl Blob { + #[inline] + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub fn blob_type(&self) -> &str { + &self.r#type + } + + #[inline] + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. + pub fn fields(&self) -> &[i32] { + &self.fields + } + + #[inline] + /// ID of the Iceberg table's snapshot the blob was computed from + pub fn snapshot_id(&self) -> i64 { + self.snapshot_id + } + + #[inline] + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub fn sequence_number(&self) -> i64 { + self.sequence_number + } + + #[inline] + /// The uncompressed blob data + pub fn data(&self) -> &[u8] { + &self.data + } + + #[inline] + /// Arbitrary meta-information about the blob + pub fn properties(&self) -> &HashMap<String, String> { + &self.properties + } +} diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index 9d00032253..ee74ae95a5 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -26,35 +26,77 @@ use crate::{Error, ErrorKind, Result}; /// Human-readable identification of the application writing the file, along with its version. /// Example: "Trino version 381" -pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; +pub const CREATED_BY_PROPERTY: &str = "created-by"; /// Metadata about a blob. 
/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] #[serde(rename_all = "kebab-case")] -pub(crate) struct BlobMetadata { - /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types +pub struct BlobMetadata { pub(crate) r#type: String, - /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. pub(crate) fields: Vec<i32>, - /// ID of the Iceberg table's snapshot the blob was computed from pub(crate) snapshot_id: i64, - /// Sequence number of the Iceberg table's snapshot the blob was computed from pub(crate) sequence_number: i64, - /// The offset in the file where the blob contents start pub(crate) offset: u64, - /// The length of the blob stored in the file (after compression, if compressed) pub(crate) length: u64, - /// The compression codec used to compress the data #[serde(skip_serializing_if = "CompressionCodec::is_none")] #[serde(default)] pub(crate) compression_codec: CompressionCodec, - /// Arbitrary meta-information about the blob #[serde(skip_serializing_if = "HashMap::is_empty")] #[serde(default)] pub(crate) properties: HashMap<String, String>, } +impl BlobMetadata { + #[inline] + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub fn blob_type(&self) -> &str { + &self.r#type + } + + #[inline] + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. 
+ pub fn fields(&self) -> &[i32] { + &self.fields + } + + #[inline] + /// ID of the Iceberg table's snapshot the blob was computed from + pub fn snapshot_id(&self) -> i64 { + self.snapshot_id + } + + #[inline] + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub fn sequence_number(&self) -> i64 { + self.sequence_number + } + + #[inline] + /// The offset in the file where the blob contents start + pub fn offset(&self) -> u64 { + self.offset + } + + #[inline] + /// The length of the blob stored in the file (after compression, if compressed) + pub fn length(&self) -> u64 { + self.length + } + + #[inline] + /// The compression codec used to compress the data + pub fn compression_codec(&self) -> CompressionCodec { + self.compression_codec + } + + #[inline] + /// Arbitrary meta-information about the blob + pub fn properties(&self) -> &HashMap<String, String> { + &self.properties + } +} + #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub(crate) enum Flag { FooterPayloadCompressed = 0, @@ -91,10 +133,8 @@ impl Flag { /// Metadata about a puffin file. /// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] -pub(crate) struct FileMetadata { - /// Metadata about blobs in file +pub struct FileMetadata { pub(crate) blobs: Vec<BlobMetadata>, - /// Arbitrary meta-information, like writer identification/version. #[serde(skip_serializing_if = "HashMap::is_empty")] #[serde(default)] pub(crate) properties: HashMap<String, String>, @@ -247,6 +287,18 @@ impl FileMetadata { FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?; FileMetadata::from_json_str(&footer_payload_str) } + + #[inline] + /// Metadata about blobs in file + pub fn blobs(&self) -> &[BlobMetadata] { + &self.blobs + } + + #[inline] + /// Arbitrary meta-information, like writer identification/version. 
+ pub fn properties(&self) -> &HashMap<String, String> { + &self.properties + } } #[cfg(test)] diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs index 4630dcb184..e60911181d 100644 --- a/crates/iceberg/src/puffin/mod.rs +++ b/crates/iceberg/src/puffin/mod.rs @@ -18,15 +18,21 @@ //! Iceberg Puffin implementation. #![deny(missing_docs)] -// Temporarily allowing this while crate is under active development -#![allow(dead_code)] mod blob; +pub use blob::{Blob, APACHE_DATASKETCHES_THETA_V1}; + mod compression; +pub use compression::CompressionCodec; + mod metadata; -#[cfg(feature = "tokio")] +pub use metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY}; + mod reader; +pub use reader::PuffinReader; + mod writer; +pub use writer::PuffinWriter; #[cfg(test)] mod test_utils; diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs index 1114d29e9f..4e6fb2fb9a 100644 --- a/crates/iceberg/src/puffin/reader.rs +++ b/crates/iceberg/src/puffin/reader.rs @@ -23,14 +23,14 @@ use crate::puffin::metadata::{BlobMetadata, FileMetadata}; use crate::Result; /// Puffin reader -pub(crate) struct PuffinReader { +pub struct PuffinReader { input_file: InputFile, file_metadata: OnceCell<FileMetadata>, } impl PuffinReader { /// Returns a new Puffin reader - pub(crate) fn new(input_file: InputFile) -> Self { + pub fn new(input_file: InputFile) -> Self { Self { input_file, file_metadata: OnceCell::new(), @@ -38,14 +38,14 @@ impl PuffinReader { } /// Returns file metadata - pub(crate) async fn file_metadata(&self) -> Result<&FileMetadata> { + pub async fn file_metadata(&self) -> Result<&FileMetadata> { self.file_metadata .get_or_try_init(|| FileMetadata::read(&self.input_file)) .await } /// Returns blob - pub(crate) async fn blob(&self, blob_metadata: &BlobMetadata) -> Result<Blob> { + pub async fn blob(&self, blob_metadata: &BlobMetadata) -> Result<Blob> { let file_read = self.input_file.reader().await?; let start = blob_metadata.offset; let end = start + 
blob_metadata.length; diff --git a/crates/iceberg/src/puffin/writer.rs b/crates/iceberg/src/puffin/writer.rs index 57c4fd0798..7d6d0548c5 100644 --- a/crates/iceberg/src/puffin/writer.rs +++ b/crates/iceberg/src/puffin/writer.rs @@ -26,7 +26,7 @@ use crate::puffin::metadata::{BlobMetadata, FileMetadata, Flag}; use crate::Result; /// Puffin writer -pub(crate) struct PuffinWriter { +pub struct PuffinWriter { writer: Box<dyn FileWrite>, is_header_written: bool, num_bytes_written: u64, @@ -38,7 +38,7 @@ pub(crate) struct PuffinWriter { impl PuffinWriter { /// Returns a new Puffin writer - pub(crate) async fn new( + pub async fn new( output_file: &OutputFile, properties: HashMap<String, String>, compress_footer: bool, @@ -63,11 +63,7 @@ impl PuffinWriter { } /// Adds blob to Puffin file - pub(crate) async fn add( - &mut self, - blob: Blob, - compression_codec: CompressionCodec, - ) -> Result<()> { + pub async fn add(&mut self, blob: Blob, compression_codec: CompressionCodec) -> Result<()> { self.write_header_once().await?; let offset = self.num_bytes_written; @@ -89,7 +85,7 @@ impl PuffinWriter { } /// Finalizes the Puffin file - pub(crate) async fn close(mut self) -> Result<()> { + pub async fn close(mut self) -> Result<()> { self.write_header_once().await?; self.write_footer().await?; self.writer.close().await?;