diff --git a/docker-compose.yaml b/docker-compose.yaml index 04ddc062..1590c6d0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -16,6 +16,7 @@ x-airflow-common: - ./variables/docker-env-secrets # added to gitignore volumes: - ./dags:/opt/airflow/dags + - ./gcp_airflow_foundations_config:/opt/airflow/gcp_airflow_foundations_config - ./logs:/opt/airflow/logs - ./plugins:/opt/airflow/plugins - ./variables:/opt/airflow/variables diff --git a/docker/Dockerfile b/docker/Dockerfile index e48f3d1f..46974db7 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -5,7 +5,7 @@ FROM "apache/airflow:${AIRFLOW_IMAGE_NAME}" # - Install GCP util RUN curl -sSL https://sdk.cloud.google.com | bash ENV PATH $PATH:/home/airflow/google-cloud-sdk/bin - +ENV PKG_NAME gcp_airflow_foundations # - Copy a custom airflow config file #COPY airflow.cfg ${AIRFLOW_HOME}/airflow.cfg @@ -15,16 +15,16 @@ ENV PATH $PATH:/home/airflow/google-cloud-sdk/bin RUN mkdir /opt/airflow/gcp_airflow_foundations # Copying only files essential for installing gcp_airflow_foundations -COPY setup.py setup.cfg README.md MANIFEST.in requirements.txt requirements-providers.txt requirements-ci.txt /opt/airflow/ +COPY setup.py setup.cfg README.md MANIFEST.in requirements.txt requirements-providers.txt requirements-test.txt /opt/airflow/ #COPY tests /opt/airflow/tests COPY gcp_airflow_foundations/__init__.py /opt/airflow/gcp_airflow_foundations/__init__.py COPY gcp_airflow_foundations/version.py /opt/airflow/gcp_airflow_foundations/version.py - +COPY gcp_airflow_foundations_config/framework.yaml /opt/airflow/gcp_airflow_foundations_config/framework.yaml WORKDIR /opt/airflow # USER root RUN pip install --upgrade pip - +RUN pip install pyyaml # - Install dependencies RUN pip install -e .[providers,test] diff --git a/docker/Dockerfile-ci b/docker/Dockerfile-ci index 52016226..eba5c77d 100644 --- a/docker/Dockerfile-ci +++ b/docker/Dockerfile-ci @@ -6,6 +6,7 @@ FROM "apache/airflow:${AIRFLOW_IMAGE_NAME}" # - Install GCP util RUN curl -sSL https://sdk.cloud.google.com | bash ENV PATH $PATH:/home/airflow/google-cloud-sdk/bin +ENV PKG_NAME gcp_airflow_foundations # - Copy a custom airflow config file #COPY airflow.cfg ${AIRFLOW_HOME}/airflow.cfg @@ -16,18 +17,19 @@ ENV PATH $PATH:/home/airflow/google-cloud-sdk/bin RUN mkdir /opt/airflow/gcp_airflow_foundations # Copying only files essential for installing gcp_airflow_foundations -COPY setup.py setup.cfg README.md MANIFEST.in requirements.txt requirements-providers.txt requirements-ci.txt /opt/airflow/ +COPY setup.py setup.cfg README.md MANIFEST.in requirements.txt requirements-providers.txt requirements-test.txt /opt/airflow/ COPY gcp_airflow_foundations/__init__.py /opt/airflow/gcp_airflow_foundations/__init__.py COPY gcp_airflow_foundations/version.py /opt/airflow/gcp_airflow_foundations/version.py +COPY gcp_airflow_foundations_config/framework.yaml /opt/airflow/gcp_airflow_foundations_config/framework.yaml WORKDIR /opt/airflow RUN pip install --upgrade pip - +RUN pip install -r requirements-test.txt # - Install dependencies RUN pip install -e .[providers,test] -RUN pip install -r requirements.txt +# init db airflow RUN airflow db init diff --git a/gcp_airflow_foundations/version.py b/gcp_airflow_foundations/version.py index a8d4557d..d7b30e12 100644 --- a/gcp_airflow_foundations/version.py +++ b/gcp_airflow_foundations/version.py @@ -1 +1 @@ -__version__ = "0.3.5" +__version__ = "0.3.6" diff --git a/gcp_airflow_foundations_config/framework.yaml b/gcp_airflow_foundations_config/framework.yaml new file mode 100644 index 00000000..02af7908 --- /dev/null +++ b/gcp_airflow_foundations_config/framework.yaml @@ -0,0 +1,44 @@ +package-name: gcp_airflow_foundations + +package-startwith: gcp_airflow_foundations. + +name: gcp-airflow-foundations + +description: | + `gcp-airflow-foundations` + +long-description: | + Please upload README from this metadata, with range dependencies and versions available + +versions: + - "0.3.5" + - "0.3.3" + - "0.3.2" + - "0.3.1" + - "0.3.0" + - "0.2.11" + - "0.2.10" + - "0.2.9" + - "0.2.8" + - "0.2.7" + - "0.2.6" + - "0.2.5" + - "0.2.4" + - "0.2.3" + - "0.2.2" + - "0.2.1" + - "0.2.0" + + +dependencies: + - pydantic==1.8.2 + - facebook_business>=10.0.0 + - dacite>=1.5.0 + - regex>=2021.11.1 + - twilio + - pandas>=0.17.1 + - pyarrow>=3.0.0 + +extras: + - providers + - test diff --git a/gcp_airflow_foundations_config/plugins_facebook.yaml b/gcp_airflow_foundations_config/plugins_facebook.yaml new file mode 100755 index 00000000..cdae90a5 --- /dev/null +++ b/gcp_airflow_foundations_config/plugins_facebook.yaml @@ -0,0 +1,20 @@ +package-name: gcp_airflow_foundations_facebook + +package-startwith: gcp_airflow_foundations_facebook + +name: gcp-airflow-foundations-facebook + +description: | + `A Facebook plug-in for gcp-airflow-foundations` + +long-description: | + Please upload README from this metadata, with range dependencies and versions available + +versions: + - "0.0.1" + +dependencies: + - pydantic==1.8.2 + - facebook-business>=6.0.2 + +extras: {} diff --git a/gcp_airflow_foundations_facebook/__init__.py b/gcp_airflow_foundations_facebook/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/gcp_airflow_foundations_facebook/base_class/__init__.py b/gcp_airflow_foundations_facebook/base_class/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/gcp_airflow_foundations_facebook/base_class/facebook_config.py b/gcp_airflow_foundations_facebook/base_class/facebook_config.py new file mode 100755 index 00000000..aeefe5fe --- /dev/null +++ b/gcp_airflow_foundations_facebook/base_class/facebook_config.py @@ -0,0 +1,90 @@ +from typing import List, Optional + + +from pydantic import validator +from pydantic.dataclasses import dataclass + +from facebook_business.adobjects.adsinsights import AdsInsights + +from gcp_airflow_foundations_facebook.enums.facebook import ( + Level, + DatePreset, + AccountLookupScope, + ApiObject, +) + +valid_fields = { + "account_name": AdsInsights.Field.account_name, + "account_id": AdsInsights.Field.account_id, + "attribution_setting": AdsInsights.Field.attribution_setting, + "account_currency": AdsInsights.Field.account_currency, + "campaign_name": AdsInsights.Field.campaign_name, + "campaign_id": AdsInsights.Field.campaign_id, + "adset_name": AdsInsights.Field.adset_name, + "adset_id": AdsInsights.Field.adset_id, + "ad_name": AdsInsights.Field.ad_name, + "ad_id": AdsInsights.Field.ad_id, + "impressions": AdsInsights.Field.impressions, + "spend": AdsInsights.Field.spend, + "reach": AdsInsights.Field.reach, + "clicks": AdsInsights.Field.clicks, + "cpc": AdsInsights.Field.cpc, + "ctr": AdsInsights.Field.ctr, + "cpm": AdsInsights.Field.cpm, + "unique_clicks": AdsInsights.Field.unique_clicks, + "inline_link_clicks": AdsInsights.Field.inline_link_clicks, + "unique_inline_link_click_ctr": AdsInsights.Field.unique_inline_link_click_ctr, + "inline_link_click_ctr": AdsInsights.Field.inline_link_click_ctr, + "unique_inline_link_clicks": AdsInsights.Field.unique_inline_link_clicks, + "cost_per_unique_inline_link_click": AdsInsights.Field.cost_per_unique_inline_link_click, + "cost_per_unique_outbound_click": AdsInsights.Field.cost_per_unique_outbound_click, + "cost_per_unique_click": AdsInsights.Field.cost_per_unique_click, + "cost_per_thruplay": AdsInsights.Field.cost_per_thruplay, + "video_30_sec_watched_actions": AdsInsights.Field.video_30_sec_watched_actions, + "video_p25_watched_actions": AdsInsights.Field.video_p25_watched_actions, + "video_p50_watched_actions": AdsInsights.Field.video_p50_watched_actions, + "video_p75_watched_actions": AdsInsights.Field.video_p75_watched_actions, + "video_p100_watched_actions": AdsInsights.Field.video_p100_watched_actions, + "video_play_actions": AdsInsights.Field.video_play_actions, + "conversion_values": AdsInsights.Field.conversion_values, + "conversions": AdsInsights.Field.conversions, + "cost_per_conversion": AdsInsights.Field.cost_per_conversion, + "actions": AdsInsights.Field.actions, + "action_values": AdsInsights.Field.action_values, + "cost_per_action_type": AdsInsights.Field.cost_per_action_type, +} + + +@dataclass +class FacebookConfig: + """ + Attributes: + fields: A list of dimensions and metrics for the Facebook Graph API. For more information see: https://developers.facebook.com/docs/marketing-api/insights/parameters/v12.0 + level: Represents the level of result {ad, adset, campaign, account} + account_lookup_scope: Whether to query all accounts managed by the user or only the active ones + account_bq_table: A BigQuery table with the account_id's + time_increment: The time dimension of the results + time_range: The time range used to query the Graph API + use_account_attribution_setting: When this parameter is set to true, your ads results will be shown using the attribution settings defined for the ad account. + use_unified_attribution_setting: When this parameter is set to true, your ads results will be shown using unified attribution settings defined at ad set level and parameter + """ + + fields: Optional[List[str]] + level: Optional[Level] + account_lookup_scope: AccountLookupScope + accounts_bq_table: Optional[str] + time_increment: Optional[str] + time_range: Optional[dict] + use_account_attribution_setting: Optional[bool] = False + use_unified_attribution_setting: Optional[bool] = False + + @validator("fields") + def valid_fields(cls, v): + if v is not None: + for field in v: + assert ( + field in valid_fields + ), f"`{field}` is not a valid field for the Facebook API" + return [valid_fields[field] for field in v] + else: + return [] diff --git a/gcp_airflow_foundations_facebook/base_class/facebook_table_config.py b/gcp_airflow_foundations_facebook/base_class/facebook_table_config.py new file mode 100755 index 00000000..10de441e --- /dev/null +++ b/gcp_airflow_foundations_facebook/base_class/facebook_table_config.py @@ -0,0 +1,27 @@ + +from dataclasses import dataclass +from pydantic import validator +from typing import List, Optional + +from gcp_airflow_foundations_facebook.enums.facebook import ApiObject + + +@dataclass +class FacebookTableConfig: + """ + Attributes: + api_object: The API object to query {insights, campaign, adset} + breakdowns: How to break down the result. For more than one breakdown, only certain combinations are available. + action_breakdowns: How to break down action results. Supports more than one breakdowns. Default value is ["action_type"]. + """ + + api_object: Optional[ApiObject] + breakdowns: Optional[List[str]] + action_breakdowns: Optional[List[str]] + + @validator("api_object") + def valid_fields(cls, v): + if v is None: + return ApiObject.INSIGHTS + else: + return v diff --git a/gcp_airflow_foundations_facebook/enums/__init__.py b/gcp_airflow_foundations_facebook/enums/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/gcp_airflow_foundations_facebook/enums/facebook.py b/gcp_airflow_foundations_facebook/enums/facebook.py new file mode 100755 index 00000000..22160f2f --- /dev/null +++ b/gcp_airflow_foundations_facebook/enums/facebook.py @@ -0,0 +1,31 @@ +from enum import Enum, unique + + +@unique +class Level(Enum): + AD = "ad" + ADSET = "adset" + CAMPAIGN = "campaign" + ACCOUNT = "account" + + +@unique +class DatePreset(Enum): + TODAY = "today" + YESTERDAY = "yesterday" + THIS_MONTH = "this_month" + LAST_MONTH = "last_month" + MAXIMUM = "maximum" + + +@unique +class AccountLookupScope(Enum): + ALL = "all" + ACTIVE = "active" + + +@unique +class ApiObject(Enum): + INSIGHTS = "INSIGHTS" + CAMPAIGNS = "CAMPAIGNS" + ADSETS = "ADSETS" diff --git a/gcp_airflow_foundations_facebook/operators/__init__.py b/gcp_airflow_foundations_facebook/operators/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/gcp_airflow_foundations_facebook/operators/facebook/__init__.py b/gcp_airflow_foundations_facebook/operators/facebook/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/gcp_airflow_foundations_facebook/operators/facebook/hooks/__init__.py b/gcp_airflow_foundations_facebook/operators/facebook/hooks/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/gcp_airflow_foundations_facebook/operators/facebook/hooks/ads.py b/gcp_airflow_foundations_facebook/operators/facebook/hooks/ads.py new file mode 100755 index 00000000..34c02298 --- /dev/null +++ b/gcp_airflow_foundations_facebook/operators/facebook/hooks/ads.py @@ -0,0 +1,300 @@ +import time +from typing import Any, Dict, List, Optional +from enum import Enum +from datetime import datetime +import requests +import json + +from airflow.exceptions import AirflowException +from airflow.providers.facebook.ads.hooks.ads import FacebookAdsReportingHook + +from facebook_business.api import FacebookAdsApi +from facebook_business.adobjects.adreportrun import AdReportRun +from facebook_business.adobjects.adaccount import AdAccount +from facebook_business.adobjects.adsinsights import AdsInsights + +from google.cloud import bigquery + + +class JobStatus(Enum): + """Available options for gcp_airflow_foundations_facebook async task status""" + + COMPLETED = "Job Completed" + STARTED = "Job Started" + RUNNING = "Job Running" + FAILED = "Job Failed" + SKIPPED = "Job Skipped" + + +class CustomFacebookAdsReportingHook(FacebookAdsReportingHook): + """ + Custom Hook for the Facebook Ads API. It extends the default FacebookAdsReportingHook. + + :param facebook_conn_id: Airflow Facebook Ads connection ID + :type facebook_conn_id: str + :param api_version: The version of Facebook API. Default to None. If it is None, + it will use the Facebook business SDK default version. + :type api_version: Optional[str] + """ + + conn_name_attr = "facebook_conn_id" + default_conn_name = "facebook_custom" + + def __init__( + self, + facebook_conn_id: str = default_conn_name, + api_version: Optional[str] = None, + **kwargs, + ) -> None: + super(CustomFacebookAdsReportingHook, self).__init__( + facebook_conn_id=facebook_conn_id, api_version=api_version, **kwargs + ) + + self.facebook_conn_id = facebook_conn_id + self.api_version = api_version + self.client_required_fields = ["app_id", "app_secret", "access_token"] + self.config = self.facebook_ads_config + + def _get_service(self, facebook_acc_id) -> FacebookAdsApi: + """Returns Facebook Ads Client using a service account""" + + return FacebookAdsApi.init( + app_id=self.config["app_id"], + app_secret=self.config["app_secret"], + access_token=self.config["access_token"], + account_id=facebook_acc_id, + api_version=self.api_version, + ) + + def bulk_facebook_report_async( + self, + facebook_acc_id: str, + params: Dict[str, Any], + fields: List[str], + sleep_time: int = 5, + ) -> List[AdsInsights]: + """ + Pulls data from the Facebook Ads API using async calls. + :param facebook_acc_id: The Facebook account ID to pull data from. + :type facebook_acc_id: str + :param fields: List of fields that is obtained from Facebook. Found in AdsInsights.Field class. + https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 + :type fields: List[str] + :param params: Parameters that determine the query for Facebook + https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 + :type fields: Dict[str, Any] + :param sleep_time: Time to sleep when async call is happening + :type sleep_time: int + :return: Facebook Ads API response, converted to rows. + :rtype: List[dict] + """ + + api = self._get_service(facebook_acc_id=facebook_acc_id) + ad_account = AdAccount(api.get_default_account_id(), api=api) + _async = ad_account.get_insights(params=params, fields=fields, is_async=True) + while True: + request = _async.api_get() + async_status = request[AdReportRun.Field.async_status] + percent = request[AdReportRun.Field.async_percent_completion] + self.log.info( + "%s %s completed, async_status: %s", percent, "%", async_status + ) + if async_status == JobStatus.COMPLETED.value: + self.log.info("Job run completed") + break + if async_status in [JobStatus.SKIPPED.value, JobStatus.FAILED.value]: + message = f"{async_status}. Please retry." + raise AirflowException(message) + time.sleep(sleep_time) + report_run_id = _async.api_get()["report_run_id"] + report_object = AdReportRun(report_run_id, api=api) + insights = report_object.get_insights() + + max_current_usage = self.usage_throttle(insights) + + if max_current_usage >= 75: + return -1 + + self.log.info("Extracting data from returned Facebook Ads Iterators") + + rows = [] + while True: + max_current_usage = self.usage_throttle(insights) + if max_current_usage >= 75: + self.log.info("75% Rate Limit Reached. Cooling Time 5 Minutes.") + time.sleep(300) + try: + rows.append(next(insights)) + except StopIteration: + break + + return [dict(row) for row in rows] + + def bulk_facebook_report( + self, + facebook_acc_id: str, + params: Dict[str, Any], + fields: List[str], + sleep_time: int = 5, + ) -> List[AdsInsights]: + """ + Pulls data from the Facebook Ads API using sync calls. + :param facebook_acc_id: The Facebook account ID to pull data from. + :type facebook_acc_id: str + :param fields: List of fields that is obtained from Facebook. Found in AdsInsights.Field class. + https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 + :type fields: List[str] + :param params: Parameters that determine the query for Facebook + https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 + :type fields: Dict[str, Any] + :return: Facebook Ads API response, converted to rows. + :rtype: List[dict] + """ + + api = self._get_service(facebook_acc_id=facebook_acc_id) + ad_account = AdAccount(api.get_default_account_id(), api=api) + insights = ad_account.get_insights(params=params, fields=fields, is_async=False) + rows = list(insights) + + self.usage_throttle(insights) + + return [dict(row) for row in rows] + + def get_campaigns(self, facebook_acc_id: str, params: Dict[str, Any]) -> List[dict]: + """ + Pulls campaign data from the Facebook Ads API using sync calls. + :param facebook_acc_id: The Facebook account ID to pull data from. + :type facebook_acc_id: str + :param params: Parameters that determine the query for Facebook + https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 + :type params: Dict[str, Any] + :return: Facebook Ads API response, converted to rows. + :rtype: List[dict] + """ + + api = self._get_service(facebook_acc_id=facebook_acc_id) + ad_account = AdAccount(api.get_default_account_id(), api=api) + + campaigns = ad_account.get_campaigns( + params={"limit": "20000", "time_range": params["time_range"]}, + fields=[ + "account_id", + # 'name', TO-DO: troubleshoot why pyarrow fails to convert the `name` column + "daily_budget", + "effective_status", + "lifetime_budget", + "start_time", + "stop_time", + ], + ) + + rows = [] + for row in campaigns: + converted_row = row._data + if "name" in converted_row: + converted_row["name"] = str(converted_row["name"]) + if "start_time" in converted_row: + converted_row["start_time"] = datetime.strptime( + converted_row["start_time"], "%Y-%m-%dT%H:%M:%S%z" + ) + if "stop_time" in row: + converted_row["stop_time"] = datetime.strptime( + converted_row["stop_time"], "%Y-%m-%dT%H:%M:%S%z" + ) + rows.append(converted_row) + + return rows + + def get_adsets(self, facebook_acc_id: str, params: Dict[str, Any]) -> List[dict]: + """ + Pulls adset data from the Facebook Ads API using sync calls. + :param facebook_acc_id: The Facebook account ID to pull data from. + :type facebook_acc_id: str + :param params: Parameters that determine the query for Facebook + https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 + :type fields: Dict[str, Any] + :return: Facebook Ads API response, converted to rows. + :rtype: List[dict] + """ + + api = self._get_service(facebook_acc_id=facebook_acc_id) + ad_account = AdAccount(api.get_default_account_id(), api=api) + + adsets = ad_account.get_ad_sets( + params={"limit": "20000", "time_range": params["time_range"]}, + fields=[ + "account_id", + "name", + "campaign_id", + "daily_budget", + "effective_status", + "lifetime_budget", + "created_time", + "end_time", + ], + ) + + rows = [] + for row in adsets: + converted_row = row._data + if "name" in converted_row: + converted_row["name"] = str(converted_row["name"]) + if "created_time" in converted_row: + converted_row["created_time"] = datetime.strptime( + converted_row["created_time"], "%Y-%m-%dT%H:%M:%S%z" + ) + if "end_time" in row: + converted_row["end_time"] = datetime.strptime( + converted_row["end_time"], "%Y-%m-%dT%H:%M:%S%z" + ) + rows.append(converted_row) + + return rows + + def get_active_accounts_from_bq(self, project_id, table_id) -> List[str]: + """ + Pulls a list of Facebook account IDs from a BigQuery table. + :param project_id: The Google Cloud Platform project ID. + :type project_id: str + :param table_id: Name of BigQuery table that contains the Facebook account IDS. + :type table_id: str + :return: A list with the Facebook account IDs. + :rtype: List[str] + """ + + sql = f"SELECT account_id FROM `{table_id}`" + + query_config = bigquery.QueryJobConfig(use_legacy_sql=False) + + client = bigquery.Client(project=project_id) + + df = client.query(sql, job_config=query_config).to_dataframe() + + return [f"act_{i}" for i in df.account_id] + + def get_all_accounts(self) -> List[str]: + """ + Pulls a list of Facebook account IDs from the Facebook API. + :return: A list with the Facebook account IDs. + :rtype: List[str] + """ + + self.log.info("Extracting all accounts") + + user_id = self.config["user_id"] + + URL = f"https://graph.facebook.com/v12.0/{user_id}/adaccounts" + params = {"access_token": self.config["access_token"], "limit": 10000} + + accounts = requests.get(URL, params=params).json()["data"] + + return [i["id"] for i in accounts] + + def usage_throttle(self, insights) -> int: + """ + Queries the 'x-business-use-case-usage' header of the Cursor object returned by the Facebook API. + """ + + usage_header = json.loads(insights._headers["x-business-use-case-usage"]) + values = list(usage_header.values())[0][0] + return max(values["call_count"], values["total_cputime"], values["total_time"]) diff --git a/gcp_airflow_foundations_facebook/operators/facebook/operators/__init__.py b/gcp_airflow_foundations_facebook/operators/facebook/operators/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/gcp_airflow_foundations_facebook/operators/facebook/operators/facebook_ads_to_gcs.py b/gcp_airflow_foundations_facebook/operators/facebook/operators/facebook_ads_to_gcs.py new file mode 100755 index 00000000..8595986b --- /dev/null +++ b/gcp_airflow_foundations_facebook/operators/facebook/operators/facebook_ads_to_gcs.py @@ -0,0 +1,235 @@ +from typing import Any, Dict, List, Optional, Sequence, Union +from random import shuffle +import pandas as pd +from datetime import datetime + +import pyarrow.parquet as pq +import pyarrow + + +from gcp_airflow_foundations_facebook.operators.facebook.hooks.ads import ( + CustomFacebookAdsReportingHook, +) +from gcp_airflow_foundations_facebook.enums.facebook import AccountLookupScope, ApiObject + +from airflow.models import BaseOperator +from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook + +from google.cloud import bigquery + + +class FacebookAdsReportToBqOperator(BaseOperator): + """ + Fetches the results from the Facebook Ads API as desired in the params and fields + Converts to parquet format and loads to directly BigQuery maintaining the native nested + representation of the data. + + :param api_object: The API Object to query from + :type api_object: ApiObject + :param gcp_project: The Google Cloud Platform project ID + :type gcp_project: str + :param account_lookup_scope: Whether to query all or only the active accounts managed by the user. + :type account_lookup_scope: AccountLookupScope + :param destination_project_dataset_table: BigQuery staging zone table. String in dotted (.). format. + :type destination_project_dataset_table: str + :param accounts_bq_table: BigQuery table with the Facebook Account IDs to query data from. String in dotted (.).
format. + :type accounts_bq_table: str + :param time_range: Time range used in the Graph API query. + :type time_range: Dict[str, Any] + :param gcp_conn_id: Airflow Google Cloud connection ID + :type gcp_conn_id: str + :param facebook_conn_id: Airflow Facebook Ads connection ID + :type facebook_conn_id: str + :param api_version: The version of Facebook API. Default to None. If it is None, + it will use the Facebook business SDK default version. + :type api_version: str + :param fields: List of fields that is obtained from Facebook. Found in AdsInsights.Field class. + https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 + :type fields: List[str] + :param parameters: Parameters that determine the query for Facebook + https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 + :type parameters: Dict[str, Any] + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + :type impersonation_chain: Union[str, Sequence[str]] + """ + + template_fields = ("facebook_conn_id", "impersonation_chain", "parameters") + + def __init__( + self, + *, + api_object: ApiObject, + gcp_project: str, + account_lookup_scope: AccountLookupScope, + destination_project_dataset_table: str, + accounts_bq_table: str, + fields: List[str], + parameters: Dict[str, Any] = None, + time_range: Dict[str, Any] = None, + api_version: Optional[str] = None, + gcp_conn_id: str = "google_cloud_default", + facebook_conn_id: str = "facebook_custom", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ) -> None: + super(FacebookAdsReportToBqOperator, self).__init__(**kwargs) + + self.api_object = api_object + self.gcp_project = gcp_project + self.account_lookup_scope = account_lookup_scope + self.destination_project_dataset_table = destination_project_dataset_table + self.accounts_bq_table = accounts_bq_table + self.gcp_conn_id = gcp_conn_id + self.facebook_conn_id = facebook_conn_id + self.api_version = api_version + self.fields = fields + self.parameters = parameters + self.time_range = time_range + self.impersonation_chain = impersonation_chain + + def execute(self, context: dict): + + ds = context["ds"] + + if not self.time_range: + self.parameters["time_range"] = {"since": ds, "until": ds} + else: + self.parameters["time_range"] = { + "since": self.time_range["since"], + "until": ds, + } + + self.log.info( + "Currently loading data for date range: %s", self.parameters["time_range"] + ) + + service = CustomFacebookAdsReportingHook( + facebook_conn_id=self.facebook_conn_id, api_version=self.api_version + ) + + if self.account_lookup_scope == AccountLookupScope.ALL: + facebook_acc_ids = service.get_all_accounts() + + elif self.account_lookup_scope == AccountLookupScope.ACTIVE: + facebook_acc_ids = service.get_active_accounts_from_bq( + project_id=self.gcp_project, table_id=self.accounts_bq_table + ) + + shuffle(facebook_acc_ids) + + converted_rows = [] + while True: + for facebook_acc_id in facebook_acc_ids: + + self.log.info( + "Currently loading data from Account ID: %s", facebook_acc_id + ) + + try: + if self.api_object == ApiObject.INSIGHTS: + rows = service.bulk_facebook_report_async( + facebook_acc_id=facebook_acc_id, + params=self.parameters, + fields=self.fields, + ) + if rows == -1: + self.log.info( + "Rate Limit has reached 75%. Moving on to the next account. Will retry later" + ) + continue + + elif self.api_object == ApiObject.CAMPAIGNS: + rows = service.get_campaigns( + facebook_acc_id=facebook_acc_id, params=self.parameters + ) + + elif self.api_object == ApiObject.ADSETS: + rows = service.get_adsets( + facebook_acc_id=facebook_acc_id, params=self.parameters + ) + + converted_rows.extend(rows) + + facebook_acc_ids.remove(facebook_acc_id) + + self.log.info( + "Extracting data for account %s completed", facebook_acc_id + ) + except: # noqa: E722 + self.log.info( + "Extracting data for account %s failed. Will retry later.", + facebook_acc_id, + ) + + if len(facebook_acc_ids) == 0: + break + + self.log.info("Facebook Returned %s data points", len(converted_rows)) + + self.transform_data_types(converted_rows) + + df = pd.DataFrame.from_dict(converted_rows) + + writer = pyarrow.BufferOutputStream() + pq.write_table( + pyarrow.Table.from_pandas(df), writer, use_compliant_nested_type=True + ) + reader = pyarrow.BufferReader(writer.getvalue()) + + hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id) + + client = hook.get_client(project_id=self.gcp_project) + + parquet_options = bigquery.format_options.ParquetOptions() + parquet_options.enable_list_inference = True + + job_config = bigquery.LoadJobConfig() + job_config.source_format = bigquery.SourceFormat.PARQUET + job_config.parquet_options = parquet_options + job_config.write_disposition = "WRITE_TRUNCATE" + + client.load_table_from_file( + reader, + f"{self.destination_project_dataset_table}_{ds}", + job_config=job_config, + ) + + def transform_data_types(self, rows): + """ + Transforms the fields returned by the Facebook API to float or date data types as appropriate. + + :param rows: List of dictionary rows returned by the Facebook API. + :type rows: List[dict] + """ + for i in rows: + i.pop("date_stop") + i["date_start"] = datetime.strptime(i["date_start"], "%Y-%m-%d").date() + for j in i: + if j.endswith("id") or j.endswith("name"): + continue + elif type(i[j]) == str: + i[j] = self.get_float(i[j]) + elif type(i[j]) == list: + for k in i[j]: + for w in k: + if (type(k[w]) == str) and (not w.endswith("id")): + k[w] = self.get_float(k[w]) + + def get_float(self, element): + """ + Attempts to cast a string object into float. + + :param element: Value to be converted to float. + :type element: str + """ + try: + return float(element) + except ValueError: + return element diff --git a/gcp_airflow_foundations_facebook/source_class/__init__.py b/gcp_airflow_foundations_facebook/source_class/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/gcp_airflow_foundations_facebook/source_class/facebook.py b/gcp_airflow_foundations_facebook/source_class/facebook.py new file mode 100755 index 00000000..ed4f9af3 --- /dev/null +++ b/gcp_airflow_foundations_facebook/source_class/facebook.py @@ -0,0 +1,85 @@ +import logging +from datetime import datetime + +from airflow.models.dag import DAG +from airflow.models import Variable +from airflow.utils.task_group import TaskGroup + +from gcp_airflow_foundations_facebook.operators.facebook.operators.facebook_ads_to_gcs import ( + FacebookAdsReportToBqOperator, +) +from gcp_airflow_foundations.source_class.source import DagBuilder + +from airflow.sensors.external_task import ExternalTaskSensor + + +class FacebooktoBQDagBuilder(DagBuilder): + """ + Builds DAGs to load Facebook Ads data to a staging BigQuery table. + """ + + source_type = "FACEBOOK" + + def set_schema_method_type(self): + self.schema_source_type = self.config.source.schema_options.schema_source_type + + def get_bq_ingestion_task(self, dag, table_config): + data_source = self.config.source + + task_group = TaskGroup(group_id="ingest_facebook_data") + + facebook_options = data_source.facebook_options + + GCP_PROJECT_ID = data_source.gcp_project + + FIELDS = facebook_options.fields + + facebook_table_config = table_config.facebook_table_config + level = facebook_options.level.value if facebook_options.level else None + + PARAMETERS = { + "level": level, + "time_increment": facebook_options.time_increment, + "breakdowns": facebook_table_config.breakdowns, + "action_breakdowns": facebook_table_config.action_breakdowns, + "use_account_attribution_setting": facebook_options.use_account_attribution_setting, + "use_unified_attribution_setting": facebook_options.use_unified_attribution_setting, + "limit": "20000", + } + + TIME_RANGE = facebook_options.time_range + + run_operator = FacebookAdsReportToBqOperator( + task_id="fetch_facebook_data_to_bq_staging", + api_object=facebook_table_config.api_object, + gcp_project=GCP_PROJECT_ID, + destination_project_dataset_table=f"{data_source.landing_zone_options.landing_zone_dataset}.{table_config.landing_zone_table_name_override}", + accounts_bq_table=facebook_options.accounts_bq_table, + time_range=TIME_RANGE, + parameters=PARAMETERS, + fields=FIELDS, + account_lookup_scope=facebook_options.account_lookup_scope, + gcp_conn_id="google_cloud_default", + api_version="v12.0", + task_group=task_group, + dag=dag, + ) + + if data_source.external_dag_id: + sensor_op = ExternalTaskSensor( + task_id="wait_for_accounts_ingestion", + external_dag_id=data_source.external_dag_id, + mode="reschedule", + check_existence=True, + task_group=task_group, + dag=dag, + ) + sensor_op >> run_operator + + else: + run_operator + + return task_group + + def validate_extra_options(self): + pass diff --git a/packaging.sh b/packaging.sh new file mode 100755 index 00000000..29c0268f --- /dev/null +++ b/packaging.sh @@ -0,0 +1,9 @@ +#!/bin/bash +echo "package_name: $1"; + +#we set variable from the command line +export PKG_NAME="$1" + +echo $PKG_NAME +python setup.py bdist_wheel --universal + diff --git a/requirements-ci.txt b/requirements-test.txt similarity index 98% rename from requirements-ci.txt rename to requirements-test.txt index 7e939ee0..ba40912a 100644 --- a/requirements-ci.txt +++ b/requirements-test.txt @@ -3,3 +3,4 @@ pytest-testconfig==0.2.0 pytest-airflow>=0.0.3 pytest-cov==3.0.0 flake8==4.0.1 + diff --git a/requirements.txt b/requirements.txt index 3b681c37..cb2365ab 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,6 @@ dacite>=1.5.0 regex>=2021.11.1 twilio pandas>=0.17.1 -pyarrow>=3.0.0 \ No newline at end of file +pyarrow>=3.0.0 +setuptools +pyyaml \ No newline at end of file diff --git a/setup.py b/setup.py old mode 100644 new mode 100755 index 5e1351ef..a83c8f7e --- a/setup.py +++ b/setup.py @@ -1,64 +1,83 @@ import os import sys - +import yaml import setuptools -from setuptools import setup, find_packages +from setuptools import setup, find_packages, Distribution -from pathlib import Path here = os.path.abspath(os.path.dirname(__file__)) about = {} - -with open(os.path.join(here, "requirements.txt"), "r") as f: - requirements = f.read().strip().split("\n") +# Load extra dependencies from txt with open(os.path.join(here, "requirements-providers.txt"), "r") as f: requirements_providers = f.read().strip().split("\n") -with open(os.path.join(here, "requirements-ci.txt"), "r") as f: +with open(os.path.join(here, "requirements-test.txt"), "r") as f: requirements_test = f.read().strip().split("\n") -this_directory = Path(__file__).parent -long_description = (this_directory / "README.md").read_text() -packages = [ - package - for package in setuptools.PEP420PackageFinder.find() - if package.startswith("gcp_airflow_foundations") -] +def main() -> dict: + if os.environ.get("PKG_NAME"): + pkg = os.environ.get("PKG_NAME") -version = {} -with open(os.path.join(here, "gcp_airflow_foundations/version.py")) as fp: - exec(fp.read(), version) -version = version["__version__"] + # Framework configuration + if pkg == "gcp_airflow_foundations": + plug = yaml.safe_load(open("gcp_airflow_foundations_config/framework.yaml")) + return _process_metadata(plug) + # Facebook plugin configuration + if pkg == "gcp_airflow_foundations_facebook": + plug = yaml.safe_load(open("gcp_airflow_foundations_config/plugins_facebook.yaml")) + return _process_metadata(plug) -def main(): - metadata = dict( - name="gcp-airflow-foundations", - version=version, - description="Opinionated framework based on Airflow 2.0 for building pipelines to ingest data into a BigQuery data warehouse", - long_description=long_description, - long_description_content_type="text/markdown", - url="https://github.com/badal-io/gcp-airflow-foundations", - author="Badal.io", - author_email="info@badal.io", - license="Apache 2.0", - packages=packages, - install_requires=requirements, - extras_require={'providers': requirements_providers, 'test': requirements_test}, - classifiers=[ - "Development Status :: 4 - Beta", - "Intended Audience :: Developers", - "Topic :: Software Development :: Libraries", - "License :: OSI Approved :: Apache Software License", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - ], - ) + else: + return None + else: + return None - setup(**metadata) + +def _process_metadata(plug) -> dict: + try: + packages = [ + package + for package in setuptools.PEP420PackageFinder.find() + if package.startswith(str(plug['package-startwith'])) + ] + extras = {} + for x in plug['extras']: + with open(os.path.join(here, f"requirements-{x}.txt"), "r") as f: + extra = f.read().strip().split("\n") + extras.update({x: extra}) + metadata = dict( + name=plug['name'], + version=plug['versions'][0], + description=plug['description'], + long_description=plug['long-description'], + packages=packages, + install_requires=plug['dependencies'], + extras_require=extras, + ) + base_metadata = dict( + long_description_content_type="text/markdown", + author="Badal.io", + author_email="info@badal.io", + license="Apache 2.0", + classifiers=[ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Topic :: Software Development :: Libraries", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + ], + ) + metadata.update(base_metadata) + except Exception as e: + print(e) + metadata = dict(name="no_package") + return metadata if __name__ == "__main__": - main() + metadata = main() + setup(**metadata) diff --git a/tests/airflow b/tests/airflow index 25902826..287b4574 100755 --- a/tests/airflow +++ b/tests/airflow @@ -1,4 +1,4 @@ # # Run commands in a temporary ops container # -exec docker-compose run --rm -e CONNECTION_CHECK_MAX_COUNT=0 airflow-ops bash -c "${@}" +exec docker-compose run --rm -e CONNECTION_CHECK_MAX_COUNT=0 airflow-init bash -c "${@}" diff --git a/tests/integration/dlp/test_dlp.py b/tests/integration/dlp/test_dlp.py index 9bcc96be..4640f3b7 100644 --- a/tests/integration/dlp/test_dlp.py +++ b/tests/integration/dlp/test_dlp.py @@ -1,7 +1,7 @@ import logging import pytz import unittest -from airflow.models import DAG +from airflow.models.dag import DAG from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook from airflow.utils.state import State from airflow.utils.types import DagRunType diff --git a/tests/integration/hds/test_hds_upsert_scd2.py b/tests/integration/hds/test_hds_upsert_scd2.py index ea3b98c2..cc6f50df 100644 --- a/tests/integration/hds/test_hds_upsert_scd2.py +++ b/tests/integration/hds/test_hds_upsert_scd2.py @@ -1,6 +1,7 @@ import pytz import unittest -from airflow.models import DAG, TaskInstance, XCom, DagRun, DagTag, DagModel +from airflow.models import TaskInstance, XCom, DagRun +from airflow.models.dag import DAG, DagTag, DagModel from airflow.operators.dummy import DummyOperator from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator diff --git a/tests/integration/hds/test_hds_upsert_snapshot.py b/tests/integration/hds/test_hds_upsert_snapshot.py index 0fed26ac..d0e046c4 100644 --- a/tests/integration/hds/test_hds_upsert_snapshot.py +++ b/tests/integration/hds/test_hds_upsert_snapshot.py @@ -1,6 +1,7 @@ import pytz import unittest -from airflow.models import DAG, TaskInstance, XCom, DagRun, DagTag, DagModel +from airflow.models import TaskInstance, XCom, DagRun +from airflow.models.dag import DAG, DagTag, DagModel from airflow.operators.dummy import DummyOperator from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook from airflow.utils.session import create_session, provide_session diff --git a/tests/integration/ods/test_ods_upsert.py b/tests/integration/ods/test_ods_upsert.py index 1c316c18..5ead46fa 100644 --- a/tests/integration/ods/test_ods_upsert.py +++ b/tests/integration/ods/test_ods_upsert.py @@ -1,6 +1,7 @@ import pytz import unittest -from airflow.models import DAG, TaskInstance, XCom, DagRun, DagTag, DagModel +from airflow.models import TaskInstance, XCom, DagRun +from airflow.models.dag import DAG, DagTag, DagModel from airflow.operators.dummy import DummyOperator # from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook diff --git a/tests/integration/pytest.ini b/tests/integration/pytest.ini old mode 100644 new mode 100755 diff --git a/tests/integration/schema/test_schema_migration.py b/tests/integration/schema/test_schema_migration.py index 5260ade2..a5d69249 100644 --- a/tests/integration/schema/test_schema_migration.py +++ b/tests/integration/schema/test_schema_migration.py @@ -3,7 +3,8 @@ import pytz import unittest from airflow.exceptions import AirflowException -from airflow.models import TaskInstance, XCom, DagRun, DagTag, DagModel +from airflow.models import TaskInstance, XCom, DagRun +from airflow.models.dag import DagTag, DagModel from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator from airflow.utils.session import create_session, provide_session diff --git a/tests/integration/schema/test_schema_parsing.py b/tests/integration/schema/test_schema_parsing.py index a1c23af7..66f04fe8 100644 --- a/tests/integration/schema/test_schema_parsing.py +++ b/tests/integration/schema/test_schema_parsing.py @@ -1,7 +1,8 @@ import os import pytz import unittest -from airflow.models import DAG, TaskInstance, XCom, DagRun, DagTag, DagModel +from airflow.models import TaskInstance, XCom, DagRun +from airflow.models.dag import DAG, DagTag, DagModel from airflow.operators.dummy import DummyOperator from airflow.utils.session import create_session, provide_session from airflow.utils.state import State diff --git a/tests/test_utils/test_utils.py b/tests/test_utils/test_utils.py old mode 100644 new mode 100755 index ba31ae36..f42b5dc1 --- a/tests/test_utils/test_utils.py +++ b/tests/test_utils/test_utils.py @@ -1,6 +1,7 @@ from airflow.utils.session import create_session, provide_session -from airflow.models import DAG, TaskInstance, XCom, DagBag, DagRun, DagTag, DagModel +from airflow.models import TaskInstance, XCom, DagRun +from airflow.models.dag import DAG, DagTag, DagModel import pytz from airflow.utils.state import State from datetime import datetime diff --git a/tests/unit/dags/test_task_groups.py b/tests/unit/dags/test_task_groups.py index a6d893d2..fc691e9a 100644 --- a/tests/unit/dags/test_task_groups.py +++ b/tests/unit/dags/test_task_groups.py @@ -6,12 +6,11 @@ from gcp_airflow_foundations.operators.gcp.ods.load_ods_taskgroup import ods_builder from gcp_airflow_foundations.base_class.utils import load_tables_config_from_dir -from airflow.models.dag import DAG - from airflow.utils.session import create_session from airflow.utils.timezone import datetime -from airflow.models import DagBag, DagRun, DagTag, TaskInstance, DagModel +from airflow.models import DagRun, TaskInstance +from airflow.models.dag import DAG, DagTag, DagModel DEFAULT_DATE = datetime(2015, 1, 1) TEST_DAG_ID = "unit_test_dag" diff --git a/tests/unit/operators/airflow/test_table_ingestion_sensor.py b/tests/unit/operators/airflow/test_table_ingestion_sensor.py index d98a7cf7..eed6e638 100644 --- a/tests/unit/operators/airflow/test_table_ingestion_sensor.py +++ b/tests/unit/operators/airflow/test_table_ingestion_sensor.py @@ -4,8 +4,8 @@ from airflow.exceptions import AirflowException -from airflow.models import DagBag, DagRun, DagTag, TaskInstance, DagModel -from airflow.models.dag import DAG +from airflow.models import DagBag, DagRun, TaskInstance +from airflow.models.dag import DAG, DagTag, DagModel from airflow.utils.state import State from airflow.utils.timezone import datetime @@ -412,7 +412,7 @@ def test_catch_no_dags_error(self): with pytest.raises(AirflowException) as ctx: op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - execute_task(task=op, execution_date=DEFAULT_DATE) + execute_task(task=op) assert str(ctx.value) == "No active dags found." def test_catch_no_dags_for_source_error(self): diff --git a/tests/unit/operators/branch/test_cron_branch.py b/tests/unit/operators/branch/test_cron_branch.py index fd8af101..ea18ec47 100644 --- a/tests/unit/operators/branch/test_cron_branch.py +++ b/tests/unit/operators/branch/test_cron_branch.py @@ -1,7 +1,8 @@ import datetime import unittest -from airflow.models import DAG, DagRun, TaskInstance as TI +from airflow.models import DagRun, TaskInstance as TI +from airflow.models.dag import DAG from airflow.operators.dummy import DummyOperator from airflow.utils import timezone from airflow.utils.session import create_session diff --git a/tests/unit/operators/gcp/test_bigquery.py b/tests/unit/operators/gcp/test_bigquery.py index cd3371bd..1eeb3806 100644 --- a/tests/unit/operators/gcp/test_bigquery.py +++ b/tests/unit/operators/gcp/test_bigquery.py @@ -1,7 +1,8 @@ import pytz import unittest -from airflow.models import DAG, TaskInstance, XCom, DagRun, DagTag, DagModel -from airflow.models.xcom import XCOM_RETURN_KEY +from airflow.models import TaskInstance, DagRun +from airflow.models.dag import DAG, DagTag, DagModel +from airflow.models.xcom import XCOM_RETURN_KEY, BaseXCom from airflow.operators.dummy import DummyOperator from airflow.utils.session import create_session, provide_session from airflow.utils.state import State @@ -37,7 +38,7 @@ @provide_session def cleanup_xcom(session=None): - session.query(XCom).delete() + session.query(BaseXCom).delete() def clear_db_dags(): diff --git a/tests/unit/operators/hds/test_merge_hds.py b/tests/unit/operators/hds/test_merge_hds.py index af79b640..4b5490b7 100644 --- a/tests/unit/operators/hds/test_merge_hds.py +++ b/tests/unit/operators/hds/test_merge_hds.py @@ -1,6 +1,7 @@ import pytz import unittest -from airflow.models import DAG, TaskInstance, XCom, DagRun, DagTag, DagModel +from airflow.models import TaskInstance, XCom, DagRun +from airflow.models.dag import DAG, DagTag, DagModel from airflow.models.xcom import XCOM_RETURN_KEY from airflow.operators.dummy import DummyOperator from airflow.utils.session import create_session, provide_session diff --git a/tests/unit/operators/ods/test_merge_ods.py b/tests/unit/operators/ods/test_merge_ods.py index b06e35a6..b0280aa7 100644 --- a/tests/unit/operators/ods/test_merge_ods.py +++ b/tests/unit/operators/ods/test_merge_ods.py @@ -1,6 +1,7 @@ import pytz import unittest -from airflow.models import DAG, TaskInstance, XCom, DagRun, DagTag, DagModel +from airflow.models import TaskInstance, XCom, DagRun +from airflow.models.dag import DAG, DagTag, DagModel from airflow.models.xcom import XCOM_RETURN_KEY from airflow.operators.dummy import DummyOperator from airflow.utils.session import create_session, provide_session diff --git a/tests/unit/operators/schema/test_schema_migration.py b/tests/unit/operators/schema/test_schema_migration.py index 59a2a3f4..f06b1d05 100644 --- a/tests/unit/operators/schema/test_schema_migration.py +++ b/tests/unit/operators/schema/test_schema_migration.py @@ -1,6 +1,7 @@ import pytz import unittest -from airflow.models import DAG, TaskInstance, XCom, DagRun, DagTag, DagModel +from airflow.models import TaskInstance, XCom, DagRun +from airflow.models.dag import DAG, DagTag, DagModel from airflow.models.xcom import XCOM_RETURN_KEY from airflow.operators.dummy import DummyOperator from airflow.utils.session import create_session, provide_session diff --git a/tests/unit/operators/schema/test_schema_parsing.py b/tests/unit/operators/schema/test_schema_parsing.py index a9673e4e..fd49b3ef 100644 --- a/tests/unit/operators/schema/test_schema_parsing.py +++ b/tests/unit/operators/schema/test_schema_parsing.py @@ -1,7 +1,8 @@ import os import pytz import unittest -from airflow.models import DAG, TaskInstance, XCom, DagRun, DagTag, DagModel +from airflow.models import TaskInstance, XCom, DagRun +from airflow.models.dag import DAG, DagTag, DagModel from airflow.operators.dummy import DummyOperator from airflow.utils.session import create_session, provide_session from airflow.utils.state import State diff --git a/tests/unit/sources/facebook/test_facebook.py b/tests/unit/sources/facebook/test_facebook.py index ba8e20e1..8e65fd8f 100644 --- a/tests/unit/sources/facebook/test_facebook.py +++ b/tests/unit/sources/facebook/test_facebook.py @@ -16,8 +16,8 @@ facebook, ) -from airflow.models.dag import DAG -from airflow.models import DagBag, DagRun, DagTag, TaskInstance, DagModel +from airflow.models import DagRun, TaskInstance +from airflow.models.dag import DAG, DagTag, DagModel from airflow.utils.session import create_session DEFAULT_DATE = datetime.datetime(2015, 1, 1) diff --git a/tests/unit/sources/gcs/test_gcs.py b/tests/unit/sources/gcs/test_gcs.py index 434835da..84cf56a7 100644 --- a/tests/unit/sources/gcs/test_gcs.py +++ b/tests/unit/sources/gcs/test_gcs.py @@ -9,8 +9,9 @@ from gcp_airflow_foundations.base_class.file_source_config import FileSourceConfig from gcp_airflow_foundations.base_class.file_table_config import FileTableConfig from dacite import from_dict -from airflow.models.dag import DAG -from airflow.models import DagRun, DagTag, TaskInstance, DagModel + +from airflow.models import DagRun, TaskInstance +from airflow.models.dag import DAG, DagTag, DagModel from airflow.utils.session import create_session from airflow.utils.task_group import TaskGroup diff --git a/variables/docker-env-vars b/variables/docker-env-vars index 1c4026c6..7c0c115a 100644 --- a/variables/docker-env-vars +++ b/variables/docker-env-vars @@ -5,7 +5,7 @@ # -- Airflow Configuration: AIRFLOW__API__AUTH_BACKEND=airflow.api.auth.backend.basic_auth AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=false -AIRFLOW__CORE__LOAD_EXAMPLES=false +AIRFLOW__CORE__LOAD_EXAMPLES=true AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=false AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE=true AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.providers.sendgrid.utils.emailer.send_email