Skip to content

Commit c5c84d8

Browse files
committed
Add Puffin reader and writer
1 parent f3a571d commit c5c84d8

18 files changed

+1810
-8
lines changed

Diff for: Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ members = [
2323
"crates/iceberg",
2424
"crates/integration_tests",
2525
"crates/integrations/*",
26+
"crates/puffin",
2627
"crates/test_utils",
2728
]
2829
exclude = ["bindings/python"]
@@ -98,3 +99,4 @@ uuid = { version = "1.6.1", features = ["v7"] }
9899
volo-thrift = "0.10"
99100
hive_metastore = "0.1"
100101
tera = "1"
102+
zstd = "0.13.2"

Diff for: crates/iceberg/src/writer/file_writer/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use crate::Result;
2626

2727
mod parquet_writer;
2828
pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
29+
2930
mod track_writer;
31+
pub use track_writer::TrackWriter;
3032

3133
pub mod location_generator;
3234

Diff for: crates/iceberg/src/writer/file_writer/parquet_writer.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! The module contains the file writer for parquet file format.
1919
2020
use std::collections::HashMap;
21-
use std::sync::atomic::AtomicI64;
21+
use std::sync::atomic::AtomicU64;
2222
use std::sync::Arc;
2323

2424
use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -81,7 +81,7 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
8181

8282
async fn build(self) -> crate::Result<Self::R> {
8383
let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
84-
let written_size = Arc::new(AtomicI64::new(0));
84+
let written_size = Arc::new(AtomicU64::new(0));
8585
let out_file = self.file_io.new_output(
8686
self.location_generator
8787
.generate_location(&self.file_name_generator.generate_file_name()),
@@ -214,7 +214,7 @@ pub struct ParquetWriter {
214214
schema: SchemaRef,
215215
out_file: OutputFile,
216216
writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
217-
written_size: Arc<AtomicI64>,
217+
written_size: Arc<AtomicU64>,
218218
current_row_num: usize,
219219
}
220220

Diff for: crates/iceberg/src/writer/file_writer/track_writer.rs

+11-5
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::sync::atomic::AtomicI64;
18+
use std::sync::atomic::AtomicU64;
1919
use std::sync::Arc;
2020

2121
use bytes::Bytes;
@@ -24,18 +24,24 @@ use crate::io::FileWrite;
2424
use crate::Result;
2525

2626
/// `TrackWriter` is used to track the written size.
27-
pub(crate) struct TrackWriter {
27+
pub struct TrackWriter {
2828
inner: Box<dyn FileWrite>,
29-
written_size: Arc<AtomicI64>,
29+
written_size: Arc<AtomicU64>,
3030
}
3131

3232
impl TrackWriter {
33-
pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicI64>) -> Self {
33+
/// Creates a `TrackWriter` that wraps `writer` and adds the size of each successful write to `written_size`.
34+
pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicU64>) -> Self {
3435
Self {
3536
inner: writer,
3637
written_size,
3738
}
3839
}
40+
41+
/// Number of bytes written so far
42+
pub fn bytes_written(&self) -> u64 {
43+
self.written_size.load(std::sync::atomic::Ordering::SeqCst)
44+
}
3945
}
4046

4147
#[async_trait::async_trait]
@@ -44,7 +50,7 @@ impl FileWrite for TrackWriter {
4450
let size = bs.len();
4551
self.inner.write(bs).await.map(|v| {
4652
self.written_size
47-
.fetch_add(size as i64, std::sync::atomic::Ordering::Relaxed);
53+
.fetch_add(size as u64, std::sync::atomic::Ordering::Relaxed);
4854
v
4955
})
5056
}

Diff for: crates/puffin/Cargo.toml

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "iceberg-puffin"
20+
version = { workspace = true }
21+
edition = { workspace = true }
22+
homepage = { workspace = true }
23+
rust-version = { workspace = true }
24+
25+
categories = ["database"]
26+
description = "Apache Iceberg Puffin"
27+
repository = { workspace = true }
28+
license = { workspace = true }
29+
keywords = ["iceberg", "puffin"]
30+
31+
[dependencies]
32+
bytes = { workspace = true }
33+
iceberg = { workspace = true }
34+
once_cell = { workspace = true }
35+
serde = { workspace = true }
36+
serde_json = { workspace = true }
37+
zstd = { workspace = true }
38+
39+
[dev-dependencies]
40+
tempfile = { workspace = true }
41+
tokio = { workspace = true }

Diff for: crates/puffin/src/blob.rs

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
20+
/// A serialized form of a "compact" Theta sketch produced by the Apache DataSketches library.
21+
pub const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1";
22+
23+
/// A Puffin blob: the raw payload together with the metadata describing
/// what the payload was computed from.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Blob {
    /// The blob type; see the list of blob types at
    /// <https://iceberg.apache.org/puffin-spec/#blob-types>.
    pub r#type: String,
    /// List of field IDs the blob was computed for; the order of items is
    /// used to compute sketches stored in the blob.
    pub input_fields: Vec<i32>,
    /// ID of the Iceberg table's snapshot the blob was computed from.
    pub snapshot_id: i64,
    /// Sequence number of the Iceberg table's snapshot the blob was computed from.
    pub sequence_number: i64,
    /// The actual blob data.
    pub data: Vec<u8>,
    /// Arbitrary meta-information about the blob.
    pub properties: HashMap<String, String>,
}

Diff for: crates/puffin/src/compression.rs

+123
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use iceberg::{Error, ErrorKind, Result};
19+
use serde::{Deserialize, Serialize};
20+
21+
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Copy)]
22+
#[serde(rename_all = "lowercase")]
23+
#[derive(Default)]
24+
/// Data compression formats
25+
pub enum CompressionCodec {
26+
#[default]
27+
/// No compression
28+
None,
29+
/// LZ4 single compression frame with content size present
30+
Lz4,
31+
/// Zstandard single compression frame with content size present
32+
Zstd,
33+
}
34+
35+
impl CompressionCodec {
36+
pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
37+
match self {
38+
CompressionCodec::None => Ok(bytes),
39+
CompressionCodec::Lz4 => Err(Error::new(
40+
ErrorKind::FeatureUnsupported,
41+
"LZ4 decompression is not supported currently",
42+
)),
43+
CompressionCodec::Zstd => {
44+
let decompressed = zstd::stream::decode_all(&bytes[..])?;
45+
Ok(decompressed)
46+
}
47+
}
48+
}
49+
50+
pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
51+
match self {
52+
CompressionCodec::None => Ok(bytes),
53+
CompressionCodec::Lz4 => Err(Error::new(
54+
ErrorKind::FeatureUnsupported,
55+
"LZ4 compression is not supported currently",
56+
)),
57+
CompressionCodec::Zstd => {
58+
let writer = Vec::<u8>::new();
59+
let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
60+
encoder.include_checksum(true)?;
61+
encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
62+
std::io::copy(&mut &bytes[..], &mut encoder)?;
63+
let compressed = encoder.finish()?;
64+
Ok(compressed)
65+
}
66+
}
67+
}
68+
69+
pub(crate) fn is_none(&self) -> bool {
70+
matches!(self, CompressionCodec::None)
71+
}
72+
}
73+
74+
#[cfg(test)]
mod tests {
    use crate::compression::CompressionCodec;

