 # specific language governing permissions and limitations
 # under the License.
 import json
-from typing import Any, List, Optional, Set, Tuple, Union
+from typing import TYPE_CHECKING, Any, List, Optional, Set, Tuple, Union

 from google.api_core.exceptions import NotFound
 from google.cloud.bigquery import Client, Dataset, DatasetReference, TableReference
...
 from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
 from pyiceberg.utils.config import Config

+if TYPE_CHECKING:
+    import pyarrow as pa
+
 GCP_PROJECT_ID = "gcp.project-id"
 GCP_LOCATION = "gcp.location"
 GCP_CREDENTIALS_LOCATION = "gcp.credentials-location"
...
 HIVE_FILE_INPUT_FORMAT = "org.apache.iceberg.mr.hive.HiveIcebergInputFormat"
 HIVE_FILE_OUTPUT_FORMAT = "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat"

+
 class BigQueryMetastoreCatalog(MetastoreCatalog):
     def __init__(self, name: str, **properties: str):
         super().__init__(name, **properties)
@@ -138,7 +142,9 @@ def create_table(
         dataset_ref = DatasetReference(project=self.project_id, dataset_id=dataset_name)

         try:
-            table = self._make_new_table(metadata, metadata_location, TableReference(dataset_ref=dataset_ref, table_id=table_name))
+            table = self._make_new_table(
+                metadata, metadata_location, TableReference(dataset_ref=dataset_ref, table_id=table_name)
+            )
             self.client.create_table(table)
         except Conflict as e:
             raise TableAlreadyExistsError(f"Table {table_name} already exists") from e
@@ -161,12 +167,13 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
         try:
             dataset_ref = DatasetReference(project=self.project_id, dataset_id=database_name)
             dataset = Dataset(dataset_ref=dataset_ref)
-            dataset.external_catalog_dataset_options = self._create_external_catalog_dataset_options(self._get_default_warehouse_location_for_dataset(database_name), properties, dataset_ref)
+            dataset.external_catalog_dataset_options = self._create_external_catalog_dataset_options(
+                self._get_default_warehouse_location_for_dataset(database_name), properties, dataset_ref
+            )
             self.client.create_dataset(dataset)
         except Conflict as e:
             raise NamespaceAlreadyExistsError(f"Namespace {database_name} already exists") from e

-
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
         """
         Load the table's metadata and return the table instance.
@@ -196,7 +203,6 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
         except NotFound as e:
             raise NoSuchTableError(f"Table does not exist: {dataset_name}.{table_name}") from e

-
     def drop_table(self, identifier: Union[str, Identifier]) -> None:
         """Drop a table.

@@ -222,11 +228,9 @@ def commit_table(
     ) -> CommitTableResponse:
         raise NotImplementedError

-
     def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
         raise NotImplementedError

-
     def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
         database_name = self.identifier_to_database(namespace)

@@ -283,7 +287,9 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
         metadata = FromInputFile.table_metadata(file)

         try:
-            table = self._make_new_table(metadata, metadata_location, TableReference(dataset_ref=dataset_ref, table_id=table_name))
+            table = self._make_new_table(
+                metadata, metadata_location, TableReference(dataset_ref=dataset_ref, table_id=table_name)
+            )
             self.client.create_table(table)
         except Conflict as e:
             raise TableAlreadyExistsError(f"Table {table_name} already exists") from e
@@ -316,21 +322,16 @@ def update_namespace_properties(
     ) -> PropertiesUpdateSummary:
         raise NotImplementedError

-
     def _make_new_table(self, metadata: TableMetadata, metadata_file_location: str, table_ref: TableReference) -> BQTable:
-        """
-        To make the table queryable from Hive, the user would likely be setting the HIVE_ENGINE_ENABLED
-        parameter.
-
-        """
+        """To make the table queryable from Hive, the user would likely be setting the HIVE_ENGINE_ENABLED parameter."""
         table = BQTable(table_ref)

         # In Python, you typically set the external data configuration directly.
         # BigQueryMetastoreUtils.create_external_catalog_table_options is mapped to
         # constructing the external_data_configuration for the Table object.
         external_config_options = self._create_external_catalog_table_options(
             metadata.location,
-            self._create_table_parameters(metadata_file_location=metadata_file_location, table_metadata=metadata)
+            self._create_table_parameters(metadata_file_location=metadata_file_location, table_metadata=metadata),
         )

         # Apply the external configuration to the Table object.
@@ -340,22 +341,27 @@ def _make_new_table(self, metadata: TableMetadata, metadata_file_location: str, table_ref: TableReference) -> BQTable:

         return table

-    def _create_external_catalog_table_options(self, location: str, parameters: dict) -> ExternalCatalogTableOptions:
+    def _create_external_catalog_table_options(self, location: str, parameters: dict[str, Any]) -> ExternalCatalogTableOptions:
         # This structure directly maps to what BigQuery's ExternalConfig expects for Hive.
         return ExternalCatalogTableOptions(
             storage_descriptor=StorageDescriptor(
                 location_uri=location,
                 input_format=HIVE_FILE_INPUT_FORMAT,
                 output_format=HIVE_FILE_OUTPUT_FORMAT,
-                serde_info=SerDeInfo(serialization_library=HIVE_SERIALIZATION_LIBRARY)
+                serde_info=SerDeInfo(serialization_library=HIVE_SERIALIZATION_LIBRARY),
             ),
-            parameters=parameters
+            parameters=parameters,
         )

-    def _create_external_catalog_dataset_options(self, default_storage_location: str, metadataParameters: dict, dataset_ref: DatasetReference) -> ExternalCatalogDatasetOptions:
-        return ExternalCatalogDatasetOptions(default_storage_location_uri=self._get_default_warehouse_location_for_dataset(dataset_ref.dataset_id), parameters=metadataParameters)
+    def _create_external_catalog_dataset_options(
+        self, default_storage_location: str, metadataParameters: dict[str, Any], dataset_ref: DatasetReference
+    ) -> ExternalCatalogDatasetOptions:
+        return ExternalCatalogDatasetOptions(
+            default_storage_location_uri=self._get_default_warehouse_location_for_dataset(dataset_ref.dataset_id),
+            parameters=metadataParameters,
+        )

-    def _convert_bigquery_table_to_iceberg_table(self, identifier: str, table: BQTable) -> Table:
+    def _convert_bigquery_table_to_iceberg_table(self, identifier: Union[str, Identifier], table: BQTable) -> Table:
         dataset_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
         metadata_location = ""
         if table.external_catalog_table_options and table.external_catalog_table_options.parameters:
@@ -381,29 +387,33 @@ def _create_table_parameters(self, metadata_file_location: str, table_metadata: TableMetadata
         parameters["EXTERNAL"] = True

         # Add Hive-style basic statistics from snapshot metadata if it exists.
-        if table_metadata.current_snapshot():
-
-            if table_metadata.current_snapshot().summary.get(TOTAL_DATA_FILES):
-                parameters["numFiles"] = table_metadata.current_snapshot.summary.get(TOTAL_DATA_FILES)
+        snapshot = table_metadata.current_snapshot()
+        if snapshot:
+            summary = snapshot.summary
+            if summary:
+                if summary.get(TOTAL_DATA_FILES):
+                    parameters["numFiles"] = summary.get(TOTAL_DATA_FILES)

-            if table_metadata.current_snapshot().summary.get(TOTAL_RECORDS):
-                parameters["numRows"] = table_metadata.current_snapshot.summary.get(TOTAL_RECORDS)
+                if summary.get(TOTAL_RECORDS):
+                    parameters["numRows"] = summary.get(TOTAL_RECORDS)

-            if table_metadata.current_snapshot().summary.get(TOTAL_FILE_SIZE):
-                parameters["totalSize"] = table_metadata.current_snapshot.summary.get(TOTAL_FILE_SIZE)
+                if summary.get(TOTAL_FILE_SIZE):
+                    parameters["totalSize"] = summary.get(TOTAL_FILE_SIZE)

         return parameters

-    def _default_storage_location(self, location: Optional[str], dataset_ref: DatasetReference) -> str | None:
+    def _default_storage_location(self, location: Optional[str], dataset_ref: DatasetReference) -> Union[str, None]:
         if location:
             return location
         dataset = self.client.get_dataset(dataset_ref)
         if dataset and dataset.external_catalog_dataset_options:
             return dataset.external_catalog_dataset_options.default_storage_location_uri

+        raise ValueError("Could not find default storage location")
+
     def _get_default_warehouse_location_for_dataset(self, database_name: str) -> str:
         if warehouse_path := self.properties.get(WAREHOUSE_LOCATION):
             warehouse_path = warehouse_path.rstrip("/")
             return f"{warehouse_path}/{database_name}.db"

-        raise ValueError("No default path is set, please specify a location when creating a table")
+        raise ValueError("No default path is set, please specify a location when creating a table")
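
For orientation, a minimal usage sketch of the catalog this diff touches. It is not part of the change: the project, location, and bucket values below are hypothetical, and it assumes pyiceberg's standard Schema/NestedField types plus the "gcp.*" property keys and the "warehouse" (WAREHOUSE_LOCATION) key referenced above.

from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

# Hypothetical values; only the property keys come from the constants in the diff.
catalog = BigQueryMetastoreCatalog(
    "bq",
    **{
        "gcp.project-id": "my-project",
        "gcp.location": "us-central1",
        "warehouse": "gs://my-bucket/warehouse",  # consumed by _get_default_warehouse_location_for_dataset
    },
)

catalog.create_namespace("analytics")  # backed by a BigQuery dataset

schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=False),
)
table = catalog.create_table(("analytics", "events"), schema=schema)
print(table.metadata_location)  # the metadata file registered on the BigQuery table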