Skip to content

Commit

Permalink
Merge pull request #1686 from steve-community/refactor_executors
Browse files Browse the repository at this point in the history
refactor task/job executors
  • Loading branch information
goekay authored Feb 2, 2025
2 parents 0b0745b + 535d87c commit 3bfcc82
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 80 deletions.
61 changes: 23 additions & 38 deletions src/main/java/de/rwth/idsg/steve/config/BeanConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mysql.cj.conf.PropertyKey;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
Expand All @@ -45,6 +44,8 @@
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;
import org.springframework.web.accept.ContentNegotiationManager;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
Expand All @@ -55,16 +56,10 @@
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter;
import org.springframework.web.servlet.view.InternalResourceViewResolver;

import jakarta.annotation.PreDestroy;
import jakarta.validation.Validator;

import javax.sql.DataSource;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static de.rwth.idsg.steve.SteveConfiguration.CONFIG;

Expand All @@ -81,8 +76,6 @@
@ComponentScan("de.rwth.idsg.steve")
public class BeanConfiguration implements WebMvcConfigurer {

private ScheduledThreadPoolExecutor executor;

/**
* https://github.com/brettwooldridge/HikariCP/wiki/MySQL-Configuration
*/
Expand Down Expand Up @@ -144,13 +137,28 @@ public DSLContext dslContext(DataSource dataSource) {
return DSL.using(conf);
}

@Bean
public ScheduledExecutorService scheduledExecutorService() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("SteVe-Executor-%d")
.build();
@Bean(destroyMethod = "close")
public DelegatingTaskScheduler asyncTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5);
scheduler.setThreadNamePrefix("SteVe-TaskScheduler-");
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.setAwaitTerminationSeconds(30);
scheduler.initialize();

executor = new ScheduledThreadPoolExecutor(5, threadFactory);
return executor;
return new DelegatingTaskScheduler(scheduler);
}

@Bean(destroyMethod = "close")
public DelegatingTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setThreadNamePrefix("SteVe-TaskExecutor-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();

return new DelegatingTaskExecutor(executor);
}

@Bean
Expand All @@ -173,29 +181,6 @@ public ReleaseCheckService releaseCheckService() {
}
}

@PreDestroy
public void shutDown() {
if (executor != null) {
gracefulShutDown(executor);
}
}

private void gracefulShutDown(ExecutorService executor) {
try {
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);

} catch (InterruptedException e) {
log.error("Termination interrupted", e);

} finally {
if (!executor.isTerminated()) {
log.warn("Killing non-finished tasks");
}
executor.shutdownNow();
}
}

// -------------------------------------------------------------------------
// Web config
// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SteVe - SteckdosenVerwaltung - https://github.com/steve-community/steve
* Copyright (C) 2013-2025 SteVe Community Team
* All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package de.rwth.idsg.steve.config;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.io.Closeable;
import java.io.IOException;

