Commit 3c94c83

Added BigQuery Metastore Catalog
1 parent a67c559 commit 3c94c83

File tree

8 files changed: +1,061 additions, −94 deletions


mkdocs/docs/index.md

Lines changed: 1 addition & 0 deletions
@@ -46,6 +46,7 @@ You can mix and match optional dependencies depending on your needs:
 | hive-kerberos | Support for Hive metastore in Kerberos environment |
 | glue | Support for AWS Glue |
 | dynamodb | Support for AWS DynamoDB |
+| bigquery | Support for Google Cloud BigQuery |
 | sql-postgres | Support for SQL Catalog backed by Postgresql |
 | sql-sqlite | Support for SQL Catalog backed by SQLite |
 | pyarrow | PyArrow as a FileIO implementation to interact with the object store |
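
With this extra in place, BigQuery support installs like the other optional backends:

    pip install 'pyiceberg[bigquery]'

This is the same hint the new NotInstalledError message in pyiceberg/catalog/__init__.py (below) prints when the dependency is missing.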

poetry.lock

Lines changed: 306 additions & 94 deletions
Some generated files are not rendered by default.

pyiceberg/catalog/__init__.py

Lines changed: 11 additions & 0 deletions
@@ -117,6 +117,7 @@ class CatalogType(Enum):
     DYNAMODB = "dynamodb"
     SQL = "sql"
     IN_MEMORY = "in-memory"
+    BIGQUERY = "bigquery"


 def load_rest(name: str, conf: Properties) -> Catalog:
@@ -171,6 +172,15 @@ def load_in_memory(name: str, conf: Properties) -> Catalog:
     except ImportError as exc:
         raise NotInstalledError("SQLAlchemy support not installed: pip install 'pyiceberg[sql-sqlite]'") from exc

+def load_bigquery(name: str, conf: Properties) -> Catalog:
+    try:
+        from pyiceberg.catalog.bigquery_metastore import BigQueryMetastoreCatalog
+
+        return BigQueryMetastoreCatalog(name, **conf)
+    except ImportError as exc:
+        raise NotInstalledError("BigQuery support not installed: pip install 'pyiceberg[bigquery]'") from exc
+
+
 AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
     CatalogType.REST: load_rest,
@@ -179,6 +189,7 @@ def load_in_memory(name: str, conf: Properties) -> Catalog:
     CatalogType.DYNAMODB: load_dynamodb,
     CatalogType.SQL: load_sql,
     CatalogType.IN_MEMORY: load_in_memory,
+    CatalogType.BIGQUERY: load_bigquery,
 }
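For orientation, a minimal usage sketch of the new wiring (the catalog name, project ID, and bucket below are placeholders; "type": "bigquery" is the CatalogType value registered above, and "gcp.project-id"/"warehouse" are the properties the tests in this commit pass):

    from pyiceberg.catalog import load_catalog

    # "type": "bigquery" resolves to CatalogType.BIGQUERY and dispatches to load_bigquery.
    catalog = load_catalog(
        "bq",  # placeholder catalog name
        **{
            "type": "bigquery",
            "gcp.project-id": "my-gcp-project",  # placeholder GCP project
            "warehouse": "gs://my-warehouse-bucket/",  # placeholder GCS warehouse
        },
    )
    print(catalog.list_namespaces())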

pyiceberg/catalog/bigquery_metastore.py

Lines changed: 409 additions & 0 deletions
Large diffs are not rendered by default.
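
The 409-line module itself is not rendered, but its public surface can be read off the tests in this commit. A hedged sketch, listing only names the tests actually touch (the docstring and comments are inferences, not the module's own):

    from pyiceberg.catalog import Catalog

    class BigQueryMetastoreCatalog(Catalog):
        """BigQuery datasets act as Iceberg namespaces; BigQuery external tables carry
        the Iceberg metadata pointer (per the mocks: ExternalCatalogTableOptions with
        a "metadata_location" parameter and TABLE_TYPE_PROP/ICEBERG_TABLE_TYPE_VALUE)."""

        def __init__(self, name: str, **properties: str) -> None:
            # Tests construct it with "gcp.project-id" and optionally "warehouse";
            # internally it builds a google.cloud.bigquery Client (which the unit tests patch).
            ...

        # Methods exercised by the tests below: create_namespace, drop_namespace,
        # list_namespaces, load_namespace_properties, create_table, register_table,
        # load_table, drop_table, list_tables.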

pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -73,6 +73,7 @@ python-snappy = { version = ">=0.6.0,<1.0.0", optional = true }
 thrift = { version = ">=0.13.0,<1.0.0", optional = true }
 mypy-boto3-glue = { version = ">=1.28.18", optional = true }
 boto3 = { version = ">=1.24.59", optional = true }
+google-cloud-bigquery = { version = "^3.33.0", optional = true }
 s3fs = { version = ">=2023.1.0", optional = true }
 adlfs = { version = ">=2023.1.0", optional = true }
 gcsfs = { version = ">=2023.1.0", optional = true }
@@ -302,6 +303,7 @@ s3fs = ["s3fs"]
 glue = ["boto3", "mypy-boto3-glue"]
 adlfs = ["adlfs"]
 dynamodb = ["boto3"]
+bigquery = ["google-cloud-bigquery"]
 zstandard = ["zstandard"]
 sql-postgres = ["sqlalchemy", "psycopg2-binary"]
 sql-sqlite = ["sqlalchemy"]
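
For development installs from a source checkout, the new extra resolves through Poetry's [tool.poetry.extras] table shown here, e.g.:

    poetry install --extras "bigquery"

Published wheels pick up the same optional dependency via pip install 'pyiceberg[bigquery]'.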
Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os

import pytest
from pytest_mock import MockFixture

from pyiceberg.catalog.bigquery_metastore import BigQueryMetastoreCatalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchNamespaceError, NoSuchTableError
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
from pyiceberg.schema import Schema
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER
from tests.conftest import BQ_TABLE_METADATA_LOCATION_REGEX, BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX


def test_create_table_with_database_location(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_ddb_catalog"
    identifier = (gcp_dataset_name, table_name)
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"}
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)
    table = test_catalog.create_table(identifier, table_schema_nested)
    assert table.name() == identifier
    assert BQ_TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)

    tables_in_namespace = test_catalog.list_tables(namespace=gcp_dataset_name)
    assert identifier in tables_in_namespace


def test_drop_table_with_database_location(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_ddb_catalog"
    identifier = (gcp_dataset_name, table_name)
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"}
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)
    test_catalog.create_table(identifier, table_schema_nested)
    test_catalog.drop_table(identifier)

    tables_in_namespace_after_drop = test_catalog.list_tables(namespace=gcp_dataset_name)
    assert identifier not in tables_in_namespace_after_drop

    # Expect that the table no longer exists.
    with pytest.raises(NoSuchTableError):
        test_catalog.load_table(identifier)


def test_create_and_drop_namespace(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    # Create namespace.
    catalog_name = "test_ddb_catalog"
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"}
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)

    # Ensure that the namespace exists.
    namespaces = test_catalog.list_namespaces()
    assert (gcp_dataset_name,) in namespaces

    # Drop the namespace and ensure it does not exist.
    test_catalog.drop_namespace(namespace=gcp_dataset_name)
    namespaces_after_drop = test_catalog.list_namespaces()
    assert (gcp_dataset_name,) not in namespaces_after_drop

    # Verify with load_namespace_properties as well.
    with pytest.raises(NoSuchNamespaceError):
        test_catalog.load_namespace_properties(gcp_dataset_name)


def test_register_table(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_bq_register_catalog"
    identifier = (gcp_dataset_name, table_name)
    warehouse_path = "gs://alexstephen-test-bq-bucket/"  # Matches conftest BUCKET_NAME for GCS interaction
    gcp_project_id = "alexstephen-test-1"

    test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.project-id": gcp_project_id, "warehouse": warehouse_path})

    test_catalog.create_namespace(namespace=gcp_dataset_name)

    # Manually create a metadata file in GCS.
    table_gcs_location = f"{warehouse_path.rstrip('/')}/{gcp_dataset_name}.db/{table_name}"
    # Construct a unique metadata file name similar to how pyiceberg would.
    metadata_file_name = "00000-aaaaaaaa-aaaa-4aaa-aaaa-aaaaaaaaaaaa.metadata.json"
    metadata_gcs_path = f"{table_gcs_location}/metadata/{metadata_file_name}"

    metadata = new_table_metadata(
        location=table_gcs_location,
        schema=table_schema_nested,
        properties={},
        partition_spec=UNPARTITIONED_PARTITION_SPEC,
        sort_order=UNSORTED_SORT_ORDER,
    )
    io = load_file_io(properties=test_catalog.properties, location=metadata_gcs_path)
    test_catalog._write_metadata(metadata, io, metadata_gcs_path)
    ToOutputFile.table_metadata(metadata, io.new_output(metadata_gcs_path), overwrite=True)

    # Register the table.
    registered_table = test_catalog.register_table(identifier, metadata_gcs_path)

    assert registered_table.name() == identifier
    assert registered_table.metadata_location == metadata_gcs_path
    assert registered_table.metadata.location == table_gcs_location
    assert BQ_TABLE_METADATA_LOCATION_REGEX.match(registered_table.metadata_location)

    # Verify the table exists and is loadable.
    loaded_table = test_catalog.load_table(identifier)
    assert loaded_table.name() == registered_table.name()
    assert loaded_table.metadata_location == metadata_gcs_path

    # Clean up.
    test_catalog.drop_table(identifier)
    test_catalog.drop_namespace(gcp_dataset_name)
Lines changed: 171 additions & 0 deletions
@@ -0,0 +1,171 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from unittest.mock import MagicMock

from google.api_core.exceptions import NotFound
from google.cloud.bigquery import Dataset, DatasetReference, Table, TableReference
from google.cloud.bigquery.external_config import ExternalCatalogDatasetOptions, ExternalCatalogTableOptions
from pytest_mock import MockFixture

from pyiceberg.catalog.bigquery_metastore import ICEBERG_TABLE_TYPE_VALUE, TABLE_TYPE_PROP, BigQueryMetastoreCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.schema import Schema


def dataset_mock() -> Dataset:
    d = Dataset(DatasetReference(dataset_id="my-dataset", project="my-project"))
    d.external_catalog_dataset_options = ExternalCatalogDatasetOptions(
        default_storage_location_uri="gs://test-bucket/iceberg-dataset"
    )
    return d


def table_mock() -> Table:
    t = Table(TableReference(dataset_ref=DatasetReference(dataset_id="my-dataset", project="my-project"), table_id="my-table"))
    t.external_catalog_table_options = ExternalCatalogTableOptions(
        parameters={
            "metadata_location": "gs://alexstephen-test-bq-bucket/my_iceberg_database_aaaaaaaaaaaaaaaaaaaa.db/my_iceberg_table-bbbbbbbbbbbbbbbbbbbb/metadata/12343-aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa.metadata",
            TABLE_TYPE_PROP: ICEBERG_TABLE_TYPE_VALUE,
        }
    )
    return t


def test_create_table_with_database_location(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    # Set up mocks for GCP.
    client_mock = MagicMock()
    client_mock.get_dataset.return_value = dataset_mock()
    client_mock.get_table.return_value = table_mock()

    # Set up mocks for GCS.
    file_mock = MagicMock()

    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)
    mocker.patch("pyiceberg.catalog.bigquery_metastore.FromInputFile.table_metadata", return_value=file_mock)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_ddb_catalog"
    identifier = (gcp_dataset_name, table_name)
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"}
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)
    table = test_catalog.create_table(identifier, table_schema_nested)
    assert table.name() == identifier


def test_drop_table_with_database_location(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    # Set up mocks for GCP.
    client_mock = MagicMock()
    client_mock.get_dataset.return_value = dataset_mock()
    client_mock.get_table.return_value = table_mock()

    # Set up mocks for GCS.
    file_mock = MagicMock()

    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)
    mocker.patch("pyiceberg.catalog.bigquery_metastore.FromInputFile.table_metadata", return_value=file_mock)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_ddb_catalog"
    identifier = (gcp_dataset_name, table_name)
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"}
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)
    test_catalog.create_table(identifier, table_schema_nested)
    test_catalog.drop_table(identifier)

    client_mock.get_table.side_effect = NotFound("Table Not Found")
    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)

    # Expect that the table no longer exists.
    try:
        test_catalog.load_table(identifier)
        raise AssertionError("Expected NoSuchTableError")
    except NoSuchTableError:
        pass


def test_drop_namespace(mocker: MockFixture, gcp_dataset_name: str) -> None:
    client_mock = MagicMock()
    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_catalog"
    test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.project-id": "alexstephen-test-1"})

    test_catalog.drop_namespace(gcp_dataset_name)
    client_mock.delete_dataset.assert_called_once()
    args, _ = client_mock.delete_dataset.call_args
    assert isinstance(args[0], Dataset)
    assert args[0].dataset_id == gcp_dataset_name


def test_list_tables(mocker: MockFixture, gcp_dataset_name: str) -> None:
    client_mock = MagicMock()

    # Mock list_tables to return an iterator of TableListItem.
    table_list_item_1 = MagicMock()
    table_list_item_1.table_id = "iceberg_table_A"
    table_list_item_1.reference = TableReference(
        dataset_ref=DatasetReference(project="my-project", dataset_id=gcp_dataset_name), table_id="iceberg_table_A"
    )

    table_list_item_2 = MagicMock()
    table_list_item_2.table_id = "iceberg_table_B"
    table_list_item_2.reference = TableReference(
        dataset_ref=DatasetReference(project="my-project", dataset_id=gcp_dataset_name), table_id="iceberg_table_B"
    )

    client_mock.list_tables.return_value = iter([table_list_item_1, table_list_item_2])

    # Mock get_table to always return a table that is considered an Iceberg table.
    # The table_mock() function already creates a table with the necessary Iceberg properties.
    client_mock.get_table.return_value = table_mock()

    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_catalog"
    test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.project-id": "my-project"})

    tables = test_catalog.list_tables(gcp_dataset_name)

    # Assert that all tables returned by client.list_tables are listed.
    assert len(tables) == 2
    assert (gcp_dataset_name, "iceberg_table_A") in tables
    assert (gcp_dataset_name, "iceberg_table_B") in tables

    client_mock.list_tables.assert_called_once_with(dataset=DatasetReference(project="my-project", dataset_id=gcp_dataset_name))


def test_list_namespaces(mocker: MockFixture) -> None:
    client_mock = MagicMock()
    dataset_item_1 = Dataset(DatasetReference(project="my-project", dataset_id="dataset1"))
    dataset_item_2 = Dataset(DatasetReference(project="my-project", dataset_id="dataset2"))
    client_mock.list_datasets.return_value = iter([dataset_item_1, dataset_item_2])

    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_catalog"
    test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.project-id": "my-project"})

    namespaces = test_catalog.list_namespaces()
    assert len(namespaces) == 2
    assert ("dataset1",) in namespaces
    assert ("dataset2",) in namespaces
    client_mock.list_datasets.assert_called_once()
