4343import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
4444import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
4545import java .util .concurrent .locks .ReentrantLock ;
46+ import java .util .function .Predicate ;
4647
4748import org .apache .activemq .artemis .api .config .ActiveMQDefaultConfiguration ;
4849import org .apache .activemq .artemis .api .core .ActiveMQException ;
@@ -2015,6 +2016,11 @@ private int iterQueue(final int flushLimit,
20152016 QueueIterateAction messageAction ) throws Exception {
20162017 int count = 0 ;
20172018 int txCount = 0 ;
2019+
2020+ if (filter1 != null ) {
2021+ messageAction .addFilter (filter1 );
2022+ }
2023+
20182024 // This is to avoid scheduling depaging while iterQueue is happening
20192025 // this should minimize the use of the paged executor.
20202026 depagePending = true ;
@@ -2033,10 +2039,12 @@ private int iterQueue(final int flushLimit,
20332039 while (iter .hasNext () && !messageAction .expectedHitsReached (count )) {
20342040 MessageReference ref = iter .next ();
20352041
2036- if (filter1 == null || filter1 .match (ref . getMessage () )) {
2042+ if (messageAction .match (ref )) {
20372043 if (messageAction .actMessage (tx , ref )) {
20382044 iter .remove ();
2039- refRemoved (ref );
2045+ if (!isLastValue ()) {
2046+ refRemoved (ref );
2047+ }
20402048 }
20412049 txCount ++;
20422050 count ++;
@@ -2055,7 +2063,7 @@ private int iterQueue(final int flushLimit,
20552063 return count ;
20562064 }
20572065
2058- List <MessageReference > cancelled = scheduledDeliveryHandler .cancel (ref -> filter1 == null ? true : filter1 . match ( ref . getMessage ()) );
2066+ List <MessageReference > cancelled = scheduledDeliveryHandler .cancel (messageAction :: match );
20592067 for (MessageReference messageReference : cancelled ) {
20602068 messageAction .actMessage (tx , messageReference );
20612069 count ++;
@@ -2078,12 +2086,12 @@ private int iterQueue(final int flushLimit,
20782086 PagedReference reference = pageIterator .next ();
20792087 pageIterator .remove ();
20802088
2081- if (filter1 == null || filter1 .match (reference .getMessage ())) {
2082- count ++;
2083- txCount ++;
2089+ if (messageAction .match (reference )) {
20842090 if (!messageAction .actMessage (tx , reference )) {
20852091 addTail (reference , false );
20862092 }
2093+ txCount ++;
2094+ count ++;
20872095 } else {
20882096 addTail (reference , false );
20892097 }
@@ -2401,71 +2409,48 @@ public void run() {
24012409 }
24022410
24032411 @ Override
2404- public synchronized boolean sendMessageToDeadLetterAddress (final long messageID ) throws Exception {
2405- try (LinkedListIterator <MessageReference > iter = iterator ()) {
2406- while (iter .hasNext ()) {
2407- MessageReference ref = iter .next ();
2408- if (ref .getMessage ().getMessageID () == messageID ) {
2409- incDelivering (ref );
2410- sendToDeadLetterAddress (null , ref );
2411- iter .remove ();
2412- refRemoved (ref );
2413- return true ;
2414- }
2415- }
2416- if (pageIterator != null && !queueDestroyed ) {
2417- while (pageIterator .hasNext ()) {
2418- PagedReference ref = pageIterator .next ();
2419- if (ref .getMessage ().getMessageID () == messageID ) {
2420- incDelivering (ref );
2421- sendToDeadLetterAddress (null , ref );
2422- pageIterator .remove ();
2423- refRemoved (ref );
2424- return true ;
2425- }
2426- }
2412+ public boolean sendMessageToDeadLetterAddress (final long messageID ) throws Exception {
2413+
2414+ return iterQueue (DEFAULT_FLUSH_LIMIT , null , new QueueIterateAction (messageID ) {
2415+
2416+ @ Override
2417+ public boolean actMessage (Transaction tx , MessageReference ref ) throws Exception {
2418+ incDelivering (ref );
2419+ sendToDeadLetterAddress (tx , ref );
2420+ return true ;
24272421 }
2428- return false ;
2429- }
2422+ }) == 1 ;
24302423 }
24312424
24322425 @ Override
2433- public synchronized int sendMessagesToDeadLetterAddress (Filter filter ) throws Exception {
2434-
2426+ public int sendMessagesToDeadLetterAddress (Filter filter ) throws Exception {
24352427 return iterQueue (DEFAULT_FLUSH_LIMIT , filter , new QueueIterateAction () {
24362428
24372429 @ Override
24382430 public boolean actMessage (Transaction tx , MessageReference ref ) throws Exception {
2439-
24402431 incDelivering (ref );
2441- return sendToDeadLetterAddress (tx , ref );
2432+ sendToDeadLetterAddress (tx , ref );
2433+ return true ;
24422434 }
24432435 });
24442436 }
24452437
24462438 @ Override
2447- public synchronized boolean moveReference (final long messageID ,
2439+ public boolean moveReference (final long messageID ,
24482440 final SimpleString toAddress ,
24492441 final Binding binding ,
24502442 final boolean rejectDuplicate ) throws Exception {
2451- try (LinkedListIterator <MessageReference > iter = iterator ()) {
2452- while (iter .hasNext ()) {
2453- MessageReference ref = iter .next ();
2454- if (ref .getMessage ().getMessageID () == messageID ) {
2455- iter .remove ();
2456- refRemoved (ref );
2457- incDelivering (ref );
2458- try {
2459- move (null , toAddress , binding , ref , rejectDuplicate , AckReason .NORMAL , null , null , true );
2460- } catch (Exception e ) {
2461- decDelivering (ref );
2462- throw e ;
2463- }
2464- return true ;
2465- }
2443+
2444+ return iterQueue (DEFAULT_FLUSH_LIMIT , null , new QueueIterateAction (messageID ) {
2445+
2446+ @ Override
2447+ public boolean actMessage (Transaction tx , MessageReference ref ) throws Exception {
2448+ incDelivering (ref );
2449+ move (tx , toAddress , binding , ref , rejectDuplicate , AckReason .NORMAL , null , null , true );
2450+ return true ;
24662451 }
2467- return false ;
2468- }
2452+
2453+ }) == 1 ;
24692454 }
24702455
24712456 @ Override
@@ -2511,7 +2496,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25112496 }
25122497
25132498 if (!ignored ) {
2514- move (null , toAddress , binding , ref , rejectDuplicates , AckReason .NORMAL , null , null , true );
2499+ move (tx , toAddress , binding , ref , rejectDuplicates , AckReason .NORMAL , null , null , true );
25152500 }
25162501
25172502 return true ;
@@ -2529,26 +2514,22 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25292514 }
25302515
25312516 @ Override
2532- public synchronized boolean copyReference (final long messageID ,
2517+ public boolean copyReference (final long messageID ,
25332518 final SimpleString toQueue ,
25342519 final Binding binding ) throws Exception {
2535- try (LinkedListIterator <MessageReference > iter = iterator ()) {
2536- while (iter .hasNext ()) {
2537- MessageReference ref = iter .next ();
2538- if (ref .getMessage ().getMessageID () == messageID ) {
2539- try {
2540- copy (null , toQueue , binding , ref );
2541- } catch (Exception e ) {
2542- throw e ;
2543- }
2544- return true ;
2545- }
2520+
2521+ return iterQueue (DEFAULT_FLUSH_LIMIT , null , new QueueIterateAction (messageID ) {
2522+
2523+ @ Override
2524+ public boolean actMessage (Transaction tx , MessageReference ref ) throws Exception {
2525+ copy (tx , toQueue , binding , ref );
2526+ addTail (ref , false );
2527+ return true ;
25462528 }
2547- return false ;
2548- }
2529+ }) == 1 ;
25492530 }
25502531
2551- public synchronized int rerouteMessages (final SimpleString queueName , final Filter filter ) throws Exception {
2532+ public int rerouteMessages (final SimpleString queueName , final Filter filter ) throws Exception {
25522533 return iterQueue (DEFAULT_FLUSH_LIMIT , filter , new QueueIterateAction () {
25532534 @ Override
25542535 public boolean actMessage (Transaction tx , MessageReference ref ) throws Exception {
@@ -2617,40 +2598,35 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
26172598 }
26182599
26192600 @ Override
2620- public synchronized boolean changeReferencePriority (final long messageID , final byte newPriority ) throws Exception {
2621- try ( LinkedListIterator < MessageReference > iter = iterator () ) {
2601+ public boolean changeReferencePriority (final long messageID , final byte newPriority ) throws Exception {
2602+ return iterQueue ( DEFAULT_FLUSH_LIMIT , null , new QueueIterateAction ( messageID ) {
26222603
2623- while ( iter . hasNext ()) {
2624- MessageReference ref = iter . next ();
2625- if ( ref .getMessage ().getMessageID () == messageID ) {
2626- iter . remove ();
2604+ @ Override
2605+ public boolean actMessage ( Transaction tx , MessageReference ref ) throws Exception {
2606+ ref .getMessage ().setPriority ( newPriority );
2607+ if ( isLastValue ()) {
26272608 refRemoved (ref );
2628- ref .getMessage ().setPriority (newPriority );
2629- addTail (ref , false );
2630- return true ;
26312609 }
2610+ addTail (ref , false );
2611+ return true ;
26322612 }
2633-
2634- return false ;
2635- }
2613+ }) == 1 ;
26362614 }
26372615
26382616 @ Override
2639- public synchronized int changeReferencesPriority (final Filter filter , final byte newPriority ) throws Exception {
2640- try (LinkedListIterator <MessageReference > iter = iterator ()) {
2641- int count = 0 ;
2642- while (iter .hasNext ()) {
2643- MessageReference ref = iter .next ();
2644- if (filter == null || filter .match (ref .getMessage ())) {
2645- count ++;
2646- iter .remove ();
2617+ public int changeReferencesPriority (final Filter filter , final byte newPriority ) throws Exception {
2618+ return iterQueue (DEFAULT_FLUSH_LIMIT , filter , new QueueIterateAction () {
2619+
2620+ @ Override
2621+ public boolean actMessage (Transaction tx , MessageReference ref ) throws Exception {
2622+ ref .getMessage ().setPriority (newPriority );
2623+ if (isLastValue ()) {
26472624 refRemoved (ref );
2648- ref .getMessage ().setPriority (newPriority );
2649- addTail (ref , false );
26502625 }
2626+ addTail (ref , false );
2627+ return true ;
26512628 }
2652- return count ;
2653- }
2629+ });
26542630 }
26552631
26562632 @ Override
@@ -4186,13 +4162,23 @@ public void run() {
41864162 abstract class QueueIterateAction {
41874163
41884164 protected Integer expectedHits ;
4165+ protected Long messageID ;
4166+ protected Filter filter1 = null ;
4167+ protected Predicate <MessageReference > match ;
41894168
41904169 QueueIterateAction (Integer expectedHits ) {
41914170 this .expectedHits = expectedHits ;
4171+ this .match = ref -> filter1 == null ? true : filter1 .match (ref .getMessage ());
4172+ }
4173+
4174+ QueueIterateAction (Long messageID ) {
4175+ this .expectedHits = 1 ;
4176+ this .match = ref -> ref .getMessage ().getMessageID () == messageID ;
41924177 }
41934178
41944179 QueueIterateAction () {
41954180 this .expectedHits = null ;
4181+ this .match = ref -> filter1 == null ? true : filter1 .match (ref .getMessage ());
41964182 }
41974183
41984184 /**
@@ -4207,6 +4193,15 @@ abstract class QueueIterateAction {
42074193 public boolean expectedHitsReached (int currentHits ) {
42084194 return expectedHits != null && currentHits >= expectedHits .intValue ();
42094195 }
4196+
4197+ public void addFilter (Filter filter1 ) {
4198+ this .filter1 = filter1 ;
4199+ }
4200+
4201+ public boolean match (MessageReference ref ) {
4202+ return match .test (ref );
4203+ }
4204+
42104205 }
42114206
42124207 // For external use we need to use a synchronized version since the list is not thread safe
0 commit comments