Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/changelog/next_release/413.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement ``Iceberg.DelegatedWarehouse`` allowing to use vended S3 credentials or remote-signing.
1 change: 1 addition & 0 deletions docs/connection/db_connection/iceberg/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Iceberg

warehouse_filesystem
warehouse_s3
warehouse_delegated

.. toctree::
:maxdepth: 1
Expand Down
12 changes: 6 additions & 6 deletions docs/connection/db_connection/iceberg/prerequisites.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ Popular Metastore Implementations

Iceberg supports multiple catalog implementations. Here are some popular options:

* **Apache Iceberg Hadoop Catalog** — `File-based catalog using Hadoop filesystem <https://iceberg.apache.org/docs/latest/spark-configuration/>`_
* **Apache Iceberg REST Catalog** — `Iceberg REST catalog <https://iceberg.apache.org/rest-catalog-spec/>`_
* **Polaris** — `Сatalog service enabling multi-engine table access <https://polaris.apache.org/in-dev/unreleased/getting-started/>`_
* **Lakekeeper** — `Rust native сatalog service <https://docs.lakekeeper.io/getting-started/>`_
* **Gravitino** — `Apache Gravitino unified catalog <https://gravitino.apache.org/docs/>`_
* **Unity Catalog** — `Databricks Unity Catalog <https://docs.databricks.com/aws/en/external-access/iceberg/>`_
* `Apache Iceberg Hadoop Catalog <https://iceberg.apache.org/docs/latest/spark-configuration/>`_
* `Lakekeeper <https://docs.lakekeeper.io/getting-started/>`_
* `Polaris <https://polaris.apache.org/in-dev/unreleased/getting-started/>`_
* `Nessie <https://projectnessie.org/guides/iceberg-rest/>`_
* `Apache Gravitino <https://gravitino.apache.org/docs/>`_
* `Databricks Unity Catalog <https://docs.databricks.com/aws/en/external-access/iceberg/>`_
10 changes: 10 additions & 0 deletions docs/connection/db_connection/iceberg/warehouse_delegated.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
.. _iceberg-warehouse-delegated:

Delegated Warehouse
===================

.. currentmodule:: onetl.connection.db_connection.iceberg.warehouse.delegated

.. autoclass:: IcebergDelegatedWarehouse
:members: get_packages, get_config
:member-order: bysource
7 changes: 7 additions & 0 deletions onetl/connection/db_connection/iceberg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,10 @@
from onetl.connection.db_connection.iceberg.options import (
IcebergWriteOptions,
)

__all__ = [
"Iceberg",
"IcebergDialect",
"IcebergExtra",
"IcebergWriteOptions",
]
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class IcebergRESTCatalogOAuth2ClientCredentials(IcebergRESTCatalogAuth, FrozenMo
client_id="my_client_id",
client_secret="my_client_secret",
scopes=["catalog:read"],
oauth2_server_uri="https://oauth.example.com/token",
oauth2_server_uri="http://keycloak.domain.com/realms/my-realm/protocol/openid-connect/token",
token_refresh_interval=timedelta(minutes=30),
audience="iceberg-catalog",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class IcebergRESTCatalogOAuth2TokenExchange(IcebergRESTCatalogAuth, FrozenModel)
client_id="my_client_id",
client_secret="my_client_secret",
scopes=["catalog:read"],
oauth2_server_uri="https://oauth.example.com/token",
oauth2_server_uri="http://keycloak.domain.com/realms/my-realm/protocol/openid-connect/token",
token_refresh_interval=timedelta(minutes=30),
audience="iceberg-catalog",
)
Expand Down
11 changes: 6 additions & 5 deletions onetl/connection/db_connection/iceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from typing import Dict, Optional
from typing import Any, Dict, Optional

from onetl._util.spark import stringify
from onetl.connection.db_connection.iceberg.catalog.auth import (
IcebergRESTCatalogAuth,
IcebergRESTCatalogBasicAuth,
Expand Down Expand Up @@ -125,19 +126,19 @@ class IcebergRESTCatalog(IcebergCatalog, FrozenModel):
OAuth2TokenExchange = IcebergRESTCatalogOAuth2TokenExchange

uri: str
headers: Dict[str, str] = Field(default_factory=dict)
extra: Dict[str, str] = Field(default_factory=dict)
headers: Dict[str, Any] = Field(default_factory=dict)
extra: Dict[str, Any] = Field(default_factory=dict)

auth: Optional[IcebergRESTCatalogAuth] = None

def get_config(self) -> Dict[str, str]:
config = {
"type": "rest",
"uri": self.uri,
**self.extra,
**stringify(self.extra),
}
for key, value in self.headers.items():
config[f"header.{key}"] = value
config[f"header.{key}"] = stringify(value)

if self.auth:
config.update(self.auth.get_config())
Expand Down
80 changes: 60 additions & 20 deletions onetl/connection/db_connection/iceberg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Dict, Iterable, Union
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Union

from onetl._util.java import try_import_java_class
from onetl._util.scala import get_default_scala_version
Expand All @@ -24,6 +24,9 @@
IcebergS3Warehouse,
IcebergWarehouse,
)
from onetl.connection.db_connection.iceberg.warehouse.delegated import (
IcebergDelegatedWarehouse,
)
from onetl.exception import MISSING_JVM_CLASS_MSG

