-
Notifications
You must be signed in to change notification settings - Fork 236
Basic Integration with Datafusion #324
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
333b607
4bfc87a
e4ba25d
881cf37
47a041f
66a1667
709ab3e
9510b7c
2b92021
5141d11
475a9a3
0f706ca
e05fc45
c868439
26b257e
60ff7f2
7ef31fd
2f152fe
8135bc7
6cd85cc
d646785
b466936
3c9bafc
d24a0d3
948fc56
5b9d9c7
0d55fbc
294e575
13cc2d8
c95b1dd
32f33cb
30830ec
391f983
d94d615
996f249
199382d
177e5c8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
[package] | ||
name = "iceberg-datafusion" | ||
version = { workspace = true } | ||
edition = { workspace = true } | ||
homepage = { workspace = true } | ||
rust-version = { workspace = true } | ||
|
||
categories = ["database"] | ||
description = "Apache Iceberg Datafusion Integration" | ||
repository = { workspace = true } | ||
license = { workspace = true } | ||
keywords = ["iceberg", "integrations", "datafusion"] | ||
|
||
[dependencies] | ||
anyhow = { workspace = true } | ||
async-trait = { workspace = true } | ||
datafusion = { version = "37.0.0" } | ||
futures = { workspace = true } | ||
iceberg = { workspace = true } | ||
log = { workspace = true } | ||
tokio = { workspace = true } | ||
|
||
[dev-dependencies] | ||
iceberg-catalog-hms = { workspace = true } | ||
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } | ||
port_scanner = { workspace = true } |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
<!-- | ||
~ Licensed to the Apache Software Foundation (ASF) under one | ||
~ or more contributor license agreements. See the NOTICE file | ||
~ distributed with this work for additional information | ||
~ regarding copyright ownership. The ASF licenses this file | ||
~ to you under the Apache License, Version 2.0 (the | ||
~ "License"); you may not use this file except in compliance | ||
~ with the License. You may obtain a copy of the License at | ||
~ | ||
~ http://www.apache.org/licenses/LICENSE-2.0 | ||
~ | ||
~ Unless required by applicable law or agreed to in writing, | ||
~ software distributed under the License is distributed on an | ||
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
~ KIND, either express or implied. See the License for the | ||
~ specific language governing permissions and limitations | ||
~ under the License. | ||
--> | ||
|
||
# Apache Iceberg DataFusion Integration | ||
|
||
This crate contains the integration of Apache DataFusion and Apache Iceberg. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use std::{any::Any, collections::HashMap, sync::Arc}; | ||
|
||
use datafusion::catalog::{schema::SchemaProvider, CatalogProvider}; | ||
use futures::future::try_join_all; | ||
use iceberg::{Catalog, NamespaceIdent, Result}; | ||
|
||
use crate::schema::IcebergSchemaProvider; | ||
|
||
/// Provides an interface to manage and access multiple schemas | ||
/// within an Iceberg [`Catalog`]. | ||
/// | ||
/// Acts as a centralized catalog provider that aggregates | ||
/// multiple [`SchemaProvider`], each associated with distinct namespaces. | ||
/// | ||
/// The schema map is filled once by `try_new` and is only read afterwards, | ||
/// so it is a snapshot: namespaces created in the catalog later are not | ||
/// visible through an existing provider. | ||
pub struct IcebergCatalogProvider { | ||
/// A `HashMap` where keys are namespace names | ||
/// and values are dynamic references to objects implementing the | ||
/// [`SchemaProvider`] trait. | ||
schemas: HashMap<String, Arc<dyn SchemaProvider>>, | ||
} | ||
|
||
impl IcebergCatalogProvider { | ||
/// Asynchronously tries to construct a new [`IcebergCatalogProvider`] | ||
/// using the given client to fetch and initialize schema providers for | ||
/// each namespace in the Iceberg [`Catalog`]. | ||
/// | ||
/// This method retrieves the list of namespace names, | ||
/// attempts to create a schema provider for each namespace, and | ||
/// collects these providers into a `HashMap`. | ||
pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> { | ||
// TODO: | ||
// Schemas and providers should be cached and evicted based on time. | ||
// As of right now, schemas might become stale. | ||
let schema_names: Vec<_> = client | ||
.list_namespaces(None) | ||
.await? | ||
.iter() | ||
.flat_map(|ns| ns.as_ref().clone()) | ||
marvinlanhenke marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.collect(); | ||
|
||
let providers = try_join_all( | ||
schema_names | ||
.iter() | ||
.map(|name| { | ||
IcebergSchemaProvider::try_new( | ||
client.clone(), | ||
NamespaceIdent::new(name.clone()), | ||
) | ||
}) | ||
.collect::<Vec<_>>(), | ||
) | ||
.await?; | ||
|
||
let schemas: HashMap<String, Arc<dyn SchemaProvider>> = schema_names | ||
.into_iter() | ||
.zip(providers.into_iter()) | ||
.map(|(name, provider)| { | ||
let provider = Arc::new(provider) as Arc<dyn SchemaProvider>; | ||
(name, provider) | ||
}) | ||
.collect(); | ||
|
||
Ok(IcebergCatalogProvider { schemas }) | ||
} | ||
} | ||
|
||
impl CatalogProvider for IcebergCatalogProvider { | ||
/// Returns this provider as a `&dyn Any` reference. | ||
fn as_any(&self) -> &dyn Any { | ||
self | ||
} | ||
|
||
fn schema_names(&self) -> Vec<String> { | ||
self.schemas.keys().cloned().collect() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm thinking that this may be incorrect since other processes may create new namespaces after creating the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, you're right this is a simple but naive impl. based on the docs and the ref from delta-rs. I think having a caching wrapper would be a better solution. However, since we have to provide a vec of all schema_names we cannot check if a single schema is in the cache and if not fetch again. So I'm guessing we can only implement it with a time based eviction policy i.e. cache all schemas for 1min in order to avoid multiple network calls in a short amount of time (same concept as debouncing)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, when adding a cache, we need a design for the trade-off between performance and consistency. But I think we could leave it to the actual cache implementation? For this pr, we should not cache them in the providers, e.g. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure, we can do it like that for this PR and impl. the more sophisticated solution later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @liurenjie1024 So perhaps we leave it as is for this PR and work on the cache instead? However, I guess we're facing the same problem here as well? We could use There was a problem hiding this comment. 
Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It is possible, but we should avoid using
I second this suggestion. We can move on and figure out how to improve this part in the future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should be hesitant with caching, especially avoiding upfront optimizations. For Iceberg in general, consistency is king.
This makes sense, but I see the issue with blocking on async calls. At first, I would take the price of waiting for the blocking calls. Even though it is still a remote call, the ones to the REST catalog should be lightning-fast (that's where the caching happens). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think blocking + timeout would be a reasonable solution before we implement sophisticated caching. For this pr, we can leave it as is for now since it's not even caching, it's a snapshot. WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
+1 for "moving on" and create or track those issues in #357 before this PR gets too big. |
||
} | ||
|
||
/// Looks up `name` in the stored schema map and returns a clone of the | ||
/// matching provider handle, or `None` when the map has no such entry. | ||
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> { | ||
self.schemas.get(name).cloned() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use anyhow::anyhow; | ||
use iceberg::{Error, ErrorKind}; | ||
|
||
/// Converts a datafusion error into an iceberg error. | ||
/// | ||
/// The returned error always has kind [`ErrorKind::Unexpected`] and a fixed | ||
/// message. The datafusion error is attached as the source after being | ||
/// rendered through its `Debug` output into an `anyhow` error, so only its | ||
/// textual form is carried over. | ||
pub fn from_datafusion_error(error: datafusion::error::DataFusionError) -> Error { | ||
Error::new( | ||
ErrorKind::Unexpected, | ||
"Operation failed for hitting datafusion error".to_string(), | ||
) | ||
.with_source(anyhow!("datafusion error: {:?}", error)) | ||
} | ||
/// Converts an iceberg error into a datafusion error. | ||
/// | ||
/// The iceberg error is converted with `into()` and wrapped in the | ||
/// `DataFusionError::External` variant. | ||
pub fn to_datafusion_error(error: Error) -> datafusion::error::DataFusionError { | ||
datafusion::error::DataFusionError::External(error.into()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
mod catalog; | ||
pub use catalog::*; | ||
|
||
mod error; | ||
pub use error::*; | ||
|
||
mod physical_plan; | ||
mod schema; | ||
mod table; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
pub(crate) mod scan; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there no way to leave this up to the user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😄the classic.
I think leaving it up to the user, leads us to the issue about blocking an async call in a sync trait function? I think if we have an idea how to handle this, we can better reason about if, when, and where to cache?
from the docs: