Skip to content

Commit b11f6c5

Browse files
committed
Merge pull request JanKaul#228 from JanKaul/fix-equalitly-deletes
use separate manifest files for delete files
2 parents 5453b20 + e6bc9a4 commit b11f6c5

File tree

10 files changed

+444
-200
lines changed

10 files changed

+444
-200
lines changed

.github/workflows/rust.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
android: true
2121
dotnet: true
2222
haskell: true
23-
large-packages: false
23+
large-packages: true
2424
docker-images: false
2525
swap-storage: false
2626
- uses: actions/checkout@v3

Cargo.lock

Lines changed: 67 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@ test-iceberg-rust:
77
cargo test -p iceberg-rust --lib
88

99
test-datafusion_iceberg:
10-
cargo test -p datafusion_iceberg --tests -j 2
10+
cargo test -p datafusion_iceberg --tests -j 2 && cargo clean -p datafusion_iceberg
1111

1212
test-rest-catalog:
13-
cargo test -p iceberg-rest-catalog --lib
13+
cargo test -p iceberg-rest-catalog --lib && cargo clean -p iceberg-rest-catalog
1414

1515
test-file-catalog:
16-
cargo test -p iceberg-file-catalog --lib
16+
cargo test -p iceberg-file-catalog --lib && cargo clean -p iceberg-file-catalog
1717

1818
test-sql-catalog:
19-
cargo test -p iceberg-sql-catalog --lib
19+
cargo test -p iceberg-sql-catalog --lib && cargo clean -p iceberg-sql-catalog
2020
clippy:
2121
cargo clippy --all-targets --all-features -- -D warnings
2222
fmt:

datafusion_iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ url = { workspace = true }
2929
uuid = { workspace = true }
3030

3131
[dev-dependencies]
32+
duckdb = { version = "1.3.2", features = ["bundled"] }
3233
iceberg-rest-catalog = { path = "../catalogs/iceberg-rest-catalog" }
3334
iceberg-sql-catalog = { path = "../catalogs/iceberg-sql-catalog" }
3435
reqwest = "0.12"

datafusion_iceberg/tests/equality_delete.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
use datafusion::{arrow::error::ArrowError, assert_batches_eq, prelude::SessionContext};
1+
use datafusion::{
2+
arrow::{error::ArrowError, record_batch::RecordBatch},
3+
assert_batches_eq,
4+
prelude::SessionContext,
5+
};
26
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
7+
use duckdb::Connection;
38
use futures::stream;
49
use iceberg_rust::catalog::identifier::Identifier;
510
use iceberg_rust::catalog::tabular::Tabular;
@@ -16,11 +21,15 @@ use iceberg_rust::{
1621
table::Table,
1722
};
1823
use iceberg_sql_catalog::SqlCatalog;
24+
use object_store::local::LocalFileSystem;
1925
use std::sync::Arc;
26+
use tempfile::TempDir;
2027

2128
#[tokio::test]
2229
pub async fn test_equality_delete() {
23-
let object_store = ObjectStoreBuilder::memory();
30+
let temp_dir = TempDir::new().unwrap();
31+
let table_dir = format!("{}/test/orders", temp_dir.path().to_str().unwrap());
32+
let object_store = ObjectStoreBuilder::Filesystem(Arc::new(LocalFileSystem::new()));
2433

2534
let catalog: Arc<dyn Catalog> = Arc::new(
2635
SqlCatalog::new("sqlite://", "warehouse", object_store.clone())
@@ -79,7 +88,7 @@ pub async fn test_equality_delete() {
7988

8089
let table = Table::builder()
8190
.with_name("orders")
82-
.with_location("/test/orders")
91+
.with_location(&table_dir)
8392
.with_schema(schema)
8493
.with_partition_spec(partition_spec)
8594
.build(&["test".to_owned()], catalog.clone())
@@ -194,4 +203,28 @@ pub async fn test_equality_delete() {
194203
"+----+-------------+------------+------------+--------+",
195204
];
196205
assert_batches_eq!(expected, &batches);
206+
207+
let latest_version = table
208+
.object_store()
209+
.get(&object_store::path::Path::from(format!(
210+
"{table_dir}/metadata/version-hint.text"
211+
)))
212+
.await
213+
.unwrap()
214+
.bytes()
215+
.await
216+
.unwrap();
217+
let latest_version_str = std::str::from_utf8(&latest_version).unwrap();
218+
219+
let conn = Connection::open_in_memory().unwrap();
220+
conn.execute("install iceberg", []).unwrap();
221+
conn.execute("load iceberg", []).unwrap();
222+
223+
let duckdb_batches: Vec<RecordBatch> = conn
224+
.prepare("select * from iceberg_scan(?) order by id")
225+
.unwrap()
226+
.query_arrow([latest_version_str])
227+
.unwrap()
228+
.collect();
229+
assert_batches_eq!(expected, &duckdb_batches);
197230
}

iceberg-rust-spec/src/spec/manifest_list.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ pub struct FieldSummary {
9494
pub upper_bound: Option<Value>,
9595
}
9696

97-
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
97+
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
9898
#[repr(u8)]
9999
/// Type of files tracked by a manifest: data files or delete files.
100100
pub enum Content {

iceberg-rust/src/table/manifest.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
181181
snapshot_id: i64,
182182
schema: &'schema AvroSchema,
183183
table_metadata: &'metadata TableMetadata,
184+
content: manifest_list::Content,
184185
branch: Option<&str>,
185186
) -> Result<Self, Error> {
186187
let mut writer = AvroWriter::new(schema, Vec::new());
@@ -228,14 +229,20 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
228229
serde_json::to_string(&spec_id)?,
229230
)?;
230231

231-
writer.add_user_metadata("content".to_string(), "data")?;
232+
writer.add_user_metadata(
233+
"content".to_string(),
234+
match content {
235+
manifest_list::Content::Data => "data",
236+
manifest_list::Content::Deletes => "deletes",
237+
},
238+
)?;
232239

233240
let manifest = ManifestListEntry {
234241
format_version: table_metadata.format_version,
235242
manifest_path: manifest_location.to_owned(),
236243
manifest_length: 0,
237244
partition_spec_id: table_metadata.default_spec_id,
238-
content: manifest_list::Content::Data,
245+
content,
239246
sequence_number: table_metadata.last_sequence_number + 1,
240247
min_sequence_number: table_metadata.last_sequence_number + 1,
241248
added_snapshot_id: snapshot_id,
@@ -330,7 +337,13 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
330337
serde_json::to_string(&spec_id)?,
331338
)?;
332339

333-
writer.add_user_metadata("content".to_string(), "data")?;
340+
writer.add_user_metadata(
341+
"content".to_string(),
342+
match manifest.content {
343+
manifest_list::Content::Data => "data",
344+
manifest_list::Content::Deletes => "deletes",
345+
},
346+
)?;
334347

335348
writer.extend(
336349
manifest_reader
@@ -454,7 +467,13 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
454467
serde_json::to_string(&spec_id)?,
455468
)?;
456469

457-
writer.add_user_metadata("content".to_string(), "data")?;
470+
writer.add_user_metadata(
471+
"content".to_string(),
472+
match manifest.content {
473+
manifest_list::Content::Data => "data",
474+
manifest_list::Content::Deletes => "deletes",
475+
},
476+
)?;
458477

459478
writer.extend(manifest_reader.filter_map(|entry| {
460479
let mut entry = entry

0 commit comments

Comments
 (0)