diff --git a/cloudbuild.yaml b/cloudbuild.yaml
index fb059491..c231b11a 100644
--- a/cloudbuild.yaml
+++ b/cloudbuild.yaml
@@ -34,7 +34,7 @@ steps:
   entrypoint: python
   args: ['-m', 'flake8', '.']

-timeout: 1200s
+timeout: 1600s
 images:
 - 'gcr.io/${PROJECT_ID}/${_BUILD_IMG_NAME}'
 options:
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 17c746de..407ef781 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -1,4 +1,4 @@
-ARG AIRFLOW_IMAGE_NAME="2.1.1-python3.8"
+ARG AIRFLOW_IMAGE_NAME="2.2.5-python3.8"

 FROM "apache/airflow:${AIRFLOW_IMAGE_NAME}"

diff --git a/docker/Dockerfile-ci b/docker/Dockerfile-ci
index 129f0e0c..eb7e5ec9 100644
--- a/docker/Dockerfile-ci
+++ b/docker/Dockerfile-ci
@@ -1,5 +1,5 @@
 # TODO: use multi-stage builds. This file is exactly the same as Dockerfile except the last line
-ARG AIRFLOW_IMAGE_NAME="2.1.1-python3.8"
+ARG AIRFLOW_IMAGE_NAME="2.2.5-python3.8"

 FROM "apache/airflow:${AIRFLOW_IMAGE_NAME}"

diff --git a/gcp_airflow_foundations/common/gcp/source_schema/bq.py b/gcp_airflow_foundations/common/gcp/source_schema/bq.py
index d7d3af45..77d0b611 100644
--- a/gcp_airflow_foundations/common/gcp/source_schema/bq.py
+++ b/gcp_airflow_foundations/common/gcp/source_schema/bq.py
@@ -1,7 +1,7 @@
 import json
 import logging

