diff --git a/.gitignore b/.gitignore index 59abfc9..5763f50 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ wrapper /application-skeleton-consumer/target /application-skeleton-publisher/target /application-skeleton-subscriber/target +/application-skeleton-executor/target /mvnw /mvnw.cmd /.settings diff --git a/application-skeleton-executor/.classpath b/application-skeleton-executor/.classpath new file mode 100644 index 0000000..002ad57 --- /dev/null +++ b/application-skeleton-executor/.classpath @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/application-skeleton-executor/.project b/application-skeleton-executor/.project new file mode 100644 index 0000000..c674556 --- /dev/null +++ b/application-skeleton-executor/.project @@ -0,0 +1,23 @@ + + + arrowhead-application-skeleton-executor + + + + + + org.eclipse.jdt.core.javabuilder + + + + + org.eclipse.m2e.core.maven2Builder + + + + + + org.eclipse.jdt.core.javanature + org.eclipse.m2e.core.maven2Nature + + diff --git a/application-skeleton-executor/pom.xml b/application-skeleton-executor/pom.xml new file mode 100644 index 0000000..8347fa3 --- /dev/null +++ b/application-skeleton-executor/pom.xml @@ -0,0 +1,54 @@ + + 4.0.0 + + + eu.arrowhead + application-skeleton-java-spring + 4.4.0.0 + + + arrowhead-application-skeleton-executor + Arrowhead Executor Skeleton + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + maven-resources-plugin + 3.1.0 + + + copy-resources + validate + + copy-resources + + + ${basedir}/target + + + src/main/resources + + application.properties + + + + + + + + + + + \ No newline at end of file diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/ExecutorApplicationInitListener.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/ExecutorApplicationInitListener.java new file mode 100644 index 0000000..7f9c00a --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/ExecutorApplicationInitListener.java @@ -0,0 +1,56 @@ +package eu.arrowhead.application.skeleton.executor; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.stereotype.Component; + +import ai.aitia.arrowhead.application.library.ArrowheadService; +import ai.aitia.arrowhead.application.library.config.ApplicationInitListener; +import eu.arrowhead.application.skeleton.executor.execution.ExecutionManager; +import eu.arrowhead.common.core.CoreSystem; + +@Component +public class ExecutorApplicationInitListener extends ApplicationInitListener { + + //================================================================================================= + // members + + @Autowired + private ArrowheadService arrowheadService; + + @Autowired + private ExecutionManager executionManager; + + private final Logger logger = LogManager.getLogger(ExecutorApplicationInitListener.class); + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + @Override + protected void customInit(final ContextRefreshedEvent event) { + + //Checking the availability of necessary core systems + checkCoreSystemReachability(CoreSystem.CHOREOGRAPHER); + + //Initialize Arrowhead Context + arrowheadService.updateCoreServiceURIs(CoreSystem.CHOREOGRAPHER); + + //Start Executor Manager + executionManager.start(); + + //TODO: implement here any custom behavior on application start up + } + + //------------------------------------------------------------------------------------------------- + @Override + public void customDestroy() { + + //Stop Executor Manager + executionManager.interrupt(); + + //TODO: implement here any custom behavior on application shout down + } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/ExecutorConstants.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/ExecutorConstants.java new file mode 100644 index 0000000..1cf2514 --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/ExecutorConstants.java @@ -0,0 +1,18 @@ +package eu.arrowhead.application.skeleton.executor; + +public class ExecutorConstants { + + //================================================================================================= + // members + + public static final String THREAD_NUM_EXECUTION_WORKER = "thread.num.execution-worker"; + public static final String $THREAD_NUM_EXECUTION_WORKER_WD = "${" + THREAD_NUM_EXECUTION_WORKER + ":1}"; + + //================================================================================================= + // assistant methods + + //------------------------------------------------------------------------------------------------- + private ExecutorConstants() { + throw new UnsupportedOperationException(); + } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/ExecutorMain.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/ExecutorMain.java new file mode 100644 index 0000000..49bd7bc --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/ExecutorMain.java @@ -0,0 +1,20 @@ +package eu.arrowhead.application.skeleton.executor; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; + +import eu.arrowhead.common.CommonConstants; + +@SpringBootApplication +@ComponentScan(basePackages = {CommonConstants.BASE_PACKAGE}) //TODO: add custom packages if any +public class ExecutorMain { + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + public static void main(final String[] args) { + SpringApplication.run(ExecutorMain.class, args); + } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/configuration/ConfigConstants.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/configuration/ConfigConstants.java new file mode 100644 index 0000000..a5c1d5e --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/configuration/ConfigConstants.java @@ -0,0 +1,18 @@ +package eu.arrowhead.application.skeleton.executor.configuration; + +public class ConfigConstants { + + //================================================================================================= + // members + + public static final int MIN_MAXKEEPALIVE_REQUESTS = 1; + public static final int MAX_MAXKEEPALIVE_REQUESTS = 1000; + + //================================================================================================= + // assistant methods + + //------------------------------------------------------------------------------------------------- + private ConfigConstants() { + throw new UnsupportedOperationException(); + } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/configuration/ContainerConfProperties.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/configuration/ContainerConfProperties.java new file mode 100644 index 0000000..85cb09b --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/configuration/ContainerConfProperties.java @@ -0,0 +1,31 @@ +package eu.arrowhead.application.skeleton.executor.configuration; + +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; +import org.springframework.validation.annotation.Validated; + +@Configuration +@PropertySource("classpath:application.properties") +@ConfigurationProperties(prefix = "container") +@Validated +public class ContainerConfProperties { + + //================================================================================================= + // members + + @Min(ConfigConstants.MIN_MAXKEEPALIVE_REQUESTS) + @Max(ConfigConstants.MAX_MAXKEEPALIVE_REQUESTS) + private int maxKeepAliveRequests; + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + + public int getMaxKeepAliveRequests() { return maxKeepAliveRequests; } + public void setMaxKeepAliveRequests(final int maxKeepAliveRequests) { this.maxKeepAliveRequests = maxKeepAliveRequests; } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/configuration/ContainerConfiguration.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/configuration/ContainerConfiguration.java new file mode 100644 index 0000000..67ec9ca --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/configuration/ContainerConfiguration.java @@ -0,0 +1,47 @@ +package eu.arrowhead.application.skeleton.executor.configuration; + +import org.apache.coyote.http11.AbstractHttp11Protocol; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; +import org.springframework.boot.web.server.WebServerFactoryCustomizer; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ContainerConfiguration implements WebServerFactoryCustomizer { + + //================================================================================================= + // members + + @Autowired + ContainerConfProperties containerConfProperties; + + private final Logger log = LogManager.getLogger( ContainerConfiguration.class); + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + @SuppressWarnings("rawtypes") + @Override + public void customize(TomcatServletWebServerFactory factory) { + factory.addConnectorCustomizers(connector -> { + final AbstractHttp11Protocol protocol = (AbstractHttp11Protocol) connector.getProtocolHandler(); + + protocol.setMaxKeepAliveRequests(containerConfProperties.getMaxKeepAliveRequests()); + + log.info("####################################################################################"); + log.info("#"); + log.info("# TomcatCustomizer"); + log.info("#"); + log.info("# custom maxKeepAliveRequests {}", protocol.getMaxKeepAliveRequests()); + log.info("# origin keepalive timeout: {} ms", protocol.getKeepAliveTimeout()); + log.info("# keepalive timeout: {} ms", protocol.getKeepAliveTimeout()); + log.info("# connection timeout: {} ms", protocol.getConnectionTimeout()); + log.info("# max connections: {}", protocol.getMaxConnections()); + log.info("#"); + log.info("####################################################################################"); + }); + } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/controller/ExecutorController.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/controller/ExecutorController.java new file mode 100644 index 0000000..b7f46d6 --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/controller/ExecutorController.java @@ -0,0 +1,59 @@ +package eu.arrowhead.application.skeleton.executor.controller; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; + +import eu.arrowhead.application.skeleton.executor.service.ExecutorService; +import eu.arrowhead.common.CommonConstants; +import eu.arrowhead.common.dto.shared.ChoreographerAbortStepRequestDTO; +import eu.arrowhead.common.dto.shared.ChoreographerExecuteStepRequestDTO; +import eu.arrowhead.common.dto.shared.ChoreographerExecutorServiceInfoRequestDTO; +import eu.arrowhead.common.dto.shared.ChoreographerExecutorServiceInfoResponseDTO; + +@RestController +//@RequestMapping("/executor") // TODO: specify the base URI here +public class ExecutorController { + + //================================================================================================= + // members + + @Autowired + private ExecutorService executorService; + + //TODO: add your variables here + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + @GetMapping(path = CommonConstants.ECHO_URI) + public String echo() { + return "Got it!"; + } + + //------------------------------------------------------------------------------------------------- + @PostMapping(path = CommonConstants.CHOREOGRAPHER_EXECUTOR_CLIENT_SERVICE_START_URI, consumes = MediaType.APPLICATION_JSON_VALUE) + public void start(@RequestBody final ChoreographerExecuteStepRequestDTO request) { + executorService.startExecution(request); + } + + //------------------------------------------------------------------------------------------------- + @PostMapping(path = CommonConstants.CHOREOGRAPHER_EXECUTOR_CLIENT_SERVICE_ABORT_URI, consumes = MediaType.APPLICATION_JSON_VALUE) + public void abort(@RequestBody final ChoreographerAbortStepRequestDTO request) { + executorService.abortExecution(request); + } + + //------------------------------------------------------------------------------------------------- + @PostMapping(path = CommonConstants.CHOREOGRAPHER_EXECUTOR_CLIENT_SERVICE_INFO_URI, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) + @ResponseBody public ChoreographerExecutorServiceInfoResponseDTO serviceInfo(@RequestBody final ChoreographerExecutorServiceInfoRequestDTO request) { + return executorService.collectServiceInfo(request); + } + + //------------------------------------------------------------------------------------------------- + //TODO: implement here your executor related REST end points +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/ExecutionBoard.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/ExecutionBoard.java new file mode 100644 index 0000000..f07ab22 --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/ExecutionBoard.java @@ -0,0 +1,97 @@ +package eu.arrowhead.application.skeleton.executor.execution; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +import org.springframework.stereotype.Component; +import org.springframework.util.Assert; + +import eu.arrowhead.common.dto.shared.ChoreographerExecuteStepRequestDTO; + +@Component +public class ExecutionBoard { + + //================================================================================================= + // members + + private final Map board = new ConcurrentHashMap<>(); + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + + private final Object lock = new Object(); + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + public Job newJob(final ChoreographerExecuteStepRequestDTO jobRequest) { + Assert.notNull(jobRequest, "jobRequest is null"); + + synchronized (lock) { + final Job job = new Job(jobRequest, ExecutionSignal.DO); + board.put(getUniqueIdentifier(jobRequest), job); + queue.add(job); + return job; + } + } + + //------------------------------------------------------------------------------------------------- + public Optional peekJob(final long sessionId, final long sessionStepId) { + final Job job = board.get(getUniqueIdentifier(sessionId, sessionStepId)); + if (job != null) { + return Optional.of(job); + } + return Optional.empty(); + } + + //------------------------------------------------------------------------------------------------- + public Job nextJob() throws InterruptedException { + return queue.take(); + } + + //------------------------------------------------------------------------------------------------- + public ExecutionSignal getJobExecutionSignal(final long sessionId, final long sessionStepId) { + synchronized (lock) { + final Job job = board.get(getUniqueIdentifier(sessionId, sessionStepId)); + if (job != null) { + return job.getExecutionSignal(); + } + return ExecutionSignal.UNKNOWN; + } + } + + //------------------------------------------------------------------------------------------------- + public void abortJob(final long sessionId, final long sessionStepId) { + synchronized (lock) { + final Optional optional = peekJob(sessionId, sessionStepId); + if (optional.isPresent()) { + optional.get().setExecutionSignal(ExecutionSignal.ABORT); + } + } + } + + //------------------------------------------------------------------------------------------------- + public void removeJob(final long sessionId, final long sessionStepId) { + synchronized (lock) { + final Job job = board.remove(getUniqueIdentifier(sessionId, sessionStepId)); + if (job != null) { + queue.remove(job); + } + } + } + + //================================================================================================= + // assistant methods + + //------------------------------------------------------------------------------------------------- + private String getUniqueIdentifier(final ChoreographerExecuteStepRequestDTO request) { + return getUniqueIdentifier(request.getSessionId(), request.getSessionStepId()); + } + + //------------------------------------------------------------------------------------------------- + private String getUniqueIdentifier(final long sessionId, final long sessionStepId) { + return sessionId + "-" + sessionStepId; + } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/ExecutionManager.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/ExecutionManager.java new file mode 100644 index 0000000..acffd9b --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/ExecutionManager.java @@ -0,0 +1,63 @@ +package eu.arrowhead.application.skeleton.executor.execution; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Function; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import eu.arrowhead.application.skeleton.executor.ExecutorConstants; + +@Component +public class ExecutionManager extends Thread { + + //================================================================================================= + // members + + @Autowired + private ExecutionBoard board; + + @Autowired + private Function workerFactory; + + private ThreadPoolExecutor threadPool; + + @Value(ExecutorConstants.$THREAD_NUM_EXECUTION_WORKER_WD) + private int threadNum; + + private boolean doWork = true; + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + @Override + public void run() { + + threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum); + + while (doWork) { + try { + final Job job = board.nextJob(); + if (job.getExecutionSignal() == ExecutionSignal.ABORT) { + board.removeJob(job.getJobRequest().getSessionId(), job.getJobRequest().getSessionStepId()); + + } else { + threadPool.execute(workerFactory.apply(job)); + } + + } catch (final InterruptedException ex) { + interrupt(); + } + } + } + + //------------------------------------------------------------------------------------------------- + @Override + public void interrupt() { + doWork = false; + super.interrupt(); + } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/ExecutionSignal.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/ExecutionSignal.java new file mode 100644 index 0000000..bf82810 --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/ExecutionSignal.java @@ -0,0 +1,6 @@ +package eu.arrowhead.application.skeleton.executor.execution; + +public enum ExecutionSignal { + + DO, ABORT, UNKNOWN; +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/Job.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/Job.java new file mode 100644 index 0000000..2dc439e --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/Job.java @@ -0,0 +1,28 @@ +package eu.arrowhead.application.skeleton.executor.execution; + +import eu.arrowhead.common.dto.shared.ChoreographerExecuteStepRequestDTO; + +public class Job { + + //================================================================================================= + // members + + private final ChoreographerExecuteStepRequestDTO jobRequest; + private ExecutionSignal executionSignal; + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + public Job(final ChoreographerExecuteStepRequestDTO jobRequest, final ExecutionSignal executionSignal) { + this.jobRequest = jobRequest; + this.executionSignal = executionSignal; + } + + //------------------------------------------------------------------------------------------------- + public ExecutionSignal getExecutionSignal() { return executionSignal; } + public ChoreographerExecuteStepRequestDTO getJobRequest() { return jobRequest; } + + //------------------------------------------------------------------------------------------------- + public void setExecutionSignal(final ExecutionSignal executionSignal) { this.executionSignal = executionSignal; } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/worker/ExecutionWorkerFactoryConfig.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/worker/ExecutionWorkerFactoryConfig.java new file mode 100644 index 0000000..c25095c --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/worker/ExecutionWorkerFactoryConfig.java @@ -0,0 +1,39 @@ +package eu.arrowhead.application.skeleton.executor.execution.worker; + +import java.util.function.Function; + +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Scope; + +import eu.arrowhead.application.skeleton.executor.execution.Job; + +@Configuration +public class ExecutionWorkerFactoryConfig { + + //------------------------------------------------------------------------------------------------- + @Bean + public Function executionWorkerFactory() { + return job -> createExecutionWorker(job); + } + + //------------------------------------------------------------------------------------------------- + @Bean + @Scope(BeanDefinition.SCOPE_PROTOTYPE) + public Runnable createExecutionWorker(final Job job) { + final String serviceDefinition = job.getJobRequest().getMainOrchestrationResult().getService().getServiceDefinition(); + + switch (serviceDefinition) { + //TODO initiate here your execution workers +// case "your-main-service-A": +// return new YourMainServiceAExecutionWorker(job); + +// case "your-main-service-B": +// return new YourMainServiceBExecutionWorker(job); + + default: + return new UnkownServiceExecutionWorker(job); + } + } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/worker/UnkownServiceExecutionWorker.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/worker/UnkownServiceExecutionWorker.java new file mode 100644 index 0000000..6ab39e9 --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/execution/worker/UnkownServiceExecutionWorker.java @@ -0,0 +1,42 @@ +package eu.arrowhead.application.skeleton.executor.execution.worker; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; + +import eu.arrowhead.application.skeleton.executor.execution.Job; +import eu.arrowhead.application.skeleton.executor.service.ExecutorDriver; +import eu.arrowhead.common.dto.shared.ChoreographerExecutedStepStatus; + +public class UnkownServiceExecutionWorker implements Runnable { + + //================================================================================================= + // members + + private final Job job; + + @Autowired + private ExecutorDriver driver; + + private final Logger logger = LogManager.getLogger(UnkownServiceExecutionWorker.class); + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + public UnkownServiceExecutionWorker(Job job) { + this.job = job; + } + + //------------------------------------------------------------------------------------------------- + @Override + public void run() { + logger.error("Unkown service execution request: sessionId={}, sessionStepId={}, service={}", + job.getJobRequest().getSessionId(), + job.getJobRequest().getSessionStepId(), + job.getJobRequest().getMainOrchestrationResult().getService().getServiceDefinition()); + + driver.notifyChoreographer(job.getJobRequest().getSessionId(), job.getJobRequest().getSessionStepId(), ChoreographerExecutedStepStatus.ERROR, + job.getJobRequest().getMainOrchestrationResult().getService().getServiceDefinition() + " is not supported", null); + } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/security/ExecutorAccessControlFilter.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/security/ExecutorAccessControlFilter.java new file mode 100644 index 0000000..1a00680 --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/security/ExecutorAccessControlFilter.java @@ -0,0 +1,48 @@ +package eu.arrowhead.application.skeleton.executor.security; + +import java.util.Map; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; + +import eu.arrowhead.common.CommonConstants; +import eu.arrowhead.common.core.CoreSystem; +import eu.arrowhead.common.exception.AuthException; +import eu.arrowhead.common.security.AccessControlFilter; + +@Component +@ConditionalOnProperty(name = CommonConstants.SERVER_SSL_ENABLED, matchIfMissing = true) +public class ExecutorAccessControlFilter extends AccessControlFilter { + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + @Override + protected void checkClientAuthorized(final String clientCN, final String method, final String requestTarget, final String requestJSON, final Map queryParams) { + super.checkClientAuthorized(clientCN, method, requestTarget, requestJSON, queryParams); + + final String cloudCN = getServerCloudCN(); + + if (requestTarget.contains(CommonConstants.CHOREOGRAPHER_EXECUTOR_CLIENT_SERVICE_START_URI) || + requestTarget.contains(CommonConstants.CHOREOGRAPHER_EXECUTOR_CLIENT_SERVICE_ABORT_URI) || + requestTarget.contains(CommonConstants.CHOREOGRAPHER_EXECUTOR_CLIENT_SERVICE_INFO_URI)) { + //Only Choreographer Core System is allowed to call this endpoints + checkIfClientIsChoreographer(clientCN, cloudCN); + } + + //TODO: implement here your custom access filter if any further + } + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + private void checkIfClientIsChoreographer(final String clientCN, final String cloudCN) { + final String coreSystemCN = CoreSystem.CHOREOGRAPHER.name().toLowerCase() + "." + cloudCN; + if (!clientCN.equalsIgnoreCase(coreSystemCN)) { + throw new AuthException("Only Choreographer Core System is allowed to call this endpoint", HttpStatus.UNAUTHORIZED.value()); + } + } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/security/ExecutorSecurityConfig.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/security/ExecutorSecurityConfig.java new file mode 100644 index 0000000..b7d918f --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/security/ExecutorSecurityConfig.java @@ -0,0 +1,12 @@ +package eu.arrowhead.application.skeleton.executor.security; + +import org.springframework.context.annotation.Configuration; +import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity; + +import ai.aitia.arrowhead.application.library.config.DefaultSecurityConfig; + +@Configuration +@EnableWebSecurity +public class ExecutorSecurityConfig extends DefaultSecurityConfig { + +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/service/ExecutorDriver.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/service/ExecutorDriver.java new file mode 100644 index 0000000..2b1dc4c --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/service/ExecutorDriver.java @@ -0,0 +1,54 @@ +package eu.arrowhead.application.skeleton.executor.service; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpMethod; +import org.springframework.stereotype.Service; +import org.springframework.util.Assert; + +import ai.aitia.arrowhead.application.library.ArrowheadService; +import ai.aitia.arrowhead.application.library.util.CoreServiceUri; +import eu.arrowhead.common.CommonConstants; +import eu.arrowhead.common.SSLProperties; +import eu.arrowhead.common.core.CoreSystemService; +import eu.arrowhead.common.dto.shared.ChoreographerExecutedStepResultDTO; +import eu.arrowhead.common.dto.shared.ChoreographerExecutedStepStatus; + +@Service +public class ExecutorDriver { + + //================================================================================================= + // members + + @Autowired + private ArrowheadService arrowheadService; + + @Autowired + private SSLProperties sslProperties; + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + public void notifyChoreographer(final long sessionId, final long sessionStepId, final ChoreographerExecutedStepStatus status, + final String message, final String exception) { + Assert.notNull(status, "ChoreographerExecutedStepStatus is null"); + + final ChoreographerExecutedStepResultDTO dto = new ChoreographerExecutedStepResultDTO(); + dto.setSessionId(sessionId); + dto.setSessionStepId(sessionStepId); + dto.setStatus(status); + dto.setMessage(message); + dto.setException(exception); + + final CoreServiceUri uri = arrowheadService.getCoreServiceUri(CoreSystemService.CHOREOGRAPHER_SERVICE); + arrowheadService.consumeServiceHTTP(Void.class, HttpMethod.POST, uri.getAddress(), uri.getPort(), uri.getPath(), getCoreSystemInterface(), null, dto, new String[0]); + } + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + private String getCoreSystemInterface() { + return sslProperties.isSslEnabled() ? CommonConstants.HTTP_SECURE_JSON : CommonConstants.HTTP_INSECURE_JSON; + } +} diff --git a/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/service/ExecutorService.java b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/service/ExecutorService.java new file mode 100644 index 0000000..1db5a8f --- /dev/null +++ b/application-skeleton-executor/src/main/java/eu/arrowhead/application/skeleton/executor/service/ExecutorService.java @@ -0,0 +1,78 @@ +package eu.arrowhead.application.skeleton.executor.service; + +import org.apache.http.HttpStatus; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import eu.arrowhead.application.skeleton.executor.execution.ExecutionBoard; +import eu.arrowhead.common.Utilities; +import eu.arrowhead.common.dto.shared.ChoreographerAbortStepRequestDTO; +import eu.arrowhead.common.dto.shared.ChoreographerExecuteStepRequestDTO; +import eu.arrowhead.common.dto.shared.ChoreographerExecutorServiceInfoRequestDTO; +import eu.arrowhead.common.dto.shared.ChoreographerExecutorServiceInfoResponseDTO; +import eu.arrowhead.common.exception.BadPayloadException; + +@Service +public class ExecutorService { + + //================================================================================================= + // members + + @Autowired + private ExecutionBoard executionBoard; + + //================================================================================================= + // methods + + //------------------------------------------------------------------------------------------------- + public void startExecution(final ChoreographerExecuteStepRequestDTO request) { + validateChoreographerExecuteStepRequestDTO(request); + executionBoard.newJob(request); + } + + //------------------------------------------------------------------------------------------------- + public void abortExecution(final ChoreographerAbortStepRequestDTO request) { + executionBoard.abortJob(request.getSessionId(), request.getSessionStepId()); + } + + //------------------------------------------------------------------------------------------------- + public ChoreographerExecutorServiceInfoResponseDTO collectServiceInfo(final ChoreographerExecutorServiceInfoRequestDTO request) { + validateChoreographerExecutorServiceInfoRequestDTO(request); + + final ChoreographerExecutorServiceInfoResponseDTO response = new ChoreographerExecutorServiceInfoResponseDTO(); + //TODO implement your logic here + return response; + } + + //================================================================================================= + // assistant methods + + //------------------------------------------------------------------------------------------------- + private void validateChoreographerExecuteStepRequestDTO(final ChoreographerExecuteStepRequestDTO dto) { + if (dto == null) { + throw new BadPayloadException("dto is null", HttpStatus.SC_BAD_REQUEST); + } + if (dto.getMainOrchestrationResult() == null) { + throw new BadPayloadException("mainOrchestrationResult is null", HttpStatus.SC_BAD_REQUEST); + } + } + + //------------------------------------------------------------------------------------------------- + private void validateChoreographerExecutorServiceInfoRequestDTO(final ChoreographerExecutorServiceInfoRequestDTO dto) { + if (dto == null) { + throw new BadPayloadException("dto is null", HttpStatus.SC_BAD_REQUEST); + } + if (Utilities.isEmpty(dto.getServiceDefinition())) { + throw new BadPayloadException("serviceDefinition is empty", HttpStatus.SC_BAD_REQUEST); + } + if (dto.getMinVersion() == null) { + throw new BadPayloadException("minVersion is empty", HttpStatus.SC_BAD_REQUEST); + } + if (dto.getMaxVersion() == null) { + throw new BadPayloadException("maxVersion is empty", HttpStatus.SC_BAD_REQUEST); + } + if (dto.getMinVersion() > dto.getMaxVersion()) { + throw new BadPayloadException("minVersion cannot be greater than maxVersion.", HttpStatus.SC_BAD_REQUEST); + } + } +} diff --git a/application-skeleton-executor/src/main/resources/application.properties b/application-skeleton-executor/src/main/resources/application.properties new file mode 100644 index 0000000..d3e3561 --- /dev/null +++ b/application-skeleton-executor/src/main/resources/application.properties @@ -0,0 +1,40 @@ +############################################ +### CUSTOM PARAMETERS ### +############################################ + +# Name of the client system +application_system_name=EXECUTORSKELETON + +# Client web-server parameters +server.address=127.0.0.1 +server.port=8999 + +# Client application-server paramters +container.max_keep_alive_requests=500 + +# Service Registry Core System web-server parameters +sr_address=127.0.0.1 +sr_port=8443 + +# Execution parameters +#------------------------------ +thread.num.execution-worker=1 + +############################################ +### SECURE MODE ### +############################################ + +# configure secure mode + +# Set this to false to disable https mode +server.ssl.enabled=true + +server.ssl.key-store-type=PKCS12 +server.ssl.key-store=classpath:certificates/executorskeleton.p12 +server.ssl.key-store-password=123456 +server.ssl.key-alias=executorskeleton +server.ssl.key-password=123456 +server.ssl.client-auth=need +server.ssl.trust-store-type=PKCS12 +server.ssl.trust-store=classpath:certificates/truststore.p12 +server.ssl.trust-store-password=123456 \ No newline at end of file diff --git a/application-skeleton-executor/src/main/resources/certificates/executorskeleton.p12 b/application-skeleton-executor/src/main/resources/certificates/executorskeleton.p12 new file mode 100644 index 0000000..fcdef1d Binary files /dev/null and b/application-skeleton-executor/src/main/resources/certificates/executorskeleton.p12 differ diff --git a/application-skeleton-executor/src/main/resources/certificates/truststore.p12 b/application-skeleton-executor/src/main/resources/certificates/truststore.p12 new file mode 100644 index 0000000..6da7cd2 Binary files /dev/null and b/application-skeleton-executor/src/main/resources/certificates/truststore.p12 differ diff --git a/pom.xml b/pom.xml index 01304cb..7b58fc0 100644 --- a/pom.xml +++ b/pom.xml @@ -28,13 +28,27 @@ application-skeleton-provider application-skeleton-subscriber application-skeleton-publisher - + application-skeleton-executor + + + + + + ossrh + https://s01.oss.sonatype.org/content/repositories/aiaitia-1022 + + ai.aitia arrowhead-application-library-java-spring 4.4.0.0 + + + ai.aitia + arrowhead-core-common-essentials-java-spring + 4.4.0.3