Skip to content

Commit

Permalink
JAVA-2069: Use both setVersion and electionId to detect a stale primary
Browse files Browse the repository at this point in the history
  • Loading branch information
jyemin committed Jan 13, 2016
1 parent 0a3e00d commit 062a2bb
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 15 deletions.
4 changes: 4 additions & 0 deletions src/main/com/mongodb/BaseCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ protected void fireChangeEvent() {
clusterListener.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(clusterId, description));
}

ClusterDescription getCurrentDescription() {
return description;
}

// gets a random server that still exists in the cluster. Returns null if there are none.
private ClusterableServer getRandomServer(final List<ServerDescription> serverDescriptions) {
while (!serverDescriptions.isEmpty()) {
Expand Down
28 changes: 24 additions & 4 deletions src/main/com/mongodb/MultiServerCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ final class MultiServerCluster extends BaseCluster {
private ClusterType clusterType;
private String replicaSetName;
private ObjectId maxElectionId;
private Integer maxSetVersion;
private final ConcurrentMap<ServerAddress, ServerTuple> addressToServerTupleMap =
new ConcurrentHashMap<ServerAddress, ServerTuple>();

Expand Down Expand Up @@ -201,10 +202,13 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
}

if (newDescription.isPrimary()) {
if (newDescription.getElectionId() != null) {
if (maxElectionId != null && maxElectionId.compareTo(newDescription.getElectionId()) > 0) {
LOGGER.info(format("Invalidating potential primary %s whose election id %s is less than the max election id seen so "
+ "far %s", newDescription.getAddress(), newDescription.getElectionId(), maxElectionId));
if (newDescription.getSetVersion() != null && newDescription.getElectionId() != null) {
if (isStalePrimary(newDescription)) {
LOGGER.info(format("Invalidating potential primary %s whose (set version, election id) tuple of (%d, %s) "
+ "is less than one already seen of (%d, %s)",
newDescription.getAddress(),
newDescription.getSetVersion(), newDescription.getElectionId(),
maxSetVersion, maxElectionId));
addressToServerTupleMap.get(newDescription.getAddress()).server.invalidate();
return false;
}
Expand All @@ -216,6 +220,13 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
}
}

if (newDescription.getSetVersion() != null
&& (maxSetVersion == null || newDescription.getSetVersion().compareTo(maxSetVersion) > 0)) {
LOGGER.info(format("Setting max set version to %d from replica set primary %s", newDescription.getSetVersion(),
newDescription.getAddress()));
maxSetVersion = newDescription.getSetVersion();
}

if (isNotAlreadyPrimary(newDescription.getAddress())) {
LOGGER.info(format("Discovered replica set primary %s", newDescription.getAddress()));
}
Expand All @@ -224,6 +235,15 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
return true;
}

private boolean isStalePrimary(final ServerDescription newDescription) {
if (maxSetVersion == null || maxElectionId == null) {
return false;
}

return (maxSetVersion.compareTo(newDescription.getSetVersion()) > 0
|| (maxSetVersion.equals(newDescription.getSetVersion()) && maxElectionId.compareTo(newDescription.getElectionId()) > 0));
}

private boolean isNotAlreadyPrimary(final ServerAddress address) {
ServerTuple serverTuple = addressToServerTupleMap.get(address);
return serverTuple == null || !serverTuple.description.isPrimary();
Expand Down
29 changes: 28 additions & 1 deletion src/main/com/mongodb/ServerDescription.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class ServerDescription {
private final int maxWireVersion;

private final ObjectId electionId;
private final Integer setVersion;

private final Throwable exception;

Expand All @@ -97,6 +98,7 @@ static class Builder {
private int minWireVersion = 0;
private int maxWireVersion = 0;
private ObjectId electionId;
private Integer setVersion;
private Throwable exception;

// CHECKSTYLE:OFF
Expand Down Expand Up @@ -202,6 +204,17 @@ public Builder electionId(final ObjectId electionId) {
return this;
}

/**
* Sets the setVersion reported by this server.
*
* @param setVersion the set version
* @return this
*/
public Builder setVersion(final Integer setVersion) {
this.setVersion = setVersion;
return this;
}

public Builder exception(final Throwable exception) {
this.exception = exception;
return this;
Expand Down Expand Up @@ -343,6 +356,15 @@ public ObjectId getElectionId() {
return electionId;
}

/**
* The replica set setVersion reported by this MongoDB server.
*
* @return the setVersion, which may be null
*/
public Integer getSetVersion() {
return setVersion;
}

/**
* Returns true if the server has the given tags. A server of either type {@code ServerType.StandAlone} or
* {@code ServerType.ShardRouter} is considered to have all tags, so this method will always return true for instances of either of
Expand Down Expand Up @@ -467,7 +489,9 @@ public boolean equals(final Object o) {
if (electionId != null ? !electionId.equals(that.electionId) : that.electionId != null) {
return false;
}

if (setVersion != null ? !setVersion.equals(that.setVersion) : that.setVersion != null) {
return false;
}
// Compare class equality and message as exceptions rarely override equals
Class thisExceptionClass = exception != null ? exception.getClass() : null;
Class thatExceptionClass = that.exception != null ? that.exception.getClass() : null;
Expand Down Expand Up @@ -505,6 +529,7 @@ public int hashCode() {
result = 31 * result + minWireVersion;
result = 31 * result + maxWireVersion;
result = 31 * result + (electionId != null ? electionId.hashCode() : 0);
result = 31 * result + (setVersion != null ? setVersion.hashCode() : 0);
result = 31 * result + (exception == null ? 0 : exception.getClass().hashCode());
result = 31 * result + (exception == null ? 0 : exception.getMessage().hashCode());
return result;
Expand All @@ -524,6 +549,7 @@ public String toString() {
+ ", maxMessageSize=" + maxMessageSize
+ ", maxWriteBatchSize=" + maxWriteBatchSize
+ ", electionId=" + electionId
+ ", setVersion=" + setVersion
+ ", tagSet=" + tagSet
+ ", setName='" + setName + '\''
+ ", averageLatencyNanos=" + averageLatencyNanos
Expand Down Expand Up @@ -587,6 +613,7 @@ private String getAverageLatencyFormattedInMilliseconds() {
minWireVersion = builder.minWireVersion;
maxWireVersion = builder.maxWireVersion;
electionId = builder.electionId;
setVersion = builder.setVersion;
exception = builder.exception;
}
}
5 changes: 3 additions & 2 deletions src/main/com/mongodb/ServerMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ private ServerDescription lookupServerDescription(final DBPort connection) throw
}

@SuppressWarnings("unchecked")
private ServerDescription createDescription(final CommandResult commandResult, final ServerVersion serverVersion,
final long averageLatencyNanos) {
ServerDescription createDescription(final CommandResult commandResult, final ServerVersion serverVersion,
final long averageLatencyNanos) {
return ServerDescription.builder()
.state(ServerConnectionState.Connected)
.version(serverVersion)
Expand All @@ -260,6 +260,7 @@ private ServerDescription createDescription(final CommandResult commandResult, f
.minWireVersion(commandResult.getInt("minWireVersion", ServerDescription.getDefaultMinWireVersion()))
.maxWireVersion(commandResult.getInt("maxWireVersion", ServerDescription.getDefaultMaxWireVersion()))
.electionId(commandResult.containsKey("electionId") ? commandResult.getObjectId("electionId") : null)
.setVersion(commandResult.containsKey("setVersion") ? commandResult.getInt("setVersion") : null)
.averageLatency(averageLatencyNanos, TimeUnit.NANOSECONDS)
.ok(commandResult.ok()).build();
}
Expand Down
18 changes: 10 additions & 8 deletions src/test/com/mongodb/MultiServerClusterSpecification.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -510,23 +510,24 @@ class MultiServerClusterSpecification extends Specification {

def sendNotification(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, String setName,
ServerAddress trueAddress) {
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, setName, null, trueAddress)
.build())
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, setName, null, null,
trueAddress).build())
}

def sendNotification(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, List<ServerAddress> passives,
String setName) {
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, passives, true, setName, null, null)
.build())
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, passives, true, setName, null, null,
null).build())
}

def sendNotification(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, ObjectId electionId) {
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, 'test', electionId, null)
.build())
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, 'test', 2, electionId,
null).build())
}

def sendNotification(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, boolean ok) {
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], ok, null, null, null).build())
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], ok, null, null, null, null)
.build())
}

def getClusterDescription(MultiServerCluster cluster) {
Expand All @@ -542,7 +543,7 @@ class MultiServerClusterSpecification extends Specification {
}

def getBuilder(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, List<ServerAddress> passives, boolean ok,
String setName, ObjectId electionId, ServerAddress trueAddress) {
String setName, Integer setVersion, ObjectId electionId, ServerAddress trueAddress) {
ServerDescription.builder()
.address(serverAddress)
.type(serverType)
Expand All @@ -553,5 +554,6 @@ class MultiServerClusterSpecification extends Specification {
.passives(passives*.toString() as Set)
.setName(setName)
.electionId(electionId)
.setVersion(setVersion)
}
}
4 changes: 4 additions & 0 deletions src/test/com/mongodb/ServerDescriptionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void testDefaults() throws UnknownHostException {
assertEquals(0, serverDescription.getMinWireVersion());
assertEquals(0, serverDescription.getMaxWireVersion());
assertNull(serverDescription.getElectionId());
assertNull(serverDescription.getSetVersion());
assertNull(serverDescription.getException());
}

Expand Down Expand Up @@ -117,6 +118,7 @@ public void testBuilder() throws UnknownHostException {
.minWireVersion(1)
.maxWireVersion(2)
.electionId(new ObjectId("123412341234123412341234"))
.setVersion(2)
.exception(exception)
.build();

Expand Down Expand Up @@ -151,6 +153,7 @@ public void testBuilder() throws UnknownHostException {
assertEquals(1, serverDescription.getMinWireVersion());
assertEquals(2, serverDescription.getMaxWireVersion());
assertEquals(new ObjectId("123412341234123412341234"), serverDescription.getElectionId());
assertEquals(new Integer(2), serverDescription.getSetVersion());
assertEquals(exception, serverDescription.getException());
}

Expand All @@ -176,6 +179,7 @@ public void testObjectOverrides() throws UnknownHostException {
.minWireVersion(1)
.maxWireVersion(2)
.electionId(new ObjectId())
.setVersion(2)
.exception(new IllegalArgumentException("This is illegal"));
assertEquals(builder.build(), builder.build());
assertEquals(builder.build().hashCode(), builder.build().hashCode());
Expand Down
13 changes: 13 additions & 0 deletions src/test/com/mongodb/ServerMonitorSpecification.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@ class ServerMonitorSpecification extends FunctionalSpecification {
newDescription.electionId == expected
}

def 'should set setVersion'() {
given:
initializeServerMonitor(new ServerAddress())
CommandResult commandResult = database.command(new BasicDBObject('ismaster', 1))
def expected = commandResult.get('setVersion')

when:
latch.await()

then:
newDescription.setVersion == expected
}

@IgnoreIf( { serverIsAtLeastVersion(2.6) } )
def 'should set default max wire batch size when not provided by server'() {
given:
Expand Down

0 comments on commit 062a2bb

Please sign in to comment.