diff --git a/nevado-jms/pom.xml b/nevado-jms/pom.xml index a80333b..e7404ec 100644 --- a/nevado-jms/pom.xml +++ b/nevado-jms/pom.xml @@ -72,8 +72,18 @@ com.amazonaws - aws-java-sdk - 1.6.12 + aws-java-sdk-core + 1.11.161 + + + com.amazonaws + aws-java-sdk-sqs + 1.11.161 + + + com.amazonaws + aws-java-sdk-sns + 1.11.161 diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/AsyncConsumerRunner.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/AsyncConsumerRunner.java index 0e46e4d..3feb362 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/AsyncConsumerRunner.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/AsyncConsumerRunner.java @@ -45,7 +45,7 @@ public void run() { } _log.debug("Sleeping async loop"); try { - _sleeper.sleep(); + if (_running) { _sleeper.sleep(); } } catch (InterruptedException e) { _log.info("Loop interrupted"); _running = false; @@ -111,6 +111,7 @@ synchronized void stop() throws InterruptedException { if (_running) { _running = false; if (runner != Thread.currentThread()) { + _sleeper.reset(); _sleeper.stopSleeping(); runner.join(); } diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java index 9209a22..3ee0dda 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/AbstractSQSConnector.java @@ -34,22 +34,33 @@ public abstract class AbstractSQSConnector implements SQSConnector { private final long _receiveCheckIntervalMs; private final boolean _isAsync; + private final int _visibilityTimeoutOnReset; protected AbstractSQSConnector(long receiveCheckIntervalMs) { this(receiveCheckIntervalMs, false); } - protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync) + protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) { _receiveCheckIntervalMs = receiveCheckIntervalMs; _isAsync = isAsync; + _visibilityTimeoutOnReset = visibilityTimeoutOnReset; + } + + protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync) + { + this(receiveCheckIntervalMs, false, 0); } public boolean isAsync() { return _isAsync; } + public int getVisibilityTimeoutOnReset() { + return _visibilityTimeoutOnReset; + } + public void sendMessage(NevadoDestination destination, NevadoMessage message) throws JMSException { if (destination == null) @@ -127,7 +138,7 @@ public void resetMessage(NevadoMessage message) throws JMSException { "Did this come from an SQS queue?"); } SQSQueue sqsQueue = getSQSQueue(message.getNevadoDestination()); - sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, 0); + sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, _visibilityTimeoutOnReset); // Customize message visibility timeout } /** @@ -179,7 +190,7 @@ protected SQSMessage receiveSQSMessage(NevadoConnection connection, NevadoDestin if (sqsMessage != null && !connection.isRunning()) { // Connection was stopped while the REST call to SQS was being made try { - sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), 0); // Make it immediately available to the next requestor + sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), _visibilityTimeoutOnReset); // Customize visibility timeout } catch (JMSException e) { String exMessage = "Unable to reset visibility timeout for message: " + e.getMessage(); _log.warn(exMessage, e); // Non-fatal. Just means the message will disappear until the visibility timeout expires. diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java index c3d5118..9a5ba16 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java @@ -58,7 +58,11 @@ public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean i } public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync) { - super(receiveCheckIntervalMs, isAsync); + this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, isAsync, 0); + } + + public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) { + super(receiveCheckIntervalMs, isAsync, visibilityTimeoutOnReset); AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); ClientConfiguration clientConfiguration = new ClientConfiguration(); String proxyHost = System.getProperty("http.proxyHost"); diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java index 26b7df4..f87649b 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java @@ -10,6 +10,7 @@ */ public class AmazonAwsSQSConnectorFactory extends AbstractSQSConnectorFactory { protected boolean _useAsyncSend = false; + protected int _visibilityTimeoutOnReset = 0; private boolean _testAlwaysPasses = false; @@ -27,7 +28,7 @@ public AmazonAwsSQSConnector getInstance(String awsAccessKey, String awsSecretKe } protected AmazonAwsSQSConnector createConnector(String awsAccessKey, String awsSecretKey) { - return new AmazonAwsSQSConnector(awsAccessKey, awsSecretKey, _isSecure, _receiveCheckIntervalMs, _useAsyncSend); + return new AmazonAwsSQSConnector(awsAccessKey, awsSecretKey, _isSecure, _receiveCheckIntervalMs, _useAsyncSend, _visibilityTimeoutOnReset); } public void setUseAsyncSend(boolean useAsyncSend) { @@ -37,6 +38,14 @@ public void setUseAsyncSend(boolean useAsyncSend) { public boolean isUseAsyncSend() { return _useAsyncSend; } + + public int getVisibilityTimeoutOnReset() { + return _visibilityTimeoutOnReset; + } + + public void setVisibilityTimeoutOnReset(int visibilityTimeoutOnReset) { + _visibilityTimeoutOnReset = visibilityTimeoutOnReset; + } public void setTestAlwaysPasses(boolean _testAlwaysPasses) { this._testAlwaysPasses = _testAlwaysPasses; diff --git a/nevado-jms/src/test/java/org/skyscreamer/nevado/jms/metadata/JMSExpirationTest.java b/nevado-jms/src/test/java/org/skyscreamer/nevado/jms/metadata/JMSExpirationTest.java index 38f35c5..499b5ef 100644 --- a/nevado-jms/src/test/java/org/skyscreamer/nevado/jms/metadata/JMSExpirationTest.java +++ b/nevado-jms/src/test/java/org/skyscreamer/nevado/jms/metadata/JMSExpirationTest.java @@ -46,7 +46,7 @@ public void testExpire() throws JMSException, InterruptedException { MessageProducer msgProducer = session.createProducer(tempQueue); msgProducer.send(msgToExpire, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 10); msgProducer.send(msgWithoutExpire); - Thread.sleep(10); + Thread.sleep(20); Message msgOut = session.createConsumer(tempQueue).receive(); Assert.assertNotNull("Got null message back", msgOut); msgOut.acknowledge();