From 5f98e5247b8daf9eabe525767e9555a6f936cd42 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 9 Mar 2024 14:04:23 -0500 Subject: [PATCH 1/4] Tech report pipeline (#238) * add new pipeline - tech_report_pipeline.py * add new pipeline - tech_report_deletion.py * update requirements to include firestore * add tests for tech_report_pipeline.py * update constants to include technology pipeline queries --- .../deploy-dataflow-flex-template.yml | 2 +- .gitignore | 1 + modules/constants.py | 309 ++++++++++++++++++ modules/tech_report_deletion.py | 140 ++++++++ modules/tech_report_pipeline.py | 178 ++++++++++ requirements.txt | 3 +- setup.py | 2 +- test/test_tech_report_pipeline.py | 35 ++ 8 files changed, 667 insertions(+), 3 deletions(-) create mode 100644 modules/tech_report_deletion.py create mode 100644 modules/tech_report_pipeline.py create mode 100644 test/test_tech_report_pipeline.py diff --git a/.github/workflows/deploy-dataflow-flex-template.yml b/.github/workflows/deploy-dataflow-flex-template.yml index f08af22..440f4eb 100644 --- a/.github/workflows/deploy-dataflow-flex-template.yml +++ b/.github/workflows/deploy-dataflow-flex-template.yml @@ -10,7 +10,7 @@ on: - "cloudbuild.yaml" - "Dockerfile" - "flex_template_metadata_*.json" - - "requirements.txt" + - "requirements*.txt" jobs: deploy: diff --git a/.gitignore b/.gitignore index c5b2840..6dd1e02 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ *.pyc /.vscode .coverage +.tool-versions # Ignore generated credentials from google-github-actions/auth gha-creds-*.json diff --git a/modules/constants.py b/modules/constants.py index f5d02bc..3678dd6 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -103,3 +103,312 @@ class MaxContentSize(Enum): # limit response bodies to 20MB RESPONSE_BODIES = 20 * 1000000 + + +TECHNOLOGY_QUERY_ID_KEYS = { + "adoption": ["date", "technology", "geo", "rank"], + "lighthouse": ["date", "technology", "geo", "rank"], + "core_web_vitals": ["date", "technology", "geo", "rank"], + "page_weight": ["date", "technology", "geo", "rank"], + "technologies": ["client", "technology", "category"], + "categories": ["category"], +} +"""Mapping of query types to a list of fields that uniquely identify a row.""" + +# editorconfig-checker-disable +TECHNOLOGY_QUERIES = { + "adoption": """ + CREATE TEMPORARY FUNCTION GET_ADOPTION( + records ARRAY> + ) RETURNS STRUCT< + desktop INT64, + mobile INT64 + > LANGUAGE js AS ''' + return Object.fromEntries(records.map(({{client, origins}}) => {{ + return [client, origins]; + }})); + '''; + + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_ADOPTION(ARRAY_AGG(STRUCT( + client, + origins + ))) AS adoption + FROM + `httparchive.core_web_vitals.technologies` + WHERE date = '{date}' + GROUP BY date, app, rank, geo + """, + "lighthouse": """ + CREATE TEMPORARY FUNCTION GET_LIGHTHOUSE( + records ARRAY> + ) RETURNS ARRAY, + mobile STRUCT< + median_score NUMERIC + > + >> LANGUAGE js AS ''' + const METRIC_MAP = {{ + accessibility: 'median_lighthouse_score_accessibility', + best_practices: 'median_lighthouse_score_best_practices', + performance: 'median_lighthouse_score_performance', + pwa: 'median_lighthouse_score_pwa', + seo: 'median_lighthouse_score_seo', + }}; + + // Initialize the Lighthouse map. + const lighthouse = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => {{ + return [metricName, {{name: metricName}}]; + }})); + + // Populate each client record. 
+ records.forEach(record => {{ + Object.entries(METRIC_MAP).forEach(([metricName, median_score]) => {{ + lighthouse[metricName][record.client] = {{median_score: record[median_score]}}; + }}); + }}); + + return Object.values(lighthouse); + '''; + + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_LIGHTHOUSE(ARRAY_AGG(STRUCT( + client, + median_lighthouse_score_accessibility, + median_lighthouse_score_best_practices, + median_lighthouse_score_performance, + median_lighthouse_score_pwa, + median_lighthouse_score_seo + + ))) AS lighthouse + FROM + `httparchive.core_web_vitals.technologies` + WHERE date = '{date}' + GROUP BY date, app, rank, geo + """, + "core_web_vitals": """ + CREATE TEMPORARY FUNCTION GET_VITALS( + records ARRAY> + ) RETURNS ARRAY, + mobile STRUCT< + good_number INT64, + tested INT64 + > + >> LANGUAGE js AS ''' + const METRIC_MAP = {{ + overall: ['origins_with_good_cwv', 'origins_eligible_for_cwv'], + LCP: ['origins_with_good_lcp', 'origins_with_any_lcp'], + CLS: ['origins_with_good_cls', 'origins_with_any_cls'], + FID: ['origins_with_good_fid', 'origins_with_any_fid'], + FCP: ['origins_with_good_fcp', 'origins_with_any_fcp'], + TTFB: ['origins_with_good_ttfb', 'origins_with_any_ttfb'], + INP: ['origins_with_good_inp', 'origins_with_any_inp'] + }}; + + // Initialize the vitals map. + const vitals = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => {{ + return [metricName, {{name: metricName}}]; + }})); + + // Populate each client record. + records.forEach(record => {{ + Object.entries(METRIC_MAP).forEach(([metricName, [good_number, tested]]) => {{ + vitals[metricName][record.client] = {{good_number: record[good_number], tested: record[tested]}}; + }}); + }}); + + return Object.values(vitals); + '''; + + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_VITALS(ARRAY_AGG(STRUCT( + client, + origins_with_good_fid, + origins_with_good_cls, + origins_with_good_lcp, + origins_with_good_fcp, + origins_with_good_ttfb, + origins_with_good_inp, + origins_with_any_fid, + origins_with_any_cls, + origins_with_any_lcp, + origins_with_any_fcp, + origins_with_any_ttfb, + origins_with_any_inp, + origins_with_good_cwv, + origins_eligible_for_cwv + ))) AS vitals + FROM + `httparchive.core_web_vitals.technologies` + WHERE date = '{date}' + GROUP BY date, app, rank, geo + """, + "technologies": """ + SELECT + client, + app AS technology, + description, + category, + SPLIT(category, ",") AS category_obj, + NULL AS similar_technologies, + origins + FROM + `httparchive.core_web_vitals.technologies` + JOIN + `httparchive.core_web_vitals.technology_descriptions` + ON + app = technology + WHERE date = '{date}' AND geo = 'ALL' AND rank = 'ALL' + ORDER BY origins DESC + """, + "page_weight": """ + CREATE TEMPORARY FUNCTION GET_PAGE_WEIGHT( + records ARRAY> + ) RETURNS ARRAY, + desktop STRUCT< + median_bytes INT64 + > + >> LANGUAGE js AS ''' + const METRICS = ['total', 'js', 'images']; + + // Initialize the page weight map. + const pageWeight = Object.fromEntries(METRICS.map(metricName => {{ + return [metricName, {{name: metricName}}]; + }})); + + // Populate each client record. 
+ records.forEach(record => {{ + METRICS.forEach(metricName => {{ + pageWeight[metricName][record.client] = {{median_bytes: record[metricName]}}; + }}); + }}); + + return Object.values(pageWeight); + '''; + + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_PAGE_WEIGHT(ARRAY_AGG(STRUCT( + client, + median_bytes_total, + median_bytes_js, + median_bytes_image + ))) AS pageWeight + FROM + `httparchive.core_web_vitals.technologies` + WHERE date = '{date}' + GROUP BY date, app, rank, geo + """, + "categories": """ + WITH categories AS ( + SELECT + category, + COUNT(DISTINCT root_page) AS origins + FROM + `httparchive.all.pages`, + UNNEST(technologies) AS t, + UNNEST(t.categories) AS category + WHERE + date = '{date}' AND + client = 'mobile' + GROUP BY + category + ), + + technologies AS ( + SELECT + category, + technology, + COUNT(DISTINCT root_page) AS origins + FROM + `httparchive.all.pages`, + UNNEST(technologies) AS t, + UNNEST(t.categories) AS category + WHERE + date = '{date}' AND + client = 'mobile' + GROUP BY + category, + technology + ) + + SELECT + category, + categories.origins, + ARRAY_AGG(technology ORDER BY technologies.origins DESC) AS technologies + FROM + categories + JOIN + technologies + USING + (category) + GROUP BY + category, + categories.origins + ORDER BY + categories.origins DESC + """ +} +"""Mapping of query types to BigQuery SQL queries. + The queries are formatted with the `date` parameter. + Queries containing javascript UDFs require additional curly braces to escape the braces in the UDF. +""" +# editorconfig-checker-enable diff --git a/modules/tech_report_deletion.py b/modules/tech_report_deletion.py new file mode 100644 index 0000000..081adf7 --- /dev/null +++ b/modules/tech_report_deletion.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 + +from sys import argv +import apache_beam as beam +from google.cloud import firestore # pylint: disable=import-error +from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +import logging +import argparse + + +class QueryFirestoreDoFn(beam.DoFn): + def __init__(self, database, project): + self.client = None + self.project = project + self.database = database + + def start_bundle(self): + # initialize client if it doesn't exist and create a collection reference + if self.client is None: + self.client = firestore.Client(project=self.project, database=self.database) + + def process(self, collection_name): + collection_ref = self.client.collection(collection_name) + docs = collection_ref.stream() + + for doc in docs: + yield doc.id + + +class DeleteFromFirestoreDoFn(beam.DoFn): + # Delete a single element from Firestore + def __init__(self, project, database, collection): + self.client = None + self.project = project + self.database = database + self.collection = collection + + def start_bundle(self): + # initialize client if it doesn't exist + if self.client is None: + self.client = firestore.Client(project=self.project, database=self.database) + + def process(self, doc_ids): + collection_ref = self.client.collection(self.collection) + for doc_id in doc_ids: + timestamp = collection_ref.document(doc_id).delete() + yield doc_id, timestamp + + +def parse_arguments(argv): + """Parse command line arguments for the beam pipeline.""" + parser = argparse.ArgumentParser() + + # Firestore project + parser.add_argument( + '--firestore_project', + dest='firestore_project', + default='httparchive', + help='Firestore project', + required=True) + + # Firestore collection + parser.add_argument( 
+ '--firestore_collection', + dest='firestore_collection', + default='lighthouse', + help='Firestore collection', + required=True) + + # Firestore database + parser.add_argument( + '--firestore_database', + dest='firestore_database', + default='(default)', + help='Firestore database', + required=True) + + # Firestore batch timeout + parser.add_argument( + '--batch_timeout', + dest='batch_timeout', + default=14400, + help='Firestore batch timeout', + required=False) + + # Firestore batch size + parser.add_argument( + '--batch_size', + dest='batch_size', + default=2000, + help='Firestore batch size', + required=False) + + # parse arguments + known_args, pipeline_args = parser.parse_known_args(argv) + return known_args, pipeline_args + + +def create_pipeline(argv=None, save_main_session=True): + """Build the pipeline.""" + known_args, pipeline_args = parse_arguments(argv) + + pipeline_options = PipelineOptions(pipeline_args) + + # log pipeline options + logging.info(f"Pipeline Options: {known_args=},{pipeline_args=},{pipeline_options.get_all_options()},") + + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + + # with beam.Pipeline(options=pipeline_options) as p: + p = beam.Pipeline(options=pipeline_options) + + # Read from BigQuery, convert decimal to float, group into batches, and write to Firestore + ( + p + | 'Create' >> beam.Create([known_args.firestore_collection]) + | 'QueryFirestore' >> beam.ParDo(QueryFirestoreDoFn( + database=known_args.firestore_database, + project=known_args.firestore_project + )) + | 'Batch' >> beam.BatchElements(min_batch_size=known_args.batch_size, max_batch_size=known_args.batch_size) + | 'DeleteFromFirestore' >> beam.ParDo(DeleteFromFirestoreDoFn( + project=known_args.firestore_project, + database=known_args.firestore_database, + collection=known_args.firestore_collection + )) + # | 'Log' >> beam.Map(logging.info) + ) + + return p + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + p = create_pipeline(argv) + logging.debug("Pipeline created") + p.run() + logging.debug("Pipeline run") diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py new file mode 100644 index 0000000..e4cb4df --- /dev/null +++ b/modules/tech_report_pipeline.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 + +from decimal import Decimal +import hashlib +import apache_beam as beam +from apache_beam.utils import retry +from google.cloud import firestore # pylint: disable=import-error +from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +import logging +import argparse +from modules import constants + +# Inspired by https://stackoverflow.com/a/67028348 + + +def technology_hash_id(element: dict, query_type: str, key_map=constants.TECHNOLOGY_QUERY_ID_KEYS): + """Returns a hashed id for a set of technology query keys. Keys are sorted alphabetically and joined with a dash. 
+ The resulting string is hashed using SHA256.""" + if query_type not in key_map: + raise ValueError(f"Invalid query type: {query_type}") + keys = sorted(key_map[query_type]) + if not all(key in element for key in keys): + raise ValueError(f"Missing keys in element {element} for query type {query_type}") + values = [element.get(key) for key in keys] + hash = hashlib.sha256("-".join(values).encode()).hexdigest() + return hash + + +def build_query(query_type, date, queries=constants.TECHNOLOGY_QUERIES): + if query_type not in queries: + raise ValueError(f"Query type {query_type} not found in TECHNOLOGY_QUERIES") + query = queries[query_type] + parameterized_query = query.format(date=date) + logging.info(parameterized_query) + return parameterized_query + + +def convert_decimal_to_float(data): + if isinstance(data, Decimal): + return float(data) + elif isinstance(data, dict): + new_dict = {} + for key, value in data.items(): + new_dict[key] = convert_decimal_to_float(value) + return new_dict + elif isinstance(data, list): + new_list = [] + for item in data: + new_list.append(convert_decimal_to_float(item)) + return new_list + else: + return data + + +class WriteToFirestoreDoFn(beam.DoFn): + """Write a single element to Firestore. Yields the hash id of the document. + Retry on failure using exponential backoff, see :func:`apache_beam.utils.retry.with_exponential_backoff`.""" + def __init__(self, project, database, collection, query_type): + self.client = None + self.project = project + self.database = database + self.collection = collection + self.query_type = query_type + + def start_bundle(self): + # initialize client if it doesn't exist and create a collection reference + if self.client is None: + self.client = firestore.Client(project=self.project, database=self.database) + self.collection_ref = self.client.collection(self.collection) + + def process(self, element): + # creates a hash id for the document + hash_id = technology_hash_id(element, self.query_type) + self._add_record(hash_id, element) + yield hash_id, element + + @retry.with_exponential_backoff(retry_filter=lambda ex: isinstance(ex, Exception)) + def _add_record(self, id, data): + """Helper function to add a record to Firestore. 
Retries on any `Exception`.""" + doc_ref = self.collection_ref.document(id) + doc_ref.set(data) + + +class TechReportPipelineOptions(PipelineOptions): + @classmethod + def _add_argparse_args(cls, parser): + # Query type + parser.add_argument( + '--query_type', + dest='query_type', + help='Query type', + required=False, # should be true + choices=constants.TECHNOLOGY_QUERIES.keys()) + + # Firestore project + parser.add_argument( + '--firestore_project', + dest='firestore_project', + default='httparchive', + help='Firestore project', + required=False, # should be `True` but fails since beam expects all pipelines to have the same options + ) + + # Firestore collection + parser.add_argument( + '--firestore_collection', + dest='firestore_collection', + help='Firestore collection', + required=False, # should be `True` but fails since beam expects all pipelines to have the same options + ) + + # Firestore database + parser.add_argument( + '--firestore_database', + dest='firestore_database', + default='(default)', + help='Firestore database', + required=False, # should be `True` but fails since beam expects all pipelines to have the same options + ) + + # date + parser.add_argument( + '--date', + dest='date', + help='Date', + required=False) + + +def parse_args(): + parser = argparse.ArgumentParser() + known_args, beam_args = parser.parse_known_args() + + # Create and set your Pipeline Options. + beam_options = PipelineOptions(beam_args) + known_args = beam_options.view_as(TechReportPipelineOptions) + return known_args, beam_options + + +def create_pipeline(save_main_session=True): + """Build the pipeline.""" + known_args, beam_options = parse_args() + + query = build_query(known_args.query_type, known_args.date) + + logging.info(f"Pipeline options: {beam_options.get_all_options()}") + + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level) + beam_options.view_as(SetupOptions).save_main_session = save_main_session + + p = beam.Pipeline(options=beam_options) + + # Read from BigQuery, convert decimal to float, group into batches, and write to Firestore + firestore_ids = ( + p + | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True) + | 'ConvertDecimalToFloat' >> beam.Map(convert_decimal_to_float) + | 'WriteToFirestore' >> beam.ParDo(WriteToFirestoreDoFn( + project=known_args.firestore_project, + database=known_args.firestore_database, + collection=known_args.firestore_collection, + query_type=known_args.query_type + )) + ) + + # if logging level is DEBUG, log results + if logging.getLogger().getEffectiveLevel() == logging.DEBUG: + firestore_ids | 'LogResults' >> beam.Map(logging.debug) + + return p + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + p = create_pipeline() + logging.debug("Pipeline created") + p.run() + logging.debug("Pipeline run") diff --git a/requirements.txt b/requirements.txt index 6d79a5e..ccc7593 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ -apache-beam[gcp]==2.54.0 +apache-beam[gcp]==2.51.0 +google-cloud-firestore==2.14.0 black==24.2.0 coverage>=6.4.4 diff --git a/setup.py b/setup.py index bcb565a..2024df5 100644 --- a/setup.py +++ b/setup.py @@ -4,6 +4,6 @@ name="data-pipeline", version="0.0.1", packages=setuptools.find_packages(), - install_requires=["apache-beam[gcp]==2.54.0"], + install_requires=["apache-beam[gcp]==2.51.0", "google-cloud-firestore==2.14.0"], package_data={"schema": ["*.json"]}, ) 
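For reference, the Firestore document IDs written by tech_report_pipeline.py follow the scheme implemented in technology_hash_id(): the key list for the query type is looked up in TECHNOLOGY_QUERY_ID_KEYS, sorted alphabetically, the matching values are joined with a dash, and the result is hashed with SHA-256. A minimal standalone sketch of that scheme, using hypothetical row values rather than real query output, is shown below; the new test file that follows exercises the same behavior.

import hashlib

# Key list for the "adoption" query type, as defined in
# modules/constants.py (TECHNOLOGY_QUERY_ID_KEYS).
keys = sorted(["date", "technology", "geo", "rank"])

# Hypothetical BigQuery row; real rows come from the adoption query.
element = {"date": "2024-03-01", "technology": "WordPress", "geo": "ALL", "rank": "ALL"}

# Same steps as technology_hash_id(): join the key values with "-" and hash with SHA-256.
doc_id = hashlib.sha256("-".join(element[k] for k in keys).encode()).hexdigest()
print(doc_id)  # deterministic Firestore document ID for this row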
diff --git a/test/test_tech_report_pipeline.py b/test/test_tech_report_pipeline.py new file mode 100644 index 0000000..58fa2c8 --- /dev/null +++ b/test/test_tech_report_pipeline.py @@ -0,0 +1,35 @@ +import unittest +import hashlib +from unittest.mock import patch +from modules.tech_report_pipeline import technology_hash_id +from modules import constants + + +class TestTechnologyHashId(unittest.TestCase): + @patch('modules.tech_report_pipeline.constants') + def test_technology_hash_id_valid(self, patched_constants): + # Test with valid input + element = {'key1': 'value1', 'key2': 'value2'} + query_type = 'valid_query_type' + patched_constants.TECHNOLOGY_QUERY_ID_KEYS = {query_type: ['key1', 'key2']} + expected_hash = hashlib.sha256('value1-value2'.encode()).hexdigest() + self.assertEqual(technology_hash_id(element, query_type), expected_hash) + + def test_technology_hash_id_invalid_query_type(self): + # Test with invalid query type + element = {'key1': 'value1', 'key2': 'value2'} + query_type = 'invalid_query_type' + with self.assertRaises(ValueError): + technology_hash_id(element, query_type) + + def test_technology_hash_id_missing_key(self): + # Test with missing key in element + element = {'key1': 'value1'} + query_type = 'valid_query_type' + constants.TECHNOLOGY_QUERY_ID_KEYS[query_type] = ['key1', 'key2'] + with self.assertRaises(ValueError): + technology_hash_id(element, query_type) + + +if __name__ == '__main__': + unittest.main() From da00479a0836745f37bca6406f600e1500547233 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sun, 10 Mar 2024 20:21:29 -0400 Subject: [PATCH 2/4] Bump dataflow flex template build tag (#248) Co-authored-by: giancarloaf <14220134+giancarloaf@users.noreply.github.com> --- data-pipeline.workflows.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-pipeline.workflows.yaml b/data-pipeline.workflows.yaml index 34fad0f..0a4e3a0 100644 --- a/data-pipeline.workflows.yaml +++ b/data-pipeline.workflows.yaml @@ -22,7 +22,7 @@ main: - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} - region: "us-west1" - flexTemplateRepo: "data-pipeline" - - flexTemplateBuildTag: "2024-03-01_07-51-29" + - flexTemplateBuildTag: "2024-03-09_19-04-52" - flexTemplateBasePath: ${"gs://" + project + "/dataflow/templates/" + flexTemplateRepo} - flexTemplateTemp: ${"gs://" + project + "-staging/dataflow"} From 45fe57f073f5d2359f35ebdc0032815a1db30bd5 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 10 Mar 2024 21:04:16 -0400 Subject: [PATCH 3/4] Bump apache-beam[gcp] from 2.52.0 to 2.54.0 (#249) --- requirements.txt | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index ccc7593..d0b4bc7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -apache-beam[gcp]==2.51.0 +apache-beam[gcp]==2.54.0 google-cloud-firestore==2.14.0 black==24.2.0 coverage>=6.4.4 diff --git a/setup.py b/setup.py index 2024df5..6038a65 100644 --- a/setup.py +++ b/setup.py @@ -4,6 +4,6 @@ name="data-pipeline", version="0.0.1", packages=setuptools.find_packages(), - install_requires=["apache-beam[gcp]==2.51.0", "google-cloud-firestore==2.14.0"], + install_requires=["apache-beam[gcp]==2.54.0", "google-cloud-firestore==2.14.0"], package_data={"schema": ["*.json"]}, ) From de42ed9b953b2199f5d28e07cbe57296b52a0658 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sun, 10 Mar 
2024 21:14:33 -0400 Subject: [PATCH 4/4] Bump dataflow flex template build tag (#250) Co-authored-by: giancarloaf <14220134+giancarloaf@users.noreply.github.com> --- data-pipeline.workflows.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-pipeline.workflows.yaml b/data-pipeline.workflows.yaml index 0a4e3a0..f9d046a 100644 --- a/data-pipeline.workflows.yaml +++ b/data-pipeline.workflows.yaml @@ -22,7 +22,7 @@ main: - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} - region: "us-west1" - flexTemplateRepo: "data-pipeline" - - flexTemplateBuildTag: "2024-03-09_19-04-52" + - flexTemplateBuildTag: "2024-03-11_01-04-46" - flexTemplateBasePath: ${"gs://" + project + "/dataflow/templates/" + flexTemplateRepo} - flexTemplateTemp: ${"gs://" + project + "-staging/dataflow"}
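For context, the sketch below shows one way the tech report pipeline added in PATCH 1/4 might be exercised locally with the DirectRunner, using the flags defined in TechReportPipelineOptions. The collection name, date, and staging bucket are illustrative placeholders rather than values fixed by these patches, and a real run still needs BigQuery and Firestore credentials.

import logging
import sys

from modules.tech_report_pipeline import create_pipeline

# parse_args() reads sys.argv via parse_known_args(), so a local run can be
# simulated by setting argv before the pipeline is built.
sys.argv = [
    "tech_report_pipeline.py",
    "--query_type=adoption",            # any key of constants.TECHNOLOGY_QUERIES
    "--date=2024-03-01",                # illustrative date partition
    "--firestore_project=httparchive",
    "--firestore_database=(default)",
    "--firestore_collection=adoption",  # assumed collection name, not set by the patch
    "--runner=DirectRunner",
    "--temp_location=gs://example-bucket/tmp",  # placeholder staging bucket
]

logging.getLogger().setLevel(logging.INFO)
create_pipeline().run().wait_until_finish()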