From cc351a3d28fa52ef72f5857120dfa278b4989eaa Mon Sep 17 00:00:00 2001
From: Barry Pollard
Date: Thu, 7 Mar 2024 14:15:18 +0000
Subject: [PATCH 1/3] More fixes to improve the run

---
 modules/non_summary_pipeline.py | 15 +++++++++++++++
 modules/transformation.py       |  2 +-
 requirements.txt                |  2 +-
 run_pipeline_all.sh             |  2 +-
 run_pipeline_combined.sh        |  2 +-
 setup.py                        |  2 +-
 6 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/modules/non_summary_pipeline.py b/modules/non_summary_pipeline.py
index 5df7888..f227f92 100644
--- a/modules/non_summary_pipeline.py
+++ b/modules/non_summary_pipeline.py
@@ -202,6 +202,12 @@ def get_response_bodies(har):
     """Parses response bodies from a HAR object."""
     page_url = get_page_url(har)
 
+    if not page_url:
+        logging.warning(
+            "Skipping response bodies: unable to get page URL (see preceding warning)."
+        )
+        return None
+
     requests = har.get("log").get("entries")
 
     response_bodies = []
@@ -247,6 +253,13 @@ def get_technologies(har):
 
     page = har.get("log").get("pages")[0]
     page_url = page.get("_URL")
+
+    if not page_url:
+        logging.warning(
+            "Skipping technologies: unable to get page URL (see preceding warning)."
+        )
+        return None
+
     app_names = page.get("_detected_apps", {})
     categories = page.get("_detected", {})
     metadata = get_metadata(har)
@@ -431,6 +444,8 @@ def to_json(obj):
         raise ValueError
 
     return json.dumps(obj, separators=(",", ":"), ensure_ascii=False)
+        .encode("utf-8", "surrogatepass")
+        .decode("utf-8", "replace")
 
 
 def from_json(file_name, element):
diff --git a/modules/transformation.py b/modules/transformation.py
index 4eee1bf..f82abd9 100644
--- a/modules/transformation.py
+++ b/modules/transformation.py
@@ -85,7 +85,7 @@ def __init__(
             "create_disposition": BigQueryDisposition.CREATE_IF_NEEDED,
             "write_disposition": BigQueryDisposition.WRITE_APPEND,
             "additional_bq_parameters": {
-                "maxBadRecords": 10,
+                "maxBadRecords": 100,
                 "ignoreUnknownValues": True,
                 **self.additional_bq_parameters,
             },
diff --git a/requirements.txt b/requirements.txt
index 6d79a5e..dca473f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,3 @@
-apache-beam[gcp]==2.54.0
+apache-beam[gcp]==2.50.0
 black==24.2.0
 coverage>=6.4.4
diff --git a/run_pipeline_all.sh b/run_pipeline_all.sh
index f54d282..e9f77fa 100755
--- a/run_pipeline_all.sh
+++ b/run_pipeline_all.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 python run_all.py \
-    --input_file=gs://httparchive/crawls_manifest/android-Sep_1_2023.txt \
+    --input_file=gs://httparchive/crawls_manifest/android-Feb_1_2024.txt \
     --runner=DataflowRunner \
     --project=httparchive \
     --temp_location=gs://httparchive-staging/experimental/temp \
diff --git a/run_pipeline_combined.sh b/run_pipeline_combined.sh
index 0d13b8b..4a8c58c 100755
--- a/run_pipeline_combined.sh
+++ b/run_pipeline_combined.sh
@@ -1,7 +1,7 @@
 #!/bin/bash
 # shellcheck disable=SC1143,SC2211,SC2215
 python3 run_combined.py \
-    --input_file=gs://httparchive/crawls_manifest/chrome-Sep_1_2023.txt \
+    --input_file=gs://httparchive/crawls_manifest/chrome-Feb_1_2024.txt \
     --runner=DataflowRunner \
     --project=httparchive \
     --temp_location=gs://httparchive-staging/experimental/temp \
diff --git a/setup.py b/setup.py
index bcb565a..197c088 100644
--- a/setup.py
+++ b/setup.py
@@ -4,6 +4,6 @@
     name="data-pipeline",
     version="0.0.1",
     packages=setuptools.find_packages(),
-    install_requires=["apache-beam[gcp]==2.54.0"],
+    install_requires=["apache-beam[gcp]==2.50.0"],
     package_data={"schema": ["*.json"]},
 )
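The .encode("utf-8", "surrogatepass").decode("utf-8", "replace") chain that patch 1 bolts onto to_json (and that patch 2 below wraps in parentheses so it actually parses) scrubs lone UTF-16 surrogates, which HAR strings can pick up from truncated multi-byte characters and which a plain str.encode("utf-8") rejects. A minimal runnable sketch of that round trip, using the parenthesized form the series settles on; the sample input string is made up for illustration:

import json

def to_json(obj):
    """Serializes obj to compact JSON, replacing lone surrogates.

    "surrogatepass" lets a lone surrogate through the UTF-8 encoder,
    then "replace" swaps its invalid byte sequence for U+FFFD
    replacement characters on the way back to str.
    """
    if not obj:
        raise ValueError
    return (
        json.dumps(obj, separators=(",", ":"), ensure_ascii=False)
        .encode("utf-8", "surrogatepass")
        .decode("utf-8", "replace")
    )

# "\ud83d" is the first half of a surrogate pair (e.g. a chopped emoji).
# "broken \ud83d emoji".encode("utf-8") would raise UnicodeEncodeError;
# after the round trip the surrogate arrives as replacement characters.
print(to_json({"body": "broken \ud83d emoji"}))

The net effect, presumably, is that every row handed to BigQuery is valid UTF-8 JSON rather than failing the load when a response body contains a mangled character.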
From ba4804bcc23773bb58af2b52304ed5af1932d8cf Mon Sep 17 00:00:00 2001
From: Barry Pollard
Date: Thu, 7 Mar 2024 14:21:22 +0000
Subject: [PATCH 2/3] Formatting

---
 modules/non_summary_pipeline.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/modules/non_summary_pipeline.py b/modules/non_summary_pipeline.py
index f227f92..482354d 100644
--- a/modules/non_summary_pipeline.py
+++ b/modules/non_summary_pipeline.py
@@ -443,9 +443,11 @@ def to_json(obj):
     if not obj:
         raise ValueError
 
-    return json.dumps(obj, separators=(",", ":"), ensure_ascii=False)
+    return (
+        json.dumps(obj, separators=(",", ":"), ensure_ascii=False)
         .encode("utf-8", "surrogatepass")
         .decode("utf-8", "replace")
+    )
 
 
 def from_json(file_name, element):

From 686a0cd796609647695ccb439c867fecff039593 Mon Sep 17 00:00:00 2001
From: Barry Pollard
Date: Sat, 9 Mar 2024 11:15:01 +0000
Subject: [PATCH 3/3] Try increasing response sizes to match all

---
 modules/non_summary_pipeline.py | 10 ++++++----
 requirements.txt                |  2 +-
 run_pipeline_all.sh             |  2 +-
 run_pipeline_combined.sh        |  2 +-
 setup.py                        |  2 +-
 5 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/modules/non_summary_pipeline.py b/modules/non_summary_pipeline.py
index 482354d..d71ca28 100644
--- a/modules/non_summary_pipeline.py
+++ b/modules/non_summary_pipeline.py
@@ -12,7 +12,8 @@
 from modules import utils, constants, transformation
 
 # BigQuery can handle rows up to 100 MB.
-MAX_CONTENT_SIZE = 2 * 1024 * 1024
+MAX_CONTENT_SIZE = 100 * 1000000
+MAX_BODY_CONTENT_SIZE = 20 * 1000000
 
 # Number of times to partition the requests tables.
 NUM_PARTITIONS = 4
@@ -221,12 +222,13 @@ def get_response_bodies(har):
         if body is None:
             continue
 
-        truncated = len(body) > MAX_CONTENT_SIZE
+        truncated = len(body) > MAX_BODY_CONTENT_SIZE
         if truncated:
             logging.warning(
                 'Truncating response body for "%s". Response body size %s exceeds limit %s.'
-                % (request_url, len(body), MAX_CONTENT_SIZE)
+                % (request_url, len(body), MAX_BODY_CONTENT_SIZE)
             )
+            body = body[:MAX_BODY_CONTENT_SIZE]
 
         metadata = get_metadata(har)
 
@@ -234,7 +236,7 @@
             {
                 "page": page_url,
                 "url": request_url,
-                "body": body[:MAX_CONTENT_SIZE],
+                "body": body,
                 "truncated": truncated,
                 "date": har["date"],
                 "client": har["client"],
diff --git a/requirements.txt b/requirements.txt
index dca473f..6d79a5e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,3 @@
-apache-beam[gcp]==2.50.0
+apache-beam[gcp]==2.54.0
 black==24.2.0
 coverage>=6.4.4
diff --git a/run_pipeline_all.sh b/run_pipeline_all.sh
index e9f77fa..7bcd742 100755
--- a/run_pipeline_all.sh
+++ b/run_pipeline_all.sh
@@ -9,4 +9,4 @@ python run_all.py \
     --setup_file=./setup.py \
     --machine_type=n1-standard-32 \
     --worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd \
-    --noauth_local_webserver
+    --max_cache_memory_usage_mb=0
diff --git a/run_pipeline_combined.sh b/run_pipeline_combined.sh
index 4a8c58c..95e37f5 100755
--- a/run_pipeline_combined.sh
+++ b/run_pipeline_combined.sh
@@ -10,4 +10,4 @@ python3 run_combined.py \
     --setup_file=./setup.py \
     --machine_type=n1-standard-32 \
     --worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd \
-    --noauth_local_webserver
+    --max_cache_memory_usage_mb=0
diff --git a/setup.py b/setup.py
index 197c088..bcb565a 100644
--- a/setup.py
+++ b/setup.py
@@ -4,6 +4,6 @@
     name="data-pipeline",
     version="0.0.1",
     packages=setuptools.find_packages(),
-    install_requires=["apache-beam[gcp]==2.50.0"],
+    install_requires=["apache-beam[gcp]==2.54.0"],
     package_data={"schema": ["*.json"]},
 )
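Patch 3's truncation change is easier to read outside the diff: the BigQuery row cap and the response-body cap become separate constants, the body is physically cut at the warning site, and the stored "truncated" flag records that a cut happened. A standalone sketch of just that step, under the same limits; truncate_body is a hypothetical helper name for illustration, not a function in the pipeline:

import logging

# Constants from the patch: BigQuery can handle rows up to 100 MB,
# and bodies are capped at 20 MB, presumably to leave headroom for
# the row's other columns.
MAX_CONTENT_SIZE = 100 * 1000000
MAX_BODY_CONTENT_SIZE = 20 * 1000000

def truncate_body(request_url, body):
    """Returns the (possibly shortened) body and whether it was cut."""
    truncated = len(body) > MAX_BODY_CONTENT_SIZE
    if truncated:
        logging.warning(
            'Truncating response body for "%s". Response body size %s exceeds limit %s.'
            % (request_url, len(body), MAX_BODY_CONTENT_SIZE)
        )
        # Cut once, here, so the row dict can store body as-is.
        body = body[:MAX_BODY_CONTENT_SIZE]
    return body, truncated

body, truncated = truncate_body("https://example.com/app.js", "x" * (25 * 1000000))
assert truncated and len(body) == MAX_BODY_CONTENT_SIZE

Moving the slice out of the row dict into the if-branch does not change what gets stored (slicing a string already under the cap is a no-op), but it raises the cap from 2 MB to 20 MB and makes the cut explicit at the same place the warning is logged.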