Skip to content

Commit 449392e

Browse files
authored
Merge pull request #1655 from marklogic/feature/time-fix
awaitCompletion now waits correctly
2 parents ed3e72d + fc69366 commit 449392e

File tree

2 files changed

+47
-3
lines changed

2 files changed

+47
-3
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -862,8 +862,8 @@ public void taskComplete(Runnable r) {
862862
// During failover, in order to re-submit the tasks which are meant
863863
// for a failed host, we drain the thread pool and re-submit all the tasks appropriately.
864864
// We would need awaitCompletion() to wait until these resubmitted tasks are also finished.
865-
// Hence we need to remove the old tasks from queuedAndExecutingTasks and any active
866-
// snapshots which contains them and replace it with new tasks which are submitted.
865+
// Hence we need to remove the old tasks from queuedAndExecutingTasks and any active
866+
// snapshots which contains them and replace it with new tasks which are submitted.
867867
public void replaceTask(Runnable oldTask, Runnable newTask) {
868868
boolean removedFromASnapshot = false;
869869
if(queuedAndExecutingTasks.remove(oldTask)) {
@@ -887,7 +887,7 @@ public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedEx
887887
// get asynchronously queued after this point
888888
ConcurrentLinkedQueue<Runnable> snapshotQueuedAndExecutingTasks = snapshotQueuedAndExecutingTasks();
889889
try {
890-
long duration = unit.convert(timeout, TimeUnit.MILLISECONDS);
890+
long duration = TimeUnit.MILLISECONDS.convert(timeout, unit);
891891
// we can iterate even when the underlying set is being modified
892892
// since we're using ConcurrentHashMap
893893
Runnable task = null;
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.marklogic.client.test.datamovement;
2+
3+
import com.marklogic.client.datamovement.DataMovementManager;
4+
import com.marklogic.client.datamovement.WriteBatcher;
5+
import com.marklogic.client.io.DocumentMetadataHandle;
6+
import com.marklogic.client.io.StringHandle;
7+
import com.marklogic.client.test.Common;
8+
import org.junit.jupiter.api.Test;
9+
10+
import java.util.concurrent.TimeUnit;
11+
import java.util.concurrent.atomic.AtomicBoolean;
12+
13+
import static org.junit.jupiter.api.Assertions.assertFalse;
14+
15+
public class AwaitCompletionTest {
16+
17+
@Test
18+
void test() throws Exception {
19+
DataMovementManager dmm = Common.newClient().newDataMovementManager();
20+
AtomicBoolean listenerCompleted = new AtomicBoolean(false);
21+
WriteBatcher writeBatcher = dmm.newWriteBatcher().withBatchSize(1).onBatchSuccess(batch -> {
22+
try {
23+
// Intended to last longer than the duration passed to writeBacher.awaitCompletion.
24+
Thread.sleep(10000);
25+
listenerCompleted.set(true);
26+
} catch (InterruptedException e) {
27+
listenerCompleted.set(false);
28+
}
29+
});
30+
dmm.startJob(writeBatcher);
31+
32+
writeBatcher.add("/0doesnt-matter.xml", new DocumentMetadataHandle()
33+
.withPermission("rest-reader", DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE),
34+
new StringHandle("<test/>"));
35+
writeBatcher.flushAsync();
36+
writeBatcher.awaitCompletion(2, TimeUnit.SECONDS);
37+
dmm.stopJob(writeBatcher);
38+
39+
assertFalse(listenerCompleted.get(), "The batcher should have waited 2 seconds for the batch listener to " +
40+
"completed, which should not occur since the listener is sleeping for 10 seconds. This ensures that a bug " +
41+
"is fixed where the duration passed to 'awaitCompletion' was mishandled and always resulted in a " +
42+
"duration of 0 seconds, which means 'wait until all batches are completed'.");
43+
}
44+
}

0 commit comments

Comments
 (0)