Skip to content

feat(puffin): Make Puffin APIs public #1165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,4 @@ mod utils;
pub mod writer;

mod delete_vector;
mod puffin;
pub mod puffin;
48 changes: 40 additions & 8 deletions crates/iceberg/src/puffin/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,53 @@
use std::collections::HashMap;

/// A serialized form of a "compact" Theta sketch produced by the Apache DataSketches library.
pub(crate) const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1";
pub const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1";

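/// A single Puffin blob: its uncompressed payload together with the type, field IDs, snapshot information, and properties that describe it.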
#[derive(Debug, PartialEq, Clone)]
pub(crate) struct Blob {
/// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
pub struct Blob {
pub(crate) r#type: String,
/// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob.
pub(crate) fields: Vec<i32>,
/// ID of the Iceberg table's snapshot the blob was computed from
pub(crate) snapshot_id: i64,
/// Sequence number of the Iceberg table's snapshot the blob was computed from
pub(crate) sequence_number: i64,
/// The uncompressed blob data
pub(crate) data: Vec<u8>,
/// Arbitrary meta-information about the blob
pub(crate) properties: HashMap<String, String>,
}

impl Blob {
    /// Returns the blob's type identifier as a string slice.
    ///
    /// See blob types: <https://iceberg.apache.org/puffin-spec/#blob-types>
    #[inline]
    pub fn blob_type(&self) -> &str {
        self.r#type.as_str()
    }

    /// Returns the field IDs the blob was computed for.
    ///
    /// The order of items is used to compute sketches stored in the blob.
    #[inline]
    pub fn fields(&self) -> &[i32] {
        self.fields.as_slice()
    }

    /// Returns the ID of the Iceberg table's snapshot the blob was computed from.
    #[inline]
    pub fn snapshot_id(&self) -> i64 {
        self.snapshot_id
    }

    /// Returns the sequence number of the Iceberg table's snapshot the blob was computed from.
    #[inline]
    pub fn sequence_number(&self) -> i64 {
        self.sequence_number
    }

    /// Returns the uncompressed blob data as a byte slice.
    #[inline]
    pub fn data(&self) -> &[u8] {
        self.data.as_slice()
    }

    /// Returns the arbitrary meta-information attached to the blob.
    #[inline]
    pub fn properties(&self) -> &HashMap<String, String> {
        &self.properties
    }
}
78 changes: 65 additions & 13 deletions crates/iceberg/src/puffin/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,77 @@ use crate::{Error, ErrorKind, Result};

/// Human-readable identification of the application writing the file, along with its version.
/// Example: "Trino version 381"
pub(crate) const CREATED_BY_PROPERTY: &str = "created-by";
pub const CREATED_BY_PROPERTY: &str = "created-by";

/// Metadata about a blob.
/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub(crate) struct BlobMetadata {
/// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
pub struct BlobMetadata {
pub(crate) r#type: String,
/// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob.
pub(crate) fields: Vec<i32>,
/// ID of the Iceberg table's snapshot the blob was computed from
pub(crate) snapshot_id: i64,
/// Sequence number of the Iceberg table's snapshot the blob was computed from
pub(crate) sequence_number: i64,
/// The offset in the file where the blob contents start
pub(crate) offset: u64,
/// The length of the blob stored in the file (after compression, if compressed)
pub(crate) length: u64,
/// The compression codec used to compress the data
#[serde(skip_serializing_if = "CompressionCodec::is_none")]
#[serde(default)]
pub(crate) compression_codec: CompressionCodec,
/// Arbitrary meta-information about the blob
#[serde(skip_serializing_if = "HashMap::is_empty")]
#[serde(default)]
pub(crate) properties: HashMap<String, String>,
}

impl BlobMetadata {
    /// Returns the blob's type identifier as a string slice.
    ///
    /// See blob types: <https://iceberg.apache.org/puffin-spec/#blob-types>
    #[inline]
    pub fn blob_type(&self) -> &str {
        self.r#type.as_str()
    }

    /// Returns the field IDs the blob was computed for.
    ///
    /// The order of items is used to compute sketches stored in the blob.
    #[inline]
    pub fn fields(&self) -> &[i32] {
        self.fields.as_slice()
    }

    /// Returns the ID of the Iceberg table's snapshot the blob was computed from.
    #[inline]
    pub fn snapshot_id(&self) -> i64 {
        self.snapshot_id
    }

    /// Returns the sequence number of the Iceberg table's snapshot the blob was computed from.
    #[inline]
    pub fn sequence_number(&self) -> i64 {
        self.sequence_number
    }

    /// Returns the offset in the file where the blob contents start.
    #[inline]
    pub fn offset(&self) -> u64 {
        self.offset
    }

    /// Returns the length of the blob stored in the file (after compression, if compressed).
    #[inline]
    pub fn length(&self) -> u64 {
        self.length
    }

    /// Returns the compression codec used to compress the data.
    ///
    /// The codec is handed back by value rather than by reference.
    #[inline]
    pub fn compression_codec(&self) -> CompressionCodec {
        self.compression_codec
    }

    /// Returns the arbitrary meta-information attached to the blob.
    #[inline]
    pub fn properties(&self) -> &HashMap<String, String> {
        &self.properties
    }
}

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub(crate) enum Flag {
FooterPayloadCompressed = 0,
Expand Down Expand Up @@ -91,10 +133,8 @@ impl Flag {
/// Metadata about a puffin file.
/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub(crate) struct FileMetadata {
/// Metadata about blobs in file
pub struct FileMetadata {
pub(crate) blobs: Vec<BlobMetadata>,
/// Arbitrary meta-information, like writer identification/version.
#[serde(skip_serializing_if = "HashMap::is_empty")]
#[serde(default)]
pub(crate) properties: HashMap<String, String>,
Expand Down Expand Up @@ -247,6 +287,18 @@ impl FileMetadata {
FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?;
FileMetadata::from_json_str(&footer_payload_str)
}

/// Returns the metadata entries for the blobs in the file, as a slice.
#[inline]
pub fn blobs(&self) -> &[BlobMetadata] {
    self.blobs.as_slice()
}

#[inline]
/// Arbitrary meta-information, like writer identification/version.
pub fn properties(&self) -> &HashMap<String, String> {
&self.properties
}
}

#[cfg(test)]
Expand Down
12 changes: 9 additions & 3 deletions crates/iceberg/src/puffin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@
//! Iceberg Puffin implementation.

#![deny(missing_docs)]
// Temporarily allowing this while crate is under active development
#![allow(dead_code)]

mod blob;
pub use blob::{Blob, APACHE_DATASKETCHES_THETA_V1};

mod compression;
pub use compression::CompressionCodec;

mod metadata;
#[cfg(feature = "tokio")]
pub use metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY};

mod reader;
pub use reader::PuffinReader;

mod writer;
pub use writer::PuffinWriter;

#[cfg(test)]
mod test_utils;
8 changes: 4 additions & 4 deletions crates/iceberg/src/puffin/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,29 @@ use crate::puffin::metadata::{BlobMetadata, FileMetadata};
use crate::Result;

/// Puffin reader
pub(crate) struct PuffinReader {
pub struct PuffinReader {
input_file: InputFile,
file_metadata: OnceCell<FileMetadata>,
}

impl PuffinReader {
/// Returns a new Puffin reader
pub(crate) fn new(input_file: InputFile) -> Self {
pub fn new(input_file: InputFile) -> Self {
Self {
input_file,
file_metadata: OnceCell::new(),
}
}

/// Returns file metadata
pub(crate) async fn file_metadata(&self) -> Result<&FileMetadata> {
pub async fn file_metadata(&self) -> Result<&FileMetadata> {
self.file_metadata
.get_or_try_init(|| FileMetadata::read(&self.input_file))
.await
}

/// Returns blob
pub(crate) async fn blob(&self, blob_metadata: &BlobMetadata) -> Result<Blob> {
pub async fn blob(&self, blob_metadata: &BlobMetadata) -> Result<Blob> {
let file_read = self.input_file.reader().await?;
let start = blob_metadata.offset;
let end = start + blob_metadata.length;
Expand Down
12 changes: 4 additions & 8 deletions crates/iceberg/src/puffin/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::puffin::metadata::{BlobMetadata, FileMetadata, Flag};
use crate::Result;

/// Puffin writer
pub(crate) struct PuffinWriter {
pub struct PuffinWriter {
writer: Box<dyn FileWrite>,
is_header_written: bool,
num_bytes_written: u64,
Expand All @@ -38,7 +38,7 @@ pub(crate) struct PuffinWriter {

impl PuffinWriter {
/// Returns a new Puffin writer
pub(crate) async fn new(
pub async fn new(
output_file: &OutputFile,
properties: HashMap<String, String>,
compress_footer: bool,
Expand All @@ -63,11 +63,7 @@ impl PuffinWriter {
}

/// Adds blob to Puffin file
pub(crate) async fn add(
&mut self,
blob: Blob,
compression_codec: CompressionCodec,
) -> Result<()> {
pub async fn add(&mut self, blob: Blob, compression_codec: CompressionCodec) -> Result<()> {
self.write_header_once().await?;

let offset = self.num_bytes_written;
Expand All @@ -89,7 +85,7 @@ impl PuffinWriter {
}

/// Finalizes the Puffin file
pub(crate) async fn close(mut self) -> Result<()> {
pub async fn close(mut self) -> Result<()> {
self.write_header_once().await?;
self.write_footer().await?;
self.writer.close().await?;
Expand Down
Loading