diff --git a/src/main/com/mongodb/BaseCluster.java b/src/main/com/mongodb/BaseCluster.java index 24b018bce4d..43104ad40df 100644 --- a/src/main/com/mongodb/BaseCluster.java +++ b/src/main/com/mongodb/BaseCluster.java @@ -197,6 +197,10 @@ protected void fireChangeEvent() { clusterListener.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(clusterId, description)); } + ClusterDescription getCurrentDescription() { + return description; + } + // gets a random server that still exists in the cluster. Returns null if there are none. private ClusterableServer getRandomServer(final List serverDescriptions) { while (!serverDescriptions.isEmpty()) { diff --git a/src/main/com/mongodb/MultiServerCluster.java b/src/main/com/mongodb/MultiServerCluster.java index 8f47f4c8be1..7f4be5491dc 100644 --- a/src/main/com/mongodb/MultiServerCluster.java +++ b/src/main/com/mongodb/MultiServerCluster.java @@ -45,6 +45,7 @@ final class MultiServerCluster extends BaseCluster { private ClusterType clusterType; private String replicaSetName; private ObjectId maxElectionId; + private Integer maxSetVersion; private final ConcurrentMap addressToServerTupleMap = new ConcurrentHashMap(); @@ -201,10 +202,13 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip } if (newDescription.isPrimary()) { - if (newDescription.getElectionId() != null) { - if (maxElectionId != null && maxElectionId.compareTo(newDescription.getElectionId()) > 0) { - LOGGER.info(format("Invalidating potential primary %s whose election id %s is less than the max election id seen so " - + "far %s", newDescription.getAddress(), newDescription.getElectionId(), maxElectionId)); + if (newDescription.getSetVersion() != null && newDescription.getElectionId() != null) { + if (isStalePrimary(newDescription)) { + LOGGER.info(format("Invalidating potential primary %s whose (set version, election id) tuple of (%d, %s) " + + "is less than one already seen of (%d, %s)", + newDescription.getAddress(), + newDescription.getSetVersion(), newDescription.getElectionId(), + maxSetVersion, maxElectionId)); addressToServerTupleMap.get(newDescription.getAddress()).server.invalidate(); return false; } @@ -216,6 +220,13 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip } } + if (newDescription.getSetVersion() != null + && (maxSetVersion == null || newDescription.getSetVersion().compareTo(maxSetVersion) > 0)) { + LOGGER.info(format("Setting max set version to %d from replica set primary %s", newDescription.getSetVersion(), + newDescription.getAddress())); + maxSetVersion = newDescription.getSetVersion(); + } + if (isNotAlreadyPrimary(newDescription.getAddress())) { LOGGER.info(format("Discovered replica set primary %s", newDescription.getAddress())); } @@ -224,6 +235,15 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip return true; } + private boolean isStalePrimary(final ServerDescription newDescription) { + if (maxSetVersion == null || maxElectionId == null) { + return false; + } + + return (maxSetVersion.compareTo(newDescription.getSetVersion()) > 0 + || (maxSetVersion.equals(newDescription.getSetVersion()) && maxElectionId.compareTo(newDescription.getElectionId()) > 0)); + } + private boolean isNotAlreadyPrimary(final ServerAddress address) { ServerTuple serverTuple = addressToServerTupleMap.get(address); return serverTuple == null || !serverTuple.description.isPrimary(); diff --git a/src/main/com/mongodb/ServerDescription.java b/src/main/com/mongodb/ServerDescription.java index b32013f4f36..a54260ee6b9 100644 --- a/src/main/com/mongodb/ServerDescription.java +++ b/src/main/com/mongodb/ServerDescription.java @@ -74,6 +74,7 @@ class ServerDescription { private final int maxWireVersion; private final ObjectId electionId; + private final Integer setVersion; private final Throwable exception; @@ -97,6 +98,7 @@ static class Builder { private int minWireVersion = 0; private int maxWireVersion = 0; private ObjectId electionId; + private Integer setVersion; private Throwable exception; // CHECKSTYLE:OFF @@ -202,6 +204,17 @@ public Builder electionId(final ObjectId electionId) { return this; } + /** + * Sets the setVersion reported by this server. + * + * @param setVersion the set version + * @return this + */ + public Builder setVersion(final Integer setVersion) { + this.setVersion = setVersion; + return this; + } + public Builder exception(final Throwable exception) { this.exception = exception; return this; @@ -343,6 +356,15 @@ public ObjectId getElectionId() { return electionId; } + /** + * The replica set setVersion reported by this MongoDB server. + * + * @return the setVersion, which may be null + */ + public Integer getSetVersion() { + return setVersion; + } + /** * Returns true if the server has the given tags. A server of either type {@code ServerType.StandAlone} or * {@code ServerType.ShardRouter} is considered to have all tags, so this method will always return true for instances of either of @@ -467,7 +489,9 @@ public boolean equals(final Object o) { if (electionId != null ? !electionId.equals(that.electionId) : that.electionId != null) { return false; } - + if (setVersion != null ? !setVersion.equals(that.setVersion) : that.setVersion != null) { + return false; + } // Compare class equality and message as exceptions rarely override equals Class thisExceptionClass = exception != null ? exception.getClass() : null; Class thatExceptionClass = that.exception != null ? that.exception.getClass() : null; @@ -505,6 +529,7 @@ public int hashCode() { result = 31 * result + minWireVersion; result = 31 * result + maxWireVersion; result = 31 * result + (electionId != null ? electionId.hashCode() : 0); + result = 31 * result + (setVersion != null ? setVersion.hashCode() : 0); result = 31 * result + (exception == null ? 0 : exception.getClass().hashCode()); result = 31 * result + (exception == null ? 0 : exception.getMessage().hashCode()); return result; @@ -524,6 +549,7 @@ public String toString() { + ", maxMessageSize=" + maxMessageSize + ", maxWriteBatchSize=" + maxWriteBatchSize + ", electionId=" + electionId + + ", setVersion=" + setVersion + ", tagSet=" + tagSet + ", setName='" + setName + '\'' + ", averageLatencyNanos=" + averageLatencyNanos @@ -587,6 +613,7 @@ private String getAverageLatencyFormattedInMilliseconds() { minWireVersion = builder.minWireVersion; maxWireVersion = builder.maxWireVersion; electionId = builder.electionId; + setVersion = builder.setVersion; exception = builder.exception; } } diff --git a/src/main/com/mongodb/ServerMonitor.java b/src/main/com/mongodb/ServerMonitor.java index 3b01fbaddb6..256d850bd2a 100644 --- a/src/main/com/mongodb/ServerMonitor.java +++ b/src/main/com/mongodb/ServerMonitor.java @@ -239,8 +239,8 @@ private ServerDescription lookupServerDescription(final DBPort connection) throw } @SuppressWarnings("unchecked") - private ServerDescription createDescription(final CommandResult commandResult, final ServerVersion serverVersion, - final long averageLatencyNanos) { + ServerDescription createDescription(final CommandResult commandResult, final ServerVersion serverVersion, + final long averageLatencyNanos) { return ServerDescription.builder() .state(ServerConnectionState.Connected) .version(serverVersion) @@ -260,6 +260,7 @@ private ServerDescription createDescription(final CommandResult commandResult, f .minWireVersion(commandResult.getInt("minWireVersion", ServerDescription.getDefaultMinWireVersion())) .maxWireVersion(commandResult.getInt("maxWireVersion", ServerDescription.getDefaultMaxWireVersion())) .electionId(commandResult.containsKey("electionId") ? commandResult.getObjectId("electionId") : null) + .setVersion(commandResult.containsKey("setVersion") ? commandResult.getInt("setVersion") : null) .averageLatency(averageLatencyNanos, TimeUnit.NANOSECONDS) .ok(commandResult.ok()).build(); } diff --git a/src/test/com/mongodb/MultiServerClusterSpecification.groovy b/src/test/com/mongodb/MultiServerClusterSpecification.groovy index 33dbc20d2a4..95e4f470d48 100644 --- a/src/test/com/mongodb/MultiServerClusterSpecification.groovy +++ b/src/test/com/mongodb/MultiServerClusterSpecification.groovy @@ -510,23 +510,24 @@ class MultiServerClusterSpecification extends Specification { def sendNotification(ServerAddress serverAddress, ServerType serverType, List hosts, String setName, ServerAddress trueAddress) { - factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, setName, null, trueAddress) - .build()) + factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, setName, null, null, + trueAddress).build()) } def sendNotification(ServerAddress serverAddress, ServerType serverType, List hosts, List passives, String setName) { - factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, passives, true, setName, null, null) - .build()) + factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, passives, true, setName, null, null, + null).build()) } def sendNotification(ServerAddress serverAddress, ServerType serverType, List hosts, ObjectId electionId) { - factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, 'test', electionId, null) - .build()) + factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, 'test', 2, electionId, + null).build()) } def sendNotification(ServerAddress serverAddress, ServerType serverType, List hosts, boolean ok) { - factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], ok, null, null, null).build()) + factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], ok, null, null, null, null) + .build()) } def getClusterDescription(MultiServerCluster cluster) { @@ -542,7 +543,7 @@ class MultiServerClusterSpecification extends Specification { } def getBuilder(ServerAddress serverAddress, ServerType serverType, List hosts, List passives, boolean ok, - String setName, ObjectId electionId, ServerAddress trueAddress) { + String setName, Integer setVersion, ObjectId electionId, ServerAddress trueAddress) { ServerDescription.builder() .address(serverAddress) .type(serverType) @@ -553,5 +554,6 @@ class MultiServerClusterSpecification extends Specification { .passives(passives*.toString() as Set) .setName(setName) .electionId(electionId) + .setVersion(setVersion) } } diff --git a/src/test/com/mongodb/ServerDescriptionTest.java b/src/test/com/mongodb/ServerDescriptionTest.java index f306f28edba..a84aba6009f 100644 --- a/src/test/com/mongodb/ServerDescriptionTest.java +++ b/src/test/com/mongodb/ServerDescriptionTest.java @@ -88,6 +88,7 @@ public void testDefaults() throws UnknownHostException { assertEquals(0, serverDescription.getMinWireVersion()); assertEquals(0, serverDescription.getMaxWireVersion()); assertNull(serverDescription.getElectionId()); + assertNull(serverDescription.getSetVersion()); assertNull(serverDescription.getException()); } @@ -117,6 +118,7 @@ public void testBuilder() throws UnknownHostException { .minWireVersion(1) .maxWireVersion(2) .electionId(new ObjectId("123412341234123412341234")) + .setVersion(2) .exception(exception) .build(); @@ -151,6 +153,7 @@ public void testBuilder() throws UnknownHostException { assertEquals(1, serverDescription.getMinWireVersion()); assertEquals(2, serverDescription.getMaxWireVersion()); assertEquals(new ObjectId("123412341234123412341234"), serverDescription.getElectionId()); + assertEquals(new Integer(2), serverDescription.getSetVersion()); assertEquals(exception, serverDescription.getException()); } @@ -176,6 +179,7 @@ public void testObjectOverrides() throws UnknownHostException { .minWireVersion(1) .maxWireVersion(2) .electionId(new ObjectId()) + .setVersion(2) .exception(new IllegalArgumentException("This is illegal")); assertEquals(builder.build(), builder.build()); assertEquals(builder.build().hashCode(), builder.build().hashCode()); diff --git a/src/test/com/mongodb/ServerMonitorSpecification.groovy b/src/test/com/mongodb/ServerMonitorSpecification.groovy index f82eca2ad42..9d8411ed6ca 100644 --- a/src/test/com/mongodb/ServerMonitorSpecification.groovy +++ b/src/test/com/mongodb/ServerMonitorSpecification.groovy @@ -72,6 +72,19 @@ class ServerMonitorSpecification extends FunctionalSpecification { newDescription.electionId == expected } + def 'should set setVersion'() { + given: + initializeServerMonitor(new ServerAddress()) + CommandResult commandResult = database.command(new BasicDBObject('ismaster', 1)) + def expected = commandResult.get('setVersion') + + when: + latch.await() + + then: + newDescription.setVersion == expected + } + @IgnoreIf( { serverIsAtLeastVersion(2.6) } ) def 'should set default max wire batch size when not provided by server'() { given: