From 064c927aaffe92bc9e97e59da4bc61a6b414588e Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Sun, 27 Jul 2025 23:03:49 -0700 Subject: [PATCH 1/2] implement register table for rest catalog --- crates/catalog/glue/src/catalog.rs | 1 + crates/catalog/hms/src/catalog.rs | 1 + crates/catalog/rest/src/catalog.rs | 151 +++++++++++++++++- crates/catalog/rest/src/types.rs | 8 + .../catalog/rest/tests/rest_catalog_test.rs | 40 +++++ crates/catalog/s3tables/src/catalog.rs | 1 + crates/catalog/sql/src/catalog.rs | 1 + crates/iceberg/src/catalog/memory/catalog.rs | 6 +- crates/iceberg/src/catalog/mod.rs | 7 +- 9 files changed, 206 insertions(+), 10 deletions(-) diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index f4b4a01f9a..7403600828 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -627,6 +627,7 @@ impl Catalog for GlueCatalog { &self, _table_ident: &TableIdent, _metadata_location: String, + _overwrite: Option, ) -> Result { Err(Error::new( ErrorKind::FeatureUnsupported, diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 72fb8c6b33..0342f785d7 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -504,6 +504,7 @@ impl Catalog for HmsCatalog { &self, _table_ident: &TableIdent, _metadata_location: String, + _overwrite: Option, ) -> Result
{ Err(Error::new( ErrorKind::FeatureUnsupported, diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 5c9e6e15ac..97e25bfc05 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -42,7 +42,7 @@ use crate::client::{ use crate::types::{ CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest, ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde, - RenameTableRequest, + RegisterTableRequest, RenameTableRequest, }; const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; @@ -101,6 +101,10 @@ impl RestCatalogConfig { self.url_prefixed(&["tables", "rename"]) } + fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String { + self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"]) + } + fn table_endpoint(&self, table: &TableIdent) -> String { self.url_prefixed(&[ "namespaces", @@ -238,7 +242,7 @@ struct RestContext { pub struct RestCatalog { /// User config is stored as-is and never be changed. /// - /// It's could be different from the config fetched from the server and used at runtime. + /// It could be different from the config fetched from the server and used at runtime. user_config: RestCatalogConfig, ctx: OnceCell, /// Extensions for the FileIOBuilder. @@ -755,13 +759,61 @@ impl Catalog for RestCatalog { async fn register_table( &self, - _table_ident: &TableIdent, - _metadata_location: String, + table_ident: &TableIdent, + metadata_location: String, + overwrite: Option, ) -> Result
{ - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Registering a table is not supported yet", - )) + let context = self.context().await?; + + let request = context + .client + .request( + Method::POST, + context + .config + .register_table_endpoint(table_ident.namespace()), + ) + .json(&RegisterTableRequest { + name: table_ident.name.clone(), + metadata_location: metadata_location.clone(), + overwrite, + }) + .build()?; + + let http_response = context.client.query_catalog(request).await?; + + let response: LoadTableResponse = match http_response.status() { + StatusCode::OK => { + deserialize_catalog_response::(http_response).await? + } + StatusCode::NOT_FOUND => { + return Err(Error::new( + ErrorKind::NamespaceNotFound, + "The namespace specified does not exist.", + )); + } + StatusCode::CONFLICT => { + return Err(Error::new( + ErrorKind::TableAlreadyExists, + "The given table already exists.", + )); + } + _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + }; + + let metadata_location = response.metadata_location.as_ref().ok_or(Error::new( + ErrorKind::DataInvalid, + "Metadata location missing in `register_table` response!", + ))?; + + let file_io = self.load_file_io(Some(metadata_location), None).await?; + + Table::builder() + .identifier(table_ident.clone()) + .file_io(file_io) + .metadata(response.metadata) + .metadata_location(metadata_location.clone()) + .build() } async fn update_table(&self, mut commit: TableCommit) -> Result
{ @@ -2470,4 +2522,87 @@ mod tests { update_table_mock.assert_async().await; load_table_mock.assert_async().await; } + + #[tokio::test] + async fn test_register_table() { + let mut server = Server::new_async().await; + + let config_mock = create_config_mock(&mut server).await; + + let register_table_mock = server + .mock("POST", "/v1/namespaces/ns1/register") + .with_status(200) + .with_body_from_file(format!( + "{}/testdata/{}", + env!("CARGO_MANIFEST_DIR"), + "load_table_response.json" + )) + .create_async() + .await; + + let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()); + let table_ident = + TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string()); + let metadata_location = String::from( + "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", + ); + + let table = catalog + .register_table(&table_ident, metadata_location, Some(false)) + .await + .unwrap(); + + assert_eq!( + &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(), + table.identifier() + ); + assert_eq!( + "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", + table.metadata_location().unwrap() + ); + + config_mock.assert_async().await; + register_table_mock.assert_async().await; + } + + #[tokio::test] + async fn test_register_table_404() { + let mut server = Server::new_async().await; + + let config_mock = create_config_mock(&mut server).await; + + let register_table_mock = server + .mock("POST", "/v1/namespaces/ns1/register") + .with_status(404) + .with_body( + r#" +{ + "error": { + "message": "The namespace specified does not exist", + "type": "NoSuchNamespaceErrorException", + "code": 404 + } +} + "#, + ) + .create_async() + .await; + + let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()); + + let table_ident = + TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string()); + let metadata_location = String::from( + "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", + ); + let table = catalog + .register_table(&table_ident, metadata_location, None) + .await; + + assert!(table.is_err()); + assert!(table.err().unwrap().message().contains("does not exist")); + + config_mock.assert_async().await; + register_table_mock.assert_async().await; + } } diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index 63be9b768e..70ed72051a 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -191,3 +191,11 @@ pub(super) struct CommitTableResponse { pub(super) metadata_location: String, pub(super) metadata: TableMetadata, } + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub(super) struct RegisterTableRequest { + pub(super) name: String, + pub(super) metadata_location: String, + pub(super) overwrite: Option, +} diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index 94d2e3927c..35ba7fef4d 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -407,3 +407,43 @@ async fn test_list_empty_multi_level_namespace() { .unwrap(); assert!(nss.is_empty()); } + +#[tokio::test] +async fn test_register_table() { + let catalog = get_catalog().await; + + // Create namespace + let ns = NamespaceIdent::from_strs(["ns"]).unwrap(); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + // Create the table, store the metadata location, drop the table + let empty_schema = Schema::builder().build().unwrap(); + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(empty_schema) + .build(); + + let table = catalog.create_table(&ns, table_creation).await.unwrap(); + + let metadata_location = table.metadata_location().unwrap(); + catalog.drop_table(table.identifier()).await.unwrap(); + + let new_table_identifier = TableIdent::from_strs(["ns", "t2"]).unwrap(); + let table_registered = catalog + .register_table( + &new_table_identifier, + metadata_location.to_string(), + Some(false), + ) + .await + .unwrap(); + + assert_eq!( + table.metadata_location(), + table_registered.metadata_location() + ); + assert_ne!( + table.identifier().to_string(), + table_registered.identifier().to_string() + ); +} diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 191356d711..aece34f0ac 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -472,6 +472,7 @@ impl Catalog for S3TablesCatalog { &self, _table_ident: &TableIdent, _metadata_location: String, + _overwrite: Option, ) -> Result
{ Err(Error::new( ErrorKind::FeatureUnsupported, diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 56c6fadcf1..93a6dee3ba 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -771,6 +771,7 @@ impl Catalog for SqlCatalog { &self, _table_ident: &TableIdent, _metadata_location: String, + _overwrite: Option, ) -> Result
{ Err(Error::new( ErrorKind::FeatureUnsupported, diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 12d18b9f36..35f2eab814 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -279,7 +279,11 @@ impl Catalog for MemoryCatalog { &self, table_ident: &TableIdent, metadata_location: String, + overwrite: Option, ) -> Result
{ + // TODO: Use overwrite in `insert_new_table` to overwrite metadata for an already existing table + let _overwrite = overwrite.unwrap_or(false); + let mut root_namespace_state = self.root_namespace_state.lock().await; root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?; @@ -1767,7 +1771,7 @@ mod tests { let register_table_ident = TableIdent::new(namespace_ident.clone(), "register_table".into()); let registered_table = catalog - .register_table(®ister_table_ident, metadata_location.clone()) + .register_table(®ister_table_ident, metadata_location.clone(), None) .await .unwrap(); diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index a468edc475..5c15f31be5 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -104,7 +104,12 @@ pub trait Catalog: Debug + Sync + Send { async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>; /// Register an existing table to the catalog. - async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result
; + async fn register_table( + &self, + table: &TableIdent, + metadata_location: String, + overwrite: Option, + ) -> Result
; /// Update a table to the catalog. async fn update_table(&self, commit: TableCommit) -> Result
; From f01c9873a3f8e3457e8a31f4eb564e530b2aa59f Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Wed, 30 Jul 2025 10:13:27 -0700 Subject: [PATCH 2/2] Revert catalog trait change --- crates/catalog/glue/src/catalog.rs | 1 - crates/catalog/hms/src/catalog.rs | 1 - crates/catalog/rest/src/catalog.rs | 7 +++---- crates/catalog/rest/tests/rest_catalog_test.rs | 6 +----- crates/catalog/s3tables/src/catalog.rs | 1 - crates/catalog/sql/src/catalog.rs | 1 - crates/iceberg/src/catalog/memory/catalog.rs | 6 +----- crates/iceberg/src/catalog/mod.rs | 7 +------ 8 files changed, 6 insertions(+), 24 deletions(-) diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 7403600828..f4b4a01f9a 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -627,7 +627,6 @@ impl Catalog for GlueCatalog { &self, _table_ident: &TableIdent, _metadata_location: String, - _overwrite: Option, ) -> Result
{ Err(Error::new( ErrorKind::FeatureUnsupported, diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 0342f785d7..72fb8c6b33 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -504,7 +504,6 @@ impl Catalog for HmsCatalog { &self, _table_ident: &TableIdent, _metadata_location: String, - _overwrite: Option, ) -> Result
{ Err(Error::new( ErrorKind::FeatureUnsupported, diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 97e25bfc05..7d81982f55 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -761,7 +761,6 @@ impl Catalog for RestCatalog { &self, table_ident: &TableIdent, metadata_location: String, - overwrite: Option, ) -> Result
{ let context = self.context().await?; @@ -776,7 +775,7 @@ impl Catalog for RestCatalog { .json(&RegisterTableRequest { name: table_ident.name.clone(), metadata_location: metadata_location.clone(), - overwrite, + overwrite: Some(false), }) .build()?; @@ -2548,7 +2547,7 @@ mod tests { ); let table = catalog - .register_table(&table_ident, metadata_location, Some(false)) + .register_table(&table_ident, metadata_location) .await .unwrap(); @@ -2596,7 +2595,7 @@ mod tests { "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", ); let table = catalog - .register_table(&table_ident, metadata_location, None) + .register_table(&table_ident, metadata_location) .await; assert!(table.is_err()); diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index 35ba7fef4d..393b243537 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -430,11 +430,7 @@ async fn test_register_table() { let new_table_identifier = TableIdent::from_strs(["ns", "t2"]).unwrap(); let table_registered = catalog - .register_table( - &new_table_identifier, - metadata_location.to_string(), - Some(false), - ) + .register_table(&new_table_identifier, metadata_location.to_string()) .await .unwrap(); diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index aece34f0ac..191356d711 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -472,7 +472,6 @@ impl Catalog for S3TablesCatalog { &self, _table_ident: &TableIdent, _metadata_location: String, - _overwrite: Option, ) -> Result
{ Err(Error::new( ErrorKind::FeatureUnsupported, diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 93a6dee3ba..56c6fadcf1 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -771,7 +771,6 @@ impl Catalog for SqlCatalog { &self, _table_ident: &TableIdent, _metadata_location: String, - _overwrite: Option, ) -> Result
{ Err(Error::new( ErrorKind::FeatureUnsupported, diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 35f2eab814..12d18b9f36 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -279,11 +279,7 @@ impl Catalog for MemoryCatalog { &self, table_ident: &TableIdent, metadata_location: String, - overwrite: Option, ) -> Result
{ - // TODO: Use overwrite in `insert_new_table` to overwrite metadata for an already existing table - let _overwrite = overwrite.unwrap_or(false); - let mut root_namespace_state = self.root_namespace_state.lock().await; root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?; @@ -1771,7 +1767,7 @@ mod tests { let register_table_ident = TableIdent::new(namespace_ident.clone(), "register_table".into()); let registered_table = catalog - .register_table(®ister_table_ident, metadata_location.clone(), None) + .register_table(®ister_table_ident, metadata_location.clone()) .await .unwrap(); diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 5c15f31be5..a468edc475 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -104,12 +104,7 @@ pub trait Catalog: Debug + Sync + Send { async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>; /// Register an existing table to the catalog. - async fn register_table( - &self, - table: &TableIdent, - metadata_location: String, - overwrite: Option, - ) -> Result
; + async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result
; /// Update a table to the catalog. async fn update_table(&self, commit: TableCommit) -> Result
;