Skip to content

Commit

Permalink
refactor task/job executors
Browse files Browse the repository at this point in the history
* migrate from java's executor impl to spring's abstraction:
let spring handle the bean lifecycle, graceful shutdown etc.

* separate on interface-level between scheduled and async tasks:
we only have 1 use case for scheduled tasks (websocket ping-pongs),
whereas all other usages of ScheduledExecutorService were just async
job submissions. separation might be useful in future, if we want to have
distinct thread pools as well.
  • Loading branch information
goekay committed Feb 2, 2025
1 parent 0b0745b commit e348315
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 80 deletions.
53 changes: 15 additions & 38 deletions src/main/java/de/rwth/idsg/steve/config/BeanConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mysql.cj.conf.PropertyKey;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
Expand All @@ -45,6 +44,7 @@
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;
import org.springframework.web.accept.ContentNegotiationManager;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
Expand All @@ -55,16 +55,11 @@
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter;
import org.springframework.web.servlet.view.InternalResourceViewResolver;

import jakarta.annotation.PreDestroy;
import jakarta.validation.Validator;

import javax.sql.DataSource;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executor;

import static de.rwth.idsg.steve.SteveConfiguration.CONFIG;

Expand All @@ -81,8 +76,6 @@
@ComponentScan("de.rwth.idsg.steve")
public class BeanConfiguration implements WebMvcConfigurer {

private ScheduledThreadPoolExecutor executor;

/**
* https://github.com/brettwooldridge/HikariCP/wiki/MySQL-Configuration
*/
Expand Down Expand Up @@ -144,13 +137,20 @@ public DSLContext dslContext(DataSource dataSource) {
return DSL.using(conf);
}

@Bean
public ScheduledExecutorService scheduledExecutorService() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("SteVe-Executor-%d")
.build();
@Bean(name = "asyncTaskScheduler")
public ThreadPoolTaskScheduler asyncTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5);
scheduler.setThreadNamePrefix("SteVe-Executor-");
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.setAwaitTerminationSeconds(30);
scheduler.initialize();
return scheduler;
}

executor = new ScheduledThreadPoolExecutor(5, threadFactory);
return executor;
@Bean(name = "asyncTaskExecutor")
public Executor asyncTaskExecutor(ThreadPoolTaskScheduler asyncTaskScheduler) {
return asyncTaskScheduler;
}

@Bean
Expand All @@ -173,29 +173,6 @@ public ReleaseCheckService releaseCheckService() {
}
}

@PreDestroy
public void shutDown() {
if (executor != null) {
gracefulShutDown(executor);
}
}

private void gracefulShutDown(ExecutorService executor) {
try {
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);

} catch (InterruptedException e) {
log.error("Termination interrupted", e);

} finally {
if (!executor.isTerminated()) {
log.warn("Killing non-finished tasks");
}
executor.shutdownNow();
}
}

// -------------------------------------------------------------------------
// Web config
// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
@Autowired private Ocpp16WebSocketEndpoint ocpp16WebSocketEndpoint;

public static final String PATH_INFIX = "/websocket/CentralSystemService/";
public static final long PING_INTERVAL = TimeUnit.MINUTES.toMinutes(15);
public static final Duration PING_INTERVAL = Duration.ofMinutes(15);
public static final Duration IDLE_TIMEOUT = Duration.ofHours(2);
public static final int MAX_MSG_SIZE = 8_388_608; // 8 MB for max message size

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

import javax.xml.namespace.QName;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executor;

import static org.apache.cxf.ws.addressing.JAXWSAConstants.ADDRESSING_PROPERTIES_INBOUND;

Expand All @@ -62,7 +62,7 @@ public class MessageHeaderInterceptor extends AbstractPhaseInterceptor<Message>

@Autowired private OcppServerRepository ocppServerRepository;
@Autowired private ChargePointHelperService chargePointHelperService;
@Autowired private ScheduledExecutorService executorService;
@Autowired private Executor asyncTaskExecutor;

private static final String BOOT_OPERATION_NAME = "BootNotification";
private static final String CHARGEBOX_ID_HEADER = "ChargeBoxIdentity";
Expand Down Expand Up @@ -93,7 +93,7 @@ public void handleMessage(Message message) throws Fault {
// 2. update endpoint
// -------------------------------------------------------------------------

executorService.execute(() -> {
asyncTaskExecutor.execute(() -> {
try {
String endpointAddress = getEndpointAddress(message);
if (endpointAddress != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PongMessage;
Expand All @@ -39,14 +40,13 @@
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
Expand All @@ -55,7 +55,7 @@
*/
public abstract class AbstractWebSocketEndpoint extends ConcurrentWebSocketHandler implements SubProtocolCapable {

@Autowired private ScheduledExecutorService service;
@Autowired private ThreadPoolTaskScheduler asyncTaskScheduler;
@Autowired private OcppServerRepository ocppServerRepository;
@Autowired private FutureResponseContextStore futureResponseContextStore;
@Autowired private ApplicationEventPublisher applicationEventPublisher;
Expand Down Expand Up @@ -131,11 +131,11 @@ public void onOpen(WebSocketSession session) throws Exception {

// Just to keep the connection alive, such that the servers do not close
// the connection because of a idle timeout, we ping-pong at fixed intervals.
ScheduledFuture pingSchedule = service.scheduleAtFixedRate(
ScheduledFuture pingSchedule = asyncTaskScheduler.scheduleAtFixedRate(
new PingTask(chargeBoxId, session),
WebSocketConfiguration.PING_INTERVAL,
WebSocketConfiguration.PING_INTERVAL,
TimeUnit.MINUTES);
Instant.now().plus(WebSocketConfiguration.PING_INTERVAL),
WebSocketConfiguration.PING_INTERVAL
);

futureResponseContextStore.addSession(session);

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/de/rwth/idsg/steve/service/BackgroundService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import lombok.RequiredArgsConstructor;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/**
Expand All @@ -32,10 +32,10 @@
*/
@RequiredArgsConstructor
public class BackgroundService {
private final ExecutorService executorService;
private final Executor asyncTaskExecutor;

public static BackgroundService with(ExecutorService executorService) {
return new BackgroundService(executorService);
public static BackgroundService with(Executor asyncTaskExecutor) {
return new BackgroundService(asyncTaskExecutor);
}

public Runner forFirst(List<ChargePointSelect> list) {
Expand All @@ -56,7 +56,7 @@ private class BackgroundSingleRunner implements Runner {

@Override
public void execute(Consumer<ChargePointSelect> consumer) {
executorService.execute(() -> consumer.accept(cps));
asyncTaskExecutor.execute(() -> consumer.accept(cps));
}
}

Expand All @@ -66,7 +66,7 @@ private class BackgroundListRunner implements Runner {

@Override
public void execute(Consumer<ChargePointSelect> consumer) {
executorService.execute(() -> list.forEach(consumer));
asyncTaskExecutor.execute(() -> list.forEach(consumer));
}
}
}
Loading

0 comments on commit e348315

Please sign in to comment.