try:
Expand Down Expand Up @@ -64,10 +67,7 @@ class Iceberg(DBConnection):
Parameters
----------
catalog_name : str
Catalog name

spark : :obj:`pyspark.sql.SparkSession`
Spark session
Catalog name. Arbitrary string used by Spark to identify catalog and tables (``mycatalog.myschema.mytable``).

catalog : :obj:`IcebergCatalog`
Iceberg catalog configuration
Expand Down Expand Up @@ -97,12 +97,15 @@ class Iceberg(DBConnection):
spark.sql.catalog.my_catalog.cache-enabled = 'true'
spark.sql.catalog.my_catalog.cache.expiration-interval-ms = '40000'

spark : :obj:`pyspark.sql.SparkSession`
Spark session

Examples
--------

.. tabs::

.. code-tab:: python REST catalog and S3 warehouse
.. code-tab:: python REST catalog with Bearer token auth, S3 warehouse with explicit credentials

from onetl.connection import Iceberg
from pyspark.sql import SparkSession
Expand All @@ -121,12 +124,12 @@ class Iceberg(DBConnection):
catalog_name="my_catalog",
spark=spark,
catalog=Iceberg.RESTCatalog(
uri="http://rest.domain.com:8080",
auth=Iceberg.RESTCatalog.BasicAuth(
user="my_user",
password="my_password"
)
uri="http://my.rest.catalog/iceberg",
auth=Iceberg.RESTCatalog.OAuth2BearerToken(
token="my_token",
),
),
# explicit S3 warehouse params
warehouse=Iceberg.S3Warehouse(
path="/warehouse",
host="s3.domain.com",
Expand All @@ -136,10 +139,44 @@ class Iceberg(DBConnection):
region="us-east-1",
access_key="access_key",
secret_key="secret_key"
)
),
)

.. code-tab:: python Filesystem catalog and HDFS warehouse
.. code-tab:: python REST catalog with OAuth2 client credentials, S3 warehouse with vended credentials

from onetl.connection import Iceberg
from pyspark.sql import SparkSession

maven_packages = [
*Iceberg.get_packages(package_version="1.10.0", spark_version="3.5"),
# required to use S3 warehouse
*Iceberg.S3Warehouse.get_packages(package_version="1.10.0"),
]
spark = (
SparkSession.builder.appName("spark-app-name")
.config("spark.jars.packages", ",".join(maven_packages))
.getOrCreate()
)

iceberg = Iceberg(
catalog_name="my_catalog",
spark=spark,
catalog=Iceberg.RESTCatalog(
uri="http://my.rest.catalog/iceberg",
auth=Iceberg.RESTCatalog.OAuth2ClientCredentials(
client_id="my_client",
client_secret="my_secret",
oauth2_server_uri="http://keycloak.domain.com/realms/my-realm/protocol/openid-connect/token",
),
),
# S3 warehouse params and credentials are provided by REST Catalog
warehouse=Iceberg.DelegatedWarehouse(
name="my-warehouse",
access_delegation="vended-credentials",
),
)

.. code-tab:: python HDFS Filesystem catalog, HDFS warehouse

from onetl.connection import Iceberg, SparkHDFS
from pyspark.sql import SparkSession
Expand All @@ -160,23 +197,26 @@ class Iceberg(DBConnection):
iceberg = Iceberg(
catalog_name="my_catalog",
spark=spark,
catalog=Iceberg.Filesystem.Catalog(),
warehouse=Iceberg.Filesystem.Warehouse(
catalog=Iceberg.FilesystemCatalog(),
warehouse=Iceberg.FilesystemWarehouse(
connection=hdfs_connection,
path="/warehouse/path"
)
path="/warehouse/path",
),
)
"""

catalog_name: str
catalog: IcebergCatalog
warehouse: IcebergWarehouse
warehouse: Optional[IcebergWarehouse] = None
extra: IcebergExtra = IcebergExtra()

FilesystemCatalog = IcebergFilesystemCatalog
RESTCatalog = IcebergRESTCatalog

FilesystemWarehouse = IcebergFilesystemWarehouse
S3Warehouse = IcebergS3Warehouse
DelegatedWarehouse = IcebergDelegatedWarehouse

WriteOptions = IcebergWriteOptions

Dialect = IcebergDialect
Expand All @@ -191,7 +231,7 @@ def __init__(
spark: SparkSession,
catalog_name: str,
catalog: IcebergCatalog,
warehouse: IcebergWarehouse,
warehouse: Optional[IcebergWarehouse] = None,
extra: Union[IcebergExtra, Dict[str, Any]] = IcebergExtra(), # noqa: B008, WPS404
):
super().__init__(
Expand All @@ -207,7 +247,7 @@ def __init__(
)
catalog_config = {
**self.catalog.get_config(),
**self.warehouse.get_config(),
**(self.warehouse.get_config() if self.warehouse else {}),
**self.extra.dict(),
}
for k, v in catalog_config.items():
Expand Down
10 changes: 10 additions & 0 deletions onetl/connection/db_connection/iceberg/warehouse/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
# SPDX-FileCopyrightText: 2021-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from onetl.connection.db_connection.iceberg.warehouse.base import IcebergWarehouse
from onetl.connection.db_connection.iceberg.warehouse.delegated import (
IcebergDelegatedWarehouse,
)
from onetl.connection.db_connection.iceberg.warehouse.filesystem import (
IcebergFilesystemWarehouse,
)
from onetl.connection.db_connection.iceberg.warehouse.s3 import (
IcebergS3Warehouse,
)

__all__ = [
"IcebergDelegatedWarehouse",
"IcebergFilesystemWarehouse",
"IcebergS3Warehouse",
"IcebergWarehouse",
]
9 changes: 3 additions & 6 deletions onetl/connection/db_connection/iceberg/warehouse/base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
# SPDX-FileCopyrightText: 2021-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from abc import ABC, abstractmethod
from typing import Dict
from __future__ import annotations

from onetl.base import PurePathProtocol
from abc import ABC, abstractmethod


class IcebergWarehouse(ABC):
Expand All @@ -13,8 +12,6 @@ class IcebergWarehouse(ABC):
.. versionadded:: 0.14.1
"""

