diff --git a/mkdocs/docs/index.md b/mkdocs/docs/index.md index 714baa0d69..ee4a9156e0 100644 --- a/mkdocs/docs/index.md +++ b/mkdocs/docs/index.md @@ -46,6 +46,7 @@ You can mix and match optional dependencies depending on your needs: | hive-kerberos | Support for Hive metastore in Kerberos environment | | glue | Support for AWS Glue | | dynamodb | Support for AWS DynamoDB | +| bigquery | Support for Google Cloud BigQuery | | sql-postgres | Support for SQL Catalog backed by Postgresql | | sql-sqlite | Support for SQL Catalog backed by SQLite | | pyarrow | PyArrow as a FileIO implementation to interact with the object store | diff --git a/poetry.lock b/poetry.lock index bbd207d7bb..24b3669fc0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. [[package]] name = "adlfs" @@ -201,7 +201,7 @@ description = "aiosignal: a list of registered asynchronous callbacks" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"ray\")" +markers = "python_version == \"3.9\" and extra == \"ray\" or (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and python_version < \"3.10\" or python_version >= \"3.10\" and (extra == \"ray\" or extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\")" files = [ {file = "aiosignal-1.3.2-py2.py3-none-any.whl", hash = "sha256:45cde58e409a301715980c2b01d0c28bdde3770d8290b5eb2173759d9acb31a5"}, {file = "aiosignal-1.3.2.tar.gz", hash = "sha256:a8c255c66fafb1e499c9351d0bf32ff2d8a0321595ebac3b93713656d2436f54"}, @@ -284,7 +284,7 @@ files = [ {file = "attrs-25.3.0-py3-none-any.whl", hash = "sha256:427318ce031701fea540783410126f03899a97ffc6f61596ad581ac2e40e3bc3"}, {file = "attrs-25.3.0.tar.gz", hash = "sha256:75d7cefc7fb576747b2c81b4442d4d4a1ce0900973527c011d1030fd3bf4af1b"}, ] -markers = {main = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"ray\")"} +markers = {main = "python_version == \"3.9\" and extra == \"ray\" or (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and python_version < \"3.10\" or python_version >= \"3.10\" and (extra == \"ray\" or extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\")"} [package.extras] benchmark = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] @@ -1434,7 +1434,7 @@ files = [ {file = "filelock-3.18.0-py3-none-any.whl", hash = "sha256:c401f4f8377c4464e6db25fff06205fd89bdd83b65eb0488ed1b160f780e21de"}, {file = "filelock-3.18.0.tar.gz", hash = "sha256:adbc88eabb99d2fec8c9c1b229b171f18afa655400173ddc653d5d01501fb9f2"}, ] -markers = {main = "extra == \"ray\" or extra == \"hf\""} +markers = {main = "python_version == \"3.9\" and (extra == \"ray\" or extra == \"hf\") or extra == \"hf\" or extra == \"ray\""} [package.extras] docs = ["furo (>=2024.8.6)", "sphinx (>=8.1.3)", "sphinx-autodoc-typehints (>=3)"] @@ -1489,7 +1489,7 @@ description = "A list-like structure 
which implements collections.abc.MutableSeq optional = true python-versions = ">=3.9" groups = ["main"] -markers = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"ray\")" +markers = "python_version == \"3.9\" and extra == \"ray\" or (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and python_version < \"3.10\" or python_version >= \"3.10\" and (extra == \"ray\" or extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\")" files = [ {file = "frozenlist-1.7.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:cc4df77d638aa2ed703b878dd093725b72a824c3c546c076e8fdf276f78ee84a"}, {file = "frozenlist-1.7.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:716a9973a2cc963160394f701964fe25012600f3d311f60c790400b00e568b61"}, @@ -1726,7 +1726,7 @@ description = "Google API client core library" optional = true python-versions = ">=3.7" groups = ["main"] -markers = "extra == \"gcsfs\"" +markers = "extra == \"bigquery\" or extra == \"gcsfs\"" files = [ {file = "google_api_core-2.25.0-py3-none-any.whl", hash = "sha256:1db79d1281dcf9f3d10023283299ba38f3dc9f639ec41085968fd23e5bcf512e"}, {file = "google_api_core-2.25.0.tar.gz", hash = "sha256:9b548e688702f82a34ed8409fb8a6961166f0b7795032f0be8f48308dff4333a"}, @@ -1735,6 +1735,14 @@ files = [ [package.dependencies] google-auth = ">=2.14.1,<3.0.0" googleapis-common-protos = ">=1.56.2,<2.0.0" +grpcio = [ + {version = ">=1.33.2,<2.0.0", optional = true, markers = "extra == \"grpc\""}, + {version = ">=1.49.1,<2.0.0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, +] +grpcio-status = [ + {version = ">=1.33.2,<2.0.0", optional = true, markers = "extra == \"grpc\""}, + {version = ">=1.49.1,<2.0.0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, +] proto-plus = [ {version = ">=1.22.3,<2.0.0"}, {version = ">=1.25.0,<2.0.0", markers = "python_version >= \"3.13\""}, @@ -1755,7 +1763,7 @@ description = "Google Authentication Library" optional = true python-versions = ">=3.7" groups = ["main"] -markers = "extra == \"gcsfs\"" +markers = "extra == \"bigquery\" or extra == \"gcsfs\"" files = [ {file = "google_auth-2.40.3-py2.py3-none-any.whl", hash = "sha256:1370d4593e86213563547f97a92752fc658456fe4514c809544f330fed45a7ca"}, {file = "google_auth-2.40.3.tar.gz", hash = "sha256:500c3a29adedeb36ea9cf24b8d10858e152f2412e3ca37829b3fa18e33d63b77"}, @@ -1796,6 +1804,40 @@ requests-oauthlib = ">=0.7.0" [package.extras] tool = ["click (>=6.0.0)"] +[[package]] +name = "google-cloud-bigquery" +version = "3.34.0" +description = "Google BigQuery API client library" +optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"bigquery\"" +files = [ + {file = "google_cloud_bigquery-3.34.0-py3-none-any.whl", hash = "sha256:de20ded0680f8136d92ff5256270b5920dfe4fae479f5d0f73e90e5df30b1cf7"}, + {file = "google_cloud_bigquery-3.34.0.tar.gz", hash = "sha256:5ee1a78ba5c2ccb9f9a8b2bf3ed76b378ea68f49b6cac0544dc55cc97ff7c1ce"}, +] + +[package.dependencies] +google-api-core = {version = ">=2.11.1,<3.0.0", extras = ["grpc"]} +google-auth = ">=2.14.1,<3.0.0" +google-cloud-core = ">=2.4.1,<3.0.0" +google-resumable-media = ">=2.0.0,<3.0.0" +packaging = ">=24.2.0" +python-dateutil = ">=2.8.2,<3.0.0" +requests = ">=2.21.0,<3.0.0" + +[package.extras] +all = ["google-cloud-bigquery[bigquery-v2,bqstorage,geopandas,ipython,ipywidgets,matplotlib,opentelemetry,pandas,tqdm]"] +bigquery-v2 = 
["proto-plus (>=1.22.3,<2.0.0)", "protobuf (>=3.20.2,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<7.0.0)"] +bqstorage = ["google-cloud-bigquery-storage (>=2.18.0,<3.0.0)", "grpcio (>=1.47.0,<2.0.0)", "grpcio (>=1.49.1,<2.0.0) ; python_version >= \"3.11\"", "pyarrow (>=4.0.0)"] +geopandas = ["Shapely (>=1.8.4,<3.0.0)", "geopandas (>=0.9.0,<2.0.0)"] +ipython = ["bigquery-magics (>=0.6.0)", "ipython (>=7.23.1)"] +ipywidgets = ["ipykernel (>=6.2.0)", "ipywidgets (>=7.7.1)"] +matplotlib = ["matplotlib (>=3.10.3) ; python_version >= \"3.10\"", "matplotlib (>=3.7.1,<=3.9.2) ; python_version == \"3.9\""] +opentelemetry = ["opentelemetry-api (>=1.1.0)", "opentelemetry-instrumentation (>=0.20b0)", "opentelemetry-sdk (>=1.1.0)"] +pandas = ["db-dtypes (>=1.0.4,<2.0.0)", "grpcio (>=1.47.0,<2.0.0)", "grpcio (>=1.49.1,<2.0.0) ; python_version >= \"3.11\"", "pandas (>=1.3.0)", "pandas-gbq (>=0.26.1)", "pyarrow (>=3.0.0)"] +tqdm = ["tqdm (>=4.23.4,<5.0.0)"] + [[package]] name = "google-cloud-core" version = "2.4.3" @@ -1803,7 +1845,7 @@ description = "Google Cloud API client core library" optional = true python-versions = ">=3.7" groups = ["main"] -markers = "extra == \"gcsfs\"" +markers = "extra == \"bigquery\" or extra == \"gcsfs\"" files = [ {file = "google_cloud_core-2.4.3-py2.py3-none-any.whl", hash = "sha256:5130f9f4c14b4fafdff75c79448f9495cfade0d8775facf1b09c3bf67e027f6e"}, {file = "google_cloud_core-2.4.3.tar.gz", hash = "sha256:1fab62d7102844b278fe6dead3af32408b1df3eb06f5c7e8634cbd40edc4da53"}, @@ -1848,7 +1890,7 @@ description = "A python wrapper of the C library 'Google CRC32C'" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"gcsfs\"" +markers = "extra == \"bigquery\" or extra == \"gcsfs\"" files = [ {file = "google_crc32c-1.7.1-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:b07d48faf8292b4db7c3d64ab86f950c2e94e93a11fd47271c28ba458e4a0d76"}, {file = "google_crc32c-1.7.1-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:7cc81b3a2fbd932a4313eb53cc7d9dde424088ca3a0337160f35d91826880c1d"}, @@ -1896,7 +1938,7 @@ description = "Utilities for Google Media Downloads and Resumable Uploads" optional = true python-versions = ">=3.7" groups = ["main"] -markers = "extra == \"gcsfs\"" +markers = "extra == \"bigquery\" or extra == \"gcsfs\"" files = [ {file = "google_resumable_media-2.7.2-py2.py3-none-any.whl", hash = "sha256:3ce7551e9fe6d99e9a126101d2536612bb73486721951e9562fee0f90c6ababa"}, {file = "google_resumable_media-2.7.2.tar.gz", hash = "sha256:5280aed4629f2b60b847b0d42f9857fd4935c11af266744df33d8074cae92fe0"}, @@ -1916,7 +1958,7 @@ description = "Common protobufs used in Google APIs" optional = true python-versions = ">=3.7" groups = ["main"] -markers = "extra == \"gcsfs\"" +markers = "extra == \"bigquery\" or extra == \"gcsfs\"" files = [ {file = "googleapis_common_protos-1.70.0-py3-none-any.whl", hash = "sha256:b8bfcca8c25a2bb253e0e0b0adaf8c00773e5e6af6fd92397576680b807e0fd8"}, {file = "googleapis_common_protos-1.70.0.tar.gz", hash = "sha256:0e1b44e0ea153e6594f9f394fef15193a68aaaea2d843f83e2742717ca753257"}, @@ -2027,6 +2069,89 @@ files = [ [package.dependencies] colorama = ">=0.4" +[[package]] +name = "grpcio" +version = "1.73.0" +description = "HTTP/2-based RPC framework" +optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"bigquery\"" +files = [ + {file = "grpcio-1.73.0-cp310-cp310-linux_armv7l.whl", hash = "sha256:d050197eeed50f858ef6c51ab09514856f957dba7b1f7812698260fc9cc417f6"}, + {file = 
"grpcio-1.73.0-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:ebb8d5f4b0200916fb292a964a4d41210de92aba9007e33d8551d85800ea16cb"}, + {file = "grpcio-1.73.0-cp310-cp310-manylinux_2_17_aarch64.whl", hash = "sha256:c0811331b469e3f15dda5f90ab71bcd9681189a83944fd6dc908e2c9249041ef"}, + {file = "grpcio-1.73.0-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:12787c791c3993d0ea1cc8bf90393647e9a586066b3b322949365d2772ba965b"}, + {file = "grpcio-1.73.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2c17771e884fddf152f2a0df12478e8d02853e5b602a10a9a9f1f52fa02b1d32"}, + {file = "grpcio-1.73.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:275e23d4c428c26b51857bbd95fcb8e528783597207ec592571e4372b300a29f"}, + {file = "grpcio-1.73.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:9ffc972b530bf73ef0f948f799482a1bf12d9b6f33406a8e6387c0ca2098a833"}, + {file = "grpcio-1.73.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:ebd8d269df64aff092b2cec5e015d8ae09c7e90888b5c35c24fdca719a2c9f35"}, + {file = "grpcio-1.73.0-cp310-cp310-win32.whl", hash = "sha256:072d8154b8f74300ed362c01d54af8b93200c1a9077aeaea79828d48598514f1"}, + {file = "grpcio-1.73.0-cp310-cp310-win_amd64.whl", hash = "sha256:ce953d9d2100e1078a76a9dc2b7338d5415924dc59c69a15bf6e734db8a0f1ca"}, + {file = "grpcio-1.73.0-cp311-cp311-linux_armv7l.whl", hash = "sha256:51036f641f171eebe5fa7aaca5abbd6150f0c338dab3a58f9111354240fe36ec"}, + {file = "grpcio-1.73.0-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:d12bbb88381ea00bdd92c55aff3da3391fd85bc902c41275c8447b86f036ce0f"}, + {file = "grpcio-1.73.0-cp311-cp311-manylinux_2_17_aarch64.whl", hash = "sha256:483c507c2328ed0e01bc1adb13d1eada05cc737ec301d8e5a8f4a90f387f1790"}, + {file = "grpcio-1.73.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c201a34aa960c962d0ce23fe5f423f97e9d4b518ad605eae6d0a82171809caaa"}, + {file = "grpcio-1.73.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:859f70c8e435e8e1fa060e04297c6818ffc81ca9ebd4940e180490958229a45a"}, + {file = "grpcio-1.73.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:e2459a27c6886e7e687e4e407778425f3c6a971fa17a16420227bda39574d64b"}, + {file = "grpcio-1.73.0-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:e0084d4559ee3dbdcce9395e1bc90fdd0262529b32c417a39ecbc18da8074ac7"}, + {file = "grpcio-1.73.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:ef5fff73d5f724755693a464d444ee0a448c6cdfd3c1616a9223f736c622617d"}, + {file = "grpcio-1.73.0-cp311-cp311-win32.whl", hash = "sha256:965a16b71a8eeef91fc4df1dc40dc39c344887249174053814f8a8e18449c4c3"}, + {file = "grpcio-1.73.0-cp311-cp311-win_amd64.whl", hash = "sha256:b71a7b4483d1f753bbc11089ff0f6fa63b49c97a9cc20552cded3fcad466d23b"}, + {file = "grpcio-1.73.0-cp312-cp312-linux_armv7l.whl", hash = "sha256:fb9d7c27089d9ba3746f18d2109eb530ef2a37452d2ff50f5a6696cd39167d3b"}, + {file = "grpcio-1.73.0-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:128ba2ebdac41e41554d492b82c34586a90ebd0766f8ebd72160c0e3a57b9155"}, + {file = "grpcio-1.73.0-cp312-cp312-manylinux_2_17_aarch64.whl", hash = "sha256:068ecc415f79408d57a7f146f54cdf9f0acb4b301a52a9e563973dc981e82f3d"}, + {file = "grpcio-1.73.0-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6ddc1cfb2240f84d35d559ade18f69dcd4257dbaa5ba0de1a565d903aaab2968"}, + {file = "grpcio-1.73.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:e53007f70d9783f53b41b4cf38ed39a8e348011437e4c287eee7dd1d39d54b2f"}, + {file = "grpcio-1.73.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:4dd8d8d092efede7d6f48d695ba2592046acd04ccf421436dd7ed52677a9ad29"}, + {file = "grpcio-1.73.0-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:70176093d0a95b44d24baa9c034bb67bfe2b6b5f7ebc2836f4093c97010e17fd"}, + {file = "grpcio-1.73.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:085ebe876373ca095e24ced95c8f440495ed0b574c491f7f4f714ff794bbcd10"}, + {file = "grpcio-1.73.0-cp312-cp312-win32.whl", hash = "sha256:cfc556c1d6aef02c727ec7d0016827a73bfe67193e47c546f7cadd3ee6bf1a60"}, + {file = "grpcio-1.73.0-cp312-cp312-win_amd64.whl", hash = "sha256:bbf45d59d090bf69f1e4e1594832aaf40aa84b31659af3c5e2c3f6a35202791a"}, + {file = "grpcio-1.73.0-cp313-cp313-linux_armv7l.whl", hash = "sha256:da1d677018ef423202aca6d73a8d3b2cb245699eb7f50eb5f74cae15a8e1f724"}, + {file = "grpcio-1.73.0-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:36bf93f6a657f37c131d9dd2c391b867abf1426a86727c3575393e9e11dadb0d"}, + {file = "grpcio-1.73.0-cp313-cp313-manylinux_2_17_aarch64.whl", hash = "sha256:d84000367508ade791d90c2bafbd905574b5ced8056397027a77a215d601ba15"}, + {file = "grpcio-1.73.0-cp313-cp313-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c98ba1d928a178ce33f3425ff823318040a2b7ef875d30a0073565e5ceb058d9"}, + {file = "grpcio-1.73.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a73c72922dfd30b396a5f25bb3a4590195ee45ecde7ee068acb0892d2900cf07"}, + {file = "grpcio-1.73.0-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:10e8edc035724aba0346a432060fd192b42bd03675d083c01553cab071a28da5"}, + {file = "grpcio-1.73.0-cp313-cp313-musllinux_1_1_i686.whl", hash = "sha256:f5cdc332b503c33b1643b12ea933582c7b081957c8bc2ea4cc4bc58054a09288"}, + {file = "grpcio-1.73.0-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:07ad7c57233c2109e4ac999cb9c2710c3b8e3f491a73b058b0ce431f31ed8145"}, + {file = "grpcio-1.73.0-cp313-cp313-win32.whl", hash = "sha256:0eb5df4f41ea10bda99a802b2a292d85be28958ede2a50f2beb8c7fc9a738419"}, + {file = "grpcio-1.73.0-cp313-cp313-win_amd64.whl", hash = "sha256:38cf518cc54cd0c47c9539cefa8888549fcc067db0b0c66a46535ca8032020c4"}, + {file = "grpcio-1.73.0-cp39-cp39-linux_armv7l.whl", hash = "sha256:1284850607901cfe1475852d808e5a102133461ec9380bc3fc9ebc0686ee8e32"}, + {file = "grpcio-1.73.0-cp39-cp39-macosx_11_0_universal2.whl", hash = "sha256:0e092a4b28eefb63eec00d09ef33291cd4c3a0875cde29aec4d11d74434d222c"}, + {file = "grpcio-1.73.0-cp39-cp39-manylinux_2_17_aarch64.whl", hash = "sha256:33577fe7febffe8ebad458744cfee8914e0c10b09f0ff073a6b149a84df8ab8f"}, + {file = "grpcio-1.73.0-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:60813d8a16420d01fa0da1fc7ebfaaa49a7e5051b0337cd48f4f950eb249a08e"}, + {file = "grpcio-1.73.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2a9c957dc65e5d474378d7bcc557e9184576605d4b4539e8ead6e351d7ccce20"}, + {file = "grpcio-1.73.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:3902b71407d021163ea93c70c8531551f71ae742db15b66826cf8825707d2908"}, + {file = "grpcio-1.73.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:1dd7fa7276dcf061e2d5f9316604499eea06b1b23e34a9380572d74fe59915a8"}, + {file = "grpcio-1.73.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2d1510c4ea473110cb46a010555f2c1a279d1c256edb276e17fa571ba1e8927c"}, + {file = "grpcio-1.73.0-cp39-cp39-win32.whl", hash = 
"sha256:d0a1517b2005ba1235a1190b98509264bf72e231215dfeef8db9a5a92868789e"}, + {file = "grpcio-1.73.0-cp39-cp39-win_amd64.whl", hash = "sha256:6228f7eb6d9f785f38b589d49957fca5df3d5b5349e77d2d89b14e390165344c"}, + {file = "grpcio-1.73.0.tar.gz", hash = "sha256:3af4c30918a7f0d39de500d11255f8d9da4f30e94a2033e70fe2a720e184bd8e"}, +] + +[package.extras] +protobuf = ["grpcio-tools (>=1.73.0)"] + +[[package]] +name = "grpcio-status" +version = "1.73.0" +description = "Status proto mapping for gRPC" +optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"bigquery\"" +files = [ + {file = "grpcio_status-1.73.0-py3-none-any.whl", hash = "sha256:a3f3a9994b44c364f014e806114ba44cc52e50c426779f958c8b22f14ff0d892"}, + {file = "grpcio_status-1.73.0.tar.gz", hash = "sha256:a2b7f430568217f884fe52a5a0133b6f4c9338beae33fb5370134a8eaf58f974"}, +] + +[package.dependencies] +googleapis-common-protos = ">=1.5.5" +grpcio = ">=1.73.0" +protobuf = ">=6.30.0,<7.0.0" + [[package]] name = "hf-xet" version = "1.1.3" @@ -3353,7 +3478,7 @@ description = "Fundamental package for array computing in Python" optional = true python-versions = ">=3.10" groups = ["main"] -markers = "(extra == \"pandas\" or extra == \"ray\") and python_version == \"3.10\"" +markers = "python_version == \"3.10\" and (extra == \"pandas\" or extra == \"ray\")" files = [ {file = "numpy-2.2.6-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b412caa66f72040e6d268491a59f2c43bf03eb6c96dd8f0307829feb7fa2b6fb"}, {file = "numpy-2.2.6-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8e41fd67c52b86603a91c1a505ebaef50b3314de0213461c7a6e99c9a3beff90"}, @@ -3419,7 +3544,7 @@ description = "Fundamental package for array computing in Python" optional = true python-versions = ">=3.11" groups = ["main"] -markers = "python_version >= \"3.11\" and (extra == \"pandas\" or extra == \"ray\")" +markers = "(extra == \"pandas\" or extra == \"ray\") and python_version >= \"3.11\"" files = [ {file = "numpy-2.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:c3c9fdde0fa18afa1099d6257eb82890ea4f3102847e692193b54e00312a9ae9"}, {file = "numpy-2.3.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:46d16f72c2192da7b83984aa5455baee640e33a9f1e61e656f29adf55e406c2b"}, @@ -3538,7 +3663,7 @@ files = [ {file = "packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484"}, {file = "packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f"}, ] -markers = {main = "extra == \"ray\" or extra == \"hf\""} +markers = {main = "python_version == \"3.9\" and extra == \"ray\" or (extra == \"bigquery\" or extra == \"hf\") and python_version < \"3.10\" or python_version >= \"3.10\" and (extra == \"ray\" or extra == \"bigquery\" or extra == \"hf\")"} [[package]] name = "paginate" @@ -3612,8 +3737,8 @@ files = [ [package.dependencies] numpy = [ {version = ">=1.22.4", markers = "python_version < \"3.11\""}, - {version = ">=1.23.2", markers = "python_version == \"3.11\""}, {version = ">=1.26.0", markers = "python_version >= \"3.12\""}, + {version = ">=1.23.2", markers = "python_version == \"3.11\""}, ] python-dateutil = ">=2.8.2" pytz = ">=2020.1" @@ -3893,7 +4018,7 @@ description = "Beautiful, Pythonic protocol buffers" optional = true python-versions = ">=3.7" groups = ["main"] -markers = "extra == \"gcsfs\"" +markers = "extra == \"bigquery\" or extra == \"gcsfs\"" files = [ {file = "proto_plus-1.26.1-py3-none-any.whl", hash = 
"sha256:13285478c2dcf2abb829db158e1047e2f1e8d63a077d94263c2b88b043c75a66"}, {file = "proto_plus-1.26.1.tar.gz", hash = "sha256:21a515a4c4c0088a773899e23c7bbade3d18f9c66c73edd4c7ee3816bc96a012"}, @@ -3912,7 +4037,7 @@ description = "" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"ray\" or extra == \"gcsfs\"" +markers = "python_version == \"3.9\" and extra == \"ray\" or (extra == \"bigquery\" or extra == \"gcsfs\") and python_version < \"3.10\" or python_version >= \"3.10\" and (extra == \"ray\" or extra == \"bigquery\" or extra == \"gcsfs\")" files = [ {file = "protobuf-6.31.1-cp310-abi3-win32.whl", hash = "sha256:7fa17d5a29c2e04b7d90e5e32388b8bfd0e7107cd8e616feef7ed3fa6bdab5c9"}, {file = "protobuf-6.31.1-cp310-abi3-win_amd64.whl", hash = "sha256:426f59d2964864a1a366254fa703b8632dcec0790d8862d30034d8245e1cd447"}, @@ -4123,7 +4248,7 @@ description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs optional = true python-versions = ">=3.8" groups = ["main"] -markers = "extra == \"gcsfs\"" +markers = "extra == \"bigquery\" or extra == \"gcsfs\"" files = [ {file = "pyasn1-0.6.1-py3-none-any.whl", hash = "sha256:0d632f46f2ba09143da3a8afe9e33fb6f92fa2320ab7e886e2d0f7672af84629"}, {file = "pyasn1-0.6.1.tar.gz", hash = "sha256:6f580d2bdd84365380830acf45550f2511469f673cb4a5ae3857a3170128b034"}, @@ -4136,7 +4261,7 @@ description = "A collection of ASN.1-based protocols modules" optional = true python-versions = ">=3.8" groups = ["main"] -markers = "extra == \"gcsfs\"" +markers = "extra == \"bigquery\" or extra == \"gcsfs\"" files = [ {file = "pyasn1_modules-0.4.2-py3-none-any.whl", hash = "sha256:29253a9207ce32b64c3ac6600edc75368f98473906e8fd1043bd6b5b1de2c14a"}, {file = "pyasn1_modules-0.4.2.tar.gz", hash = "sha256:677091de870a80aae844b1ca6134f54652fa2c8c5a52aa396440ac3106e941e6"}, @@ -4716,7 +4841,7 @@ files = [ {file = "PyYAML-6.0.2-cp39-cp39-win_amd64.whl", hash = "sha256:39693e1f8320ae4f43943590b49779ffb98acb81f788220ea932a6b6c51004d8"}, {file = "pyyaml-6.0.2.tar.gz", hash = "sha256:d584d9ec91ad65861cc08d42e834324ef890a082e591037abe114850ff7bbc3e"}, ] -markers = {main = "extra == \"ray\" or extra == \"hf\""} +markers = {main = "python_version == \"3.9\" and (extra == \"ray\" or extra == \"hf\") or extra == \"hf\" or extra == \"ray\""} [[package]] name = "pyyaml-env-tag" @@ -5198,7 +5323,7 @@ description = "Pure-Python RSA implementation" optional = true python-versions = "<4,>=3.6" groups = ["main"] -markers = "extra == \"gcsfs\"" +markers = "extra == \"bigquery\" or extra == \"gcsfs\"" files = [ {file = "rsa-4.9.1-py3-none-any.whl", hash = "sha256:68635866661c6836b8d39430f97a996acbd61bfa49406748ea243539fe239762"}, {file = "rsa-4.9.1.tar.gz", hash = "sha256:e7bdbfdb5497da4c07dfd35530e1a902659db6ff241e39d9953cad06ebd0ae75"}, @@ -6283,6 +6408,7 @@ cffi = ["cffi (>=1.11)"] [extras] adlfs = ["adlfs"] +bigquery = ["google-cloud-bigquery"] daft = ["getdaft"] duckdb = ["duckdb", "pyarrow"] dynamodb = ["boto3"] @@ -6306,4 +6432,4 @@ zstandard = ["zstandard"] [metadata] lock-version = "2.1" python-versions = "^3.9.2, !=3.9.7" -content-hash = "7e2fa5ddc3b2389fc07541d6f2d4b4136cec8bef32dbe1dec13199818ef88212" +content-hash = "48ee72159d6ec314b7ddb528fc8cd33991118aabede8739dbd1cfb5614b0a23f" diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 81d3a34eaa..ff5a17f19e 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -117,6 +117,7 @@ class CatalogType(Enum): DYNAMODB = "dynamodb" SQL = "sql" 
IN_MEMORY = "in-memory" + BIGQUERY = "bigquery" def load_rest(name: str, conf: Properties) -> Catalog: @@ -172,6 +173,15 @@ def load_in_memory(name: str, conf: Properties) -> Catalog: raise NotInstalledError("SQLAlchemy support not installed: pip install 'pyiceberg[sql-sqlite]'") from exc +def load_bigquery(name: str, conf: Properties) -> Catalog: + try: + from pyiceberg.catalog.bigquery_metastore import BigQueryMetastoreCatalog + + return BigQueryMetastoreCatalog(name, **conf) + except ImportError as exc: + raise NotInstalledError("BigQuery support not installed: pip install 'pyiceberg[bigquery]'") from exc + + AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = { CatalogType.REST: load_rest, CatalogType.HIVE: load_hive, @@ -179,6 +189,7 @@ def load_in_memory(name: str, conf: Properties) -> Catalog: CatalogType.DYNAMODB: load_dynamodb, CatalogType.SQL: load_sql, CatalogType.IN_MEMORY: load_in_memory, + CatalogType.BIGQUERY: load_bigquery, } diff --git a/pyiceberg/catalog/bigquery_metastore.py b/pyiceberg/catalog/bigquery_metastore.py new file mode 100644 index 0000000000..546d16fae7 --- /dev/null +++ b/pyiceberg/catalog/bigquery_metastore.py @@ -0,0 +1,419 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import json +from typing import TYPE_CHECKING, Any, List, Optional, Set, Tuple, Union + +from google.api_core.exceptions import NotFound +from google.cloud.bigquery import Client, Dataset, DatasetReference, TableReference +from google.cloud.bigquery import Table as BQTable +from google.cloud.bigquery.external_config import ExternalCatalogDatasetOptions, ExternalCatalogTableOptions +from google.cloud.bigquery.schema import SerDeInfo, StorageDescriptor +from google.cloud.exceptions import Conflict +from google.oauth2 import service_account + +from pyiceberg.catalog import WAREHOUSE_LOCATION, MetastoreCatalog, PropertiesUpdateSummary +from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError +from pyiceberg.io import load_file_io +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile +from pyiceberg.table import CommitTableResponse, Table +from pyiceberg.table.locations import load_location_provider +from pyiceberg.table.metadata import TableMetadata, new_table_metadata +from pyiceberg.table.snapshots import TOTAL_DATA_FILES, TOTAL_FILE_SIZE, TOTAL_RECORDS +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.table.update import TableRequirement, TableUpdate +from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties +from pyiceberg.utils.config import Config + +if TYPE_CHECKING: + import pyarrow as pa + +GCP_PROJECT_ID = "gcp.project-id" +GCP_LOCATION = "gcp.location" +GCP_CREDENTIALS_LOCATION = "gcp.credentials-location" +GCP_CREDENTIALS_INFO = "gcp.credentials-info" + +METADATA_LOCATION_PROP = "metadata_location" +PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location" +TABLE_TYPE_PROP = "table_type" +ICEBERG_TABLE_TYPE_VALUE = "ICEBERG" + +HIVE_SERIALIZATION_LIBRARY = "org.apache.iceberg.mr.hive.HiveIcebergSerDe" +HIVE_FILE_INPUT_FORMAT = "org.apache.iceberg.mr.hive.HiveIcebergInputFormat" +HIVE_FILE_OUTPUT_FORMAT = "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat" + + +class BigQueryMetastoreCatalog(MetastoreCatalog): + def __init__(self, name: str, **properties: str): + super().__init__(name, **properties) + + project_id: Optional[str] = self.properties.get(GCP_PROJECT_ID) + location: Optional[str] = self.properties.get(GCP_LOCATION) + credentials_location: Optional[str] = self.properties.get(GCP_CREDENTIALS_LOCATION) + credentials_info_str: Optional[str] = self.properties.get(GCP_CREDENTIALS_INFO) + + if not project_id: + raise ValueError(f"Missing property: {GCP_PROJECT_ID}") + + # BigQuery requires current-snapshot-id to be present for tables to be created. 
+ if not Config().get_bool("legacy-current-snapshot-id"): + raise ValueError("legacy-current-snapshot-id must be enabled to work with BigQuery.") + + gcp_credentials = None + if credentials_location: + gcp_credentials = service_account.Credentials.from_service_account_file(credentials_location) + elif credentials_info_str: + try: + credentials_info_dict = json.loads(credentials_info_str) + gcp_credentials = service_account.Credentials.from_service_account_info(credentials_info_dict) + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON string for {GCP_CREDENTIALS_INFO}: {e}") from e + except TypeError as e: # from_service_account_info can raise TypeError for bad structure + raise ValueError(f"Invalid credentials structure for {GCP_CREDENTIALS_INFO}: {e}") from e + + self.client: Client = Client( + project=project_id, + credentials=gcp_credentials, + location=location, + ) + + self.location = location + self.project_id = project_id + + def create_table( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> Table: + """ + Create an Iceberg table. + + Args: + identifier: Table identifier. + schema: Table's schema. + location: Location for the table. Optional Argument. + partition_spec: PartitionSpec for the table. + sort_order: SortOrder for the table. + properties: Table properties that can be a string based dictionary. + + Returns: + Table: the created table instance. + + Raises: + AlreadyExistsError: If a table with the name already exists. + ValueError: If the identifier is invalid, or no path is given to store metadata. + + """ + schema: Schema = self._convert_schema_if_needed(schema) # type: ignore + + dataset_name, table_name = self.identifier_to_database_and_table(identifier) + + dataset_ref = DatasetReference(project=self.project_id, dataset_id=dataset_name) + location = self._resolve_table_location(location, dataset_name, table_name) + provider = load_location_provider(table_location=location, table_properties=properties) + metadata_location = provider.new_table_metadata_file_location() + + metadata = new_table_metadata( + location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties + ) + + io = load_file_io(properties=self.properties, location=metadata_location) + self._write_metadata(metadata, io, metadata_location) + + dataset_ref = DatasetReference(project=self.project_id, dataset_id=dataset_name) + + try: + table = self._make_new_table( + metadata, metadata_location, TableReference(dataset_ref=dataset_ref, table_id=table_name) + ) + self.client.create_table(table) + except Conflict as e: + raise TableAlreadyExistsError(f"Table {table_name} already exists") from e + + return self.load_table(identifier=identifier) + + def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: + """Create a namespace in the catalog. + + Args: + namespace: Namespace identifier. + properties: A string dictionary of properties for the given namespace. + + Raises: + ValueError: If the identifier is invalid. + AlreadyExistsError: If a namespace with the given name already exists. 
+ """ + database_name = self.identifier_to_database(namespace) + + try: + dataset_ref = DatasetReference(project=self.project_id, dataset_id=database_name) + dataset = Dataset(dataset_ref=dataset_ref) + dataset.external_catalog_dataset_options = self._create_external_catalog_dataset_options( + self._get_default_warehouse_location_for_dataset(database_name), properties, dataset_ref + ) + self.client.create_dataset(dataset) + except Conflict as e: + raise NamespaceAlreadyExistsError("Namespace {database_name} already exists") from e + + def load_table(self, identifier: Union[str, Identifier]) -> Table: + """ + Load the table's metadata and returns the table instance. + + You can also use this method to check for table existence using 'try catalog.table() except TableNotFoundError'. + Note: This method doesn't scan data stored in the table. + + Args: + identifier: Table identifier. + + Returns: + Table: the table instance with its metadata. + + Raises: + NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. + """ + database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) + dataset_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) + + try: + table_ref = TableReference( + dataset_ref=DatasetReference(project=self.project_id, dataset_id=dataset_name), + table_id=table_name, + ) + table = self.client.get_table(table_ref) + return self._convert_bigquery_table_to_iceberg_table(identifier, table) + except NotFound as e: + raise NoSuchTableError(f"Table does not exist: {dataset_name}.{table_name}") from e + + def drop_table(self, identifier: Union[str, Identifier]) -> None: + """Drop a table. + + Args: + identifier: Table identifier. + + Raises: + NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. + """ + dataset_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) + + try: + table_ref = TableReference( + dataset_ref=DatasetReference(project=self.project_id, dataset_id=dataset_name), + table_id=table_name, + ) + self.client.delete_table(table_ref) + except NoSuchTableError as e: + raise NoSuchTableError(f"Table does not exist: {dataset_name}.{table_name}") from e + + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] 
+    ) -> CommitTableResponse:
+        raise NotImplementedError
+
+    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
+        raise NotImplementedError
+
+    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
+        database_name = self.identifier_to_database(namespace)
+
+        try:
+            dataset_ref = DatasetReference(project=self.project_id, dataset_id=database_name)
+            dataset = Dataset(dataset_ref=dataset_ref)
+            self.client.delete_dataset(dataset)
+        except NotFound as e:
+            raise NoSuchNamespaceError(f"Namespace {namespace} does not exist.") from e
+
+    def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
+        database_name = self.identifier_to_database(namespace)
+        iceberg_tables: List[Identifier] = []
+        try:
+            dataset_ref = DatasetReference(project=self.project_id, dataset_id=database_name)
+            # The list_tables method returns an iterator of TableListItem.
+            bq_tables_iterator = self.client.list_tables(dataset=dataset_ref)
+
+            for bq_table_list_item in bq_tables_iterator:
+                iceberg_tables.append((database_name, bq_table_list_item.table_id))
+        except NotFound:
+            raise NoSuchNamespaceError(f"Namespace (dataset) '{database_name}' not found.") from None
+        return iceberg_tables
+
+    def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
+        # This catalog only supports single-level namespaces (datasets), so nested namespaces cannot be
+        # listed; only the empty (root) namespace is accepted, which lists all datasets in the project.
+        if namespace:
+            raise NoSuchNamespaceError(f"Namespace (dataset) '{namespace}' not found.") from None
+
+        # List top-level datasets.
+        datasets_iterator = self.client.list_datasets()
+        return [(dataset.dataset_id,) for dataset in datasets_iterator]
+
+    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
+        """Register a new table using existing metadata.
+
+        Args:
+            identifier Union[str, Identifier]: Table identifier for the table
+            metadata_location str: The location to the metadata
+
+        Returns:
+            Table: The newly registered table
+
+        Raises:
+            TableAlreadyExistsError: If the table already exists
+        """
+        dataset_name, table_name = self.identifier_to_database_and_table(identifier)
+
+        dataset_ref = DatasetReference(project=self.project_id, dataset_id=dataset_name)
+
+        io = self._load_file_io(location=metadata_location)
+        file = io.new_input(metadata_location)
+        metadata = FromInputFile.table_metadata(file)
+
+        try:
+            table = self._make_new_table(
+                metadata, metadata_location, TableReference(dataset_ref=dataset_ref, table_id=table_name)
+            )
+            self.client.create_table(table)
+        except Conflict as e:
+            raise TableAlreadyExistsError(f"Table {table_name} already exists") from e
+
+        return self.load_table(identifier=identifier)
+
+    def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
+        raise NotImplementedError
+
+    def drop_view(self, identifier: Union[str, Identifier]) -> None:
+        raise NotImplementedError
+
+    def view_exists(self, identifier: Union[str, Identifier]) -> bool:
+        raise NotImplementedError
+
+    def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
+        dataset_name = self.identifier_to_database(namespace)
+
+        try:
+            dataset = self.client.get_dataset(DatasetReference(project=self.project_id, dataset_id=dataset_name))
+
+            if dataset and dataset.external_catalog_dataset_options:
+                return dataset.external_catalog_dataset_options.to_api_repr()
+        except NotFound as e:
+            raise NoSuchNamespaceError(f"Namespace {namespace} not found") from e
+        return {}
+
+    def update_namespace_properties(
+        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
+    ) -> PropertiesUpdateSummary:
+        raise NotImplementedError
+
+    def _make_new_table(self, metadata: TableMetadata, metadata_file_location: str, table_ref: TableReference) -> BQTable:
+        """Build a BigQuery table object that references the Iceberg table through external catalog table options."""
+        table = BQTable(table_ref)
+
+        # Build the external catalog table options that carry the Iceberg metadata location
+        # and the Hive-style table parameters.
+        external_config_options = self._create_external_catalog_table_options(
+            metadata.location,
+            self._create_table_parameters(metadata_file_location=metadata_file_location, table_metadata=metadata),
+        )
+
+        # Attaching the options to the table is what marks it as an Iceberg table in the BigQuery metastore.
+        table.external_catalog_table_options = external_config_options
+
+        return table
+
+    def _create_external_catalog_table_options(self, location: str, parameters: dict[str, Any]) -> ExternalCatalogTableOptions:
+        # The storage descriptor mirrors what a Hive metastore would record for an Iceberg table.
+        return ExternalCatalogTableOptions(
+            storage_descriptor=StorageDescriptor(
+                location_uri=location,
+                input_format=HIVE_FILE_INPUT_FORMAT,
+                output_format=HIVE_FILE_OUTPUT_FORMAT,
+                serde_info=SerDeInfo(serialization_library=HIVE_SERIALIZATION_LIBRARY),
+            ),
+            parameters=parameters,
+        )
+
+    def _create_external_catalog_dataset_options(
+        self, default_storage_location: str, metadata_parameters: dict[str, Any], dataset_ref: DatasetReference
+    ) -> ExternalCatalogDatasetOptions:
+        return ExternalCatalogDatasetOptions(
+            default_storage_location_uri=default_storage_location,
+            parameters=metadata_parameters,
+        )
+
+    def _convert_bigquery_table_to_iceberg_table(self, identifier: Union[str, Identifier], table: BQTable) -> Table:
+        dataset_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+        metadata_location = ""
+        if table.external_catalog_table_options and table.external_catalog_table_options.parameters:
+            metadata_location = table.external_catalog_table_options.parameters[METADATA_LOCATION_PROP]
+        io = load_file_io(properties=self.properties, location=metadata_location)
+        file = io.new_input(metadata_location)
+        metadata = FromInputFile.table_metadata(file)
+
+        return Table(
+            identifier=(dataset_name, table_name),
+            metadata=metadata,
+            metadata_location=metadata_location,
+            io=self._load_file_io(metadata.properties, metadata_location),
+            catalog=self,
+        )
+
+    def _create_table_parameters(self, metadata_file_location: str, table_metadata: TableMetadata) -> dict[str, Any]:
+        # Copy the table properties so the metadata object itself is not mutated.
+        parameters: dict[str, Any] = dict(table_metadata.properties)
+        if table_metadata.table_uuid:
+            parameters["uuid"] = str(table_metadata.table_uuid)
+        parameters[METADATA_LOCATION_PROP] = metadata_file_location
+        parameters[TABLE_TYPE_PROP] = ICEBERG_TABLE_TYPE_VALUE
+        parameters["EXTERNAL"] = True
+
+        # Add Hive-style basic statistics from snapshot metadata if it exists.
+ snapshot = table_metadata.current_snapshot() + if snapshot: + summary = snapshot.summary + if summary: + if summary.get(TOTAL_DATA_FILES): + parameters["numFiles"] = summary.get(TOTAL_DATA_FILES) + + if summary.get(TOTAL_RECORDS): + parameters["numRows"] = summary.get(TOTAL_RECORDS) + + if summary.get(TOTAL_FILE_SIZE): + parameters["totalSize"] = summary.get(TOTAL_FILE_SIZE) + + return parameters + + def _default_storage_location(self, location: Optional[str], dataset_ref: DatasetReference) -> Union[str, None]: + if location: + return location + dataset = self.client.get_dataset(dataset_ref) + if dataset and dataset.external_catalog_dataset_options: + return dataset.external_catalog_dataset_options.default_storage_location_uri + + raise ValueError("Could not find default storage location") + + def _get_default_warehouse_location_for_dataset(self, database_name: str) -> str: + if warehouse_path := self.properties.get(WAREHOUSE_LOCATION): + warehouse_path = warehouse_path.rstrip("/") + return f"{warehouse_path}/{database_name}.db" + + raise ValueError("No default path is set, please specify a location when creating a table") diff --git a/pyproject.toml b/pyproject.toml index d759a95f3e..032cbae856 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,6 +72,7 @@ ray = [ python-snappy = { version = ">=0.6.0,<1.0.0", optional = true } thrift = { version = ">=0.13.0,<1.0.0", optional = true } boto3 = { version = ">=1.24.59", optional = true } +google-cloud-bigquery = { version = "^3.33.0", optional = true } s3fs = { version = ">=2023.1.0", optional = true } adlfs = { version = ">=2023.1.0", optional = true } gcsfs = { version = ">=2023.1.0", optional = true } @@ -282,6 +283,10 @@ ignore_missing_imports = true module = "pyiceberg_core.*" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "google.*" +ignore_missing_imports = true + [tool.poetry.scripts] pyiceberg = "pyiceberg.cli.console:run" @@ -307,6 +312,7 @@ s3fs = ["s3fs"] glue = ["boto3"] adlfs = ["adlfs"] dynamodb = ["boto3"] +bigquery = ["google-cloud-bigquery"] zstandard = ["zstandard"] sql-postgres = ["sqlalchemy", "psycopg2-binary"] sql-sqlite = ["sqlalchemy"] diff --git a/tests/catalog/integration_test_bigquery_metastore.py b/tests/catalog/integration_test_bigquery_metastore.py new file mode 100644 index 0000000000..6f5f93be35 --- /dev/null +++ b/tests/catalog/integration_test_bigquery_metastore.py @@ -0,0 +1,149 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import os + +import pytest +from pytest_mock import MockFixture + +from pyiceberg.catalog.bigquery_metastore import BigQueryMetastoreCatalog +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError +from pyiceberg.io import load_file_io +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC +from pyiceberg.schema import Schema +from pyiceberg.serializers import ToOutputFile +from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER +from tests.conftest import BQ_TABLE_METADATA_LOCATION_REGEX + + +def test_create_table_with_database_location( + mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str +) -> None: + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + catalog_name = "test_ddb_catalog" + identifier = (gcp_dataset_name, table_name) + test_catalog = BigQueryMetastoreCatalog( + catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"} + ) + test_catalog.create_namespace(namespace=gcp_dataset_name) + table = test_catalog.create_table(identifier, table_schema_nested) + assert table.name() == identifier + assert BQ_TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) + + tables_in_namespace = test_catalog.list_tables(namespace=gcp_dataset_name) + assert identifier in tables_in_namespace + + +def test_drop_table_with_database_location( + mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str +) -> None: + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + catalog_name = "test_ddb_catalog" + identifier = (gcp_dataset_name, table_name) + test_catalog = BigQueryMetastoreCatalog( + catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"} + ) + test_catalog.create_namespace(namespace=gcp_dataset_name) + test_catalog.create_table(identifier, table_schema_nested) + test_catalog.drop_table(identifier) + + tables_in_namespace_after_drop = test_catalog.list_tables(namespace=gcp_dataset_name) + assert identifier not in tables_in_namespace_after_drop + + # Expect that the table no longer exists. + try: + test_catalog.load_table(identifier) + raise AssertionError() + except NoSuchTableError: + assert True + + +def test_create_and_drop_namespace( + mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str +) -> None: + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + # Create namespace. + catalog_name = "test_ddb_catalog" + test_catalog = BigQueryMetastoreCatalog( + catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"} + ) + test_catalog.create_namespace(namespace=gcp_dataset_name) + + # Ensure that the namespace exists. + namespaces = test_catalog.list_namespaces() + assert (gcp_dataset_name,) in namespaces + + # Drop the namespace and ensure it does not exist. 
+ test_catalog.drop_namespace(namespace=gcp_dataset_name) + namespaces_after_drop = test_catalog.list_namespaces() + assert (gcp_dataset_name,) not in namespaces_after_drop + + # Verify with load_namespace_properties as well + with pytest.raises(NoSuchNamespaceError): + test_catalog.load_namespace_properties(gcp_dataset_name) + + +def test_register_table( + mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str +) -> None: + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + catalog_name = "test_bq_register_catalog" + identifier = (gcp_dataset_name, table_name) + warehouse_path = "gs://alexstephen-test-bq-bucket/" # Matches conftest BUCKET_NAME for GCS interaction + gcp_project_id = "alexstephen-test-1" + + test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.project-id": gcp_project_id, "warehouse": warehouse_path}) + + test_catalog.create_namespace(namespace=gcp_dataset_name) + + # Manually create a metadata file in GCS + table_gcs_location = f"{warehouse_path.rstrip('/')}/{gcp_dataset_name}.db/{table_name}" + # Construct a unique metadata file name similar to how pyiceberg would + metadata_file_name = "00000-aaaaaaaa-aaaa-4aaa-aaaa-aaaaaaaaaaaa.metadata.json" + metadata_gcs_path = f"{table_gcs_location}/metadata/{metadata_file_name}" + + metadata = new_table_metadata( + location=table_gcs_location, + schema=table_schema_nested, + properties={}, + partition_spec=UNPARTITIONED_PARTITION_SPEC, + sort_order=UNSORTED_SORT_ORDER, + ) + io = load_file_io(properties=test_catalog.properties, location=metadata_gcs_path) + test_catalog._write_metadata(metadata, io, metadata_gcs_path) + ToOutputFile.table_metadata(metadata, io.new_output(metadata_gcs_path), overwrite=True) + + # Register the table + registered_table = test_catalog.register_table(identifier, metadata_gcs_path) + + assert registered_table.name() == identifier + assert registered_table.metadata_location == metadata_gcs_path + assert registered_table.metadata.location == table_gcs_location + assert BQ_TABLE_METADATA_LOCATION_REGEX.match(registered_table.metadata_location) + + # Verify table exists and is loadable + loaded_table = test_catalog.load_table(identifier) + assert loaded_table.name() == registered_table.name() + assert loaded_table.metadata_location == metadata_gcs_path + + # Clean up + test_catalog.drop_table(identifier) + test_catalog.drop_namespace(gcp_dataset_name) diff --git a/tests/catalog/test_bigquery_metastore.py b/tests/catalog/test_bigquery_metastore.py new file mode 100644 index 0000000000..5260ffa145 --- /dev/null +++ b/tests/catalog/test_bigquery_metastore.py @@ -0,0 +1,178 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import os +from unittest.mock import MagicMock + +from google.api_core.exceptions import NotFound +from google.cloud.bigquery import Dataset, DatasetReference, Table, TableReference +from google.cloud.bigquery.external_config import ExternalCatalogDatasetOptions, ExternalCatalogTableOptions +from pytest_mock import MockFixture + +from pyiceberg.catalog.bigquery_metastore import ICEBERG_TABLE_TYPE_VALUE, TABLE_TYPE_PROP, BigQueryMetastoreCatalog +from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.schema import Schema + + +def dataset_mock() -> Dataset: + d = Dataset(DatasetReference(dataset_id="my-dataset", project="my-project")) + d.external_catalog_dataset_options = ExternalCatalogDatasetOptions( + default_storage_location_uri="gs://test-bucket/iceberg-dataset" + ) + return d + + +def table_mock() -> Table: + t = Table(TableReference(dataset_ref=DatasetReference(dataset_id="my-dataset", project="my-project"), table_id="my-table")) + t.external_catalog_table_options = ExternalCatalogTableOptions( + parameters={ + "metadata_location": "gs://alexstephen-test-bq-bucket/my_iceberg_database_aaaaaaaaaaaaaaaaaaaa.db/my_iceberg_table-bbbbbbbbbbbbbbbbbbbb/metadata/12343-aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa.metadata", + TABLE_TYPE_PROP: ICEBERG_TABLE_TYPE_VALUE, + } + ) + return t + + +def test_create_table_with_database_location( + mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str +) -> None: + # Setup mocks for GCP. + client_mock = MagicMock() + client_mock.get_dataset.return_value = dataset_mock() + client_mock.get_table.return_value = table_mock() + + # Setup mocks for GCS. + file_mock = MagicMock() + + mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock) + mocker.patch("pyiceberg.catalog.bigquery_metastore.FromInputFile.table_metadata", return_value=file_mock) + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + catalog_name = "test_ddb_catalog" + identifier = (gcp_dataset_name, table_name) + test_catalog = BigQueryMetastoreCatalog( + catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"} + ) + test_catalog.create_namespace(namespace=gcp_dataset_name) + table = test_catalog.create_table(identifier, table_schema_nested) + assert table.name() == identifier + + +def test_drop_table_with_database_location( + mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str +) -> None: + # Setup mocks for GCP. + client_mock = MagicMock() + client_mock.get_dataset.return_value = dataset_mock() + client_mock.get_table.return_value = table_mock() + + # Setup mocks for GCS. 
+ file_mock = MagicMock() + + mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock) + mocker.patch("pyiceberg.catalog.bigquery_metastore.FromInputFile.table_metadata", return_value=file_mock) + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + catalog_name = "test_ddb_catalog" + identifier = (gcp_dataset_name, table_name) + test_catalog = BigQueryMetastoreCatalog( + catalog_name, **{"gcp.project-id": "alexstephen-test-1", "warehouse": "gs://alexstephen-test-bq-bucket/"} + ) + test_catalog.create_namespace(namespace=gcp_dataset_name) + test_catalog.create_table(identifier, table_schema_nested) + test_catalog.drop_table(identifier) + + client_mock.get_table.side_effect = NotFound("Table Not Found") + mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock) + + # Expect that the table no longer exists. + try: + test_catalog.load_table(identifier) + raise AssertionError() + except NoSuchTableError: + assert True + + +def test_drop_namespace(mocker: MockFixture, gcp_dataset_name: str) -> None: + client_mock = MagicMock() + mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock) + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + catalog_name = "test_catalog" + test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.project-id": "alexstephen-test-1"}) + + test_catalog.drop_namespace(gcp_dataset_name) + client_mock.delete_dataset.assert_called_once() + args, _ = client_mock.delete_dataset.call_args + assert isinstance(args[0], Dataset) + assert args[0].dataset_id == gcp_dataset_name + + +def test_list_tables(mocker: MockFixture, gcp_dataset_name: str) -> None: + client_mock = MagicMock() + + # Mock list_tables to return an iterator of TableListItem + table_list_item_1 = MagicMock() + table_list_item_1.table_id = "iceberg_table_A" + table_list_item_1.reference = TableReference( + dataset_ref=DatasetReference(project="my-project", dataset_id=gcp_dataset_name), table_id="iceberg_table_A" + ) + + table_list_item_2 = MagicMock() + table_list_item_2.table_id = "iceberg_table_B" + table_list_item_2.reference = TableReference( + dataset_ref=DatasetReference(project="my-project", dataset_id=gcp_dataset_name), table_id="iceberg_table_B" + ) + + client_mock.list_tables.return_value = iter([table_list_item_1, table_list_item_2]) + + # Mock get_table to always return a table that is considered an Iceberg table. + # The table_mock() function already creates a table with the necessary Iceberg properties. 
+ client_mock.get_table.return_value = table_mock() + + mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock) + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + catalog_name = "test_catalog" + test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.project-id": "my-project"}) + + tables = test_catalog.list_tables(gcp_dataset_name) + + # Assert that all tables returned by client.list_tables are listed + assert len(tables) == 2 + assert (gcp_dataset_name, "iceberg_table_A") in tables + assert (gcp_dataset_name, "iceberg_table_B") in tables + + client_mock.list_tables.assert_called_once_with(dataset=DatasetReference(project="my-project", dataset_id=gcp_dataset_name)) + + +def test_list_namespaces(mocker: MockFixture) -> None: + client_mock = MagicMock() + dataset_item_1 = Dataset(DatasetReference(project="my-project", dataset_id="dataset1")) + dataset_item_2 = Dataset(DatasetReference(project="my-project", dataset_id="dataset2")) + client_mock.list_datasets.return_value = iter([dataset_item_1, dataset_item_2]) + + mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock) + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + catalog_name = "test_catalog" + test_catalog = BigQueryMetastoreCatalog(catalog_name, **{"gcp.project-id": "my-project"}) + + namespaces = test_catalog.list_namespaces() + assert len(namespaces) == 2 + assert ("dataset1",) in namespaces + assert ("dataset2",) in namespaces + client_mock.list_datasets.assert_called_once() diff --git a/tests/conftest.py b/tests/conftest.py index 729e29cb0c..d72ee84da1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2211,6 +2211,13 @@ def database_name() -> str: return (prefix + random_tag).lower() +@pytest.fixture() +def gcp_dataset_name() -> str: + prefix = "my_iceberg_database_" + random_tag = "".join(choice(string.ascii_letters) for _ in range(RANDOM_LENGTH)) + return (prefix + random_tag).lower() + + @pytest.fixture() def database_list(database_name: str) -> List[str]: return [f"{database_name}_{idx}" for idx in range(NUM_TABLES)] @@ -2237,6 +2244,13 @@ def hierarchical_namespace_list(hierarchical_namespace_name: str) -> List[str]: re.X, ) +BQ_TABLE_METADATA_LOCATION_REGEX = re.compile( + r"""gs://alexstephen-test-bq-bucket/my_iceberg_database_[a-z]{20}.db/ + my_iceberg_table-[a-z]{20}/metadata/ + [0-9]{5}-[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.metadata.json""", + re.X, +) + UNIFIED_AWS_SESSION_PROPERTIES = { "client.access-key-id": "client.access-key-id", "client.secret-access-key": "client.secret-access-key", @@ -2258,6 +2272,13 @@ def get_bucket_name() -> str: return bucket_name +def get_gcs_bucket_name() -> str: + bucket_name = os.getenv("GCS_TEST_BUCKET") + if bucket_name is None: + raise ValueError("Please specify a bucket to run the test by setting environment variable GCS_TEST_BUCKET") + return bucket_name + + def get_glue_endpoint() -> Optional[str]: """Set the optional environment variable AWS_TEST_GLUE_ENDPOINT for a glue endpoint to test.""" return os.getenv("AWS_TEST_GLUE_ENDPOINT") @@ -2273,6 +2294,16 @@ def get_s3_path(bucket_name: str, database_name: Optional[str] = None, table_nam return result_path +def get_gcs_path(bucket_name: str, database_name: Optional[str] = None, table_name: Optional[str] = None) -> str: + result_path = f"gcs://{bucket_name}" + if database_name is not None: + result_path += f"/{database_name}.db" + + if 
table_name is not None: + result_path += f"/{table_name}" + return result_path + + @pytest.fixture(name="s3", scope="module") def fixture_s3_client() -> boto3.client: """Real S3 client for AWS Integration Tests."""