    // The codec API is fully synchronous, so these are plain `#[test]`s
    // rather than async tests driven by a runtime.

    /// `None` must return the bytes untouched in both directions.
    #[test]
    fn test_compression_codec_none() {
        let compression_codec = CompressionCodec::None;
        let bytes_vec = vec![0_u8; 100];

        let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
        assert_eq!(bytes_vec, compressed);

        let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
        assert_eq!(compressed, decompressed)
    }

    /// LZ4 is not implemented: both directions must report `FeatureUnsupported`.
    #[test]
    fn test_compression_codec_lz4() {
        let compression_codec = CompressionCodec::Lz4;
        let bytes_vec = vec![0_u8; 100];

        assert_eq!(
            compression_codec
                .compress(bytes_vec.clone())
                .unwrap_err()
                .to_string(),
            "FeatureUnsupported => LZ4 compression is not supported currently",
        );

        assert_eq!(
            compression_codec
                .decompress(bytes_vec)
                .unwrap_err()
                .to_string(),
            "FeatureUnsupported => LZ4 decompression is not supported currently",
        )
    }

    /// Zstd must shrink a highly repetitive input and round-trip it exactly.
    #[test]
    fn test_compression_codec_zstd() {
        let compression_codec = CompressionCodec::Zstd;
        let bytes_vec = vec![0_u8; 100];

        let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
        assert!(compressed.len() < bytes_vec.len());

        let decompressed = compression_codec.decompress(compressed).unwrap();
        assert_eq!(decompressed, bytes_vec)
    }
}

Diff for: crates/puffin/src/lib.rs

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Iceberg Puffin implementation.
19+
20+
#![deny(missing_docs)]
21+
22+
mod blob;
23+
pub use blob::{Blob, APACHE_DATASKETCHES_THETA_V1};
24+
25+
mod compression;
26+
pub use compression::CompressionCodec;
27+
28+
mod metadata;
29+
pub use metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY};
30+
31+
mod reader;
32+
pub use reader::PuffinReader;
33+
34+
#[cfg(test)]
35+
mod test_utils;
36+
37+
mod writer;
38+
pub use writer::PuffinWriter;

0 commit comments

Comments
 (0)