@@ -47,7 +47,7 @@ async fn test_pyiceberg_integration() {
4747 let sqlite_uri = format ! ( "sqlite:///{}?mode=rwc" , catalog_db_path. display( ) ) ;
4848
4949 // Setup object store with local filesystem
50- let object_store = ObjectStoreBuilder :: filesystem ( & warehouse_path ) ;
50+ let object_store = ObjectStoreBuilder :: filesystem ( "/" ) ;
5151
5252 // Create SQL catalog with SQLite file database
5353 let sql_catalog_list = Arc :: new (
@@ -70,8 +70,12 @@ async fn test_pyiceberg_integration() {
7070
7171 let ctx = SessionContext :: new_with_state ( state) ;
7272
73- // Create external CSV table
74- let sql = "CREATE EXTERNAL TABLE lineitem (
73+ // Create external CSV table with absolute path
74+ let csv_path = std:: env:: current_dir ( )
75+ . unwrap ( )
76+ . join ( "testdata/tpch/lineitem.csv" ) ;
77+ let sql = format ! (
78+ "CREATE EXTERNAL TABLE lineitem (
7579 L_ORDERKEY BIGINT NOT NULL,
7680 L_PARTKEY BIGINT NOT NULL,
7781 L_SUPPKEY BIGINT NOT NULL,
@@ -89,10 +93,12 @@ async fn test_pyiceberg_integration() {
8993 L_SHIPMODE VARCHAR NOT NULL,
9094 L_COMMENT VARCHAR NOT NULL )
9195 STORED AS CSV
92- LOCATION 'datafusion_iceberg/testdata/tpch/lineitem.csv'
93- OPTIONS ('has_header' 'false');" ;
96+ LOCATION '{}'
97+ OPTIONS ('has_header' 'false');" ,
98+ csv_path. display( )
99+ ) ;
94100
95- let plan = ctx. state ( ) . create_logical_plan ( sql) . await . unwrap ( ) ;
101+ let plan = ctx. state ( ) . create_logical_plan ( & sql) . await . unwrap ( ) ;
96102 let transformed = plan. transform ( iceberg_transform) . data ( ) . unwrap ( ) ;
97103 ctx. execute_logical_plan ( transformed)
98104 . await
@@ -112,8 +118,10 @@ async fn test_pyiceberg_integration() {
112118 . await
113119 . expect ( "Failed to create schema" ) ;
114120
115- // Create Iceberg table
116- let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem (
121+ // Create Iceberg table with location relative to warehouse path
122+ let table_location = format ! ( "{}/tpch/lineitem" , warehouse_path. display( ) ) ;
123+ let sql = format ! (
124+ "CREATE EXTERNAL TABLE warehouse.tpch.lineitem (
117125 L_ORDERKEY BIGINT NOT NULL,
118126 L_PARTKEY BIGINT NOT NULL,
119127 L_SUPPKEY BIGINT NOT NULL,
@@ -131,10 +139,12 @@ async fn test_pyiceberg_integration() {
131139 L_SHIPMODE VARCHAR NOT NULL,
132140 L_COMMENT VARCHAR NOT NULL )
133141 STORED AS ICEBERG
134- LOCATION '/warehouse/tpch/lineitem'
135- PARTITIONED BY ( \" month(L_SHIPDATE)\" );" ;
142+ LOCATION '{}'
143+ PARTITIONED BY ( \" month(L_SHIPDATE)\" );" ,
144+ table_location
145+ ) ;
136146
137- let plan = ctx. state ( ) . create_logical_plan ( sql) . await . unwrap ( ) ;
147+ let plan = ctx. state ( ) . create_logical_plan ( & sql) . await . unwrap ( ) ;
138148 let transformed = plan. transform ( iceberg_transform) . data ( ) . unwrap ( ) ;
139149 ctx. execute_logical_plan ( transformed)
140150 . await
@@ -185,7 +195,7 @@ async fn test_pyiceberg_integration() {
185195 assert_eq ! ( amount. unwrap( ) , 24.0 ) ;
186196 found_24027 = true ;
187197 } else if product_id. unwrap ( ) == 63700 {
188- assert_eq ! ( amount. unwrap( ) , 8 .0) ;
198+ assert_eq ! ( amount. unwrap( ) , 23 .0) ;
189199 found_63700 = true ;
190200 }
191201 }
@@ -227,7 +237,7 @@ async fn test_pyiceberg_integration() {
227237
228238 // Load catalog
229239 let catalog = load_catalog
230- . call ( ( "default " , ) , Some ( & config) )
240+ . call ( ( "warehouse " , ) , Some ( & config) )
231241 . expect ( "Failed to load catalog" ) ;
232242
233243 // Load table
0 commit comments