From e1eb9a39ab601b9bf1ed09dd0790ffb299ce3b13 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 12 Nov 2023 04:17:32 +0000 Subject: [PATCH 01/51] New tech_report_pipeline --- modules/tech_report_pipeline.py | 227 ++++++++++++++++++++++++++++++++ requirements.dev.txt | 2 + requirements.tech-report.txt | 2 + requirements.txt | 2 - setup.py | 2 +- 5 files changed, 232 insertions(+), 3 deletions(-) create mode 100644 modules/tech_report_pipeline.py create mode 100644 requirements.dev.txt create mode 100644 requirements.tech-report.txt diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py new file mode 100644 index 0000000..cb4359f --- /dev/null +++ b/modules/tech_report_pipeline.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python3 + +from decimal import Decimal +from sys import argv +import uuid +import apache_beam as beam +from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner +from google.cloud import firestore +from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +import logging +import argparse + +# Inspired by https://stackoverflow.com/a/67028348 + + +DEFAULT_QUERY = """ + CREATE TEMPORARY FUNCTION GET_LIGHTHOUSE( + records ARRAY> + ) RETURNS ARRAY, + mobile STRUCT< + median_score NUMERIC + > + >> LANGUAGE js AS ''' + const METRIC_MAP = { + accessibility: 'median_lighthouse_score_accessibility', + best_practices: 'median_lighthouse_score_best_practices', + performance: 'median_lighthouse_score_performance', + pwa: 'median_lighthouse_score_pwa', + seo: 'median_lighthouse_score_seo', + }; + + // Initialize the Lighthouse map. + const lighthouse = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => { + return [metricName, {name: metricName}]; + })); + + // Populate each client record. + records.forEach(record => { + Object.entries(METRIC_MAP).forEach(([metricName, median_score]) => { + lighthouse[metricName][record.client] = {median_score: record[median_score]}; + }); + }); + + return Object.values(lighthouse); + '''; + + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_LIGHTHOUSE(ARRAY_AGG(STRUCT( + client, + median_lighthouse_score_accessibility, + median_lighthouse_score_best_practices, + median_lighthouse_score_performance, + median_lighthouse_score_pwa, + median_lighthouse_score_seo + + ))) AS lighthouse + FROM + `httparchive.core_web_vitals.technologies` + + """ + + +def convert_decimal_to_float(data): + if isinstance(data, Decimal): + return float(data) + elif isinstance(data, dict): + new_dict = {} + for key, value in data.items(): + new_dict[key] = convert_decimal_to_float(value) + return new_dict + elif isinstance(data, list): + new_list = [] + for item in data: + new_list.append(convert_decimal_to_float(item)) + return new_list + else: + return data + + +class WriteToFirestoreBatchedDoFn(beam.DoFn): + """Write a batch of elements to Firestore.""" + def __init__(self, project, collection, batch_timeout=14400): + self.client = None + self.project = project + self.collection = collection + self.batch_timeout = batch_timeout + + def start_bundle(self): + # initialize client if it doesn't exist and create a collection reference + if self.client is None: + self.client = firestore.Client(project=self.project) + self.collection_ref = self.client.collection(self.collection) + + # create a batch + self.batch = self.client.batch() + + def process(self, elements): + for element in elements: + doc_ref = self.collection_ref.document(uuid.uuid4().hex) + self.batch.set(doc_ref, element) + + # commit the batch with a timeout + self.batch.commit(timeout=self.batch_timeout) + + +class WriteToFirestoreDoFn(beam.DoFn): + """Write a single element to Firestore.""" + def __init__(self, project, collection): + self.client = None + self.project = project + self.collection = collection + + def start_bundle(self): + # initialize client if it doesn't exist + if self.client is None: + self.client = firestore.Client(project=self.project) + + def process(self, element): + self.client.write_data(element) + + +def parse_arguments(argv): + """Parse command line arguments for the beam pipeline.""" + parser = argparse.ArgumentParser() + + # Firestore project + parser.add_argument( + '--firestore_project', + dest='firestore_project', + default='httparchive', + help='Firestore project', + required=True) + + # Firestore collection + parser.add_argument( + '--firestore_collection', + dest='firestore_collection', + default='lighthouse', + help='Firestore collection', + required=True) + + # start date, optional + parser.add_argument( + '--start_date', + dest='start_date', + help='Start date', + required=False) + + # end date, optional + parser.add_argument( + '--end_date', + dest='end_date', + help='End date', + required=False) + + # parse arguments + known_args, pipeline_args = parser.parse_known_args(argv) + return known_args, pipeline_args + + +def create_pipeline(argv=None, save_main_session=True): + """Build the pipeline.""" + known_args, pipeline_args = parse_arguments(argv) + + # add dates to query + if known_args.start_date and not known_args.end_date: + query = f"{DEFAULT_QUERY} WHERE date >= '{known_args.start_date}'" + elif not known_args.start_date and known_args.end_date: + query = f"{DEFAULT_QUERY} WHERE date <= '{known_args.end_date}'" + elif known_args.start_date and known_args.end_date: + query = f"{DEFAULT_QUERY} WHERE date BETWEEN '{known_args.start_date}' AND '{known_args.end_date}'" + else: + query = DEFAULT_QUERY + + # add group by to query + query = f"{query} GROUP BY date, app, rank, geo" + + # testing query + # query = "SELECT 1 AS test, 2 AS test2" + logging.info(query) + + pipeline_options = PipelineOptions(pipeline_args) + + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + + # with beam.Pipeline(options=pipeline_options) as p: + p = beam.Pipeline(options=pipeline_options) + + # Read from BigQuery, convert decimal to float, group into batches, and write to Firestore + (p + | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True) + | 'ConvertDecimalToFloat' >> beam.Map(convert_decimal_to_float) + | 'GroupIntoBatches' >> beam.BatchElements(min_batch_size=50, max_batch_size=50) + | 'WriteToFirestoreBatched' >> beam.ParDo(WriteToFirestoreBatchedDoFn( + project=known_args.firestore_project, + collection=known_args.firestore_collection + )) + ) + + return p + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + p = create_pipeline(argv) + logging.debug("Pipeline created") + result = p.run() + logging.debug("Pipeline run") + # if not isinstance(p.runner, DataflowRunner): + # result.wait_until_finish() diff --git a/requirements.dev.txt b/requirements.dev.txt new file mode 100644 index 0000000..13a168f --- /dev/null +++ b/requirements.dev.txt @@ -0,0 +1,2 @@ +black==23.7.0 +coverage>=6.4.4 diff --git a/requirements.tech-report.txt b/requirements.tech-report.txt new file mode 100644 index 0000000..ce0cb18 --- /dev/null +++ b/requirements.tech-report.txt @@ -0,0 +1,2 @@ +apache-beam[gcp]==2.43.0 +google-cloud-firestore==2.13.0 diff --git a/requirements.txt b/requirements.txt index 52178b6..603e519 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1 @@ apache-beam[gcp]==2.43.0 -black==23.7.0 -coverage>=6.4.4 diff --git a/setup.py b/setup.py index cc077d6..d3311c6 100644 --- a/setup.py +++ b/setup.py @@ -4,6 +4,6 @@ name="data-pipeline", version="0.0.1", packages=setuptools.find_packages(), - install_requires=["apache-beam[gcp]==2.43.0"], + install_requires=["apache-beam[gcp]==2.43.0", "google-cloud-firestore==2.13.0"], package_data={"schema": ["*.json"]}, ) From 449231bfd9d13ad8a75ee95db09885ffae6bfcbf Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 12 Nov 2023 04:20:49 +0000 Subject: [PATCH 02/51] clean up testing code --- modules/tech_report_pipeline.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index cb4359f..11b304f 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -118,20 +118,21 @@ def process(self, elements): self.batch.commit(timeout=self.batch_timeout) -class WriteToFirestoreDoFn(beam.DoFn): - """Write a single element to Firestore.""" - def __init__(self, project, collection): - self.client = None - self.project = project - self.collection = collection +# commented out for testing later - need to compare performance between batched and individual writes +# class WriteToFirestoreDoFn(beam.DoFn): +# """Write a single element to Firestore.""" +# def __init__(self, project, collection): +# self.client = None +# self.project = project +# self.collection = collection - def start_bundle(self): - # initialize client if it doesn't exist - if self.client is None: - self.client = firestore.Client(project=self.project) +# def start_bundle(self): +# # initialize client if it doesn't exist +# if self.client is None: +# self.client = firestore.Client(project=self.project) - def process(self, element): - self.client.write_data(element) +# def process(self, element): +# self.client.write_data(element) def parse_arguments(argv): @@ -190,8 +191,6 @@ def create_pipeline(argv=None, save_main_session=True): # add group by to query query = f"{query} GROUP BY date, app, rank, geo" - # testing query - # query = "SELECT 1 AS test, 2 AS test2" logging.info(query) pipeline_options = PipelineOptions(pipeline_args) @@ -223,5 +222,7 @@ def create_pipeline(argv=None, save_main_session=True): logging.debug("Pipeline created") result = p.run() logging.debug("Pipeline run") + + # commented out for local testing # if not isinstance(p.runner, DataflowRunner): # result.wait_until_finish() From 685116e03b1b548bd09bccef1ebb9fb3950796aa Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Mon, 13 Nov 2023 22:15:52 +0000 Subject: [PATCH 03/51] update to make queries more flexible --- modules/constants.py | 89 +++++++++++++++++++++++ modules/tech_report_pipeline.py | 121 ++++++++++++-------------------- 2 files changed, 134 insertions(+), 76 deletions(-) diff --git a/modules/constants.py b/modules/constants.py index f5d02bc..4578b24 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -103,3 +103,92 @@ class MaxContentSize(Enum): # limit response bodies to 20MB RESPONSE_BODIES = 20 * 1000000 + +TECHNOLOGY_QUERIES = { + "adoption": """ + CREATE TEMPORARY FUNCTION GET_ADOPTION( + records ARRAY> + ) RETURNS STRUCT< + desktop INT64, + mobile INT64 + > LANGUAGE js AS ''' + return Object.fromEntries(records.map(({client, origins}) => { + return [client, origins]; + })); + '''; + + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_ADOPTION(ARRAY_AGG(STRUCT( + client, + origins + ))) AS adoption + FROM + `httparchive.core_web_vitals.technologies` + """, + "lighthouse": """ + CREATE TEMPORARY FUNCTION GET_LIGHTHOUSE( + records ARRAY> + ) RETURNS ARRAY, + mobile STRUCT< + median_score NUMERIC + > + >> LANGUAGE js AS ''' + const METRIC_MAP = { + accessibility: 'median_lighthouse_score_accessibility', + best_practices: 'median_lighthouse_score_best_practices', + performance: 'median_lighthouse_score_performance', + pwa: 'median_lighthouse_score_pwa', + seo: 'median_lighthouse_score_seo', + }; + + // Initialize the Lighthouse map. + const lighthouse = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => { + return [metricName, {name: metricName}]; + })); + + // Populate each client record. + records.forEach(record => { + Object.entries(METRIC_MAP).forEach(([metricName, median_score]) => { + lighthouse[metricName][record.client] = {median_score: record[median_score]}; + }); + }); + + return Object.values(lighthouse); + '''; + + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_LIGHTHOUSE(ARRAY_AGG(STRUCT( + client, + median_lighthouse_score_accessibility, + median_lighthouse_score_best_practices, + median_lighthouse_score_performance, + median_lighthouse_score_pwa, + median_lighthouse_score_seo + + ))) AS lighthouse + FROM + `httparchive.core_web_vitals.technologies` + """ +} \ No newline at end of file diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 11b304f..ec0d4a9 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 from decimal import Decimal +from distutils.command import build from sys import argv import uuid import apache_beam as beam @@ -9,70 +10,33 @@ from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions import logging import argparse +from constants import TECHNOLOGY_QUERIES # Inspired by https://stackoverflow.com/a/67028348 -DEFAULT_QUERY = """ - CREATE TEMPORARY FUNCTION GET_LIGHTHOUSE( - records ARRAY> - ) RETURNS ARRAY, - mobile STRUCT< - median_score NUMERIC - > - >> LANGUAGE js AS ''' - const METRIC_MAP = { - accessibility: 'median_lighthouse_score_accessibility', - best_practices: 'median_lighthouse_score_best_practices', - performance: 'median_lighthouse_score_performance', - pwa: 'median_lighthouse_score_pwa', - seo: 'median_lighthouse_score_seo', - }; - - // Initialize the Lighthouse map. - const lighthouse = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => { - return [metricName, {name: metricName}]; - })); - - // Populate each client record. - records.forEach(record => { - Object.entries(METRIC_MAP).forEach(([metricName, median_score]) => { - lighthouse[metricName][record.client] = {median_score: record[median_score]}; - }); - }); - - return Object.values(lighthouse); - '''; - - SELECT - STRING(DATE(date)) as date, - app AS technology, - rank, - geo, - GET_LIGHTHOUSE(ARRAY_AGG(STRUCT( - client, - median_lighthouse_score_accessibility, - median_lighthouse_score_best_practices, - median_lighthouse_score_performance, - median_lighthouse_score_pwa, - median_lighthouse_score_seo - - ))) AS lighthouse - FROM - `httparchive.core_web_vitals.technologies` - - """ +def buildQuery(start_date, end_date, query_type): + if query_type not in TECHNOLOGY_QUERIES: + raise ValueError(f"Query type {query_type} not found in TECHNOLOGY_QUERIES") + + query = TECHNOLOGY_QUERIES[query_type] + + # add dates to query + if start_date and not end_date: + query = f"{query} WHERE date >= '{start_date}'" + elif not start_date and end_date: + query = f"{query} WHERE date <= '{end_date}'" + elif start_date and end_date: + query = f"{query} WHERE date BETWEEN '{start_date}' AND '{end_date}'" + else: + query = query + + # add group by to query + query = f"{query} GROUP BY date, app, rank, geo" + + logging.info(query) + + return query def convert_decimal_to_float(data): @@ -94,8 +58,9 @@ def convert_decimal_to_float(data): class WriteToFirestoreBatchedDoFn(beam.DoFn): """Write a batch of elements to Firestore.""" - def __init__(self, project, collection, batch_timeout=14400): + def __init__(self, database, project, collection, batch_timeout=14400): self.client = None + self.database = database self.project = project self.collection = collection self.batch_timeout = batch_timeout @@ -103,7 +68,7 @@ def __init__(self, project, collection, batch_timeout=14400): def start_bundle(self): # initialize client if it doesn't exist and create a collection reference if self.client is None: - self.client = firestore.Client(project=self.project) + self.client = firestore.Client(project=self.project, database=self.database) self.collection_ref = self.client.collection(self.collection) # create a batch @@ -139,6 +104,14 @@ def parse_arguments(argv): """Parse command line arguments for the beam pipeline.""" parser = argparse.ArgumentParser() + # Query type + parser.add_argument( + '--query_type', + dest='query_type', + help='Query type', + required=True, + choices=TECHNOLOGY_QUERIES.keys()) + # Firestore project parser.add_argument( '--firestore_project', @@ -155,6 +128,14 @@ def parse_arguments(argv): help='Firestore collection', required=True) + # Firestore database + parser.add_argument( + '--firestore_database', + dest='firestore_database', + default='(default)', + help='Firestore database', + required=True) + # start date, optional parser.add_argument( '--start_date', @@ -178,20 +159,7 @@ def create_pipeline(argv=None, save_main_session=True): """Build the pipeline.""" known_args, pipeline_args = parse_arguments(argv) - # add dates to query - if known_args.start_date and not known_args.end_date: - query = f"{DEFAULT_QUERY} WHERE date >= '{known_args.start_date}'" - elif not known_args.start_date and known_args.end_date: - query = f"{DEFAULT_QUERY} WHERE date <= '{known_args.end_date}'" - elif known_args.start_date and known_args.end_date: - query = f"{DEFAULT_QUERY} WHERE date BETWEEN '{known_args.start_date}' AND '{known_args.end_date}'" - else: - query = DEFAULT_QUERY - - # add group by to query - query = f"{query} GROUP BY date, app, rank, geo" - - logging.info(query) + query = buildQuery(known_args.start_date, known_args.end_date, known_args.query_type) pipeline_options = PipelineOptions(pipeline_args) @@ -208,6 +176,7 @@ def create_pipeline(argv=None, save_main_session=True): | 'ConvertDecimalToFloat' >> beam.Map(convert_decimal_to_float) | 'GroupIntoBatches' >> beam.BatchElements(min_batch_size=50, max_batch_size=50) | 'WriteToFirestoreBatched' >> beam.ParDo(WriteToFirestoreBatchedDoFn( + database=known_args.firestore_database, project=known_args.firestore_project, collection=known_args.firestore_collection )) From f49c940cd98e04851c2204b6172a840f0c3e8e86 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Tue, 14 Nov 2023 02:16:28 +0000 Subject: [PATCH 04/51] add tech_report_deletion.py --- modules/tech_report_deletion.py | 184 ++++++++++++++++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 modules/tech_report_deletion.py diff --git a/modules/tech_report_deletion.py b/modules/tech_report_deletion.py new file mode 100644 index 0000000..517dbf6 --- /dev/null +++ b/modules/tech_report_deletion.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 + +from decimal import Decimal +from distutils.command import build +from sys import argv +import uuid +import apache_beam as beam +from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner +from google.cloud import firestore +from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +import logging +import argparse + + +class QueryFirestoreDoFn(beam.DoFn): + def __init__(self, database, project): + self.client = None + self.project = project + self.database = database + + def start_bundle(self): + # initialize client if it doesn't exist and create a collection reference + if self.client is None: + self.client = firestore.Client(project=self.project, database=self.database) + + def process(self, collection_name): + collection_ref = self.client.collection(collection_name) + docs = collection_ref.stream() + + for doc in docs: + yield doc.id + + +class DeleteFromFirestoreBatchedDoFn(beam.DoFn): + """Write a batch of elements to Firestore.""" + def __init__(self, database, project, collection, batch_timeout=14400, batch_size=2000): + self.client = None + self.database = database + self.project = project + self.collection = collection + self.batch_timeout = batch_timeout + self.batch_size = batch_size + + def start_bundle(self): + # initialize client if it doesn't exist and create a collection reference + if self.client is None: + self.client = firestore.Client(project=self.project, database=self.database) + self.collection_ref = self.client.collection(self.collection) + + # create a batch + self.batch = self.client.batch() + + def process(self, elements): + count = 0 + + for doc in elements: + self.batch.delete(doc.reference) + count += 1 + + # Commit the batch every N documents + if count % self.batch_size == 0: + self.batch.commit() + self.batch = self.client.batch() + + # Commit the last batch + if count % self.batch_size != 0: + self.batch.commit() + logging.info("Bach deleted") + + +class DeleteFromFirestoreDoFn(beam.DoFn): + # Delete a single element from Firestore + def __init__(self, project, database, collection): + self.client = None + self.project = project + self.database = database + self.collection = collection + + def start_bundle(self): + # initialize client if it doesn't exist + if self.client is None: + self.client = firestore.Client(project=self.project, database=self.database) + + def process(self, doc_ids): + collection_ref = self.client.collection(self.collection) + for doc_id in doc_ids: + timestamp = collection_ref.document(doc_id).delete() + yield doc_id, timestamp + + +def parse_arguments(argv): + """Parse command line arguments for the beam pipeline.""" + parser = argparse.ArgumentParser() + + # Firestore project + parser.add_argument( + '--firestore_project', + dest='firestore_project', + default='httparchive', + help='Firestore project', + required=True) + + # Firestore collection + parser.add_argument( + '--firestore_collection', + dest='firestore_collection', + default='lighthouse', + help='Firestore collection', + required=True) + + # Firestore database + parser.add_argument( + '--firestore_database', + dest='firestore_database', + default='(default)', + help='Firestore database', + required=True) + + # Firestore batch timeout + parser.add_argument( + '--batch_timeout', + dest='batch_timeout', + default=14400, + help='Firestore batch timeout', + required=False) + + # Firestore batch size + parser.add_argument( + '--batch_size', + dest='batch_size', + default=2000, + help='Firestore batch size', + required=False) + + # parse arguments + known_args, pipeline_args = parser.parse_known_args(argv) + return known_args, pipeline_args + + +def create_pipeline(argv=None, save_main_session=True): + """Build the pipeline.""" + known_args, pipeline_args = parse_arguments(argv) + + pipeline_options = PipelineOptions(pipeline_args) + + # log pipeline options + logging.info(f"Pipeline Options: {known_args=},{pipeline_args=},{pipeline_options.get_all_options()},") + + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + + # with beam.Pipeline(options=pipeline_options) as p: + p = beam.Pipeline(options=pipeline_options) + + # Read from BigQuery, convert decimal to float, group into batches, and write to Firestore + (p + | 'Create' >> beam.Create([known_args.firestore_collection]) + | 'QueryFirestoreDoFn' >> beam.ParDo(QueryFirestoreDoFn( + database=known_args.firestore_database, + project=known_args.firestore_project + )) + | 'Batch' >> beam.BatchElements(min_batch_size=known_args.batch_size, max_batch_size=known_args.batch_size) + | 'DeleteFromFirestore' >> beam.ParDo(DeleteFromFirestoreDoFn( + project=known_args.firestore_project, + database=known_args.firestore_database, + collection=known_args.firestore_collection + )) + # | 'Log' >> beam.Map(logging.info) + ) + + return p + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + p = create_pipeline(argv) + logging.debug("Pipeline created") + result = p.run() + logging.debug("Pipeline run") + + # commented out for local testing + # if not isinstance(p.runner, DataflowRunner): + # result.wait_until_finish() \ No newline at end of file From a47904503730f98f3ce8b23d6285318bc24dd908 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Tue, 14 Nov 2023 02:25:37 +0000 Subject: [PATCH 05/51] cleanup tech_report_deletion.py --- modules/tech_report_deletion.py | 43 +-------------------------------- 1 file changed, 1 insertion(+), 42 deletions(-) diff --git a/modules/tech_report_deletion.py b/modules/tech_report_deletion.py index 517dbf6..dfcc41f 100644 --- a/modules/tech_report_deletion.py +++ b/modules/tech_report_deletion.py @@ -1,11 +1,7 @@ #!/usr/bin/env python3 -from decimal import Decimal -from distutils.command import build from sys import argv -import uuid import apache_beam as beam -from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner from google.cloud import firestore from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions import logging @@ -31,43 +27,6 @@ def process(self, collection_name): yield doc.id -class DeleteFromFirestoreBatchedDoFn(beam.DoFn): - """Write a batch of elements to Firestore.""" - def __init__(self, database, project, collection, batch_timeout=14400, batch_size=2000): - self.client = None - self.database = database - self.project = project - self.collection = collection - self.batch_timeout = batch_timeout - self.batch_size = batch_size - - def start_bundle(self): - # initialize client if it doesn't exist and create a collection reference - if self.client is None: - self.client = firestore.Client(project=self.project, database=self.database) - self.collection_ref = self.client.collection(self.collection) - - # create a batch - self.batch = self.client.batch() - - def process(self, elements): - count = 0 - - for doc in elements: - self.batch.delete(doc.reference) - count += 1 - - # Commit the batch every N documents - if count % self.batch_size == 0: - self.batch.commit() - self.batch = self.client.batch() - - # Commit the last batch - if count % self.batch_size != 0: - self.batch.commit() - logging.info("Bach deleted") - - class DeleteFromFirestoreDoFn(beam.DoFn): # Delete a single element from Firestore def __init__(self, project, database, collection): @@ -156,7 +115,7 @@ def create_pipeline(argv=None, save_main_session=True): # Read from BigQuery, convert decimal to float, group into batches, and write to Firestore (p | 'Create' >> beam.Create([known_args.firestore_collection]) - | 'QueryFirestoreDoFn' >> beam.ParDo(QueryFirestoreDoFn( + | 'QueryFirestore' >> beam.ParDo(QueryFirestoreDoFn( database=known_args.firestore_database, project=known_args.firestore_project )) From 88a2fff49db5fd4c86f61210a618c90d10328d1a Mon Sep 17 00:00:00 2001 From: Martin Aceto Date: Thu, 16 Nov 2023 21:32:18 -0500 Subject: [PATCH 06/51] downgrade firestore version to solve dependency conflict --- requirements.tech-report.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.tech-report.txt b/requirements.tech-report.txt index ce0cb18..24c1665 100644 --- a/requirements.tech-report.txt +++ b/requirements.tech-report.txt @@ -1,2 +1,2 @@ apache-beam[gcp]==2.43.0 -google-cloud-firestore==2.13.0 +google-cloud-firestore==2.0.0 From f7bebd1833aae5e09bf03b9cbf85f5519d92725c Mon Sep 17 00:00:00 2001 From: Martin Aceto Date: Thu, 16 Nov 2023 21:32:43 -0500 Subject: [PATCH 07/51] new pipeline where id is generated --- .gitignore | 1 + modules/tech_report_pipeline_inserts.py | 443 ++++++++++++++++++++++++ 2 files changed, 444 insertions(+) create mode 100644 modules/tech_report_pipeline_inserts.py diff --git a/.gitignore b/.gitignore index c5b2840..6dd1e02 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ *.pyc /.vscode .coverage +.tool-versions # Ignore generated credentials from google-github-actions/auth gha-creds-*.json diff --git a/modules/tech_report_pipeline_inserts.py b/modules/tech_report_pipeline_inserts.py new file mode 100644 index 0000000..5ef33b0 --- /dev/null +++ b/modules/tech_report_pipeline_inserts.py @@ -0,0 +1,443 @@ +#!/usr/bin/env python3 + +from decimal import Decimal +from distutils.command import build +from sys import argv +import uuid +import apache_beam as beam +from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner +from google.cloud import firestore +from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +import logging +import argparse +import hashlib +#from constants import TECHNOLOGY_QUERIES + +# Inspired by https://stackoverflow.com/a/67028348 + +TECHNOLOGY_QUERIES = { + "adoption": """ + CREATE TEMPORARY FUNCTION GET_ADOPTION( + records ARRAY> + ) RETURNS STRUCT< + desktop INT64, + mobile INT64 + > LANGUAGE js AS ''' + return Object.fromEntries(records.map(({client, origins}) => { + return [client, origins]; + })); + '''; + + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_ADOPTION(ARRAY_AGG(STRUCT( + client, + origins + ))) AS adoption + FROM + `httparchive.core_web_vitals.technologies` + """, + "lighthouse": """ + CREATE TEMPORARY FUNCTION GET_LIGHTHOUSE( + records ARRAY> + ) RETURNS ARRAY, + mobile STRUCT< + median_score NUMERIC + > + >> LANGUAGE js AS ''' + const METRIC_MAP = { + accessibility: 'median_lighthouse_score_accessibility', + best_practices: 'median_lighthouse_score_best_practices', + performance: 'median_lighthouse_score_performance', + pwa: 'median_lighthouse_score_pwa', + seo: 'median_lighthouse_score_seo', + }; + + // Initialize the Lighthouse map. + const lighthouse = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => { + return [metricName, {name: metricName}]; + })); + + // Populate each client record. + records.forEach(record => { + Object.entries(METRIC_MAP).forEach(([metricName, median_score]) => { + lighthouse[metricName][record.client] = {median_score: record[median_score]}; + }); + }); + + return Object.values(lighthouse); + '''; + + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_LIGHTHOUSE(ARRAY_AGG(STRUCT( + client, + median_lighthouse_score_accessibility, + median_lighthouse_score_best_practices, + median_lighthouse_score_performance, + median_lighthouse_score_pwa, + median_lighthouse_score_seo + + ))) AS lighthouse + FROM + `httparchive.core_web_vitals.technologies` + """, + "core_web_vitals": """ + CREATE TEMPORARY FUNCTION GET_VITALS( + records ARRAY> + ) RETURNS ARRAY, + mobile STRUCT< + good_number INT64, + tested INT64 + > + >> LANGUAGE js AS ''' + const METRIC_MAP = { + overall: ['origins_with_good_cwv', 'origins_eligible_for_cwv'], + LCP: ['origins_with_good_lcp', 'origins_with_any_lcp'], + CLS: ['origins_with_good_cls', 'origins_with_any_cls'], + FID: ['origins_with_good_fid', 'origins_with_any_fid'], + FCP: ['origins_with_good_fcp', 'origins_with_any_fcp'], + TTFB: ['origins_with_good_ttfb', 'origins_with_any_ttfb'], + INP: ['origins_with_good_inp', 'origins_with_any_inp'] + }; + + // Initialize the vitals map. + const vitals = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => { + return [metricName, {name: metricName}]; + })); + + // Populate each client record. + records.forEach(record => { + Object.entries(METRIC_MAP).forEach(([metricName, [good_number, tested]]) => { + vitals[metricName][record.client] = {good_number: record[good_number], tested: record[tested]}; + }); + }); + + return Object.values(vitals); + '''; + + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_VITALS(ARRAY_AGG(STRUCT( + client, + origins_with_good_fid, + origins_with_good_cls, + origins_with_good_lcp, + origins_with_good_fcp, + origins_with_good_ttfb, + origins_with_good_inp, + origins_with_any_fid, + origins_with_any_cls, + origins_with_any_lcp, + origins_with_any_fcp, + origins_with_any_ttfb, + origins_with_any_inp, + origins_with_good_cwv, + origins_eligible_for_cwv + ))) AS vitals + FROM + `httparchive.core_web_vitals.technologies` + + """, + "technologies": """ + SELECT + client, + app AS technology, + description, + category, + NULL AS similar_technologies, + origins + FROM + `httparchive.core_web_vitals.technologies` + JOIN + `httparchive.core_web_vitals.technology_descriptions` + ON + app = technology + """, + "page_weight": """ + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + client, + median_bytes_total, + median_bytes_js, + median_bytes_image + FROM + `httparchive.core_web_vitals.technologies` + """, + "categories": """ + WITH categories AS ( + SELECT + category, + COUNT(DISTINCT root_page) AS origins + FROM + `httparchive.all.pages`, + UNNEST(technologies) AS t, + UNNEST(t.categories) AS category + WHERE + date = '2023-08-01' AND + client = 'mobile' + GROUP BY + category + ), + + technologies AS ( + SELECT + category, + technology, + COUNT(DISTINCT root_page) AS origins + FROM + `httparchive.all.pages`, + UNNEST(technologies) AS t, + UNNEST(t.categories) AS category + WHERE + date = '2023-08-01' AND + client = 'mobile' + GROUP BY + category, + technology + ) + + SELECT + category, + categories.origins, + ARRAY_AGG(technology ORDER BY technologies.origins DESC) AS technologies + FROM + categories + JOIN + technologies + USING + (category) + GROUP BY + category, + categories.origins + ORDER BY + categories.origins DESC + """ +} + +def buildQuery(start_date, end_date, query_type): + if query_type not in TECHNOLOGY_QUERIES: + raise ValueError(f"Query type {query_type} not found in TECHNOLOGY_QUERIES") + + query = TECHNOLOGY_QUERIES[query_type] + + if query_type != "technologies" and query_type != "page_weight": + # add dates to query + if start_date and not end_date: + query = f"{query} WHERE date >= '{start_date}'" + elif not start_date and end_date: + query = f"{query} WHERE date <= '{end_date}'" + elif start_date and end_date: + query = f"{query} WHERE date BETWEEN '{start_date}' AND '{end_date}'" + else: + query = query + + if query_type == "adoption" or query_type == "lighthouse" or query_type == "core_web_vitals": + query = f"{query} GROUP BY date, app, rank, geo" + + if query_type == "technologies": + query = f"{query} WHERE date = '2023-07-01' AND geo = 'ALL' AND rank = 'ALL'" + query = f"{query} ORDER BY origins DESC" + + logging.info(query) + + return query + + +def convert_decimal_to_float(data): + if isinstance(data, Decimal): + return float(data) + elif isinstance(data, dict): + new_dict = {} + for key, value in data.items(): + new_dict[key] = convert_decimal_to_float(value) + return new_dict + elif isinstance(data, list): + new_list = [] + for item in data: + new_list.append(convert_decimal_to_float(item)) + return new_list + else: + return data + +def create_hash_id(element, query_type): + + if query_type == "adoption" or query_type == "lighthouse" or query_type == "core_web_vitals": + id = (element['date'] + "-" + element['technology'] + "-" + element['geo'] + "-" + element['rank']).encode('utf-8') + + if query_type == "technologies": + id = (element['client'] + "-" + element['technology'] + "-" + element['category']).encode('utf-8') + + if query_type == "page_weight": + id = (element['date'] + "-" + element['technology'] + "-" + element['geo'] + "-" + element['rank'] + "-" + element['client']).encode('utf-8') + + if query_type == "categories": + id = (element['category']).encode('utf-8') + + hash_object = hashlib.sha256(id) + + return hash_object.hexdigest() + +class WriteToFirestoreBatchedDoFn(beam.DoFn): + """Write a batch of elements to Firestore.""" + def __init__(self, database, project, collection, query_type): + self.client = None + self.database = database + self.project = project + self.collection = collection + self.query_type = query_type + + def start_bundle(self): + # initialize client if it doesn't exist and create a collection reference + if self.client is None: + self.client = firestore.Client(project=self.project, database=self.database) + self.collection_ref = self.client.collection(self.collection) + + def process(self, elements): + for element in elements: + # creates a hash id for the document + hash_id = create_hash_id(element, self.query_type) + + doc_ref = self.collection_ref.document(hash_id) + doc_ref.set(element) + +def parse_arguments(argv): + """Parse command line arguments for the beam pipeline.""" + parser = argparse.ArgumentParser() + + # Query type + parser.add_argument( + '--query_type', + dest='query_type', + help='Query type', + required=True, + choices=TECHNOLOGY_QUERIES.keys()) + + # Firestore project + parser.add_argument( + '--firestore_project', + dest='firestore_project', + default='httparchive', + help='Firestore project', + required=True) + + # Firestore collection + parser.add_argument( + '--firestore_collection', + dest='firestore_collection', + default='lighthouse', + help='Firestore collection', + required=True) + + # Firestore database + parser.add_argument( + '--firestore_database', + dest='firestore_database', + default='(default)', + help='Firestore database', + required=True) + + # start date, optional + parser.add_argument( + '--start_date', + dest='start_date', + help='Start date', + required=False) + + # end date, optional + parser.add_argument( + '--end_date', + dest='end_date', + help='End date', + required=False) + + # parse arguments + known_args, pipeline_args = parser.parse_known_args(argv) + return known_args, pipeline_args + + +def create_pipeline(argv=None, save_main_session=True): + """Build the pipeline.""" + known_args, pipeline_args = parse_arguments(argv) + + query = buildQuery(known_args.start_date, known_args.end_date, known_args.query_type) + + pipeline_options = PipelineOptions(pipeline_args) + + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + + # with beam.Pipeline(options=pipeline_options) as p: + p = beam.Pipeline(options=pipeline_options) + + # Read from BigQuery, convert decimal to float, group into batches, and write to Firestore + (p + | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True) + | 'ConvertDecimalToFloat' >> beam.Map(convert_decimal_to_float) + | 'GroupIntoBatches' >> beam.BatchElements(min_batch_size=499, max_batch_size=499) + | 'WriteToFirestoreBatched' >> beam.ParDo(WriteToFirestoreBatchedDoFn( + database=known_args.firestore_database, + project=known_args.firestore_project, + collection=known_args.firestore_collection, + query_type=known_args.query_type + )) + ) + + return p + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + p = create_pipeline(argv) + logging.debug("Pipeline created") + result = p.run() + logging.debug("Pipeline run") + From dd7de1c39d1da1620bcf09084f83ffcab8a869c0 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Mon, 20 Nov 2023 02:56:04 +0000 Subject: [PATCH 08/51] add queries and keys to constants.py --- modules/constants.py | 285 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 230 insertions(+), 55 deletions(-) diff --git a/modules/constants.py b/modules/constants.py index 4578b24..d33ffdc 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -104,6 +104,18 @@ class MaxContentSize(Enum): # limit response bodies to 20MB RESPONSE_BODIES = 20 * 1000000 + +TECHNOLOGY_QUERY_ID_KEYS = { + "adoption": ["date", "technology", "rank", "geo"], + "lighthouse": ["date", "technology", "rank", "geo"], + "core_web_vitals": ["date", "technology", "rank", "geo"], + "technologies": ["client", "technology", "category"], + "page_weight": ["date", "technology", "geo"], + "categories": ["category"], +} +"""Mapping of query types to a list of fields that uniquely identify a row.""" + + TECHNOLOGY_QUERIES = { "adoption": """ CREATE TEMPORARY FUNCTION GET_ADOPTION( @@ -131,64 +143,227 @@ class MaxContentSize(Enum): ))) AS adoption FROM `httparchive.core_web_vitals.technologies` + GROUP BY date, app, rank, geo """, "lighthouse": """ - CREATE TEMPORARY FUNCTION GET_LIGHTHOUSE( - records ARRAY> - ) RETURNS ARRAY, - mobile STRUCT< - median_score NUMERIC - > - >> LANGUAGE js AS ''' - const METRIC_MAP = { - accessibility: 'median_lighthouse_score_accessibility', - best_practices: 'median_lighthouse_score_best_practices', - performance: 'median_lighthouse_score_performance', - pwa: 'median_lighthouse_score_pwa', - seo: 'median_lighthouse_score_seo', - }; - - // Initialize the Lighthouse map. - const lighthouse = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => { - return [metricName, {name: metricName}]; - })); - - // Populate each client record. - records.forEach(record => { - Object.entries(METRIC_MAP).forEach(([metricName, median_score]) => { - lighthouse[metricName][record.client] = {median_score: record[median_score]}; + CREATE TEMPORARY FUNCTION GET_LIGHTHOUSE( + records ARRAY> + ) RETURNS ARRAY, + mobile STRUCT< + median_score NUMERIC + > + >> LANGUAGE js AS ''' + const METRIC_MAP = { + accessibility: 'median_lighthouse_score_accessibility', + best_practices: 'median_lighthouse_score_best_practices', + performance: 'median_lighthouse_score_performance', + pwa: 'median_lighthouse_score_pwa', + seo: 'median_lighthouse_score_seo', + }; + + // Initialize the Lighthouse map. + const lighthouse = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => { + return [metricName, {name: metricName}]; + })); + + // Populate each client record. + records.forEach(record => { + Object.entries(METRIC_MAP).forEach(([metricName, median_score]) => { + lighthouse[metricName][record.client] = {median_score: record[median_score]}; + }); }); - }); - return Object.values(lighthouse); - '''; + return Object.values(lighthouse); + '''; - SELECT - STRING(DATE(date)) as date, - app AS technology, - rank, - geo, - GET_LIGHTHOUSE(ARRAY_AGG(STRUCT( + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_LIGHTHOUSE(ARRAY_AGG(STRUCT( + client, + median_lighthouse_score_accessibility, + median_lighthouse_score_best_practices, + median_lighthouse_score_performance, + median_lighthouse_score_pwa, + median_lighthouse_score_seo + + ))) AS lighthouse + FROM + `httparchive.core_web_vitals.technologies` + GROUP BY date, app, rank, geo + """, + "core_web_vitals": """ + CREATE TEMPORARY FUNCTION GET_VITALS( + records ARRAY> + ) RETURNS ARRAY, + mobile STRUCT< + good_number INT64, + tested INT64 + > + >> LANGUAGE js AS ''' + const METRIC_MAP = { + overall: ['origins_with_good_cwv', 'origins_eligible_for_cwv'], + LCP: ['origins_with_good_lcp', 'origins_with_any_lcp'], + CLS: ['origins_with_good_cls', 'origins_with_any_cls'], + FID: ['origins_with_good_fid', 'origins_with_any_fid'], + FCP: ['origins_with_good_fcp', 'origins_with_any_fcp'], + TTFB: ['origins_with_good_ttfb', 'origins_with_any_ttfb'], + INP: ['origins_with_good_inp', 'origins_with_any_inp'] + }; + + // Initialize the vitals map. + const vitals = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => { + return [metricName, {name: metricName}]; + })); + + // Populate each client record. + records.forEach(record => { + Object.entries(METRIC_MAP).forEach(([metricName, [good_number, tested]]) => { + vitals[metricName][record.client] = {good_number: record[good_number], tested: record[tested]}; + }); + }); + + return Object.values(vitals); + '''; + + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_VITALS(ARRAY_AGG(STRUCT( + client, + origins_with_good_fid, + origins_with_good_cls, + origins_with_good_lcp, + origins_with_good_fcp, + origins_with_good_ttfb, + origins_with_good_inp, + origins_with_any_fid, + origins_with_any_cls, + origins_with_any_lcp, + origins_with_any_fcp, + origins_with_any_ttfb, + origins_with_any_inp, + origins_with_good_cwv, + origins_eligible_for_cwv + ))) AS vitals + FROM + `httparchive.core_web_vitals.technologies` + GROUP BY date, app, rank, geo + """, + "technologies": """ + SELECT + client, + app AS technology, + description, + category, + NULL AS similar_technologies, + origins + FROM + `httparchive.core_web_vitals.technologies` + JOIN + `httparchive.core_web_vitals.technology_descriptions` + ON + app = technology + WHERE date = '2023-07-01' AND geo = 'ALL' AND rank = 'ALL' + ORDER BY origins DESC + """, + "page_weight": """ + SELECT + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, client, - median_lighthouse_score_accessibility, - median_lighthouse_score_best_practices, - median_lighthouse_score_performance, - median_lighthouse_score_pwa, - median_lighthouse_score_seo - - ))) AS lighthouse - FROM - `httparchive.core_web_vitals.technologies` - """ -} \ No newline at end of file + median_bytes_total, + median_bytes_js, + median_bytes_image + FROM + `httparchive.core_web_vitals.technologies` + """, + "categories": """ + WITH categories AS ( + SELECT + category, + COUNT(DISTINCT root_page) AS origins + FROM + `httparchive.all.pages`, + UNNEST(technologies) AS t, + UNNEST(t.categories) AS category + WHERE + date = '2023-08-01' AND + client = 'mobile' + GROUP BY + category + ), + + technologies AS ( + SELECT + category, + technology, + COUNT(DISTINCT root_page) AS origins + FROM + `httparchive.all.pages`, + UNNEST(technologies) AS t, + UNNEST(t.categories) AS category + WHERE + date = '2023-08-01' AND + client = 'mobile' + GROUP BY + category, + technology + ) + + SELECT + category, + categories.origins, + ARRAY_AGG(technology ORDER BY technologies.origins DESC) AS technologies + FROM + categories + JOIN + technologies + USING + (category) + GROUP BY + category, + categories.origins + ORDER BY + categories.origins DESC + """ +} +"""Mapping of query types to BigQuery SQL queries.""" From 834951d481d25ac44c99653033e44fd39ea44a84 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Mon, 20 Nov 2023 03:00:40 +0000 Subject: [PATCH 09/51] updates to tech_report_pipeline * add id hashing logic from tech_report_pipeline_inserts.py * update query builder * modify date filtering logic * change from batched to individual Firestore inserts * Add custom TechReportPipelineOptions class --- modules/tech_report_pipeline.py | 220 ++++++++++++++++---------------- 1 file changed, 107 insertions(+), 113 deletions(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index ec0d4a9..27fa7de 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -1,42 +1,51 @@ #!/usr/bin/env python3 from decimal import Decimal -from distutils.command import build +import hashlib from sys import argv -import uuid import apache_beam as beam -from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner +from apache_beam.utils import retry from google.cloud import firestore from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions import logging import argparse -from constants import TECHNOLOGY_QUERIES +from modules import constants # Inspired by https://stackoverflow.com/a/67028348 -def buildQuery(start_date, end_date, query_type): - if query_type not in TECHNOLOGY_QUERIES: +def technology_hash_id(element: dict, query_type: str, key_map=constants.TECHNOLOGY_QUERY_ID_KEYS): + """Returns a hashed id for a set of technology query keys""" + if query_type not in key_map: + raise ValueError(f"Invalid query type: {query_type}") + keys = key_map[query_type] + values = [element.get(key) for key in keys] + hash = hashlib.sha256("-".join(values).encode()).hexdigest() + return hash + + +def build_query(query_type): + if query_type not in constants.TECHNOLOGY_QUERIES: raise ValueError(f"Query type {query_type} not found in TECHNOLOGY_QUERIES") + query = constants.TECHNOLOGY_QUERIES[query_type] + logging.info(query) + return query - query = TECHNOLOGY_QUERIES[query_type] - # add dates to query - if start_date and not end_date: - query = f"{query} WHERE date >= '{start_date}'" - elif not start_date and end_date: - query = f"{query} WHERE date <= '{end_date}'" +def filter_dates(row, start_date, end_date): + """Filter rows between start and end date""" + if not start_date and not end_date: + return True + elif 'date' not in row: + return True elif start_date and end_date: - query = f"{query} WHERE date BETWEEN '{start_date}' AND '{end_date}'" + return start_date <= row['date'] <= end_date + elif start_date: + return start_date <= row['date'] + elif end_date: + return row['date'] <= end_date else: - query = query - - # add group by to query - query = f"{query} GROUP BY date, app, rank, geo" - - logging.info(query) - - return query + return True def convert_decimal_to_float(data): @@ -56,14 +65,14 @@ def convert_decimal_to_float(data): return data -class WriteToFirestoreBatchedDoFn(beam.DoFn): - """Write a batch of elements to Firestore.""" - def __init__(self, database, project, collection, batch_timeout=14400): +class WriteToFirestoreDoFn(beam.DoFn): + """Write a single element to Firestore. Retry on failure using exponential backoff, see :func:`apache_beam.utils.retry.with_exponential_backoff`.""" + def __init__(self, project, database, collection, query_type): self.client = None - self.database = database self.project = project + self.database = database self.collection = collection - self.batch_timeout = batch_timeout + self.query_type = query_type def start_bundle(self): # initialize client if it doesn't exist and create a collection reference @@ -71,97 +80,81 @@ def start_bundle(self): self.client = firestore.Client(project=self.project, database=self.database) self.collection_ref = self.client.collection(self.collection) - # create a batch - self.batch = self.client.batch() - - def process(self, elements): - for element in elements: - doc_ref = self.collection_ref.document(uuid.uuid4().hex) - self.batch.set(doc_ref, element) + def process(self, element): + # creates a hash id for the document + hash_id = technology_hash_id(element, self.query_type) + self._add_record(hash_id, element) + + @retry.with_exponential_backoff() + def _add_record(self, id, data): + doc_ref = self.collection_ref.document(id) + doc_ref.set(data) + + +class TechReportPipelineOptions(PipelineOptions): + @classmethod + def _add_argparse_args(cls, parser): + # Query type + parser.add_argument( + '--query_type', + dest='query_type', + help='Query type', + required=True, + choices=constants.TECHNOLOGY_QUERIES.keys()) + + # Firestore project + parser.add_argument( + '--firestore_project', + dest='firestore_project', + default='httparchive', + help='Firestore project', + required=True) + + # Firestore collection + parser.add_argument( + '--firestore_collection', + dest='firestore_collection', + help='Firestore collection', + required=True) + + # Firestore database + parser.add_argument( + '--firestore_database', + dest='firestore_database', + default='(default)', + help='Firestore database', + required=True) + + # start date, optional + parser.add_argument( + '--start_date', + dest='start_date', + help='Start date', + required=False) + + # end date, optional + parser.add_argument( + '--end_date', + dest='end_date', + help='End date', + required=False) - # commit the batch with a timeout - self.batch.commit(timeout=self.batch_timeout) - - -# commented out for testing later - need to compare performance between batched and individual writes -# class WriteToFirestoreDoFn(beam.DoFn): -# """Write a single element to Firestore.""" -# def __init__(self, project, collection): -# self.client = None -# self.project = project -# self.collection = collection - -# def start_bundle(self): -# # initialize client if it doesn't exist -# if self.client is None: -# self.client = firestore.Client(project=self.project) - -# def process(self, element): -# self.client.write_data(element) +def create_pipeline(argv=None, save_main_session=True): + """Build the pipeline.""" -def parse_arguments(argv): - """Parse command line arguments for the beam pipeline.""" parser = argparse.ArgumentParser() + known_args, beam_args = parser.parse_known_args() - # Query type - parser.add_argument( - '--query_type', - dest='query_type', - help='Query type', - required=True, - choices=TECHNOLOGY_QUERIES.keys()) - - # Firestore project - parser.add_argument( - '--firestore_project', - dest='firestore_project', - default='httparchive', - help='Firestore project', - required=True) - - # Firestore collection - parser.add_argument( - '--firestore_collection', - dest='firestore_collection', - default='lighthouse', - help='Firestore collection', - required=True) - - # Firestore database - parser.add_argument( - '--firestore_database', - dest='firestore_database', - default='(default)', - help='Firestore database', - required=True) - - # start date, optional - parser.add_argument( - '--start_date', - dest='start_date', - help='Start date', - required=False) - - # end date, optional - parser.add_argument( - '--end_date', - dest='end_date', - help='End date', - required=False) - - # parse arguments - known_args, pipeline_args = parser.parse_known_args(argv) - return known_args, pipeline_args + # Create and set your Pipeline Options. + beam_options = PipelineOptions(beam_args) + known_args = beam_options.view_as(TechReportPipelineOptions) + query = build_query(known_args.query_type) -def create_pipeline(argv=None, save_main_session=True): - """Build the pipeline.""" - known_args, pipeline_args = parse_arguments(argv) - - query = buildQuery(known_args.start_date, known_args.end_date, known_args.query_type) + pipeline_options = PipelineOptions(beam_options) - pipeline_options = PipelineOptions(pipeline_args) + logging.info(f"Pipeline options: {pipeline_options.get_all_options()}") # We use the save_main_session option because one or more DoFn's in this # workflow rely on global context (e.g., a module imported at module level) @@ -173,12 +166,13 @@ def create_pipeline(argv=None, save_main_session=True): # Read from BigQuery, convert decimal to float, group into batches, and write to Firestore (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True) + | 'FilterDates' >> beam.Filter(lambda row: filter_dates(row, known_args.start_date, known_args.end_date)) | 'ConvertDecimalToFloat' >> beam.Map(convert_decimal_to_float) - | 'GroupIntoBatches' >> beam.BatchElements(min_batch_size=50, max_batch_size=50) - | 'WriteToFirestoreBatched' >> beam.ParDo(WriteToFirestoreBatchedDoFn( - database=known_args.firestore_database, + | 'WriteToFirestore' >> beam.ParDo(WriteToFirestoreDoFn( project=known_args.firestore_project, - collection=known_args.firestore_collection + database=known_args.firestore_database, + collection=known_args.firestore_collection, + query_type=known_args.query_type )) ) From 721924afc37c0defd46042e0b352d612a2a54b7b Mon Sep 17 00:00:00 2001 From: Martin Aceto Date: Wed, 29 Nov 2023 20:32:51 -0500 Subject: [PATCH 10/51] fix query age weight --- modules/constants.py | 58 ++++++++++++++++++++----- modules/tech_report_pipeline_inserts.py | 53 +++++++++++++++++----- 2 files changed, 88 insertions(+), 23 deletions(-) diff --git a/modules/constants.py b/modules/constants.py index d33ffdc..7ca6c52 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -104,14 +104,13 @@ class MaxContentSize(Enum): # limit response bodies to 20MB RESPONSE_BODIES = 20 * 1000000 - TECHNOLOGY_QUERY_ID_KEYS = { - "adoption": ["date", "technology", "rank", "geo"], - "lighthouse": ["date", "technology", "rank", "geo"], - "core_web_vitals": ["date", "technology", "rank", "geo"], - "technologies": ["client", "technology", "category"], - "page_weight": ["date", "technology", "geo"], - "categories": ["category"], + "adoption": ["date", "technology", "geo", "rank"], + "lighthouse": ["date", "technology", "geo", "rank"], + "core_web_vitals": ["date", "technology", "geo", "rank"], + "page_weight": ["date", "technology", "geo", "rank"], + "technologies": ["client", "technology", "category"], + "categories": ["category"], } """Mapping of query types to a list of fields that uniquely identify a row.""" @@ -304,17 +303,52 @@ class MaxContentSize(Enum): ORDER BY origins DESC """, "page_weight": """ + CREATE TEMPORARY FUNCTION GET_PAGE_WEIGHT( + records ARRAY> + ) RETURNS ARRAY, + desktop STRUCT< + median_bytes INT64 + > + >> LANGUAGE js AS ''' + const METRICS = ['total', 'js', 'images']; + + // Initialize the page weight map. + const pageWeight = Object.fromEntries(METRICS.map(metricName => { + return [metricName, {name: metricName}]; + })); + + // Populate each client record. + records.forEach(record => { + METRICS.forEach(metricName => { + pageWeight[metricName][record.client] = {median_bytes: record[metricName]}; + }); + }); + + return Object.values(pageWeight); + '''; + SELECT - STRING(DATE(date)) as date, - app AS technology, - rank, - geo, + date, + app AS technology, + rank, + geo, + GET_PAGE_WEIGHT(ARRAY_AGG(STRUCT( client, median_bytes_total, median_bytes_js, median_bytes_image + ))) AS pageWeight FROM - `httparchive.core_web_vitals.technologies` + `httparchive.core_web_vitals.technologies` """, "categories": """ WITH categories AS ( diff --git a/modules/tech_report_pipeline_inserts.py b/modules/tech_report_pipeline_inserts.py index 5ef33b0..2981132 100644 --- a/modules/tech_report_pipeline_inserts.py +++ b/modules/tech_report_pipeline_inserts.py @@ -181,7 +181,6 @@ ))) AS vitals FROM `httparchive.core_web_vitals.technologies` - """, "technologies": """ SELECT @@ -199,15 +198,50 @@ app = technology """, "page_weight": """ + CREATE TEMPORARY FUNCTION GET_PAGE_WEIGHT( + records ARRAY> + ) RETURNS ARRAY, + desktop STRUCT< + median_bytes INT64 + > + >> LANGUAGE js AS ''' + const METRICS = ['total', 'js', 'images']; + + // Initialize the page weight map. + const pageWeight = Object.fromEntries(METRICS.map(metricName => { + return [metricName, {name: metricName}]; + })); + + // Populate each client record. + records.forEach(record => { + METRICS.forEach(metricName => { + pageWeight[metricName][record.client] = {median_bytes: record[metricName]}; + }); + }); + + return Object.values(pageWeight); + '''; + SELECT STRING(DATE(date)) as date, app AS technology, rank, geo, - client, - median_bytes_total, - median_bytes_js, - median_bytes_image + GET_PAGE_WEIGHT(ARRAY_AGG(STRUCT( + client, + median_bytes_total, + median_bytes_js, + median_bytes_image + ))) AS pageWeight FROM `httparchive.core_web_vitals.technologies` """, @@ -268,7 +302,7 @@ def buildQuery(start_date, end_date, query_type): query = TECHNOLOGY_QUERIES[query_type] - if query_type != "technologies" and query_type != "page_weight": + if query_type != "technologies": # add dates to query if start_date and not end_date: query = f"{query} WHERE date >= '{start_date}'" @@ -279,7 +313,7 @@ def buildQuery(start_date, end_date, query_type): else: query = query - if query_type == "adoption" or query_type == "lighthouse" or query_type == "core_web_vitals": + if query_type == "adoption" or query_type == "lighthouse" or query_type == "core_web_vitals" or query_type == "page_weight": query = f"{query} GROUP BY date, app, rank, geo" if query_type == "technologies": @@ -309,15 +343,12 @@ def convert_decimal_to_float(data): def create_hash_id(element, query_type): - if query_type == "adoption" or query_type == "lighthouse" or query_type == "core_web_vitals": + if query_type == "adoption" or query_type == "lighthouse" or query_type == "core_web_vitals" or query_type == "page_weight": id = (element['date'] + "-" + element['technology'] + "-" + element['geo'] + "-" + element['rank']).encode('utf-8') if query_type == "technologies": id = (element['client'] + "-" + element['technology'] + "-" + element['category']).encode('utf-8') - if query_type == "page_weight": - id = (element['date'] + "-" + element['technology'] + "-" + element['geo'] + "-" + element['rank'] + "-" + element['client']).encode('utf-8') - if query_type == "categories": id = (element['category']).encode('utf-8') From 85a704bda1e70e331b21ac8c80a0205372c586db Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 2 Dec 2023 17:33:31 +0000 Subject: [PATCH 11/51] sort keys for id hashing --- modules/tech_report_pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 27fa7de..782ea17 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -15,10 +15,10 @@ def technology_hash_id(element: dict, query_type: str, key_map=constants.TECHNOLOGY_QUERY_ID_KEYS): - """Returns a hashed id for a set of technology query keys""" + """Returns a hashed id for a set of technology query keys. Keys are sorted alphabetically and joined with a dash. The resulting string is hashed using SHA256.""" if query_type not in key_map: raise ValueError(f"Invalid query type: {query_type}") - keys = key_map[query_type] + keys = sorted(key_map[query_type]) values = [element.get(key) for key in keys] hash = hashlib.sha256("-".join(values).encode()).hexdigest() return hash From f755623d3e2629fd12d841c4c6bafbd8119493d4 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 2 Dec 2023 17:33:49 +0000 Subject: [PATCH 12/51] parameterize query builder --- modules/tech_report_pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 782ea17..7ac3e5c 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -24,10 +24,10 @@ def technology_hash_id(element: dict, query_type: str, key_map=constants.TECHNOL return hash -def build_query(query_type): - if query_type not in constants.TECHNOLOGY_QUERIES: +def build_query(query_type, queries=constants.TECHNOLOGY_QUERIES): + if query_type not in queries: raise ValueError(f"Query type {query_type} not found in TECHNOLOGY_QUERIES") - query = constants.TECHNOLOGY_QUERIES[query_type] + query = queries[query_type] logging.info(query) return query From b274fc51ee0729682651995717590946ecffcd41 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 2 Dec 2023 17:34:23 +0000 Subject: [PATCH 13/51] Firestore write to retry on any Exception --- modules/tech_report_pipeline.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 7ac3e5c..74238a7 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -66,7 +66,7 @@ def convert_decimal_to_float(data): class WriteToFirestoreDoFn(beam.DoFn): - """Write a single element to Firestore. Retry on failure using exponential backoff, see :func:`apache_beam.utils.retry.with_exponential_backoff`.""" + """Write a single element to Firestore. Yields the hash id of the document. Retry on failure using exponential backoff, see :func:`apache_beam.utils.retry.with_exponential_backoff`.""" def __init__(self, project, database, collection, query_type): self.client = None self.project = project @@ -84,9 +84,11 @@ def process(self, element): # creates a hash id for the document hash_id = technology_hash_id(element, self.query_type) self._add_record(hash_id, element) + yield hash_id - @retry.with_exponential_backoff() + @retry.with_exponential_backoff(retry_filter=lambda ex: isinstance(ex, Exception)) def _add_record(self, id, data): + """Helper function to add a record to Firestore. Retries on any `Exception`.""" doc_ref = self.collection_ref.document(id) doc_ref.set(data) From 61afe5dd38d0b066cd2a90db67e4f450f4e3b74d Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 2 Dec 2023 17:34:54 +0000 Subject: [PATCH 14/51] fix pipeline argument parsing --- modules/tech_report_pipeline.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 74238a7..bf13c0e 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -154,16 +154,13 @@ def create_pipeline(argv=None, save_main_session=True): query = build_query(known_args.query_type) - pipeline_options = PipelineOptions(beam_options) - - logging.info(f"Pipeline options: {pipeline_options.get_all_options()}") + logging.info(f"Pipeline options: {beam_options.get_all_options()}") # We use the save_main_session option because one or more DoFn's in this # workflow rely on global context (e.g., a module imported at module level) - pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + beam_options.view_as(SetupOptions).save_main_session = save_main_session - # with beam.Pipeline(options=pipeline_options) as p: - p = beam.Pipeline(options=pipeline_options) + p = beam.Pipeline(options=beam_options) # Read from BigQuery, convert decimal to float, group into batches, and write to Firestore (p From 95d2443836d1bdc62d41c8d02a3189572d2f98f1 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 2 Dec 2023 21:40:09 +0000 Subject: [PATCH 15/51] linting --- modules/constants.py | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/constants.py b/modules/constants.py index 7ca6c52..d993f02 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -104,6 +104,7 @@ class MaxContentSize(Enum): # limit response bodies to 20MB RESPONSE_BODIES = 20 * 1000000 + TECHNOLOGY_QUERY_ID_KEYS = { "adoption": ["date", "technology", "geo", "rank"], "lighthouse": ["date", "technology", "geo", "rank"], From 89624d76d8aa652996a3372c46e617c84abe6014 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 3 Dec 2023 00:11:50 +0000 Subject: [PATCH 16/51] update page_weight query in constants --- modules/constants.py | 55 ++++++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/modules/constants.py b/modules/constants.py index d993f02..7efde26 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -305,20 +305,20 @@ class MaxContentSize(Enum): """, "page_weight": """ CREATE TEMPORARY FUNCTION GET_PAGE_WEIGHT( - records ARRAY> + records ARRAY> ) RETURNS ARRAY, - desktop STRUCT< - median_bytes INT64 - > + name STRING, + mobile STRUCT< + median_bytes INT64 + >, + desktop STRUCT< + median_bytes INT64 + > >> LANGUAGE js AS ''' const METRICS = ['total', 'js', 'images']; @@ -329,27 +329,28 @@ class MaxContentSize(Enum): // Populate each client record. records.forEach(record => { - METRICS.forEach(metricName => { - pageWeight[metricName][record.client] = {median_bytes: record[metricName]}; - }); + METRICS.forEach(metricName => { + pageWeight[metricName][record.client] = {median_bytes: record[metricName]}; + }); }); return Object.values(pageWeight); '''; SELECT - date, - app AS technology, - rank, - geo, - GET_PAGE_WEIGHT(ARRAY_AGG(STRUCT( - client, - median_bytes_total, - median_bytes_js, - median_bytes_image - ))) AS pageWeight + STRING(DATE(date)) as date, + app AS technology, + rank, + geo, + GET_PAGE_WEIGHT(ARRAY_AGG(STRUCT( + client, + median_bytes_total, + median_bytes_js, + median_bytes_image + ))) AS pageWeight FROM - `httparchive.core_web_vitals.technologies` + `httparchive.core_web_vitals.technologies` + GROUP BY date, app, rank, geo """, "categories": """ WITH categories AS ( From 775b2445ffa067e96d67067ed77cd6d09aadd317 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 3 Dec 2023 02:14:41 +0000 Subject: [PATCH 17/51] add date filtering by month --- modules/tech_report_pipeline.py | 40 +++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index bf13c0e..5866b0a 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +import calendar +from datetime import date from decimal import Decimal import hashlib from sys import argv @@ -32,7 +34,41 @@ def build_query(query_type, queries=constants.TECHNOLOGY_QUERIES): return query -def filter_dates(row, start_date, end_date): +def filter_dates_by_query_type(query_type, row, start_date, end_date) -> bool: + """Filter rows by date. For some queries, use the latest month available. For others, use the date range specified by the user.""" + if query_type in ["categories", "technologies"]: + return filter_by_month(row, start_date, end_date) + else: + return filter_by_dates(row, start_date, end_date) + + +def filter_by_month(row, start_date, end_date) -> bool: + """Filter rows by date range if given, otherwise by current month. If start_date and end_date are given, use those. If only start_date is given, use the entire month. If only end_date is given, use the entire month. If neither are given, use the current month.""" + if 'date' not in row: + return True + + if start_date and end_date: + first = date.fromisoformat(start_date) + last = date.fromisoformat(end_date) + elif start_date: + first = date.fromisoformat(start_date).replace(day=1) + last = first.replace(day=calendar.monthrange(first.year, first.month)[1]) + elif end_date: + last = date.fromisoformat(end_date) + first = last.replace(day=1) + else: + today = date.today() + first = today.replace(day=1) + last = today.replace(day=calendar.monthrange(today.year, today.month)[1]) + + # if first and last are greater than one month apart, throw an error + if first.replace(day=1) != last.replace(day=1): + raise ValueError(f"Start and end dates must be within the same month. {start_date=}, {end_date=}") + + return first <= row['date'] <= last + + +def filter_by_dates(row, start_date, end_date) -> bool: """Filter rows between start and end date""" if not start_date and not end_date: return True @@ -165,7 +201,7 @@ def create_pipeline(argv=None, save_main_session=True): # Read from BigQuery, convert decimal to float, group into batches, and write to Firestore (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True) - | 'FilterDates' >> beam.Filter(lambda row: filter_dates(row, known_args.start_date, known_args.end_date)) + | 'FilterDates' >> beam.Filter(lambda row: filter_dates_by_query_type(known_args.query_type, row, known_args.start_date, known_args.end_date)) | 'ConvertDecimalToFloat' >> beam.Map(convert_decimal_to_float) | 'WriteToFirestore' >> beam.ParDo(WriteToFirestoreDoFn( project=known_args.firestore_project, From e8c77cc9eb5d2b8fcb8e19b8304b4f6c741e702b Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 3 Dec 2023 02:15:29 +0000 Subject: [PATCH 18/51] WriteToFirestoreDoFn yields hash_id, element --- modules/tech_report_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 5866b0a..42e5a96 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -120,7 +120,7 @@ def process(self, element): # creates a hash id for the document hash_id = technology_hash_id(element, self.query_type) self._add_record(hash_id, element) - yield hash_id + yield hash_id, element @retry.with_exponential_backoff(retry_filter=lambda ex: isinstance(ex, Exception)) def _add_record(self, id, data): From 6a47b095d25246d6b599a62dd98325519a6a5bde Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 3 Dec 2023 02:16:45 +0000 Subject: [PATCH 19/51] extract pipeline arg parsing --- modules/tech_report_pipeline.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 42e5a96..289d597 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -4,7 +4,6 @@ from datetime import date from decimal import Decimal import hashlib -from sys import argv import apache_beam as beam from apache_beam.utils import retry from google.cloud import firestore @@ -178,15 +177,19 @@ def _add_argparse_args(cls, parser): required=False) -def create_pipeline(argv=None, save_main_session=True): - """Build the pipeline.""" - +def parse_args(): parser = argparse.ArgumentParser() known_args, beam_args = parser.parse_known_args() # Create and set your Pipeline Options. beam_options = PipelineOptions(beam_args) known_args = beam_options.view_as(TechReportPipelineOptions) + return known_args, beam_options + + +def create_pipeline(save_main_session=True): + """Build the pipeline.""" + known_args, beam_options = parse_args() query = build_query(known_args.query_type) @@ -216,7 +219,7 @@ def create_pipeline(argv=None, save_main_session=True): if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) - p = create_pipeline(argv) + p = create_pipeline() logging.debug("Pipeline created") result = p.run() logging.debug("Pipeline run") From 01fc722082f16da62c98f05f4bdd4183b0812966 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 3 Dec 2023 02:19:38 +0000 Subject: [PATCH 20/51] optional debug logging --- modules/tech_report_pipeline.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 289d597..9cb4a9e 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -214,6 +214,10 @@ def create_pipeline(save_main_session=True): )) ) + # if logging level is DEBUG, log results + if logging.getLogger().getEffectiveLevel() == logging.DEBUG: + p = p | 'LogResults' >> beam.Map(logging.debug) + return p From 546fc62a1c10fd4cc1893f527e7ae75e75d7dc2a Mon Sep 17 00:00:00 2001 From: Martin Aceto Date: Wed, 6 Dec 2023 20:47:47 -0500 Subject: [PATCH 21/51] added a field in techonologies query --- modules/constants.py | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/constants.py b/modules/constants.py index 7efde26..60a912c 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -292,6 +292,7 @@ class MaxContentSize(Enum): app AS technology, description, category, + SPLIT(category, ",") AS category_obj, NULL AS similar_technologies, origins FROM From a07858b3c09c52fb2b13c9b788b5ba0a5a17d994 Mon Sep 17 00:00:00 2001 From: Martin Aceto Date: Wed, 6 Dec 2023 20:48:48 -0500 Subject: [PATCH 22/51] deleted test file tec_report_pipeline_inserts --- modules/tech_report_pipeline_inserts.py | 474 ------------------------ 1 file changed, 474 deletions(-) delete mode 100644 modules/tech_report_pipeline_inserts.py diff --git a/modules/tech_report_pipeline_inserts.py b/modules/tech_report_pipeline_inserts.py deleted file mode 100644 index 2981132..0000000 --- a/modules/tech_report_pipeline_inserts.py +++ /dev/null @@ -1,474 +0,0 @@ -#!/usr/bin/env python3 - -from decimal import Decimal -from distutils.command import build -from sys import argv -import uuid -import apache_beam as beam -from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner -from google.cloud import firestore -from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions -import logging -import argparse -import hashlib -#from constants import TECHNOLOGY_QUERIES - -# Inspired by https://stackoverflow.com/a/67028348 - -TECHNOLOGY_QUERIES = { - "adoption": """ - CREATE TEMPORARY FUNCTION GET_ADOPTION( - records ARRAY> - ) RETURNS STRUCT< - desktop INT64, - mobile INT64 - > LANGUAGE js AS ''' - return Object.fromEntries(records.map(({client, origins}) => { - return [client, origins]; - })); - '''; - - SELECT - STRING(DATE(date)) as date, - app AS technology, - rank, - geo, - GET_ADOPTION(ARRAY_AGG(STRUCT( - client, - origins - ))) AS adoption - FROM - `httparchive.core_web_vitals.technologies` - """, - "lighthouse": """ - CREATE TEMPORARY FUNCTION GET_LIGHTHOUSE( - records ARRAY> - ) RETURNS ARRAY, - mobile STRUCT< - median_score NUMERIC - > - >> LANGUAGE js AS ''' - const METRIC_MAP = { - accessibility: 'median_lighthouse_score_accessibility', - best_practices: 'median_lighthouse_score_best_practices', - performance: 'median_lighthouse_score_performance', - pwa: 'median_lighthouse_score_pwa', - seo: 'median_lighthouse_score_seo', - }; - - // Initialize the Lighthouse map. - const lighthouse = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => { - return [metricName, {name: metricName}]; - })); - - // Populate each client record. - records.forEach(record => { - Object.entries(METRIC_MAP).forEach(([metricName, median_score]) => { - lighthouse[metricName][record.client] = {median_score: record[median_score]}; - }); - }); - - return Object.values(lighthouse); - '''; - - SELECT - STRING(DATE(date)) as date, - app AS technology, - rank, - geo, - GET_LIGHTHOUSE(ARRAY_AGG(STRUCT( - client, - median_lighthouse_score_accessibility, - median_lighthouse_score_best_practices, - median_lighthouse_score_performance, - median_lighthouse_score_pwa, - median_lighthouse_score_seo - - ))) AS lighthouse - FROM - `httparchive.core_web_vitals.technologies` - """, - "core_web_vitals": """ - CREATE TEMPORARY FUNCTION GET_VITALS( - records ARRAY> - ) RETURNS ARRAY, - mobile STRUCT< - good_number INT64, - tested INT64 - > - >> LANGUAGE js AS ''' - const METRIC_MAP = { - overall: ['origins_with_good_cwv', 'origins_eligible_for_cwv'], - LCP: ['origins_with_good_lcp', 'origins_with_any_lcp'], - CLS: ['origins_with_good_cls', 'origins_with_any_cls'], - FID: ['origins_with_good_fid', 'origins_with_any_fid'], - FCP: ['origins_with_good_fcp', 'origins_with_any_fcp'], - TTFB: ['origins_with_good_ttfb', 'origins_with_any_ttfb'], - INP: ['origins_with_good_inp', 'origins_with_any_inp'] - }; - - // Initialize the vitals map. - const vitals = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => { - return [metricName, {name: metricName}]; - })); - - // Populate each client record. - records.forEach(record => { - Object.entries(METRIC_MAP).forEach(([metricName, [good_number, tested]]) => { - vitals[metricName][record.client] = {good_number: record[good_number], tested: record[tested]}; - }); - }); - - return Object.values(vitals); - '''; - - SELECT - STRING(DATE(date)) as date, - app AS technology, - rank, - geo, - GET_VITALS(ARRAY_AGG(STRUCT( - client, - origins_with_good_fid, - origins_with_good_cls, - origins_with_good_lcp, - origins_with_good_fcp, - origins_with_good_ttfb, - origins_with_good_inp, - origins_with_any_fid, - origins_with_any_cls, - origins_with_any_lcp, - origins_with_any_fcp, - origins_with_any_ttfb, - origins_with_any_inp, - origins_with_good_cwv, - origins_eligible_for_cwv - ))) AS vitals - FROM - `httparchive.core_web_vitals.technologies` - """, - "technologies": """ - SELECT - client, - app AS technology, - description, - category, - NULL AS similar_technologies, - origins - FROM - `httparchive.core_web_vitals.technologies` - JOIN - `httparchive.core_web_vitals.technology_descriptions` - ON - app = technology - """, - "page_weight": """ - CREATE TEMPORARY FUNCTION GET_PAGE_WEIGHT( - records ARRAY> - ) RETURNS ARRAY, - desktop STRUCT< - median_bytes INT64 - > - >> LANGUAGE js AS ''' - const METRICS = ['total', 'js', 'images']; - - // Initialize the page weight map. - const pageWeight = Object.fromEntries(METRICS.map(metricName => { - return [metricName, {name: metricName}]; - })); - - // Populate each client record. - records.forEach(record => { - METRICS.forEach(metricName => { - pageWeight[metricName][record.client] = {median_bytes: record[metricName]}; - }); - }); - - return Object.values(pageWeight); - '''; - - SELECT - STRING(DATE(date)) as date, - app AS technology, - rank, - geo, - GET_PAGE_WEIGHT(ARRAY_AGG(STRUCT( - client, - median_bytes_total, - median_bytes_js, - median_bytes_image - ))) AS pageWeight - FROM - `httparchive.core_web_vitals.technologies` - """, - "categories": """ - WITH categories AS ( - SELECT - category, - COUNT(DISTINCT root_page) AS origins - FROM - `httparchive.all.pages`, - UNNEST(technologies) AS t, - UNNEST(t.categories) AS category - WHERE - date = '2023-08-01' AND - client = 'mobile' - GROUP BY - category - ), - - technologies AS ( - SELECT - category, - technology, - COUNT(DISTINCT root_page) AS origins - FROM - `httparchive.all.pages`, - UNNEST(technologies) AS t, - UNNEST(t.categories) AS category - WHERE - date = '2023-08-01' AND - client = 'mobile' - GROUP BY - category, - technology - ) - - SELECT - category, - categories.origins, - ARRAY_AGG(technology ORDER BY technologies.origins DESC) AS technologies - FROM - categories - JOIN - technologies - USING - (category) - GROUP BY - category, - categories.origins - ORDER BY - categories.origins DESC - """ -} - -def buildQuery(start_date, end_date, query_type): - if query_type not in TECHNOLOGY_QUERIES: - raise ValueError(f"Query type {query_type} not found in TECHNOLOGY_QUERIES") - - query = TECHNOLOGY_QUERIES[query_type] - - if query_type != "technologies": - # add dates to query - if start_date and not end_date: - query = f"{query} WHERE date >= '{start_date}'" - elif not start_date and end_date: - query = f"{query} WHERE date <= '{end_date}'" - elif start_date and end_date: - query = f"{query} WHERE date BETWEEN '{start_date}' AND '{end_date}'" - else: - query = query - - if query_type == "adoption" or query_type == "lighthouse" or query_type == "core_web_vitals" or query_type == "page_weight": - query = f"{query} GROUP BY date, app, rank, geo" - - if query_type == "technologies": - query = f"{query} WHERE date = '2023-07-01' AND geo = 'ALL' AND rank = 'ALL'" - query = f"{query} ORDER BY origins DESC" - - logging.info(query) - - return query - - -def convert_decimal_to_float(data): - if isinstance(data, Decimal): - return float(data) - elif isinstance(data, dict): - new_dict = {} - for key, value in data.items(): - new_dict[key] = convert_decimal_to_float(value) - return new_dict - elif isinstance(data, list): - new_list = [] - for item in data: - new_list.append(convert_decimal_to_float(item)) - return new_list - else: - return data - -def create_hash_id(element, query_type): - - if query_type == "adoption" or query_type == "lighthouse" or query_type == "core_web_vitals" or query_type == "page_weight": - id = (element['date'] + "-" + element['technology'] + "-" + element['geo'] + "-" + element['rank']).encode('utf-8') - - if query_type == "technologies": - id = (element['client'] + "-" + element['technology'] + "-" + element['category']).encode('utf-8') - - if query_type == "categories": - id = (element['category']).encode('utf-8') - - hash_object = hashlib.sha256(id) - - return hash_object.hexdigest() - -class WriteToFirestoreBatchedDoFn(beam.DoFn): - """Write a batch of elements to Firestore.""" - def __init__(self, database, project, collection, query_type): - self.client = None - self.database = database - self.project = project - self.collection = collection - self.query_type = query_type - - def start_bundle(self): - # initialize client if it doesn't exist and create a collection reference - if self.client is None: - self.client = firestore.Client(project=self.project, database=self.database) - self.collection_ref = self.client.collection(self.collection) - - def process(self, elements): - for element in elements: - # creates a hash id for the document - hash_id = create_hash_id(element, self.query_type) - - doc_ref = self.collection_ref.document(hash_id) - doc_ref.set(element) - -def parse_arguments(argv): - """Parse command line arguments for the beam pipeline.""" - parser = argparse.ArgumentParser() - - # Query type - parser.add_argument( - '--query_type', - dest='query_type', - help='Query type', - required=True, - choices=TECHNOLOGY_QUERIES.keys()) - - # Firestore project - parser.add_argument( - '--firestore_project', - dest='firestore_project', - default='httparchive', - help='Firestore project', - required=True) - - # Firestore collection - parser.add_argument( - '--firestore_collection', - dest='firestore_collection', - default='lighthouse', - help='Firestore collection', - required=True) - - # Firestore database - parser.add_argument( - '--firestore_database', - dest='firestore_database', - default='(default)', - help='Firestore database', - required=True) - - # start date, optional - parser.add_argument( - '--start_date', - dest='start_date', - help='Start date', - required=False) - - # end date, optional - parser.add_argument( - '--end_date', - dest='end_date', - help='End date', - required=False) - - # parse arguments - known_args, pipeline_args = parser.parse_known_args(argv) - return known_args, pipeline_args - - -def create_pipeline(argv=None, save_main_session=True): - """Build the pipeline.""" - known_args, pipeline_args = parse_arguments(argv) - - query = buildQuery(known_args.start_date, known_args.end_date, known_args.query_type) - - pipeline_options = PipelineOptions(pipeline_args) - - # We use the save_main_session option because one or more DoFn's in this - # workflow rely on global context (e.g., a module imported at module level) - pipeline_options.view_as(SetupOptions).save_main_session = save_main_session - - # with beam.Pipeline(options=pipeline_options) as p: - p = beam.Pipeline(options=pipeline_options) - - # Read from BigQuery, convert decimal to float, group into batches, and write to Firestore - (p - | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True) - | 'ConvertDecimalToFloat' >> beam.Map(convert_decimal_to_float) - | 'GroupIntoBatches' >> beam.BatchElements(min_batch_size=499, max_batch_size=499) - | 'WriteToFirestoreBatched' >> beam.ParDo(WriteToFirestoreBatchedDoFn( - database=known_args.firestore_database, - project=known_args.firestore_project, - collection=known_args.firestore_collection, - query_type=known_args.query_type - )) - ) - - return p - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - p = create_pipeline(argv) - logging.debug("Pipeline created") - result = p.run() - logging.debug("Pipeline run") - From f50ff43123cfef18fafe42f48872af9c4cec87f1 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Thu, 7 Dec 2023 14:57:10 +0000 Subject: [PATCH 23/51] fix debug logging for firestore ids --- modules/tech_report_pipeline.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 9cb4a9e..e34f54e 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -202,7 +202,8 @@ def create_pipeline(save_main_session=True): p = beam.Pipeline(options=beam_options) # Read from BigQuery, convert decimal to float, group into batches, and write to Firestore - (p + firestore_ids = ( + p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True) | 'FilterDates' >> beam.Filter(lambda row: filter_dates_by_query_type(known_args.query_type, row, known_args.start_date, known_args.end_date)) | 'ConvertDecimalToFloat' >> beam.Map(convert_decimal_to_float) @@ -216,7 +217,7 @@ def create_pipeline(save_main_session=True): # if logging level is DEBUG, log results if logging.getLogger().getEffectiveLevel() == logging.DEBUG: - p = p | 'LogResults' >> beam.Map(logging.debug) + firestore_ids | 'LogResults' >> beam.Map(logging.debug) return p From b25f298485c956f7cc30c2f1a0d798dbf5a6c48e Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 28 Jan 2024 15:38:52 -0500 Subject: [PATCH 24/51] Update requirements --- requirements.tech-report.txt | 2 -- requirements.txt | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) delete mode 100644 requirements.tech-report.txt diff --git a/requirements.tech-report.txt b/requirements.tech-report.txt deleted file mode 100644 index 24c1665..0000000 --- a/requirements.tech-report.txt +++ /dev/null @@ -1,2 +0,0 @@ -apache-beam[gcp]==2.43.0 -google-cloud-firestore==2.0.0 diff --git a/requirements.txt b/requirements.txt index 603e519..24c1665 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ apache-beam[gcp]==2.43.0 +google-cloud-firestore==2.0.0 From 5f5854513c01c564cb9bff2ed011955a37d27d87 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 28 Jan 2024 15:48:49 -0500 Subject: [PATCH 25/51] Update dependency installation in workflows --- .github/workflows/deploy-dataflow-flex-template.yml | 2 +- .github/workflows/unittest.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/deploy-dataflow-flex-template.yml b/.github/workflows/deploy-dataflow-flex-template.yml index e459d19..180b4fe 100644 --- a/.github/workflows/deploy-dataflow-flex-template.yml +++ b/.github/workflows/deploy-dataflow-flex-template.yml @@ -10,7 +10,7 @@ on: - "cloudbuild.yaml" - "Dockerfile" - "flex_template_metadata_*.json" - - "requirements.txt" + - "requirements*.txt" jobs: deploy: diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index db3b76b..de73ed1 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -27,7 +27,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + pip install -r requirements.txt -r requirements.dev.txt - name: "Test: unittest and code coverage" run: coverage run From 468098f7e9dfb8f87816390154657d9163410a69 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 28 Jan 2024 15:52:07 -0500 Subject: [PATCH 26/51] Update Apache Beam version to 2.52.0 --- requirements.txt | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 24c1665..fac725a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -apache-beam[gcp]==2.43.0 +apache-beam[gcp]==2.52.0 google-cloud-firestore==2.0.0 diff --git a/setup.py b/setup.py index d3311c6..4f2337e 100644 --- a/setup.py +++ b/setup.py @@ -4,6 +4,6 @@ name="data-pipeline", version="0.0.1", packages=setuptools.find_packages(), - install_requires=["apache-beam[gcp]==2.43.0", "google-cloud-firestore==2.13.0"], + install_requires=["apache-beam[gcp]==2.52.0", "google-cloud-firestore==2.13.0"], package_data={"schema": ["*.json"]}, ) From 35087a308c2ca59c5798914346f3899ed8cb6426 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 17:17:34 -0500 Subject: [PATCH 27/51] Simplify date filtering in technology queries --- modules/constants.py | 10 +++-- modules/tech_report_pipeline.py | 75 ++++----------------------------- 2 files changed, 16 insertions(+), 69 deletions(-) diff --git a/modules/constants.py b/modules/constants.py index 60a912c..fb8928b 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -143,6 +143,7 @@ class MaxContentSize(Enum): ))) AS adoption FROM `httparchive.core_web_vitals.technologies` + WHERE date = '{date}' GROUP BY date, app, rank, geo """, "lighthouse": """ @@ -203,6 +204,7 @@ class MaxContentSize(Enum): ))) AS lighthouse FROM `httparchive.core_web_vitals.technologies` + WHERE date = '{date}' GROUP BY date, app, rank, geo """, "core_web_vitals": """ @@ -284,6 +286,7 @@ class MaxContentSize(Enum): ))) AS vitals FROM `httparchive.core_web_vitals.technologies` + WHERE date = '{date}' GROUP BY date, app, rank, geo """, "technologies": """ @@ -301,7 +304,7 @@ class MaxContentSize(Enum): `httparchive.core_web_vitals.technology_descriptions` ON app = technology - WHERE date = '2023-07-01' AND geo = 'ALL' AND rank = 'ALL' + WHERE date = '{date}' AND geo = 'ALL' AND rank = 'ALL' ORDER BY origins DESC """, "page_weight": """ @@ -351,6 +354,7 @@ class MaxContentSize(Enum): ))) AS pageWeight FROM `httparchive.core_web_vitals.technologies` + WHERE date = '{date}' GROUP BY date, app, rank, geo """, "categories": """ @@ -363,7 +367,7 @@ class MaxContentSize(Enum): UNNEST(technologies) AS t, UNNEST(t.categories) AS category WHERE - date = '2023-08-01' AND + date = '{date}' AND client = 'mobile' GROUP BY category @@ -379,7 +383,7 @@ class MaxContentSize(Enum): UNNEST(technologies) AS t, UNNEST(t.categories) AS category WHERE - date = '2023-08-01' AND + date = '{date}' AND client = 'mobile' GROUP BY category, diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index e34f54e..cd17921 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -25,62 +25,13 @@ def technology_hash_id(element: dict, query_type: str, key_map=constants.TECHNOL return hash -def build_query(query_type, queries=constants.TECHNOLOGY_QUERIES): +def build_query(query_type, date, queries=constants.TECHNOLOGY_QUERIES): if query_type not in queries: raise ValueError(f"Query type {query_type} not found in TECHNOLOGY_QUERIES") query = queries[query_type] - logging.info(query) - return query - - -def filter_dates_by_query_type(query_type, row, start_date, end_date) -> bool: - """Filter rows by date. For some queries, use the latest month available. For others, use the date range specified by the user.""" - if query_type in ["categories", "technologies"]: - return filter_by_month(row, start_date, end_date) - else: - return filter_by_dates(row, start_date, end_date) - - -def filter_by_month(row, start_date, end_date) -> bool: - """Filter rows by date range if given, otherwise by current month. If start_date and end_date are given, use those. If only start_date is given, use the entire month. If only end_date is given, use the entire month. If neither are given, use the current month.""" - if 'date' not in row: - return True - - if start_date and end_date: - first = date.fromisoformat(start_date) - last = date.fromisoformat(end_date) - elif start_date: - first = date.fromisoformat(start_date).replace(day=1) - last = first.replace(day=calendar.monthrange(first.year, first.month)[1]) - elif end_date: - last = date.fromisoformat(end_date) - first = last.replace(day=1) - else: - today = date.today() - first = today.replace(day=1) - last = today.replace(day=calendar.monthrange(today.year, today.month)[1]) - - # if first and last are greater than one month apart, throw an error - if first.replace(day=1) != last.replace(day=1): - raise ValueError(f"Start and end dates must be within the same month. {start_date=}, {end_date=}") - - return first <= row['date'] <= last - - -def filter_by_dates(row, start_date, end_date) -> bool: - """Filter rows between start and end date""" - if not start_date and not end_date: - return True - elif 'date' not in row: - return True - elif start_date and end_date: - return start_date <= row['date'] <= end_date - elif start_date: - return start_date <= row['date'] - elif end_date: - return row['date'] <= end_date - else: - return True + parameterized_query = query.format(date=date) + logging.info(parameterized_query) + return parameterized_query def convert_decimal_to_float(data): @@ -162,18 +113,11 @@ def _add_argparse_args(cls, parser): help='Firestore database', required=True) - # start date, optional - parser.add_argument( - '--start_date', - dest='start_date', - help='Start date', - required=False) - - # end date, optional + # date parser.add_argument( - '--end_date', - dest='end_date', - help='End date', + '--date', + dest='date', + help='Date', required=False) @@ -191,7 +135,7 @@ def create_pipeline(save_main_session=True): """Build the pipeline.""" known_args, beam_options = parse_args() - query = build_query(known_args.query_type) + query = build_query(known_args.query_type, known_args.date) logging.info(f"Pipeline options: {beam_options.get_all_options()}") @@ -205,7 +149,6 @@ def create_pipeline(save_main_session=True): firestore_ids = ( p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True) - | 'FilterDates' >> beam.Filter(lambda row: filter_dates_by_query_type(known_args.query_type, row, known_args.start_date, known_args.end_date)) | 'ConvertDecimalToFloat' >> beam.Map(convert_decimal_to_float) | 'WriteToFirestore' >> beam.ParDo(WriteToFirestoreDoFn( project=known_args.firestore_project, From 75674e480c738321304b41e502e09e284b7a8cf3 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 17:17:44 -0500 Subject: [PATCH 28/51] Update google-cloud-firestore version --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index fac725a..cc5fce8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ apache-beam[gcp]==2.52.0 -google-cloud-firestore==2.0.0 +google-cloud-firestore==2.14.0 From 9940e84e0cb711caa806185ccfb1a0eaa9830c42 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 17:17:58 -0500 Subject: [PATCH 29/51] Add validation for missing keys in technology_hash_id function --- modules/tech_report_pipeline.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index cd17921..1a85d24 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -20,6 +20,8 @@ def technology_hash_id(element: dict, query_type: str, key_map=constants.TECHNOL if query_type not in key_map: raise ValueError(f"Invalid query type: {query_type}") keys = sorted(key_map[query_type]) + if not all(key in element for key in keys): + raise ValueError(f"Missing keys in element {element} for query type {query_type}") values = [element.get(key) for key in keys] hash = hashlib.sha256("-".join(values).encode()).hexdigest() return hash From 396ec86fee34cde5e7209bc76d8b4e95fe4bc901 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 17:20:00 -0500 Subject: [PATCH 30/51] Add unit tests for technology_hash_id function --- test/test_tech_report_pipeline.py | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 test/test_tech_report_pipeline.py diff --git a/test/test_tech_report_pipeline.py b/test/test_tech_report_pipeline.py new file mode 100644 index 0000000..bdd1243 --- /dev/null +++ b/test/test_tech_report_pipeline.py @@ -0,0 +1,33 @@ +import unittest +import hashlib +from unittest.mock import patch +from modules.tech_report_pipeline import technology_hash_id +from modules import constants + +class TestTechnologyHashId(unittest.TestCase): + @patch('modules.tech_report_pipeline.constants') + def test_technology_hash_id_valid(self, patched_constants): + # Test with valid input + element = {'key1': 'value1', 'key2': 'value2'} + query_type = 'valid_query_type' + patched_constants.TECHNOLOGY_QUERY_ID_KEYS = {query_type: ['key1', 'key2']} + expected_hash = hashlib.sha256('value1-value2'.encode()).hexdigest() + self.assertEqual(technology_hash_id(element, query_type), expected_hash) + + def test_technology_hash_id_invalid_query_type(self): + # Test with invalid query type + element = {'key1': 'value1', 'key2': 'value2'} + query_type = 'invalid_query_type' + with self.assertRaises(ValueError): + technology_hash_id(element, query_type) + + def test_technology_hash_id_missing_key(self): + # Test with missing key in element + element = {'key1': 'value1'} + query_type = 'valid_query_type' + constants.TECHNOLOGY_QUERY_ID_KEYS[query_type] = ['key1', 'key2'] + with self.assertRaises(ValueError): + technology_hash_id(element, query_type) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file From 5a898f9ac56b8c7925041ace7ff9c44e22a5e51b Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 17:30:01 -0500 Subject: [PATCH 31/51] Update dependency installation in unittest workflow --- .github/workflows/unittest.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index 2dd2955..223feb5 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -27,7 +27,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install -r requirements.txt -r requirements.dev.txt + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - name: "Test: unittest and code coverage" run: coverage run From 269e19150f0349044d6d8666fcc93302356f3ef2 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 17:40:54 -0500 Subject: [PATCH 32/51] Linting --- modules/constants.py | 3 ++- modules/tech_report_deletion.py | 5 +++-- modules/tech_report_pipeline.py | 2 -- test/test_tech_report_pipeline.py | 4 +++- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/modules/constants.py b/modules/constants.py index fb8928b..2898cc1 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -115,7 +115,7 @@ class MaxContentSize(Enum): } """Mapping of query types to a list of fields that uniquely identify a row.""" - +# editorconfig-checker-disable TECHNOLOGY_QUERIES = { "adoption": """ CREATE TEMPORARY FUNCTION GET_ADOPTION( @@ -408,3 +408,4 @@ class MaxContentSize(Enum): """ } """Mapping of query types to BigQuery SQL queries.""" +# editorconfig-checker-enable diff --git a/modules/tech_report_deletion.py b/modules/tech_report_deletion.py index dfcc41f..48c8803 100644 --- a/modules/tech_report_deletion.py +++ b/modules/tech_report_deletion.py @@ -113,7 +113,8 @@ def create_pipeline(argv=None, save_main_session=True): p = beam.Pipeline(options=pipeline_options) # Read from BigQuery, convert decimal to float, group into batches, and write to Firestore - (p + ( + p | 'Create' >> beam.Create([known_args.firestore_collection]) | 'QueryFirestore' >> beam.ParDo(QueryFirestoreDoFn( database=known_args.firestore_database, @@ -140,4 +141,4 @@ def create_pipeline(argv=None, save_main_session=True): # commented out for local testing # if not isinstance(p.runner, DataflowRunner): - # result.wait_until_finish() \ No newline at end of file + # result.wait_until_finish() diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 1a85d24..5fb8ee0 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -1,7 +1,5 @@ #!/usr/bin/env python3 -import calendar -from datetime import date from decimal import Decimal import hashlib import apache_beam as beam diff --git a/test/test_tech_report_pipeline.py b/test/test_tech_report_pipeline.py index bdd1243..58fa2c8 100644 --- a/test/test_tech_report_pipeline.py +++ b/test/test_tech_report_pipeline.py @@ -4,6 +4,7 @@ from modules.tech_report_pipeline import technology_hash_id from modules import constants + class TestTechnologyHashId(unittest.TestCase): @patch('modules.tech_report_pipeline.constants') def test_technology_hash_id_valid(self, patched_constants): @@ -29,5 +30,6 @@ def test_technology_hash_id_missing_key(self): with self.assertRaises(ValueError): technology_hash_id(element, query_type) + if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main() From f32b41246b192054297859b5359cb3ee8dfa518c Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 17:44:45 -0500 Subject: [PATCH 33/51] Remove unnecessary code for local testing --- modules/tech_report_deletion.py | 6 +----- modules/tech_report_pipeline.py | 6 +----- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/modules/tech_report_deletion.py b/modules/tech_report_deletion.py index 48c8803..673bdd8 100644 --- a/modules/tech_report_deletion.py +++ b/modules/tech_report_deletion.py @@ -136,9 +136,5 @@ def create_pipeline(argv=None, save_main_session=True): logging.getLogger().setLevel(logging.INFO) p = create_pipeline(argv) logging.debug("Pipeline created") - result = p.run() + p.run() logging.debug("Pipeline run") - - # commented out for local testing - # if not isinstance(p.runner, DataflowRunner): - # result.wait_until_finish() diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 5fb8ee0..1bf31ff 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -169,9 +169,5 @@ def create_pipeline(save_main_session=True): logging.getLogger().setLevel(logging.INFO) p = create_pipeline() logging.debug("Pipeline created") - result = p.run() + p.run() logging.debug("Pipeline run") - - # commented out for local testing - # if not isinstance(p.runner, DataflowRunner): - # result.wait_until_finish() From 8923e9f0313fd4ad79179304b0f57d5d5fbc8def Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 18:01:34 -0500 Subject: [PATCH 34/51] Linting --- modules/tech_report_pipeline.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 1bf31ff..1a0607d 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -14,7 +14,8 @@ def technology_hash_id(element: dict, query_type: str, key_map=constants.TECHNOLOGY_QUERY_ID_KEYS): - """Returns a hashed id for a set of technology query keys. Keys are sorted alphabetically and joined with a dash. The resulting string is hashed using SHA256.""" + """Returns a hashed id for a set of technology query keys. Keys are sorted alphabetically and joined with a dash. + The resulting string is hashed using SHA256.""" if query_type not in key_map: raise ValueError(f"Invalid query type: {query_type}") keys = sorted(key_map[query_type]) @@ -52,7 +53,8 @@ def convert_decimal_to_float(data): class WriteToFirestoreDoFn(beam.DoFn): - """Write a single element to Firestore. Yields the hash id of the document. Retry on failure using exponential backoff, see :func:`apache_beam.utils.retry.with_exponential_backoff`.""" + """Write a single element to Firestore. Yields the hash id of the document. + Retry on failure using exponential backoff, see :func:`apache_beam.utils.retry.with_exponential_backoff`.""" def __init__(self, project, database, collection, query_type): self.client = None self.project = project From e99eae7dd896ab95d372860a411b0e60739373ae Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 18:37:07 -0500 Subject: [PATCH 35/51] Update import statement for google.cloud.firestore in tech_report_deletion.py --- modules/tech_report_deletion.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/tech_report_deletion.py b/modules/tech_report_deletion.py index 673bdd8..98ebf3d 100644 --- a/modules/tech_report_deletion.py +++ b/modules/tech_report_deletion.py @@ -2,7 +2,7 @@ from sys import argv import apache_beam as beam -from google.cloud import firestore +import google.cloud.firestore as firestore from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions import logging import argparse From 9954c475130007b69a0e4b913b990537c32816fe Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 18:40:22 -0500 Subject: [PATCH 36/51] Update import statement for google.cloud.firestore in tech_report_pipeline.py --- modules/tech_report_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 1a0607d..8cdc928 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -4,7 +4,7 @@ import hashlib import apache_beam as beam from apache_beam.utils import retry -from google.cloud import firestore +import google.cloud.firestore as firestore from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions import logging import argparse From 0f5675756c2d6425cbcc61bebbc739b1429b094a Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 18:46:40 -0500 Subject: [PATCH 37/51] Update google-cloud-firestore version to 2.14.0 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 4f2337e..822d4b3 100644 --- a/setup.py +++ b/setup.py @@ -4,6 +4,6 @@ name="data-pipeline", version="0.0.1", packages=setuptools.find_packages(), - install_requires=["apache-beam[gcp]==2.52.0", "google-cloud-firestore==2.13.0"], + install_requires=["apache-beam[gcp]==2.52.0", "google-cloud-firestore==2.14.0"], package_data={"schema": ["*.json"]}, ) From 94d4965b0878226fbfc27980d1f31dd40129a8df Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 18:57:57 -0500 Subject: [PATCH 38/51] Add Python 3.8 setup and dependency installation to linting action --- .github/workflows/linter.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index 7f6a1d4..3f22bb1 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -23,6 +23,15 @@ jobs: with: # Full git history is needed to get a proper list of changed files within `super-linter` fetch-depth: 0 + - name: Set up Python 3.8 + uses: actions/setup-python@v5 + with: + python-version: 3.8 + cache: 'pip' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt - name: Set VALIDATE_ALL_CODEBASE variable to false # Only run the full workflow for manual runs or if upgrading the super linter if: | From 4ef9c8da5efff2b4ea274ca82c4fa4e4f3ee5373 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 19:13:01 -0500 Subject: [PATCH 39/51] testing: Add check for library path --- .github/workflows/linter.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index 3f22bb1..8b95eaf 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -32,6 +32,9 @@ jobs: run: | python -m pip install --upgrade pip pip install -r requirements.txt + - name: Check library path + run: | + python -c "import google.cloud firestore; print(firestore)" - name: Set VALIDATE_ALL_CODEBASE variable to false # Only run the full workflow for manual runs or if upgrading the super linter if: | From 24487a4d1d779bf46b49d25a173952becba2fb19 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 19:17:31 -0500 Subject: [PATCH 40/51] Fix import statement for Google Cloud Firestore --- .github/workflows/linter.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index 8b95eaf..cca9cd3 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -34,7 +34,7 @@ jobs: pip install -r requirements.txt - name: Check library path run: | - python -c "import google.cloud firestore; print(firestore)" + python -c "from google.cloud import firestore; print(firestore)" - name: Set VALIDATE_ALL_CODEBASE variable to false # Only run the full workflow for manual runs or if upgrading the super linter if: | From 7b70987bd45b86fa0dca5b792b5c741437740bd6 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 20:17:09 -0500 Subject: [PATCH 41/51] Update import statements in tech_report_deletion.py and tech_report_pipeline.py --- modules/tech_report_deletion.py | 2 +- modules/tech_report_pipeline.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/tech_report_deletion.py b/modules/tech_report_deletion.py index 98ebf3d..0a9b358 100644 --- a/modules/tech_report_deletion.py +++ b/modules/tech_report_deletion.py @@ -2,7 +2,7 @@ from sys import argv import apache_beam as beam -import google.cloud.firestore as firestore +from google.cloud import firestore # pylint: disable=import-error from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions import logging import argparse diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 8cdc928..9c69eb0 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -4,7 +4,7 @@ import hashlib import apache_beam as beam from apache_beam.utils import retry -import google.cloud.firestore as firestore +from google.cloud import firestore # pylint: disable=import-error from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions import logging import argparse From ebd0fefa9a6d074d091f3193b275d9d78e00e8ab Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 20:39:31 -0500 Subject: [PATCH 42/51] Fix import formatting in tech_report_deletion.py and tech_report_pipeline.py --- modules/tech_report_deletion.py | 2 +- modules/tech_report_pipeline.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/tech_report_deletion.py b/modules/tech_report_deletion.py index 0a9b358..081adf7 100644 --- a/modules/tech_report_deletion.py +++ b/modules/tech_report_deletion.py @@ -2,7 +2,7 @@ from sys import argv import apache_beam as beam -from google.cloud import firestore # pylint: disable=import-error +from google.cloud import firestore # pylint: disable=import-error from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions import logging import argparse diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 9c69eb0..a19d3cc 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -4,7 +4,7 @@ import hashlib import apache_beam as beam from apache_beam.utils import retry -from google.cloud import firestore # pylint: disable=import-error +from google.cloud import firestore # pylint: disable=import-error from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions import logging import argparse From 9292a2ad7c2ad73207416cec8b2aa0f7d0eb5fa1 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sun, 11 Feb 2024 20:44:54 -0500 Subject: [PATCH 43/51] Remove Python setup and dependency installation from linting action --- .github/workflows/linter.yml | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index cca9cd3..7f6a1d4 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -23,18 +23,6 @@ jobs: with: # Full git history is needed to get a proper list of changed files within `super-linter` fetch-depth: 0 - - name: Set up Python 3.8 - uses: actions/setup-python@v5 - with: - python-version: 3.8 - cache: 'pip' - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - - name: Check library path - run: | - python -c "from google.cloud import firestore; print(firestore)" - name: Set VALIDATE_ALL_CODEBASE variable to false # Only run the full workflow for manual runs or if upgrading the super linter if: | From b5fa43aa48cc65da8ff2580898d73ffa43b2360d Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 9 Mar 2024 15:19:44 +0000 Subject: [PATCH 44/51] Make required parameters optional --- modules/tech_report_pipeline.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index a19d3cc..2fe7773 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -89,7 +89,7 @@ def _add_argparse_args(cls, parser): '--query_type', dest='query_type', help='Query type', - required=True, + required=False, # should be true choices=constants.TECHNOLOGY_QUERIES.keys()) # Firestore project @@ -98,14 +98,16 @@ def _add_argparse_args(cls, parser): dest='firestore_project', default='httparchive', help='Firestore project', - required=True) + required=False, # should be `True` but fails due to the way beam expects all pipelines to have the same options + ) # Firestore collection parser.add_argument( '--firestore_collection', dest='firestore_collection', help='Firestore collection', - required=True) + required=False, # should be `True` but fails due to the way beam expects all pipelines to have the same options + ) # Firestore database parser.add_argument( @@ -113,7 +115,8 @@ def _add_argparse_args(cls, parser): dest='firestore_database', default='(default)', help='Firestore database', - required=True) + required=False, # should be `True` but fails due to the way beam expects all pipelines to have the same options + ) # date parser.add_argument( From 9428e247ad45bccf9849283b5b406d97c26461ae Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 9 Mar 2024 15:26:47 +0000 Subject: [PATCH 45/51] linting - shorten comments --- modules/tech_report_pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/tech_report_pipeline.py b/modules/tech_report_pipeline.py index 2fe7773..e4cb4df 100644 --- a/modules/tech_report_pipeline.py +++ b/modules/tech_report_pipeline.py @@ -98,7 +98,7 @@ def _add_argparse_args(cls, parser): dest='firestore_project', default='httparchive', help='Firestore project', - required=False, # should be `True` but fails due to the way beam expects all pipelines to have the same options + required=False, # should be `True` but fails since beam expects all pipelines to have the same options ) # Firestore collection @@ -106,7 +106,7 @@ def _add_argparse_args(cls, parser): '--firestore_collection', dest='firestore_collection', help='Firestore collection', - required=False, # should be `True` but fails due to the way beam expects all pipelines to have the same options + required=False, # should be `True` but fails since beam expects all pipelines to have the same options ) # Firestore database @@ -115,7 +115,7 @@ def _add_argparse_args(cls, parser): dest='firestore_database', default='(default)', help='Firestore database', - required=False, # should be `True` but fails due to the way beam expects all pipelines to have the same options + required=False, # should be `True` but fails since beam expects all pipelines to have the same options ) # date From ac94e8eacc6193f6d6603579bab25f86c422881b Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 9 Mar 2024 16:01:51 +0000 Subject: [PATCH 46/51] fix adoption query --- modules/constants.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/constants.py b/modules/constants.py index 2898cc1..b9518cf 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -127,9 +127,9 @@ class MaxContentSize(Enum): desktop INT64, mobile INT64 > LANGUAGE js AS ''' - return Object.fromEntries(records.map(({client, origins}) => { + return Object.fromEntries(records.map(({{client, origins}}) => {{ return [client, origins]; - })); + }})); '''; SELECT From 27b8663ca09d08e866656d91ace482b0d9d5f28a Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 9 Mar 2024 16:24:45 +0000 Subject: [PATCH 47/51] fix lighthouse query --- modules/constants.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/modules/constants.py b/modules/constants.py index b9518cf..6ae1c49 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -165,25 +165,25 @@ class MaxContentSize(Enum): median_score NUMERIC > >> LANGUAGE js AS ''' - const METRIC_MAP = { + const METRIC_MAP = {{ accessibility: 'median_lighthouse_score_accessibility', best_practices: 'median_lighthouse_score_best_practices', performance: 'median_lighthouse_score_performance', pwa: 'median_lighthouse_score_pwa', seo: 'median_lighthouse_score_seo', - }; + }}; // Initialize the Lighthouse map. - const lighthouse = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => { - return [metricName, {name: metricName}]; - })); + const lighthouse = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => {{ + return [metricName, {{name: metricName}}]; + }})); // Populate each client record. - records.forEach(record => { - Object.entries(METRIC_MAP).forEach(([metricName, median_score]) => { - lighthouse[metricName][record.client] = {median_score: record[median_score]}; - }); - }); + records.forEach(record => {{ + Object.entries(METRIC_MAP).forEach(([metricName, median_score]) => {{ + lighthouse[metricName][record.client] = {{median_score: record[median_score]}}; + }}); + }}); return Object.values(lighthouse); '''; From 6a2472ee599b9c3f808720d7901420c5d96b8942 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 9 Mar 2024 16:29:58 +0000 Subject: [PATCH 48/51] fix core_web_vitals query --- modules/constants.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/modules/constants.py b/modules/constants.py index 6ae1c49..d70f845 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -237,7 +237,7 @@ class MaxContentSize(Enum): tested INT64 > >> LANGUAGE js AS ''' - const METRIC_MAP = { + const METRIC_MAP = {{ overall: ['origins_with_good_cwv', 'origins_eligible_for_cwv'], LCP: ['origins_with_good_lcp', 'origins_with_any_lcp'], CLS: ['origins_with_good_cls', 'origins_with_any_cls'], @@ -245,19 +245,19 @@ class MaxContentSize(Enum): FCP: ['origins_with_good_fcp', 'origins_with_any_fcp'], TTFB: ['origins_with_good_ttfb', 'origins_with_any_ttfb'], INP: ['origins_with_good_inp', 'origins_with_any_inp'] - }; + }}; // Initialize the vitals map. - const vitals = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => { - return [metricName, {name: metricName}]; - })); + const vitals = Object.fromEntries(Object.keys(METRIC_MAP).map(metricName => {{ + return [metricName, {{name: metricName}}]; + }})); // Populate each client record. - records.forEach(record => { - Object.entries(METRIC_MAP).forEach(([metricName, [good_number, tested]]) => { - vitals[metricName][record.client] = {good_number: record[good_number], tested: record[tested]}; - }); - }); + records.forEach(record => {{ + Object.entries(METRIC_MAP).forEach(([metricName, [good_number, tested]]) => {{ + vitals[metricName][record.client] = {{good_number: record[good_number], tested: record[tested]}}; + }}); + }}); return Object.values(vitals); '''; From e13d267a4e7d7afa1e7ff21d23a5c9ce3bd4e385 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 9 Mar 2024 16:33:28 +0000 Subject: [PATCH 49/51] fix page_weight query --- modules/constants.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/modules/constants.py b/modules/constants.py index d70f845..66cbf3c 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -327,16 +327,16 @@ class MaxContentSize(Enum): const METRICS = ['total', 'js', 'images']; // Initialize the page weight map. - const pageWeight = Object.fromEntries(METRICS.map(metricName => { - return [metricName, {name: metricName}]; - })); + const pageWeight = Object.fromEntries(METRICS.map(metricName => {{ + return [metricName, {{name: metricName}}]; + }})); // Populate each client record. - records.forEach(record => { - METRICS.forEach(metricName => { - pageWeight[metricName][record.client] = {median_bytes: record[metricName]}; - }); - }); + records.forEach(record => {{ + METRICS.forEach(metricName => {{ + pageWeight[metricName][record.client] = {{median_bytes: record[metricName]}}; + }}); + }}); return Object.values(pageWeight); '''; From 1c567ae0e84ae98d7741ddfbbdeb04d15ed3a7ab Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 9 Mar 2024 16:36:04 +0000 Subject: [PATCH 50/51] update query mapping comment to note escaping --- modules/constants.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/modules/constants.py b/modules/constants.py index 66cbf3c..3678dd6 100644 --- a/modules/constants.py +++ b/modules/constants.py @@ -407,5 +407,8 @@ class MaxContentSize(Enum): categories.origins DESC """ } -"""Mapping of query types to BigQuery SQL queries.""" +"""Mapping of query types to BigQuery SQL queries. + The queries are formatted with the `date` parameter. + Queries containing javascript UDFs require additional curly braces to escape the braces in the UDF. +""" # editorconfig-checker-enable From 22c42fe8802f9fc2ecdb808560dfb6c1f3ec85d6 Mon Sep 17 00:00:00 2001 From: Giancarlo Faranda Date: Sat, 9 Mar 2024 18:58:28 +0000 Subject: [PATCH 51/51] update dependencies --- requirements.dev.txt | 2 -- requirements.txt | 2 +- setup.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) delete mode 100644 requirements.dev.txt diff --git a/requirements.dev.txt b/requirements.dev.txt deleted file mode 100644 index 13a168f..0000000 --- a/requirements.dev.txt +++ /dev/null @@ -1,2 +0,0 @@ -black==23.7.0 -coverage>=6.4.4 diff --git a/requirements.txt b/requirements.txt index d0b4bc7..ccc7593 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -apache-beam[gcp]==2.54.0 +apache-beam[gcp]==2.51.0 google-cloud-firestore==2.14.0 black==24.2.0 coverage>=6.4.4 diff --git a/setup.py b/setup.py index 6038a65..2024df5 100644 --- a/setup.py +++ b/setup.py @@ -4,6 +4,6 @@ name="data-pipeline", version="0.0.1", packages=setuptools.find_packages(), - install_requires=["apache-beam[gcp]==2.54.0", "google-cloud-firestore==2.14.0"], + install_requires=["apache-beam[gcp]==2.51.0", "google-cloud-firestore==2.14.0"], package_data={"schema": ["*.json"]}, )