path: PurePathProtocol

@abstractmethod
def get_config(self) -> Dict[str, str]:
def get_config(self) -> dict[str, str]:
"""Return flat dict with warehouse configuration."""
83 changes: 83 additions & 0 deletions onetl/connection/db_connection/iceberg/warehouse/delegated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# SPDX-FileCopyrightText: 2021-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from typing import Any, Dict, Optional

from typing_extensions import Literal

from onetl.hooks import slot, support_hooks

try:
from pydantic.v1 import Field
except (ImportError, AttributeError):
from pydantic import Field # type: ignore[no-redef, assignment]

from onetl._util.spark import stringify
from onetl.connection.db_connection.iceberg.warehouse import IcebergWarehouse
from onetl.impl.frozen_model import FrozenModel


@support_hooks
class IcebergDelegatedWarehouse(IcebergWarehouse, FrozenModel):
    """Delegate configuring Iceberg warehouse to Iceberg catalog. |support_hooks|

    Used by some Iceberg catalog implementations like:

    * `Lakekeeper <https://docs.lakekeeper.io/docs/latest/storage/#s3>`_
    * `Polaris <https://polaris.apache.org/in-dev/unreleased/polaris-spark-client/>`_
    * `Apache Gravitino <https://gravitino.apache.org/docs/1.0.0/security/credential-vending/>`_
    * `Databricks Unity Catalog <https://docs.databricks.com/aws/en/external-access/iceberg#use-iceberg-tables-with-apache-spark>`_

    .. versionadded:: 0.14.1

    Parameters
    ----------
    name : str, optional
        Warehouse name/alias, if supported by specific Iceberg catalog

    access_delegation : "vended-credentials" | "remote-signing"
        Value of `X-Iceberg-Access-Delegation <https://github.com/apache/iceberg/blob/apache-iceberg-1.10.0/open-api/rest-catalog-open-api.yaml#L1854>`_ header.

    extra : Dict[str, str], default: {}
        Additional configuration parameters

    Examples
    --------

    .. tabs::

        .. code-tab:: python S3 client with vended credentials

            from onetl.connection import Iceberg

            warehouse = Iceberg.DelegatedWarehouse(
                name="my-warehouse",
                access_delegation="vended-credentials",
                # other params passed to S3 client (optional)
                extra={"client.region": "us-east-1"},
            )

        .. code-tab:: python S3 client with remote signing

            from onetl.connection import Iceberg

            warehouse = Iceberg.DelegatedWarehouse(
                name="my-warehouse",
                access_delegation="remote-signing",
                # other params passed to S3 client (optional)
                extra={"client.region": "us-east-1"},
            )
    """

    # Warehouse name/alias; when None it is dropped from the resulting config
    name: Optional[str] = None
    # Sent verbatim as the X-Iceberg-Access-Delegation request header value
    access_delegation: Literal["vended-credentials", "remote-signing"]
    # Extra key-value pairs merged into the catalog config (values stringified)
    extra: Dict[str, Any] = Field(default_factory=dict)

    @slot
    def get_config(self) -> dict[str, str]:
        """Return flat dict with warehouse configuration.

        ``None`` values (e.g. unset ``name``) are filtered out, and ``extra``
        values are converted to plain strings via ``stringify``.
        """
        config = {
            "warehouse": self.name,
            "header.X-Iceberg-Access-Delegation": self.access_delegation,
            **stringify(self.extra),
        }
        return {k: v for k, v in config.items() if v is not None}
Loading
Loading