diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java index bc759e2c4b3..d513bf13bfb 100644 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7ProtocolLogic.java @@ -92,11 +92,7 @@ public class S7ProtocolLogic extends Plc4xProtocolBase { /* * Task group for managing connection redundancy. */ - private final ExecutorService clientExecutorService = Executors.newFixedThreadPool(4, new BasicThreadFactory.Builder() - .namingPattern("plc4x-app-thread-%d") - .daemon(true) - .priority(Thread.MAX_PRIORITY) - .build()); + private final ExecutorService clientExecutorService = SharedExecutor.getAppExecutor(); /* * Take into account that the size of this buffer depends on the final device. @@ -154,8 +150,6 @@ public PlcTagHandler getTagHandler() { @Override public void close(ConversationContext context) { - // TODO: Find out how to close this prior to Java 19 - //clientExecutorService.close(); tm.shutdown(); eventLogic.stop(); // TODO Implement Closing on Protocol Level @@ -259,8 +253,6 @@ public void onConnect(ConversationContext context) { @Override public void onDisconnect(ConversationContext context) { logger.info("onDisconnect"); - // 1. Here we shut down the local task executor. - clientExecutorService.shutdownNow(); // 2. Performs the shutdown of the transaction executor. tm.shutdown(); // 3. Finish the execution of the tasks for the handling of Events. diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java index f71be3b2d4d..d0251892ea0 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java @@ -47,15 +47,9 @@ public class RequestTransactionManager { private static final Logger logger = LoggerFactory.getLogger(RequestTransactionManager.class); - /** Executor that performs all operations */ - //static final ExecutorService executor = Executors.newScheduledThreadPool(4); - - final ExecutorService executor = Executors.newFixedThreadPool(4, new BasicThreadFactory.Builder() - .namingPattern("plc4x-tm-thread-%d") - .daemon(true) - .priority(Thread.MAX_PRIORITY) - .build()); - + /** Shared Executor that performs all operations */ + final ExecutorService executor = SharedExecutor.getTmExecutor(); + private final Set runningRequests; /** How many Transactions are allowed to run at the same time? */ private int numberOfConcurrentRequests; @@ -92,11 +86,9 @@ public void setNumberOfConcurrentRequests(int numberOfConcurrentRequests) { } /* - * It allows the sequential shutdown of the associated driver. - */ - public void shutdown(){ - executor.shutdown(); - } + * Empty shutdown because of shared executor + */ + public void shutdown(){} public void submit(Consumer context) { RequestTransaction transaction = startRequest(); diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/SharedExecutor.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/SharedExecutor.java new file mode 100644 index 00000000000..bd999791065 --- /dev/null +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/SharedExecutor.java @@ -0,0 +1,54 @@ +package org.apache.plc4x.java.spi.transaction; + +import org.apache.commons.lang3.concurrent.BasicThreadFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * A new class created to centrally manage executors, which would otherwise be created and never removed for each connection request. + * That is, given a connection, ThreadPools were created for each request through it; these only died at the end of the connection, so in + * the case of a cached connection, the ThreadPools accumulated. This way, there is always only one shared ThreadPool: nothing changes for the normal connection + * while avoiding the problem with the cached connection. + * + * NB: the number of threads here is set to 10 (in the original, non-shared connection, there were 4) for safety. + * If necessary (e.g. massive several connections to independent PLCs), it can be increased. + */ +public class SharedExecutor { + /** + * Usato da opcua driver (e forse altri.. ma non s7) + */ + private static final ExecutorService tmExecutor = Executors.newFixedThreadPool( + 10, + new BasicThreadFactory.Builder() + .namingPattern("plc4x-tm-thread-%d") + .daemon(true) + .priority(Thread.MAX_PRIORITY) + .build() + ); + + /** + * Usato solo da s7 driver + */ + private static final ExecutorService appExecutor = Executors.newFixedThreadPool( + 10, + new BasicThreadFactory.Builder() + .namingPattern("plc4x-app-thread-%d") + .daemon(true) + .priority(Thread.MAX_PRIORITY) + .build() + ); + + public static ExecutorService getTmExecutor() { + return tmExecutor; + } + + public static ExecutorService getAppExecutor() { + return appExecutor; + } + + public static void shutdown() { + tmExecutor.shutdownNow(); + appExecutor.shutdownNow(); + } +}