Skip to content

Commit 968c1d2

Browse files
authored
refactor: use JobProgress tracking in data value import [DHIS2-18885] (#19781)
* chore: use JobProgress tracking in data value import [DHIS2-18885] * fix: dependencies and stage organisation * fix: read ADX import directly using a reader adapter * fix: test * fix: pass NOOP progress tracker in tests * fix: progress tracking of data import + bucketing of work items * chore: sonar issues * fix: AOC is a ADX group attribute * fix: revert XML export format changes as test fail that are hard to adjust
1 parent 012b1d4 commit 968c1d2

File tree

17 files changed

+380
-457
lines changed

17 files changed

+380
-457
lines changed

dhis-2/dhis-api/src/main/java/org/hisp/dhis/scheduling/JobProgress.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,14 @@ default void failedStage(@Nonnull Exception cause) {
276276
failedStage(getMessage(cause));
277277
}
278278

279+
/**
280+
* @since 2.42
281+
* @param size number of work items to join into one entry
282+
*/
283+
default void setWorkItemBucketing(int size) {
284+
// by default this is not supported and no bucketing will occur
285+
}
286+
279287
default void startingWorkItem(@Nonnull String description, Object... args) {
280288
startingWorkItem(format(description, args), FailurePolicy.PARENT);
281289
}

dhis-2/dhis-services/dhis-service-core/src/main/java/org/hisp/dhis/scheduling/RecordingJobProgress.java

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import lombok.extern.slf4j.Slf4j;
4242
import org.hisp.dhis.feedback.ErrorCode;
4343
import org.hisp.dhis.message.MessageService;
44+
import org.hisp.dhis.system.notification.NotificationLevel;
45+
import org.hisp.dhis.system.notification.Notifier;
4446
import org.hisp.dhis.tracker.imports.validation.ValidationCode;
4547
import org.hisp.dhis.user.CurrentUserUtil;
4648
import org.hisp.dhis.user.UserDetails;
@@ -66,7 +68,15 @@ public class RecordingJobProgress implements JobProgress {
6668
* recoding objects
6769
*/
6870
public static JobProgress transitory() {
69-
return new RecordingJobProgress(null, null, JobProgress.noop(), true, () -> {}, false, true);
71+
return transitory(null, null);
72+
}
73+
74+
public static JobProgress transitory(JobConfiguration job, Notifier notifier) {
75+
JobProgress track =
76+
notifier == null
77+
? JobProgress.noop()
78+
: new NotifierJobProgress(notifier, job, NotificationLevel.INFO);
79+
return new RecordingJobProgress(null, null, track, true, () -> {}, false, true);
7080
}
7181

7282
@CheckForNull private final MessageService messageService;
@@ -87,6 +97,9 @@ public static JobProgress transitory() {
8797
private final ThreadLocal<Item> incompleteItem = new ThreadLocal<>();
8898
private final boolean usingErrorNotification;
8999

100+
private int bucketingSize;
101+
private int bucketed;
102+
90103
public RecordingJobProgress(JobConfiguration configuration) {
91104
this(null, configuration, JobProgress.noop(), true, () -> {}, false, false);
92105
}
@@ -295,6 +308,8 @@ public void startingStage(
295308
skipCurrentStage.set(false);
296309
tracker.startingStage(description, workItems);
297310
incompleteItem.remove();
311+
bucketingSize = 1;
312+
bucketed = 0;
298313
Stage stage =
299314
addStageRecord(getOrAddLastIncompleteProcess(), description, workItems, onFailure);
300315
logInfo(stage, "", description);
@@ -315,6 +330,7 @@ public void completedStage(String summary, Object... args) {
315330
String message = format(summary, args);
316331
tracker.completedStage(message);
317332
Stage stage = getOrAddLastIncompleteStage();
333+
autoCompleteWorkItemBucket();
318334
stage.complete(message);
319335
logInfo(stage, "completed", message);
320336
}
@@ -326,6 +342,7 @@ public void failedStage(@Nonnull String error, Object... args) {
326342
String message = format(error, args);
327343
tracker.failedStage(message);
328344
Stage stage = getOrAddLastIncompleteStage();
345+
autoCompleteWorkItemBucket();
329346
stage.completeExceptionally(message, null);
330347
if (stage.getOnFailure() != FailurePolicy.SKIP_STAGE) {
331348
automaticAbort(message, null);
@@ -341,6 +358,7 @@ public void failedStage(@Nonnull Exception cause) {
341358
tracker.failedStage(cause);
342359
String message = getMessage(cause);
343360
Stage stage = getOrAddLastIncompleteStage();
361+
autoCompleteWorkItemBucket();
344362
stage.completeExceptionally(message, cause);
345363
if (stage.getOnFailure() != FailurePolicy.SKIP_STAGE) {
346364
automaticAbort(message, cause);
@@ -349,28 +367,57 @@ public void failedStage(@Nonnull Exception cause) {
349367
logError(stage, cause, message);
350368
}
351369

370+
@Override
371+
public void setWorkItemBucketing(int size) {
372+
bucketingSize = Math.max(1, size);
373+
}
374+
352375
@Override
353376
public void startingWorkItem(@Nonnull String description, @Nonnull FailurePolicy onFailure) {
354-
observer.run();
377+
if (bucketed % bucketingSize == 0) {
378+
observer.run();
355379

356-
tracker.startingWorkItem(description, onFailure);
357-
Item item = addItemRecord(getOrAddLastIncompleteStage(), description, onFailure);
358-
logDebug(item, "started", description);
380+
tracker.startingWorkItem(description, onFailure);
381+
Item item = addItemRecord(getOrAddLastIncompleteStage(), description, onFailure);
382+
logDebug(item, "started", description);
383+
}
384+
bucketed++;
359385
}
360386

361387
@Override
362388
public void completedWorkItem(String summary, Object... args) {
389+
if (bucketed % bucketingSize == 0) {
390+
completeWorkItemBucket(summary, args);
391+
}
392+
}
393+
394+
private void completeWorkItemBucket(String summary, Object... args) {
363395
observer.run();
364396

365-
String message = format(summary, args);
366-
tracker.completedWorkItem(message);
367397
Item item = getOrAddLastIncompleteItem();
398+
String message =
399+
summary == null && bucketingSize > 1 ? getBucketSummary(item) : format(summary, args);
400+
tracker.completedWorkItem(message);
368401
item.complete(message);
369402
logDebug(item, "completed", message);
370403
}
371404

405+
@Nonnull
406+
private String getBucketSummary(Item item) {
407+
int n = bucketed % bucketingSize;
408+
return item.getDescription() + " +" + (n == 0 ? bucketingSize : n);
409+
}
410+
411+
private void autoCompleteWorkItemBucket() {
412+
if (bucketingSize > 1) {
413+
Item item = incompleteItem.get();
414+
if (item != null && !item.isComplete()) completeWorkItemBucket(null);
415+
}
416+
}
417+
372418
@Override
373419
public void failedWorkItem(@Nonnull String error, Object... args) {
420+
bucketed = 0; // reset to restart bucketing on next item
374421
observer.run();
375422

376423
String message = format(error, args);
@@ -385,6 +432,7 @@ public void failedWorkItem(@Nonnull String error, Object... args) {
385432

386433
@Override
387434
public void failedWorkItem(@Nonnull Exception cause) {
435+
bucketed = 0; // reset to restart bucketing on next item
388436
observer.run();
389437

390438
tracker.failedWorkItem(cause);

dhis-2/dhis-services/dhis-service-dxf2/pom.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,6 @@
9999
<groupId>org.springframework</groupId>
100100
<artifactId>spring-web</artifactId>
101101
</dependency>
102-
<dependency>
103-
<groupId>org.springframework.security</groupId>
104-
<artifactId>spring-security-core</artifactId>
105-
</dependency>
106102
<dependency>
107103
<groupId>org.hibernate</groupId>
108104
<artifactId>hibernate-core-jakarta</artifactId>

dhis-2/dhis-services/dhis-service-dxf2/src/main/java/org/hisp/dhis/dxf2/adx/AdxDataService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@
2929

3030
import java.io.InputStream;
3131
import java.io.OutputStream;
32+
import javax.annotation.Nonnull;
3233
import org.hisp.dhis.datavalue.DataExportParams;
3334
import org.hisp.dhis.dxf2.common.ImportOptions;
3435
import org.hisp.dhis.dxf2.datavalueset.DataValueSetQueryParams;
3536
import org.hisp.dhis.dxf2.importsummary.ImportSummary;
36-
import org.hisp.dhis.scheduling.JobConfiguration;
37+
import org.hisp.dhis.scheduling.JobProgress;
3738

3839
/**
3940
* @author bobj
@@ -84,10 +85,11 @@ public interface AdxDataService {
8485
*
8586
* @param in the InputStream.
8687
* @param importOptions the importOptions.
87-
* @param id the task id, can be null.
88+
* @param progress to track progress
8889
* @return an ImportSummaries collection of ImportSummary for each DataValueSet.
8990
*/
90-
ImportSummary saveDataValueSet(InputStream in, ImportOptions importOptions, JobConfiguration id);
91+
ImportSummary saveDataValueSet(
92+
InputStream in, ImportOptions importOptions, @Nonnull JobProgress progress);
9193

9294
/**
9395
* Get data. Writes adx export data to output stream.
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright (c) 2004-2025, University of Oslo
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are met:
7+
* Redistributions of source code must retain the above copyright notice, this
8+
* list of conditions and the following disclaimer.
9+
*
10+
* Redistributions in binary form must reproduce the above copyright notice,
11+
* this list of conditions and the following disclaimer in the documentation
12+
* and/or other materials provided with the distribution.
13+
* Neither the name of the HISP project nor the names of its contributors may
14+
* be used to endorse or promote products derived from this software without
15+
* specific prior written permission.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
18+
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19+
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
21+
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
22+
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23+
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
24+
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26+
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
package org.hisp.dhis.dxf2.adx;
29+
30+
import static java.lang.Boolean.parseBoolean;
31+
32+
import java.util.Map;
33+
import java.util.function.Consumer;
34+
import java.util.function.Predicate;
35+
import lombok.RequiredArgsConstructor;
36+
import org.hisp.dhis.dxf2.datavalueset.DataValueEntry;
37+
import org.hisp.dhis.dxf2.datavalueset.DataValueSet;
38+
import org.hisp.dhis.dxf2.datavalueset.DataValueSetReader;
39+
import org.hisp.staxwax.reader.XMLReader;
40+
41+
@RequiredArgsConstructor
42+
public class AdxDataValueSetReader implements DataValueSetReader, DataValueEntry {
43+
44+
private final XMLReader adxReader;
45+
private final Consumer<Map<String, String>> handleGroup;
46+
private final Predicate<Map<String, String>> handleValue;
47+
48+
private Map<String, String> groupAttributes;
49+
private Map<String, String> valueAttributes;
50+
51+
@Override
52+
public DataValueSet readHeader() {
53+
adxReader.moveToStartElement(AdxDataService.ROOT, AdxDataService.NAMESPACE);
54+
return new DataValueSet();
55+
}
56+
57+
@Override
58+
public DataValueEntry readNext() {
59+
if (groupAttributes == null) {
60+
if (!adxReader.moveToStartElement(AdxDataService.GROUP, AdxDataService.NAMESPACE))
61+
return null;
62+
groupAttributes = adxReader.readAttributes();
63+
handleGroup.accept(groupAttributes);
64+
}
65+
if (!adxReader.moveToStartElement(AdxDataService.DATAVALUE, AdxDataService.GROUP)) {
66+
groupAttributes = null;
67+
// end of the group, try next by recursively calling itself
68+
return readNext();
69+
}
70+
valueAttributes = adxReader.readAttributes();
71+
if (handleValue.test(valueAttributes)) {
72+
// if data element type is not numeric we need to pick out the
73+
// 'annotation' element
74+
adxReader.moveToStartElement(AdxDataService.ANNOTATION, AdxDataService.DATAVALUE);
75+
if (adxReader.isStartElement(AdxDataService.ANNOTATION)) {
76+
valueAttributes.put(AdxDataService.VALUE, adxReader.getElementValue());
77+
} else {
78+
throw new IllegalArgumentException("DataElement expects text annotation");
79+
}
80+
}
81+
return this;
82+
}
83+
84+
@Override
85+
public void close() {
86+
adxReader.closeReader();
87+
}
88+
89+
@Override
90+
public String getDataElement() {
91+
return valueAttributes.get(AdxDataService.DATAELEMENT);
92+
}
93+
94+
@Override
95+
public String getPeriod() {
96+
return groupAttributes.get(AdxDataService.PERIOD);
97+
}
98+
99+
@Override
100+
public String getOrgUnit() {
101+
return groupAttributes.get(AdxDataService.ORGUNIT);
102+
}
103+
104+
@Override
105+
public String getCategoryOptionCombo() {
106+
return valueAttributes.get(AdxDataService.CATOPTCOMBO);
107+
}
108+
109+
@Override
110+
public String getAttributeOptionCombo() {
111+
return groupAttributes.get(AdxDataService.ATTOPTCOMBO);
112+
}
113+
114+
@Override
115+
public String getValue() {
116+
return valueAttributes.get(AdxDataService.VALUE);
117+
}
118+
119+
@Override
120+
public String getStoredBy() {
121+
return valueAttributes.get("storedBy");
122+
}
123+
124+
@Override
125+
public String getCreated() {
126+
return valueAttributes.get("created");
127+
}
128+
129+
@Override
130+
public String getLastUpdated() {
131+
return valueAttributes.get("lastUpdated");
132+
}
133+
134+
@Override
135+
public String getComment() {
136+
return valueAttributes.get("comment");
137+
}
138+
139+
@Override
140+
public boolean getFollowup() {
141+
return parseBoolean(valueAttributes.get("followup"));
142+
}
143+
144+
@Override
145+
public Boolean getDeleted() {
146+
String deleted = valueAttributes.get("deleted");
147+
return deleted == null ? null : parseBoolean(deleted);
148+
}
149+
}

0 commit comments

Comments
 (0)