-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


 def read_schema_from_bq(
@@ -16,7 +16,7 @@ def read_schema_from_bq(
     Helper method to load table schema from the staging table
     """

-    bq_hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, delegate_to=None)
+    bq_hook = BigQueryHook(gcp_conn_id=bigquery_conn_id, delegate_to=None)

     schema = bq_hook.get_schema(dataset_id=dataset_id, table_id=table_id, project_id=project_id)

diff --git a/gcp_airflow_foundations/common/gcp/source_schema/gcs.py b/gcp_airflow_foundations/common/gcp/source_schema/gcs.py
index 2e2f2825..f3fe1741 100644
--- a/gcp_airflow_foundations/common/gcp/source_schema/gcs.py
+++ b/gcp_airflow_foundations/common/gcp/source_schema/gcs.py
@@ -1,8 +1,8 @@
 import json
 import logging

-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
-from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+from airflow.providers.google.cloud.hooks.gcs import GCSHook

 from urllib.parse import urlparse

@@ -21,8 +21,8 @@ def read_schema_from_gcs(
     gcs_bucket = parsed_url.netloc
     gcs_object = parsed_url.path.lstrip("/")

-    gcs_hook = GoogleCloudStorageHook(
-        google_cloud_storage_conn_id=google_cloud_storage_conn_id, delegate_to=None
+    gcs_hook = GCSHook(
+        gcp_conn_id=google_cloud_storage_conn_id, delegate_to=None
     )

     schema_fields = json.loads(
diff --git a/gcp_airflow_foundations/operators/api/operators/twilio_operator.py b/gcp_airflow_foundations/operators/api/operators/twilio_operator.py
index ea59320f..325d7314 100644
--- a/gcp_airflow_foundations/operators/api/operators/twilio_operator.py
+++ b/gcp_airflow_foundations/operators/api/operators/twilio_operator.py
@@ -1,11 +1,11 @@
 import json
 import logging

-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

 from airflow.models.baseoperator import BaseOperator
-from airflow.utils.decorators import apply_defaults
+# from airflow.utils.decorators import apply_defaults

 from gcp_airflow_foundations.operators.api.hooks.twilio_hook import TwilioHook

 from urllib.parse import urlparse
@@ -31,7 +31,7 @@ class TwilioToBigQueryOperator(BaseOperator):
     template_fields = ("dataset_id", "table_id", "project_id", "labels")

     # pylint: disable=too-many-arguments
-    @apply_defaults
+    # @apply_defaults
     def __init__(
         self,
         twilio_account_sid,
diff --git a/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py b/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py
index 60ed1169..b03f7391 100644
--- a/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py
+++ b/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py
@@ -19,7 +19,7 @@
 from gcp_airflow_foundations.enums.facebook import AccountLookupScope, ApiObject

 from airflow.models import BaseOperator, Variable
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

 from google.cloud import bigquery
diff --git a/gcp_airflow_foundations/operators/gcp/dlp/dlp_to_datacatalog_taskgroup.py b/gcp_airflow_foundations/operators/gcp/dlp/dlp_to_datacatalog_taskgroup.py
index cd9f4f46..0d74d391 100644
--- a/gcp_airflow_foundations/operators/gcp/dlp/dlp_to_datacatalog_taskgroup.py
+++ b/gcp_airflow_foundations/operators/gcp/dlp/dlp_to_datacatalog_taskgroup.py
@@ -1,6 +1,6 @@
 import logging
 from airflow.models import BaseOperator
-from airflow.operators.python_operator import PythonOperator
+from airflow.operators.python import PythonOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryDeleteTableOperator,
diff --git a/gcp_airflow_foundations/operators/gcp/hds/hds_merge_table_operator.py b/gcp_airflow_foundations/operators/gcp/hds/hds_merge_table_operator.py
index 1d8d4ecf..3368c4d9 100644
--- a/gcp_airflow_foundations/operators/gcp/hds/hds_merge_table_operator.py
+++ b/gcp_airflow_foundations/operators/gcp/hds/hds_merge_table_operator.py
@@ -2,13 +2,13 @@
 from datetime import datetime

 from airflow.models import BaseOperator, BaseOperatorLink
-from airflow.contrib.operators.bigquery_operator import (
-    BigQueryOperator,
+from airflow.providers.google.cloud.operators.bigquery import (
+    BigQueryInsertJobOperator,
     BigQueryCreateEmptyTableOperator,
 )

-from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+# from airflow.utils.decorators import apply_defaults
+# from airflow.contrib.hooks.bigquery_hook import BigQueryHook

 from airflow.exceptions import AirflowException

@@ -22,7 +22,7 @@
 from gcp_airflow_foundations.enums.ingestion_type import IngestionType


-class MergeBigQueryHDS(BigQueryOperator):
+class MergeBigQueryHDS(BigQueryInsertJobOperator):
     """
     Merges data into a BigQuery HDS table.

@@ -56,10 +56,11 @@ class MergeBigQueryHDS(BigQueryOperator):

     template_fields = ("stg_table_name", "data_table_name", "stg_dataset_name")

-    @apply_defaults
+    # @apply_defaults
     def __init__(
         self,
         *,
+        task_id: str,
         project_id: str,
         stg_table_name: str,
         data_table_name: str,
@@ -89,6 +90,7 @@ def __init__(
             sql="",
             **kwargs,
         )
+        self.task_id = task_id
         self.project_id = project_id
         self.stg_table_name = stg_table_name
         self.data_table_name = data_table_name
diff --git a/gcp_airflow_foundations/operators/gcp/ods/ods_merge_table_operator.py b/gcp_airflow_foundations/operators/gcp/ods/ods_merge_table_operator.py
index 987e8aea..ac9eedbc 100644
--- a/gcp_airflow_foundations/operators/gcp/ods/ods_merge_table_operator.py
+++ b/gcp_airflow_foundations/operators/gcp/ods/ods_merge_table_operator.py
@@ -2,13 +2,13 @@
 from datetime import datetime

 from airflow.models import BaseOperator, BaseOperatorLink
-from airflow.contrib.operators.bigquery_operator import (
-    BigQueryOperator,
+from airflow.providers.google.cloud.operators.bigquery import (
+    BigQueryInsertJobOperator,
     BigQueryCreateEmptyTableOperator,
 )

-from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+# from airflow.utils.decorators import apply_defaults
+# from airflow.contrib.hooks.bigquery_hook import BigQueryHook

 from airflow.exceptions import AirflowException

@@ -21,7 +21,7 @@
 from gcp_airflow_foundations.enums.ingestion_type import IngestionType


-class MergeBigQueryODS(BigQueryOperator):
+class MergeBigQueryODS(BigQueryInsertJobOperator):
     """
     Merges data into a BigQuery ODS table.

@@ -55,10 +55,11 @@ class MergeBigQueryODS(BigQueryOperator):

     template_fields = ("stg_table_name", "data_table_name", "stg_dataset_name")

-    @apply_defaults
+    # @apply_defaults
     def __init__(
         self,
         *,
+        task_id: Optional[str] = None,
         project_id: str,
         stg_table_name: str,
         data_table_name: str,
@@ -78,6 +79,7 @@ def __init__(
         **kwargs,
     ) -> None:
         super(MergeBigQueryODS, self).__init__(
+            task_id=task_id,
             delegate_to=delegate_to,
             gcp_conn_id=gcp_conn_id,
             use_legacy_sql=False,
@@ -87,6 +89,7 @@ def __init__(
             sql="",
             **kwargs,
         )
+
         self.project_id = project_id
         self.stg_table_name = stg_table_name
         self.data_table_name = data_table_name
diff --git a/gcp_airflow_foundations/operators/gcp/schema_parsing/schema_parsing_operator.py b/gcp_airflow_foundations/operators/gcp/schema_parsing/schema_parsing_operator.py
index d5a85125..bd954766 100644
--- a/gcp_airflow_foundations/operators/gcp/schema_parsing/schema_parsing_operator.py
+++ b/gcp_airflow_foundations/operators/gcp/schema_parsing/schema_parsing_operator.py
@@ -6,8 +6,8 @@
     BigQueryCreateEmptyTableOperator,
 )

-from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+# from airflow.utils.decorators import apply_defaults
+# from airflow.contrib.hooks.bigquery_hook import BigQueryHook

 from airflow.exceptions import AirflowException

@@ -23,7 +23,7 @@


 class ParseSchema(BaseOperator):
-    @apply_defaults
+    # @apply_defaults
     def __init__(
         self,
         *,
diff --git a/requirements.txt b/requirements.txt
index 5bc3bce2..aca6fb6f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,66 +1,75 @@
-dacite==1.5.1
-pydantic==1.8.2
-pyyaml==5.4.1
-wheel==0.36.2
-black==19.10b0
-configparser>=3.5.0
-pyclean>=2.0.0
-flake8>=3.8.0
-pre-commit>=1.18.3
-croniter>=1.0.15
-facebook-business>=6.0.2
-google-cloud-bigquery==2.28.1
-pyarrow>=4.0.1
-facebook-business>=6.0.2
-apache-airflow-providers-salesforce==3.3.0
-apache-airflow-providers-apache-beam==3.1.0
-apache-airflow-providers-sftp==2.0.0
-apache-airflow-providers-ssh==2.1.0
-paramiko>=2.6.0
-pysftp>=0.2.9
-sshtunnel>=0.1.4,<0.2
-
-apache-airflow-providers-facebook==2.1.0
-apache-airflow-providers-google==6.3.0
-pyopenssl==20.0.1
-google-ads==14.0.0
-google-api-core[grpc,grpcgcp]==1.31.5
-google-api-python-client==1.12.8
-google-auth-httplib2==0.1.0
-google-auth==1.35.0
-google-cloud-automl==2.4.2
-google-cloud-bigtable==1.7.0
-google-cloud-bigquery==2.28.1
-google-cloud-build==3.0.0
-google-cloud-container==1.0.1
-google-cloud-datacatalog==3.4.1
-google-cloud-dataproc==3.1.0
-google-cloud-dlp==1.0.0
-google-cloud-kms==2.6.0
-google-cloud-language==1.3.0
-google-cloud-logging==2.6.0
-google-cloud-memcache==1.0.0
-google-cloud-monitoring==2.5.0
-google-cloud-os-login==2.3.1
-google-cloud-pubsub==2.8.0
-google-cloud-redis==2.2.2
-google-cloud-secret-manager==1.0.0
-google-cloud-spanner==1.19.1
-google-cloud-speech==1.3.2
-google-cloud-storage==1.42.2
-google-cloud-tasks==2.5.1
-google-cloud-texttospeech==1.0.1
-google-cloud-translate==1.7.0
-google-cloud-videointelligence==1.16.1
-google-cloud-vision==1.0.0
-google-cloud-workflows==1.2.1
-grpcio-gcp==0.2.2
+SQLAlchemy==1.3.23
+aiohttp==3.8.1
+aiosignal==1.2.0
+apache-airflow-providers-apache-beam==4.0.0
+apache-airflow-providers-common-sql==1.0.0
+apache-airflow-providers-facebook==3.0.1
+apache-airflow-providers-google==8.2.0
+apache-airflow-providers-salesforce==5.0.0
+apache-beam==2.40.0
+appdirs==1.4.4
+async-timeout==4.0.2
+backoff==1.8.0
+black==19.10b0
+cached-property==1.5.2
+cfgv==3.3.1
+cloudpickle==2.1.0
+configparser==5.2.0
+crcmod==1.7
+curlify==2.2.1
+dacite==1.5.1
+docopt==0.6.2
+et-xmlfile==1.1.0
+facebook-business==14.0.0
+fastavro==1.5.4
+frozenlist==1.3.1
+gcp-airflow-foundations
+google-ads==17.0.0
+google-api-core==2.8.2
+google-cloud-bigtable==1.7.2
+google-cloud-container==2.11.0
+google-cloud-core==2.3.2
+google-cloud-dlp==1.0.2
+google-cloud-language==1.3.2
+google-cloud-secret-manager==1.0.2
+google-cloud-spanner==1.19.3
+google-cloud-speech==1.3.4
+google-cloud-texttospeech==1.0.3
+google-cloud-translate==1.7.2
+google-cloud-videointelligence==1.16.3
+google-cloud-vision==1.0.2
+googleapis-common-protos==1.56.4
+grpc-google-iam-v1==0.12.4
+hdfs==2.7.0
+httpcore==0.13.7
 httpx==0.19.0
-json-merge-patch==0.2
-pandas-gbq==0.14.1
-pandas==1.3.5
+identify==2.5.2
+multidict==6.0.2
+nodeenv==1.7.0
+numpy==1.21.0
+oauth2client==4.1.3
 openpyxl==3.0.9
-pyparsing>=2.4.2
-numpy==1.21
+orjson==3.7.11
+pandas-gbq==0.14.1
+pathspec==0.9.0
+pre-commit==2.20.0
+proto-plus==1.19.6
+protobuf==3.20.0
+pyclean==2.2.0
+pycountry==22.3.5
+pydantic==1.8.2
+pydot==1.4.2
+pymongo==3.12.3
+pyopenssl==20.0.1
+regex==2022.7.25
+requests-file==1.5.1
+requests-toolbelt==0.9.1
+simple-salesforce==1.12.1
+toml==0.10.2
+twitter-ads==10.0.0
+typed-ast==1.5.4
+wheel==0.36.2
+yarl==1.8.1
+zeep==4.1.0
diff --git a/tests/integration/hds/test_hds_upsert_scd2.py b/tests/integration/hds/test_hds_upsert_scd2.py
index 48d582cd..de42a7ce 100644
--- a/tests/integration/hds/test_hds_upsert_scd2.py
+++ b/tests/integration/hds/test_hds_upsert_scd2.py
@@ -77,7 +77,7 @@ def doCleanups(self):
         cleanup_xcom()
         clear_db_dags()

-        BigQueryHook().run_copy(
+        BigQueryHook().insert_job(
             source_project_dataset_tables="airflow-framework.test_tables.ga_sessions_HDS",
             destination_project_dataset_table=f"{PROJECT_ID}.{DATASET}.{self.table_id}",
             write_disposition="WRITE_TRUNCATE",
@@ -155,7 +155,7 @@ def doCleanups(self):
         cleanup_xcom()
         clear_db_dags()

-        BigQueryHook().run_copy(
+        BigQueryHook().insert_job(
             source_project_dataset_tables="airflow-framework.test_tables.ga_sessions_HDS",
             destination_project_dataset_table=f"{PROJECT_ID}.{DATASET}.{self.table_id}",
             write_disposition="WRITE_TRUNCATE",
diff --git a/tests/integration/hds/test_hds_upsert_snapshot.py b/tests/integration/hds/test_hds_upsert_snapshot.py
index 0fed26ac..344007ed 100644
--- a/tests/integration/hds/test_hds_upsert_snapshot.py
+++ b/tests/integration/hds/test_hds_upsert_snapshot.py
@@ -77,7 +77,7 @@ def doCleanups(self):
         cleanup_xcom()
         clear_db_dags()

-        BigQueryHook().run_query(
+        BigQueryHook().insert_job(
             sql="""SELECT * EXCEPT(af_metadata_expired_at), TIMESTAMP_TRUNC('2017-07-31T00:00:00+00:00', DAY) AS partition_time FROM `airflow-framework.test_tables.ga_sessions_HDS`""",
             use_legacy_sql=False,
             destination_dataset_table=f"{DATASET}.{self.table_id}",
diff --git a/tests/integration/ods/test_ods_upsert.py b/tests/integration/ods/test_ods_upsert.py
index 6e751e44..c7d2d83f 100644
--- a/tests/integration/ods/test_ods_upsert.py
+++ b/tests/integration/ods/test_ods_upsert.py
@@ -74,7 +74,7 @@ def doCleanups(self):
         cleanup_xcom()
         clear_db_dags()

-        BigQueryHook().run_copy(
+        BigQueryHook().insert_job(
             source_project_dataset_tables="airflow-framework.test_tables.ga_sessions_ODS",
             destination_project_dataset_table=f"{PROJECT_ID}.{DATASET}.{self.table_id}",
             write_disposition="WRITE_TRUNCATE",
@@ -144,7 +144,7 @@ def doCleanups(self):
         cleanup_xcom()
         clear_db_dags()

-        BigQueryHook().run_copy(
+        BigQueryHook().insert_job(
             source_project_dataset_tables="airflow-framework.test_tables.ga_sessions_ODS",
             destination_project_dataset_table=f"{PROJECT_ID}.{DATASET}.{self.table_id}",
             write_disposition="WRITE_TRUNCATE",
diff --git a/tests/integration/schema/test_schema_migration.py b/tests/integration/schema/test_schema_migration.py
index 4708d16b..fdac0e2b 100644
--- a/tests/integration/schema/test_schema_migration.py
+++ b/tests/integration/schema/test_schema_migration.py
@@ -155,7 +155,7 @@ def doCleanups(self):
         cleanup_xcom()
         clear_db_dags()

-        BigQueryHook().run_copy(
+        BigQueryHook().insert_job(
             source_project_dataset_tables="airflow-framework.test_tables.ga_sessions_ODS",
             destination_project_dataset_table=f"{self.source_config.gcp_project}.{self.source_config.dataset_data_name}.{self.table_id}",
             write_disposition="WRITE_TRUNCATE",
diff --git a/tests/test_utils/bq_test_utils.py b/tests/test_utils/bq_test_utils.py
index bcbcdce0..9dfe5596 100644
--- a/tests/test_utils/bq_test_utils.py
+++ b/tests/test_utils/bq_test_utils.py
@@ -2,7 +2,7 @@
 from google.cloud.bigquery import SchemaField
 import pandas
 from time import sleep
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


 def insert_to_bq_from_csv(csv, project_id, dataset_id, table_id):