@@ -3,8 +3,8 @@ package alpakka.jms
33import com .typesafe .config .Config
44import org .apache .activemq .ActiveMQConnectionFactory
55import org .apache .pekko .actor .ActorSystem
6- import org .apache .pekko .stream ._
7- import org .apache .pekko .stream .connectors .jms ._
6+ import org .apache .pekko .stream .*
7+ import org .apache .pekko .stream .connectors .jms .*
88import org .apache .pekko .stream .connectors .jms .scaladsl .{JmsConsumer , JmsConsumerControl , JmsProducer }
99import org .apache .pekko .stream .scaladsl .{Keep , Sink , Source }
1010import org .apache .pekko .{Done , NotUsed }
@@ -13,21 +13,21 @@ import org.slf4j.{Logger, LoggerFactory}
1313import java .util .concurrent .ThreadLocalRandom
1414import javax .jms .{ConnectionFactory , Message , TextMessage }
1515import scala .collection .immutable
16- import scala .concurrent .duration ._
16+ import scala .concurrent .duration .*
1717import scala .concurrent .{Await , Future }
1818import scala .util .control .NonFatal
1919import scala .util .{Failure , Success }
2020
2121/**
2222 * An Alpakka JMS client which consumes text messages from either:
2323 * - Preferred: Artemis JMS Broker on docker image, started from /docker/docker-compose.yml
24- * - Preferred: Embedded Artemis JMS Broker [[JMSServerArtemis ]], started from IDE
24+ * - Experimental: Embedded Artemis JMS Broker [[alpakka.env. JMSServerArtemis ]], started from IDE
2525 * - Experimental: Embedded ActiveMQ JMS Broker [[alpakka.env.jms.JMSServerActiveMQ ]], started from IDE
2626 *
2727 * Generate text messages with [[JMSTextMessageProducerClient ]]
2828 *
2929 * Features:
30- * - non deliverable messages are acknowledged and written to an error queue (so that processing resumes)
30+ * - non deliverable messages are acknowledged and written to [[ ProcessingApp.errorQueue ]] (so that processing resumes)
3131 * - Failures in this client may be simulated by throwing random java.lang.RuntimeException: BOOM
3232 * see [[ProcessingApp.simulateFaultyDeliveryToExternalSystem ]]
3333 * - for an example of ConnectionRetrySettings/SendRetrySettings see [[JMSTextMessageProducerClient ]]
@@ -68,8 +68,8 @@ object ProcessingApp {
6868 // Seems to work together with the new connection and send retry settings on the connector
6969 val connectionFactory : ConnectionFactory = new ActiveMQConnectionFactory (" artemis" , " artemis" , " failover:tcp://127.0.0.1:21616" )
7070
71- val consumerConfig : Config = system.settings.config.getConfig(JmsConsumerSettings .configPath)
72- val jmsConsumerSource : Source [AckEnvelope , JmsConsumerControl ] = JmsConsumer .ackSource(
71+ private val consumerConfig : Config = system.settings.config.getConfig(JmsConsumerSettings .configPath)
72+ private val jmsConsumerSource : Source [AckEnvelope , JmsConsumerControl ] = JmsConsumer .ackSource(
7373 JmsConsumerSettings (consumerConfig, connectionFactory)
7474 .withQueue(" test-queue" )
7575 .withSessionCount(5 )
@@ -81,9 +81,9 @@ object ProcessingApp {
8181 .withAcknowledgeMode(AcknowledgeMode .ClientAcknowledge ) // Default
8282 )
8383
84- val jmsErrorQueueSettings : JmsProducerSettings = JmsProducerSettings .create(system, connectionFactory).withQueue(" test-queue-error" )
85- val errorQueueSink : Sink [JmsTextMessage , Future [Done ]] = JmsProducer .sink(jmsErrorQueueSettings)
86- val errorQueue = Source
84+ private val jmsErrorQueueSettings : JmsProducerSettings = JmsProducerSettings .create(system, connectionFactory).withQueue(" test-queue-error" )
85+ private val errorQueueSink : Sink [JmsTextMessage , Future [Done ]] = JmsProducer .sink(jmsErrorQueueSettings)
86+ private val errorQueue = Source
8787 .queue[JmsTextMessage ](100 , OverflowStrategy .backpressure, 10 )
8888 .toMat(errorQueueSink)(Keep .left)
8989 .run()
@@ -133,7 +133,7 @@ object ProcessingApp {
133133 }
134134 }
135135
136- private def pendingMessageWatcher (jmsConsumerControl : JmsConsumerControl ) = {
136+ private def pendingMessageWatcher (jmsConsumerControl : JmsConsumerControl ): Unit = {
137137 val queue = jmsConsumerControl.connectorState.toMat(Sink .queue())(Keep .right).run()
138138
139139 val browseSource : Source [Message , NotUsed ] = JmsConsumer .browse(
@@ -156,7 +156,7 @@ object ProcessingApp {
156156 }
157157
158158
159- def logWhen (done : Future [Done ]) = {
159+ def logWhen (done : Future [Done ]): Unit = {
160160 done.onComplete {
161161 case Success (_) =>
162162 logger.info(" Message successfully written to error queue" )
0 commit comments