From a0fd83998dd5acb2e89514f895920cf87f7819c6 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 25 Apr 2024 00:54:11 +0800 Subject: [PATCH 01/14] support append data file and add e2e test --- Cargo.toml | 11 +- crates/e2e_test/Cargo.toml | 37 ++ crates/e2e_test/testdata/docker-compose.yaml | 59 +++ .../e2e_test/tests/append_data_file_test.rs | 212 +++++++++++ crates/e2e_test/tests/conflict_commit_test.rs | 198 ++++++++++ crates/iceberg/src/spec/manifest.rs | 6 + crates/iceberg/src/spec/snapshot.rs | 20 + crates/iceberg/src/transaction.rs | 360 +++++++++++++++++- 8 files changed, 894 insertions(+), 9 deletions(-) create mode 100644 crates/e2e_test/Cargo.toml create mode 100644 crates/e2e_test/testdata/docker-compose.yaml create mode 100644 crates/e2e_test/tests/append_data_file_test.rs create mode 100644 crates/e2e_test/tests/conflict_commit_test.rs diff --git a/Cargo.toml b/Cargo.toml index efff593b3..b7909cb61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,11 +18,12 @@ [workspace] resolver = "2" members = [ - "crates/catalog/*", - "crates/examples", - "crates/iceberg", - "crates/integrations/*", - "crates/test_utils", + "crates/catalog/*", + "crates/e2e_test", + "crates/examples", + "crates/iceberg", + "crates/integrations/*", + "crates/test_utils", ] exclude = ["bindings/python"] diff --git a/crates/e2e_test/Cargo.toml b/crates/e2e_test/Cargo.toml new file mode 100644 index 000000000..e72ffe623 --- /dev/null +++ b/crates/e2e_test/Cargo.toml @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg-e2e_test" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +arrow-array = { workspace = true } +arrow-schema = { workspace = true } +parquet = { workspace = true } +futures = { workspace = true } +iceberg = { workspace = true } +iceberg-catalog-rest = { workspace = true } +iceberg_test_utils = { path = "../test_utils", features = ["tests"] } +tokio = { version = "1", features = ["full"] } +port_scanner = { workspace = true } +log = { workspace = true } \ No newline at end of file diff --git a/crates/e2e_test/testdata/docker-compose.yaml b/crates/e2e_test/testdata/docker-compose.yaml new file mode 100644 index 000000000..0152a22ca --- /dev/null +++ b/crates/e2e_test/testdata/docker-compose.yaml @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +version: '3.8' + +services: + rest: + image: tabulario/iceberg-rest:0.10.0 + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog + - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory + - CATALOG_WAREHOUSE=s3://icebergdata/demo + - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO + - CATALOG_S3_ENDPOINT=http://minio:9000 + depends_on: + - minio + links: + - minio:icebergdata.minio + expose: + - 8181 + + minio: + image: minio/minio:RELEASE.2024-03-07T00-43-48Z + environment: + - MINIO_ROOT_USER=admin + - MINIO_ROOT_PASSWORD=password + - MINIO_DOMAIN=minio + expose: + - 9001 + - 9000 + command: [ "server", "/data", "--console-address", ":9001" ] + + mc: + depends_on: + - minio + image: minio/mc:RELEASE.2024-03-07T00-31-49Z + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc rm -r --force minio/icebergdata; /usr/bin/mc mb minio/icebergdata; /usr/bin/mc policy set public minio/icebergdata; tail -f /dev/null " diff --git a/crates/e2e_test/tests/append_data_file_test.rs b/crates/e2e_test/tests/append_data_file_test.rs new file mode 100644 index 000000000..7b2490c60 --- /dev/null +++ b/crates/e2e_test/tests/append_data_file_test.rs @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for rest catalog. 
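+//!
+//! The test boots a REST catalog backed by MinIO via docker-compose, writes a
+//! Parquet data file, appends it to a table through a transaction, commits
+//! twice, and scans the table back to verify the committed rows.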
+ +use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; +use futures::TryStreamExt; +use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; +use iceberg::transaction::Transaction; +use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig}; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation}; +use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; +use iceberg_test_utils::docker::DockerCompose; +use iceberg_test_utils::{normalize_test_name, set_up}; +use parquet::file::properties::WriterProperties; +use port_scanner::scan_port_addr; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::time::sleep; + +const REST_CATALOG_PORT: u16 = 8181; + +struct TestFixture { + _docker_compose: DockerCompose, + rest_catalog: RestCatalog, +} + +async fn set_test_fixture(func: &str) -> TestFixture { + set_up(); + let docker_compose = DockerCompose::new( + normalize_test_name(format!("{}_{func}", module_path!())), + format!("{}/testdata", env!("CARGO_MANIFEST_DIR")), + ); + + // Start docker compose + docker_compose.run(); + + let rest_catalog_ip = docker_compose.get_container_ip("rest"); + + let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT); + loop { + if !scan_port_addr(&read_port) { + log::info!("Waiting for 1s rest catalog to ready..."); + sleep(std::time::Duration::from_millis(1000)).await; + } else { + break; + } + } + + let container_ip = docker_compose.get_container_ip("minio"); + let read_port = format!("{}:{}", container_ip, 9000); + + let config = RestCatalogConfig::builder() + .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT)) + .props(HashMap::from([ + (S3_ENDPOINT.to_string(), format!("http://{}", read_port)), + (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()), + (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()), + (S3_REGION.to_string(), "us-east-1".to_string()), + ])) + .build(); + let rest_catalog = RestCatalog::new(config).await.unwrap(); + + TestFixture { + _docker_compose: docker_compose, + rest_catalog, + } +} + +#[tokio::test] +async fn test_append_data_file() { + let fixture = set_test_fixture("test_create_table").await; + + let ns = Namespace::with_properties( + NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "ray".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + fixture + .rest_catalog + .create_namespace(ns.name(), ns.properties().clone()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + + let table = fixture + .rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Create the writer and write the data + let schema: Arc = Arc::new( + table + 
.metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = DefaultFileNameGenerator::new( + "test".to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + schema.clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder); + let mut data_file_writer = data_file_writer_builder + .build(DataFileWriterConfig::new(None)) + .await + .unwrap(); + let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); + let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ], + ) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file = data_file_writer.close().await.unwrap(); + + // commit result + let tx = Transaction::new(&table); + let mut merge_action = tx.merge_snapshot(None, vec![]).unwrap(); + merge_action.add_data_files(data_file.clone()).unwrap(); + let tx = merge_action.apply().await.unwrap(); + let table = tx.commit(&fixture.rest_catalog).await.unwrap(); + + // check result + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0], batch); + + // commit result again + let tx = Transaction::new(&table); + let mut merge_action = tx.merge_snapshot(None, vec![]).unwrap(); + merge_action.add_data_files(data_file.clone()).unwrap(); + let tx = merge_action.apply().await.unwrap(); + let table = tx.commit(&fixture.rest_catalog).await.unwrap(); + + // check result again + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 2); + assert_eq!(batches[0], batch); + assert_eq!(batches[1], batch); +} diff --git a/crates/e2e_test/tests/conflict_commit_test.rs b/crates/e2e_test/tests/conflict_commit_test.rs new file mode 100644 index 000000000..64b9bbbbd --- /dev/null +++ b/crates/e2e_test/tests/conflict_commit_test.rs @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for rest catalog. 
+ +use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; +use futures::TryStreamExt; +use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; +use iceberg::transaction::Transaction; +use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig}; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation}; +use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; +use iceberg_test_utils::docker::DockerCompose; +use iceberg_test_utils::{normalize_test_name, set_up}; +use parquet::file::properties::WriterProperties; +use port_scanner::scan_port_addr; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::time::sleep; + +const REST_CATALOG_PORT: u16 = 8181; + +struct TestFixture { + _docker_compose: DockerCompose, + rest_catalog: RestCatalog, +} + +async fn set_test_fixture(func: &str) -> TestFixture { + set_up(); + let docker_compose = DockerCompose::new( + normalize_test_name(format!("{}_{func}", module_path!())), + format!("{}/testdata", env!("CARGO_MANIFEST_DIR")), + ); + + // Start docker compose + docker_compose.run(); + + let rest_catalog_ip = docker_compose.get_container_ip("rest"); + + let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT); + loop { + if !scan_port_addr(&read_port) { + log::info!("Waiting for 1s rest catalog to ready..."); + sleep(std::time::Duration::from_millis(1000)).await; + } else { + break; + } + } + + let container_ip = docker_compose.get_container_ip("minio"); + let read_port = format!("{}:{}", container_ip, 9000); + + let config = RestCatalogConfig::builder() + .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT)) + .props(HashMap::from([ + (S3_ENDPOINT.to_string(), format!("http://{}", read_port)), + (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()), + (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()), + (S3_REGION.to_string(), "us-east-1".to_string()), + ])) + .build(); + let rest_catalog = RestCatalog::new(config).await.unwrap(); + + TestFixture { + _docker_compose: docker_compose, + rest_catalog, + } +} + +#[tokio::test] +async fn test_append_data_file_conflict() { + let fixture = set_test_fixture("test_create_table").await; + + let ns = Namespace::with_properties( + NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "ray".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + fixture + .rest_catalog + .create_namespace(ns.name(), ns.properties().clone()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + + let table = fixture + .rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Create the writer and write the data + let schema: Arc = Arc::new( + table 
+ .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = DefaultFileNameGenerator::new( + "test".to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + schema.clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder); + let mut data_file_writer = data_file_writer_builder + .build(DataFileWriterConfig::new(None)) + .await + .unwrap(); + let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); + let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ], + ) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file = data_file_writer.close().await.unwrap(); + + // start two transcation and commit one of them + let tx1 = Transaction::new(&table); + let mut merge_action = tx1.merge_snapshot(None, vec![]).unwrap(); + merge_action.add_data_files(data_file.clone()).unwrap(); + let tx1 = merge_action.apply().await.unwrap(); + let tx2 = Transaction::new(&table); + let mut merge_action = tx2.merge_snapshot(None, vec![]).unwrap(); + merge_action.add_data_files(data_file.clone()).unwrap(); + let tx2 = merge_action.apply().await.unwrap(); + let table = tx2.commit(&fixture.rest_catalog).await.unwrap(); + + // check result + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0], batch); + + // another commit should fail + assert!(tx1.commit(&fixture.rest_catalog).await.is_err()); +} diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 085200b7c..327fc29e0 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -900,6 +900,12 @@ impl ManifestEntry { } } + /// Snapshot id + #[inline] + pub fn snapshot_id(&self) -> Option { + self.snapshot_id + } + /// Data sequence number. 
#[inline] pub fn sequence_number(&self) -> Option { diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index f42e736ea..09fa7dc87 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -365,6 +365,26 @@ pub enum SnapshotRetention { }, } +impl SnapshotRetention { + /// Create a new branch retention policy + pub fn branch( + min_snapshots_to_keep: Option, + max_snapshot_age_ms: Option, + max_ref_age_ms: Option, + ) -> Self { + SnapshotRetention::Branch { + min_snapshots_to_keep, + max_snapshot_age_ms, + max_ref_age_ms, + } + } + + /// Create a new tag retention policy + pub fn tag(max_ref_age_ms: i64) -> Self { + SnapshotRetention::Tag { max_ref_age_ms } + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index f29cf5122..382616f19 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -22,11 +22,20 @@ use std::collections::HashMap; use std::mem::discriminant; use crate::error::Result; -use crate::spec::{FormatVersion, NullOrder, SortDirection, SortField, SortOrder, Transform}; +use crate::spec::{ + DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile, + ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, PartitionSpec, Schema, + Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, + Transform, +}; use crate::table::Table; use crate::TableUpdate::UpgradeFormatVersion; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; +const INITIAL_SEQUENCE_NUMBER: i64 = 0; +const META_ROOT_PATH: &str = "metadata"; +const MAIN_BRACNH: &str = "main"; + /// Table transaction. pub struct Transaction<'a> { table: &'a Table, @@ -96,6 +105,42 @@ impl<'a> Transaction<'a> { Ok(self) } + /// Creates a merge snapshot action. + pub fn merge_snapshot( + self, + commit_uuid: Option, + key_metadata: Vec, + ) -> Result> { + let parent_snapshot_id = self + .table + .metadata() + .current_snapshot() + .map(|s| s.snapshot_id()); + let snapshot_id = parent_snapshot_id.map(|id| id + 1).unwrap_or(0); + let schema = self.table.metadata().current_schema().as_ref().clone(); + let schema_id = schema.schema_id(); + let format_version = self.table.metadata().format_version(); + let partition_spec = self + .table + .metadata() + .default_partition_spec() + .map(|spec| spec.as_ref().clone()) + .unwrap_or_default(); + let commit_uuid = commit_uuid.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + + MergeSnapshotAction::new( + self, + parent_snapshot_id, + snapshot_id, + schema, + schema_id, + format_version, + partition_spec, + key_metadata, + commit_uuid, + ) + } + /// Creates replace sort order action. pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> { ReplaceSortOrderAction { @@ -122,6 +167,218 @@ impl<'a> Transaction<'a> { } } +/// Transaction action for merging snapshot. 
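+///
+/// The action collects appended [`DataFile`]s, writes them into a new manifest,
+/// writes a manifest list that also carries over the live manifests of the
+/// parent snapshot, and registers the new snapshot on the `main` branch.
+///
+/// A usage sketch mirroring the e2e tests in this patch (`data_files` and
+/// `catalog` stand in for values built elsewhere):
+///
+/// ```ignore
+/// let tx = Transaction::new(&table);
+/// let mut action = tx.merge_snapshot(None, vec![])?;
+/// action.add_data_files(data_files)?;
+/// let tx = action.apply().await?;
+/// let table = tx.commit(&catalog).await?;
+/// ```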
+pub struct MergeSnapshotAction<'a> { + tx: Transaction<'a>, + + parent_snapshot_id: Option, + snapshot_id: i64, + schema: Schema, + schema_id: i32, + format_version: FormatVersion, + partition_spec: PartitionSpec, + key_metadata: Vec, + + commit_uuid: String, + manifest_id: i64, + + appended_data_files: Vec, +} + +impl<'a> MergeSnapshotAction<'a> { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + tx: Transaction<'a>, + parent_snapshot_id: Option, + snapshot_id: i64, + schema: Schema, + schema_id: i32, + format_version: FormatVersion, + partition_spec: PartitionSpec, + key_metadata: Vec, + commit_uuid: String, + ) -> Result { + Ok(Self { + tx, + parent_snapshot_id, + snapshot_id, + schema, + schema_id, + format_version, + partition_spec, + key_metadata, + commit_uuid, + manifest_id: 0, + appended_data_files: vec![], + }) + } + + /// Add data files to the snapshot. + pub fn add_data_files( + &mut self, + data_file: impl IntoIterator, + ) -> Result<&mut Self> { + self.appended_data_files.extend(data_file); + Ok(self) + } + + fn generate_manifest_file_path(&mut self) -> String { + let manifest_id = self.manifest_id; + self.manifest_id += 1; + format!( + "{}/{}/{}-m{}.{}", + self.tx.table.metadata().location(), + META_ROOT_PATH, + &self.commit_uuid, + manifest_id, + DataFileFormat::Avro + ) + } + + fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String { + format!( + "{}/{}/snap-{}-{}-{}.{}", + self.tx.table.metadata().location(), + META_ROOT_PATH, + self.snapshot_id, + next_seq_num, + self.commit_uuid, + DataFileFormat::Avro + ) + } + + // # TODO: + // This method act like fast append now, because we don't support overwrite and partial overwrite. + // In the future, this method should be modify when we support them. + async fn manifest_from_parent_snapshot(&self) -> Result> { + if let Some(snapshot) = self.tx.table.metadata().current_snapshot() { + let manifest_list = snapshot + .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref()) + .await?; + let mut manifest_files = Vec::with_capacity(manifest_list.entries().len()); + for entry in manifest_list.entries() { + // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921 + // Why we need this? + if entry.added_snapshot_id == self.snapshot_id { + continue; + } + let manifest = entry.load_manifest(self.tx.table.file_io()).await?; + // Skip manifest with all delete entries. + if manifest.entries().iter().all(|entry| !entry.is_alive()) { + continue; + } + manifest_files.push(entry.clone()); + } + Ok(manifest_files) + } else { + Ok(vec![]) + } + } + + // Write manifest file for added data files and return the ManifestFile for ManifestList. 
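+    // Each appended `DataFile` becomes an `Added` manifest entry stamped with
+    // this snapshot's id.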
+ async fn manifest_for_data_file(&mut self) -> Result { + let appended_data_files = std::mem::take(&mut self.appended_data_files); + let manifest_entries = appended_data_files + .into_iter() + .map(|data_file| { + ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .snapshot_id(self.snapshot_id) + .data_file(data_file) + .build() + }) + .collect(); + let manifest_meta = ManifestMetadata::builder() + .schema(self.schema.clone()) + .schema_id(self.schema_id) + .format_version(self.format_version) + .partition_spec(self.partition_spec.clone()) + .content(crate::spec::ManifestContentType::Data) + .build(); + let manifest = Manifest::new(manifest_meta, manifest_entries); + let writer = ManifestWriter::new( + self.tx + .table + .file_io() + .new_output(self.generate_manifest_file_path())?, + self.snapshot_id, + self.key_metadata.clone(), + ); + writer.write(manifest).await + } + + fn summary(&self) -> Summary { + Summary { + operation: crate::spec::Operation::Append, + other: HashMap::new(), + } + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply(mut self) -> Result> { + let next_seq_num = if self.format_version as u8 > 1u8 { + self.tx.table.metadata().last_sequence_number() + 1 + } else { + INITIAL_SEQUENCE_NUMBER + }; + let commit_ts = chrono::Utc::now().timestamp_millis(); + let summary = self.summary(); + + let manifest = self.manifest_for_data_file().await?; + + let manifest_list_path = self.generate_manifest_list_file_path(next_seq_num); + let mut manifest_list_writer = ManifestListWriter::v2( + self.tx + .table + .file_io() + .new_output(self.generate_manifest_list_file_path(next_seq_num))?, + self.snapshot_id, + // # TODO + // Should we use `0` here for default parent snapshot id? + self.parent_snapshot_id.unwrap_or_default(), + next_seq_num, + ); + manifest_list_writer + .add_manifests(self.manifest_from_parent_snapshot().await?.into_iter())?; + manifest_list_writer.add_manifests(vec![manifest].into_iter())?; + manifest_list_writer.close().await?; + + let new_snapshot = Snapshot::builder() + .with_manifest_list(manifest_list_path) + .with_snapshot_id(self.snapshot_id) + .with_parent_snapshot_id(self.parent_snapshot_id) + .with_sequence_number(next_seq_num) + .with_summary(summary) + .with_schema_id(self.schema_id) + .with_timestamp_ms(commit_ts) + .build(); + + let new_snapshot_id = new_snapshot.snapshot_id(); + self.tx.append_updates(vec![ + TableUpdate::AddSnapshot { + snapshot: new_snapshot, + }, + TableUpdate::SetSnapshotRef { + ref_name: MAIN_BRACNH.to_string(), + reference: SnapshotReference::new( + new_snapshot_id, + SnapshotRetention::branch(None, None, None), + ), + }, + ])?; + self.tx.append_requirements(vec![ + TableRequirement::UuidMatch { + uuid: self.tx.table.metadata().uuid(), + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRACNH.to_string(), + snapshot_id: self.parent_snapshot_id, + }, + ])?; + Ok(self.tx) + } +} + /// Transaction action for replacing sort order. 
pub struct ReplaceSortOrderAction<'a> { tx: Transaction<'a>, @@ -206,7 +463,7 @@ mod tests { use crate::io::FileIO; use crate::spec::{FormatVersion, TableMetadata}; use crate::table::Table; - use crate::transaction::Transaction; + use crate::transaction::{Transaction, MAIN_BRACNH}; use crate::{TableIdent, TableRequirement, TableUpdate}; fn make_v1_table() -> Table { @@ -223,7 +480,7 @@ mod tests { .metadata(resp) .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) - .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) + .file_io(FileIOBuilder::new("memory").build().unwrap()) .build() .unwrap() } @@ -242,7 +499,25 @@ mod tests { .metadata(resp) .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) - .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) + .file_io(FileIOBuilder::new("memory").build().unwrap()) + .build() + } + + fn make_v2_minimal_table() -> Table { + let file = File::open(format!( + "{}/testdata/table_metadata/{}", + env!("CARGO_MANIFEST_DIR"), + "TableMetadataV2ValidMinimal.json" + )) + .unwrap(); + let reader = BufReader::new(file); + let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); + + Table::builder() + .metadata(resp) + .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) + .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .file_io(FileIOBuilder::new("memory").build().unwrap()) .build() .unwrap() } @@ -347,6 +622,83 @@ mod tests { ); } + #[tokio::test] + async fn test_merge_snapshot_actio() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/3.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + let mut action = tx.merge_snapshot(None, vec![]).unwrap(); + action.add_data_files(vec![data_file.clone()]).unwrap(); + let tx = action.apply().await.unwrap(); + + // check updates and requirements + let new_snapshot_id = tx + .table + .metadata() + .current_snapshot_id + .map(|id| id + 1) + .unwrap_or(0); + assert!( + matches!(&tx.updates[0], TableUpdate::AddSnapshot { snapshot } if snapshot.snapshot_id() == new_snapshot_id) + && matches!(&tx.updates[1], TableUpdate::SetSnapshotRef { reference,ref_name } if reference.snapshot_id == new_snapshot_id && ref_name == MAIN_BRACNH), + ); + assert_eq!( + vec![ + TableRequirement::UuidMatch { + uuid: tx.table.metadata().uuid() + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRACNH.to_string(), + snapshot_id: tx.table.metadata().current_snapshot_id + } + ], + tx.requirements + ); + + // check manifest list + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] { + snapshot + } else { + unreachable!() + }; + let manifest_list = new_snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(1, manifest_list.entries().len()); + assert_eq!( + manifest_list.entries()[0].sequence_number, + new_snapshot.sequence_number() + ); + + // check manifset + let manifest = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert_eq!(1, manifest.entries().len()); + assert_eq!( + new_snapshot.sequence_number(), + 
manifest.entries()[0] + .sequence_number() + .expect("Inherit sequence number by load manifest") + ); + assert_eq!( + new_snapshot.snapshot_id(), + manifest.entries()[0].snapshot_id().unwrap() + ); + assert_eq!(data_file, *manifest.entries()[0].data_file()); + } + #[test] fn test_do_same_update_in_same_transaction() { let table = make_v2_table(); From 5960d29f0de1a257676eebe1924f44a173d7bb5f Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 25 Apr 2024 01:17:50 +0800 Subject: [PATCH 02/14] fix typos --- crates/e2e_test/tests/conflict_commit_test.rs | 2 +- crates/iceberg/src/transaction.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/e2e_test/tests/conflict_commit_test.rs b/crates/e2e_test/tests/conflict_commit_test.rs index 64b9bbbbd..c4590fbb1 100644 --- a/crates/e2e_test/tests/conflict_commit_test.rs +++ b/crates/e2e_test/tests/conflict_commit_test.rs @@ -169,7 +169,7 @@ async fn test_append_data_file_conflict() { data_file_writer.write(batch.clone()).await.unwrap(); let data_file = data_file_writer.close().await.unwrap(); - // start two transcation and commit one of them + // start two transaction and commit one of them let tx1 = Transaction::new(&table); let mut merge_action = tx1.merge_snapshot(None, vec![]).unwrap(); merge_action.add_data_files(data_file.clone()).unwrap(); diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 382616f19..b6b7a538a 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -34,7 +34,7 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat const INITIAL_SEQUENCE_NUMBER: i64 = 0; const META_ROOT_PATH: &str = "metadata"; -const MAIN_BRACNH: &str = "main"; +const MAIN_BRANCH: &str = "main"; /// Table transaction. 
pub struct Transaction<'a> { @@ -359,7 +359,7 @@ impl<'a> MergeSnapshotAction<'a> { snapshot: new_snapshot, }, TableUpdate::SetSnapshotRef { - ref_name: MAIN_BRACNH.to_string(), + ref_name: MAIN_BRANCH.to_string(), reference: SnapshotReference::new( new_snapshot_id, SnapshotRetention::branch(None, None, None), @@ -371,7 +371,7 @@ impl<'a> MergeSnapshotAction<'a> { uuid: self.tx.table.metadata().uuid(), }, TableRequirement::RefSnapshotIdMatch { - r#ref: MAIN_BRACNH.to_string(), + r#ref: MAIN_BRANCH.to_string(), snapshot_id: self.parent_snapshot_id, }, ])?; @@ -649,7 +649,7 @@ mod tests { .unwrap_or(0); assert!( matches!(&tx.updates[0], TableUpdate::AddSnapshot { snapshot } if snapshot.snapshot_id() == new_snapshot_id) - && matches!(&tx.updates[1], TableUpdate::SetSnapshotRef { reference,ref_name } if reference.snapshot_id == new_snapshot_id && ref_name == MAIN_BRACNH), + && matches!(&tx.updates[1], TableUpdate::SetSnapshotRef { reference,ref_name } if reference.snapshot_id == new_snapshot_id && ref_name == MAIN_BRANCH), ); assert_eq!( vec![ @@ -657,7 +657,7 @@ mod tests { uuid: tx.table.metadata().uuid() }, TableRequirement::RefSnapshotIdMatch { - r#ref: MAIN_BRACNH.to_string(), + r#ref: MAIN_BRANCH.to_string(), snapshot_id: tx.table.metadata().current_snapshot_id } ], From 04a68942dfc0fcf9e25fdc73d88fa606a9bed7eb Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 25 Apr 2024 11:37:42 +0800 Subject: [PATCH 03/14] refine append action --- .../e2e_test/tests/append_data_file_test.rs | 12 +- crates/e2e_test/tests/conflict_commit_test.rs | 12 +- crates/iceberg/src/transaction.rs | 114 +++++++++++++----- 3 files changed, 95 insertions(+), 43 deletions(-) diff --git a/crates/e2e_test/tests/append_data_file_test.rs b/crates/e2e_test/tests/append_data_file_test.rs index 7b2490c60..0c1484d39 100644 --- a/crates/e2e_test/tests/append_data_file_test.rs +++ b/crates/e2e_test/tests/append_data_file_test.rs @@ -171,9 +171,9 @@ async fn test_append_data_file() { // commit result let tx = Transaction::new(&table); - let mut merge_action = tx.merge_snapshot(None, vec![]).unwrap(); - merge_action.add_data_files(data_file.clone()).unwrap(); - let tx = merge_action.apply().await.unwrap(); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + let tx = append_action.apply().await.unwrap(); let table = tx.commit(&fixture.rest_catalog).await.unwrap(); // check result @@ -191,9 +191,9 @@ async fn test_append_data_file() { // commit result again let tx = Transaction::new(&table); - let mut merge_action = tx.merge_snapshot(None, vec![]).unwrap(); - merge_action.add_data_files(data_file.clone()).unwrap(); - let tx = merge_action.apply().await.unwrap(); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + let tx = append_action.apply().await.unwrap(); let table = tx.commit(&fixture.rest_catalog).await.unwrap(); // check result again diff --git a/crates/e2e_test/tests/conflict_commit_test.rs b/crates/e2e_test/tests/conflict_commit_test.rs index c4590fbb1..e9e4606a2 100644 --- a/crates/e2e_test/tests/conflict_commit_test.rs +++ b/crates/e2e_test/tests/conflict_commit_test.rs @@ -171,13 +171,13 @@ async fn test_append_data_file_conflict() { // start two transaction and commit one of them let tx1 = Transaction::new(&table); - let mut merge_action = tx1.merge_snapshot(None, vec![]).unwrap(); - merge_action.add_data_files(data_file.clone()).unwrap(); - let tx1 = 
merge_action.apply().await.unwrap(); + let mut append_action = tx1.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + let tx1 = append_action.apply().await.unwrap(); let tx2 = Transaction::new(&table); - let mut merge_action = tx2.merge_snapshot(None, vec![]).unwrap(); - merge_action.add_data_files(data_file.clone()).unwrap(); - let tx2 = merge_action.apply().await.unwrap(); + let mut append_action = tx2.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + let tx2 = append_action.apply().await.unwrap(); let table = tx2.commit(&fixture.rest_catalog).await.unwrap(); // check result diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index b6b7a538a..4d537bf5f 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -105,12 +105,12 @@ impl<'a> Transaction<'a> { Ok(self) } - /// Creates a merge snapshot action. - pub fn merge_snapshot( + /// Creates a fast append action. + pub fn fast_append( self, commit_uuid: Option, key_metadata: Vec, - ) -> Result> { + ) -> Result> { let parent_snapshot_id = self .table .metadata() @@ -128,7 +128,7 @@ impl<'a> Transaction<'a> { .unwrap_or_default(); let commit_uuid = commit_uuid.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); - MergeSnapshotAction::new( + FastAppendAction::new( self, parent_snapshot_id, snapshot_id, @@ -167,8 +167,8 @@ impl<'a> Transaction<'a> { } } -/// Transaction action for merging snapshot. -pub struct MergeSnapshotAction<'a> { +/// FastAppendAction is a transaction action for fast append data files to the table. +pub struct FastAppendAction<'a> { tx: Transaction<'a>, parent_snapshot_id: Option, @@ -185,7 +185,7 @@ pub struct MergeSnapshotAction<'a> { appended_data_files: Vec, } -impl<'a> MergeSnapshotAction<'a> { +impl<'a> FastAppendAction<'a> { #[allow(clippy::too_many_arguments)] pub(crate) fn new( tx: Transaction<'a>, @@ -235,21 +235,6 @@ impl<'a> MergeSnapshotAction<'a> { ) } - fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String { - format!( - "{}/{}/snap-{}-{}-{}.{}", - self.tx.table.metadata().location(), - META_ROOT_PATH, - self.snapshot_id, - next_seq_num, - self.commit_uuid, - DataFileFormat::Avro - ) - } - - // # TODO: - // This method act like fast append now, because we don't support overwrite and partial overwrite. - // In the future, this method should be modify when we support them. async fn manifest_from_parent_snapshot(&self) -> Result> { if let Some(snapshot) = self.tx.table.metadata().current_snapshot() { let manifest_list = snapshot @@ -316,31 +301,98 @@ impl<'a> MergeSnapshotAction<'a> { /// Finished building the action and apply it to the transaction. 
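+    ///
+    /// Internally this writes one new manifest for the appended files, keeps
+    /// the live manifests of the parent snapshot, and delegates snapshot
+    /// creation to `SnapshotProduceAction`.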
pub async fn apply(mut self) -> Result> { + let summary = self.summary(); + let manifest = self.manifest_for_data_file().await?; + let existing_manifest_files = self.manifest_from_parent_snapshot().await?; + + let snapshot_produce_action = SnapshotProduceAction::new( + self.tx, + self.snapshot_id, + self.parent_snapshot_id, + self.schema_id, + self.format_version, + self.commit_uuid, + )?; + + snapshot_produce_action + .apply( + vec![manifest] + .into_iter() + .chain(existing_manifest_files.into_iter()), + summary, + ) + .await + } +} + +struct SnapshotProduceAction<'a> { + tx: Transaction<'a>, + + parent_snapshot_id: Option, + snapshot_id: i64, + schema_id: i32, + format_version: FormatVersion, + + commit_uuid: String, +} + +impl<'a> SnapshotProduceAction<'a> { + pub(crate) fn new( + tx: Transaction<'a>, + snapshot_id: i64, + parent_snapshot_id: Option, + schema_id: i32, + format_version: FormatVersion, + commit_uuid: String, + ) -> Result { + Ok(Self { + tx, + parent_snapshot_id, + snapshot_id, + schema_id, + format_version, + commit_uuid, + }) + } + + fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String { + format!( + "{}/{}/snap-{}-{}-{}.{}", + self.tx.table.metadata().location(), + META_ROOT_PATH, + self.snapshot_id, + next_seq_num, + self.commit_uuid, + DataFileFormat::Avro + ) + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply( + mut self, + manifest_files: impl IntoIterator, + summary: Summary, + ) -> Result> { let next_seq_num = if self.format_version as u8 > 1u8 { self.tx.table.metadata().last_sequence_number() + 1 } else { INITIAL_SEQUENCE_NUMBER }; let commit_ts = chrono::Utc::now().timestamp_millis(); - let summary = self.summary(); - - let manifest = self.manifest_for_data_file().await?; - let manifest_list_path = self.generate_manifest_list_file_path(next_seq_num); + let mut manifest_list_writer = ManifestListWriter::v2( self.tx .table .file_io() - .new_output(self.generate_manifest_list_file_path(next_seq_num))?, + .new_output(manifest_list_path.clone())?, self.snapshot_id, // # TODO // Should we use `0` here for default parent snapshot id? 
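+            // Note: `i64::default()` is 0, so a missing parent snapshot id is
+            // recorded as 0 here.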
self.parent_snapshot_id.unwrap_or_default(), next_seq_num, ); - manifest_list_writer - .add_manifests(self.manifest_from_parent_snapshot().await?.into_iter())?; - manifest_list_writer.add_manifests(vec![manifest].into_iter())?; + manifest_list_writer.add_manifests(manifest_files.into_iter())?; manifest_list_writer.close().await?; let new_snapshot = Snapshot::builder() @@ -636,7 +688,7 @@ mod tests { .partition(Struct::from_iter([Some(Literal::long(300))])) .build() .unwrap(); - let mut action = tx.merge_snapshot(None, vec![]).unwrap(); + let mut action = tx.fast_append(None, vec![]).unwrap(); action.add_data_files(vec![data_file.clone()]).unwrap(); let tx = action.apply().await.unwrap(); From 247f6b5cc3a1cdc92ddd1da12353689605fe8f4c Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 25 Apr 2024 11:46:46 +0800 Subject: [PATCH 04/14] fix cargo sort --- crates/e2e_test/Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/e2e_test/Cargo.toml b/crates/e2e_test/Cargo.toml index e72ffe623..d3892d4db 100644 --- a/crates/e2e_test/Cargo.toml +++ b/crates/e2e_test/Cargo.toml @@ -27,11 +27,11 @@ rust-version = { workspace = true } [dependencies] arrow-array = { workspace = true } arrow-schema = { workspace = true } -parquet = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } iceberg_test_utils = { path = "../test_utils", features = ["tests"] } -tokio = { version = "1", features = ["full"] } +log = { workspace = true } +parquet = { workspace = true } port_scanner = { workspace = true } -log = { workspace = true } \ No newline at end of file +tokio = { version = "1", features = ["full"] } From a1e6504b42dde89b24221628af74d7c5c0c299d1 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 8 May 2024 00:05:16 +0800 Subject: [PATCH 05/14] add consistent check for partition value --- crates/iceberg/src/spec/datatypes.rs | 51 ++++++++++++++++++++++++++++ crates/iceberg/src/transaction.rs | 42 +++++++++++++++++++++-- 2 files changed, 91 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index bce10ad5f..3eae922ef 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -102,6 +102,23 @@ impl fmt::Display for Type { } impl Type { + /// Check whether literal is compatible with the type. + pub fn compatible(&self, literal: &Literal) -> bool { + match (self, literal) { + (Type::Primitive(primitive), Literal::Primitive(primitive_literal)) => { + primitive.compatible(primitive_literal) + } + (Type::Struct(struct_type), Literal::Struct(struct_literal)) => { + struct_type.compatible(struct_literal) + } + (Type::List(list_type), Literal::List(list_literal)) => { + list_type.compatible(list_literal) + } + (Type::Map(map_type), Literal::Map(map_literal)) => map_type.compatible(map_literal), + _ => false, + } + } + /// Whether the type is primitive type. #[inline(always)] pub fn is_primitive(&self) -> bool { @@ -294,6 +311,29 @@ impl<'de> Deserialize<'de> for PrimitiveType { } } +impl PrimitiveType { + /// Check whether literal is compatible with the type. 
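+    ///
+    /// A doc sketch of the expected pairing (not run as a doctest):
+    ///
+    /// ```ignore
+    /// assert!(PrimitiveType::Int.compatible(&PrimitiveLiteral::Int(1)));
+    /// assert!(!PrimitiveType::Int.compatible(&PrimitiveLiteral::Long(1)));
+    /// ```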
+ pub fn compatible(&self, literal: &PrimitiveLiteral) -> bool { + matches!( + (self, literal), + (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(_)) + | (PrimitiveType::Int, PrimitiveLiteral::Int(_)) + | (PrimitiveType::Long, PrimitiveLiteral::Long(_)) + | (PrimitiveType::Float, PrimitiveLiteral::Float(_)) + | (PrimitiveType::Double, PrimitiveLiteral::Double(_)) + | (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Decimal(_)) + | (PrimitiveType::Date, PrimitiveLiteral::Date(_)) + | (PrimitiveType::Time, PrimitiveLiteral::Time(_)) + | (PrimitiveType::Timestamp, PrimitiveLiteral::Timestamp(_)) + | (PrimitiveType::Timestamptz, PrimitiveLiteral::TimestampTZ(_)) + | (PrimitiveType::String, PrimitiveLiteral::String(_)) + | (PrimitiveType::Uuid, PrimitiveLiteral::UUID(_)) + | (PrimitiveType::Fixed(_), PrimitiveLiteral::Fixed(_)) + | (PrimitiveType::Binary, PrimitiveLiteral::Binary(_)) + ) + } +} + impl Serialize for PrimitiveType { fn serialize(&self, serializer: S) -> std::result::Result where S: Serializer { @@ -481,6 +521,17 @@ impl StructType { pub fn fields(&self) -> &[NestedFieldRef] { &self.fields } + + /// Check whether literal is compatible with the type. + pub fn compatible(&self, struct_literal: &Struct) -> bool { + if self.fields().len() != struct_literal.fields().len() { + return false; + } + self.fields() + .iter() + .zip(struct_literal.fields()) + .all(|(field, literal)| field.field_type.compatible(literal)) + } } impl PartialEq for StructType { diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 4d537bf5f..7cc017393 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -213,12 +213,50 @@ impl<'a> FastAppendAction<'a> { }) } + // Check if the partition value is compatible with the partition type. + fn validate_partition_value( + partition_value: &Struct, + partition_type: &StructType, + ) -> Result<()> { + if partition_value.fields().len() != partition_type.fields().len() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition value is not compatitable with partition type", + )); + } + if partition_value + .fields() + .iter() + .zip(partition_type.fields()) + .any(|(field, field_type)| !field_type.field_type.compatible(field)) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition value is not compatitable partition type", + )); + } + Ok(()) + } + /// Add data files to the snapshot. 
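+    ///
+    /// Returns an error if a file's content type is not `Data`, or if its
+    /// partition value is not compatible with the table's partition type.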
pub fn add_data_files( &mut self, - data_file: impl IntoIterator, + data_files: impl IntoIterator, ) -> Result<&mut Self> { - self.appended_data_files.extend(data_file); + let data_files: Vec = data_files.into_iter().collect(); + for data_file in &data_files { + if data_file.content_type() != crate::spec::DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + "Only data content type is allowed for fast append", + )); + } + Self::validate_partition_value( + data_file.partition(), + &self.partition_spec.partition_type(&self.schema)?, + )?; + } + self.appended_data_files.extend(data_files); Ok(self) } From 3ee8675fd114e4b87f0dc2ee4409dc24df7a6d2a Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 8 May 2024 00:07:37 +0800 Subject: [PATCH 06/14] generate unique snapshot id --- crates/iceberg/src/transaction.rs | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 7cc017393..058da6e93 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -105,6 +105,28 @@ impl<'a> Transaction<'a> { Ok(self) } + fn generate_unique_snapshot_id(&self) -> i64 { + let generate_random_id = || -> i64 { + let (lhs, rhs) = Uuid::new_v4().as_u64_pair(); + let snapshot_id = (lhs ^ rhs) as i64; + if snapshot_id < 0 { + -snapshot_id + } else { + snapshot_id + } + }; + let mut snapshot_id = generate_random_id(); + while self + .table + .metadata() + .snapshots() + .any(|s| s.snapshot_id() == snapshot_id) + { + snapshot_id = generate_random_id(); + } + snapshot_id + } + /// Creates a fast append action. pub fn fast_append( self, @@ -116,7 +138,7 @@ impl<'a> Transaction<'a> { .metadata() .current_snapshot() .map(|s| s.snapshot_id()); - let snapshot_id = parent_snapshot_id.map(|id| id + 1).unwrap_or(0); + let snapshot_id = self.generate_unique_snapshot_id(); let schema = self.table.metadata().current_schema().as_ref().clone(); let schema_id = schema.schema_id(); let format_version = self.table.metadata().format_version(); From 89785ff28ef3bc4047edd676d8754b9cb8a70351 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 8 May 2024 00:08:29 +0800 Subject: [PATCH 07/14] avoid to set snapshot id for v2 --- crates/iceberg/src/transaction.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 058da6e93..cf67c86fb 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -25,8 +25,8 @@ use crate::error::Result; use crate::spec::{ DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile, ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, PartitionSpec, Schema, - Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, - Transform, + Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, + StructType, Summary, Transform, }; use crate::table::Table; use crate::TableUpdate::UpgradeFormatVersion; @@ -326,11 +326,16 @@ impl<'a> FastAppendAction<'a> { let manifest_entries = appended_data_files .into_iter() .map(|data_file| { - ManifestEntry::builder() + let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) - .snapshot_id(self.snapshot_id) - .data_file(data_file) - .build() + .data_file(data_file); + if self.format_version as u8 == 1u8 { + builder.snapshot_id(self.snapshot_id).build() + } else { + // 
For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when + // commit failed. + builder.build() + } }) .collect(); let manifest_meta = ManifestMetadata::builder() From 26ddf70a510a6d2106c9825d56d188137e512d0a Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 8 May 2024 00:10:32 +0800 Subject: [PATCH 08/14] refine test --- crates/e2e_test/testdata/docker-compose.yaml | 5 ++--- .../e2e_test/tests/append_data_file_test.rs | 21 +++++++++++++++++++ crates/iceberg/src/spec/values.rs | 3 +++ crates/iceberg/src/transaction.rs | 4 +++- 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/crates/e2e_test/testdata/docker-compose.yaml b/crates/e2e_test/testdata/docker-compose.yaml index 0152a22ca..c857de9c7 100644 --- a/crates/e2e_test/testdata/docker-compose.yaml +++ b/crates/e2e_test/testdata/docker-compose.yaml @@ -24,7 +24,6 @@ services: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password - AWS_REGION=us-east-1 - - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory - CATALOG_WAREHOUSE=s3://icebergdata/demo - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO @@ -37,7 +36,7 @@ services: - 8181 minio: - image: minio/minio:RELEASE.2024-03-07T00-43-48Z + image: minio/minio:latest environment: - MINIO_ROOT_USER=admin - MINIO_ROOT_PASSWORD=password @@ -50,7 +49,7 @@ services: mc: depends_on: - minio - image: minio/mc:RELEASE.2024-03-07T00-31-49Z + image: minio/mc:latest environment: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password diff --git a/crates/e2e_test/tests/append_data_file_test.rs b/crates/e2e_test/tests/append_data_file_test.rs index 0c1484d39..777a7d0c4 100644 --- a/crates/e2e_test/tests/append_data_file_test.rs +++ b/crates/e2e_test/tests/append_data_file_test.rs @@ -32,6 +32,7 @@ use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation}; use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; +use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::file::properties::WriterProperties; use port_scanner::scan_port_addr; use std::collections::HashMap; @@ -169,6 +170,26 @@ async fn test_append_data_file() { data_file_writer.write(batch.clone()).await.unwrap(); let data_file = data_file_writer.close().await.unwrap(); + // check parquet file schema + let batch_stream_builder = ParquetRecordBatchStreamBuilder::new( + table + .file_io() + .new_input(data_file[0].file_path()) + .unwrap() + .reader() + .await + .unwrap(), + ) + .await + .unwrap(); + let field_ids: Vec = batch_stream_builder + .parquet_schema() + .columns() + .iter() + .map(|col| col.self_type().get_basic_info().id()) + .collect(); + assert_eq!(field_ids, vec![1, 2, 3]); + // commit result let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 552ac497f..0d3a685b4 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -1575,6 +1575,9 @@ impl Struct { /// returns true if the field at position `index` is null pub fn is_null_at_index(&self, index: usize) -> bool { self.null_bitmap[index] + /// Return fields in the struct. 
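+    /// Entries are ordered by field position; use `is_null_at_index` to tell
+    /// which entries are null.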
+ pub fn fields(&self) -> &[Literal] { + &self.fields } } diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index cf67c86fb..01bbf6c9d 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -357,6 +357,8 @@ impl<'a> FastAppendAction<'a> { writer.write(manifest).await } + // # TODO: + // Complete the summary. fn summary(&self) -> Summary { Summary { operation: crate::spec::Operation::Append, @@ -740,7 +742,7 @@ mod tests { } #[tokio::test] - async fn test_merge_snapshot_actio() { + async fn test_merge_snapshot_action() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); From 59d8bed45341bd0bb8985fb6d5023f0db93baa4c Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 8 May 2024 00:39:05 +0800 Subject: [PATCH 09/14] fix unit test --- crates/iceberg/src/transaction.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 01bbf6c9d..0b159005c 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -760,15 +760,8 @@ mod tests { let tx = action.apply().await.unwrap(); // check updates and requirements - let new_snapshot_id = tx - .table - .metadata() - .current_snapshot_id - .map(|id| id + 1) - .unwrap_or(0); assert!( - matches!(&tx.updates[0], TableUpdate::AddSnapshot { snapshot } if snapshot.snapshot_id() == new_snapshot_id) - && matches!(&tx.updates[1], TableUpdate::SetSnapshotRef { reference,ref_name } if reference.snapshot_id == new_snapshot_id && ref_name == MAIN_BRANCH), + matches!((&tx.updates[0],&tx.updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) ); assert_eq!( vec![ From f656cebe7974861a25f65504915fc7daf8661b54 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 9 May 2024 21:42:14 +0800 Subject: [PATCH 10/14] export ports --- crates/e2e_test/testdata/docker-compose.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/e2e_test/testdata/docker-compose.yaml b/crates/e2e_test/testdata/docker-compose.yaml index c857de9c7..d461471b5 100644 --- a/crates/e2e_test/testdata/docker-compose.yaml +++ b/crates/e2e_test/testdata/docker-compose.yaml @@ -32,6 +32,8 @@ services: - minio links: - minio:icebergdata.minio + ports: + - 8181:8181 expose: - 8181 @@ -41,6 +43,8 @@ services: - MINIO_ROOT_USER=admin - MINIO_ROOT_PASSWORD=password - MINIO_DOMAIN=minio + ports: + - 9001:9001 expose: - 9001 - 9000 From de99dc6b820739f8cde73df38896681260f0a687 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 14 May 2024 00:18:32 +0800 Subject: [PATCH 11/14] fix None case for parant_snapshot_id --- crates/iceberg/src/scan.rs | 4 +--- crates/iceberg/src/spec/manifest_list.rs | 20 +++++++++++--------- crates/iceberg/src/transaction.rs | 4 +--- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 89e8846f0..aca8fd204 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -1124,9 +1124,7 @@ mod tests { .new_output(current_snapshot.manifest_list()) .unwrap(), current_snapshot.snapshot_id(), - current_snapshot - .parent_snapshot_id() - .unwrap_or(EMPTY_SNAPSHOT_ID), + current_snapshot.parent_snapshot_id(), current_snapshot.sequence_number(), ); manifest_list_write diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 
3aaecf12d..5363d10a8 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -122,18 +122,20 @@ impl ManifestListWriter { pub fn v2( output_file: OutputFile, snapshot_id: i64, - parent_snapshot_id: i64, + parent_snapshot_id: Option, sequence_number: i64, ) -> Self { - let metadata = HashMap::from_iter([ + let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), - ( - "parent-snapshot-id".to_string(), - parent_snapshot_id.to_string(), - ), ("sequence-number".to_string(), sequence_number.to_string()), ("format-version".to_string(), "2".to_string()), ]); + if let Some(parent_snapshot_id) = parent_snapshot_id { + metadata.insert( + "parent-snapshot-id".to_string(), + parent_snapshot_id.to_string(), + ); + } Self::new( FormatVersion::V2, output_file, @@ -1213,7 +1215,7 @@ mod test { let mut writer = ManifestListWriter::v2( file_io.new_output(full_path.clone()).unwrap(), 1646658105718557341, - 1646658105718557341, + Some(1646658105718557341), 1, ); @@ -1391,7 +1393,7 @@ mod test { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v2(output_file, snapshot_id, 0, seq_num); + let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1445,7 +1447,7 @@ mod test { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, 0, 1); + let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, Some(0), 1); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 0b159005c..72c882980 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -454,9 +454,7 @@ impl<'a> SnapshotProduceAction<'a> { .file_io() .new_output(manifest_list_path.clone())?, self.snapshot_id, - // # TODO - // Should we use `0` here for default parent snapshot id? 
- self.parent_snapshot_id.unwrap_or_default(), + self.parent_snapshot_id, next_seq_num, ); manifest_list_writer.add_manifests(manifest_files.into_iter())?; From 939d6d7d0806f8176845cd81f80a49ba02fd1477 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 21 May 2024 11:14:18 +0800 Subject: [PATCH 12/14] fix parquect schema check --- crates/e2e_test/Cargo.toml | 2 +- .../e2e_test/tests/append_data_file_test.rs | 23 ++++++++++--------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/crates/e2e_test/Cargo.toml b/crates/e2e_test/Cargo.toml index d3892d4db..2607b099c 100644 --- a/crates/e2e_test/Cargo.toml +++ b/crates/e2e_test/Cargo.toml @@ -34,4 +34,4 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] } log = { workspace = true } parquet = { workspace = true } port_scanner = { workspace = true } -tokio = { version = "1", features = ["full"] } +tokio = { workspace = true } diff --git a/crates/e2e_test/tests/append_data_file_test.rs b/crates/e2e_test/tests/append_data_file_test.rs index 777a7d0c4..101101151 100644 --- a/crates/e2e_test/tests/append_data_file_test.rs +++ b/crates/e2e_test/tests/append_data_file_test.rs @@ -32,7 +32,7 @@ use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation}; use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; -use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::file::properties::WriterProperties; use port_scanner::scan_port_addr; use std::collections::HashMap; @@ -171,18 +171,19 @@ async fn test_append_data_file() { let data_file = data_file_writer.close().await.unwrap(); // check parquet file schema - let batch_stream_builder = ParquetRecordBatchStreamBuilder::new( - table - .file_io() - .new_input(data_file[0].file_path()) - .unwrap() - .reader() - .await - .unwrap(), + let content = table + .file_io() + .new_input(data_file[0].file_path()) + .unwrap() + .read() + .await + .unwrap(); + let parquet_reader = parquet::arrow::arrow_reader::ArrowReaderMetadata::load( + &content, + ArrowReaderOptions::default(), ) - .await .unwrap(); - let field_ids: Vec = batch_stream_builder + let field_ids: Vec = parquet_reader .parquet_schema() .columns() .iter() From 94f0103a690a0a94a50a0c88fcebc37640caf04b Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 26 Sep 2024 19:29:18 +0800 Subject: [PATCH 13/14] refactor append action of transaction --- crates/catalog/rest/src/catalog.rs | 2 +- .../e2e_test/tests/append_data_file_test.rs | 22 +- crates/e2e_test/tests/conflict_commit_test.rs | 22 +- crates/iceberg/src/catalog/mod.rs | 2 +- crates/iceberg/src/io/object_cache.rs | 6 +- crates/iceberg/src/scan.rs | 1 - crates/iceberg/src/spec/datatypes.rs | 51 --- crates/iceberg/src/spec/manifest_list.rs | 30 +- crates/iceberg/src/spec/snapshot.rs | 11 +- crates/iceberg/src/spec/table_metadata.rs | 18 +- crates/iceberg/src/spec/values.rs | 10 + crates/iceberg/src/transaction.rs | 390 +++++++++++------- 12 files changed, 312 insertions(+), 253 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 1181c3cc1..bd73f6d03 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -1376,7 +1376,7 @@ mod tests { .with_schema_id(0) .with_summary(Summary { operation: Operation::Append, - other: HashMap::from_iter([ + additional_properties: HashMap::from_iter([ ("spark.app.id", 
"local-1646787004168"), ("added-data-files", "1"), ("added-records", "1"), diff --git a/crates/e2e_test/tests/append_data_file_test.rs b/crates/e2e_test/tests/append_data_file_test.rs index 101101151..92d91b696 100644 --- a/crates/e2e_test/tests/append_data_file_test.rs +++ b/crates/e2e_test/tests/append_data_file_test.rs @@ -17,6 +17,9 @@ //! Integration tests for rest catalog. +use std::collections::HashMap; +use std::sync::Arc; + use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; @@ -35,8 +38,6 @@ use iceberg_test_utils::{normalize_test_name, set_up}; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::file::properties::WriterProperties; use port_scanner::scan_port_addr; -use std::collections::HashMap; -use std::sync::Arc; use tokio::time::sleep; const REST_CATALOG_PORT: u16 = 8181; @@ -80,7 +81,7 @@ async fn set_test_fixture(func: &str) -> TestFixture { (S3_REGION.to_string(), "us-east-1".to_string()), ])) .build(); - let rest_catalog = RestCatalog::new(config).await.unwrap(); + let rest_catalog = RestCatalog::new(config); TestFixture { _docker_compose: docker_compose, @@ -145,7 +146,7 @@ async fn test_append_data_file() { ); let parquet_writer_builder = ParquetWriterBuilder::new( WriterProperties::default(), - schema.clone(), + table.metadata().current_schema().clone(), table.file_io().clone(), location_generator.clone(), file_name_generator.clone(), @@ -158,14 +159,11 @@ async fn test_append_data_file() { let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); - let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(col1) as ArrayRef, - Arc::new(col2) as ArrayRef, - Arc::new(col3) as ArrayRef, - ], - ) + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) .unwrap(); data_file_writer.write(batch.clone()).await.unwrap(); let data_file = data_file_writer.close().await.unwrap(); diff --git a/crates/e2e_test/tests/conflict_commit_test.rs b/crates/e2e_test/tests/conflict_commit_test.rs index e9e4606a2..7890b2b12 100644 --- a/crates/e2e_test/tests/conflict_commit_test.rs +++ b/crates/e2e_test/tests/conflict_commit_test.rs @@ -17,6 +17,9 @@ //! Integration tests for rest catalog. 
+use std::collections::HashMap; +use std::sync::Arc; + use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; @@ -34,8 +37,6 @@ use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; use parquet::file::properties::WriterProperties; use port_scanner::scan_port_addr; -use std::collections::HashMap; -use std::sync::Arc; use tokio::time::sleep; const REST_CATALOG_PORT: u16 = 8181; @@ -79,7 +80,7 @@ async fn set_test_fixture(func: &str) -> TestFixture { (S3_REGION.to_string(), "us-east-1".to_string()), ])) .build(); - let rest_catalog = RestCatalog::new(config).await.unwrap(); + let rest_catalog = RestCatalog::new(config); TestFixture { _docker_compose: docker_compose, @@ -144,7 +145,7 @@ async fn test_append_data_file_conflict() { ); let parquet_writer_builder = ParquetWriterBuilder::new( WriterProperties::default(), - schema.clone(), + table.metadata().current_schema().clone(), table.file_io().clone(), location_generator.clone(), file_name_generator.clone(), @@ -157,14 +158,11 @@ async fn test_append_data_file_conflict() { let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); - let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(col1) as ArrayRef, - Arc::new(col2) as ArrayRef, - Arc::new(col3) as ArrayRef, - ], - ) + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) .unwrap(); data_file_writer.write(batch.clone()).await.unwrap(); let data_file = data_file_writer.close().await.unwrap(); diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 70a403905..09463f8b4 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -1378,7 +1378,7 @@ mod tests { .with_schema_id(1) .with_summary(Summary { operation: Operation::Append, - other: HashMap::default(), + additional_properties: HashMap::default(), }) .build(), }; diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 35b6a2c94..88e2d0e2d 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -185,7 +185,7 @@ mod tests { use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest, ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, - ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID, + ManifestWriter, Struct, TableMetadata, }; use crate::table::Table; use crate::TableIdent; @@ -293,9 +293,7 @@ mod tests { .new_output(current_snapshot.manifest_list()) .unwrap(), current_snapshot.snapshot_id(), - current_snapshot - .parent_snapshot_id() - .unwrap_or(EMPTY_SNAPSHOT_ID), + current_snapshot.parent_snapshot_id(), current_snapshot.sequence_number(), ); manifest_list_write diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index aca8fd204..8f0bc38f6 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -977,7 +977,6 @@ mod tests { DataContentType, DataFileBuilder, DataFileFormat, Datum, FormatVersion, Literal, Manifest, ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, 
ManifestWriter, NestedField, PrimitiveType, Schema, Struct, TableMetadata, Type,
-        EMPTY_SNAPSHOT_ID,
     };
     use crate::table::Table;
     use crate::TableIdent;

diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs
index 3eae922ef..bce10ad5f 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -102,23 +102,6 @@ impl fmt::Display for Type {
 }

 impl Type {
-    /// Check whether literal is compatible with the type.
-    pub fn compatible(&self, literal: &Literal) -> bool {
-        match (self, literal) {
-            (Type::Primitive(primitive), Literal::Primitive(primitive_literal)) => {
-                primitive.compatible(primitive_literal)
-            }
-            (Type::Struct(struct_type), Literal::Struct(struct_literal)) => {
-                struct_type.compatible(struct_literal)
-            }
-            (Type::List(list_type), Literal::List(list_literal)) => {
-                list_type.compatible(list_literal)
-            }
-            (Type::Map(map_type), Literal::Map(map_literal)) => map_type.compatible(map_literal),
-            _ => false,
-        }
-    }
-
     /// Whether the type is primitive type.
     #[inline(always)]
     pub fn is_primitive(&self) -> bool {
@@ -311,29 +294,6 @@ impl<'de> Deserialize<'de> for PrimitiveType {
     }
 }

-impl PrimitiveType {
-    /// Check whether literal is compatible with the type.
-    pub fn compatible(&self, literal: &PrimitiveLiteral) -> bool {
-        matches!(
-            (self, literal),
-            (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(_))
-                | (PrimitiveType::Int, PrimitiveLiteral::Int(_))
-                | (PrimitiveType::Long, PrimitiveLiteral::Long(_))
-                | (PrimitiveType::Float, PrimitiveLiteral::Float(_))
-                | (PrimitiveType::Double, PrimitiveLiteral::Double(_))
-                | (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Decimal(_))
-                | (PrimitiveType::Date, PrimitiveLiteral::Date(_))
-                | (PrimitiveType::Time, PrimitiveLiteral::Time(_))
-                | (PrimitiveType::Timestamp, PrimitiveLiteral::Timestamp(_))
-                | (PrimitiveType::Timestamptz, PrimitiveLiteral::TimestampTZ(_))
-                | (PrimitiveType::String, PrimitiveLiteral::String(_))
-                | (PrimitiveType::Uuid, PrimitiveLiteral::UUID(_))
-                | (PrimitiveType::Fixed(_), PrimitiveLiteral::Fixed(_))
-                | (PrimitiveType::Binary, PrimitiveLiteral::Binary(_))
-        )
-    }
-}
-
 impl Serialize for PrimitiveType {
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
     where S: Serializer {
@@ -521,17 +481,6 @@ impl StructType {
     pub fn fields(&self) -> &[NestedFieldRef] {
         &self.fields
     }
-
-    /// Check whether literal is compatible with the type.
-    pub fn compatible(&self, struct_literal: &Struct) -> bool {
-        if self.fields().len() != struct_literal.fields().len() {
-            return false;
-        }
-        self.fields()
-            .iter()
-            .zip(struct_literal.fields())
-            .all(|(field, literal)| field.field_type.compatible(literal))
-    }
 }

 impl PartialEq for StructType {

diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs
index 5363d10a8..7ef6b8c31 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -106,15 +106,17 @@ impl std::fmt::Debug for ManifestListWriter {

 impl ManifestListWriter {
     /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
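     ///
     /// A hedged sketch of the call shape after this change (values are
     /// illustrative): `ManifestListWriter::v1(output_file, snapshot_id, Some(parent_id))`,
     /// or `None` when the snapshot has no parent.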
-    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
-        let metadata = HashMap::from_iter([
+    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
+        let mut metadata = HashMap::from_iter([
             ("snapshot-id".to_string(), snapshot_id.to_string()),
-            (
-                "parent-snapshot-id".to_string(),
-                parent_snapshot_id.to_string(),
-            ),
             ("format-version".to_string(), "1".to_string()),
         ]);
+        if let Some(parent_snapshot_id) = parent_snapshot_id {
+            metadata.insert(
+                "parent-snapshot-id".to_string(),
+                parent_snapshot_id.to_string(),
+            );
+        }
         Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
     }

@@ -582,6 +584,18 @@ pub struct ManifestFile {
     pub key_metadata: Vec<u8>,
 }

+impl ManifestFile {
+    /// Checks if the manifest file has any added files.
+    pub fn has_added_files(&self) -> bool {
+        self.added_files_count.is_none() || self.added_files_count.unwrap() > 0
+    }
+
+    /// Checks if the manifest file has any existing files.
+    pub fn has_existing_files(&self) -> bool {
+        self.existing_files_count.is_none() || self.existing_files_count.unwrap() > 0
+    }
+}
+
 /// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests
 #[derive(Debug, PartialEq, Clone, Eq)]
 pub enum ManifestContentType {
@@ -1148,7 +1162,7 @@ mod test {
         let mut writer = ManifestListWriter::v1(
             file_io.new_output(full_path.clone()).unwrap(),
             1646658105718557341,
-            1646658105718557341,
+            Some(1646658105718557341),
         );

         writer
@@ -1337,7 +1351,7 @@ mod test {
         let io = FileIOBuilder::new_fs_io().build().unwrap();
         let output_file = io.new_output(path.to_str().unwrap()).unwrap();

-        let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, 0);
+        let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
         writer
             .add_manifests(expected_manifest_list.entries.clone().into_iter())
             .unwrap();

diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs
index 09fa7dc87..0240afaba 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -59,7 +59,7 @@ pub struct Summary {
     pub operation: Operation,
     /// Other summary data.
     #[serde(flatten)]
-    pub other: HashMap<String, String>,
+    pub additional_properties: HashMap<String, String>,
 }

 impl Default for Operation {
@@ -291,7 +291,7 @@ pub(super) mod _serde {
             },
             summary: v1.summary.unwrap_or(Summary {
                 operation: Operation::default(),
-                other: HashMap::new(),
+                additional_properties: HashMap::new(),
             }),
             schema_id: v1.schema_id,
         })
@@ -378,11 +378,6 @@ impl SnapshotRetention {
             max_ref_age_ms,
         }
     }
-
-    /// Create a new tag retention policy
-    pub fn tag(max_ref_age_ms: i64) -> Self {
-        SnapshotRetention::Tag { max_ref_age_ms }
-    }
 }

 #[cfg(test)]
@@ -421,7 +416,7 @@ mod tests {
         assert_eq!(
             Summary {
                 operation: Operation::Append,
-                other: HashMap::new()
+                additional_properties: HashMap::new()
             },
             *result.summary()
         );

diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index daed758cb..01f526656 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -189,6 +189,18 @@ impl TableMetadata {
         self.last_sequence_number
     }

+    /// Returns the next sequence number for the table.
+    ///
+    /// For format version 1, it always returns the initial sequence number.
+    /// For other versions, it returns the last sequence number incremented by 1.
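+    ///
+    /// A hypothetical illustration: a V2 table whose `last_sequence_number` is
+    /// 41 would return 42 here, while a V1 table always returns
+    /// `INITIAL_SEQUENCE_NUMBER` (0).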
+    #[inline]
+    pub fn next_sequence_number(&self) -> i64 {
+        match self.format_version {
+            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
+            _ => self.last_sequence_number + 1,
+        }
+    }
+
     /// Returns last updated time.
     #[inline]
     pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
@@ -1552,7 +1564,7 @@ mod tests {
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
-           .with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
+           .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
            .build();

         let expected = TableMetadata {
@@ -1971,7 +1983,7 @@ mod tests {
             .with_manifest_list("s3://a/b/1.avro")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::new(),
+                additional_properties: HashMap::new(),
             })
             .build();

@@ -1984,7 +1996,7 @@ mod tests {
             .with_manifest_list("s3://a/b/2.avro")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::new(),
+                additional_properties: HashMap::new(),
             })
             .build();

diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs
index 0d3a685b4..6fb070527 100644
--- a/crates/iceberg/src/spec/values.rs
+++ b/crates/iceberg/src/spec/values.rs
@@ -1537,6 +1537,14 @@ impl Literal {
             })?;
         Ok(Self::decimal(decimal.mantissa()))
     }
+
+    /// Attempts to convert the Literal to a PrimitiveLiteral
+    pub fn as_primitive_literal(&self) -> Option<PrimitiveLiteral> {
+        match self {
+            Literal::Primitive(primitive) => Some(primitive.clone()),
+            _ => None,
+        }
+    }
 }

 /// The partition struct stores the tuple of partition values for each file.
@@ -1575,6 +1583,8 @@ impl Struct {
     /// returns true if the field at position `index` is null
     pub fn is_null_at_index(&self, index: usize) -> bool {
         self.null_bitmap[index]
+    }
+
     /// Return fields in the struct.
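     /// For example (hypothetical values), a struct built from
     /// `[Literal::long(1), Literal::string("a")]` returns those two literals
     /// in positional order.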
     pub fn fields(&self) -> &[Literal] {
         &self.fields

diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index 72c882980..6c8246e83 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -19,22 +19,26 @@

 use std::cmp::Ordering;
 use std::collections::HashMap;
+use std::future::Future;
 use std::mem::discriminant;
+use std::ops::RangeFrom;
+use std::sync::Arc;
+
+use uuid::Uuid;

 use crate::error::Result;
+use crate::io::OutputFile;
 use crate::spec::{
     DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile,
-    ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, PartitionSpec, Schema,
-    Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct,
-    StructType, Summary, Transform,
+    ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation, PartitionSpec,
+    Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
+    Struct, StructType, Summary, Transform, MAIN_BRANCH,
 };
 use crate::table::Table;
 use crate::TableUpdate::UpgradeFormatVersion;
 use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};

-const INITIAL_SEQUENCE_NUMBER: i64 = 0;
 const META_ROOT_PATH: &str = "metadata";
-const MAIN_BRANCH: &str = "main";

 /// Table transaction.
 pub struct Transaction<'a> {
@@ -142,12 +146,7 @@ impl<'a> Transaction<'a> {
         let schema = self.table.metadata().current_schema().as_ref().clone();
         let schema_id = schema.schema_id();
         let format_version = self.table.metadata().format_version();
-        let partition_spec = self
-            .table
-            .metadata()
-            .default_partition_spec()
-            .map(|spec| spec.as_ref().clone())
-            .unwrap_or_default();
+        let partition_spec = self.table.metadata().default_partition_spec().clone();
         let commit_uuid = commit_uuid.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());

         FastAppendAction::new(
@@ -160,6 +159,7 @@ impl<'a> Transaction<'a> {
             partition_spec,
             key_metadata,
             commit_uuid,
+            HashMap::new(),
         )
     }

@@ -191,47 +191,175 @@ impl<'a> Transaction<'a> {

 /// FastAppendAction is a transaction action for fast append data files to the table.
 pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<PartitionSpec>,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            snapshot_produce_action: SnapshotProduceAction::new(
+                tx,
+                snapshot_id,
+                parent_snapshot_id,
+                schema_id,
+                format_version,
+                partition_spec,
+                schema,
+                key_metadata,
+                commit_uuid,
+                snapshot_properties,
+            )?,
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.snapshot_produce_action.add_data_files(data_files)?;
+        Ok(self)
+    }
+
+    /// Finish building the action and apply it to the transaction.
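+    ///
+    /// A minimal usage sketch (assuming a `table`, a written `data_file`, and a
+    /// `catalog` already exist; the names are illustrative):
+    ///
+    /// ```ignore
+    /// let tx = Transaction::new(&table);
+    /// let mut action = tx.fast_append(None, vec![])?;
+    /// action.add_data_files(vec![data_file])?;
+    /// let tx = action.apply().await?;
+    /// let table = tx.commit(&catalog).await?;
+    /// ```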
+    pub async fn apply(self) -> Result<Transaction<'a>> {
+        self.snapshot_produce_action
+            .apply(FastAppendOperation, DefaultManifestProcess)
+            .await
+    }
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+    fn operation(&self) -> Operation {
+        Operation::Append
+    }
+
+    async fn delete_entries(
+        &self,
+        _snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        Ok(vec![])
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = snapshot_produce
+            .parent_snapshot_id
+            .and_then(|id| snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+        else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_produce.tx.table.file_io(),
+                &snapshot_produce.tx.table.metadata_ref(),
+            )
+            .await?;
+
+        Ok(manifest_list
+            .entries()
+            .iter()
+            .filter(|entry| {
+                entry.has_added_files()
+                    || entry.has_existing_files()
+                    || entry.added_snapshot_id == snapshot_produce.snapshot_id
+            })
+            .cloned()
+            .collect())
+    }
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+    fn operation(&self) -> Operation;
+    #[allow(unused)]
+    fn delete_entries(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+    fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+        manifests
+    }
+}
+
+trait ManifestProcess: Send + Sync {
+    fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
     tx: Transaction<'a>,

     parent_snapshot_id: Option<i64>,
     snapshot_id: i64,
-    schema: Schema,
     schema_id: i32,
     format_version: FormatVersion,
-    partition_spec: PartitionSpec,
+    partition_spec: Arc<PartitionSpec>,
+    schema: Schema,
     key_metadata: Vec<u8>,

     commit_uuid: String,
-    manifest_id: i64,
-    appended_data_files: Vec<DataFile>,
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+
+    // A counter used to generate unique manifest file names.
+    // It starts from 0 and increments for each new manifest file.
+    // Note: This counter is limited to the range of (0..u64::MAX).
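+    // For example, with a (hypothetical) commit uuid of `02f91282-fecb`, the
+    // generated manifests would be named `02f91282-fecb-m0.avro`,
+    // `02f91282-fecb-m1.avro`, and so on.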
+    manifest_counter: RangeFrom<u64>,
 }

-impl<'a> FastAppendAction<'a> {
+impl<'a> SnapshotProduceAction<'a> {
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         tx: Transaction<'a>,
-        parent_snapshot_id: Option<i64>,
         snapshot_id: i64,
-        schema: Schema,
+        parent_snapshot_id: Option<i64>,
         schema_id: i32,
         format_version: FormatVersion,
-        partition_spec: PartitionSpec,
+        partition_spec: Arc<PartitionSpec>,
+        schema: Schema,
         key_metadata: Vec<u8>,
         commit_uuid: String,
+        snapshot_properties: HashMap<String, String>,
     ) -> Result<Self> {
         Ok(Self {
             tx,
             parent_snapshot_id,
             snapshot_id,
-            schema,
             schema_id,
             format_version,
+            commit_uuid,
+            snapshot_properties,
+            added_data_files: vec![],
+            manifest_counter: (0..),
             partition_spec,
+            schema,
             key_metadata,
-            commit_uuid,
-            manifest_id: 0,
-            appended_data_files: vec![],
         })
     }

@@ -250,7 +378,13 @@ impl<'a> SnapshotProduceAction<'a> {
             .fields()
             .iter()
             .zip(partition_type.fields())
-            .any(|(field, field_type)| !field_type.field_type.compatible(field))
+            .any(|(field_from_value, field_from_type)| {
+                !field_from_type
+                    .field_type
+                    .as_primitive_type()
+                    .unwrap()
+                    .compatible(&field_from_value.as_primitive_literal().unwrap())
+            })
         {
             return Err(Error::new(
                 ErrorKind::DataInvalid,
@@ -278,52 +412,26 @@ impl<'a> SnapshotProduceAction<'a> {
                 &self.partition_spec.partition_type(&self.schema)?,
             )?;
         }
-        self.appended_data_files.extend(data_files);
+        self.added_data_files.extend(data_files);
         Ok(self)
     }

-    fn generate_manifest_file_path(&mut self) -> String {
-        let manifest_id = self.manifest_id;
-        self.manifest_id += 1;
-        format!(
+    fn new_manifest_output(&mut self) -> Result<OutputFile> {
+        let new_manifest_path = format!(
             "{}/{}/{}-m{}.{}",
             self.tx.table.metadata().location(),
             META_ROOT_PATH,
             &self.commit_uuid,
-            manifest_id,
+            self.manifest_counter.next().unwrap(),
             DataFileFormat::Avro
-        )
-    }
-
-    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
-        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
-            let manifest_list = snapshot
-                .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref())
-                .await?;
-            let mut manifest_files = Vec::with_capacity(manifest_list.entries().len());
-            for entry in manifest_list.entries() {
-                // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
-                // Why we need this?
-                if entry.added_snapshot_id == self.snapshot_id {
-                    continue;
-                }
-                let manifest = entry.load_manifest(self.tx.table.file_io()).await?;
-                // Skip manifest with all delete entries.
-                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
-                    continue;
-                }
-                manifest_files.push(entry.clone());
-            }
-            Ok(manifest_files)
-        } else {
-            Ok(vec![])
-        }
+        );
+        self.tx.table.file_io().new_output(new_manifest_path)
     }

     // Write manifest file for added data files and return the ManifestFile for ManifestList.
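     // Each entry is recorded with `Added` status; as noted in an earlier patch,
     // for format version > 1 the snapshot id is left to be inherited at read
     // time so that a failed commit does not force a manifest rewrite.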
-    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
-        let appended_data_files = std::mem::take(&mut self.appended_data_files);
-        let manifest_entries = appended_data_files
+    async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
+        let added_data_files = std::mem::take(&mut self.added_data_files);
+        let manifest_entries = added_data_files
             .into_iter()
             .map(|data_file| {
                 let builder = ManifestEntry::builder()
@@ -342,124 +450,99 @@ impl<'a> SnapshotProduceAction<'a> {
             .schema(self.schema.clone())
             .schema_id(self.schema_id)
             .format_version(self.format_version)
-            .partition_spec(self.partition_spec.clone())
+            .partition_spec(self.partition_spec.as_ref().clone())
             .content(crate::spec::ManifestContentType::Data)
             .build();
         let manifest = Manifest::new(manifest_meta, manifest_entries);
         let writer = ManifestWriter::new(
-            self.tx
-                .table
-                .file_io()
-                .new_output(self.generate_manifest_file_path())?,
+            self.new_manifest_output()?,
             self.snapshot_id,
             self.key_metadata.clone(),
         );
         writer.write(manifest).await
     }

-    // # TODO:
-    // Complete the summary.
-    fn summary(&self) -> Summary {
-        Summary {
-            operation: crate::spec::Operation::Append,
-            other: HashMap::new(),
-        }
-    }
-
-    /// Finished building the action and apply it to the transaction.
-    pub async fn apply(mut self) -> Result<Transaction<'a>> {
-        let summary = self.summary();
-        let manifest = self.manifest_for_data_file().await?;
-        let existing_manifest_files = self.manifest_from_parent_snapshot().await?;
+    async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
+        &mut self,
+        snapshot_produce_operation: &OP,
+        manifest_process: &MP,
+    ) -> Result<Vec<ManifestFile>> {
+        let added_manifest = self.write_added_manifest().await?;
+        let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;

-        let snapshot_produce_action = SnapshotProduceAction::new(
-            self.tx,
-            self.snapshot_id,
-            self.parent_snapshot_id,
-            self.schema_id,
-            self.format_version,
-            self.commit_uuid,
-        )?;
-
-        snapshot_produce_action
-            .apply(
-                vec![manifest]
-                    .into_iter()
-                    .chain(existing_manifest_files.into_iter()),
-                summary,
-            )
-            .await
+        let mut manifest_files = vec![added_manifest];
+        manifest_files.extend(existing_manifests);
+        let manifest_files = manifest_process.process_manifeset(manifest_files);
+        Ok(manifest_files)
     }
-}
-
-struct SnapshotProduceAction<'a> {
-    tx: Transaction<'a>,
-
-    parent_snapshot_id: Option<i64>,
-    snapshot_id: i64,
-    schema_id: i32,
-    format_version: FormatVersion,
-    commit_uuid: String,
-}
-
-impl<'a> SnapshotProduceAction<'a> {
-    pub(crate) fn new(
-        tx: Transaction<'a>,
-        snapshot_id: i64,
-        parent_snapshot_id: Option<i64>,
-        schema_id: i32,
-        format_version: FormatVersion,
-        commit_uuid: String,
-    ) -> Result<Self> {
-        Ok(Self {
-            tx,
-            parent_snapshot_id,
-            snapshot_id,
-            schema_id,
-            format_version,
-            commit_uuid,
-        })
+    // # TODO
+    // Fulfill this function
+    fn summary<OP: SnapshotProduceOperation>(&self, snapshot_produce_operation: &OP) -> Summary {
+        Summary {
+            operation: snapshot_produce_operation.operation(),
+            additional_properties: self.snapshot_properties.clone(),
+        }
     }

-    fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String {
+    fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
         format!(
             "{}/{}/snap-{}-{}-{}.{}",
             self.tx.table.metadata().location(),
             META_ROOT_PATH,
             self.snapshot_id,
-            next_seq_num,
+            attempt,
             self.commit_uuid,
             DataFileFormat::Avro
         )
     }

     /// Finish building the action and apply it to the transaction.
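     ///
     /// On success the transaction carries an `AddSnapshot` update followed by
     /// a `SetSnapshotRef` update pointing the `main` branch at the new
     /// snapshot.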
-    pub async fn apply(
+    pub async fn apply<OP: SnapshotProduceOperation, MP: ManifestProcess>(
         mut self,
-        manifest_files: impl IntoIterator<Item = ManifestFile>,
-        summary: Summary,
+        snapshot_produce_operation: OP,
+        process: MP,
     ) -> Result<Transaction<'a>> {
-        let next_seq_num = if self.format_version as u8 > 1u8 {
-            self.tx.table.metadata().last_sequence_number() + 1
-        } else {
-            INITIAL_SEQUENCE_NUMBER
+        let new_manifests = self
+            .manifest_file(&snapshot_produce_operation, &process)
+            .await?;
+        let next_seq_num = self.tx.table.metadata().next_sequence_number();
+
+        let summary = self.summary(&snapshot_produce_operation);
+
+        let manifest_list_path = self.generate_manifest_list_file_path(0);
+
+        let mut manifest_list_writer = match self.tx.table.metadata().format_version() {
+            FormatVersion::V1 => ManifestListWriter::v1(
+                self.tx
+                    .table
+                    .file_io()
+                    .new_output(manifest_list_path.clone())?,
+                self.snapshot_id,
+                self.parent_snapshot_id,
+            ),
+            FormatVersion::V2 => ManifestListWriter::v2(
+                self.tx
+                    .table
+                    .file_io()
+                    .new_output(manifest_list_path.clone())?,
+                self.snapshot_id,
+                self.parent_snapshot_id,
+                next_seq_num,
+            ),
         };
-        let commit_ts = chrono::Utc::now().timestamp_millis();
-        let manifest_list_path = self.generate_manifest_list_file_path(next_seq_num);
-
-        let mut manifest_list_writer = ManifestListWriter::v2(
-            self.tx
-                .table
-                .file_io()
-                .new_output(manifest_list_path.clone())?,
-            self.snapshot_id,
-            self.parent_snapshot_id,
-            next_seq_num,
-        );
-        manifest_list_writer.add_manifests(manifest_files.into_iter())?;
+        manifest_list_writer.add_manifests(new_manifests.into_iter())?;
         manifest_list_writer.close().await?;

+        let input = self
+            .tx
+            .table
+            .file_io()
+            .new_input(manifest_list_path.clone())
+            .unwrap();
+        println!("{}", input.exists().await?);
+
+        let commit_ts = chrono::Utc::now().timestamp_millis();
         let new_snapshot = Snapshot::builder()
             .with_manifest_list(manifest_list_path)
             .with_snapshot_id(self.snapshot_id)
@@ -470,7 +553,6 @@ impl<'a> SnapshotProduceAction<'a> {
             .with_timestamp_ms(commit_ts)
             .build();

-        let new_snapshot_id = new_snapshot.snapshot_id();
         self.tx.append_updates(vec![
             TableUpdate::AddSnapshot {
                 snapshot: new_snapshot,
@@ -478,7 +560,7 @@ impl<'a> SnapshotProduceAction<'a> {
             TableUpdate::SetSnapshotRef {
                 ref_name: MAIN_BRANCH.to_string(),
                 reference: SnapshotReference::new(
-                    new_snapshot_id,
+                    self.snapshot_id,
                     SnapshotRetention::branch(None, None, None),
                 ),
             },
@@ -577,10 +659,13 @@ mod tests {
     use std::fs::File;
     use std::io::BufReader;

-    use crate::io::FileIO;
-    use crate::spec::{FormatVersion, TableMetadata};
+    use crate::io::FileIOBuilder;
+    use crate::spec::{
+        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Struct,
+        TableMetadata,
+    };
     use crate::table::Table;
     use crate::transaction::{Transaction, MAIN_BRANCH};
     use crate::{TableIdent, TableRequirement, TableUpdate};

     fn make_v1_table() -> Table {
@@ -618,6 +703,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
             .file_io(FileIOBuilder::new("memory").build().unwrap())
             .build()
+            .unwrap()
     }

     fn make_v2_minimal_table() -> Table {
@@ -740,7 +826,7 @@ mod tests {
     }

     #[tokio::test]
-    async fn test_merge_snapshot_action() {
+    async fn test_fast_append_action() {
         let table = make_v2_minimal_table();

         let tx = Transaction::new(&table);

From b2fc5ff2fb571468a3aea30c5a060f9bdd76bdc6 Mon Sep 17 00:00:00 2001
From: ZENOTME
Date: Thu, 14 Nov 2024 17:31:08 +0800
Subject: [PATCH 14/14] refine

---
 crates/iceberg/src/catalog/mod.rs |  2 +-
 crates/iceberg/src/transaction.rs | 22 +++++++---------------
 2 files changed, 8 insertions(+), 16 deletions(-)

diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index 09463f8b4..b78184e3c 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -1412,7 +1412,7 @@ mod tests {
             .with_manifest_list("s3://a/b/2.avro")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::default(),
+                additional_properties: HashMap::default(),
             })
             .build(),
     };

diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index 6c8246e83..c662faa44 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -29,8 +29,8 @@ use uuid::Uuid;
 use crate::error::Result;
 use crate::io::OutputFile;
 use crate::spec::{
-    DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile,
-    ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation, PartitionSpec,
+    BoundPartitionSpec, DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry,
+    ManifestFile, ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation,
     Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
     Struct, StructType, Summary, Transform, MAIN_BRANCH,
 };
@@ -203,7 +203,7 @@ impl<'a> FastAppendAction<'a> {
         schema: Schema,
         schema_id: i32,
         format_version: FormatVersion,
-        partition_spec: Arc<PartitionSpec>,
+        partition_spec: Arc<BoundPartitionSpec>,
         key_metadata: Vec<u8>,
         commit_uuid: String,
         snapshot_properties: HashMap<String, String>,
@@ -318,7 +318,7 @@ struct SnapshotProduceAction<'a> {
     snapshot_id: i64,
     schema_id: i32,
     format_version: FormatVersion,
-    partition_spec: Arc<PartitionSpec>,
+    partition_spec: Arc<BoundPartitionSpec>,
     schema: Schema,
     key_metadata: Vec<u8>,
@@ -341,7 +341,7 @@ impl<'a> SnapshotProduceAction<'a> {
         parent_snapshot_id: Option<i64>,
         schema_id: i32,
         format_version: FormatVersion,
-        partition_spec: Arc<PartitionSpec>,
+        partition_spec: Arc<BoundPartitionSpec>,
         schema: Schema,
         key_metadata: Vec<u8>,
         commit_uuid: String,
@@ -409,7 +409,7 @@ impl<'a> SnapshotProduceAction<'a> {
             }
             Self::validate_partition_value(
                 data_file.partition(),
-                &self.partition_spec.partition_type(&self.schema)?,
+                self.partition_spec.partition_type(),
             )?;
         }
         self.added_data_files.extend(data_files);
         Ok(self)
@@ -447,7 +447,7 @@ impl<'a> SnapshotProduceAction<'a> {
             })
             .collect();
         let manifest_meta = ManifestMetadata::builder()
-            .schema(self.schema.clone())
+            .schema(self.schema.clone().into())
             .schema_id(self.schema_id)
             .format_version(self.format_version)
             .partition_spec(self.partition_spec.as_ref().clone())
@@ -534,14 +534,6 @@ impl<'a> SnapshotProduceAction<'a> {
         manifest_list_writer.add_manifests(new_manifests.into_iter())?;
         manifest_list_writer.close().await?;

-        let input = self
-            .tx
-            .table
-            .file_io()
-            .new_input(manifest_list_path.clone())
-            .unwrap();
-        println!("{}", input.exists().await?);
-
         let commit_ts = chrono::Utc::now().timestamp_millis();
         let new_snapshot = Snapshot::builder()
             .with_manifest_list(manifest_list_path)