diff --git a/python/GCP_BigQuery2ADW/BigQuery2ADW_DFNotebook.ipynb b/python/GCP_BigQuery2ADW/BigQuery2ADW_DFNotebook.ipynb
new file mode 100644
index 0000000..79e1503
--- /dev/null
+++ b/python/GCP_BigQuery2ADW/BigQuery2ADW_DFNotebook.ipynb
@@ -0,0 +1,479 @@
+{
+ "cells": [
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "id": "eea23a34",
+   "metadata": {},
+   "source": [
+    "### Use this notebook to read a GCP BigQuery table from OCI Data Flow and write it to OCI Object Storage or ADW."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 15,
+   "id": "c5c02083",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import ads\n",
+    "ads.set_auth(\"resource_principal\")  # Supported values: resource_principal, api_key"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "4a9179ff",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%load_ext dataflow.magics"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "3fd85c20",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import datetime\n",
+    "print(\"Start creating session : \", datetime.datetime.now())"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "id": "ad6b93cc",
+   "metadata": {},
+   "source": [
+    "### Create an OCI Data Flow session using the Livy service through an OCI Data Science notebook."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "5d1e7932",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import json\n",
+    "\n",
+    "command = {\n",
+    "    \"compartmentId\": \"ocid1.compartment.oc1..xxxxxxx\",\n",
+    "    \"displayName\": \"Demo_BigQuery_v1\",\n",
+    "    \"sparkVersion\": \"3.2.1\",\n",
+    "    \"driverShape\": \"VM.Standard.E3.Flex\",\n",
+    "    \"executorShape\": \"VM.Standard.E3.Flex\",\n",
+    "    \"driverShapeConfig\": {\"ocpus\": 1, \"memoryInGBs\": 16},\n",
+    "    \"executorShapeConfig\": {\"ocpus\": 1, \"memoryInGBs\": 16},\n",
+    "    \"numExecutors\": 1,\n",
+    "    \"configuration\": {\n",
+    "        \"spark.archives\": \"oci://demo-ds-conda-env@osnamespace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda\",\n",
+    "        \"spark.files\": \"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/ocigcp_user_creds.json\",\n",
+    "        \"spark.oracle.datasource.enabled\": \"true\",\n",
+    "        \"spark.hadoop.google.cloud.auth.service.account.enable\": \"true\",\n",
+    "        \"spark.jars\": \"oci://demo-ds-conda-env@osnamespace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar\"\n",
+    "    }\n",
+    "}\n",
+    "command = f\"'{json.dumps(command)}'\"\n",
+    "print(\"command\", command)\n",
+    "\n",
+    "%create_session -l python -c $command"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "45a262e6",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "print(\"Session created : \", datetime.datetime.now())"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "190da379",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%help"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "5d4ca141",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%spark\n",
+    "# Import the libraries used in the rest of the notebook.\n",
+    "\n",
+    "import json\n",
+    "import os\n",
+    "import sys\n",
+    "import datetime\n",
+    "import oci\n",
+    "import google.cloud.bigquery as bigquery\n",
+    "import google.cloud\n",
+    "import pyspark.sql\n",
+    "from pyspark.sql.functions import countDistinct"
+   ]
+  },
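+  {
+   "cell_type": "markdown",
+   "id": "b7c1a9e0",
+   "metadata": {},
+   "source": [
+    "#### Optional: sanity-check the session\n",
+    "A minimal sketch, assuming the session configuration above shipped `ocigcp_user_creds.json` via `spark.files`; it only confirms the session responds and the credentials file landed in the Spark working directory."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c8d2b0f1",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%spark\n",
+    "# Sanity-check sketch (assumption): the session is live and the GCP\n",
+    "# credentials file distributed through spark.files is visible on the driver.\n",
+    "import os\n",
+    "print(\"Spark version :\", spark.version)\n",
+    "print(\"Credentials file present :\", os.path.exists(\"/opt/spark/work-dir/ocigcp_user_creds.json\"))"
+   ]
+  },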
"source": [ + "### Read GCP BigQuery Table" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "988bf05f", + "metadata": {}, + "outputs": [], + "source": [ + "%%spark\n", + "# Read from BigQuery : \"bitcoin_blockchain.transactions\". i.e. At Source \"BigQuery\"\n", + "#Number of rows : 340,311,544\n", + "#Total logical bytes : 587.14 GB\n", + " \n", + "df_bitcoin_blockchain = spark.read.format('bigquery').option('project','bigquery-public-data').option('parentProject','core-invention-366213').option(\"credentialsFile\",\"/opt/spark/work-dir/ocigcp_user_creds.json\").option('table','bitcoin_blockchain.transactions').load()\n", + "print(\"Total Records Count bitcoin_blockchain.transactions : \",df.count())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6e074b72", + "metadata": {}, + "outputs": [], + "source": [ + "%%spark\n", + "\n", + "#Read another BigQuery Table\n", + "df_RetailPOS_15min = spark.read.format('bigquery').option('project','core-invention-366213').option('parentProject','core-invention-366213').option(\"credentialsFile\",\"/opt/spark/work-dir/ocigcp_user_creds.json\").option('table','Retail_Channel.RetailPOS_15min').load()\n", + "df_RetailPOS_15min.show()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "94373b84", + "metadata": {}, + "source": [ + "### Load Data into Object Storage " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "76bd02b1", + "metadata": {}, + "outputs": [], + "source": [ + "%%spark\n", + "\n", + "#Write in Object Storage \n", + "print(\"Writing Started at : \",datetime.datetime.now())\n", + "df_RetailPOS_15min.write.format(\"json\").option(\"mode\",\"overwrite\").save(\"oci://ds-Raw@osnamespace/TargetData/bitcoin_blockchain_transactions\")\n", + "print(\"Writing Completed at : \",datetime.datetime.now())" + ] + }, + { + "cell_type": "markdown", + "id": "6a4c8334", + "metadata": {}, + "source": [ + "### Load Data into ADW using Wallet Password" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "70411140", + "metadata": {}, + "outputs": [], + "source": [ + "%%spark\n", + "print(\"Set Parameters for ADW connectivity.\")\n", + "#ADB_ID = \"ocid1.autonomousdatabase.oc1.phx.xxxxx\"\n", + "USERNAME = \"admin\"\n", + "PASSWORD = \"xxxxx\"\n", + "connectionId= \"demolakehouseadw_medium\"\n", + "walletUri = \"oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip\"\n", + "\n", + "#properties = {\"adbId\": ADB_ID,\"user\" : USERNAME,\"password\": PASSWORD}\n", + "properties = {\"connectionId\": connectionId,\"user\" : USERNAME,\"password\": PASSWORD,\"walletUri\": walletUri}\n", + "print(\"properties:\",properties)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b1516048", + "metadata": {}, + "outputs": [], + "source": [ + "%%spark\n", + "\n", + "#Load into ADW:\n", + "TARGET_TABLE = \"ADMIN.RETAILPOS_15MINUTES\"\n", + "print(\"TARGET_TABLE : \",TARGET_TABLE)\n", + "\n", + "# Write to ADW.\n", + "print(\"Write to ADW : \")\n", + "df_RetailPOS_15min.write.format(\"oracle\").mode(\"append\").option(\"dbtable\",TARGET_TABLE).options(**properties).save()\n", + "print(\"Writing completed to ADW.....\")" + ] + }, + { + "cell_type": "markdown", + "id": "8d66b823", + "metadata": {}, + "source": [ + "### Load Data into ADW using Secret Vault for Wallet" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3f97f84e", + "metadata": {}, + "outputs": [], + "source": [ + "%%spark\n", + "\n", + "def 
+  {
+   "cell_type": "markdown",
+   "id": "8d66b823",
+   "metadata": {},
+   "source": [
+    "### Load Data into ADW using Secret Vault for Wallet"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "3f97f84e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%spark\n",
+    "\n",
+    "def get_authenticated_client(token_path, client, file_location=None, profile_name=None):\n",
+    "    \"\"\"\n",
+    "    Get an authenticated OCI client.\n",
+    "    Example: get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)\n",
+    "    \"\"\"\n",
+    "    import oci\n",
+    "\n",
+    "    if not in_dataflow():\n",
+    "        # We are running locally, use our API key.\n",
+    "        if file_location is None:\n",
+    "            file_location = oci.config.DEFAULT_LOCATION\n",
+    "        if profile_name is None:\n",
+    "            profile_name = oci.config.DEFAULT_PROFILE\n",
+    "        config = oci.config.from_file(file_location=file_location, profile_name=profile_name)\n",
+    "        authenticated_client = client(config)\n",
+    "    else:\n",
+    "        # We are running in Data Flow, use our delegation token.\n",
+    "        with open(token_path) as fd:\n",
+    "            delegation_token = fd.read()\n",
+    "        signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(\n",
+    "            delegation_token=delegation_token\n",
+    "        )\n",
+    "        authenticated_client = client(config={}, signer=signer)\n",
+    "    return authenticated_client\n",
+    "\n",
+    "\n",
+    "def get_password_from_secrets(token_path, password_ocid):\n",
+    "    \"\"\"\n",
+    "    Get a password from the OCI Secrets service.\n",
+    "    \"\"\"\n",
+    "    import base64\n",
+    "    import oci\n",
+    "\n",
+    "    secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient)\n",
+    "    response = secrets_client.get_secret_bundle(password_ocid)\n",
+    "    base64_secret_content = response.data.secret_bundle_content.content\n",
+    "    base64_secret_bytes = base64_secret_content.encode(\"ascii\")\n",
+    "    base64_message_bytes = base64.b64decode(base64_secret_bytes)\n",
+    "    secret_content = base64_message_bytes.decode(\"ascii\")\n",
+    "    return secret_content\n",
+    "\n",
+    "\n",
+    "def get_delegation_token_path(spark):\n",
+    "    \"\"\"\n",
+    "    Get the delegation token path when we're running in Data Flow.\n",
+    "    \"\"\"\n",
+    "    if not in_dataflow():\n",
+    "        return None\n",
+    "    token_key = \"spark.hadoop.fs.oci.client.auth.delegationTokenPath\"\n",
+    "    token_path = spark.sparkContext.getConf().get(token_key)\n",
+    "    if not token_path:\n",
+    "        raise Exception(f\"{token_key} is not set\")\n",
+    "    return token_path\n",
+    "\n",
+    "\n",
+    "def get_temporary_directory():\n",
+    "    if in_dataflow():\n",
+    "        return \"/opt/spark/work-dir/\"\n",
+    "    else:\n",
+    "        import tempfile\n",
+    "        return tempfile.gettempdir()\n",
+    "\n",
+    "\n",
+    "def in_dataflow():\n",
+    "    \"\"\"\n",
+    "    Determine if we are running in OCI Data Flow by checking the environment.\n",
+    "    \"\"\"\n",
+    "    return os.environ.get(\"HOME\") == \"/home/dataflow\"\n",
+    "\n",
+    "\n",
+    "def download_wallet(spark, wallet_path):\n",
+    "    \"\"\"\n",
+    "    Download an ADW/ATP wallet file and prepare it for use in a Data Flow\n",
+    "    application.\n",
+    "    \"\"\"\n",
+    "    import oci\n",
+    "    import zipfile\n",
+    "\n",
+    "    # Get an Object Storage client.\n",
+    "    token_path = get_delegation_token_path(spark)\n",
+    "    object_store_client = get_authenticated_client(\n",
+    "        token_path, oci.object_storage.ObjectStorageClient\n",
+    "    )\n",
+    "\n",
+    "    # Download the wallet file.\n",
+    "    from urllib.parse import urlparse\n",
+    "    parsed = urlparse(wallet_path)\n",
+    "    bucket_name, namespace = parsed.netloc.split(\"@\")\n",
+    "    file_name = parsed.path[1:]\n",
+    "    response = object_store_client.get_object(namespace, bucket_name, file_name)\n",
+    "    temporary_directory = get_temporary_directory()\n",
+    "    zip_file_path = os.path.join(temporary_directory, \"wallet.zip\")\n",
+    "    with open(zip_file_path, \"wb\") as fd:\n",
+    "        for chunk in response.data.raw.stream(1024 * 1024, decode_content=False):\n",
+    "            fd.write(chunk)\n",
+    "\n",
+    "    # Extract everything locally.\n",
+    "    with zipfile.ZipFile(zip_file_path, \"r\") as zip_ref:\n",
+    "        zip_ref.extractall(temporary_directory)\n",
+    "\n",
+    "    # Distribute all wallet files to the executors.\n",
+    "    contents = \"cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks\".split()\n",
+    "    spark_context = spark.sparkContext\n",
+    "    for file in contents:\n",
+    "        spark_context.addFile(os.path.join(temporary_directory, file))\n",
+    "\n",
+    "    return temporary_directory"
+   ]
+  },
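+  {
+   "cell_type": "markdown",
+   "id": "a0b1c2d3",
+   "metadata": {},
+   "source": [
+    "#### Optional: confirm the wallet object is reachable\n",
+    "A minimal sketch using the helpers above; the namespace, bucket, and object names are the placeholders used elsewhere in this notebook, so substitute your own."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e4f5a6b7",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%spark\n",
+    "# Reachability sketch (assumption): HEAD the wallet object before downloading it.\n",
+    "import oci\n",
+    "token_path = get_delegation_token_path(spark)\n",
+    "os_client = get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)\n",
+    "head = os_client.head_object(\"osnamespace\", \"demo-ds-conda-env\", \"oci_utility/Wallet_DemoLakeHouseADW.zip\")\n",
+    "print(\"Wallet size (bytes) :\", head.headers.get(\"Content-Length\"))"
+   ]
+  },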
\"wb\") as fd:\n", + " for chunk in response.data.raw.stream(1024 * 1024, decode_content=False):\n", + " fd.write(chunk)\n", + "\n", + " # Extract everything locally.\n", + " with zipfile.ZipFile(zip_file_path, \"r\") as zip_ref:\n", + " zip_ref.extractall(temporary_directory)\n", + "\n", + " # Distribute all wallet files.\n", + " contents = \"cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks\".split()\n", + " spark_context = spark.sparkContext\n", + " for file in contents:\n", + " spark_context.addFile(os.path.join(temporary_directory, file))\n", + "\n", + " return temporary_directory" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9647d4dd", + "metadata": {}, + "outputs": [], + "source": [ + "%%spark\n", + "PASSWORD_SECRET_OCID = \"ocid1.vaultsecret.oc1.phx.xxxxxxx\"\n", + "TARGET_TABLE = \"ADMIN.RETAILPOS_15MINUTES\"\n", + "TNSNAME = \"demolakehouseadw_medium\"\n", + "USER = \"admin\"\n", + "WALLET_PATH = \"oci://demo-ds-conda-env@osnamespace/oci_utility/Wallet_DemoLakeHouseADW.zip\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "22eddb2b", + "metadata": {}, + "outputs": [], + "source": [ + "%%spark\n", + "# Download and distribute our wallet file.\n", + "print(\"Downloading wallet\")\n", + "wallet_path = download_wallet(spark, WALLET_PATH)\n", + "adw_url = \"jdbc:oracle:thin:@{}?TNS_ADMIN={}\".format(TNSNAME, wallet_path)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2953880e", + "metadata": {}, + "outputs": [], + "source": [ + "%%spark\n", + "# Get our password using the secret service.\n", + "print(\"Getting wallet password\")\n", + "token_path = get_delegation_token_path(spark)\n", + "password = get_password_from_secrets(token_path, PASSWORD_SECRET_OCID)\n", + "print(\"Done getting wallet password\")\n", + "\n", + "\n", + "# Save the results to the database.\n", + "print(\"Saving processed data to \" + adw_url)\n", + "properties = {\n", + " \"driver\": \"oracle.jdbc.driver.OracleDriver\",\n", + " \"oracle.net.tns_admin\": TNSNAME,\n", + " \"password\": password,\n", + " \"user\": USER\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "affdad7a", + "metadata": {}, + "outputs": [], + "source": [ + "%%spark\n", + "\n", + "#Load into ADW:\n", + "\n", + "TARGET_TABLE = \"ADMIN.RETAILPOS_15MINUTES\"\n", + "print(\"TARGET_TABLE : \",TARGET_TABLE)\n", + "\n", + "# Write to ADW.\n", + "print(\"Write to ADW : \")\n", + "df_RetailPOS_15min.write.jdbc(url=adw_url, table=TARGET_TABLE, mode=\"Append\", properties=properties)\n", + "print(\"Writing done to ADW : \")\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "aa5adff5", + "metadata": {}, + "source": [ + "### Stop OCI Data Flow Session" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fb109465", + "metadata": {}, + "outputs": [], + "source": [ + "%stop_session" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python [conda env:pyspark32_p38_cpu_v1]", + "language": "python", + "name": "conda-env-pyspark32_p38_cpu_v1-py" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/python/GCP_BigQuery2ADW/README.md b/python/GCP_BigQuery2ADW/README.md new file mode 100644 index 
+
+## Access GCP BigQuery using OCI Data Science Notebook with OCI Data Flow
+
+1. Open an OCI Data Science notebook session where you have already created the Conda environment for OCI Data Flow (see Prerequisite 2).
+
+2. Open a new notebook with Data Flow as the kernel.
+
+3. Create a Livy session for OCI Data Flow and provide the required configuration, including the GCP BigQuery settings:
+
+       spark.archives : oci://demo-ds-conda-env@OSNameSpace/conda_environments/cpu/PySpark 3.2 and Data Flow/1.0/pyspark32_p38_cpu_v1#conda
+       spark.files : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/ocigcp_user_creds.json
+       spark.jars : oci://demo-ds-conda-env@OSNameSpace/gcp_utility/BigQuery/bigquery_spark-bigquery-with-dependencies_2.12-0.23.2.jar
+       spark.oracle.datasource.enabled : true
+
+Use [BigQuery2ADW_DFNotebook.ipynb](./BigQuery2ADW_DFNotebook.ipynb) to read a GCP BigQuery table from OCI Data Flow and write it to OCI Object Storage or ADW.
diff --git a/python/GCP_BigQuery2ADW/images/GCP_BigQuery_To_OCI_ADW.png b/python/GCP_BigQuery2ADW/images/GCP_BigQuery_To_OCI_ADW.png
new file mode 100644
index 0000000..a8605ee
Binary files /dev/null and b/python/GCP_BigQuery2ADW/images/GCP_BigQuery_To_OCI_ADW.png differ