Skip to content

Commit 3a328ed

Browse files
committed
Add a e2e test for writing an Iceberg table with pyiceberg and reading it with DataFusion
1 parent f9de01b commit 3a328ed

File tree

5 files changed

+161
-0
lines changed

5 files changed

+161
-0
lines changed

Diff for: crates/integration_tests/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,7 @@ futures = { workspace = true }
3131
iceberg = { workspace = true }
3232
iceberg-catalog-rest = { workspace = true }
3333
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
34+
iceberg-datafusion = { workspace = true }
35+
datafusion = "43"
3436
parquet = { workspace = true }
3537
tokio = { workspace = true }

Diff for: crates/integration_tests/testdata/docker-compose.yaml

+16
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,19 @@ services:
8383
links:
8484
- rest:rest
8585
- minio:minio
86+
87+
pyiceberg:
88+
build: pyiceberg/
89+
networks:
90+
rest_bridge:
91+
depends_on:
92+
- rest
93+
- minio
94+
environment:
95+
- AWS_ACCESS_KEY_ID=admin
96+
- AWS_SECRET_ACCESS_KEY=password
97+
- AWS_REGION=us-east-1
98+
- PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE=true
99+
links:
100+
- rest:rest
101+
- minio:minio
+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
FROM python:3.9-bullseye
17+
18+
RUN pip install pyiceberg==0.8 pyarrow==18.0 datafusion==43.1.0
19+
20+
COPY load_types_table.py .
21+
22+
ENTRYPOINT python3 load_types_table.py
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from datafusion import SessionContext
2+
from pyiceberg.catalog import load_catalog
3+
import pyarrow.parquet as pq
4+
5+
# Generate a table with various types in memory and dump to a Parquet file
6+
ctx = SessionContext()
7+
ctx.sql("""
8+
CREATE TABLE types_test (
9+
cboolean BOOLEAN,
10+
cint8 TINYINT,
11+
cint16 SMALLINT,
12+
cint32 INT,
13+
cint64 BIGINT,
14+
cfloat32 REAL,
15+
cfloat64 DOUBLE PRECISION,
16+
cdecimal DECIMAL(8, 2),
17+
cdate32 DATE,
18+
ctimestamp TIMESTAMP,
19+
ctimestamptz TIMESTAMPTZ,
20+
cutf8 TEXT,
21+
cbinary BYTEA
22+
) AS SELECT
23+
s % 2 = 1 as cboolean,
24+
(s % 256 - 128) as cint8,
25+
s as cint16,
26+
s as cint32,
27+
s as cint64,
28+
s as cfloat32,
29+
s as cfloat64,
30+
s::NUMERIC / 100 as cnumeric,
31+
s as cdate,
32+
s * 1000 as ctimestamp,
33+
s * 1000 as ctimestampz,
34+
s::TEXT as cutf8,
35+
s::TEXT cbinary
36+
FROM unnest(generate_series(0, 1000)) AS q(s);
37+
""")
38+
a = ctx.sql("COPY types_test TO 'types_test.parquet'")
39+
# File loading fails in the container without this line???
40+
print(f"Created a Parquet file with {a} rows")
41+
42+
# Load the Parquet file
43+
parquet_file = pq.read_table("./types_test.parquet")
44+
45+
# Connect to the REST catalog
46+
catalog = load_catalog(
47+
"rest",
48+
**{
49+
"type": "rest",
50+
"uri": "http://rest:8181",
51+
"s3.endpoint": "http://minio:9000",
52+
"s3.access-key-id": "admin",
53+
"s3.secret-access-key": "password",
54+
},
55+
)
56+
57+
# Create a corresponding Iceberg table and append the file to it
58+
iceberg_table = catalog.create_table_if_not_exists(
59+
identifier=f"default.types_test",
60+
schema=parquet_file.schema,
61+
)
62+
iceberg_table.append(df=parquet_file)

Diff for: crates/integration_tests/tests/datafusion.rs

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use std::sync::Arc;
2+
3+
pub use datafusion::assert_batches_eq;
4+
pub use datafusion::error::DataFusionError;
5+
pub use datafusion::prelude::SessionContext;
6+
use iceberg::{Catalog, TableIdent};
7+
use iceberg_datafusion::IcebergTableProvider;
8+
use iceberg_integration_tests::set_test_fixture;
9+
10+
#[tokio::test]
11+
async fn test_basic_queries() -> Result<(), DataFusionError> {
12+
let fixture = set_test_fixture("datafusion_basic_read").await;
13+
14+
let catalog = fixture.rest_catalog;
15+
16+
let table = catalog
17+
.load_table(&TableIdent::from_strs(["default", "types_test"]).unwrap())
18+
.await
19+
.unwrap();
20+
21+
let ctx = SessionContext::new();
22+
23+
let table_provider = Arc::new(
24+
IcebergTableProvider::try_new_from_table(table)
25+
.await
26+
.unwrap(),
27+
);
28+
29+
ctx.register_table("types_table", table_provider)?;
30+
31+
let batches = ctx
32+
.sql("SELECT * FROM types_table LIMIT 3")
33+
.await?
34+
.collect()
35+
.await?;
36+
let expected = [
37+
"+----------+-------+--------+--------+--------+----------+----------+----------+------------+----------------------------+-----------------------------+-------+---------+",
38+
"| cboolean | cint8 | cint16 | cint32 | cint64 | cfloat32 | cfloat64 | cdecimal | cdate32 | ctimestamp | ctimestamptz | cutf8 | cbinary |",
39+
"+----------+-------+--------+--------+--------+----------+----------+----------+------------+----------------------------+-----------------------------+-------+---------+",
40+
"| false | -128 | 0 | 0 | 0 | 0.0 | 0.0 | 0.00 | 1970-01-01 | 1970-01-01T00:00:00 | 1970-01-01T00:00:00Z | 0 | 30 |",
41+
"| true | -127 | 1 | 1 | 1 | 1.0 | 1.0 | 0.01 | 1970-01-02 | 1970-01-01T00:00:00.000001 | 1970-01-01T00:00:00.000001Z | 1 | 31 |",
42+
"| false | -126 | 2 | 2 | 2 | 2.0 | 2.0 | 0.02 | 1970-01-03 | 1970-01-01T00:00:00.000002 | 1970-01-01T00:00:00.000002Z | 2 | 32 |",
43+
"+----------+-------+--------+--------+--------+----------+----------+----------+------------+----------------------------+-----------------------------+-------+---------+",
44+
];
45+
assert_batches_eq!(expected, &batches);
46+
47+
// TODO: this isn't OK, and should be fixed with https://github.com/apache/iceberg-rust/issues/813
48+
let err = ctx
49+
.sql("SELECT cdecimal FROM types_table WHERE cint16 <= 2")
50+
.await?
51+
.collect()
52+
.await
53+
.unwrap_err();
54+
assert!(err
55+
.to_string()
56+
.contains("Invalid comparison operation: Int16 <= Int32"));
57+
58+
Ok(())
59+
}

0 commit comments

Comments
 (0)