Skip to content

Commit

Permalink
JAVA-2238: Remove DefaultServerMonitor.invalidate and replace usage o…
Browse files Browse the repository at this point in the history
…f it with DefaultServerMonitor.check. This removes any need to create a new monitor thread and simplifies any necessary synchronization that goes along with that.
  • Loading branch information
jyemin committed Jul 15, 2016
1 parent 6d29d1a commit 2c14b2b
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void invalidate() {
.address(serverId.getAddress())
.build()));
connectionPool.invalidate();
serverMonitor.invalidate();
connect();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.connection.CommandHelper.executeCommand;
import static com.mongodb.connection.DescriptionHelper.createServerDescription;
import static com.mongodb.connection.ServerConnectionState.CONNECTING;
Expand All @@ -52,8 +51,8 @@ class DefaultServerMonitor implements ServerMonitor {
private final InternalConnectionFactory internalConnectionFactory;
private final ConnectionPool connectionPool;
private final ServerSettings settings;
private volatile ServerMonitorRunnable monitor;
private volatile Thread monitorThread;
private final ServerMonitorRunnable monitor;
private final Thread monitorThread;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private volatile boolean isClosed;
Expand All @@ -69,7 +68,9 @@ class DefaultServerMonitor implements ServerMonitor {
this.serverStateListener = serverStateListener;
this.internalConnectionFactory = internalConnectionFactory;
this.connectionPool = connectionPool;
monitorThread = createMonitorThread();
monitor = new ServerMonitorRunnable();
monitorThread = new Thread(monitor, "cluster-" + this.serverId.getClusterId() + "-" + this.serverId.getAddress());
monitorThread.setDaemon(true);
isClosed = false;
}

Expand All @@ -88,45 +89,23 @@ public void connect() {
}
}

@Override
public void invalidate() {
isTrue("open", !isClosed);
monitor.close();
monitorThread.interrupt();
monitorThread = createMonitorThread();
monitorThread.start();
}

@Override
public void close() {
monitor.close();
monitorThread.interrupt();
isClosed = true;
}

Thread createMonitorThread() {
monitor = new ServerMonitorRunnable();
Thread monitorThread = new Thread(monitor, "cluster-" + serverId.getClusterId() + "-" + serverId.getAddress());
monitorThread.setDaemon(true);
return monitorThread;
monitorThread.interrupt();
}

