Skip to content

Commit

Permalink
JAVA-2682: Don't retry writes unless all writes in the list are retry…
Browse files Browse the repository at this point in the history
…able
  • Loading branch information
jyemin committed Nov 30, 2017
1 parent d719076 commit 44d40c3
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
import com.mongodb.connection.BulkWriteBatchCombiner;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.ServerDescription;
import com.mongodb.session.SessionContext;
import com.mongodb.connection.SplittablePayload;
import com.mongodb.internal.connection.IndexMap;
import com.mongodb.internal.validator.CollectibleDocumentFieldNameValidator;
import com.mongodb.internal.validator.MappedFieldNameValidator;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.internal.validator.UpdateFieldNameValidator;
import com.mongodb.session.SessionContext;
import org.bson.BsonArray;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
Expand Down Expand Up @@ -101,7 +101,7 @@ public static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace
boolean writeRequestsAreRetryable = true;
for (int i = 0; i < writeRequests.size(); i++) {
WriteRequest writeRequest = writeRequests.get(i);
writeRequestsAreRetryable = isRetryable(writeRequest);
writeRequestsAreRetryable = writeRequestsAreRetryable && isRetryable(writeRequest);
writeRequestsWithIndex.add(new WriteRequestWithIndex(writeRequest, i));
}
if (canRetryWrites && !writeRequestsAreRetryable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import static com.mongodb.client.model.Filters.gte
import static com.mongodb.connection.ServerType.REPLICA_SET_PRIMARY
import static com.mongodb.connection.ServerType.STANDALONE

@SuppressWarnings('ClassSize')
class MixedBulkWriteOperationSpecification extends OperationFunctionalSpecification {

def 'should throw IllegalArgumentException for empty list of requests'() {
Expand Down Expand Up @@ -999,6 +1000,45 @@ class MixedBulkWriteOperationSpecification extends OperationFunctionalSpecificat
async << [true, false]
}

@IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should not request retryable write for multi updates or deletes'() {
given:
def operation = new MixedBulkWriteOperation(getNamespace(),
writes, true, ACKNOWLEDGED, true)

when:
executeWithSession(operation, async)

then:
noExceptionThrown()

where:
[async, writes] << [
[true, false],
// Test scenarios where the multi:true request is at the beginning and at the end of the list
[
[
new DeleteRequest(new BsonDocument()).multi(true),
new InsertRequest(new BsonDocument())
],
[
new UpdateRequest(new BsonDocument('_id', new BsonInt32(1)),
new BsonDocument('$set', new BsonDocument('_id', new BsonInt32(1))), UPDATE).multi(true),
new InsertRequest(new BsonDocument())
],
[
new InsertRequest(new BsonDocument()),
new DeleteRequest(new BsonDocument()).multi(true)
],
[
new InsertRequest(new BsonDocument()),
new UpdateRequest(new BsonDocument('_id', new BsonInt32(1)),
new BsonDocument('$set', new BsonDocument('_id', new BsonInt32(1))), UPDATE).multi(true)
]
]
].combinations()
}

@IgnoreIf({ !serverVersionAtLeast(3, 6) })
def 'should support array filters'() {
given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import com.mongodb.bulk.WriteRequest
import com.mongodb.client.model.Collation
import com.mongodb.connection.ClusterId
import com.mongodb.connection.ConnectionDescription
import com.mongodb.connection.ConnectionId
import com.mongodb.connection.ServerDescription
import com.mongodb.connection.ServerId
import com.mongodb.connection.ServerType
import com.mongodb.connection.ServerVersion
import com.mongodb.internal.connection.NoOpSessionContext
import org.bson.BsonDocument
import org.bson.BsonInt32
Expand All @@ -41,8 +44,13 @@ import static com.mongodb.connection.ServerConnectionState.CONNECTED

class BulkWriteBatchSpecification extends Specification {
def namespace = new MongoNamespace('db.coll')
def serverDescription = ServerDescription.builder().address(new ServerAddress()).state(CONNECTED).build()
def connectionDescription = new ConnectionDescription(new ServerId(new ClusterId(), serverDescription.getAddress()))
def serverDescription = ServerDescription.builder().address(new ServerAddress()).state(CONNECTED)
.version(new ServerVersion(3, 6))
.logicalSessionTimeoutMinutes(30)
.build()
def connectionDescription = new ConnectionDescription(
new ConnectionId(new ServerId(new ClusterId(), serverDescription.getAddress())), new ServerVersion(3, 6),
ServerType.REPLICA_SET_PRIMARY, 1000, 16000, 48000, [])
def sessionContext = new NoOpSessionContext()

def 'should split payloads by type when ordered'() {
Expand Down Expand Up @@ -221,6 +229,16 @@ class BulkWriteBatchSpecification extends Specification {
!bulkWriteBatch.hasAnotherBatch()
}

def 'should not retry when at least one write is not retryable'() {
when:
def bulkWriteBatch = BulkWriteBatch.createBulkWriteBatch(namespace, serverDescription, connectionDescription, false,
WriteConcern.ACKNOWLEDGED, null, true,
[new DeleteRequest(new BsonDocument()).multi(true), new InsertRequest(new BsonDocument())], sessionContext)

then:
!bulkWriteBatch.getRetryWrites()
}

def 'should handle operation responses'() {
given:
def bulkWriteBatch = BulkWriteBatch.createBulkWriteBatch(namespace, serverDescription, connectionDescription, true,
Expand Down

0 comments on commit 44d40c3

Please sign in to comment.