Skip to content

Commit 6f064d2

Browse files
authored
Fixed dropping of view records in the merge process. (#1314)
1 parent 6a8ac36 commit 6f064d2

File tree

2 files changed

+11 -5 lines changed

2 files changed

+11 -5 lines changed

e2e-tests/controller-spark/controller_spark_sql_validation.sh

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -227,10 +227,12 @@ function check_parquet() {
227 227

228 228
if [[ "${isIncremental}" == "true" ]]
229 229
then
230-
# In case of incremental run, we will have two directories
231-
# assuming batch run was executed before this.
230+
# In case of incremental run, we will have two directories (because a batch
231+
# run was executed first); one directory is for the first batch run and
232+
# the second is for the merge step of the incremental run. The second
233+
# directory has one more patient, hence the new totals.
232 234
TOTAL_TEST_PATIENTS=$((2*TOTAL_TEST_PATIENTS + 1))
233-
TOTAL_VIEW_PATIENTS=108
235+
TOTAL_VIEW_PATIENTS=$((2*TOTAL_VIEW_PATIENTS + 1))
234 236
TOTAL_TEST_ENCOUNTERS=$((2*TOTAL_TEST_ENCOUNTERS))
235 237
TOTAL_TEST_OBS=$((2*TOTAL_TEST_OBS))
236 238
fi

pipelines/batch/src/main/java/com/google/fhir/analytics/ParquetMerger.java

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
1 1
/*
2-
* Copyright 2020-2024 Google LLC
2+
* Copyright 2020-2025 Google LLC
3 3
*
4 4
* Licensed under the Apache License, Version 2.0 (the "License");
5 5
* you may not use this file except in compliance with the License.
@@ -342,7 +342,11 @@ public static Pipeline writeMergedViewsPipeline(
342 342
public void processElement(ProcessContext c) {
343 343
KV<String, CoGbkResult> e = c.element();
344 344
List<GenericRecord> lastRecords = new ArrayList<>();
345-
e.getValue().getAll(newDwh).forEach(lastRecords::add);
345+
Iterable<GenericRecord> iter = e.getValue().getAll(newDwh);
346+
if (!iter.iterator().hasNext()) {
347+
iter = e.getValue().getAll(oldDwh);
348+
}
349+
iter.forEach(lastRecords::add);
346 350
for (GenericRecord r : lastRecords) {
347 351
c.output(r);
348 352
}

0 commit comments

Comments
 (0)