Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ public class ActiveMQJMSConstants {
public static final int INDIVIDUAL_ACKNOWLEDGE = 101;

public static final String JMS_ACTIVEMQ_ENABLE_BYTE_ARRAY_JMS_CORRELATION_ID_PROPERTY_NAME = "amq.jms.support-bytes-id";

public static final String JMS_MAX_TEXT_MESSAGE_SIZE = "jmsMaxTextMessageSize";

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.apache.activemq.artemis.core.version.Version;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
Expand Down Expand Up @@ -134,6 +136,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme

private final ConnectionFactoryOptions options;

private Integer jmsMaxTextMessageSize;

public ActiveMQConnection(final ConnectionFactoryOptions options,
final String username,
Expand Down Expand Up @@ -170,6 +173,16 @@ public ActiveMQConnection(final ConnectionFactoryOptions options,
this.enable1xPrefixes = enable1xPrefixes;

creationStack = new Exception();

if (sessionFactory != null && sessionFactory.getConnectorConfiguration() != null) {
int maxSize = ConfigurationHelper.getIntProperty(
ActiveMQJMSConstants.JMS_MAX_TEXT_MESSAGE_SIZE,
0,
sessionFactory.getConnectorConfiguration().getExtraParams());
if (maxSize > 0) {
this.jmsMaxTextMessageSize = maxSize;
}
}
}

/**
Expand Down Expand Up @@ -647,6 +660,10 @@ public void authorize(boolean validateClientId) throws JMSException {
}
}

public Optional<Integer> getJmsMaxTextMessageSize() {
return Optional.ofNullable(jmsMaxTextMessageSize);
}

private void addSessionMetaData(ClientSession session) throws ActiveMQException {
session.addMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY, "");
if (clientID != null) {
Expand Down Expand Up @@ -680,7 +697,6 @@ public String getDeserializationAllowList() {
return this.factoryReference.getDeserializationAllowList();
}


private static class JMSFailureListener implements SessionFailureListener {

private final WeakReference<ActiveMQConnection> connectionRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage;

import java.util.Optional;

import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;

/**
* ActiveMQ Artemis implementation of a JMS MessageConsumer.
*/
Expand Down Expand Up @@ -66,8 +70,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr

private final SimpleString autoDeleteQueueName;



protected ActiveMQMessageConsumer(final ConnectionFactoryOptions options,
final ActiveMQConnection connection,
final ActiveMQSession session,
Expand Down Expand Up @@ -216,6 +218,17 @@ private ActiveMQMessage getMessage(final long timeout, final boolean noWait) thr
ActiveMQMessage jmsMsg = null;

if (coreMessage != null) {

Optional<Integer> jmsMaxTextMessageSize = connection.getJmsMaxTextMessageSize();
if (jmsMaxTextMessageSize.isPresent()) {
if (coreMessage.getType() == TEXT_TYPE && coreMessage.getBodySize() > jmsMaxTextMessageSize.get()) {
String errorMsg = "The text message exceeds maximum set size of %d bytes.".formatted(jmsMaxTextMessageSize.get());
ActiveMQException amqe = new ActiveMQException(errorMsg);
ActiveMQClientLogger.LOGGER.unableToGetMessage(amqe);
throw amqe;
}
}

ClientSession coreSession = session.getCoreSession();
boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE ||
ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Wait;
Expand Down Expand Up @@ -3670,6 +3671,92 @@ public void testExceptionMessageListenerStopSession() throws Exception {
}
}

@Test
public void testReceiveThrowsTextMessageAboveSetMaxSize() throws Exception {
Connection producerConnection = null;

Connection consumerConnection = null;

try {
String brokerURL = "tcp://127.0.0.1:61616?blockOnAcknowledge=true&blockOnDurableSend=true&blockOnNonDurableSend=true";

producerConnection = createConnection(new ActiveMQConnectionFactory(brokerURL));

var cf = new ActiveMQConnectionFactory(brokerURL + "&jmsMaxTextMessageSize=20");
cf.setMinLargeMessageSize(10);
consumerConnection = createConnection(
cf
);

Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

MessageProducer queueProducer = producerSession.createProducer(queue1);

MessageConsumer queueConsumer = consumerSession.createConsumer(queue1);

consumerConnection.start();

TextMessage tm = producerSession.createTextMessage("Hello, world!");

queueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

queueProducer.send(tm);

JMSException exception = ProxyAssertSupport.assertThrows(JMSException.class, queueConsumer::receive);
ProxyAssertSupport.assertEquals("The text message exceeds maximum set size of 20 bytes.", exception.getMessage());
} finally {
if (producerConnection != null) {
producerConnection.close();
}
if (consumerConnection != null) {
consumerConnection.close();
}
}
}

@Test
public void testReceiveDoesNotThrowWhenMessageIsBelowSetMaximuSize() throws Exception {
Connection producerConnection = null;

Connection consumerConnection = null;

try {
String brokerURL = "tcp://127.0.0.1:61616?blockOnAcknowledge=true&blockOnDurableSend=true&blockOnNonDurableSend=true";

producerConnection = createConnection(new ActiveMQConnectionFactory(brokerURL));

consumerConnection = createConnection(
new ActiveMQConnectionFactory(brokerURL + "&jmsMaxTextMessageSize=100")
);

Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

MessageProducer queueProducer = producerSession.createProducer(queue1);

MessageConsumer queueConsumer = consumerSession.createConsumer(queue1);

consumerConnection.start();

TextMessage tm = producerSession.createTextMessage("Hello, world!");

queueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

queueProducer.send(tm);

ProxyAssertSupport.assertDoesNotThrow(queueConsumer::receive);
} finally {
if (producerConnection != null) {
producerConnection.close();
}
if (consumerConnection != null) {
consumerConnection.close();
}
}
}

private class ConnectionCloseMessageListener implements MessageListener {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jms.tests.util;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.function.Executable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
Expand Down Expand Up @@ -341,4 +342,22 @@ public static void assertNotSame(final java.lang.Object object, final java.lang.
throw e;
}
}

public static <T extends Throwable> T assertThrows(Class<T> expectedType, Executable executable) {
try {
return Assertions.assertThrows(expectedType, executable);
} catch (AssertionError e) {
logger.warn("AssertionFailure::", e);
throw e;
}
}

public static void assertDoesNotThrow(Executable executable) {
try {
Assertions.assertDoesNotThrow(executable);
} catch (AssertionError e) {
logger.warn("AssertionFailure::", e);
throw e;
}
}
}