Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ json-schema-inferrer = "0.2.1"
json-schema-validator = "1.5.8"
junit-jupiter = "5.13.2"
logback = "1.5.18"
milo = "1.0.0"
milo = "1.0.5"
mockito = "5.17.0"
mqtt-sn-codec = "838f51d691"
mssql="12.8.1.jre11"
Expand Down
3 changes: 2 additions & 1 deletion modules/hivemq-edge-module-opcua/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ dependencies {
implementation(libs.milo.encoding.json)
implementation(libs.milo.encoding.xml)
implementation(libs.milo.client)
implementation(libs.milo.server)
implementation(libs.milo.dtd.reader)
implementation(libs.milo.dtd.manager)
errorprone(libs.errorprone)
}

dependencies {
testImplementation(libs.milo.server)

testImplementation("com.hivemq:hivemq-edge")
testImplementation(libs.jackson.databind)
testImplementation(libs.hivemq.edge.adaptersdk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,26 @@
import com.hivemq.edge.adapters.opcua.config.tag.OpcuaTag;
import com.hivemq.edge.adapters.opcua.listeners.OpcUaServiceFaultListener;
import com.hivemq.edge.adapters.opcua.listeners.OpcUaSessionActivityListener;
import com.hivemq.edge.adapters.opcua.listeners.OpcUaSubscriptionListener;
import com.hivemq.edge.adapters.opcua.listeners.OpcUaSubscriptionLifecycleHandler;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.ServiceFaultListener;
import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaSubscription;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.structured.TransferSubscriptionsResponse;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.hivemq.edge.adapters.opcua.Constants.PROTOCOL_ID_OPCUA;
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;

class OpcUaClientConnection {
public class OpcUaClientConnection {
private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaClientConnection.class);

private final @NotNull OpcUaSpecificAdapterConfig config;
Expand All @@ -65,8 +59,6 @@ class OpcUaClientConnection {
private final @NotNull ProtocolAdapterState protocolAdapterState;
private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService;

private final @NotNull AtomicReference<UInteger> lastKnownSubscriptionId;

private final @NotNull AtomicReference<ConnectionContext> context = new AtomicReference<>();

OpcUaClientConnection(
Expand All @@ -87,12 +79,10 @@ class OpcUaClientConnection {
this.adapterId = adapterId;
this.protocolAdapterState = protocolAdapterState;
this.tags = tags;
this.lastKnownSubscriptionId = lastSubscriptionId;
}

@NotNull synchronized boolean start(final ParsedConfig parsedConfig) {
final var subscriptionIdOptional = Optional.ofNullable(lastKnownSubscriptionId.get());
log.debug("Subscribing to OPC UA client with subscriptionId: {}", subscriptionIdOptional.orElse(null));
synchronized boolean start(final ParsedConfig parsedConfig) {
log.debug("Subscribing to OPC UA client");
final OpcUaClient client;
final var faultListener = new OpcUaServiceFaultListener(protocolAdapterMetricsService, eventService, adapterId);
final var activityListener = new OpcUaSessionActivityListener(protocolAdapterMetricsService, eventService, adapterId, protocolAdapterState);
Expand All @@ -117,7 +107,9 @@ class OpcUaClientConnection {
return false;
}

final var subscriptionOptional = subscribe(client, subscriptionIdOptional);
final var subscriptionLifecycleHandler = new OpcUaSubscriptionLifecycleHandler(protocolAdapterMetricsService, tagStreamingService, eventService, adapterId, tags, client, dataPointFactory, config);

final var subscriptionOptional = subscriptionLifecycleHandler.subscribe(client);

if(subscriptionOptional.isEmpty()) {
log.error("Failed to create or transfer OPC UA subscription. Closing client connection.");
Expand Down Expand Up @@ -167,156 +159,6 @@ void destroy() {
return Optional.empty();
}

/**
* Subscribes to the OPC UA client.
* If a subscription ID is provided, it attempts to transfer the subscription.
* If the transfer fails or no ID is provided, it creates a new subscription.
* It then synchronizes the tags and monitored items.
*
* @param client the OPC UA client
* @param subscriptionOptional an Optional containing the subscription ID if available
* @return an Optional containing the created or transferred subscription, or empty if failed
*/
private @NotNull Optional<OpcUaSubscription> subscribe(final @NotNull OpcUaClient client, final @NotNull Optional<UInteger> subscriptionOptional) {
return subscriptionOptional
.flatMap(subscriptionId -> transferSubscription(client, subscriptionId))
.or(() -> createNewSubscription(client))
.flatMap(subscription -> {
subscription.setPublishingInterval((double) config.getOpcuaToMqttConfig().publishingInterval());
subscription.setSubscriptionListener(new OpcUaSubscriptionListener(protocolAdapterMetricsService, tagStreamingService, eventService, adapterId, tags, client, dataPointFactory));
if(syncTagsAndMonitoredItems(subscription, tags, config)) {
return Optional.of(subscription);
} else {
return Optional.empty();
}
});
}

/**
* Creates a new OPC UA subscription.
* If the subscription is created successfully, it returns an Optional containing the subscription.
* If the subscription creation fails, it returns an empty Optional.
*
* @param client the OPC UA client
* @return an Optional containing the created subscription or empty if creation failed
*/
private @NotNull Optional<OpcUaSubscription> createNewSubscription(final @NotNull OpcUaClient client) {
log.debug("Creating new OPC UA subscription");
final OpcUaSubscription subscription = new OpcUaSubscription(client);
try {
subscription.create();
return subscription
.getSubscriptionId()
.map(subscriptionId -> {
log.trace("New subscription ID: {}", subscriptionId);
lastKnownSubscriptionId.set(subscriptionId);
return subscription;
})
.or(() -> {
log.error("Subscription not created on the server");
return Optional.empty();
});
} catch (final UaException e) {
log.error("Failed to create subscription", e);
}
return Optional.empty();
}

/**
* Transfers an existing subscription to the current client.
* If the subscription is not found, it will return an empty Optional.
*
* @param client the OPC UA client
* @param subscriptionId the subscription ID to transfer
* @return an Optional containing the transferred subscription or empty if not found
*/
private static @NotNull Optional<OpcUaSubscription> transferSubscription(final @NotNull OpcUaClient client, final @NotNull UInteger subscriptionId) {
log.debug("Transfer OPC UA subscription: {}", subscriptionId);
final TransferSubscriptionsResponse response;
try {
response = client.transferSubscriptions(List.of(subscriptionId), true);
} catch (final UaException e) {
log.debug("OPC UA subscription not transferred to connection", e);
return Optional.empty();
}

final var results = response.getResults();
if (results != null && results.length > 0) {
if (results[0].getStatusCode().isGood()) {
return client.getSubscriptions().stream()
.filter(subscription ->
subscription
.getSubscriptionId()
.map(currentSubscriptionId -> currentSubscriptionId.equals(subscriptionId))
.orElse(false))
.findFirst();
} else {
log.debug("OPC UA subscription not transferred to connection: {}", results[0].getStatusCode().toString());
return Optional.empty();
}
} else {
log.error("OPC UA subscription not transferred to connection: no results returned");
return Optional.empty();
}

}

/**
* Synchronizes the tags and monitored items in the subscription.
* It removes monitored items that are not in the tags list and adds new monitored items from the tags list.
* It also updates existing monitored items with the configured queue size and sampling interval.
*
* @param subscription the OPC UA subscription
* @param tags the list of tags to synchronize
* @param config the configuration for the OPC UA adapter
* @return true if synchronization was successful, false otherwise
*/
private static boolean syncTagsAndMonitoredItems(final @NotNull OpcUaSubscription subscription, final @NotNull List<OpcuaTag> tags, final @NotNull OpcUaSpecificAdapterConfig config) {

final var nodeIdToTag = tags.stream().collect(Collectors.toMap(tag -> NodeId.parse(tag.getDefinition().getNode()), Function.identity()));
final var nodeIdToMonitoredItem = subscription.getMonitoredItems().stream().collect(Collectors.toMap(monitoredItem -> monitoredItem.getReadValueId().getNodeId(), Function.identity()));

final var monitoredItemsToRemove = nodeIdToMonitoredItem.entrySet().stream().filter(entry -> !nodeIdToTag.containsKey(entry.getKey())).map(Map.Entry::getValue).toList();
final var monitoredItemsToAdd = nodeIdToTag.entrySet().stream().filter(entry -> !nodeIdToMonitoredItem.containsKey(entry.getKey())).map(Map.Entry::getValue).toList();

//clear deleted monitored items
if(!monitoredItemsToRemove.isEmpty()) {
subscription.removeMonitoredItems(monitoredItemsToRemove);
log.debug("Removed monitored items: {}", monitoredItemsToRemove.stream().map(item -> item.getReadValueId().getNodeId()));
}

//update existing monitored items
subscription.getMonitoredItems().forEach(monitoredItem -> {
//TODO: allow to configure these values per TAG!!!!
monitoredItem.setQueueSize(uint(config.getOpcuaToMqttConfig().serverQueueSize()));
monitoredItem.setSamplingInterval(config.getOpcuaToMqttConfig().publishingInterval());
});

//add new monitored items
if(!monitoredItemsToAdd.isEmpty()) {
monitoredItemsToAdd.forEach(opcuaTag -> {
final String nodeId = opcuaTag.getDefinition().getNode();
final var monitoredItem = OpcUaMonitoredItem.newDataItem(NodeId.parse(nodeId));
monitoredItem.setQueueSize(uint(config.getOpcuaToMqttConfig().serverQueueSize()));
monitoredItem.setSamplingInterval(config.getOpcuaToMqttConfig().publishingInterval());
subscription.addMonitoredItem(monitoredItem);
});
log.debug("Added monitored items: {}", monitoredItemsToAdd.stream().map(item -> item.getDefinition().getNode()).toList());
}

try {
subscription.synchronizeMonitoredItems();
log.info("All monitored items synchronized successfully");
return true;
} catch (final UaException e) {
log.error("Failed to synchronize monitored items: {} {}", e.getStatusCode(), e.getMessage(), e);
return false;
}
}

private record ConnectionContext(@NotNull OpcUaClient client, @NotNull ServiceFaultListener faultListener, @NotNull SessionActivityListener activityListener) {
}

private static void quietlyDeleteSubscription(
final @NotNull OpcUaClient client,
final @NotNull OpcUaSubscription subscription) {
Expand Down Expand Up @@ -367,4 +209,7 @@ private static void quietlyCloseClient(
log.error("Failed to disconnect: {}", e.getMessage());
}
}

private record ConnectionContext(@NotNull OpcUaClient client, @NotNull ServiceFaultListener faultListener, @NotNull SessionActivityListener activityListener) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ public OpcUaClientConfigurator(final @NotNull String adapterId, final @NotNull P

@Override
public void accept(final @NotNull OpcUaClientConfigBuilder configBuilder) {
configBuilder.setApplicationName(LocalizedText.english(Constants.OPCUA_APPLICATION_NAME));
configBuilder.setApplicationUri(Constants.OPCUA_APPLICATION_URI);
configBuilder.setProductUri(Constants.OPCUA_PRODUCT_URI);
configBuilder.setSessionName(() -> Constants.OPCUA_SESSION_NAME_PREFIX + adapterId);
configBuilder
.setApplicationName(LocalizedText.english(Constants.OPCUA_APPLICATION_NAME))
.setApplicationUri(Constants.OPCUA_APPLICATION_URI)
.setProductUri(Constants.OPCUA_PRODUCT_URI)
.setSessionName(() -> Constants.OPCUA_SESSION_NAME_PREFIX + adapterId);

log.info("TLS is enabled: {}", parsedConfig.tlsEnabled());
if (parsedConfig.tlsEnabled()) {
Expand Down
Loading
Loading