Skip to content

Commit 3084f18

Browse files
committed
Ensure insertedIds contain ids from all batches (#850)
The previous code was incorrect because it was comparing absolute write indexes with indexes that are relative to the current batch. This patch avoids that by using the insertedIds map from SplittablePayload directly, which already contains absolute write indexes. (JAVA-4436)
1 parent 7aa0a12 commit 3084f18

File tree

2 files changed

+60
-22
lines changed

2 files changed

+60
-22
lines changed

driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,11 @@
5353
import org.bson.codecs.configuration.CodecRegistry;
5454

5555
import java.util.ArrayList;
56-
import java.util.Collections;
5756
import java.util.HashMap;
5857
import java.util.List;
5958
import java.util.Map;
59+
import java.util.Set;
6060
import java.util.stream.Collectors;
61-
import java.util.stream.Stream;
6261

6362
import static com.mongodb.internal.bulk.WriteRequest.Type.DELETE;
6463
import static com.mongodb.internal.bulk.WriteRequest.Type.INSERT;
@@ -282,21 +281,11 @@ private BulkWriteResult getBulkWriteResult(final BsonDocument result) {
282281
}
283282

284283
private List<BulkWriteInsert> getInsertedItems(final BsonDocument result) {
285-
if (payload.getPayloadType() == SplittablePayload.Type.INSERT) {
286-
287-
Stream<WriteRequestWithIndex> writeRequests = payload.getWriteRequestWithIndexes().stream();
288-
List<Integer> writeErrors = getWriteErrors(result).stream().map(BulkWriteError::getIndex).collect(Collectors.toList());
289-
if (!writeErrors.isEmpty()) {
290-
writeRequests = writeRequests.filter(wr -> !writeErrors.contains(wr.getIndex()));
291-
}
292-
if (payload.getPosition() < payload.size()) {
293-
writeRequests = writeRequests.filter(wr -> wr.getIndex() < payload.getPosition());
294-
}
295-
return writeRequests
296-
.map(wr -> new BulkWriteInsert(wr.getIndex(), payload.getInsertedIds().get(wr.getIndex())))
297-
.collect(Collectors.toList());
298-
}
299-
return Collections.emptyList();
284+
Set<Integer> writeErrors = getWriteErrors(result).stream().map(BulkWriteError::getIndex).collect(Collectors.toSet());
285+
return payload.getInsertedIds().entrySet().stream()
286+
.filter(entry -> !writeErrors.contains(entry.getKey()))
287+
.map(entry -> new BulkWriteInsert(entry.getKey(), entry.getValue()))
288+
.collect(Collectors.toList());
300289
}
301290

302291

driver-core/src/test/unit/com/mongodb/internal/operation/BulkWriteBatchSpecification.groovy

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.internal.operation
1818

19+
import com.mongodb.MongoBulkWriteException
1920
import com.mongodb.MongoNamespace
2021
import com.mongodb.ReadConcern
2122
import com.mongodb.ServerAddress
@@ -228,23 +229,71 @@ class BulkWriteBatchSpecification extends Specification {
228229
!bulkWriteBatch.hasAnotherBatch()
229230
}
230231

231-
def 'should only map inserts up to the payload position'() {
232+
def 'should map all inserted ids'() {
232233
when:
233234
def bulkWriteBatch = BulkWriteBatch.createBulkWriteBatch(namespace, serverDescription, connectionDescription, false,
234-
WriteConcern.ACKNOWLEDGED, null, false, getWriteRequests()[3..4], sessionContext)
235+
WriteConcern.ACKNOWLEDGED, null, false,
236+
[new InsertRequest(toBsonDocument('{_id: 0}')),
237+
new InsertRequest(toBsonDocument('{_id: 1}')),
238+
new InsertRequest(toBsonDocument('{_id: 2}'))
239+
],
240+
sessionContext)
235241
def payload = bulkWriteBatch.getPayload()
236242
payload.setPosition(1)
243+
payload.insertedIds.put(0, new BsonInt32(0))
237244
bulkWriteBatch.addResult(BsonDocument.parse('{"n": 1, "ok": 1.0}'))
238245

239246
then:
240-
bulkWriteBatch.getResult().inserts == [new BulkWriteInsert(0, null)]
247+
bulkWriteBatch.getResult().inserts == [new BulkWriteInsert(0, new BsonInt32(0))]
241248

242249
when:
243-
payload.setPosition(2)
250+
bulkWriteBatch = bulkWriteBatch.getNextBatch()
251+
payload = bulkWriteBatch.getPayload()
252+
payload.setPosition(1)
253+
payload.insertedIds.put(1, new BsonInt32(1))
254+
bulkWriteBatch.addResult(BsonDocument.parse('{"n": 1, "ok": 1.0}'))
255+
256+
then:
257+
bulkWriteBatch.getResult().inserts == [new BulkWriteInsert(0, new BsonInt32(0)),
258+
new BulkWriteInsert(1, new BsonInt32(1))]
259+
260+
when:
261+
bulkWriteBatch = bulkWriteBatch.getNextBatch()
262+
payload = bulkWriteBatch.getPayload()
263+
payload.setPosition(1)
264+
payload.insertedIds.put(2, new BsonInt32(2))
244265
bulkWriteBatch.addResult(BsonDocument.parse('{"n": 1, "ok": 1.0}'))
245266

246267
then:
247-
bulkWriteBatch.getResult().inserts == [new BulkWriteInsert(0, null), new BulkWriteInsert(1, null)]
268+
bulkWriteBatch.getResult().inserts == [new BulkWriteInsert(0, new BsonInt32(0)),
269+
new BulkWriteInsert(1, new BsonInt32(1)),
270+
new BulkWriteInsert(2, new BsonInt32(2))]
271+
}
272+
273+
def 'should not map inserted id with a write error'() {
274+
given:
275+
def bulkWriteBatch = BulkWriteBatch.createBulkWriteBatch(namespace, serverDescription, connectionDescription, false,
276+
WriteConcern.ACKNOWLEDGED, null, false,
277+
[new InsertRequest(toBsonDocument('{_id: 0}')),
278+
new InsertRequest(toBsonDocument('{_id: 1}')),
279+
new InsertRequest(toBsonDocument('{_id: 2}'))
280+
],
281+
sessionContext)
282+
def payload = bulkWriteBatch.getPayload()
283+
payload.setPosition(3)
284+
payload.insertedIds.put(0, new BsonInt32(0))
285+
payload.insertedIds.put(1, new BsonInt32(1))
286+
payload.insertedIds.put(2, new BsonInt32(2))
287+
288+
when:
289+
bulkWriteBatch.addResult(toBsonDocument('''{"ok": 1, "n": 2,
290+
"writeErrors": [{ "index" : 1, "code" : 11000, "errmsg": "duplicate key error"}] }'''))
291+
bulkWriteBatch.getResult()
292+
293+
then:
294+
def ex = thrown(MongoBulkWriteException)
295+
ex.getWriteResult().inserts == [new BulkWriteInsert(0, new BsonInt32(0)),
296+
new BulkWriteInsert(2, new BsonInt32(2))]
248297
}
249298

250299
def 'should not retry when at least one write is not retryable'() {

0 commit comments

Comments (0)