Added BigQuery Metastore Catalog #2068

Open · wants to merge 3 commits into main
1 change: 1 addition & 0 deletions mkdocs/docs/index.md
@@ -46,6 +46,7 @@ You can mix and match optional dependencies depending on your needs:
| hive-kerberos | Support for Hive metastore in Kerberos environment |
| glue | Support for AWS Glue |
| dynamodb | Support for AWS DynamoDB |
| bigquery | Support for Google Cloud BigQuery |
| sql-postgres | Support for SQL Catalog backed by Postgresql |
| sql-sqlite | Support for SQL Catalog backed by SQLite |
| pyarrow | PyArrow as a FileIO implementation to interact with the object store |
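With the new extra wired up (see pyproject.toml below), installing support would look like `pip install 'pyiceberg[bigquery,gcsfs]'`, after which the catalog is selected by type. A minimal sketch, assuming the configuration keys used in this PR's tests; the project ID and bucket are placeholders, not values from the PR:

from pyiceberg.catalog import load_catalog

# Placeholder project and warehouse bucket; substitute your own.
catalog = load_catalog(
    "bq",
    **{
        "type": "bigquery",
        "gcp.project-id": "my-gcp-project",
        "warehouse": "gs://my-iceberg-warehouse/",
    },
)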
170 changes: 148 additions & 22 deletions poetry.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions pyiceberg/catalog/__init__.py
@@ -117,6 +117,7 @@ class CatalogType(Enum):
    DYNAMODB = "dynamodb"
    SQL = "sql"
    IN_MEMORY = "in-memory"
    BIGQUERY = "bigquery"


def load_rest(name: str, conf: Properties) -> Catalog:
@@ -172,13 +173,23 @@ def load_in_memory(name: str, conf: Properties) -> Catalog:
        raise NotInstalledError("SQLAlchemy support not installed: pip install 'pyiceberg[sql-sqlite]'") from exc


def load_bigquery(name: str, conf: Properties) -> Catalog:
    try:
        from pyiceberg.catalog.bigquery_metastore import BigQueryMetastoreCatalog

        return BigQueryMetastoreCatalog(name, **conf)
    except ImportError as exc:
        raise NotInstalledError("BigQuery support not installed: pip install 'pyiceberg[bigquery]'") from exc


AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
    CatalogType.REST: load_rest,
    CatalogType.HIVE: load_hive,
    CatalogType.GLUE: load_glue,
    CatalogType.DYNAMODB: load_dynamodb,
    CatalogType.SQL: load_sql,
    CatalogType.IN_MEMORY: load_in_memory,
    CatalogType.BIGQUERY: load_bigquery,
}


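Like the other loaders above, load_bigquery defers the google-cloud-bigquery import until the catalog is actually requested, so a missing extra surfaces as NotInstalledError with an actionable message rather than a bare ImportError. A small sketch of the dispatch path, using only the registry defined above (the project ID is a placeholder):

from pyiceberg.catalog import AVAILABLE_CATALOGS, CatalogType
from pyiceberg.exceptions import NotInstalledError

try:
    # load_catalog resolves the "bigquery" type string to this entry internally.
    catalog = AVAILABLE_CATALOGS[CatalogType.BIGQUERY]("bq", {"gcp.project-id": "my-gcp-project"})
except NotInstalledError:
    print("Run: pip install 'pyiceberg[bigquery]'")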
419 changes: 419 additions & 0 deletions pyiceberg/catalog/bigquery_metastore.py

Large diffs are not rendered by default.
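The 419-line implementation is collapsed in this view. From the unit tests further down and the google-cloud-bigquery client API, its rough shape can be inferred; the sketch below covers only the namespace plumbing, and the method bodies and attribute names are assumptions rather than the PR's actual code:

from typing import List, Union

from google.cloud.bigquery import Client, Dataset, DatasetReference

from pyiceberg.catalog import MetastoreCatalog
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties


class BigQueryMetastoreCatalog(MetastoreCatalog):  # illustrative skeleton only
    def __init__(self, name: str, **properties: str):
        super().__init__(name, **properties)
        self.project_id = properties["gcp.project-id"]  # property key taken from the tests
        self.client = Client(project=self.project_id)  # this is what the unit tests patch out

    def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
        # One BigQuery dataset per single-level Iceberg namespace.
        dataset_id = ".".join(self.identifier_to_tuple(namespace))
        self.client.create_dataset(Dataset(DatasetReference(self.project_id, dataset_id)))

    def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
        return [(dataset.dataset_id,) for dataset in self.client.list_datasets()]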

6 changes: 6 additions & 0 deletions pyproject.toml
@@ -72,6 +72,7 @@ ray = [
python-snappy = { version = ">=0.6.0,<1.0.0", optional = true }
thrift = { version = ">=0.13.0,<1.0.0", optional = true }
boto3 = { version = ">=1.24.59", optional = true }
google-cloud-bigquery = { version = "^3.33.0", optional = true }
s3fs = { version = ">=2023.1.0", optional = true }
adlfs = { version = ">=2023.1.0", optional = true }
gcsfs = { version = ">=2023.1.0", optional = true }
@@ -282,6 +283,10 @@ ignore_missing_imports = true
module = "pyiceberg_core.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "google.*"
ignore_missing_imports = true

[tool.poetry.scripts]
pyiceberg = "pyiceberg.cli.console:run"

@@ -307,6 +312,7 @@ s3fs = ["s3fs"]
glue = ["boto3"]
adlfs = ["adlfs"]
dynamodb = ["boto3"]
bigquery = ["google-cloud-bigquery"]
zstandard = ["zstandard"]
sql-postgres = ["sqlalchemy", "psycopg2-binary"]
sql-sqlite = ["sqlalchemy"]
149 changes: 149 additions & 0 deletions tests/catalog/integration_test_bigquery_metastore.py
@@ -0,0 +1,149 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os

import pytest
from pytest_mock import MockFixture

from pyiceberg.catalog.bigquery_metastore import BigQueryMetastoreCatalog
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER
from tests.conftest import BQ_TABLE_METADATA_LOCATION_REGEX


def test_create_table_with_database_location(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_bq_catalog"
    identifier = (gcp_dataset_name, table_name)
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"}
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)
    table = test_catalog.create_table(identifier, table_schema_nested)
    assert table.name() == identifier
    assert BQ_TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)

    tables_in_namespace = test_catalog.list_tables(namespace=gcp_dataset_name)
    assert identifier in tables_in_namespace


def test_drop_table_with_database_location(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_bq_catalog"
    identifier = (gcp_dataset_name, table_name)
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"}
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)
    test_catalog.create_table(identifier, table_schema_nested)
    test_catalog.drop_table(identifier)

    tables_in_namespace_after_drop = test_catalog.list_tables(namespace=gcp_dataset_name)
    assert identifier not in tables_in_namespace_after_drop

    # Expect that the table no longer exists.
    with pytest.raises(NoSuchTableError):
        test_catalog.load_table(identifier)


def test_create_and_drop_namespace(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    # Create the namespace.
    catalog_name = "test_bq_catalog"
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"}
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)

    # Ensure that the namespace exists.
    namespaces = test_catalog.list_namespaces()
    assert (gcp_dataset_name,) in namespaces

    # Drop the namespace and ensure it no longer exists.
    test_catalog.drop_namespace(namespace=gcp_dataset_name)
    namespaces_after_drop = test_catalog.list_namespaces()
    assert (gcp_dataset_name,) not in namespaces_after_drop

    # Verify with load_namespace_properties as well.
    with pytest.raises(NoSuchNamespaceError):
        test_catalog.load_namespace_properties(gcp_dataset_name)


def test_register_table(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_bq_register_catalog"
    identifier = (gcp_dataset_name, table_name)
    warehouse_path = "gs://alexstephen-test-bq-bucket/"  # Matches conftest BUCKET_NAME for GCS interaction.
    gcp_project_id = "alexstephen-test-1"

    test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.project-id": gcp_project_id, "warehouse": warehouse_path})

    test_catalog.create_namespace(namespace=gcp_dataset_name)

    # Manually create a metadata file in GCS.
    table_gcs_location = f"{warehouse_path.rstrip('/')}/{gcp_dataset_name}.db/{table_name}"
    # Construct a unique metadata file name similar to how pyiceberg would.
    metadata_file_name = "00000-aaaaaaaa-aaaa-4aaa-aaaa-aaaaaaaaaaaa.metadata.json"
    metadata_gcs_path = f"{table_gcs_location}/metadata/{metadata_file_name}"

    metadata = new_table_metadata(
        location=table_gcs_location,
        schema=table_schema_nested,
        properties={},
        partition_spec=UNPARTITIONED_PARTITION_SPEC,
        sort_order=UNSORTED_SORT_ORDER,
    )
    io = load_file_io(properties=test_catalog.properties, location=metadata_gcs_path)
    # _write_metadata serializes the table metadata file to the target path.
    test_catalog._write_metadata(metadata, io, metadata_gcs_path)

    # Register the table.
    registered_table = test_catalog.register_table(identifier, metadata_gcs_path)

    assert registered_table.name() == identifier
    assert registered_table.metadata_location == metadata_gcs_path
    assert registered_table.metadata.location == table_gcs_location
    assert BQ_TABLE_METADATA_LOCATION_REGEX.match(registered_table.metadata_location)

    # Verify that the table exists and is loadable.
    loaded_table = test_catalog.load_table(identifier)
    assert loaded_table.name() == registered_table.name()
    assert loaded_table.metadata_location == metadata_gcs_path

    # Clean up.
    test_catalog.drop_table(identifier)
    test_catalog.drop_namespace(gcp_dataset_name)
178 changes: 178 additions & 0 deletions tests/catalog/test_bigquery_metastore.py
@@ -0,0 +1,178 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from unittest.mock import MagicMock

from google.api_core.exceptions import NotFound
from google.cloud.bigquery import Dataset, DatasetReference, Table, TableReference
from google.cloud.bigquery.external_config import ExternalCatalogDatasetOptions, ExternalCatalogTableOptions
import pytest
from pytest_mock import MockFixture

from pyiceberg.catalog.bigquery_metastore import ICEBERG_TABLE_TYPE_VALUE, TABLE_TYPE_PROP, BigQueryMetastoreCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.schema import Schema


def dataset_mock() -> Dataset:
    d = Dataset(DatasetReference(dataset_id="my-dataset", project="my-project"))
    d.external_catalog_dataset_options = ExternalCatalogDatasetOptions(
        default_storage_location_uri="gs://test-bucket/iceberg-dataset"
    )
    return d


def table_mock() -> Table:
    t = Table(TableReference(dataset_ref=DatasetReference(dataset_id="my-dataset", project="my-project"), table_id="my-table"))
    t.external_catalog_table_options = ExternalCatalogTableOptions(
        parameters={
            "metadata_location": "gs://alexstephen-test-bq-bucket/my_iceberg_database_aaaaaaaaaaaaaaaaaaaa.db/my_iceberg_table-bbbbbbbbbbbbbbbbbbbb/metadata/12343-aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa.metadata",
            TABLE_TYPE_PROP: ICEBERG_TABLE_TYPE_VALUE,
        }
    )
    return t


def test_create_table_with_database_location(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    # Set up mocks for BigQuery.
    client_mock = MagicMock()
    client_mock.get_dataset.return_value = dataset_mock()
    client_mock.get_table.return_value = table_mock()

    # Set up mocks for GCS.
    file_mock = MagicMock()

    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)
    mocker.patch("pyiceberg.catalog.bigquery_metastore.FromInputFile.table_metadata", return_value=file_mock)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_bq_catalog"
    identifier = (gcp_dataset_name, table_name)
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"}
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)
    table = test_catalog.create_table(identifier, table_schema_nested)
    assert table.name() == identifier


def test_drop_table_with_database_location(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    # Set up mocks for BigQuery.
    client_mock = MagicMock()
    client_mock.get_dataset.return_value = dataset_mock()
    client_mock.get_table.return_value = table_mock()

    # Set up mocks for GCS.
    file_mock = MagicMock()

    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)
    mocker.patch("pyiceberg.catalog.bigquery_metastore.FromInputFile.table_metadata", return_value=file_mock)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_bq_catalog"
    identifier = (gcp_dataset_name, table_name)
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"}
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)
    test_catalog.create_table(identifier, table_schema_nested)
    test_catalog.drop_table(identifier)

    client_mock.get_table.side_effect = NotFound("Table Not Found")
    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)

    # Expect that the table no longer exists.
    with pytest.raises(NoSuchTableError):
        test_catalog.load_table(identifier)


def test_drop_namespace(mocker: MockFixture, gcp_dataset_name: str) -> None:
    client_mock = MagicMock()
    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_catalog"
    test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.project-id": "alexstephen-test-1"})

    test_catalog.drop_namespace(gcp_dataset_name)
    client_mock.delete_dataset.assert_called_once()
    args, _ = client_mock.delete_dataset.call_args
    assert isinstance(args[0], Dataset)
    assert args[0].dataset_id == gcp_dataset_name


def test_list_tables(mocker: MockFixture, gcp_dataset_name: str) -> None:
    client_mock = MagicMock()

    # Mock list_tables to return an iterator of TableListItem.
    table_list_item_1 = MagicMock()
    table_list_item_1.table_id = "iceberg_table_A"
    table_list_item_1.reference = TableReference(
        dataset_ref=DatasetReference(project="my-project", dataset_id=gcp_dataset_name), table_id="iceberg_table_A"
    )

    table_list_item_2 = MagicMock()
    table_list_item_2.table_id = "iceberg_table_B"
    table_list_item_2.reference = TableReference(
        dataset_ref=DatasetReference(project="my-project", dataset_id=gcp_dataset_name), table_id="iceberg_table_B"
    )

    client_mock.list_tables.return_value = iter([table_list_item_1, table_list_item_2])

    # Mock get_table to always return a table that is considered an Iceberg table.
    # The table_mock() function already creates a table with the necessary Iceberg properties.
    client_mock.get_table.return_value = table_mock()

    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_catalog"
    test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.project-id": "my-project"})

    tables = test_catalog.list_tables(gcp_dataset_name)

    # Assert that all tables returned by client.list_tables are listed.
    assert len(tables) == 2
    assert (gcp_dataset_name, "iceberg_table_A") in tables
    assert (gcp_dataset_name, "iceberg_table_B") in tables

    client_mock.list_tables.assert_called_once_with(dataset=DatasetReference(project="my-project", dataset_id=gcp_dataset_name))


def test_list_namespaces(mocker: MockFixture) -> None:
    client_mock = MagicMock()
    dataset_item_1 = Dataset(DatasetReference(project="my-project", dataset_id="dataset1"))
    dataset_item_2 = Dataset(DatasetReference(project="my-project", dataset_id="dataset2"))
    client_mock.list_datasets.return_value = iter([dataset_item_1, dataset_item_2])

    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_catalog"
    test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.project-id": "my-project"})

    namespaces = test_catalog.list_namespaces()
    assert len(namespaces) == 2
    assert ("dataset1",) in namespaces
    assert ("dataset2",) in namespaces
    client_mock.list_datasets.assert_called_once()
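These tests hinge on the TABLE_TYPE_PROP / ICEBERG_TABLE_TYPE_VALUE marker: a BigQuery table only counts as an Iceberg table when its external catalog options carry that marker, with metadata_location pointing at the current metadata file. A hedged sketch of the guard the catalog presumably applies when loading a table; the helper name is invented for illustration:

from google.cloud.bigquery import Table

from pyiceberg.catalog.bigquery_metastore import ICEBERG_TABLE_TYPE_VALUE, TABLE_TYPE_PROP
from pyiceberg.exceptions import NoSuchTableError


def _metadata_location_or_raise(table: Table) -> str:
    # Hypothetical helper: reject BigQuery tables that are not Iceberg tables.
    options = table.external_catalog_table_options
    if options is None or options.parameters.get(TABLE_TYPE_PROP) != ICEBERG_TABLE_TYPE_VALUE:
        raise NoSuchTableError(f"Table {table.table_id} is not an Iceberg table")
    return options.parameters["metadata_location"]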