Skip to content

Commit 61bc6bb

Browse files
authored
Merge pull request #68 from Aitia-IIOT/tb/orch-mqtt
Dynamic Service Orchestration MQTT API
2 parents 636a0e7 + 2b4ccc8 commit 61bc6bb

File tree

10 files changed

+483
-5
lines changed

10 files changed

+483
-5
lines changed

serviceorchestration-dynamic/src/main/java/eu/arrowhead/serviceorchestration/DynamicServiceOrchestrationConstants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ public final class DynamicServiceOrchestrationConstants {
3737
public static final String MQTT_API_BASE_TOPIC_PREFIX = "arrowhead/serviceorchestration";
3838
public static final String MQTT_API_MONITOR_BASE_TOPIC = MQTT_API_BASE_TOPIC_PREFIX + "/monitor/";
3939
public static final String MQTT_API_GENERAL_MANAGEMENT_BASE_TOPIC = MQTT_API_BASE_TOPIC_PREFIX + "/general/management/";
40+
public static final String MQTT_API_ORCHESTRATION_BASE_TOPIC = MQTT_API_BASE_TOPIC_PREFIX + "/orchestration/";
41+
public static final String MQTT_API_ORCHESTRATION_MANAGEMENT_PREFIX = MQTT_API_ORCHESTRATION_BASE_TOPIC + "management";
42+
public static final String MQTT_API_ORCHESTRATION_PUSH_MANAGEMENT_BASE_TOPIC = MQTT_API_ORCHESTRATION_MANAGEMENT_PREFIX + "/push/";
43+
public static final String MQTT_API_ORCHESTRATION_HISTORY_MANAGEMENT_BASE_TOPIC = MQTT_API_ORCHESTRATION_MANAGEMENT_PREFIX + "/history/";
44+
public static final String MQTT_API_ORCHESTRATION_LOCK_MANAGEMENT_BASE_TOPIC = MQTT_API_ORCHESTRATION_MANAGEMENT_PREFIX + "/lock/";
4045

4146
public static final String VERSION_MONITOR = "1.0.0";
4247
public static final String VERSION_GENERAL_MANAGEMENT = "1.0.0";

serviceorchestration-dynamic/src/main/java/eu/arrowhead/serviceorchestration/DynamicServiceOrchestrationSystemInfo.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,25 +90,30 @@ public List<ServiceModel> getServices() {
9090
.serviceDefinition(Constants.SERVICE_DEF_ORCHESTRATION)
9191
.version(DynamicServiceOrchestrationConstants.VERSION_ORCHESTRATION)
9292
.metadata(DynamicServiceOrchestrationConstants.METADATA_KEY_ORCHESTRATION_STRATEGY, DynamicServiceOrchestrationConstants.METADATA_VALUE_ORCHESTRATION_STRATEGY)
93+
.metadata(Constants.METADATA_KEY_UNRESTRICTED_DISCOVERY, true)
9394
.serviceInterface(getHttpServiceInterfaceForOrchestration())
95+
.serviceInterface(getMqttServiceInterfaceForOrchestration())
9496
.build();
9597

9698
final ServiceModel orchestrationPushManagement = new ServiceModel.Builder()
9799
.serviceDefinition(Constants.SERVICE_DEF_ORCHESTRATION_PUSH_MANAGEMENT)
98100
.version(DynamicServiceOrchestrationConstants.VERSION_ORCHESTRATION_PUSH_MANAGEMENT)
99101
.serviceInterface(getHttpServiceInterfaceForOrchestrationPushManagement())
102+
.serviceInterface(getMqttServiceInterfaceForOrchestrationPushManagement())
100103
.build();
101104

102105
final ServiceModel orchestrationLockManagement = new ServiceModel.Builder()
103106
.serviceDefinition(Constants.SERVICE_DEF_ORCHESTRATION_LOCK_MANAGEMENT)
104107
.version(DynamicServiceOrchestrationConstants.VERSION_ORCHESTRATION_PUSH_MANAGEMENT)
105108
.serviceInterface(getHttpServiceInterfaceForOrchestrationLockManagement())
109+
.serviceInterface(getMqttServiceInterfaceForOrchestrationLockManagement())
106110
.build();
107111

108112
final ServiceModel orchestrationHistoryManagement = new ServiceModel.Builder()
109113
.serviceDefinition(Constants.SERVICE_DEF_ORCHESTRATION_HISTORY_MANAGEMENT)
110114
.version(DynamicServiceOrchestrationConstants.VERSION_ORCHESTRATION_HISTORY_MANAGEMENT)
111115
.serviceInterface(getHttpServiceInterfaceForOrchestrationHistoryManagement())
116+
.serviceInterface(getMqttServiceInterfaceForOrchestrationHistoryManagement())
112117
.build();
113118

114119
return List.of(generalManagement, orchestration, orchestrationPushManagement, orchestrationLockManagement, orchestrationHistoryManagement);
@@ -351,4 +356,56 @@ private InterfaceModel getMqttServiceInterfaceForGeneralManagement() {
351356
.operations(Set.of(Constants.SERVICE_OP_GET_LOG, Constants.SERVICE_OP_GET_CONFIG))
352357
.build();
353358
}
359+
360+
//-------------------------------------------------------------------------------------------------
361+
private InterfaceModel getMqttServiceInterfaceForOrchestration() {
362+
if (!isMqttApiEnabled()) {
363+
return null;
364+
}
365+
366+
final String templateName = getSslProperties().isSslEnabled() ? Constants.GENERIC_MQTTS_INTERFACE_TEMPLATE_NAME : Constants.GENERIC_MQTT_INTERFACE_TEMPLATE_NAME;
367+
return new MqttInterfaceModel.Builder(templateName, getMqttBrokerAddress(), getMqttBrokerPort())
368+
.baseTopic(DynamicServiceOrchestrationConstants.MQTT_API_ORCHESTRATION_BASE_TOPIC)
369+
.operations(Set.of(Constants.SERVICE_OP_ORCHESTRATION_PULL, Constants.SERVICE_OP_ORCHESTRATION_SUBSCRIBE, Constants.SERVICE_OP_ORCHESTRATION_UNSUBSCRIBE))
370+
.build();
371+
}
372+
373+
//-------------------------------------------------------------------------------------------------
374+
private InterfaceModel getMqttServiceInterfaceForOrchestrationPushManagement() {
375+
if (!isMqttApiEnabled()) {
376+
return null;
377+
}
378+
379+
final String templateName = getSslProperties().isSslEnabled() ? Constants.GENERIC_MQTTS_INTERFACE_TEMPLATE_NAME : Constants.GENERIC_MQTT_INTERFACE_TEMPLATE_NAME;
380+
return new MqttInterfaceModel.Builder(templateName, getMqttBrokerAddress(), getMqttBrokerPort())
381+
.baseTopic(DynamicServiceOrchestrationConstants.MQTT_API_ORCHESTRATION_PUSH_MANAGEMENT_BASE_TOPIC)
382+
.operations(Set.of(Constants.SERVICE_OP_ORCHESTRATION_SUBSCRIBE, Constants.SERVICE_OP_ORCHESTRATION_TRIGGER, Constants.SERVICE_OP_ORCHESTRATION_UNSUBSCRIBE, Constants.SERVICE_OP_ORCHESTRATION_QUERY))
383+
.build();
384+
}
385+
386+
//-------------------------------------------------------------------------------------------------
387+
private InterfaceModel getMqttServiceInterfaceForOrchestrationLockManagement() {
388+
if (!isMqttApiEnabled()) {
389+
return null;
390+
}
391+
392+
final String templateName = getSslProperties().isSslEnabled() ? Constants.GENERIC_MQTTS_INTERFACE_TEMPLATE_NAME : Constants.GENERIC_MQTT_INTERFACE_TEMPLATE_NAME;
393+
return new MqttInterfaceModel.Builder(templateName, getMqttBrokerAddress(), getMqttBrokerPort())
394+
.baseTopic(DynamicServiceOrchestrationConstants.MQTT_API_ORCHESTRATION_LOCK_MANAGEMENT_BASE_TOPIC)
395+
.operations(Set.of(Constants.SERVICE_OP_ORCHESTRATION_CREATE, Constants.SERVICE_OP_ORCHESTRATION_QUERY, Constants.SERVICE_OP_ORCHESTRATION_REMOVE))
396+
.build();
397+
}
398+
399+
//-------------------------------------------------------------------------------------------------
400+
private InterfaceModel getMqttServiceInterfaceForOrchestrationHistoryManagement() {
401+
if (!isMqttApiEnabled()) {
402+
return null;
403+
}
404+
405+
final String templateName = getSslProperties().isSslEnabled() ? Constants.GENERIC_MQTTS_INTERFACE_TEMPLATE_NAME : Constants.GENERIC_MQTT_INTERFACE_TEMPLATE_NAME;
406+
return new MqttInterfaceModel.Builder(templateName, getMqttBrokerAddress(), getMqttBrokerPort())
407+
.baseTopic(DynamicServiceOrchestrationConstants.MQTT_API_ORCHESTRATION_HISTORY_MANAGEMENT_BASE_TOPIC)
408+
.operations(Set.of(Constants.SERVICE_OP_ORCHESTRATION_QUERY))
409+
.build();
410+
}
354411
}

serviceorchestration-dynamic/src/main/java/eu/arrowhead/serviceorchestration/api/http/OrchestrationAPI.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.springframework.web.bind.annotation.PostMapping;
1414
import org.springframework.web.bind.annotation.RequestBody;
1515
import org.springframework.web.bind.annotation.RequestMapping;
16+
import org.springframework.web.bind.annotation.RequestParam;
1617
import org.springframework.web.bind.annotation.ResponseBody;
1718
import org.springframework.web.bind.annotation.RestController;
1819

@@ -25,6 +26,7 @@
2526
import eu.arrowhead.serviceorchestration.api.http.utils.SystemNamePreprocessor;
2627
import eu.arrowhead.serviceorchestration.service.OrchestrationService;
2728
import io.swagger.v3.oas.annotations.Operation;
29+
import io.swagger.v3.oas.annotations.Parameter;
2830
import io.swagger.v3.oas.annotations.media.Content;
2931
import io.swagger.v3.oas.annotations.media.Schema;
3032
import io.swagger.v3.oas.annotations.responses.ApiResponse;
@@ -92,13 +94,16 @@ public class OrchestrationAPI {
9294
@Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ErrorMessageDTO.class)) })
9395
})
9496
@PostMapping(path = DynamicServiceOrchestrationConstants.HTTP_API_OP_PUSH_SUBSCRIBE_PATH, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.TEXT_PLAIN_VALUE)
95-
public ResponseEntity<String> pushSubscribe(final HttpServletRequest httpServletRequest, @RequestBody final OrchestrationSubscriptionRequestDTO dto) {
97+
public ResponseEntity<String> pushSubscribe(
98+
final HttpServletRequest httpServletRequest,
99+
@RequestBody final OrchestrationSubscriptionRequestDTO dto,
100+
@Parameter(name = "trigger", description = "Set to true in order to initiate a push orchestration after the successful subscription.") @RequestParam(required = false, defaultValue = "false") final Boolean trigger) {
96101
logger.debug("pushSubscribe started...");
97102

98103
final String origin = HttpMethod.POST.name() + " " + DynamicServiceOrchestrationConstants.HTTP_API_ORCHESTRATION_PATH + DynamicServiceOrchestrationConstants.HTTP_API_OP_PUSH_SUBSCRIBE_PATH;
99104

100105
final String requesterSystem = sysNamePreprocessor.process(httpServletRequest, origin);
101-
final Pair<Boolean, String> result = orchService.pushSubscribe(requesterSystem, dto, origin);
106+
final Pair<Boolean, String> result = orchService.pushSubscribe(requesterSystem, dto, trigger, origin);
102107
return new ResponseEntity<String>(result.getRight(), result.getLeft() ? HttpStatus.CREATED : HttpStatus.OK);
103108
}
104109

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package eu.arrowhead.serviceorchestration.api.mqtt;
2+
3+
import java.security.InvalidParameterException;
4+
5+
import org.apache.logging.log4j.LogManager;
6+
import org.apache.logging.log4j.Logger;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
9+
import org.springframework.stereotype.Service;
10+
import org.springframework.util.Assert;
11+
12+
import eu.arrowhead.common.Constants;
13+
import eu.arrowhead.common.exception.ArrowheadException;
14+
import eu.arrowhead.common.mqtt.MqttStatus;
15+
import eu.arrowhead.common.mqtt.handler.MqttTopicHandler;
16+
import eu.arrowhead.common.mqtt.model.MqttRequestModel;
17+
import eu.arrowhead.dto.OrchestrationHistoryQueryRequestDTO;
18+
import eu.arrowhead.dto.OrchestrationHistoryResponseDTO;
19+
import eu.arrowhead.serviceorchestration.DynamicServiceOrchestrationConstants;
20+
import eu.arrowhead.serviceorchestration.service.OrchestrationHistoryManagementService;
21+
22+
@Service
23+
@ConditionalOnProperty(name = Constants.MQTT_API_ENABLED, matchIfMissing = false)
24+
public class OrchestrationHistoryManagementMqttHandler extends MqttTopicHandler {
25+
26+
//=================================================================================================
27+
// members
28+
29+
@Autowired
30+
private OrchestrationHistoryManagementService historyMgmtService;
31+
32+
private final Logger logger = LogManager.getLogger(getClass());
33+
34+
//=================================================================================================
35+
// methods
36+
37+
//-------------------------------------------------------------------------------------------------
38+
@Override
39+
public String baseTopic() {
40+
return DynamicServiceOrchestrationConstants.MQTT_API_ORCHESTRATION_HISTORY_MANAGEMENT_BASE_TOPIC;
41+
}
42+
43+
//-------------------------------------------------------------------------------------------------
44+
@Override
45+
public void handle(final MqttRequestModel request) throws ArrowheadException {
46+
logger.debug("OrchestrationHistoryManagementMqttHandler.handle started");
47+
Assert.isTrue(request.getBaseTopic().equals(baseTopic()), "MQTT topic-handler mismatch");
48+
49+
final MqttStatus responseStatus = MqttStatus.OK;
50+
Object responsePayload = null;
51+
52+
switch (request.getOperation()) {
53+
case Constants.SERVICE_OP_ORCHESTRATION_QUERY:
54+
final OrchestrationHistoryQueryRequestDTO queryReqDTO = readPayload(request.getPayload(), OrchestrationHistoryQueryRequestDTO.class);
55+
responsePayload = query(queryReqDTO);
56+
break;
57+
58+
default:
59+
throw new InvalidParameterException("Unknown operation: " + request.getOperation());
60+
}
61+
62+
successResponse(request, responseStatus, responsePayload);
63+
}
64+
65+
//=================================================================================================
66+
// assistant methods
67+
68+
//-------------------------------------------------------------------------------------------------
69+
private OrchestrationHistoryResponseDTO query(final OrchestrationHistoryQueryRequestDTO dto) {
70+
logger.debug("OrchestrationHistoryManagementMqttHandler.query started");
71+
return historyMgmtService.query(dto, baseTopic() + Constants.SERVICE_OP_ORCHESTRATION_QUERY);
72+
}
73+
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package eu.arrowhead.serviceorchestration.api.mqtt;
2+
3+
import java.security.InvalidParameterException;
4+
import java.util.List;
5+
6+
import org.apache.logging.log4j.LogManager;
7+
import org.apache.logging.log4j.Logger;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
10+
import org.springframework.stereotype.Service;
11+
import org.springframework.util.Assert;
12+
13+
import com.fasterxml.jackson.core.type.TypeReference;
14+
15+
import eu.arrowhead.common.Constants;
16+
import eu.arrowhead.common.exception.ArrowheadException;
17+
import eu.arrowhead.common.mqtt.MqttStatus;
18+
import eu.arrowhead.common.mqtt.handler.MqttTopicHandler;
19+
import eu.arrowhead.common.mqtt.model.MqttRequestModel;
20+
import eu.arrowhead.dto.OrchestrationLockListRequestDTO;
21+
import eu.arrowhead.dto.OrchestrationLockListResponseDTO;
22+
import eu.arrowhead.dto.OrchestrationLockQueryRequestDTO;
23+
import eu.arrowhead.serviceorchestration.DynamicServiceOrchestrationConstants;
24+
import eu.arrowhead.serviceorchestration.service.OrchestrationLockManagementService;
25+
26+
@Service
27+
@ConditionalOnProperty(name = Constants.MQTT_API_ENABLED, matchIfMissing = false)
28+
public class OrchestrationLockManagementMqttHandler extends MqttTopicHandler {
29+
30+
//=================================================================================================
31+
// members
32+
33+
@Autowired
34+
private OrchestrationLockManagementService lockMgmtService;
35+
36+
private final Logger logger = LogManager.getLogger(getClass());
37+
38+
//=================================================================================================
39+
// methods
40+
41+
//-------------------------------------------------------------------------------------------------
42+
@Override
43+
public String baseTopic() {
44+
return DynamicServiceOrchestrationConstants.MQTT_API_ORCHESTRATION_LOCK_MANAGEMENT_BASE_TOPIC;
45+
}
46+
47+
//-------------------------------------------------------------------------------------------------
48+
@Override
49+
public void handle(final MqttRequestModel request) throws ArrowheadException {
50+
logger.debug("OrchestrationLockManagementMqttHandler.handle started");
51+
Assert.isTrue(request.getBaseTopic().equals(baseTopic()), "MQTT topic-handler mismatch");
52+
53+
MqttStatus responseStatus = MqttStatus.OK;
54+
Object responsePayload = null;
55+
56+
switch (request.getOperation()) {
57+
case Constants.SERVICE_OP_ORCHESTRATION_CREATE:
58+
final OrchestrationLockListRequestDTO createReqDTO = readPayload(request.getPayload(), OrchestrationLockListRequestDTO.class);
59+
responsePayload = create(createReqDTO);
60+
responseStatus = MqttStatus.CREATED;
61+
break;
62+
63+
case Constants.SERVICE_OP_ORCHESTRATION_QUERY:
64+
final OrchestrationLockQueryRequestDTO queryReqDTO = readPayload(request.getPayload(), OrchestrationLockQueryRequestDTO.class);
65+
responsePayload = query(queryReqDTO);
66+
break;
67+
68+
case Constants.SERVICE_OP_ORCHESTRATION_REMOVE:
69+
final List<String> removeReqDTO = readPayload(request.getPayload(), new TypeReference<List<String>>() {
70+
});
71+
remove(request.getRequester(), removeReqDTO);
72+
break;
73+
74+
default:
75+
throw new InvalidParameterException("Unknown operation: " + request.getOperation());
76+
}
77+
78+
successResponse(request, responseStatus, responsePayload);
79+
80+
}
81+
82+
//=================================================================================================
83+
// assistant methods
84+
85+
//-------------------------------------------------------------------------------------------------
86+
private OrchestrationLockListResponseDTO create(final OrchestrationLockListRequestDTO dto) {
87+
logger.debug("OrchestrationLockManagementMqttHandler.create started");
88+
return lockMgmtService.create(dto, baseTopic() + Constants.SERVICE_OP_ORCHESTRATION_CREATE);
89+
}
90+
91+
//-------------------------------------------------------------------------------------------------
92+
private OrchestrationLockListResponseDTO query(final OrchestrationLockQueryRequestDTO dto) {
93+
logger.debug("OrchestrationLockManagementMqttHandler.query started");
94+
return lockMgmtService.query(dto, baseTopic() + Constants.SERVICE_OP_ORCHESTRATION_QUERY);
95+
}
96+
97+
//-------------------------------------------------------------------------------------------------
98+
private void remove(final String requesterSystem, final List<String> instanceIds) {
99+
logger.debug("OrchestrationLockManagementMqttHandler.remove started");
100+
lockMgmtService.remove(requesterSystem, instanceIds, baseTopic() + Constants.SERVICE_OP_ORCHESTRATION_REMOVE);
101+
}
102+
}

0 commit comments

Comments
 (0)