class ServerMonitorRunnable implements Runnable {
private volatile boolean monitorIsClosed;
private final ExponentiallyWeightedMovingAverage averageRoundTripTime = new ExponentiallyWeightedMovingAverage(0.2);

public void close() {
monitorIsClosed = true;
}

@Override
@SuppressWarnings("unchecked")
public synchronized void run() {
InternalConnection connection = null;
try {
ServerDescription currentServerDescription = getConnectingServerDescription(null);
Throwable currentException = null;
while (!monitorIsClosed) {
while (!isClosed) {
ServerDescription previousServerDescription = currentServerDescription;
Throwable previousException = currentException;
currentException = null;
Expand Down Expand Up @@ -167,7 +146,7 @@ public synchronized void run() {
currentServerDescription = getConnectingServerDescription(t);
}

if (!monitorIsClosed) {
if (!isClosed) {
try {
logStateChange(previousServerDescription, previousException, currentServerDescription, currentException);
sendStateChangedEvent(previousServerDescription, currentServerDescription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ interface ServerMonitor {

void connect();

void invalidate();

void close();

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.mongodb.connection

import com.mongodb.MongoSocketOpenException
import com.mongodb.MongoSocketReadTimeoutException
import com.mongodb.ServerAddress
import com.mongodb.event.ServerHeartbeatFailedEvent
Expand All @@ -38,78 +37,6 @@ class DefaultServerMonitorSpecification extends Specification {

DefaultServerMonitor monitor

def 'A thread interrupt should send a sendStateChangedEvent'() {
given:
def stateChanged = false
def latch = new CountDownLatch(1);
def changeListener = new ChangeListener<ServerDescription>() {
@Override
void stateChanged(final ChangeEvent<ServerDescription> event) {
stateChanged = true;
latch.countDown()
}
}
def internalConnectionFactory = Mock(InternalConnectionFactory) {
create(_) >> {
Mock(InternalConnection) {
open() >> { throw new MongoSocketOpenException('open', new ServerAddress(), new IOException()) }
}
}
}
monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()),
ServerSettings.builder().addServerListener(new NoOpServerListener()).build(),
changeListener, internalConnectionFactory, new TestConnectionPool())
monitor.start()

when:
monitor.monitorThread.interrupt()
latch.await()

then:
stateChanged

cleanup:
monitor?.close()
}

def 'invalidate should not send a sendStateChangedEvent'() {
given:
def stateChanged = false
def changeListener = new ChangeListener<ServerDescription>() {
@Override
void stateChanged(final ChangeEvent<ServerDescription> event) {
stateChanged = true;
}
}
def latch = new CountDownLatch(1)
def internalConnectionFactory = Mock(InternalConnectionFactory) {
create(_) >> {
Mock(InternalConnection) {
open() >> {
latch.countDown()
Thread.sleep(Long.MAX_VALUE);
}
}
}
}
monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()),
ServerSettings.builder().addServerListener(new NoOpServerListener()).build(),
changeListener, internalConnectionFactory, new TestConnectionPool())
monitor.start()
def monitorThread = monitor.monitorThread
latch.await()

when:
monitor.invalidate()
monitorThread.join();

then:
!stateChanged

cleanup:
monitor?.close()
}

def 'close should not send a sendStateChangedEvent'() {
given:
def stateChanged = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class DefaultServerSpecification extends Specification {
then:
thrown(MongoSecurityException)
1 * connectionPool.invalidate()
1 * serverMonitor.invalidate()
1 * serverMonitor.connect()
}

def 'failed open should invalidate the server asychronously'() {
Expand All @@ -170,7 +170,7 @@ class DefaultServerSpecification extends Specification {
then:
!receivedConnection
receivedThrowable.is(exceptionToThrow)
1 * serverMonitor.invalidate()
1 * serverMonitor.connect()
}

def 'should invalidate on MongoNotPrimaryException'() {
Expand All @@ -196,7 +196,7 @@ class DefaultServerSpecification extends Specification {
then:
thrown(MongoNotPrimaryException)
1 * connectionPool.invalidate()
1 * serverMonitor.invalidate()
1 * serverMonitor.connect()

when:
def futureResultCallback = new FutureResultCallback()
Expand All @@ -207,7 +207,7 @@ class DefaultServerSpecification extends Specification {
then:
thrown(MongoNotPrimaryException)
1 * connectionPool.invalidate()
1 * serverMonitor.invalidate()
1 * serverMonitor.connect()

when:
futureResultCallback = new FutureResultCallback()
Expand All @@ -218,7 +218,7 @@ class DefaultServerSpecification extends Specification {
then:
thrown(MongoNotPrimaryException)
1 * connectionPool.invalidate()
1 * serverMonitor.invalidate()
1 * serverMonitor.connect()
}

def 'should invalidate on MongoNodeIsRecoveringException'() {
Expand All @@ -244,7 +244,7 @@ class DefaultServerSpecification extends Specification {
then:
thrown(MongoNodeIsRecoveringException)
1 * connectionPool.invalidate()
1 * serverMonitor.invalidate()
1 * serverMonitor.connect()
}


Expand All @@ -271,7 +271,7 @@ class DefaultServerSpecification extends Specification {
then:
thrown(MongoSocketException)
1 * connectionPool.invalidate()
1 * serverMonitor.invalidate()
1 * serverMonitor.connect()

when:
def futureResultCallback = new FutureResultCallback<WriteConcernResult>()
Expand All @@ -282,7 +282,7 @@ class DefaultServerSpecification extends Specification {
then:
thrown(MongoSocketException)
1 * connectionPool.invalidate()
1 * serverMonitor.invalidate()
1 * serverMonitor.connect()
}

def 'should not invalidate on MongoSocketReadTimeoutException'() {
Expand All @@ -309,7 +309,7 @@ class DefaultServerSpecification extends Specification {
then:
thrown(MongoSocketReadTimeoutException)
0 * connectionPool.invalidate()
0 * serverMonitor.invalidate()
0 * serverMonitor.connect()

when:
def futureResultCallback = new FutureResultCallback<WriteConcernResult>()
Expand All @@ -320,7 +320,7 @@ class DefaultServerSpecification extends Specification {
then:
thrown(MongoSocketReadTimeoutException)
0 * connectionPool.invalidate()
0 * serverMonitor.invalidate()
0 * serverMonitor.connect()
}

def 'should enable command listener'() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ public void start() {
public void connect() {
}

@Override
public void invalidate() {
}

@Override
public void close() {
}
Expand Down

0 comments on commit 2c14b2b

Please sign in to comment.