Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import ai.aitia.arrowhead.application.library.ArrowheadService;
import ai.aitia.arrowhead.application.library.config.ApplicationInitListener;
import eu.arrowhead.application.skeleton.executor.execution.ExecutionManager;
import eu.arrowhead.common.core.CoreSystem;

@Component
Expand All @@ -19,6 +20,9 @@ public class ExecutorApplicationInitListener extends ApplicationInitListener {
@Autowired
private ArrowheadService arrowheadService;

@Autowired
private ExecutionManager executionManager;

private final Logger logger = LogManager.getLogger(ExecutorApplicationInitListener.class);

//=================================================================================================
Expand All @@ -34,12 +38,19 @@ protected void customInit(final ContextRefreshedEvent event) {
//Initialize Arrowhead Context
arrowheadService.updateCoreServiceURIs(CoreSystem.CHOREOGRAPHER);

//Start Executor Manager
executionManager.start();

//TODO: implement here any custom behavior on application start up
}

//-------------------------------------------------------------------------------------------------
@Override
public void customDestroy() {

//Stop Executor Manager
executionManager.interrupt();

//TODO: implement here any custom behavior on application shout down
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package eu.arrowhead.application.skeleton.executor;

public class ExecutorConstants {

//=================================================================================================
// members

public static final String THREAD_NUM_EXECUTION_WORKER = "thread.num.execution-worker";
public static final String $THREAD_NUM_EXECUTION_WORKER_WD = "${" + THREAD_NUM_EXECUTION_WORKER + ":1}";

//=================================================================================================
// assistant methods

//-------------------------------------------------------------------------------------------------
private ExecutorConstants() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import eu.arrowhead.common.dto.shared.ChoreographerExecutorServiceInfoResponseDTO;

@RestController
//@RequestMapping("/executor") // TODO: specify the base URI here
public class ExecutorController {

//=================================================================================================
Expand All @@ -36,13 +37,13 @@ public String echo() {
}

//-------------------------------------------------------------------------------------------------
@PostMapping(path = CommonConstants.CHOREOGRAPHER_EXECUTOR_CLIENT_SERVICE_START_URI, produces = MediaType.APPLICATION_JSON_VALUE)
@PostMapping(path = CommonConstants.CHOREOGRAPHER_EXECUTOR_CLIENT_SERVICE_START_URI, consumes = MediaType.APPLICATION_JSON_VALUE)
public void start(@RequestBody final ChoreographerExecuteStepRequestDTO request) {
executorService.startExecution(request);
}

//-------------------------------------------------------------------------------------------------
@PostMapping(path = CommonConstants.CHOREOGRAPHER_EXECUTOR_CLIENT_SERVICE_ABORT_URI, produces = MediaType.APPLICATION_JSON_VALUE)
@PostMapping(path = CommonConstants.CHOREOGRAPHER_EXECUTOR_CLIENT_SERVICE_ABORT_URI, consumes = MediaType.APPLICATION_JSON_VALUE)
public void abort(@RequestBody final ChoreographerAbortStepRequestDTO request) {
executorService.abortExecution(request);
}
Expand All @@ -54,5 +55,5 @@ public void abort(@RequestBody final ChoreographerAbortStepRequestDTO request) {
}

//-------------------------------------------------------------------------------------------------
//TODO: implement here your provider related REST end points
//TODO: implement here your executor related REST end points
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package eu.arrowhead.application.skeleton.executor.execution;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import eu.arrowhead.common.dto.shared.ChoreographerExecuteStepRequestDTO;

@Component
public class ExecutionBoard {

//=================================================================================================
// members

private final Map<String,Job> board = new ConcurrentHashMap<>();
private final BlockingQueue<Job> queue = new LinkedBlockingQueue<>();

private final Object lock = new Object();

//=================================================================================================
// methods

//-------------------------------------------------------------------------------------------------
public Job newJob(final ChoreographerExecuteStepRequestDTO jobRequest) {
Assert.notNull(jobRequest, "jobRequest is null");

synchronized (lock) {
final Job job = new Job(jobRequest, ExecutionSignal.DO);
board.put(getUinqueIdentifier(jobRequest), job);
queue.add(job);
return job;
}
}

//-------------------------------------------------------------------------------------------------
public Optional<Job> peekJob(final long sessionId, final long sessionStepId) {
final Job job = board.get(getUinqueIdentifier(sessionId, sessionStepId));
if (job != null) {
return Optional.of(job);
}
return Optional.empty();
}

//-------------------------------------------------------------------------------------------------
public Job nextJob() throws InterruptedException {
return queue.take();
}

//-------------------------------------------------------------------------------------------------
public ExecutionSignal getJobExecutionSignal(final long sessionId, final long sessionStepId) {
synchronized (lock) {
final Job job = board.get(getUinqueIdentifier(sessionId, sessionStepId));
if (job != null) {
return job.getExecutionSignal();
}
return ExecutionSignal.UNKNOWN;
}
}

//-------------------------------------------------------------------------------------------------
public void abortJob(final long sessionId, final long sessionStepId) {
synchronized (lock) {
final Optional<Job> optional = peekJob(sessionId, sessionStepId);
if (optional.isPresent()) {
optional.get().setExecutionSignal(ExecutionSignal.ABORT);
}
}
}

//-------------------------------------------------------------------------------------------------
public void removeJob(final long sessionId, final long sessionStepId) {
synchronized (lock) {
final Job job = board.remove(getUinqueIdentifier(sessionId, sessionStepId));
if (job != null) {
queue.remove(job);
}
}
}

//=================================================================================================
// assistant methods

//-------------------------------------------------------------------------------------------------
private String getUinqueIdentifier(final ChoreographerExecuteStepRequestDTO request) {
return getUinqueIdentifier(request.getSessionId(), request.getSessionStepId());
}

//-------------------------------------------------------------------------------------------------
private String getUinqueIdentifier(final long sessionId, final long sessionStepId) {
return sessionId + "-" + sessionStepId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package eu.arrowhead.application.skeleton.executor.execution;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import eu.arrowhead.application.skeleton.executor.ExecutorConstants;

@Component
public class ExecutionManager extends Thread {

//=================================================================================================
// members

@Autowired
private ExecutionBoard board;

@Autowired
private Function<Job,Runnable> workerFactory;

private ThreadPoolExecutor threadPool;

@Value(ExecutorConstants.$THREAD_NUM_EXECUTION_WORKER_WD)
private int threadNum;

private boolean doWork = true;

//=================================================================================================
// methods

//-------------------------------------------------------------------------------------------------
@Override
public void run() {

threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum);

while (doWork) {
try {
final Job job = board.nextJob();
if (job.getExecutionSignal() == ExecutionSignal.ABORT) {
board.removeJob(job.getJobRequest().getSessionId(), job.getJobRequest().getSessionStepId());

} else {
threadPool.execute(workerFactory.apply(job));
}

} catch (final InterruptedException ex) {
interrupt();
}
}
}

//-------------------------------------------------------------------------------------------------
@Override
public void interrupt() {
doWork = false;
super.interrupt();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package eu.arrowhead.application.skeleton.executor.execution;

public enum ExecutionSignal {

DO, ABORT, UNKNOWN;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package eu.arrowhead.application.skeleton.executor.execution;

import eu.arrowhead.common.dto.shared.ChoreographerExecuteStepRequestDTO;

public class Job {

//=================================================================================================
// members

private final ChoreographerExecuteStepRequestDTO jobRequest;
private ExecutionSignal executionSignal;

//=================================================================================================
// methods

//-------------------------------------------------------------------------------------------------
public Job(final ChoreographerExecuteStepRequestDTO jobRequest, final ExecutionSignal executionSignal) {
this.jobRequest = jobRequest;
this.executionSignal = executionSignal;
}

//-------------------------------------------------------------------------------------------------
public ExecutionSignal getExecutionSignal() { return executionSignal; }
public ChoreographerExecuteStepRequestDTO getJobRequest() { return jobRequest; }

//-------------------------------------------------------------------------------------------------
public void setExecutionSignal(final ExecutionSignal executionSignal) { this.executionSignal = executionSignal; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package eu.arrowhead.application.skeleton.executor.execution.worker;

import java.util.function.Function;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import eu.arrowhead.application.skeleton.executor.execution.Job;

@Configuration
public class ExecutionWorkerFactoryConfig {

//-------------------------------------------------------------------------------------------------
@Bean
public Function<Job,Runnable> executionWorkerFactory() {
return job -> createExecutionWorker(job);
}

//-------------------------------------------------------------------------------------------------
@Bean
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public Runnable createExecutionWorker(final Job job) {
final String serviceDefinition = job.getJobRequest().getMainOrchestrationResult().getService().getServiceDefinition();

switch (serviceDefinition) {
//TODO initiate here your execution workers
// case "your-main-service-A":
// return new YourMainServiceAExecutionWorker(job);

// case "your-main-service-B":
// return new YourMainServiceBExecutionWorker(job);

default:
return new UnkownServiceExecutionWorker(job);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package eu.arrowhead.application.skeleton.executor.execution.worker;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import eu.arrowhead.application.skeleton.executor.execution.Job;

public class UnkownServiceExecutionWorker implements Runnable {

//=================================================================================================
// members

private final Job job;

private final Logger logger = LogManager.getLogger(UnkownServiceExecutionWorker.class);

//=================================================================================================
// methods

//-------------------------------------------------------------------------------------------------
public UnkownServiceExecutionWorker(Job job) {
this.job = job;
}

//-------------------------------------------------------------------------------------------------
@Override
public void run() {
logger.error("Unkown service execution request: sessionId={}, sessionStepId={}, service={}",
job.getJobRequest().getSessionId(),
job.getJobRequest().getSessionStepId(),
job.getJobRequest().getMainOrchestrationResult().getService().getServiceDefinition());
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package eu.arrowhead.application.skeleton.executor.service;

import org.apache.http.HttpStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import eu.arrowhead.application.skeleton.executor.execution.ExecutionBoard;
import eu.arrowhead.common.Utilities;
import eu.arrowhead.common.dto.shared.ChoreographerAbortStepRequestDTO;
import eu.arrowhead.common.dto.shared.ChoreographerExecuteStepRequestDTO;
Expand All @@ -13,18 +15,24 @@
@Service
public class ExecutorService {

//=================================================================================================
// members

@Autowired
private ExecutionBoard executionBoard;

//=================================================================================================
// methods

//-------------------------------------------------------------------------------------------------
public void startExecution(final ChoreographerExecuteStepRequestDTO request) {
validateChoreographerExecuteStepRequestDTO(request);
//TODO implement your logic here
executionBoard.newJob(request);
}

//-------------------------------------------------------------------------------------------------
public void abortExecution(final ChoreographerAbortStepRequestDTO request) {
//TODO implement your logic here
executionBoard.abortJob(request.getSessionId(), request.getSessionStepId());
}

//-------------------------------------------------------------------------------------------------
Expand Down
Loading