diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/spring-cloud-stream-binder-txeventq-sample/src/test/java/com/oracle/database/spring/cloud/stream/binder/sample/TxEventQSampleAppTest.java b/database/spring-cloud-stream-binder-oracle-txeventq/spring-cloud-stream-binder-txeventq-sample/src/test/java/com/oracle/database/spring/cloud/stream/binder/sample/TxEventQSampleAppTest.java index b87a9f13..9c736969 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/spring-cloud-stream-binder-txeventq-sample/src/test/java/com/oracle/database/spring/cloud/stream/binder/sample/TxEventQSampleAppTest.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/spring-cloud-stream-binder-txeventq-sample/src/test/java/com/oracle/database/spring/cloud/stream/binder/sample/TxEventQSampleAppTest.java @@ -20,7 +20,7 @@ @Testcontainers public class TxEventQSampleAppTest { @Container - static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.6-slim-faststart") + static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.7-slim-faststart") .withStartupTimeout(Duration.ofMinutes(2)) .withUsername("testuser") .withPassword("testpwd"); diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/JMSMessageChannelBinder.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/JMSMessageChannelBinder.java index ae9cfbd0..266da17c 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/JMSMessageChannelBinder.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/JMSMessageChannelBinder.java @@ -30,8 +30,6 @@ import com.oracle.database.spring.cloud.stream.binder.config.JmsProducerProperties; import com.oracle.database.spring.cloud.stream.binder.provisioning.JmsConsumerDestination; import com.oracle.database.spring.cloud.stream.binder.provisioning.JmsProducerDestination; - - import com.oracle.database.spring.cloud.stream.binder.utils.DestinationNameResolver; import com.oracle.database.spring.cloud.stream.binder.utils.JmsMessageDrivenChannelAdapterFactory; import com.oracle.database.spring.cloud.stream.binder.utils.JmsSendingMessageHandlerFactory; @@ -39,7 +37,6 @@ import jakarta.jms.ConnectionFactory; import jakarta.jms.Session; import jakarta.jms.Topic; - import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/TxEventQQueueProvisioner.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/TxEventQQueueProvisioner.java index a5e14d7d..9ea7b231 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/TxEventQQueueProvisioner.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/TxEventQQueueProvisioner.java @@ -1,6 +1,6 @@ /* ** TxEventQ Support for Spring Cloud Stream - ** Copyright (c) 2023, 2024 Oracle and/or its affiliates. + ** Copyright (c) 2023, 2025 Oracle and/or its affiliates. ** ** This file has been modified by Oracle Corporation. */ @@ -24,6 +24,8 @@ package com.oracle.database.spring.cloud.stream.binder; +import java.sql.SQLException; + import com.oracle.database.spring.cloud.stream.binder.config.JmsConsumerProperties; import com.oracle.database.spring.cloud.stream.binder.config.JmsProducerProperties; import com.oracle.database.spring.cloud.stream.binder.plsql.OracleDBUtils; @@ -34,9 +36,6 @@ import jakarta.jms.JMSException; import jakarta.jms.Session; import jakarta.jms.Topic; - -import java.sql.SQLException; - import oracle.jakarta.jms.AQjmsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,7 +136,7 @@ private Topic provisionProducerTopic(String topicName, ExtendedProducerProperties properties) { Connection aQConnection = null; Session session = null; - Topic topic = null; + Topic topic; try { aQConnection = connectionFactory.createConnection(); session = aQConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE); @@ -177,7 +176,7 @@ private Topic provisionConsumerTopic(String topicName, ExtendedConsumerProperties properties) { Connection aQConnection = null; Session session = null; - Topic topic = null; + Topic topic; try { aQConnection = connectionFactory.createConnection(); session = aQConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE); diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/JmsBinderGlobalConfiguration.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/JmsBinderGlobalConfiguration.java index 0c82dec0..0a17c62f 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/JmsBinderGlobalConfiguration.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/JmsBinderGlobalConfiguration.java @@ -46,7 +46,7 @@ @Configuration public class JmsBinderGlobalConfiguration { - private final ConnectionFactory connectionFactory; + private ConnectionFactory connectionFactory; public JmsBinderGlobalConfiguration(ConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/JmsExtendedBindingProperties.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/JmsExtendedBindingProperties.java index 846f9409..7311fd1f 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/JmsExtendedBindingProperties.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/JmsExtendedBindingProperties.java @@ -27,8 +27,8 @@ import java.util.Map; import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties; +import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; @ConfigurationProperties("spring.cloud.stream.txeventq") public class JmsExtendedBindingProperties diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/TxEventQJmsConfiguration.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/TxEventQJmsConfiguration.java index 05380d5b..426d27f0 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/TxEventQJmsConfiguration.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/TxEventQJmsConfiguration.java @@ -24,17 +24,15 @@ package com.oracle.database.spring.cloud.stream.binder.config; +import java.sql.SQLException; + import com.oracle.database.spring.cloud.stream.binder.TxEventQQueueProvisioner; import com.oracle.database.spring.cloud.stream.binder.plsql.OracleDBUtils; - import jakarta.jms.ConnectionFactory; import jakarta.jms.JMSException; import oracle.jakarta.jms.AQjmsConnectionFactory; import oracle.jakarta.jms.AQjmsFactory; import oracle.ucp.jdbc.PoolDataSource; - -import java.sql.SQLException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.AutoConfigureAfter; diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/plsql/OracleDBUtils.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/plsql/OracleDBUtils.java index 27693cf8..d8779177 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/plsql/OracleDBUtils.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/plsql/OracleDBUtils.java @@ -29,15 +29,14 @@ import java.sql.SQLException; import java.sql.Types; +import oracle.ucp.jdbc.PoolDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import oracle.ucp.jdbc.PoolDataSource; - public class OracleDBUtils { private PoolDataSource pds = null; - private final int dbversion; + private int dbversion; private final Logger logger = LoggerFactory.getLogger(getClass()); private static final String CREATE_KB2_TEQ = diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/provisioning/JmsProducerDestination.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/provisioning/JmsProducerDestination.java index 18222203..355a8e58 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/provisioning/JmsProducerDestination.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/provisioning/JmsProducerDestination.java @@ -34,7 +34,7 @@ public class JmsProducerDestination implements ProducerDestination { private final Topic topic; private final int partitionCount; - private final int dbversion; + private int dbversion; public JmsProducerDestination(Topic topic, int pCount, int dbversion) { this.topic = topic; diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/CustomSerializationMessageConverter.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/CustomSerializationMessageConverter.java index 480e1a3e..0d309763 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/CustomSerializationMessageConverter.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/CustomSerializationMessageConverter.java @@ -1,6 +1,6 @@ /* ** TxEventQ Support for Spring Cloud Stream - ** Copyright (c) 2023, 2024 Oracle and/or its affiliates. + ** Copyright (c) 2023, 2025 Oracle and/or its affiliates. ** ** This file has been modified by Oracle Corporation. */ @@ -24,13 +24,12 @@ package com.oracle.database.spring.cloud.stream.binder.serialize; +import jakarta.jms.JMSException; +import jakarta.jms.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.support.converter.SimpleMessageConverter; -import jakarta.jms.JMSException; -import jakarta.jms.Message; - public class CustomSerializationMessageConverter extends SimpleMessageConverter { public String deserializer = null; @@ -66,8 +65,8 @@ public Object fromMessage(Message jmsMessage) throws JMSException { } if (!isInstanceOfDeserializer) { - logger.debug("The configured deserializer class is not an instance of 'com.oracle.database.spring.cloud.stream.binder.serialize.DeSerializer'"); - throw new IllegalArgumentException("The configured serializer class is not an instance of 'com.oracle.database.spring.cloud.stream.binder.serialize.DeSerializer'"); + logger.debug("The configured deserializer class is not an instance of 'com.oracle.cstream.serialize.DeSerializer'"); + throw new IllegalArgumentException("The configured serializer class is not an instance of 'com.oracle.cstream.serialize.DeSerializer'"); } Deserializer s = null; @@ -79,7 +78,7 @@ public Object fromMessage(Message jmsMessage) throws JMSException { throw new IllegalArgumentException("Serializer object could not be initiated."); } - result = s.deserialize((byte[]) result); + result = (Object) (s.deserialize((byte[]) result)); return result; } diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/Deserializer.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/Deserializer.java index 61f6e6bd..dfc6ea91 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/Deserializer.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/Deserializer.java @@ -25,5 +25,5 @@ package com.oracle.database.spring.cloud.stream.binder.serialize; public interface Deserializer { - T deserialize(byte[] bytes); + public T deserialize(byte[] bytes); } diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/Serializer.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/Serializer.java index b370f719..4afd1540 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/Serializer.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/Serializer.java @@ -25,5 +25,5 @@ package com.oracle.database.spring.cloud.stream.binder.serialize; public interface Serializer { - byte[] serialize(Object data); + public byte[] serialize(Object data); } diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/DestinationNameResolver.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/DestinationNameResolver.java index ae2b9aef..118a4d77 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/DestinationNameResolver.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/DestinationNameResolver.java @@ -27,7 +27,7 @@ import org.springframework.util.StringUtils; public class DestinationNameResolver { - private final AnonymousNamingStrategy namingStrategy; + private AnonymousNamingStrategy namingStrategy; public DestinationNameResolver(AnonymousNamingStrategy namingStrategy) { this.namingStrategy = namingStrategy; diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/JmsMessageDrivenChannelAdapterFactory.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/JmsMessageDrivenChannelAdapterFactory.java index ddd83aef..24269034 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/JmsMessageDrivenChannelAdapterFactory.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/JmsMessageDrivenChannelAdapterFactory.java @@ -26,13 +26,12 @@ import com.oracle.database.spring.cloud.stream.binder.config.JmsConsumerProperties; import com.oracle.database.spring.cloud.stream.binder.serialize.CustomSerializationMessageConverter; - import jakarta.jms.BytesMessage; import jakarta.jms.Destination; import jakarta.jms.JMSException; import jakarta.jms.Message; import jakarta.jms.Session; - +import oracle.jakarta.jms.AQjmsSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -60,7 +59,7 @@ public class JmsMessageDrivenChannelAdapterFactory private ApplicationContext applicationContext; - private final Logger logger = LoggerFactory.getLogger(getClass()); + private Logger logger = LoggerFactory.getLogger(getClass()); public JmsMessageDrivenChannelAdapterFactory( ListenerContainerFactory listenerContainerFactory, @@ -145,10 +144,12 @@ private static class RetryingChannelPublishingJmsMessageListener private final RetryTemplate retryTemplate; - private final RecoveryCallback recoverer; + private RecoveryCallback recoverer; private String deSerializerClassName = null; + SpecCompliantJmsHeaderMapper headerMapper = new SpecCompliantJmsHeaderMapper(); + RetryingChannelPublishingJmsMessageListener( ConsumerProperties properties, MessageRecoverer messageRecoverer, @@ -191,6 +192,9 @@ public Object doWithRetry(RetryContext retryContext) RETRY_CONTEXT_MESSAGE_ATTRIBUTE, jmsMessage ); + + headerMapper.setConnection(((AQjmsSession) session).getDBConnection()); + RetryingChannelPublishingJmsMessageListener.super.setHeaderMapper(headerMapper); RetryingChannelPublishingJmsMessageListener.super.onMessage( jmsMessage, session diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/JmsSendingMessageHandlerFactory.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/JmsSendingMessageHandlerFactory.java index e286333b..8680ffb3 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/JmsSendingMessageHandlerFactory.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/JmsSendingMessageHandlerFactory.java @@ -24,6 +24,7 @@ package com.oracle.database.spring.cloud.stream.binder.utils; +import jakarta.jms.Destination; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; @@ -33,8 +34,6 @@ import org.springframework.jms.core.JmsTemplate; import org.springframework.messaging.MessageChannel; -import jakarta.jms.Destination; - public class JmsSendingMessageHandlerFactory implements ApplicationContextAware, BeanFactoryAware { private final JmsTemplate template; diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/ListenerContainerFactory.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/ListenerContainerFactory.java index c01e7680..a2d2a029 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/ListenerContainerFactory.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/ListenerContainerFactory.java @@ -26,14 +26,14 @@ import jakarta.jms.ConnectionFactory; import jakarta.jms.Destination; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.listener.AbstractMessageListenerContainer; import org.springframework.jms.listener.DefaultMessageListenerContainer; public class ListenerContainerFactory { - private final ConnectionFactory factory; + + private ConnectionFactory factory; private static final Logger logger = LoggerFactory.getLogger(ListenerContainerFactory.class); diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/PartitionAwareJmsSendingMessageHandler.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/PartitionAwareJmsSendingMessageHandler.java index 7c0c0af5..046d79a7 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/PartitionAwareJmsSendingMessageHandler.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/PartitionAwareJmsSendingMessageHandler.java @@ -1,6 +1,6 @@ /* ** TxEventQ Support for Spring Cloud Stream - ** Copyright (c) 2023, 2024 Oracle and/or its affiliates. + ** Copyright (c) 2023, 2025 Oracle and/or its affiliates. ** ** This file has been modified by Oracle Corporation. */ @@ -24,9 +24,16 @@ package com.oracle.database.spring.cloud.stream.binder.utils; +import java.sql.Connection; +import java.util.function.Consumer; + +import com.oracle.database.spring.cloud.stream.binder.serialize.Serializer; import jakarta.jms.Destination; import jakarta.jms.JMSException; - +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import oracle.jakarta.jms.AQjmsSession; +import oracle.jakarta.jms.AQjmsTopicConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.BinderHeaders; @@ -35,12 +42,11 @@ import org.springframework.integration.jms.JmsHeaderMapper; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessagePostProcessor; +import org.springframework.jms.support.converter.MessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.ErrorMessage; -import com.oracle.database.spring.cloud.stream.binder.serialize.Serializer; - public class PartitionAwareJmsSendingMessageHandler extends AbstractMessageHandler implements Lifecycle { @@ -59,7 +65,7 @@ public class PartitionAwareJmsSendingMessageHandler private int dbversion = 23; - private static final Logger logger = LoggerFactory.getLogger(PartitionAwareJmsSendingMessageHandler.class); + private static final Logger sendLogger = LoggerFactory.getLogger(PartitionAwareJmsSendingMessageHandler.class); public PartitionAwareJmsSendingMessageHandler( JmsTemplate jmsTemplate, @@ -85,9 +91,9 @@ public void setDBVersion(int dbversion) { protected void handleMessageInternal(Message message) { try { - this.jmsHandleMessageInternal(message); + this.handleJMSMessageInternal(message); } catch (Exception e) { - logger.error("An error occurred while trying to send message: " + e); + sendLogger.error("An error occurred while trying to send message:", e); if (this.errorChannel != null) { this.errorChannel.send(new ErrorMessage(e, message)); } @@ -95,18 +101,73 @@ protected void handleMessageInternal(Message message) { } } - protected void jmsHandleMessageInternal(Message message) { + protected void handleJMSMessageInternal(Message message) { if (message == null) { throw new IllegalArgumentException("message must not be null"); } - Object objectToSend = message.getPayload(); + Object objectToSend = this.serializeMessageIfRequired(message.getPayload()); + + Integer partitionToSend = getPartition(message); + HeaderMappingMessagePostProcessor messagePostProcessor = new HeaderMappingMessagePostProcessor( + message, + this.headerMapper, + partitionToSend, + mapHeaders + ); + messagePostProcessor.setDBVersion(this.dbversion); + + // try to read ConnectionCallback - if set + @SuppressWarnings("unchecked") + Consumer connCallback = (Consumer) message.getHeaders().get(TxEventQBinderHeaderConstants.CONNECTION_CONSUMER); + if (connCallback == null) { + this.jmsTemplate.convertAndSend( + destination, + objectToSend, + messagePostProcessor + ); + return; + } + Connection c = (Connection) message.getHeaders().get(TxEventQBinderHeaderConstants.MESSAGE_CONTEXT); + if (c == null) { + final Object actualPayload = objectToSend; + jmsTemplate.send(destination, session -> { + Connection sessionConnection = ((AQjmsSession) session).getDBConnection(); + connCallback.accept(sessionConnection); + MessageConverter msgConverter = this.jmsTemplate.getMessageConverter(); + if (msgConverter == null) { + throw new IllegalStateException("No 'messageConverter' specified. Check configuration of JmsTemplate."); + } + jakarta.jms.Message msg = msgConverter.toMessage(actualPayload, session); + return messagePostProcessor.postProcessMessage(msg); + }); + } else { + connCallback.accept(c); + // create topic connection and session using c + try (jakarta.jms.Connection conn = AQjmsTopicConnectionFactory.createTopicConnection(c); + Session s = conn.createSession(true, this.jmsTemplate.getSessionAcknowledgeMode()); + MessageProducer p = s.createProducer(destination)) { + MessageConverter msgConverter = this.jmsTemplate.getMessageConverter(); + if (msgConverter == null) { + throw new IllegalStateException("No 'messageConverter' specified. Check configuration of JmsTemplate."); + } + jakarta.jms.Message msg = msgConverter.toMessage(objectToSend, s); + jakarta.jms.Message msgToSend = messagePostProcessor.postProcessMessage(msg); + p.send(msgToSend); + s.commit(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + + private Object serializeMessageIfRequired(Object objectToSend) { if (this.serializerClassName != null) { Class serializer = null; try { serializer = Class.forName(this.serializerClassName); } catch (ClassNotFoundException ce) { - logger.debug("The class name: " + serializerClassName + "is invalid."); + sendLogger.debug("The class name: {} is invalid.", serializerClassName); throw new IllegalArgumentException(ce.getMessage()); } @@ -119,44 +180,28 @@ protected void jmsHandleMessageInternal(Message message) { } if (!isInstanceOfSerializer) { - logger.debug("The configured serializer class is not an instance of 'com.oracle.database.spring.cloud.stream.binder.serialize.Serializer'"); - throw new IllegalArgumentException("The configured serializer class is not an instance of 'com.oracle.database.spring.cloud.stream.binder.serialize.Serializer'"); + sendLogger.debug("The configured serializer class is not an instance of 'com.oracle.cstream.serialize.Serializer'"); + throw new IllegalArgumentException("The configured serializer class is not an instance of 'com.oracle.cstream.serialize.Serializer'"); } Serializer s = null; try { s = (Serializer) (serializer.getDeclaredConstructor().newInstance()); } catch (Exception e) { - logger.debug("Serializer object could not be initiated."); + sendLogger.debug("Serializer object could not be initiated."); throw new IllegalArgumentException("Serializer object could not be initiated."); } objectToSend = s.serialize(objectToSend); } - - Integer partitionToSend = getPartition(message); - HeaderMappingMessagePostProcessor messagePostProcessor = new HeaderMappingMessagePostProcessor( - message, - this.headerMapper, - partitionToSend, - mapHeaders - ); - messagePostProcessor.setDBVersion(this.dbversion); - - this.jmsTemplate.convertAndSend( - destination, - objectToSend, - messagePostProcessor - ); - - + return objectToSend; } private Integer getPartition(Message message) { try { return (Integer) (message.getHeaders().get(BinderHeaders.PARTITION_HEADER)); } catch (Exception e) { - logger.info("Invalid Partition Index"); + sendLogger.info("Invalid Partition Index"); throw new IllegalArgumentException("The partition index cannot be converted to an integer"); } } @@ -201,11 +246,11 @@ public jakarta.jms.Message postProcessMessage( if (this.dbversion == 19) jmsMessage.setJMSCorrelationID("" + this.partition); else - jmsMessage.setLongProperty("AQINTERNAL_PARTITION", this.partition * 2); + jmsMessage.setLongProperty("AQINTERNAL_PARTITION", this.partition * 2L); } else { // choose 0 by default if (this.dbversion != 19) - jmsMessage.setLongProperty("AQINTERNAL_PARTITION", 0); + jmsMessage.setLongProperty("AQINTERNAL_PARTITION", 0L); } return jmsMessage; diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/RepublishMessageRecoverer.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/RepublishMessageRecoverer.java index 43059f65..69ba7f00 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/RepublishMessageRecoverer.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/RepublishMessageRecoverer.java @@ -24,11 +24,10 @@ package com.oracle.database.spring.cloud.stream.binder.utils; -import jakarta.jms.JMSException; -import jakarta.jms.Message; - import java.util.Map; +import jakarta.jms.JMSException; +import jakarta.jms.Message; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.integration.jms.JmsHeaderMapper; diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/SpecCompliantJmsHeaderMapper.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/SpecCompliantJmsHeaderMapper.java index 960ef986..5e8f7aeb 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/SpecCompliantJmsHeaderMapper.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/SpecCompliantJmsHeaderMapper.java @@ -1,6 +1,6 @@ /* ** TxEventQ Support for Spring Cloud Stream - ** Copyright (c) 2023, 2024 Oracle and/or its affiliates. + ** Copyright (c) 2023, 2025 Oracle and/or its affiliates. ** ** This file has been modified by Oracle Corporation. */ @@ -24,9 +24,8 @@ package com.oracle.database.spring.cloud.stream.binder.utils; -import jakarta.jms.Message; - import java.io.Serializable; +import java.sql.Connection; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -34,6 +33,7 @@ import java.util.Map; import java.util.UUID; +import jakarta.jms.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.integration.jms.DefaultJmsHeaderMapper; @@ -64,12 +64,18 @@ public List getDefaultToStringClasses() { return new ArrayList<>(SpecCompliantJmsHeaderMapper.DEFAULT_TO_STRING_CLASSES); } + private Connection conn; + + public void setConnection(Connection c) { + this.conn = c; + } + @Override public void fromHeaders(MessageHeaders headers, Message jmsMessage) { Map compliantHeaders = new HashMap<>(headers.size()); for (Map.Entry entry : headers.entrySet()) { if (entry.getKey().contains("-")) { - String key = entry.getKey().replaceAll("-", "_"); + String key = entry.getKey().replace("-", "_"); logger.trace("Rewriting header name '{}' to conform to JMS spec", key); compliantHeaders.put(key, entry.getValue()); } else { @@ -85,10 +91,10 @@ public void fromHeaders(MessageHeaders headers, Message jmsMessage) { compliantHeaders.put(entry.getKey(), value.toString()); } else if (!SUPPORTED_PROPERTY_TYPES.contains(value.getClass())) { if (value instanceof Serializable) { - logger.debug("Serializing {} header object", value); + logger.info("Serializing {} header object", value); compliantHeaders.put(entry.getKey(), SerializationUtils.serialize(value)); } else { - logger.debug("Storing String representation for header: {}", entry.getKey()); + logger.info("Storing String representation for header: {}", entry.getKey()); compliantHeaders.put(entry.getKey(), value.toString()); } } @@ -96,4 +102,11 @@ public void fromHeaders(MessageHeaders headers, Message jmsMessage) { super.fromHeaders(new MessageHeaders(compliantHeaders), jmsMessage); } + + @Override + public Map toHeaders(Message jmsMessage) { + Map headers = super.toHeaders(jmsMessage); + headers.put(TxEventQBinderHeaderConstants.MESSAGE_CONNECTION, this.conn); + return headers; + } } diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TEQBatchMessageListener.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TEQBatchMessageListener.java index fe19d069..79a6bac9 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TEQBatchMessageListener.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TEQBatchMessageListener.java @@ -28,6 +28,10 @@ import java.util.List; import java.util.Map; +import com.oracle.database.spring.cloud.stream.binder.serialize.CustomSerializationMessageConverter; +import jakarta.jms.BytesMessage; +import jakarta.jms.JMSException; +import jakarta.jms.Session; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.integration.core.MessagingTemplate; @@ -47,12 +51,6 @@ import org.springframework.retry.RecoveryCallback; import org.springframework.retry.support.RetryTemplate; -import com.oracle.database.spring.cloud.stream.binder.serialize.CustomSerializationMessageConverter; - -import jakarta.jms.BytesMessage; -import jakarta.jms.JMSException; -import jakarta.jms.Session; - public class TEQBatchMessageListener extends ChannelPublishingJmsMessageListener { private final GatewayDelegate gatewayDelegate = new GatewayDelegate(); @@ -74,7 +72,7 @@ public class TEQBatchMessageListener extends ChannelPublishingJmsMessageListener private RecoveryCallback recoverer; - private final String RETRY_CONTEXT_MESSAGE_ATTRIBUTE = "message"; + private String RETRY_CONTEXT_MESSAGE_ATTRIBUTE = "message"; private String deSerializerClassName = null; @@ -231,7 +229,8 @@ public void onMessage(List jmsMessages, Session session) th protected void resetMessageIfRequired(jakarta.jms.Message jmsMessage) throws JMSException { - if (jmsMessage instanceof BytesMessage message) { + if (jmsMessage instanceof BytesMessage) { + BytesMessage message = (BytesMessage) jmsMessage; message.reset(); } } @@ -253,7 +252,7 @@ public void onMessageHelper(List jmsMessages, Session sessi } } else { for (jakarta.jms.Message jmsMessage : jmsMessages) - result.add(jmsMessage); + result.add((Object) jmsMessage); } requestMessage = this.messageBuilderFactory diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TEQBatchMessageListenerContainer.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TEQBatchMessageListenerContainer.java index c7defd4f..acfffc3f 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TEQBatchMessageListenerContainer.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TEQBatchMessageListenerContainer.java @@ -25,9 +25,15 @@ package com.oracle.database.spring.cloud.stream.binder.utils; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import jakarta.jms.Connection; +import jakarta.jms.Destination; +import jakarta.jms.JMSException; +import jakarta.jms.Message; +import jakarta.jms.MessageConsumer; +import jakarta.jms.Session; +import oracle.jakarta.jms.AQjmsConsumer; import org.springframework.jms.connection.ConnectionFactoryUtils; import org.springframework.jms.connection.JmsResourceHolder; import org.springframework.jms.connection.SingleConnectionFactory; @@ -37,15 +43,6 @@ import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionSynchronizationManager; - -import jakarta.jms.Connection; -import jakarta.jms.Destination; -import jakarta.jms.JMSException; -import jakarta.jms.Message; -import jakarta.jms.MessageConsumer; -import jakarta.jms.Session; -import oracle.jakarta.jms.AQjmsConsumer; - public class TEQBatchMessageListenerContainer extends DefaultMessageListenerContainer { private final MessageListenerContainerResourceFactory transactionalResourceFactory = new MessageListenerContainerResourceFactory(); @@ -102,15 +99,21 @@ protected List receiveBatch(MessageConsumer consumer, long timeout) thr if (timeout > 0) { Message[] messages = ((AQjmsConsumer) consumer).bulkReceive(this.batchSize, timeout); if (messages == null) return null; - Collections.addAll(msgs, messages); + for (Message msg : messages) { + msgs.add(msg); + } } else if (timeout < 0) { Message[] messages = ((AQjmsConsumer) consumer).bulkReceiveNoWait(this.batchSize); if (messages == null) return null; - Collections.addAll(msgs, messages); + for (Message msg : messages) { + msgs.add(msg); + } } else { Message[] messages = ((AQjmsConsumer) consumer).bulkReceive(this.batchSize); if (messages == null) return null; - Collections.addAll(msgs, messages); + for (Message msg : messages) { + msgs.add(msg); + } } return msgs; } @@ -264,7 +267,7 @@ protected void invokeListener(Session session, List messages) throws JM Object listener = getMessageListener(); if (listener instanceof TEQBatchMessageListener teqBatchListener) { - this.doInvokeListener(teqBatchListener, session, messages); + this.doInvokeListener((TEQBatchMessageListener) teqBatchListener, session, messages); } else if (listener != null) { throw new IllegalArgumentException( "Only TEQBatchMessageListener supported: " + listener); diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TEQMessageListenerContainer.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TEQMessageListenerContainer.java index 40464fe3..f183539c 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TEQMessageListenerContainer.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TEQMessageListenerContainer.java @@ -24,13 +24,12 @@ package com.oracle.database.spring.cloud.stream.binder.utils; -import org.springframework.jms.listener.DefaultMessageListenerContainer; -import oracle.jakarta.jms.AQjmsConsumer; - import jakarta.jms.Destination; import jakarta.jms.JMSException; import jakarta.jms.MessageConsumer; import jakarta.jms.Session; +import oracle.jakarta.jms.AQjmsConsumer; +import org.springframework.jms.listener.DefaultMessageListenerContainer; public class TEQMessageListenerContainer extends DefaultMessageListenerContainer { diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TxEventQBinderHeaderConstants.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TxEventQBinderHeaderConstants.java new file mode 100644 index 00000000..442a36b9 --- /dev/null +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TxEventQBinderHeaderConstants.java @@ -0,0 +1,10 @@ +package com.oracle.database.spring.cloud.stream.binder.utils; + +public class TxEventQBinderHeaderConstants { + public static final String MESSAGE_CONNECTION = "oracle.jdbc.internal.connection"; + public static final String CONNECTION_CONSUMER = "oracle.jdbc.internal.callback"; + public static final String MESSAGE_CONTEXT = "oracle.jdbc.internal.message_context"; + + private TxEventQBinderHeaderConstants() { + } +} diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TxEventQMessageBuilder.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TxEventQMessageBuilder.java new file mode 100644 index 00000000..5494d8a6 --- /dev/null +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TxEventQMessageBuilder.java @@ -0,0 +1,256 @@ +package com.oracle.database.spring.cloud.stream.binder.utils; + +import java.sql.Connection; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.support.AbstractIntegrationMessageBuilder; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; + +public final class TxEventQMessageBuilder extends AbstractIntegrationMessageBuilder { + + private static final Log LOGGER = LogFactory.getLog(TxEventQMessageBuilder.class); + + private final T payload; + + private final IntegrationMessageHeaderAccessor headerAccessor; + + @Nullable + private final Message originalMessage; + + private volatile boolean modified; + + private String[] readOnlyHeaders; + + private TxEventQMessageBuilder(T payload, @Nullable Message originalMessage) { + Assert.notNull(payload, "payload must not be null"); + this.payload = payload; + this.originalMessage = originalMessage; + this.headerAccessor = new IntegrationMessageHeaderAccessor(originalMessage); + if (originalMessage != null) { + this.modified = (!this.payload.equals(originalMessage.getPayload())); + } + } + + @Override + public T getPayload() { + return this.payload; + } + + @Override + public Map getHeaders() { + return this.headerAccessor.toMap(); + } + + @Nullable + @Override + public V getHeader(String key, Class type) { + return this.headerAccessor.getHeader(key, type); + } + + public static TxEventQMessageBuilder fromMessage(Message message) { + Assert.notNull(message, "message must not be null"); + return new TxEventQMessageBuilder<>(message.getPayload(), message); + } + + public static TxEventQMessageBuilder withPayload(T payload) { + return new TxEventQMessageBuilder<>(payload, null); + } + + @Override + public TxEventQMessageBuilder setHeader(String headerName, @Nullable Object headerValue) { + this.headerAccessor.setHeader(headerName, headerValue); + return this; + } + + @Override + public TxEventQMessageBuilder setHeaderIfAbsent(String headerName, Object headerValue) { + this.headerAccessor.setHeaderIfAbsent(headerName, headerValue); + return this; + } + + @Override + public TxEventQMessageBuilder removeHeaders(String... headerPatterns) { + this.headerAccessor.removeHeaders(headerPatterns); + return this; + } + + @Override + public TxEventQMessageBuilder removeHeader(String headerName) { + if (!this.headerAccessor.isReadOnly(headerName)) { + this.headerAccessor.removeHeader(headerName); + } else if (LOGGER.isInfoEnabled()) { + LOGGER.info("The header [" + headerName + "] is ignored for removal because it is is readOnly."); + } + return this; + } + + @Override + public TxEventQMessageBuilder copyHeaders(@Nullable Map headersToCopy) { + this.headerAccessor.copyHeaders(headersToCopy); + return this; + } + + @Override + public TxEventQMessageBuilder copyHeadersIfAbsent(@Nullable Map headersToCopy) { + if (headersToCopy != null) { + for (Map.Entry entry : headersToCopy.entrySet()) { + String headerName = entry.getKey(); + if (!this.headerAccessor.isReadOnly(headerName)) { + this.headerAccessor.setHeaderIfAbsent(headerName, entry.getValue()); + } + } + } + return this; + } + + @SuppressWarnings("unchecked") + @Override + @Nullable + protected List> getSequenceDetails() { + return (List>) this.headerAccessor.getHeader(IntegrationMessageHeaderAccessor.SEQUENCE_DETAILS); + } + + @Override + @Nullable + protected Object getCorrelationId() { + return this.headerAccessor.getCorrelationId(); + } + + @Override + protected Object getSequenceNumber() { + return this.headerAccessor.getSequenceNumber(); + } + + @Override + protected Object getSequenceSize() { + return this.headerAccessor.getSequenceSize(); + } + + @Override + public TxEventQMessageBuilder pushSequenceDetails(Object correlationId, int sequenceNumber, int sequenceSize) { + super.pushSequenceDetails(correlationId, sequenceNumber, sequenceSize); + return this; + } + + @Override + public TxEventQMessageBuilder popSequenceDetails() { + super.popSequenceDetails(); + return this; + } + + @Override + public TxEventQMessageBuilder setExpirationDate(@Nullable Long expirationDate) { + super.setExpirationDate(expirationDate); + return this; + } + + @Override + public TxEventQMessageBuilder setExpirationDate(@Nullable Date expirationDate) { + super.setExpirationDate(expirationDate); + return this; + } + + @Override + public TxEventQMessageBuilder setCorrelationId(Object correlationId) { + super.setCorrelationId(correlationId); + return this; + } + + @Override + public TxEventQMessageBuilder setReplyChannel(MessageChannel replyChannel) { + super.setReplyChannel(replyChannel); + return this; + } + + @Override + public TxEventQMessageBuilder setReplyChannelName(String replyChannelName) { + super.setReplyChannelName(replyChannelName); + return this; + } + + @Override + public TxEventQMessageBuilder setErrorChannel(MessageChannel errorChannel) { + super.setErrorChannel(errorChannel); + return this; + } + + @Override + public TxEventQMessageBuilder setErrorChannelName(String errorChannelName) { + super.setErrorChannelName(errorChannelName); + return this; + } + + @Override + public TxEventQMessageBuilder setSequenceNumber(Integer sequenceNumber) { + super.setSequenceNumber(sequenceNumber); + return this; + } + + @Override + public TxEventQMessageBuilder setSequenceSize(Integer sequenceSize) { + super.setSequenceSize(sequenceSize); + return this; + } + + @Override + public TxEventQMessageBuilder setPriority(Integer priority) { + super.setPriority(priority); + return this; + } + + public TxEventQMessageBuilder setConnectionCallback(Consumer callback) { + this.setHeader(TxEventQBinderHeaderConstants.CONNECTION_CONSUMER, callback); + return this; + } + + public TxEventQMessageBuilder setConnectionCallbackContext(Consumer callback, Message message) { + this.setHeader(TxEventQBinderHeaderConstants.CONNECTION_CONSUMER, callback); + this.setHeader(TxEventQBinderHeaderConstants.MESSAGE_CONTEXT, TxEventQUtils.getDBConnection(message)); + return this; + } + + public TxEventQMessageBuilder readOnlyHeaders(String... readOnlyHeaders) { + this.readOnlyHeaders = readOnlyHeaders != null ? Arrays.copyOf(readOnlyHeaders, readOnlyHeaders.length) : null; + this.headerAccessor.setReadOnlyHeaders(readOnlyHeaders); + return this; + } + + @Override + @SuppressWarnings("unchecked") + public Message build() { + if (!this.modified && !this.headerAccessor.isModified() && this.originalMessage != null + && !containsReadOnly(this.originalMessage.getHeaders())) { + return this.originalMessage; + } + if (this.payload instanceof Throwable) { + return (Message) new ErrorMessage((Throwable) this.payload, this.headerAccessor.toMap()); + } + return new GenericMessage<>(this.payload, this.headerAccessor.toMap()); + } + + private boolean containsReadOnly(MessageHeaders headers) { + if (!ObjectUtils.isEmpty(this.readOnlyHeaders)) { + for (String readOnly : this.readOnlyHeaders) { + if (headers.containsKey(readOnly)) { + return true; + } + } + } + return false; + } + +} diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TxEventQUtils.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TxEventQUtils.java new file mode 100644 index 00000000..37c353af --- /dev/null +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/TxEventQUtils.java @@ -0,0 +1,35 @@ +package com.oracle.database.spring.cloud.stream.binder.utils; + +import java.sql.Connection; +import java.util.function.Consumer; + +import org.springframework.messaging.Message; + +public class TxEventQUtils { + /* Private constructor to avoid creating object of class TxEventQUtils */ + private TxEventQUtils() { + } + + /* Static Utility Methods */ + public static Connection getDBConnection(Message message) { + return (Connection) message + .getHeaders() + .getOrDefault(TxEventQBinderHeaderConstants.MESSAGE_CONNECTION, null); + } + + public static Message setConnectionCallbackContext(Message message, + Consumer callback, + Message oldMessage) { + return TxEventQMessageBuilder + .fromMessage(message) + .setConnectionCallbackContext(callback, oldMessage) + .build(); + } + + public static Message setConnectionCallback(Message message, Consumer callback) { + return TxEventQMessageBuilder + .fromMessage(message) + .setConnectionCallback(callback) + .build(); + } +} diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/resources/META-INF/spring.binders b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/resources/META-INF/spring.binders index a3d0b78e..7211a669 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/main/resources/META-INF/spring.binders +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/main/resources/META-INF/spring.binders @@ -1,2 +1,2 @@ txeventqjms=\ -com.oracle.database.spring.cloud.stream.binder.config.TxEventQJmsConfiguration \ No newline at end of file +com.oracle.database.spring.cloud.stream.binder.config.TxEventQJmsConfiguration diff --git a/database/spring-cloud-stream-binder-oracle-txeventq/src/test/java/com/oracle/database/spring/cloud/stream/binder/Util.java b/database/spring-cloud-stream-binder-oracle-txeventq/src/test/java/com/oracle/database/spring/cloud/stream/binder/Util.java index 1100b437..1b6b2c7e 100644 --- a/database/spring-cloud-stream-binder-oracle-txeventq/src/test/java/com/oracle/database/spring/cloud/stream/binder/Util.java +++ b/database/spring-cloud-stream-binder-oracle-txeventq/src/test/java/com/oracle/database/spring/cloud/stream/binder/Util.java @@ -11,7 +11,7 @@ public class Util { public static OracleContainer oracleContainer() { - return new OracleContainer("gvenzl/oracle-free:23.6-slim-faststart") + return new OracleContainer("gvenzl/oracle-free:23.7-slim-faststart") .withStartupTimeout(Duration.ofMinutes(2)) // Needed for M1 Mac .withUsername("testuser") .withPassword("testpwd"); diff --git a/database/starters/oracle-spring-boot-starter-json-collections/src/test/java/com/oracle/spring/json/JsonCollectionsIT.java b/database/starters/oracle-spring-boot-starter-json-collections/src/test/java/com/oracle/spring/json/JsonCollectionsIT.java index b085973d..6afacdbc 100644 --- a/database/starters/oracle-spring-boot-starter-json-collections/src/test/java/com/oracle/spring/json/JsonCollectionsIT.java +++ b/database/starters/oracle-spring-boot-starter-json-collections/src/test/java/com/oracle/spring/json/JsonCollectionsIT.java @@ -29,7 +29,7 @@ @SpringBootTest(classes = JsonCollectionsAutoConfiguration.class) @Testcontainers public class JsonCollectionsIT { - static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.6-slim-faststart") + static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.7-slim-faststart") .withStartupTimeout(Duration.ofMinutes(2)) .withUsername("testuser") .withPassword("testpwd"); diff --git a/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-json-duality/src/test/java/com/oracle/database/spring/jsonduality/JSONDualitySampleApplicationTest.java b/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-json-duality/src/test/java/com/oracle/database/spring/jsonduality/JSONDualitySampleApplicationTest.java index 0ea05903..0f266fe3 100644 --- a/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-json-duality/src/test/java/com/oracle/database/spring/jsonduality/JSONDualitySampleApplicationTest.java +++ b/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-json-duality/src/test/java/com/oracle/database/spring/jsonduality/JSONDualitySampleApplicationTest.java @@ -27,7 +27,7 @@ public class JSONDualitySampleApplicationTest { * The Testcontainers Oracle Free module let's us create an Oracle database container in a junit context. */ @Container - static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.6-slim-faststart") + static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.7-slim-faststart") .withStartupTimeout(Duration.ofMinutes(2)) .withUsername("testuser") .withPassword("testpwd"); diff --git a/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-json-events/src/test/java/com/oracle/database/spring/jsonevents/JSONEventsSampleTest.java b/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-json-events/src/test/java/com/oracle/database/spring/jsonevents/JSONEventsSampleTest.java index 6124f7f6..1806e93b 100644 --- a/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-json-events/src/test/java/com/oracle/database/spring/jsonevents/JSONEventsSampleTest.java +++ b/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-json-events/src/test/java/com/oracle/database/spring/jsonevents/JSONEventsSampleTest.java @@ -33,7 +33,7 @@ public class JSONEventsSampleTest { * The Testcontainers Oracle Free module let's us create an Oracle database container in a junit context. */ @Container - static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.6-slim-faststart") + static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.7-slim-faststart") .withStartupTimeout(Duration.ofMinutes(2)) .withUsername("testuser") .withPassword("testpwd"); diff --git a/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-okafka/src/test/java/com/oracle/database/spring/okafka/OKafkaSampleTest.java b/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-okafka/src/test/java/com/oracle/database/spring/okafka/OKafkaSampleTest.java index 3667c0e1..73bdcc52 100644 --- a/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-okafka/src/test/java/com/oracle/database/spring/okafka/OKafkaSampleTest.java +++ b/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-okafka/src/test/java/com/oracle/database/spring/okafka/OKafkaSampleTest.java @@ -30,7 +30,7 @@ @Testcontainers public class OKafkaSampleTest { // Oracle Database 23ai Free container image - private static final String oracleImage = "gvenzl/oracle-free:23.6-slim-faststart"; + private static final String oracleImage = "gvenzl/oracle-free:23.7-slim-faststart"; private static final String testUser = "testuser"; private static final String testPassword = "Welcome123#"; diff --git a/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-ucp-jpa/src/test/java/com/oracle/database/spring/sample/UCPSampleApplicationTest.java b/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-ucp-jpa/src/test/java/com/oracle/database/spring/sample/UCPSampleApplicationTest.java index 6ba9d874..b19a9609 100644 --- a/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-ucp-jpa/src/test/java/com/oracle/database/spring/sample/UCPSampleApplicationTest.java +++ b/database/starters/oracle-spring-boot-starter-samples/oracle-spring-boot-sample-ucp-jpa/src/test/java/com/oracle/database/spring/sample/UCPSampleApplicationTest.java @@ -28,7 +28,7 @@ public class UCPSampleApplicationTest { * The Testcontainers Oracle Free module let's us create an Oracle database container in a junit context. */ @Container - static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.6-slim-faststart") + static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.7-slim-faststart") .withStartupTimeout(Duration.ofMinutes(2)) .withUsername("testuser") .withPassword("testpwd"); diff --git a/database/starters/oracle-spring-boot-starter-ucp/pom.xml b/database/starters/oracle-spring-boot-starter-ucp/pom.xml index 345ba989..da39711f 100644 --- a/database/starters/oracle-spring-boot-starter-ucp/pom.xml +++ b/database/starters/oracle-spring-boot-starter-ucp/pom.xml @@ -62,6 +62,10 @@ ojdbc11 compile + + com.oracle.database.ha + ons + com.oracle.database.jdbc ucp diff --git a/database/starters/pom.xml b/database/starters/pom.xml index 0d7e0911..d8334358 100644 --- a/database/starters/pom.xml +++ b/database/starters/pom.xml @@ -161,6 +161,11 @@ ojdbc11 ${oracle.version} + + com.oracle.database.ha + ons + ${oracle.version} + com.oracle.database.jdbc ucp