diff --git a/docs/changelog/next_release/413.feature.rst b/docs/changelog/next_release/413.feature.rst new file mode 100644 index 000000000..9059a9751 --- /dev/null +++ b/docs/changelog/next_release/413.feature.rst @@ -0,0 +1 @@ +Implement ``Iceberg.DelegatedWarehouse`` allowing to use vended S3 credentials or remote-signing. diff --git a/docs/connection/db_connection/iceberg/index.rst b/docs/connection/db_connection/iceberg/index.rst index 191b88ab9..e1c2c4e02 100644 --- a/docs/connection/db_connection/iceberg/index.rst +++ b/docs/connection/db_connection/iceberg/index.rst @@ -16,6 +16,7 @@ Iceberg warehouse_filesystem warehouse_s3 + warehouse_delegated .. toctree:: :maxdepth: 1 diff --git a/docs/connection/db_connection/iceberg/prerequisites.rst b/docs/connection/db_connection/iceberg/prerequisites.rst index 49bb52152..f9ece793f 100644 --- a/docs/connection/db_connection/iceberg/prerequisites.rst +++ b/docs/connection/db_connection/iceberg/prerequisites.rst @@ -34,9 +34,9 @@ Popular Metastore Implementations Iceberg supports multiple catalog implementations. Here are some popular options: -* **Apache Iceberg Hadoop Catalog** — `File-based catalog using Hadoop filesystem `_ -* **Apache Iceberg REST Catalog** — `Iceberg REST catalog `_ -* **Polaris** — `Сatalog service enabling multi-engine table access `_ -* **Lakekeeper** — `Rust native сatalog service `_ -* **Gravitino** — `Apache Gravitino unified catalog `_ -* **Unity Catalog** — `Databricks Unity Catalog `_ +* `Apache Iceberg Hadoop Catalog `_ +* `Lakekeeper `_ +* `Polaris `_ +* `Nessie `_ +* `Apache Gravitino `_ +* `Databricks Unity Catalog `_ diff --git a/docs/connection/db_connection/iceberg/warehouse_delegated.rst b/docs/connection/db_connection/iceberg/warehouse_delegated.rst new file mode 100644 index 000000000..13c0158f0 --- /dev/null +++ b/docs/connection/db_connection/iceberg/warehouse_delegated.rst @@ -0,0 +1,10 @@ +.. _iceberg-warehouse-delegated: + +Delegated Warehouse +=================== + +.. 
currentmodule:: onetl.connection.db_connection.iceberg.warehouse.delegated + +.. autoclass:: IcebergDelegatedWarehouse + :members: get_packages, get_config + :member-order: bysource diff --git a/onetl/connection/db_connection/iceberg/__init__.py b/onetl/connection/db_connection/iceberg/__init__.py index a0a123796..e513f8c33 100644 --- a/onetl/connection/db_connection/iceberg/__init__.py +++ b/onetl/connection/db_connection/iceberg/__init__.py @@ -6,3 +6,10 @@ from onetl.connection.db_connection.iceberg.options import ( IcebergWriteOptions, ) + +__all__ = [ + "Iceberg", + "IcebergDialect", + "IcebergExtra", + "IcebergWriteOptions", +] diff --git a/onetl/connection/db_connection/iceberg/catalog/auth/oauth2_client_credentials.py b/onetl/connection/db_connection/iceberg/catalog/auth/oauth2_client_credentials.py index 2b9f1ead1..1cd1da9e4 100644 --- a/onetl/connection/db_connection/iceberg/catalog/auth/oauth2_client_credentials.py +++ b/onetl/connection/db_connection/iceberg/catalog/auth/oauth2_client_credentials.py @@ -71,7 +71,7 @@ class IcebergRESTCatalogOAuth2ClientCredentials(IcebergRESTCatalogAuth, FrozenMo client_id="my_client_id", client_secret="my_client_secret", scopes=["catalog:read"], - oauth2_server_uri="https://oauth.example.com/token", + oauth2_server_uri="http://keycloak.domain.com/realms/my-realm/protocol/openid-connect/token", token_refresh_interval=timedelta(minutes=30), audience="iceberg-catalog", ) diff --git a/onetl/connection/db_connection/iceberg/catalog/auth/oauth2_token_exchange.py b/onetl/connection/db_connection/iceberg/catalog/auth/oauth2_token_exchange.py index f002f38d3..f4ca22fa5 100644 --- a/onetl/connection/db_connection/iceberg/catalog/auth/oauth2_token_exchange.py +++ b/onetl/connection/db_connection/iceberg/catalog/auth/oauth2_token_exchange.py @@ -78,7 +78,7 @@ class IcebergRESTCatalogOAuth2TokenExchange(IcebergRESTCatalogAuth, FrozenModel) client_id="my_client_id", client_secret="my_client_secret", scopes=["catalog:read"], - 
oauth2_server_uri="https://oauth.example.com/token", + oauth2_server_uri="http://keycloak.domain.com/realms/my-realm/protocol/openid-connect/token", token_refresh_interval=timedelta(minutes=30), audience="iceberg-catalog", ) diff --git a/onetl/connection/db_connection/iceberg/catalog/rest.py b/onetl/connection/db_connection/iceberg/catalog/rest.py index 9921baeab..d7a84f6d8 100644 --- a/onetl/connection/db_connection/iceberg/catalog/rest.py +++ b/onetl/connection/db_connection/iceberg/catalog/rest.py @@ -2,8 +2,9 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations -from typing import Dict, Optional +from typing import Any, Dict, Optional +from onetl._util.spark import stringify from onetl.connection.db_connection.iceberg.catalog.auth import ( IcebergRESTCatalogAuth, IcebergRESTCatalogBasicAuth, @@ -125,8 +126,8 @@ class IcebergRESTCatalog(IcebergCatalog, FrozenModel): OAuth2TokenExchange = IcebergRESTCatalogOAuth2TokenExchange uri: str - headers: Dict[str, str] = Field(default_factory=dict) - extra: Dict[str, str] = Field(default_factory=dict) + headers: Dict[str, Any] = Field(default_factory=dict) + extra: Dict[str, Any] = Field(default_factory=dict) auth: Optional[IcebergRESTCatalogAuth] = None @@ -134,10 +135,10 @@ def get_config(self) -> Dict[str, str]: config = { "type": "rest", "uri": self.uri, - **self.extra, + **stringify(self.extra), } for key, value in self.headers.items(): - config[f"header.{key}"] = value + config[f"header.{key}"] = stringify(value) if self.auth: config.update(self.auth.get_config()) diff --git a/onetl/connection/db_connection/iceberg/connection.py b/onetl/connection/db_connection/iceberg/connection.py index 07061664b..03adea2d4 100644 --- a/onetl/connection/db_connection/iceberg/connection.py +++ b/onetl/connection/db_connection/iceberg/connection.py @@ -3,7 +3,7 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING, Any, Dict, Iterable, Union +from typing import TYPE_CHECKING, 
Any, Dict, Iterable, Optional, Union from onetl._util.java import try_import_java_class from onetl._util.scala import get_default_scala_version @@ -24,6 +24,9 @@ IcebergS3Warehouse, IcebergWarehouse, ) +from onetl.connection.db_connection.iceberg.warehouse.delegated import ( + IcebergDelegatedWarehouse, +) from onetl.exception import MISSING_JVM_CLASS_MSG try: @@ -64,10 +67,7 @@ class Iceberg(DBConnection): Parameters ---------- catalog_name : str - Catalog name - - spark : :obj:`pyspark.sql.SparkSession` - Spark session + Catalog name. Arbitrary string used by Spark to identify catalog and tables (``mycatalog.myschema.mytable``). catalog : :obj:`IcebergCatalog` Iceberg catalog configuration @@ -97,12 +97,15 @@ class Iceberg(DBConnection): spark.sql.catalog.my_catalog.cache-enabled = 'true' spark.sql.catalog.my_catalog.cache.expiration-interval-ms = '40000' + spark : :obj:`pyspark.sql.SparkSession` + Spark session + Examples -------- .. tabs:: - .. code-tab:: python REST catalog and S3 warehouse + .. code-tab:: python REST catalog with Bearer token auth, S3 warehouse with explicit credentials from onetl.connection import Iceberg from pyspark.sql import SparkSession @@ -121,12 +124,12 @@ class Iceberg(DBConnection): catalog_name="my_catalog", spark=spark, catalog=Iceberg.RESTCatalog( - uri="http://rest.domain.com:8080", - auth=Iceberg.RESTCatalog.BasicAuth( - user="my_user", - password="my_password" - ) + uri="http://my.rest.catalog/iceberg", + auth=Iceberg.RESTCatalog.OAuth2BearerToken( + token="my_token", + ), ), + # explicit S3 warehouse params warehouse=Iceberg.S3Warehouse( path="/warehouse", host="s3.domain.com", @@ -136,10 +139,44 @@ class Iceberg(DBConnection): region="us-east-1", access_key="access_key", secret_key="secret_key" - ) + ), ) - .. code-tab:: python Filesystem catalog and HDFS warehouse + .. 
code-tab:: python REST catalog with OAuth2 client credentials, S3 warehouse with vended credentials + + from onetl.connection import Iceberg + from pyspark.sql import SparkSession + + maven_packages = [ + *Iceberg.get_packages(package_version="1.10.0", spark_version="3.5"), + # required to use S3 warehouse + *Iceberg.S3Warehouse.get_packages(package_version="1.10.0"), + ] + spark = ( + SparkSession.builder.appName("spark-app-name") + .config("spark.jars.packages", ",".join(maven_packages)) + .getOrCreate() + ) + + iceberg = Iceberg( + catalog_name="my_catalog", + spark=spark, + catalog=Iceberg.RESTCatalog( + uri="http://my.rest.catalog/iceberg", + auth=Iceberg.RESTCatalog.OAuth2ClientCredentials( + client_id="my_client", + client_secret="my_secret", + oauth2_server_uri="http://keycloak.domain.com/realms/my-realm/protocol/openid-connect/token", + ), + ), + # S3 warehouse params and credentials are provided by REST Catalog + warehouse=Iceberg.DelegatedWarehouse( + name="my-warehouse", + access_delegation="vended-credentials", + ), + ) + + .. 
code-tab:: python HDFS Filesystem catalog, HDFS warehouse from onetl.connection import Iceberg, SparkHDFS from pyspark.sql import SparkSession @@ -160,23 +197,26 @@ class Iceberg(DBConnection): iceberg = Iceberg( catalog_name="my_catalog", spark=spark, - catalog=Iceberg.Filesystem.Catalog(), - warehouse=Iceberg.Filesystem.Warehouse( + catalog=Iceberg.FilesystemCatalog(), + warehouse=Iceberg.FilesystemWarehouse( connection=hdfs_connection, - path="/warehouse/path" - ) + path="/warehouse/path", + ), ) """ catalog_name: str catalog: IcebergCatalog - warehouse: IcebergWarehouse + warehouse: Optional[IcebergWarehouse] = None extra: IcebergExtra = IcebergExtra() FilesystemCatalog = IcebergFilesystemCatalog RESTCatalog = IcebergRESTCatalog + FilesystemWarehouse = IcebergFilesystemWarehouse S3Warehouse = IcebergS3Warehouse + DelegatedWarehouse = IcebergDelegatedWarehouse + WriteOptions = IcebergWriteOptions Dialect = IcebergDialect @@ -191,7 +231,7 @@ def __init__( spark: SparkSession, catalog_name: str, catalog: IcebergCatalog, - warehouse: IcebergWarehouse, + warehouse: Optional[IcebergWarehouse] = None, extra: Union[IcebergExtra, Dict[str, Any]] = IcebergExtra(), # noqa: B008, WPS404 ): super().__init__( @@ -207,7 +247,7 @@ def __init__( ) catalog_config = { **self.catalog.get_config(), - **self.warehouse.get_config(), + **(self.warehouse.get_config() if self.warehouse else {}), **self.extra.dict(), } for k, v in catalog_config.items(): diff --git a/onetl/connection/db_connection/iceberg/warehouse/__init__.py b/onetl/connection/db_connection/iceberg/warehouse/__init__.py index 4ddbb2aab..670662c58 100644 --- a/onetl/connection/db_connection/iceberg/warehouse/__init__.py +++ b/onetl/connection/db_connection/iceberg/warehouse/__init__.py @@ -1,9 +1,19 @@ # SPDX-FileCopyrightText: 2021-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 from onetl.connection.db_connection.iceberg.warehouse.base import IcebergWarehouse +from 
onetl.connection.db_connection.iceberg.warehouse.delegated import ( + IcebergDelegatedWarehouse, +) from onetl.connection.db_connection.iceberg.warehouse.filesystem import ( IcebergFilesystemWarehouse, ) from onetl.connection.db_connection.iceberg.warehouse.s3 import ( IcebergS3Warehouse, ) + +__all__ = [ + "IcebergDelegatedWarehouse", + "IcebergFilesystemWarehouse", + "IcebergS3Warehouse", + "IcebergWarehouse", +] diff --git a/onetl/connection/db_connection/iceberg/warehouse/base.py b/onetl/connection/db_connection/iceberg/warehouse/base.py index 50e37c8ee..fb94f5d89 100644 --- a/onetl/connection/db_connection/iceberg/warehouse/base.py +++ b/onetl/connection/db_connection/iceberg/warehouse/base.py @@ -1,9 +1,8 @@ # SPDX-FileCopyrightText: 2021-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 -from abc import ABC, abstractmethod -from typing import Dict +from __future__ import annotations -from onetl.base import PurePathProtocol +from abc import ABC, abstractmethod class IcebergWarehouse(ABC): @@ -13,8 +12,6 @@ class IcebergWarehouse(ABC): .. 
versionadded:: 0.14.1 """ - path: PurePathProtocol - @abstractmethod - def get_config(self) -> Dict[str, str]: + def get_config(self) -> dict[str, str]: """Return flat dict with warehouse configuration.""" diff --git a/onetl/connection/db_connection/iceberg/warehouse/delegated.py b/onetl/connection/db_connection/iceberg/warehouse/delegated.py new file mode 100644 index 000000000..a229d67ba --- /dev/null +++ b/onetl/connection/db_connection/iceberg/warehouse/delegated.py @@ -0,0 +1,83 @@ +# SPDX-FileCopyrightText: 2021-2024 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +from typing import Any, Dict, Optional + +from typing_extensions import Literal + +from onetl.hooks import slot, support_hooks + +try: + from pydantic.v1 import Field +except (ImportError, AttributeError): + from pydantic import Field # type: ignore[no-redef, assignment] + +from onetl._util.spark import stringify +from onetl.connection.db_connection.iceberg.warehouse import IcebergWarehouse +from onetl.impl.frozen_model import FrozenModel + + +@support_hooks +class IcebergDelegatedWarehouse(IcebergWarehouse, FrozenModel): + """Delegate configuring Iceberg warehouse to Iceberg catalog. |support_hooks| + + Used by some Iceberg catalog implementations like: + * `Lakekeeper `_ + * `Polaris `_ + * `Apache Gravitino `_ + * `Databricks Unity Catalog `_ + + .. versionadded:: 0.14.1 + + Parameters + ---------- + name : str, optional + Warehouse name/alias, if supported by specific Iceberg catalog + + access_delegation : "vended-credentials" | "remote-signing" + Value of `X-Iceberg-Access-Delegation `_ header. + + extra : Dict[str, str], default: {} + Additional configuration parameters + + Examples + -------- + + .. tabs:: + + .. 
code-tab:: python S3 client with vended credentials + + from onetl.connection import Iceberg + + warehouse = Iceberg.DelegatedWarehouse( + name="my-warehouse", + access_delegation="vended-credentials", + # other params passed to S3 client (optional) + extra={"client.region": "us-east-1"}, + ) + + .. code-tab:: python S3 client with remote signing + + from onetl.connection import Iceberg + + warehouse = Iceberg.DelegatedWarehouse( + name="my-warehouse", + access_delegation="remote-signing", + # other params passed to S3 client (optional) + extra={"client.region": "us-east-1"}, + ) + """ + + name: Optional[str] = None + access_delegation: Literal["vended-credentials", "remote-signing"] + extra: Dict[str, Any] = Field(default_factory=dict) + + @slot + def get_config(self) -> dict[str, str]: + config = { + "warehouse": self.name, + "header.X-Iceberg-Access-Delegation": self.access_delegation, + **stringify(self.extra), + } + return {k: v for k, v in config.items() if v is not None} diff --git a/onetl/connection/db_connection/iceberg/warehouse/filesystem.py b/onetl/connection/db_connection/iceberg/warehouse/filesystem.py index 72c4bda89..1e4921efc 100644 --- a/onetl/connection/db_connection/iceberg/warehouse/filesystem.py +++ b/onetl/connection/db_connection/iceberg/warehouse/filesystem.py @@ -1,8 +1,6 @@ # SPDX-FileCopyrightText: 2021-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 -from typing import Dict - -from onetl.connection.file_df_connection.spark_s3.connection import SparkS3 +from __future__ import annotations try: from pydantic.v1 import validator @@ -14,9 +12,12 @@ from onetl.connection.file_df_connection.spark_file_df_connection import ( SparkFileDFConnection, ) +from onetl.connection.file_df_connection.spark_s3.connection import SparkS3 +from onetl.hooks import slot, support_hooks from onetl.impl.frozen_model import FrozenModel +@support_hooks class IcebergFilesystemWarehouse(IcebergWarehouse, FrozenModel): """Iceberg Filesystem Warehouse. 
@@ -90,13 +91,17 @@ class IcebergFilesystemWarehouse(IcebergWarehouse, FrozenModel): connection: SparkFileDFConnection path: PurePathProtocol - def get_config(self) -> Dict[str, str]: + @slot + def get_config(self) -> dict[str, str]: config = { - "warehouse": self.connection._convert_to_url(self.path), + "warehouse": self.connection._convert_to_url(self.path), # noqa: WPS437 + "io-impl": "org.apache.iceberg.hadoop.HadoopFileIO", } if isinstance(self.connection, SparkS3): - prefix = self.connection._get_hadoop_config_prefix() - hadoop_config = {"hadoop." + k: v for k, v in self.connection._get_expected_hadoop_config(prefix).items()} + prefix = self.connection._get_hadoop_config_prefix() # noqa: WPS437 + hadoop_config = { + "hadoop." + k: v for k, v in self.connection._get_expected_hadoop_config(prefix).items() # noqa: WPS437 + } config.update(hadoop_config) return config diff --git a/onetl/connection/db_connection/iceberg/warehouse/s3.py b/onetl/connection/db_connection/iceberg/warehouse/s3.py index c60117192..ac0d45358 100644 --- a/onetl/connection/db_connection/iceberg/warehouse/s3.py +++ b/onetl/connection/db_connection/iceberg/warehouse/s3.py @@ -1,7 +1,9 @@ # SPDX-FileCopyrightText: 2021-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + import os -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from typing_extensions import Literal @@ -90,14 +92,15 @@ class IcebergS3Warehouse(IcebergWarehouse, FrozenModel): port: Optional[int] = None protocol: Literal["http", "https"] = "https" bucket: str + region: str path_style_access: bool = False access_key: Optional[str] = None secret_key: Optional[SecretStr] = None session_token: Optional[SecretStr] = None - region: str - extra: Dict[str, str] = Field(default_factory=dict) + extra: Dict[str, Any] = Field(default_factory=dict) - def get_config(self) -> Dict[str, str]: + @slot + def get_config(self) -> dict[str, str]: config = { "warehouse": 
"s3a://" + self.bucket + "/" + self.path.as_posix().lstrip("/"), "io-impl": "org.apache.iceberg.aws.s3.S3FileIO", @@ -107,7 +110,7 @@ def get_config(self) -> Dict[str, str]: "s3.session-token": self.session_token.get_secret_value() if self.session_token else None, "s3.path-style-access": stringify(self.path_style_access), "client.region": self.region, - **self.extra, + **stringify(self.extra), } return {k: v for k, v in config.items() if v is not None} diff --git a/onetl/connection/file_df_connection/spark_s3/connection.py b/onetl/connection/file_df_connection/spark_s3/connection.py index b4329335d..4fe873b75 100644 --- a/onetl/connection/file_df_connection/spark_s3/connection.py +++ b/onetl/connection/file_df_connection/spark_s3/connection.py @@ -415,7 +415,7 @@ def _fix_root_conf(self, conf: dict, prefix: str) -> None: if prefixed_key in conf: conf[f"fs.s3a.{key}"] = conf.pop(prefixed_key) - def _get_expected_hadoop_config(self, prefix: str) -> dict: + def _get_expected_hadoop_config(self, prefix: str) -> dict[str, str]: conf = { f"{prefix}.endpoint": f"{self.protocol}://{self.host}:{self.port}", f"{prefix}.connection.ssl.enabled": self.protocol == "https", diff --git a/setup.cfg b/setup.cfg index 54df7962e..a145b2192 100644 --- a/setup.cfg +++ b/setup.cfg @@ -283,7 +283,9 @@ ignore = # WPS413 Found bad magic module function: __getattr__ WPS413, # WPS338 Found incorrect order of methods in a class - WPS338 + WPS338, +# P102 docstring does contain unindexed parameters + P102 # http://flake8.pycqa.org/en/latest/user/options.html?highlight=per-file-ignores#cmdoption-flake8-per-file-ignores per-file-ignores = @@ -315,9 +317,6 @@ per-file-ignores = yaml_hwm_store.py: # E800 Found commented out code E800, -# P102 docstring does contain unindexed parameters - P102, - db_writer.py: # E800 Found commented out code E800, kafka.py: diff --git a/tests/tests_unit/tests_db_connection_unit/test_iceberg_unit.py b/tests/tests_unit/tests_db_connection_unit/test_iceberg_unit.py 
index d8bf0944c..dc88a060e 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_iceberg_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_iceberg_unit.py @@ -21,7 +21,7 @@ def test_iceberg_missing_args(spark_mock): Iceberg(spark=spark_mock) -def test_iceberg_with_filesystem_catalog_local_connection(spark_mock, iceberg_warehouse_dir): +def test_iceberg_with_filesystem_catalog_local_warehouse(spark_mock, iceberg_warehouse_dir): iceberg = Iceberg( catalog_name="my_catalog", catalog=Iceberg.FilesystemCatalog(), @@ -31,22 +31,23 @@ def test_iceberg_with_filesystem_catalog_local_connection(spark_mock, iceberg_wa ), spark=spark_mock, ) - assert iceberg assert iceberg.catalog.get_config() == { "type": "hadoop", } assert iceberg.warehouse.get_config() == { "warehouse": f"file://{iceberg_warehouse_dir}", + "io-impl": "org.apache.iceberg.hadoop.HadoopFileIO", } expected_calls = [ call("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog"), call("spark.sql.catalog.my_catalog.type", "hadoop"), call("spark.sql.catalog.my_catalog.warehouse", f"file://{iceberg_warehouse_dir}"), + call("spark.sql.catalog.my_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"), ] spark_mock.conf.set.assert_has_calls(expected_calls, any_order=True) -def test_iceberg_with_filesystem_catalog_hdfs_connection(spark_mock, mocker, iceberg_warehouse_dir): +def test_iceberg_with_filesystem_catalog_hdfs_warehouse(spark_mock, mocker, iceberg_warehouse_dir): from onetl.connection.file_df_connection.spark_hdfs.connection import SparkHDFS def conn_str(self): @@ -64,22 +65,23 @@ def conn_str(self): ), spark=spark_mock, ) - assert iceberg assert iceberg.catalog.get_config() == { "type": "hadoop", } assert iceberg.warehouse.get_config() == { "warehouse": f"{connection._get_conn_str()}{iceberg_warehouse_dir}", + "io-impl": "org.apache.iceberg.hadoop.HadoopFileIO", } expected_calls = [ call("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog"), 
call("spark.sql.catalog.my_catalog.type", "hadoop"), call("spark.sql.catalog.my_catalog.warehouse", f"{connection._get_conn_str()}{iceberg_warehouse_dir}"), + call("spark.sql.catalog.my_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"), ] spark_mock.conf.set.assert_has_calls(expected_calls, any_order=True) -def test_iceberg_with_filesystem_catalog_s3_connection(spark_mock, iceberg_warehouse_dir): +def test_iceberg_with_filesystem_catalog_s3_warehouse(spark_mock, iceberg_warehouse_dir): connection = SparkS3( spark=spark_mock, host="localhost", @@ -99,12 +101,12 @@ def test_iceberg_with_filesystem_catalog_s3_connection(spark_mock, iceberg_wareh ), spark=spark_mock, ) - assert iceberg assert iceberg.catalog.get_config() == { "type": "hadoop", } assert iceberg.warehouse.get_config() == { "warehouse": f"s3a://onetl{iceberg_warehouse_dir}", + "io-impl": "org.apache.iceberg.hadoop.HadoopFileIO", "hadoop.fs.s3a.bucket.onetl.access.key": "onetl", "hadoop.fs.s3a.bucket.onetl.secret.key": "123UsedForTestOnly@!", "hadoop.fs.s3a.bucket.onetl.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", @@ -117,6 +119,7 @@ def test_iceberg_with_filesystem_catalog_s3_connection(spark_mock, iceberg_wareh call("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog"), call("spark.sql.catalog.my_catalog.type", "hadoop"), call("spark.sql.catalog.my_catalog.warehouse", f"s3a://onetl{iceberg_warehouse_dir}"), + call("spark.sql.catalog.my_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"), call("spark.sql.catalog.my_catalog.hadoop.fs.s3a.bucket.onetl.access.key", "onetl"), call("spark.sql.catalog.my_catalog.hadoop.fs.s3a.bucket.onetl.secret.key", "123UsedForTestOnly@!"), call( @@ -134,7 +137,7 @@ def test_iceberg_with_filesystem_catalog_s3_connection(spark_mock, iceberg_wareh spark_mock.conf.set.assert_has_calls(expected_calls, any_order=True) -def test_iceberg_with_rest_catalog_local_connection(spark_mock): +def 
test_iceberg_with_rest_catalog_local_warehouse(spark_mock): warehouse = Iceberg.FilesystemWarehouse( connection=SparkLocalFS(spark=spark_mock), path="/data", @@ -144,13 +147,12 @@ def test_iceberg_with_rest_catalog_local_connection(spark_mock): catalog=Iceberg.RESTCatalog( uri="http://localhost:8080", headers={ - "X-Custom-Header": "123", + "X-Custom-Header": 123, }, ), warehouse=warehouse, spark=spark_mock, ) - assert iceberg assert iceberg.catalog.get_config() == { "type": "rest", "uri": "http://localhost:8080", @@ -158,18 +160,20 @@ def test_iceberg_with_rest_catalog_local_connection(spark_mock): } assert iceberg.warehouse.get_config() == { "warehouse": "file:///data", + "io-impl": "org.apache.iceberg.hadoop.HadoopFileIO", } expected_calls = [ call("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog"), call("spark.sql.catalog.my_catalog.type", "rest"), call("spark.sql.catalog.my_catalog.uri", "http://localhost:8080"), call("spark.sql.catalog.my_catalog.warehouse", "file:///data"), + call("spark.sql.catalog.my_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"), call("spark.sql.catalog.my_catalog.header.X-Custom-Header", "123"), ] spark_mock.conf.set.assert_has_calls(expected_calls, any_order=True) -def test_iceberg_with_rest_catalog_hdfs_connection(spark_mock, mocker, iceberg_warehouse_dir): +def test_iceberg_with_rest_catalog_hdfs_warehouse(spark_mock, mocker, iceberg_warehouse_dir): from onetl.connection.file_df_connection.spark_hdfs.connection import SparkHDFS def conn_str(self): @@ -183,7 +187,7 @@ def conn_str(self): catalog=Iceberg.RESTCatalog( uri="http://localhost:8080", headers={ - "X-Custom-Header": "123", + "X-Custom-Header": 123, }, ), warehouse=Iceberg.FilesystemWarehouse( @@ -192,7 +196,6 @@ def conn_str(self): ), spark=spark_mock, ) - assert iceberg assert iceberg.catalog.get_config() == { "type": "rest", "uri": "http://localhost:8080", @@ -200,18 +203,20 @@ def conn_str(self): } assert iceberg.warehouse.get_config() == 
{ "warehouse": f"{connection._get_conn_str()}{iceberg_warehouse_dir}", + "io-impl": "org.apache.iceberg.hadoop.HadoopFileIO", } expected_calls = [ call("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog"), call("spark.sql.catalog.my_catalog.type", "rest"), call("spark.sql.catalog.my_catalog.uri", "http://localhost:8080"), call("spark.sql.catalog.my_catalog.warehouse", f"{connection._get_conn_str()}{iceberg_warehouse_dir}"), + call("spark.sql.catalog.my_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"), call("spark.sql.catalog.my_catalog.header.X-Custom-Header", "123"), ] spark_mock.conf.set.assert_has_calls(expected_calls, any_order=True) -def test_iceberg_with_rest_catalog_s3_connection(spark_mock): +def test_iceberg_with_rest_catalog_s3_warehouse(spark_mock): warehouse = Iceberg.S3Warehouse( path="/data", host="localhost", @@ -228,13 +233,12 @@ def test_iceberg_with_rest_catalog_s3_connection(spark_mock): catalog=Iceberg.RESTCatalog( uri="http://localhost:8080", headers={ - "X-Custom-Header": "123", + "X-Custom-Header": 123, }, ), warehouse=warehouse, spark=spark_mock, ) - assert iceberg assert iceberg.catalog.get_config() == { "type": "rest", "uri": "http://localhost:8080", @@ -265,6 +269,44 @@ def test_iceberg_with_rest_catalog_s3_connection(spark_mock): spark_mock.conf.set.assert_has_calls(expected_calls, any_order=True) +def test_iceberg_with_rest_catalog_delegated_warehouse(spark_mock): + iceberg = Iceberg( + catalog_name="my_catalog", + catalog=Iceberg.RESTCatalog( + uri="http://localhost:8080", + headers={ + "X-Custom-Header": 123, + }, + ), + warehouse=Iceberg.DelegatedWarehouse( + name="my-warehouse", + access_delegation="vended-credentials", + extra={"client.region": "us-east-1"}, + ), + spark=spark_mock, + ) + assert iceberg.catalog.get_config() == { + "type": "rest", + "uri": "http://localhost:8080", + "header.X-Custom-Header": "123", + } + assert iceberg.warehouse.get_config() == { + "warehouse": "my-warehouse", + 
"client.region": "us-east-1", + "header.X-Iceberg-Access-Delegation": "vended-credentials", + } + expected_calls = [ + call("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog"), + call("spark.sql.catalog.my_catalog.type", "rest"), + call("spark.sql.catalog.my_catalog.uri", "http://localhost:8080"), + call("spark.sql.catalog.my_catalog.warehouse", "my-warehouse"), + call("spark.sql.catalog.my_catalog.client.region", "us-east-1"), + call("spark.sql.catalog.my_catalog.header.X-Iceberg-Access-Delegation", "vended-credentials"), + call("spark.sql.catalog.my_catalog.header.X-Custom-Header", "123"), + ] + spark_mock.conf.set.assert_has_calls(expected_calls, any_order=True) + + def test_iceberg_rest_catalog_missing_args(): with pytest.raises(ValueError, match="field required"): Iceberg.RESTCatalog()