/**
* @author Sevket Goekay <[email protected]>
* @since 02.02.2025
*/
@Slf4j
@RequiredArgsConstructor
public class DelegatingTaskExecutor implements Closeable {

private final ThreadPoolTaskExecutor delegate;

@Override
public void close() throws IOException {
log.info("Shutting down");
delegate.shutdown();
}

public void execute(Runnable task) {
delegate.execute(task);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SteVe - SteckdosenVerwaltung - https://github.com/steve-community/steve
* Copyright (C) 2013-2025 SteVe Community Team
* All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package de.rwth.idsg.steve.config;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ScheduledFuture;

/**
* @author Sevket Goekay <[email protected]>
* @since 02.02.2025
*/
@Slf4j
@RequiredArgsConstructor
public class DelegatingTaskScheduler implements Closeable {

private final ThreadPoolTaskScheduler delegate;

@Override
public void close() throws IOException {
log.info("Shutting down");
delegate.shutdown();
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
return delegate.scheduleAtFixedRate(task, startTime, period);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
@Autowired private Ocpp16WebSocketEndpoint ocpp16WebSocketEndpoint;

public static final String PATH_INFIX = "/websocket/CentralSystemService/";
public static final long PING_INTERVAL = TimeUnit.MINUTES.toMinutes(15);
public static final Duration PING_INTERVAL = Duration.ofMinutes(15);
public static final Duration IDLE_TIMEOUT = Duration.ofHours(2);
public static final int MAX_MSG_SIZE = 8_388_608; // 8 MB for max message size

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package de.rwth.idsg.steve.ocpp.soap;

import de.rwth.idsg.steve.config.DelegatingTaskExecutor;
import de.rwth.idsg.steve.ocpp.OcppProtocol;
import de.rwth.idsg.steve.repository.OcppServerRepository;
import de.rwth.idsg.steve.repository.impl.ChargePointRepositoryImpl;
Expand All @@ -41,7 +42,6 @@

import javax.xml.namespace.QName;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.cxf.ws.addressing.JAXWSAConstants.ADDRESSING_PROPERTIES_INBOUND;

Expand All @@ -62,7 +62,7 @@ public class MessageHeaderInterceptor extends AbstractPhaseInterceptor<Message>

@Autowired private OcppServerRepository ocppServerRepository;
@Autowired private ChargePointHelperService chargePointHelperService;
@Autowired private ScheduledExecutorService executorService;
@Autowired private DelegatingTaskExecutor asyncTaskExecutor;

private static final String BOOT_OPERATION_NAME = "BootNotification";
private static final String CHARGEBOX_ID_HEADER = "ChargeBoxIdentity";
Expand Down Expand Up @@ -93,7 +93,7 @@ public void handleMessage(Message message) throws Fault {
// 2. update endpoint
// -------------------------------------------------------------------------

executorService.execute(() -> {
asyncTaskExecutor.execute(() -> {
try {
String endpointAddress = getEndpointAddress(message);
if (endpointAddress != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Strings;
import de.rwth.idsg.steve.config.WebSocketConfiguration;
import de.rwth.idsg.steve.config.DelegatingTaskScheduler;
import de.rwth.idsg.steve.ocpp.OcppTransport;
import de.rwth.idsg.steve.ocpp.OcppVersion;
import de.rwth.idsg.steve.ocpp.ws.data.CommunicationContext;
Expand All @@ -39,14 +40,13 @@
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
Expand All @@ -55,7 +55,7 @@
*/
public abstract class AbstractWebSocketEndpoint extends ConcurrentWebSocketHandler implements SubProtocolCapable {

@Autowired private ScheduledExecutorService service;
@Autowired private DelegatingTaskScheduler asyncTaskScheduler;
@Autowired private OcppServerRepository ocppServerRepository;
@Autowired private FutureResponseContextStore futureResponseContextStore;
@Autowired private ApplicationEventPublisher applicationEventPublisher;
Expand Down Expand Up @@ -131,11 +131,11 @@ public void onOpen(WebSocketSession session) throws Exception {

// Just to keep the connection alive, such that the servers do not close
// the connection because of a idle timeout, we ping-pong at fixed intervals.
ScheduledFuture pingSchedule = service.scheduleAtFixedRate(
ScheduledFuture pingSchedule = asyncTaskScheduler.scheduleAtFixedRate(
new PingTask(chargeBoxId, session),
WebSocketConfiguration.PING_INTERVAL,
WebSocketConfiguration.PING_INTERVAL,
TimeUnit.MINUTES);
Instant.now().plus(WebSocketConfiguration.PING_INTERVAL),
WebSocketConfiguration.PING_INTERVAL
);

futureResponseContextStore.addSession(session);

Expand Down
13 changes: 7 additions & 6 deletions src/main/java/de/rwth/idsg/steve/service/BackgroundService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
package de.rwth.idsg.steve.service;

import de.rwth.idsg.steve.config.DelegatingTaskExecutor;
import de.rwth.idsg.steve.repository.dto.ChargePointSelect;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

/**
Expand All @@ -32,10 +32,11 @@
*/
@RequiredArgsConstructor
public class BackgroundService {
private final ExecutorService executorService;

public static BackgroundService with(ExecutorService executorService) {
return new BackgroundService(executorService);
private final DelegatingTaskExecutor asyncTaskExecutor;

public static BackgroundService with(DelegatingTaskExecutor asyncTaskExecutor) {
return new BackgroundService(asyncTaskExecutor);
}

public Runner forFirst(List<ChargePointSelect> list) {
Expand All @@ -56,7 +57,7 @@ private class BackgroundSingleRunner implements Runner {

@Override
public void execute(Consumer<ChargePointSelect> consumer) {
executorService.execute(() -> consumer.accept(cps));
asyncTaskExecutor.execute(() -> consumer.accept(cps));
}
}

Expand All @@ -66,7 +67,7 @@ private class BackgroundListRunner implements Runner {

@Override
public void execute(Consumer<ChargePointSelect> consumer) {
executorService.execute(() -> list.forEach(consumer));
asyncTaskExecutor.execute(() -> list.forEach(consumer));
}
}
}
Loading

0 comments on commit 3bfcc82

Please sign in to comment.