diff --git a/nevado-jms/pom.xml b/nevado-jms/pom.xml index a80333b..5d45beb 100644 --- a/nevado-jms/pom.xml +++ b/nevado-jms/pom.xml @@ -5,12 +5,12 @@ org.skyscreamer nevado - 1.3.3-SNAPSHOT + 1.3.2 org.skyscreamer nevado-jms - 1.3.3-SNAPSHOT + 1.3.2-OA7 jar Nevado JMS @@ -51,13 +51,13 @@ org.json json - 20090211 - - - org.codehaus.jackson - jackson-core-asl - 1.9.12 + 20231013 + + + + + joda-time joda-time @@ -65,15 +65,45 @@ - log4j - log4j - 1.2.17 + org.apache.logging.log4j + log4j-1.2-api + 2.23.1 + test + + + + org.apache.logging.log4j + log4j-core + 2.23.1 test + + + com.amazonaws + aws-java-sdk-sqs + 1.12.377 + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-cbor + + + com.amazonaws - aws-java-sdk - 1.6.12 + aws-java-sdk-sns + 1.12.377 + + + com.fasterxml.jackson.core + jackson-core + 2.20.0 + + + com.fasterxml.jackson.core + jackson-annotations + 2.20 @@ -90,16 +120,23 @@ org.springframework - spring-beans + spring-context ${spring.version} test + - org.springframework.integration - spring-integration-jms - 2.0.0.RELEASE + org.springframework + spring-beans + ${spring.version} test + + + + + + activemq activemq @@ -116,13 +153,13 @@ junit junit - 4.10 + 4.13.2 test - 3.0.5.RELEASE + 4.3.21.RELEASE diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/NevadoSession.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/NevadoSession.java index 5ea6305..d61df71 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/NevadoSession.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/NevadoSession.java @@ -355,8 +355,10 @@ protected String getDurableEndpointQueueName(String durableSubscriptionName) { if (_connection.getClientID() != null) { - queueName += "_client-" + _connection.getClientID() + ""; + queueName += "_" + _connection.getClientID() + ""; } + + return queueName; } diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java index c3d5118..99b93f5 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnector.java @@ -4,20 +4,17 @@ import com.amazonaws.AmazonServiceException; import com.amazonaws.ClientConfiguration; import com.amazonaws.Protocol; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.sns.AmazonSNS; -import com.amazonaws.services.sns.AmazonSNSAsync; -import com.amazonaws.services.sns.AmazonSNSAsyncClient; -import com.amazonaws.services.sns.AmazonSNSClient; +import com.amazonaws.auth.*; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.client.builder.ExecutorFactory; +import com.amazonaws.services.sns.*; import com.amazonaws.services.sns.model.*; -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.AmazonSQSAsyncClient; -import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.*; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.CreateQueueResult; import com.amazonaws.services.sqs.model.ListQueuesRequest; import com.amazonaws.services.sqs.model.ListQueuesResult; +import org.apache.commons.lang.StringUtils; import org.skyscreamer.nevado.jms.connector.AbstractSQSConnector; import org.skyscreamer.nevado.jms.connector.SQSMessage; import org.skyscreamer.nevado.jms.connector.SQSQueue; @@ -53,13 +50,12 @@ public class AmazonAwsSQSConnector extends AbstractSQSConnector { private boolean _testAlwaysPasses = false; - public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs) { - this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false); + public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint, boolean isSecure, long receiveCheckIntervalMs) { + this(awsAccessKey, awsSecretKey, awsSQSEndpoint, awsSNSEndpoint, isSecure, receiveCheckIntervalMs, false); } - public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync) { + public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync) { super(receiveCheckIntervalMs, isAsync); - AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); ClientConfiguration clientConfiguration = new ClientConfiguration(); String proxyHost = System.getProperty("http.proxyHost"); String proxyPort = System.getProperty("http.proxyPort"); @@ -70,13 +66,77 @@ public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean i } } clientConfiguration.setProtocol(isSecure ? Protocol.HTTPS : Protocol.HTTP); + AwsClientBuilder.EndpointConfiguration awsSQSEndpointConfiguration = new AwsClientBuilder.EndpointConfiguration(awsSQSEndpoint, null); + AwsClientBuilder.EndpointConfiguration awsSNSEndpointConfiguration = new AwsClientBuilder.EndpointConfiguration(awsSNSEndpoint, null); + if (isAsync) { - ExecutorService executorService = Executors.newSingleThreadExecutor(); - _amazonSQS = new AmazonSQSAsyncClient(awsCredentials, clientConfiguration, executorService); - _amazonSNS = new AmazonSNSAsyncClient(awsCredentials, clientConfiguration, executorService); - } else { - _amazonSQS = new AmazonSQSClient(awsCredentials, clientConfiguration); - _amazonSNS = new AmazonSNSClient(awsCredentials, clientConfiguration); + ExecutorFactory executorFactory = new ExecutorFactory() { + @Override + public ExecutorService newExecutor() { + return Executors.newSingleThreadExecutor(); + } + }; + + if(StringUtils.isNotEmpty(awsAccessKey) && StringUtils.isNotEmpty(awsSecretKey)) { + BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); + _amazonSQS = AmazonSQSAsyncClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials)) + .withClientConfiguration(clientConfiguration) + .withExecutorFactory(executorFactory) + .withEndpointConfiguration(awsSQSEndpointConfiguration) + .build(); + _amazonSNS = AmazonSNSAsyncClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials)) + .withClientConfiguration(clientConfiguration) + .withExecutorFactory(executorFactory) + .withEndpointConfiguration(awsSNSEndpointConfiguration) + .build(); + } + else { + _amazonSQS = AmazonSQSAsyncClientBuilder + .standard() + .withClientConfiguration(clientConfiguration) + .withCredentials(InstanceProfileCredentialsProvider.getInstance()) + .withExecutorFactory(executorFactory) + .withEndpointConfiguration(awsSQSEndpointConfiguration) + .build(); + _amazonSNS = AmazonSNSAsyncClientBuilder + .standard() + .withClientConfiguration(clientConfiguration) + .withCredentials(InstanceProfileCredentialsProvider.getInstance()) + .withExecutorFactory(executorFactory) + .withEndpointConfiguration(awsSNSEndpointConfiguration) + .build(); + } + } + else { + if(StringUtils.isNotEmpty(awsAccessKey) && StringUtils.isNotEmpty(awsSecretKey)) { + BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); + _amazonSQS = AmazonSQSClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials)) + .withClientConfiguration(clientConfiguration) + .withEndpointConfiguration(awsSQSEndpointConfiguration) + .build(); + _amazonSNS = AmazonSNSClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials)) + .withClientConfiguration(clientConfiguration) + .withEndpointConfiguration(awsSNSEndpointConfiguration) + .build(); + } + else { + _amazonSQS = AmazonSQSClientBuilder + .standard() + .withClientConfiguration(clientConfiguration) + .withCredentials(InstanceProfileCredentialsProvider.getInstance()) + .withEndpointConfiguration(awsSQSEndpointConfiguration) + .build(); + _amazonSNS = AmazonSNSClientBuilder + .standard() + .withClientConfiguration(clientConfiguration) + .withCredentials(InstanceProfileCredentialsProvider.getInstance()) + .withEndpointConfiguration(awsSNSEndpointConfiguration) + .build(); + } } } diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java index 26b7df4..4bddd0e 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/AmazonAwsSQSConnectorFactory.java @@ -15,19 +15,13 @@ public class AmazonAwsSQSConnectorFactory extends AbstractSQSConnectorFactory { @Override public AmazonAwsSQSConnector getInstance(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint) { - AmazonAwsSQSConnector amazonAwsSQSConnector = createConnector(awsAccessKey, awsSecretKey); + AmazonAwsSQSConnector amazonAwsSQSConnector = createConnector(awsAccessKey, awsSecretKey, awsSQSEndpoint, awsSNSEndpoint); amazonAwsSQSConnector.setTestAlwaysPasses(_testAlwaysPasses); - if (StringUtils.isNotEmpty(awsSQSEndpoint)) { - amazonAwsSQSConnector.getAmazonSQS().setEndpoint(awsSQSEndpoint); - } - if (StringUtils.isNotEmpty(awsSNSEndpoint)) { - amazonAwsSQSConnector.getAmazonSNS().setEndpoint(awsSNSEndpoint); - } return amazonAwsSQSConnector; } - protected AmazonAwsSQSConnector createConnector(String awsAccessKey, String awsSecretKey) { - return new AmazonAwsSQSConnector(awsAccessKey, awsSecretKey, _isSecure, _receiveCheckIntervalMs, _useAsyncSend); + protected AmazonAwsSQSConnector createConnector(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint) { + return new AmazonAwsSQSConnector(awsAccessKey, awsSecretKey, awsSQSEndpoint, awsSNSEndpoint, _isSecure, _receiveCheckIntervalMs, _useAsyncSend); } public void setUseAsyncSend(boolean useAsyncSend) { diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/PlainTextAmazonSQSConnector.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/PlainTextAmazonSQSConnector.java index 2454d6e..1bc8ce4 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/PlainTextAmazonSQSConnector.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/PlainTextAmazonSQSConnector.java @@ -1,43 +1,43 @@ -package org.skyscreamer.nevado.jms.connector.amazonaws; - -import javax.jms.JMSException; -import javax.jms.TextMessage; - -import org.skyscreamer.nevado.jms.message.NevadoMessage; -import org.skyscreamer.nevado.jms.message.NevadoTextMessage; - -/** - * Overrides the serialisation handling from AbstractSQSConnector so that raw - * text messages can be received/sent without wrapping. - * - * @author qi.chen - */ -public class PlainTextAmazonSQSConnector extends AmazonAwsSQSConnector { - - public PlainTextAmazonSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, - long receiveCheckIntervalMs) { - super(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs); - } - - public PlainTextAmazonSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, - long receiveCheckIntervalMs, boolean isAsync) { - super(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, isAsync); - } - - @Override - protected String serializeMessage(NevadoMessage message) throws JMSException { - if (message instanceof TextMessage) { - return ((TextMessage) message).getText(); - } else { - throw new UnsupportedOperationException(getClass().getSimpleName() + " does not support " + message); - } - } - - @Override - protected NevadoMessage deserializeMessage(String serializedMessage) throws JMSException { - NevadoTextMessage out = new NevadoTextMessage(); - out.setText(serializedMessage); - return out; - } - -} +package org.skyscreamer.nevado.jms.connector.amazonaws; + +import javax.jms.JMSException; +import javax.jms.TextMessage; + +import org.skyscreamer.nevado.jms.message.NevadoMessage; +import org.skyscreamer.nevado.jms.message.NevadoTextMessage; + +/** + * Overrides the serialisation handling from AbstractSQSConnector so that raw + * text messages can be received/sent without wrapping. + * + * @author qi.chen + */ +public class PlainTextAmazonSQSConnector extends AmazonAwsSQSConnector { + + public PlainTextAmazonSQSConnector(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint, boolean isSecure, + long receiveCheckIntervalMs) { + super(awsAccessKey, awsSecretKey, awsSQSEndpoint, awsSNSEndpoint, isSecure, receiveCheckIntervalMs); + } + + public PlainTextAmazonSQSConnector(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint, boolean isSecure, + long receiveCheckIntervalMs, boolean isAsync) { + super(awsAccessKey, awsSecretKey, awsSQSEndpoint, awsSNSEndpoint, isSecure, receiveCheckIntervalMs, isAsync); + } + + @Override + protected String serializeMessage(NevadoMessage message) throws JMSException { + if (message instanceof TextMessage) { + return ((TextMessage) message).getText(); + } else { + throw new UnsupportedOperationException(getClass().getSimpleName() + " does not support " + message); + } + } + + @Override + protected NevadoMessage deserializeMessage(String serializedMessage) throws JMSException { + NevadoTextMessage out = new NevadoTextMessage(); + out.setText(serializedMessage); + return out; + } + +} diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/PlainTextAmazonSQSConnectorFactory.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/PlainTextAmazonSQSConnectorFactory.java index 2ad7e93..f578d48 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/PlainTextAmazonSQSConnectorFactory.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/connector/amazonaws/PlainTextAmazonSQSConnectorFactory.java @@ -1,19 +1,19 @@ -package org.skyscreamer.nevado.jms.connector.amazonaws; - -import org.skyscreamer.nevado.jms.connector.amazonaws.AmazonAwsSQSConnector; -import org.skyscreamer.nevado.jms.connector.amazonaws.AmazonAwsSQSConnectorFactory; - -/** - * Produces {@link PlainTextAmazonSQSConnector} - * - * @author qi.chen - */ -public class PlainTextAmazonSQSConnectorFactory extends AmazonAwsSQSConnectorFactory { - - @Override - protected AmazonAwsSQSConnector createConnector(String awsAccessKey, String awsSecretKey) { - return new PlainTextAmazonSQSConnector(awsAccessKey, awsSecretKey, _isSecure, _receiveCheckIntervalMs, - _useAsyncSend); - } - -} +package org.skyscreamer.nevado.jms.connector.amazonaws; + +import org.skyscreamer.nevado.jms.connector.amazonaws.AmazonAwsSQSConnector; +import org.skyscreamer.nevado.jms.connector.amazonaws.AmazonAwsSQSConnectorFactory; + +/** + * Produces {@link PlainTextAmazonSQSConnector} + * + * @author qi.chen + */ +public class PlainTextAmazonSQSConnectorFactory extends AmazonAwsSQSConnectorFactory { + + @Override + protected AmazonAwsSQSConnector createConnector(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint) { + return new PlainTextAmazonSQSConnector(awsAccessKey, awsSecretKey, awsSQSEndpoint, awsSNSEndpoint, _isSecure, _receiveCheckIntervalMs, + _useAsyncSend); + } + +} diff --git a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/destination/NevadoProviderQueuePrefix.java b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/destination/NevadoProviderQueuePrefix.java index b611870..84c2aac 100644 --- a/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/destination/NevadoProviderQueuePrefix.java +++ b/nevado-jms/src/main/java/org/skyscreamer/nevado/jms/destination/NevadoProviderQueuePrefix.java @@ -6,7 +6,7 @@ * @author Carter Page */ public enum NevadoProviderQueuePrefix { - TEMPORARY_DESTINATION_PREFIX("nevado_temp_"), DURABLE_SUBSCRIPTION_PREFIX("nevado_durable_topic_"); + TEMPORARY_DESTINATION_PREFIX("nevado_temp_"), DURABLE_SUBSCRIPTION_PREFIX("nev_"); private final String _value; diff --git a/pom.xml b/pom.xml index 08a2ace..9b829f9 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.skyscreamer nevado - 1.3.3-SNAPSHOT + 1.3.2-OA7 pom Nevado