diff --git a/driver-core/src/main/com/mongodb/connection/CommandProtocol.java b/driver-core/src/main/com/mongodb/connection/CommandProtocol.java index 09a794219c6..d50278ca5b0 100644 --- a/driver-core/src/main/com/mongodb/connection/CommandProtocol.java +++ b/driver-core/src/main/com/mongodb/connection/CommandProtocol.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2008-2015 MongoDB, Inc. + * Copyright 2008-2016 MongoDB, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,12 +22,13 @@ import com.mongodb.diagnostics.logging.Logger; import com.mongodb.diagnostics.logging.Loggers; import com.mongodb.event.CommandListener; +import org.bson.BsonBinaryReader; import org.bson.BsonDocument; -import org.bson.BsonDocumentReader; import org.bson.FieldNameValidator; import org.bson.codecs.BsonDocumentCodec; import org.bson.codecs.Decoder; -import org.bson.codecs.DecoderContext; +import org.bson.codecs.RawBsonDocumentCodec; +import org.bson.io.ByteBufferBsonInput; import java.util.HashSet; import java.util.Set; @@ -105,25 +106,21 @@ public T execute(final InternalConnection connection) { long startTimeNanos = System.nanoTime(); CommandMessage commandMessage = new CommandMessage(namespace.getFullName(), command, slaveOk, fieldNameValidator, ProtocolHelper.getMessageSettings(connection.getDescription())); + ResponseBuffers responseBuffers = null; try { sendMessage(commandMessage, connection); - ResponseBuffers responseBuffers = connection.receiveMessage(commandMessage.getId()); - ReplyMessage replyMessage; - try { - replyMessage = new ReplyMessage(responseBuffers, new BsonDocumentCodec(), commandMessage.getId()); - } finally { - responseBuffers.close(); + responseBuffers = connection.receiveMessage(commandMessage.getId()); + if (!ProtocolHelper.isCommandOk(new BsonBinaryReader(new ByteBufferBsonInput(responseBuffers.getBodyByteBuffer())))) { + throw getCommandFailureException(getResponseDocument(responseBuffers, commandMessage, new BsonDocumentCodec()), + connection.getDescription().getServerAddress()); } - BsonDocument response = replyMessage.getDocuments().get(0); - if (!ProtocolHelper.isCommandOk(response)) { - throw getCommandFailureException(response, connection.getDescription().getServerAddress()); - } + T retval = getResponseDocument(responseBuffers, commandMessage, commandResultDecoder); - T retval = commandResultDecoder.decode(new BsonDocumentReader(response), DecoderContext.builder().build()); if (commandListener != null) { BsonDocument responseDocumentForEvent = (SECURITY_SENSITIVE_COMMANDS.contains(getCommandName())) - ? new BsonDocument() : response; + ? new BsonDocument() + : getResponseDocument(responseBuffers, commandMessage, new RawBsonDocumentCodec()); sendCommandSucceededEvent(commandMessage, getCommandName(), responseDocumentForEvent, connection.getDescription(), startTimeNanos, commandListener); } @@ -139,9 +136,21 @@ public T execute(final InternalConnection connection) { commandListener); } throw e; + } finally { + if (responseBuffers != null) { + responseBuffers.close(); + } } } + private static D getResponseDocument(final ResponseBuffers responseBuffers, final CommandMessage commandMessage, + final Decoder decoder) { + responseBuffers.reset(); + ReplyMessage replyMessage = new ReplyMessage(responseBuffers, decoder, commandMessage.getId()); + + return replyMessage.getDocuments().get(0); + } + @Override public void executeAsync(final InternalConnection connection, final SingleResultCallback callback) { try { diff --git a/driver-core/src/main/com/mongodb/connection/ProtocolHelper.java b/driver-core/src/main/com/mongodb/connection/ProtocolHelper.java index 15d9f640ff9..5f5d85182d6 100644 --- a/driver-core/src/main/com/mongodb/connection/ProtocolHelper.java +++ b/driver-core/src/main/com/mongodb/connection/ProtocolHelper.java @@ -36,14 +36,22 @@ import org.bson.BsonBoolean; import org.bson.BsonDocument; import org.bson.BsonInt32; +import org.bson.BsonReader; import org.bson.BsonString; +import org.bson.BsonType; import org.bson.BsonValue; +import org.bson.codecs.BsonValueCodecProvider; +import org.bson.codecs.DecoderContext; +import org.bson.codecs.configuration.CodecRegistry; import org.bson.io.BsonOutput; import static java.lang.String.format; +import static org.bson.codecs.BsonValueCodecProvider.getClassForBsonType; +import static org.bson.codecs.configuration.CodecRegistries.fromProviders; final class ProtocolHelper { private static final Logger PROTOCOL_EVENT_LOGGER = Loggers.getLogger("protocol.event"); + private static final CodecRegistry REGISTRY = fromProviders(new BsonValueCodecProvider()); static WriteConcernResult getWriteResult(final BsonDocument result, final ServerAddress serverAddress) { if (!isCommandOk(result)) { @@ -67,6 +75,27 @@ private static WriteConcernResult createWriteResult(final BsonDocument result) { static boolean isCommandOk(final BsonDocument response) { BsonValue okValue = response.get("ok"); + return isCommandOk(okValue); + } + + static boolean isCommandOk(final BsonReader bsonReader) { + return isCommandOk(getField(bsonReader, "ok")); + } + + private static BsonValue getField(final BsonReader bsonReader, final String fieldName) { + bsonReader.readStartDocument(); + while (bsonReader.readBsonType() != BsonType.END_OF_DOCUMENT) { + if (bsonReader.readName().equals(fieldName)) { + return REGISTRY.get(getClassForBsonType(bsonReader.getCurrentBsonType())).decode(bsonReader, + DecoderContext.builder().build()); + } + bsonReader.skipValue(); + } + bsonReader.readEndDocument(); + return null; + } + + private static boolean isCommandOk(final BsonValue okValue) { if (okValue == null) { return false; } else if (okValue.isBoolean()) { diff --git a/driver-core/src/main/com/mongodb/connection/ResponseBuffers.java b/driver-core/src/main/com/mongodb/connection/ResponseBuffers.java index 378cf5100d5..9f3ab552bc9 100644 --- a/driver-core/src/main/com/mongodb/connection/ResponseBuffers.java +++ b/driver-core/src/main/com/mongodb/connection/ResponseBuffers.java @@ -55,6 +55,10 @@ public ByteBuf getBodyByteBuffer() { return bodyByteBuffer.asReadOnly(); } + public void reset() { + bodyByteBuffer.position(0); + } + @Override public void close() { if (!isClosed) { diff --git a/driver/src/test/functional/com/mongodb/client/CommandMonitoringTest.java b/driver/src/test/functional/com/mongodb/client/CommandMonitoringTest.java index 1a4dcf9996f..9993e4c46b8 100644 --- a/driver/src/test/functional/com/mongodb/client/CommandMonitoringTest.java +++ b/driver/src/test/functional/com/mongodb/client/CommandMonitoringTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 MongoDB, Inc. + * Copyright 2015-2016 MongoDB, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,13 +32,21 @@ import org.bson.BsonArray; import org.bson.BsonBoolean; import org.bson.BsonDocument; +import org.bson.BsonDocumentWriter; import org.bson.BsonDouble; import org.bson.BsonInt32; import org.bson.BsonInt64; import org.bson.BsonString; import org.bson.BsonValue; import org.bson.Document; +import org.bson.codecs.BsonDocumentCodec; +import org.bson.codecs.BsonValueCodecProvider; +import org.bson.codecs.Codec; import org.bson.codecs.DocumentCodec; +import org.bson.codecs.EncoderContext; +import org.bson.codecs.configuration.CodecProvider; +import org.bson.codecs.configuration.CodecRegistries; +import org.bson.codecs.configuration.CodecRegistry; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -69,6 +77,19 @@ // See https://github.com/mongodb/specifications/tree/master/source/command-monitoring/tests @RunWith(Parameterized.class) public class CommandMonitoringTest { + private static final CodecRegistry CODEC_REGISTRY_HACK = CodecRegistries.fromProviders(new BsonValueCodecProvider(), + new CodecProvider() { + @Override + @SuppressWarnings("unchecked") + public Codec get(final Class clazz, final CodecRegistry registry) { + // Use BsonDocumentCodec even for a private sub-class of BsonDocument + if (BsonDocument.class.isAssignableFrom(clazz)) { + return (Codec) new BsonDocumentCodec(registry); + } + return null; + } + }); + private static MongoClient mongoClient; private static TestCommandListener commandListener; private final String filename; @@ -210,34 +231,37 @@ private CommandSucceededEvent massageExpectedCommandSucceededEvent(final Command } private CommandSucceededEvent massageActualCommandSucceededEvent(final CommandSucceededEvent actual) { + BsonDocument response = getWritableCloneOfCommand(actual.getResponse()); + // massage numbers that are the wrong BSON type - actual.getResponse().put("ok", new BsonDouble(actual.getResponse().getNumber("ok").doubleValue())); - if (actual.getResponse().containsKey("n")) { - actual.getResponse().put("n", new BsonInt32(actual.getResponse().getNumber("n").intValue())); + response.put("ok", new BsonDouble(response.getNumber("ok").doubleValue())); + if (response.containsKey("n")) { + response.put("n", new BsonInt32(response.getNumber("n").intValue())); } if (actual.getCommandName().equals("find") || actual.getCommandName().equals("getMore")) { - if (actual.getResponse().containsKey("cursor")) { - if (actual.getResponse().getDocument("cursor").containsKey("id") - && !actual.getResponse().getDocument("cursor").getInt64("id").equals(new BsonInt64(0))) { - actual.getResponse().getDocument("cursor").put("id", new BsonInt64(42)); + if (response.containsKey("cursor")) { + if (response.getDocument("cursor").containsKey("id") + && !response.getDocument("cursor").getInt64("id").equals(new BsonInt64(0))) { + response.getDocument("cursor").put("id", new BsonInt64(42)); } } } else if (actual.getCommandName().equals("killCursors")) { - actual.getResponse().getArray("cursorsUnknown").set(0, new BsonInt64(42)); + response.getArray("cursorsUnknown").set(0, new BsonInt64(42)); } else if (isWriteCommand(actual.getCommandName())) { - if (actual.getResponse().containsKey("writeErrors")) { - for (Iterator iter = actual.getResponse().getArray("writeErrors").iterator(); iter.hasNext();) { + if (response.containsKey("writeErrors")) { + for (Iterator iter = response.getArray("writeErrors").iterator(); iter.hasNext();) { BsonDocument cur = iter.next().asDocument(); cur.put("code", new BsonInt32(42)); cur.put("errmsg", new BsonString("")); } } if (actual.getCommandName().equals("update")) { - actual.getResponse().remove("nModified"); + response.remove("nModified"); } } - return actual; + return new CommandSucceededEvent(actual.getRequestId(), actual.getConnectionDescription(), actual.getCommandName(), response, + actual.getElapsedTime(TimeUnit.NANOSECONDS)); } private boolean isWriteCommand(final String commandName) { @@ -245,8 +269,10 @@ private boolean isWriteCommand(final String commandName) { } private CommandStartedEvent massageActualCommandStartedEvent(final CommandStartedEvent actual) { + BsonDocument command = getWritableCloneOfCommand(actual.getCommand()); + if (actual.getCommandName().equals("update")) { - for (Iterator iter = actual.getCommand().getArray("updates").iterator(); iter.hasNext();) { + for (Iterator iter = command.getArray("updates").iterator(); iter.hasNext();) { BsonDocument curUpdate = iter.next().asDocument(); if (!curUpdate.containsKey("multi")) { curUpdate.put("multi", BsonBoolean.FALSE); @@ -256,12 +282,13 @@ private CommandStartedEvent massageActualCommandStartedEvent(final CommandStarte } } } else if (actual.getCommandName().equals("getMore")) { - actual.getCommand().put("getMore", new BsonInt64(42)); + command.put("getMore", new BsonInt64(42)); } else if (actual.getCommandName().equals("killCursors")) { - actual.getCommand().getArray("cursors").set(0, new BsonInt64(42)); + command.getArray("cursors").set(0, new BsonInt64(42)); } - return actual; + return new CommandStartedEvent(actual.getRequestId(), actual.getConnectionDescription(), actual.getDatabaseName(), + actual.getCommandName(), command); } private void executeOperation() { @@ -298,6 +325,15 @@ private List getExpectedEvents(final BsonArray expectedEventDocume return expectedEvents; } + + private BsonDocument getWritableCloneOfCommand(final BsonDocument original) { + BsonDocument clone = new BsonDocument(); + BsonDocumentWriter writer = new BsonDocumentWriter(clone); + new BsonDocumentCodec(CODEC_REGISTRY_HACK).encode(writer, original, EncoderContext.builder().build()); + return clone; + } + + @Parameterized.Parameters(name = "{1}") public static Collection data() throws URISyntaxException, IOException { List data = new ArrayList();