From cf561f15230ba08c8d8cbc163e1704c89b81701d Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Thu, 15 May 2025 13:26:07 -0700 Subject: [PATCH 1/6] First compilable client --- .../iot/iotcommands/IotCommandsClient.java | 71 ++++ .../iot/iotcommands/IotCommandsV2Client.java | 340 ++++++++++++++++++ .../model/CommandExecutionEvent.java | 45 +++ .../model/CommandExecutionStatus.java | 71 ++++ .../CommandExecutionsSubscriptionRequest.java | 32 ++ .../iot/iotcommands/model/DeviceType.java | 56 +++ .../iotcommands/model/RejectedErrorCode.java | 91 +++++ .../iot/iotcommands/model/StatusReason.java | 31 ++ .../model/UpdateCommandExecutionRequest.java | 55 +++ .../model/UpdateCommandExecutionResponse.java | 24 ++ .../iotcommands/model/V2ErrorResponse.java | 39 ++ .../model/V2ErrorResponseException.java | 33 ++ 12 files changed, 888 insertions(+) create mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsClient.java create mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsV2Client.java create mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/CommandExecutionEvent.java create mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/CommandExecutionStatus.java create mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/CommandExecutionsSubscriptionRequest.java create mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/DeviceType.java create mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/RejectedErrorCode.java create mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/StatusReason.java create mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/UpdateCommandExecutionRequest.java create mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/UpdateCommandExecutionResponse.java create mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/V2ErrorResponse.java create mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/V2ErrorResponseException.java diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsClient.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsClient.java new file mode 100644 index 000000000..e8dd2fdbd --- /dev/null +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsClient.java @@ -0,0 +1,71 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + * + * This file is generated. + */ + +package software.amazon.awssdk.iot.iotcommands; + +import software.amazon.awssdk.iot.iotcommands.model.CommandExecutionEvent; +import software.amazon.awssdk.iot.iotcommands.model.CommandExecutionStatus; +import software.amazon.awssdk.iot.iotcommands.model.CommandExecutionsSubscriptionRequest; +import software.amazon.awssdk.iot.iotcommands.model.DeviceType; +import software.amazon.awssdk.iot.iotcommands.model.RejectedErrorCode; +import software.amazon.awssdk.iot.iotcommands.model.StatusReason; +import software.amazon.awssdk.iot.iotcommands.model.UpdateCommandExecutionRequest; +import software.amazon.awssdk.iot.iotcommands.model.UpdateCommandExecutionResponse; +import software.amazon.awssdk.iot.iotcommands.model.V2ErrorResponse; + +import java.nio.charset.StandardCharsets; + +import software.amazon.awssdk.crt.mqtt.MqttClientConnection; +import software.amazon.awssdk.crt.mqtt.QualityOfService; +import software.amazon.awssdk.crt.mqtt.MqttException; +import software.amazon.awssdk.crt.mqtt.MqttMessage; + +import software.amazon.awssdk.iot.Timestamp; +import software.amazon.awssdk.iot.EnumSerializer; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * The AWS IoT commands service is used to send an instruction from the cloud to a device that is connected to AWS IoT. + * + * AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/iot-remote-command.html + * +*/ +public class IotCommandsClient { + private MqttClientConnection connection = null; + private final Gson gson = getGson(); + + /** + * Constructs a new IotCommandsClient + * @param connection The connection to use + */ + public IotCommandsClient(MqttClientConnection connection) { + this.connection = connection; + } + + private Gson getGson() { + GsonBuilder gson = new GsonBuilder(); + gson.disableHtmlEscaping(); + gson.registerTypeAdapter(Timestamp.class, new Timestamp.Serializer()); + gson.registerTypeAdapter(Timestamp.class, new Timestamp.Deserializer()); + addTypeAdapters(gson); + return gson.create(); + } + + private void addTypeAdapters(GsonBuilder gson) { + gson.registerTypeAdapter(CommandExecutionStatus.class, new EnumSerializer()); + gson.registerTypeAdapter(DeviceType.class, new EnumSerializer()); + gson.registerTypeAdapter(RejectedErrorCode.class, new EnumSerializer()); + } + +} diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsV2Client.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsV2Client.java new file mode 100644 index 000000000..c8749de3c --- /dev/null +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsV2Client.java @@ -0,0 +1,340 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + * + * This file is generated. + */ + +package software.amazon.awssdk.iot.iotcommands; + +import java.lang.AutoCloseable; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.UUID; +import java.util.function.BiFunction; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import software.amazon.awssdk.crt.CrtRuntimeException; +import software.amazon.awssdk.crt.iot.*; +import software.amazon.awssdk.crt.mqtt.MqttClientConnection; +import software.amazon.awssdk.crt.mqtt5.Mqtt5Client; +import software.amazon.awssdk.iot.*; +import software.amazon.awssdk.iot.iotcommands.model.*; + +/** + * The AWS IoT commands service is used to send an instruction from the cloud to a device that is connected to AWS IoT. + * + * AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/iot-remote-command.html + * +*/ +public class IotCommandsV2Client implements AutoCloseable { + + private MqttRequestResponseClient rrClient; + private final Gson gson; + + private Gson createGson() { + GsonBuilder gson = new GsonBuilder(); + gson.disableHtmlEscaping(); + gson.registerTypeAdapter(Timestamp.class, new Timestamp.Serializer()); + gson.registerTypeAdapter(Timestamp.class, new Timestamp.Deserializer()); + addTypeAdapters(gson); + return gson.create(); + } + + private void addTypeAdapters(GsonBuilder gson) { + gson.registerTypeAdapter(CommandExecutionStatus.class, new EnumSerializer()); + gson.registerTypeAdapter(DeviceType.class, new EnumSerializer()); + gson.registerTypeAdapter(RejectedErrorCode.class, new EnumSerializer()); + } + + private IotCommandsV2Client(MqttRequestResponseClient rrClient) { + this.rrClient = rrClient; + this.gson = createGson(); + } + + /** + * Constructs a new IotCommandsV2Client, using an MQTT5 client as transport + * + * @param protocolClient the MQTT5 client to use + * @param options configuration options to use + */ + static public IotCommandsV2Client newFromMqtt5(Mqtt5Client protocolClient, MqttRequestResponseClientOptions options) { + MqttRequestResponseClient rrClient = new MqttRequestResponseClient(protocolClient, options); + return new IotCommandsV2Client(rrClient); + } + + /** + * Constructs a new IotCommandsV2Client, using an MQTT311 client as transport + * + * @param protocolClient the MQTT311 client to use + * @param options configuration options to use + */ + static public IotCommandsV2Client newFromMqtt311(MqttClientConnection protocolClient, MqttRequestResponseClientOptions options) { + MqttRequestResponseClient rrClient = new MqttRequestResponseClient(protocolClient, options); + return new IotCommandsV2Client(rrClient); + } + + /** + * Releases all resources used by the client. It is not valid to invoke operations + * on the client after it has been closed. + */ + public void close() { + this.rrClient.decRef(); + this.rrClient = null; + } + + private CommandExecutionEvent createCommandExecutionEvent(IncomingPublishEvent publishEvent) { + CommandExecutionEvent event = new CommandExecutionEvent(); + event.executionId = publishEvent.getTopic().split("/")[5]; + event.payload = publishEvent.getPayload(); + String contentType = publishEvent.getContentType(); + if (contentType != null) { + event.contentType = contentType; + } + Long messageExpiryIntervalSeconds = publishEvent.getMessageExpiryIntervalSeconds(); + if (messageExpiryIntervalSeconds != null) { + event.timeout = Math.toIntExact(messageExpiryIntervalSeconds); + } + return event; + } + + /** + * Update the status of a command execution. + * + * + * @param request modeled request to perform + * + * @return a future that will complete with the corresponding response + */ + public CompletableFuture updateCommandExecution(UpdateCommandExecutionRequest request) { + V2ClientFuture responseFuture = new V2ClientFuture<>(); + + try { + if (request.deviceType == null) { + throw new CrtRuntimeException("UpdateCommandExecutionRequest.deviceType cannot be null"); + } + + if (request.deviceId == null) { + throw new CrtRuntimeException("UpdateCommandExecutionRequest.deviceId cannot be null"); + } + + if (request.executionId == null) { + throw new CrtRuntimeException("UpdateCommandExecutionRequest.executionId cannot be null"); + } + + RequestResponseOperation.RequestResponseOperationBuilder builder = RequestResponseOperation.builder(); + + + // Publish Topic + String publishTopic = "$aws/commands/{deviceType}/{deviceId}/executions/{executionId}/response/json"; + publishTopic = publishTopic.replace("{deviceType}", request.deviceType.toString()); + publishTopic = publishTopic.replace("{deviceId}", request.deviceId); + publishTopic = publishTopic.replace("{executionId}", request.executionId); + builder.withPublishTopic(publishTopic); + + // Payload + String payloadJson = gson.toJson(request); + builder.withPayload(payloadJson.getBytes(StandardCharsets.UTF_8)); + + // Subscriptions + String subscription0 = "$aws/commands/{deviceType}/{deviceId}/executions/{executionId}/response/accepted/json"; + subscription0 = subscription0.replace("{deviceType}", request.deviceType.toString()); + subscription0 = subscription0.replace("{deviceId}", request.deviceId); + subscription0 = subscription0.replace("{executionId}", request.executionId); + builder.withSubscription(subscription0); + String subscription1 = "$aws/commands/{deviceType}/{deviceId}/executions/{executionId}/response/rejected/json"; + subscription1 = subscription1.replace("{deviceType}", request.deviceType.toString()); + subscription1 = subscription1.replace("{deviceId}", request.deviceId); + subscription1 = subscription1.replace("{executionId}", request.executionId); + builder.withSubscription(subscription1); + + // Response paths + ResponsePath.ResponsePathBuilder pathBuilder1 = ResponsePath.builder(); + String responseTopic1 = publishTopic + "/accepted"; + pathBuilder1.withResponseTopic(publishTopic + "/accepted"); + builder.withResponsePath(pathBuilder1.build()); + + ResponsePath.ResponsePathBuilder pathBuilder2 = ResponsePath.builder(); + String responseTopic2 = publishTopic + "/rejected"; + pathBuilder2.withResponseTopic(publishTopic + "/rejected"); + builder.withResponsePath(pathBuilder2.build()); + + // Submit + submitOperation(responseFuture, builder.build(), responseTopic1, UpdateCommandExecutionResponse.class, responseTopic2, V2ErrorResponse.class, IotCommandsV2Client::createV2ErrorResponseException); + } catch (Exception e) { + responseFuture.completeExceptionally(createV2ErrorResponseException(e.getMessage(), null)); + } + + return responseFuture; + } + + /** + * Creates a stream of CommandExecution notifications for a given IoT thing. + * + * + * @param request modeled streaming operation subscription configuration + * @param options set of callbacks that the operation should invoke in response to related events + * + * @return a streaming operation which will invoke a callback every time a message is received on the + * associated MQTT topic + */ + public StreamingOperation createCommandExecutionsCborPayloadStream(CommandExecutionsSubscriptionRequest request, V2ClientStreamOptions options) { + String topic = "$aws/commands/{deviceType}/{deviceId}/executions/+/request/cbor"; + + if (request.deviceType == null) { + throw new CrtRuntimeException("CommandExecutionsSubscriptionRequest.deviceType cannot be null"); + } + topic = topic.replace("{deviceType}", request.deviceType.toString()); + + if (request.deviceId == null) { + throw new CrtRuntimeException("CommandExecutionsSubscriptionRequest.deviceId cannot be null"); + } + topic = topic.replace("{deviceId}", request.deviceId); + + StreamingOperationOptions innerOptions = StreamingOperationOptions.builder() + .withTopic(topic) + .withSubscriptionStatusEventCallback(options.subscriptionEventHandler()) + .withIncomingPublishEventCallback((event) -> { + try { + CommandExecutionEvent response = createCommandExecutionEvent(event); + options.streamEventHandler().accept(response); + } catch (Exception e) { + V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder() + .withCause(e) + .withPayload(event.getPayload()) + .withTopic(event.getTopic()) + .build(); + options.deserializationFailureHandler().accept(failureEvent); + } + }) + .build(); + + return this.rrClient.createStream(innerOptions); + } + + /** + * Creates a stream of CommandExecution notifications for a given IoT thing. + * + * + * @param request modeled streaming operation subscription configuration + * @param options set of callbacks that the operation should invoke in response to related events + * + * @return a streaming operation which will invoke a callback every time a message is received on the + * associated MQTT topic + */ + public StreamingOperation createCommandExecutionsGenericPayloadStream(CommandExecutionsSubscriptionRequest request, V2ClientStreamOptions options) { + String topic = "$aws/commands/{deviceType}/{deviceId}/executions/+/request"; + + if (request.deviceType == null) { + throw new CrtRuntimeException("CommandExecutionsSubscriptionRequest.deviceType cannot be null"); + } + topic = topic.replace("{deviceType}", request.deviceType.toString()); + + if (request.deviceId == null) { + throw new CrtRuntimeException("CommandExecutionsSubscriptionRequest.deviceId cannot be null"); + } + topic = topic.replace("{deviceId}", request.deviceId); + + StreamingOperationOptions innerOptions = StreamingOperationOptions.builder() + .withTopic(topic) + .withSubscriptionStatusEventCallback(options.subscriptionEventHandler()) + .withIncomingPublishEventCallback((event) -> { + try { + CommandExecutionEvent response = createCommandExecutionEvent(event); + options.streamEventHandler().accept(response); + } catch (Exception e) { + V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder() + .withCause(e) + .withPayload(event.getPayload()) + .withTopic(event.getTopic()) + .build(); + options.deserializationFailureHandler().accept(failureEvent); + } + }) + .build(); + + return this.rrClient.createStream(innerOptions); + } + + /** + * Creates a stream of CommandExecution notifications for a given IoT thing. + * + * + * @param request modeled streaming operation subscription configuration + * @param options set of callbacks that the operation should invoke in response to related events + * + * @return a streaming operation which will invoke a callback every time a message is received on the + * associated MQTT topic + */ + public StreamingOperation createCommandExecutionsJsonPayloadStream(CommandExecutionsSubscriptionRequest request, V2ClientStreamOptions options) { + String topic = "$aws/commands/{deviceType}/{deviceId}/executions/+/request/json"; + + if (request.deviceType == null) { + throw new CrtRuntimeException("CommandExecutionsSubscriptionRequest.deviceType cannot be null"); + } + topic = topic.replace("{deviceType}", request.deviceType.toString()); + + if (request.deviceId == null) { + throw new CrtRuntimeException("CommandExecutionsSubscriptionRequest.deviceId cannot be null"); + } + topic = topic.replace("{deviceId}", request.deviceId); + + StreamingOperationOptions innerOptions = StreamingOperationOptions.builder() + .withTopic(topic) + .withSubscriptionStatusEventCallback(options.subscriptionEventHandler()) + .withIncomingPublishEventCallback((event) -> { + try { + CommandExecutionEvent response = createCommandExecutionEvent(event); + options.streamEventHandler().accept(response); + } catch (Exception e) { + V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder() + .withCause(e) + .withPayload(event.getPayload()) + .withTopic(event.getTopic()) + .build(); + options.deserializationFailureHandler().accept(failureEvent); + } + }) + .build(); + + return this.rrClient.createStream(innerOptions); + } + + static private Throwable createV2ErrorResponseException(String message, V2ErrorResponse errorResponse) { + return new V2ErrorResponseException(message, errorResponse); + } + + private void submitOperation(V2ClientFuture finalFuture, RequestResponseOperation operation, String responseTopic, Class responseClass, String errorTopic, Class errorClass, BiFunction exceptionFactory) { + try { + CompletableFuture responseFuture = this.rrClient.submitRequest(operation); + CompletableFuture compositeFuture = responseFuture.whenComplete((res, ex) -> { + if (ex != null) { + finalFuture.completeExceptionally(exceptionFactory.apply(ex.getMessage(), null)); + } else if (res.getTopic().equals(responseTopic)) { + try { + String payload = new String(res.getPayload(), StandardCharsets.UTF_8); + T response = this.gson.fromJson(payload, responseClass); + finalFuture.complete(response); + } catch (Exception e) { + finalFuture.completeExceptionally(exceptionFactory.apply(e.getMessage(), null)); + } + } else if (res.getTopic().equals(errorTopic)) { + try { + String payload = new String(res.getPayload(), StandardCharsets.UTF_8); + E error = this.gson.fromJson(payload, errorClass); + finalFuture.completeExceptionally(exceptionFactory.apply("Request-response operation failure", error)); + } catch (Exception e) { + finalFuture.completeExceptionally(exceptionFactory.apply(e.getMessage(), null)); + } + } else { + finalFuture.completeExceptionally(exceptionFactory.apply("Request-response operation completed on unknown topic: " + res.getTopic(), null)); + } + }); + finalFuture.setTriggeringFuture(compositeFuture); + } catch (Exception ex) { + finalFuture.completeExceptionally(exceptionFactory.apply(ex.getMessage(), null)); + } + } + +} diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/CommandExecutionEvent.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/CommandExecutionEvent.java new file mode 100644 index 000000000..c4451105d --- /dev/null +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/CommandExecutionEvent.java @@ -0,0 +1,45 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + * + * This file is generated. + */ + +package software.amazon.awssdk.iot.iotcommands.model; + + +/** + * Sent whenever a command execution is added for a thing or a client. + * + */ +public class CommandExecutionEvent { + + /** + * Opaque data containing instructions sent from the IoT commands service. + * + */ + public byte[] payload; + + + /** + * Unique ID for the specific execution of a command. A command can have multiple executions, each with a unique ID. + * + */ + public String executionId; + + + /** + * Data format of the payload. It is supposed to be a MIME type (IANA media type), but can be an arbitrary string. + * + */ + public String contentType; + + + /** + * Number of seconds before the IoT commands service decides that this command execution is timed out. + * + */ + public Integer timeout; + + +} diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/CommandExecutionStatus.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/CommandExecutionStatus.java new file mode 100644 index 000000000..a22f6775a --- /dev/null +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/CommandExecutionStatus.java @@ -0,0 +1,71 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + * + * This file is generated. + */ + +package software.amazon.awssdk.iot.iotcommands.model; + +/** + * The status of the command execution. + * + */ +public enum CommandExecutionStatus { + + /** + * Enum value is an unknown value + */ + UNKNOWN_ENUM_VALUE("UNKNOWN_ENUM_VALUE"), + + /** + * The device is currently processing the received command. + */ + IN_PROGRESS("IN_PROGRESS"), + + /** + * The device successfully completed the command. + */ + SUCCEEDED("SUCCEEDED"), + + /** + * The device failed to complete the command. + */ + FAILED("FAILED"), + + /** + * The device received an invalid or incomplete request. + */ + REJECTED("REJECTED"), + + /** + * When the command execution timed out, this status can be used to provide additional information in the statusReason field in the UpdateCommandExecutionRequest request. + */ + TIMED_OUT("TIMED_OUT"); + + private String value; + + private CommandExecutionStatus(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + /** + * Returns The enum associated with the given string or UNKNOWN_ENUM_VALUE + * if no enum is found. + * @param val The string to use + * @return The enum associated with the string or UNKNOWN_ENUM_VALUE + */ + static CommandExecutionStatus fromString(String val) { + for (CommandExecutionStatus e : CommandExecutionStatus.class.getEnumConstants()) { + if (e.toString().compareTo(val) == 0) { + return e; + } + } + return UNKNOWN_ENUM_VALUE; + } +} diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/CommandExecutionsSubscriptionRequest.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/CommandExecutionsSubscriptionRequest.java new file mode 100644 index 000000000..6912d3b58 --- /dev/null +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/CommandExecutionsSubscriptionRequest.java @@ -0,0 +1,32 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + * + * This file is generated. + */ + +package software.amazon.awssdk.iot.iotcommands.model; + +import software.amazon.awssdk.iot.iotcommands.model.DeviceType; + +/** + * Data needed to subscribe to CommandExecution events. + * + */ +public class CommandExecutionsSubscriptionRequest { + + /** + * The type of a target device. Determine if the device should subscribe for commands addressed to an IoT Thing or MQTT client. + * + */ + public DeviceType deviceType; + + + /** + * Depending on device type value, this field is either an IoT Thing name or a client ID. + * + */ + public String deviceId; + + +} diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/DeviceType.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/DeviceType.java new file mode 100644 index 000000000..e79ba5b15 --- /dev/null +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/DeviceType.java @@ -0,0 +1,56 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + * + * This file is generated. + */ + +package software.amazon.awssdk.iot.iotcommands.model; + +/** + * Possible device types for command executions. + * + */ +public enum DeviceType { + + /** + * Enum value is an unknown value + */ + UNKNOWN_ENUM_VALUE("UNKNOWN_ENUM_VALUE"), + + /** + * A target for the commands is an IoT Thing. + */ + THING("things"), + + /** + * A target for the commands is an MQTT client ID. + */ + CLIENT("clients"); + + private String value; + + private DeviceType(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + /** + * Returns The enum associated with the given string or UNKNOWN_ENUM_VALUE + * if no enum is found. + * @param val The string to use + * @return The enum associated with the string or UNKNOWN_ENUM_VALUE + */ + static DeviceType fromString(String val) { + for (DeviceType e : DeviceType.class.getEnumConstants()) { + if (e.toString().compareTo(val) == 0) { + return e; + } + } + return UNKNOWN_ENUM_VALUE; + } +} diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/RejectedErrorCode.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/RejectedErrorCode.java new file mode 100644 index 000000000..e021027e7 --- /dev/null +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/RejectedErrorCode.java @@ -0,0 +1,91 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + * + * This file is generated. + */ + +package software.amazon.awssdk.iot.iotcommands.model; + +/** + * A value indicating the kind of error encountered while processing an AWS IoT Commands request + * + */ +public enum RejectedErrorCode { + + /** + * Enum value is an unknown value + */ + UNKNOWN_ENUM_VALUE("UNKNOWN_ENUM_VALUE"), + + /** + * The request was sent to a topic in the AWS IoT Commands namespace that does not map to any API. + */ + INVALID_TOPIC("InvalidTopic"), + + /** + * The contents of the request could not be interpreted as valid UTF-8-encoded JSON. + */ + INVALID_JSON("InvalidJson"), + + /** + * The contents of the request were invalid. The message contains details about the error. + */ + INVALID_REQUEST("InvalidRequest"), + + /** + * An update attempted to change the command execution to a state that is invalid because of the command execution's current state. In this case, the body of the error message also contains the executionState field. + */ + INVALID_STATE_TRANSITION("InvalidStateTransition"), + + /** + * The CommandExecution specified by the request topic does not exist. + */ + RESOURCE_NOT_FOUND("ResourceNotFound"), + + /** + * The expected version specified in the request does not match the version of the command execution in the AWS IoT Commands service. In this case, the body of the error message also contains the executionState field. + */ + VERSION_MISMATCH("VersionMismatch"), + + /** + * There was an internal error during the processing of the request. + */ + INTERNAL_ERROR("InternalError"), + + /** + * The request was throttled. + */ + REQUEST_THROTTLED("RequestThrottled"), + + /** + * Occurs when a command to describe a command is performed on a command that is in a terminal state. + */ + TERMINAL_STATE_REACHED("TerminalStateReached"); + + private String value; + + private RejectedErrorCode(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + /** + * Returns The enum associated with the given string or UNKNOWN_ENUM_VALUE + * if no enum is found. + * @param val The string to use + * @return The enum associated with the string or UNKNOWN_ENUM_VALUE + */ + static RejectedErrorCode fromString(String val) { + for (RejectedErrorCode e : RejectedErrorCode.class.getEnumConstants()) { + if (e.toString().compareTo(val) == 0) { + return e; + } + } + return UNKNOWN_ENUM_VALUE; + } +} diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/StatusReason.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/StatusReason.java new file mode 100644 index 000000000..3345135ff --- /dev/null +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/StatusReason.java @@ -0,0 +1,31 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + * + * This file is generated. + */ + +package software.amazon.awssdk.iot.iotcommands.model; + + +/** + * Additional information on provided update. + * + */ +public class StatusReason { + + /** + * Reason code in the [A-Z0-9_-]+ format and not exceeding 64 characters in length. + * + */ + public String reasonCode; + + + /** + * Detailed description of the reason. + * + */ + public String reasonDescription; + + +} diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/UpdateCommandExecutionRequest.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/UpdateCommandExecutionRequest.java new file mode 100644 index 000000000..54f83da37 --- /dev/null +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/UpdateCommandExecutionRequest.java @@ -0,0 +1,55 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + * + * This file is generated. + */ + +package software.amazon.awssdk.iot.iotcommands.model; + +import software.amazon.awssdk.iot.iotcommands.model.CommandExecutionStatus; +import software.amazon.awssdk.iot.iotcommands.model.DeviceType; +import software.amazon.awssdk.iot.iotcommands.model.StatusReason; + +/** + * Data needed to make an UpdateCommandExecution request. + * + */ +public class UpdateCommandExecutionRequest { + + /** + * The type of a target device. Determine if the device should subscribe for commands addressed to an IoT Thing or MQTT client. + * + */ + public DeviceType deviceType; + + + /** + * Depending on device type value, this field is either an IoT Thing name or a client ID. + * + */ + public String deviceId; + + + /** + * ID of the command execution that needs to be updated. + * + */ + public String executionId; + + + /** + * The status of the command execution. + * + */ + public CommandExecutionStatus status; + + + /** + * A reason for the updated status. Can provide additional information on failures. Should be used when status is one of the following: FAILED, REJECTED, TIMED_OUT. + * + */ + public StatusReason statusReason; + + +} diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/UpdateCommandExecutionResponse.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/UpdateCommandExecutionResponse.java new file mode 100644 index 000000000..716b80f34 --- /dev/null +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/UpdateCommandExecutionResponse.java @@ -0,0 +1,24 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + * + * This file is generated. + */ + +package software.amazon.awssdk.iot.iotcommands.model; + + +/** + * Response payload to an UpdateCommandExecution request. + * + */ +public class UpdateCommandExecutionResponse { + + /** + * Execution ID for which a response was sent + * + */ + public String executionId; + + +} diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/V2ErrorResponse.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/V2ErrorResponse.java new file mode 100644 index 000000000..37bde4570 --- /dev/null +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/V2ErrorResponse.java @@ -0,0 +1,39 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + * + * This file is generated. + */ + +package software.amazon.awssdk.iot.iotcommands.model; + +import software.amazon.awssdk.iot.iotcommands.model.RejectedErrorCode; + +/** + * Response document containing details about a failed request. + * + */ +public class V2ErrorResponse { + + /** + * Indicates the type of error. + * + */ + public RejectedErrorCode error; + + + /** + * A text message that provides additional information. + * + */ + public String errorMessage; + + + /** + * Execution ID for which error is set. + * + */ + public String executionId; + + +} diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/V2ErrorResponseException.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/V2ErrorResponseException.java new file mode 100644 index 000000000..08a8f1ea8 --- /dev/null +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/model/V2ErrorResponseException.java @@ -0,0 +1,33 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + * + * This file is generated. + */ + +package software.amazon.awssdk.iot.iotcommands.model; + +import software.amazon.awssdk.crt.CrtRuntimeException; + +/** + * An exception that can wrap a specific modeled service error (V2ErrorResponse) as optional, + * auxiliary data. + */ +public class V2ErrorResponseException extends CrtRuntimeException { + private final V2ErrorResponse modeledError; + + /** + * Constructor + */ + public V2ErrorResponseException(String msg, V2ErrorResponse modeledError) { + super(msg); + this.modeledError = modeledError; + } + + /** + * Gets the modeled error, if any, associated with this exception. + */ + public V2ErrorResponse getModeledError() { + return this.modeledError; + } +} From 47cae0f4bef5298300c875695d7c6343ed50a16d Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Thu, 15 May 2025 13:26:44 -0700 Subject: [PATCH 2/6] Apply changes to existing clients --- .../awssdk/iot/iotjobs/IotJobsV2Client.java | 16 ++++++++++---- .../iot/iotshadow/IotShadowV2Client.java | 22 ++++++++++++------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotjobs/IotJobsV2Client.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotjobs/IotJobsV2Client.java index 64143ab7b..bff872c78 100644 --- a/sdk/src/main/java/software/amazon/awssdk/iot/iotjobs/IotJobsV2Client.java +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotjobs/IotJobsV2Client.java @@ -83,6 +83,16 @@ public void close() { this.rrClient = null; } + private NextJobExecutionChangedEvent createNextJobExecutionChangedEvent(IncomingPublishEvent publishEvent) { + String payload = new String(publishEvent.getPayload(), StandardCharsets.UTF_8); + return this.gson.fromJson(payload, NextJobExecutionChangedEvent.class); + } + + private JobExecutionsChangedEvent createJobExecutionsChangedEvent(IncomingPublishEvent publishEvent) { + String payload = new String(publishEvent.getPayload(), StandardCharsets.UTF_8); + return this.gson.fromJson(payload, JobExecutionsChangedEvent.class); + } + /** * Gets detailed information about a job execution. * @@ -364,8 +374,7 @@ public StreamingOperation createJobExecutionsChangedStream(JobExecutionsChangedS .withSubscriptionStatusEventCallback(options.subscriptionEventHandler()) .withIncomingPublishEventCallback((event) -> { try { - String payload = new String(event.getPayload(), StandardCharsets.UTF_8); - JobExecutionsChangedEvent response = this.gson.fromJson(payload, JobExecutionsChangedEvent.class); + JobExecutionsChangedEvent response = createJobExecutionsChangedEvent(event); options.streamEventHandler().accept(response); } catch (Exception e) { V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder() @@ -406,8 +415,7 @@ public StreamingOperation createNextJobExecutionChangedStream(NextJobExecutionCh .withSubscriptionStatusEventCallback(options.subscriptionEventHandler()) .withIncomingPublishEventCallback((event) -> { try { - String payload = new String(event.getPayload(), StandardCharsets.UTF_8); - NextJobExecutionChangedEvent response = this.gson.fromJson(payload, NextJobExecutionChangedEvent.class); + NextJobExecutionChangedEvent response = createNextJobExecutionChangedEvent(event); options.streamEventHandler().accept(response); } catch (Exception e) { V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder() diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotshadow/IotShadowV2Client.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotshadow/IotShadowV2Client.java index 9538e4a64..8a603d699 100644 --- a/sdk/src/main/java/software/amazon/awssdk/iot/iotshadow/IotShadowV2Client.java +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotshadow/IotShadowV2Client.java @@ -83,6 +83,16 @@ public void close() { this.rrClient = null; } + private ShadowUpdatedEvent createShadowUpdatedEvent(IncomingPublishEvent publishEvent) { + String payload = new String(publishEvent.getPayload(), StandardCharsets.UTF_8); + return this.gson.fromJson(payload, ShadowUpdatedEvent.class); + } + + private ShadowDeltaUpdatedEvent createShadowDeltaUpdatedEvent(IncomingPublishEvent publishEvent) { + String payload = new String(publishEvent.getPayload(), StandardCharsets.UTF_8); + return this.gson.fromJson(payload, ShadowDeltaUpdatedEvent.class); + } + /** * Deletes a named shadow for an AWS IoT thing. * @@ -504,8 +514,7 @@ public StreamingOperation createNamedShadowDeltaUpdatedStream(NamedShadowDeltaUp .withSubscriptionStatusEventCallback(options.subscriptionEventHandler()) .withIncomingPublishEventCallback((event) -> { try { - String payload = new String(event.getPayload(), StandardCharsets.UTF_8); - ShadowDeltaUpdatedEvent response = this.gson.fromJson(payload, ShadowDeltaUpdatedEvent.class); + ShadowDeltaUpdatedEvent response = createShadowDeltaUpdatedEvent(event); options.streamEventHandler().accept(response); } catch (Exception e) { V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder() @@ -551,8 +560,7 @@ public StreamingOperation createNamedShadowUpdatedStream(NamedShadowUpdatedSubsc .withSubscriptionStatusEventCallback(options.subscriptionEventHandler()) .withIncomingPublishEventCallback((event) -> { try { - String payload = new String(event.getPayload(), StandardCharsets.UTF_8); - ShadowUpdatedEvent response = this.gson.fromJson(payload, ShadowUpdatedEvent.class); + ShadowUpdatedEvent response = createShadowUpdatedEvent(event); options.streamEventHandler().accept(response); } catch (Exception e) { V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder() @@ -593,8 +601,7 @@ public StreamingOperation createShadowDeltaUpdatedStream(ShadowDeltaUpdatedSubsc .withSubscriptionStatusEventCallback(options.subscriptionEventHandler()) .withIncomingPublishEventCallback((event) -> { try { - String payload = new String(event.getPayload(), StandardCharsets.UTF_8); - ShadowDeltaUpdatedEvent response = this.gson.fromJson(payload, ShadowDeltaUpdatedEvent.class); + ShadowDeltaUpdatedEvent response = createShadowDeltaUpdatedEvent(event); options.streamEventHandler().accept(response); } catch (Exception e) { V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder() @@ -635,8 +642,7 @@ public StreamingOperation createShadowUpdatedStream(ShadowUpdatedSubscriptionReq .withSubscriptionStatusEventCallback(options.subscriptionEventHandler()) .withIncomingPublishEventCallback((event) -> { try { - String payload = new String(event.getPayload(), StandardCharsets.UTF_8); - ShadowUpdatedEvent response = this.gson.fromJson(payload, ShadowUpdatedEvent.class); + ShadowUpdatedEvent response = createShadowUpdatedEvent(event); options.streamEventHandler().accept(response); } catch (Exception e) { V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder() From 39e7e058d738b50b7b2422cb54850a92fa40e502 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Fri, 30 May 2025 10:45:38 -0700 Subject: [PATCH 3/6] Fix response topics --- .../iot/iotcommands/IotCommandsV2Client.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsV2Client.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsV2Client.java index c8749de3c..2516f4675 100644 --- a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsV2Client.java +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsV2Client.java @@ -151,13 +151,19 @@ public CompletableFuture updateCommandExecution( // Response paths ResponsePath.ResponsePathBuilder pathBuilder1 = ResponsePath.builder(); - String responseTopic1 = publishTopic + "/accepted"; - pathBuilder1.withResponseTopic(publishTopic + "/accepted"); + String responseTopic1 = "$aws/commands/{deviceType}/{deviceId}/executions/{executionId}/response/accepted/json"; + responseTopic1 = responseTopic1.replace("{deviceType}", request.deviceType.toString()); + responseTopic1 = responseTopic1.replace("{deviceId}", request.deviceId); + responseTopic1 = responseTopic1.replace("{executionId}", request.executionId); + pathBuilder1.withResponseTopic(responseTopic1); builder.withResponsePath(pathBuilder1.build()); ResponsePath.ResponsePathBuilder pathBuilder2 = ResponsePath.builder(); - String responseTopic2 = publishTopic + "/rejected"; - pathBuilder2.withResponseTopic(publishTopic + "/rejected"); + String responseTopic2 = "$aws/commands/{deviceType}/{deviceId}/executions/{executionId}/response/rejected/json"; + responseTopic2 = responseTopic2.replace("{deviceType}", request.deviceType.toString()); + responseTopic2 = responseTopic2.replace("{deviceId}", request.deviceId); + responseTopic2 = responseTopic2.replace("{executionId}", request.executionId); + pathBuilder2.withResponseTopic(responseTopic2); builder.withResponsePath(pathBuilder2.build()); // Submit From 6510b27644687886d93cbded0066cd0c19376ea5 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Thu, 5 Jun 2025 10:58:04 -0700 Subject: [PATCH 4/6] Iot commands tests (#632) --- sdk/pom.xml | 10 +- sdk/tests/v2serviceclients/CommandsTests.java | 571 ++++++++++++++++++ .../V2ServiceClientTestFixture.java | 33 +- 3 files changed, 603 insertions(+), 11 deletions(-) create mode 100644 sdk/tests/v2serviceclients/CommandsTests.java diff --git a/sdk/pom.xml b/sdk/pom.xml index 5fd52f6c1..f0cf34b28 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -42,13 +42,19 @@ software.amazon.awssdk iot - 2.30.9 + 2.31.44 + test + + + software.amazon.awssdk + iotjobsdataplane + 2.31.44 test software.amazon.awssdk sts - 2.30.9 + 2.31.44 test diff --git a/sdk/tests/v2serviceclients/CommandsTests.java b/sdk/tests/v2serviceclients/CommandsTests.java new file mode 100644 index 000000000..1974beedb --- /dev/null +++ b/sdk/tests/v2serviceclients/CommandsTests.java @@ -0,0 +1,571 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +import org.junit.jupiter.api.*; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.crt.CrtRuntimeException; +import software.amazon.awssdk.crt.iot.MqttRequestResponseClientOptions; +import software.amazon.awssdk.crt.iot.StreamingOperation; +import software.amazon.awssdk.crt.iot.SubscriptionStatusEventType; +import software.amazon.awssdk.iot.V2ClientStreamOptions; +import software.amazon.awssdk.iot.iotcommands.IotCommandsV2Client; +import software.amazon.awssdk.iot.iotcommands.model.*; +import software.amazon.awssdk.iot.iotcommands.model.CommandExecutionStatus; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.iot.IotClient; +import software.amazon.awssdk.services.iotjobsdataplane.IotJobsDataPlaneClient; +import software.amazon.awssdk.services.iot.model.*; +import software.amazon.awssdk.services.iotjobsdataplane.model.StartCommandExecutionRequest; +import software.amazon.awssdk.services.iotjobsdataplane.model.StartCommandExecutionResponse; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.model.GetCallerIdentityRequest; +import software.amazon.awssdk.services.sts.model.GetCallerIdentityResponse; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Logger; + +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +public class CommandsTests extends V2ServiceClientTestFixture { + + private static final Logger LOGGER = Logger.getLogger(CommandsTests.class.getName()); + + private static class TestContext { + private String thingName = null; + private String thingArn = null; + private String mqttClientId = null; + private String mqttClientArn = null; + + private CreateCommandResponse commandInfo = null; + private StartCommandExecutionResponse commandExecutionInfo = null; + + private final List commandExecutionEvents = new ArrayList<>(); + + private final Lock eventLock = new ReentrantLock(); + private final Condition eventSignal = eventLock.newCondition(); + } + + private StsClient stsClient; + private IotClient iotClient; + private IotCommandsV2Client commandsClient; + private IotJobsDataPlaneClient iotJobsDataPlaneClient; + + private String testRegion; + + private TestContext testContext; + + void populateTestingEnvironmentVariables() { + super.populateTestingEnvironmentVariables(); + testRegion = System.getenv("AWS_TEST_MQTT5_IOT_CORE_REGION"); + } + + boolean hasTestEnvironment() { + return testRegion != null && super.hasBaseTestEnvironment(); + } + + public CommandsTests() { + super(); + populateTestingEnvironmentVariables(); + + if (hasTestEnvironment()) { + stsClient = StsClient.builder() + .region(Region.of(testRegion)) + .build(); + + iotClient = IotClient.builder() + .region(Region.of(testRegion)) + .build(); + + DescribeEndpointRequest describeEndpointRequest = + DescribeEndpointRequest.builder().endpointType("iot:Jobs").build(); + DescribeEndpointResponse describeEndpointResponse = iotClient.describeEndpoint(describeEndpointRequest); + + iotJobsDataPlaneClient = IotJobsDataPlaneClient.builder() + .region(Region.of(testRegion)) + .endpointOverride(URI.create("https://" + describeEndpointResponse.endpointAddress())) + .build(); + } + } + + MqttRequestResponseClientOptions createDefaultServiceClientOptions() { + return MqttRequestResponseClientOptions.builder() + .withMaxRequestResponseSubscriptions(4) + .withMaxStreamingSubscriptions(2) + .withOperationTimeoutSeconds(10) + .build(); + } + + void setupCommandsClient5(MqttRequestResponseClientOptions serviceClientOptions) { + setupBaseMqtt5Client(testContext.mqttClientId); + + if (serviceClientOptions == null) { + serviceClientOptions = createDefaultServiceClientOptions(); + } + + commandsClient = IotCommandsV2Client.newFromMqtt5(mqtt5Client, serviceClientOptions); + } + + void setupCommandsClient311(MqttRequestResponseClientOptions serviceClientOptions) { + setupBaseMqtt311Client(testContext.mqttClientId); + + if (serviceClientOptions == null) { + serviceClientOptions = createDefaultServiceClientOptions(); + } + + commandsClient = IotCommandsV2Client.newFromMqtt311(mqtt311Client, serviceClientOptions); + } + + void pause(long millis) { + try { + wait(millis); + } catch (Exception ex) { + ; + } + } + + CreateCommandResponse createCommand(int index) { + String commandId = "command-id-" + UUID.randomUUID(); + String commandDocumentJson = String.format("{\"test\":\"do-something-%d\"}", index); + SdkBytes commandDocumentBytes = SdkBytes.fromUtf8String(commandDocumentJson); + CommandPayload commandPayload = CommandPayload.builder() + .contentType("application/json") + .content(commandDocumentBytes) + .build(); + + return iotClient.createCommand(CreateCommandRequest.builder() + .commandId(commandId) + .payload(commandPayload) + .build()); + } + + void sleepOnThrottle() { + long seed = System.nanoTime(); + Random generator = new Random(seed); + try { + // 1 - 10 seconds + long sleepMillis = (long) (generator.nextDouble() * 9000 + 1000); + Thread.sleep(sleepMillis); + } catch (Exception e) { + ; + } + } + + void deleteCommand(String commandId) { + boolean done = false; + while (!done) { + try { + iotClient.deleteCommand(DeleteCommandRequest.builder().commandId(commandId).build()); + done = true; + } catch (ThrottlingException | LimitExceededException ex) { + // We run more than 10 CI jobs concurrently, causing us to hit a variety of annoying limits. + sleepOnThrottle(); + } + } + } + + String getMqttClientArn() { + GetCallerIdentityResponse response = stsClient.getCallerIdentity(GetCallerIdentityRequest.builder().build()); + String accountId = response.account(); + // Format the MQTT client ARN + return String.format("arn:aws:iot:%s:%s:client/%s", testRegion, accountId, testContext.mqttClientId); + } + + @BeforeEach + public void setup() { + if (!hasTestEnvironment()) { + return; + } + + testContext = new TestContext(); + + String mqttClientId = "test-" + UUID.randomUUID(); + String thingName = "thing-commands-java-" + UUID.randomUUID(); + + CreateThingResponse response = iotClient.createThing(CreateThingRequest.builder().thingName(thingName).build()); + + testContext.thingName = thingName; + testContext.thingArn = response.thingArn(); + testContext.mqttClientId = mqttClientId; + testContext.mqttClientArn = getMqttClientArn(); + + pause(1000); + + testContext.commandInfo = createCommand(1); + } + + @AfterEach + public void tearDown() { + if (!hasTestEnvironment()) { + return; + } + + if (commandsClient != null) { + commandsClient.close(); + commandsClient = null; + } + + pause(1000); + + if (testContext.commandInfo != null) { + deleteCommand(testContext.commandInfo.commandId()); + } + + pause(1000); + + if (testContext.thingName != null) { + iotClient.deleteThing(DeleteThingRequest.builder().thingName(testContext.thingName).build()); + } + } + + StreamingOperation createCommandExecutionsJsonStream(CommandExecutionsSubscriptionRequest request) { + CompletableFuture subscribed = new CompletableFuture<>(); + + V2ClientStreamOptions options = V2ClientStreamOptions.builder() + .withStreamEventHandler((event) -> { + this.testContext.eventLock.lock(); + try { + this.testContext.commandExecutionEvents.add(event); + } finally { + this.testContext.eventSignal.signalAll(); + this.testContext.eventLock.unlock(); + } + }) + .withSubscriptionEventHandler((event) -> { + if (event.getType() == SubscriptionStatusEventType.SUBSCRIPTION_ESTABLISHED) { + subscribed.complete(true); + } + }) + .build(); + + StreamingOperation stream = commandsClient.createCommandExecutionsJsonPayloadStream(request, options); + stream.open(); + try { + subscribed.get(); + } catch (Exception ex) { + Assertions.fail("createCommandExecutionsJsonPayloadStream should have completed successfully"); + } + + return stream; + } + + void waitForStreamEvents() { + testContext.eventLock.lock(); + try { + testContext.eventSignal.await(10, TimeUnit.SECONDS); + CommandExecutionEvent event = testContext.commandExecutionEvents.get(0); + Assertions.assertEquals(testContext.commandExecutionInfo.executionId(), event.executionId); +// Assertions.assertEquals(testContext.commandExecutionInfo., event.contentType); + } catch (Exception ex) { + Assertions.fail("waitForInitialStreamEvents should have completed successfully"); + } finally { + testContext.eventLock.unlock(); + } + } + + void doThingCommandTest() { + CommandExecutionsSubscriptionRequest commandExecutionsSubscriptionRequest = new CommandExecutionsSubscriptionRequest(); + commandExecutionsSubscriptionRequest.deviceType = DeviceType.THING; + commandExecutionsSubscriptionRequest.deviceId = testContext.thingName; + + try (StreamingOperation commandExecutionsJsonStream = createCommandExecutionsJsonStream(commandExecutionsSubscriptionRequest)) { + + { + StartCommandExecutionRequest request = StartCommandExecutionRequest.builder() + .commandArn(testContext.commandInfo.commandArn()) + .executionTimeoutSeconds(10L) + .targetArn(testContext.thingArn) + .build(); + + testContext.commandExecutionInfo = iotJobsDataPlaneClient.startCommandExecution(request); + } + + // wait for initial stream events to trigger + waitForStreamEvents(); + + // pretend to work on it + pause(1000); + + // Update to in-progress. + { + UpdateCommandExecutionRequest updateCommandExecutionRequest = new UpdateCommandExecutionRequest(); + updateCommandExecutionRequest.executionId = testContext.commandExecutionInfo.executionId(); + updateCommandExecutionRequest.deviceType = DeviceType.THING; + updateCommandExecutionRequest.deviceId = testContext.thingName; + updateCommandExecutionRequest.status = CommandExecutionStatus.IN_PROGRESS; + commandsClient.updateCommandExecution(updateCommandExecutionRequest).get(); + } + + // Verify it's in-progress. + { + GetCommandExecutionRequest request = GetCommandExecutionRequest.builder() + .executionId(testContext.commandExecutionInfo.executionId()) + .targetArn(testContext.thingArn) + .build(); + GetCommandExecutionResponse response = iotClient.getCommandExecution(request); + Assertions.assertEquals( + software.amazon.awssdk.services.iot.model.CommandExecutionStatus.IN_PROGRESS, + response.status()); + } + + // Complete the command execution. + { + UpdateCommandExecutionRequest updateCommandExecutionRequest = new UpdateCommandExecutionRequest(); + updateCommandExecutionRequest.executionId = testContext.commandExecutionInfo.executionId(); + updateCommandExecutionRequest.deviceType = DeviceType.THING; + updateCommandExecutionRequest.deviceId = testContext.thingName; + updateCommandExecutionRequest.status = CommandExecutionStatus.SUCCEEDED; + commandsClient.updateCommandExecution(updateCommandExecutionRequest).get(); + } + + // Verify it's done. + { + GetCommandExecutionRequest request = GetCommandExecutionRequest.builder() + .executionId(testContext.commandExecutionInfo.executionId()) + .targetArn(testContext.thingArn) + .build(); + GetCommandExecutionResponse response = iotClient.getCommandExecution(request); + Assertions.assertEquals( + software.amazon.awssdk.services.iot.model.CommandExecutionStatus.SUCCEEDED, + response.status()); + } + + // Try to update command execution to FAILED. + { + UpdateCommandExecutionRequest updateCommandExecutionRequest = new UpdateCommandExecutionRequest(); + updateCommandExecutionRequest.executionId = testContext.commandExecutionInfo.executionId(); + updateCommandExecutionRequest.deviceType = DeviceType.THING; + updateCommandExecutionRequest.deviceId = testContext.thingName; + updateCommandExecutionRequest.status = CommandExecutionStatus.FAILED; + Assertions.assertThrows(ExecutionException.class, + () -> commandsClient.updateCommandExecution(updateCommandExecutionRequest).get()); + } + } catch (Exception ex) { + ex.printStackTrace(); + Assertions.fail("doCommandsControlTest triggered exception"); + } + } + + @Test + public void handleCommandExecutionWithThing5() { + assumeTrue(hasTestEnvironment()); + setupCommandsClient5(null); + doThingCommandTest(); + } + + @Test + public void handleCommandExecutionWithThing311() { + assumeTrue(hasTestEnvironment()); + setupCommandsClient311(null); + doThingCommandTest(); + } + + void doClientCommandTest() { + CommandExecutionsSubscriptionRequest commandExecutionsSubscriptionRequest = new CommandExecutionsSubscriptionRequest(); + commandExecutionsSubscriptionRequest.deviceType = DeviceType.CLIENT; + commandExecutionsSubscriptionRequest.deviceId = testContext.mqttClientId; + + try (StreamingOperation commandExecutionsJsonStream = createCommandExecutionsJsonStream(commandExecutionsSubscriptionRequest)) { + + { + StartCommandExecutionRequest request = StartCommandExecutionRequest.builder() + .commandArn(testContext.commandInfo.commandArn()) + .executionTimeoutSeconds(10L) + .targetArn(testContext.mqttClientArn) + .build(); + + testContext.commandExecutionInfo = iotJobsDataPlaneClient.startCommandExecution(request); + } + + // wait for initial stream events to trigger + waitForStreamEvents(); + + // pretend to work on it + pause(1000); + + // Update to in-progress. + { + UpdateCommandExecutionRequest updateCommandExecutionRequest = new UpdateCommandExecutionRequest(); + updateCommandExecutionRequest.executionId = testContext.commandExecutionInfo.executionId(); + updateCommandExecutionRequest.deviceType = DeviceType.CLIENT; + updateCommandExecutionRequest.deviceId = testContext.mqttClientId; + updateCommandExecutionRequest.status = CommandExecutionStatus.IN_PROGRESS; + commandsClient.updateCommandExecution(updateCommandExecutionRequest).get(); + } + + // Verify it's in-progress. + { + GetCommandExecutionRequest request = GetCommandExecutionRequest.builder() + .executionId(testContext.commandExecutionInfo.executionId()) + .targetArn(testContext.mqttClientArn) + .build(); + GetCommandExecutionResponse response = iotClient.getCommandExecution(request); + Assertions.assertEquals( + software.amazon.awssdk.services.iot.model.CommandExecutionStatus.IN_PROGRESS, + response.status()); + } + + // Complete the command execution. + { + UpdateCommandExecutionRequest updateCommandExecutionRequest = new UpdateCommandExecutionRequest(); + updateCommandExecutionRequest.executionId = testContext.commandExecutionInfo.executionId(); + updateCommandExecutionRequest.deviceType = DeviceType.CLIENT; + updateCommandExecutionRequest.deviceId = testContext.mqttClientId; + updateCommandExecutionRequest.status = CommandExecutionStatus.SUCCEEDED; + commandsClient.updateCommandExecution(updateCommandExecutionRequest).get(); + } + + // Verify it's done. + { + GetCommandExecutionRequest request = GetCommandExecutionRequest.builder() + .executionId(testContext.commandExecutionInfo.executionId()) + .targetArn(testContext.mqttClientArn) + .build(); + GetCommandExecutionResponse response = iotClient.getCommandExecution(request); + Assertions.assertEquals( + software.amazon.awssdk.services.iot.model.CommandExecutionStatus.SUCCEEDED, + response.status()); + } + + // Try to update command execution to FAILED. + // This should fail. + { + UpdateCommandExecutionRequest updateCommandExecutionRequest = new UpdateCommandExecutionRequest(); + updateCommandExecutionRequest.executionId = testContext.commandExecutionInfo.executionId(); + updateCommandExecutionRequest.deviceType = DeviceType.CLIENT; + updateCommandExecutionRequest.deviceId = testContext.mqttClientId; + updateCommandExecutionRequest.status = CommandExecutionStatus.FAILED; + Assertions.assertThrows(ExecutionException.class, + () -> commandsClient.updateCommandExecution(updateCommandExecutionRequest).get()); + } + } catch (Exception ex) { + ex.printStackTrace(); + Assertions.fail("doCommandsControlTest triggered exception"); + } + } + + @Test + public void handleCommandExecutionWithClient5() { + assumeTrue(hasTestEnvironment()); + setupCommandsClient5(null); + doClientCommandTest(); + } + + @Test + public void handleCommandExecutionWithClient311() { + assumeTrue(hasTestEnvironment()); + setupCommandsClient311(null); + doClientCommandTest(); + } + + void doCommandTest_FailIncompleteRequests() { + CommandExecutionsSubscriptionRequest commandExecutionsSubscriptionRequest = new CommandExecutionsSubscriptionRequest(); + + Assertions.assertThrows(CrtRuntimeException.class, () -> createCommandExecutionsJsonStream(commandExecutionsSubscriptionRequest)); + + commandExecutionsSubscriptionRequest.deviceType = DeviceType.THING; + commandExecutionsSubscriptionRequest.deviceId = testContext.thingName; + try (StreamingOperation commandExecutionsJsonStream = createCommandExecutionsJsonStream(commandExecutionsSubscriptionRequest)) { + + { + StartCommandExecutionRequest request = StartCommandExecutionRequest.builder() + .commandArn(testContext.commandInfo.commandArn()) + .executionTimeoutSeconds(10L) + .targetArn(testContext.thingArn) + .build(); + testContext.commandExecutionInfo = iotJobsDataPlaneClient.startCommandExecution(request); + } + + // wait for initial stream events to trigger + waitForStreamEvents(); + + { + UpdateCommandExecutionRequest updateCommandExecutionRequest = new UpdateCommandExecutionRequest(); + Assertions.assertThrows(ExecutionException.class, () -> commandsClient.updateCommandExecution(updateCommandExecutionRequest).get()); + } + } catch (Exception ex) { + ex.printStackTrace(); + Assertions.fail("doCommandsControlTest triggered exception"); + } + } + + @Test + public void handleCommandExecutionIncompleteRequestsFailures5() { + assumeTrue(hasTestEnvironment()); + setupCommandsClient5(null); + doCommandTest_FailIncompleteRequests(); + } + + @Test + public void handleCommandExecutionIncompleteRequestsFailures311() { + assumeTrue(hasTestEnvironment()); + setupCommandsClient311(null); + doCommandTest_FailIncompleteRequests(); + } + + void doCommandTest_FailInvalidRequests() { + CommandExecutionsSubscriptionRequest commandExecutionsSubscriptionRequest = new CommandExecutionsSubscriptionRequest(); + commandExecutionsSubscriptionRequest.deviceType = DeviceType.CLIENT; + commandExecutionsSubscriptionRequest.deviceId = testContext.mqttClientId; + + try (StreamingOperation commandExecutionsJsonStream = createCommandExecutionsJsonStream(commandExecutionsSubscriptionRequest)) { + + { + StartCommandExecutionRequest request = StartCommandExecutionRequest.builder() + .commandArn(testContext.commandInfo.commandArn()) + .executionTimeoutSeconds(10L) + .targetArn(testContext.mqttClientArn) + .build(); + + testContext.commandExecutionInfo = iotJobsDataPlaneClient.startCommandExecution(request); + } + + // wait for initial stream events to trigger + waitForStreamEvents(); + + { + UpdateCommandExecutionRequest request = new UpdateCommandExecutionRequest(); + request.executionId = testContext.commandExecutionInfo.executionId(); + request.deviceType = DeviceType.THING; + request.deviceId = "inexistent-thing"; + request.status = CommandExecutionStatus.FAILED; + Assertions.assertThrows(ExecutionException.class, () -> commandsClient.updateCommandExecution(request).get()); + } + + { + UpdateCommandExecutionRequest request = new UpdateCommandExecutionRequest(); + request.executionId = "inexistent-execution"; + request.deviceType = DeviceType.THING; + request.deviceId = testContext.thingName; + request.status = CommandExecutionStatus.FAILED; + Assertions.assertThrows(ExecutionException.class, () -> commandsClient.updateCommandExecution(request).get()); + } + } + } + + @Test + public void handleCommandExecutionInvalidRequestsFailures5() { + assumeTrue(hasTestEnvironment()); + setupCommandsClient5(null); + doCommandTest_FailInvalidRequests(); + } + + @Test + public void handleCommandExecutionInvalidRequestsFailures311() { + assumeTrue(hasTestEnvironment()); + setupCommandsClient311(null); + doCommandTest_FailInvalidRequests(); + } + +} diff --git a/sdk/tests/v2serviceclients/V2ServiceClientTestFixture.java b/sdk/tests/v2serviceclients/V2ServiceClientTestFixture.java index 85f2c72c6..51ad11a73 100644 --- a/sdk/tests/v2serviceclients/V2ServiceClientTestFixture.java +++ b/sdk/tests/v2serviceclients/V2ServiceClientTestFixture.java @@ -50,7 +50,7 @@ boolean hasProvisioningTestEnvironment() { return provisioningHost != null && provisioningCertificatePath != null && provisioningKeyPath != null; } - private void setupMqtt5Client(String host, String certificatePath, String keyPath) { + private void setupMqtt5Client(String host, String certificatePath, String keyPath, String clientId) { try (AwsIotMqtt5ClientBuilder builder = AwsIotMqtt5ClientBuilder.newDirectMqttBuilderWithMtlsFromPath( host, certificatePath, keyPath)) { @@ -80,7 +80,11 @@ public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) {} builder.withLifeCycleEvents(eventHandler); ConnectPacket.ConnectPacketBuilder connectBuilder = new ConnectPacket.ConnectPacketBuilder(); - connectBuilder.withClientId("test-" + UUID.randomUUID().toString()); + if (clientId != null) { + connectBuilder.withClientId(clientId); + } else { + connectBuilder.withClientId("test-" + UUID.randomUUID().toString()); + } builder.withConnectProperties(connectBuilder); this.mqtt5Client = builder.build(); @@ -95,18 +99,25 @@ public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) {} } void setupBaseMqtt5Client() { - setupMqtt5Client(baseHost, baseCertificatePath, baseKeyPath); + setupMqtt5Client(baseHost, baseCertificatePath, baseKeyPath, null); + } + + void setupBaseMqtt5Client(String clientId) { + setupMqtt5Client(baseHost, baseCertificatePath, baseKeyPath, clientId); } void setupProvisioningMqtt5Client() { - setupMqtt5Client(provisioningHost, provisioningCertificatePath, provisioningKeyPath); + setupMqtt5Client(provisioningHost, provisioningCertificatePath, provisioningKeyPath, null); } - private void setupMqtt311Client(String host, String certificatePath, String keyPath) { + private void setupMqtt311Client(String host, String certificatePath, String keyPath, String clientId) { try (AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder.newMtlsBuilderFromPath(certificatePath, keyPath)) { builder.withEndpoint(host); - String clientId = "test-" + UUID.randomUUID().toString(); - builder.withClientId(clientId); + if (clientId != null) { + builder.withClientId(clientId); + } else { + builder.withClientId("test-" + UUID.randomUUID().toString()); + } this.mqtt311Client = builder.build(); @@ -119,11 +130,15 @@ private void setupMqtt311Client(String host, String certificatePath, String keyP } void setupBaseMqtt311Client() { - setupMqtt311Client(baseHost, baseCertificatePath, baseKeyPath); + setupMqtt311Client(baseHost, baseCertificatePath, baseKeyPath, null); + } + + void setupBaseMqtt311Client(String clientId) { + setupMqtt311Client(baseHost, baseCertificatePath, baseKeyPath, clientId); } void setupProvisioningMqtt311Client() { - setupMqtt311Client(provisioningHost, provisioningCertificatePath, provisioningKeyPath); + setupMqtt311Client(provisioningHost, provisioningCertificatePath, provisioningKeyPath, null); } @AfterEach From f2eb533ac4368380c748ae5efbe82c80a4bc175b Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Thu, 5 Jun 2025 10:59:58 -0700 Subject: [PATCH 5/6] Remove IotCommandsClient --- .../iot/iotcommands/IotCommandsClient.java | 71 ------------------- 1 file changed, 71 deletions(-) delete mode 100644 sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsClient.java diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsClient.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsClient.java deleted file mode 100644 index e8dd2fdbd..000000000 --- a/sdk/src/main/java/software/amazon/awssdk/iot/iotcommands/IotCommandsClient.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - * - * This file is generated. - */ - -package software.amazon.awssdk.iot.iotcommands; - -import software.amazon.awssdk.iot.iotcommands.model.CommandExecutionEvent; -import software.amazon.awssdk.iot.iotcommands.model.CommandExecutionStatus; -import software.amazon.awssdk.iot.iotcommands.model.CommandExecutionsSubscriptionRequest; -import software.amazon.awssdk.iot.iotcommands.model.DeviceType; -import software.amazon.awssdk.iot.iotcommands.model.RejectedErrorCode; -import software.amazon.awssdk.iot.iotcommands.model.StatusReason; -import software.amazon.awssdk.iot.iotcommands.model.UpdateCommandExecutionRequest; -import software.amazon.awssdk.iot.iotcommands.model.UpdateCommandExecutionResponse; -import software.amazon.awssdk.iot.iotcommands.model.V2ErrorResponse; - -import java.nio.charset.StandardCharsets; - -import software.amazon.awssdk.crt.mqtt.MqttClientConnection; -import software.amazon.awssdk.crt.mqtt.QualityOfService; -import software.amazon.awssdk.crt.mqtt.MqttException; -import software.amazon.awssdk.crt.mqtt.MqttMessage; - -import software.amazon.awssdk.iot.Timestamp; -import software.amazon.awssdk.iot.EnumSerializer; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; - -/** - * The AWS IoT commands service is used to send an instruction from the cloud to a device that is connected to AWS IoT. - * - * AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/iot-remote-command.html - * -*/ -public class IotCommandsClient { - private MqttClientConnection connection = null; - private final Gson gson = getGson(); - - /** - * Constructs a new IotCommandsClient - * @param connection The connection to use - */ - public IotCommandsClient(MqttClientConnection connection) { - this.connection = connection; - } - - private Gson getGson() { - GsonBuilder gson = new GsonBuilder(); - gson.disableHtmlEscaping(); - gson.registerTypeAdapter(Timestamp.class, new Timestamp.Serializer()); - gson.registerTypeAdapter(Timestamp.class, new Timestamp.Deserializer()); - addTypeAdapters(gson); - return gson.create(); - } - - private void addTypeAdapters(GsonBuilder gson) { - gson.registerTypeAdapter(CommandExecutionStatus.class, new EnumSerializer()); - gson.registerTypeAdapter(DeviceType.class, new EnumSerializer()); - gson.registerTypeAdapter(RejectedErrorCode.class, new EnumSerializer()); - } - -} From d0f7dfc4c21b2784c0837012f279c70d2e8c87dc Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Wed, 11 Jun 2025 14:28:59 -0700 Subject: [PATCH 6/6] IoT Commands sample (#634) --- pom.xml | 1 + samples/CommandsSandbox/README.md | 434 +++++++++++ samples/CommandsSandbox/pom.xml | 63 ++ .../main/java/commands/CommandsSandbox.java | 681 ++++++++++++++++++ samples/README.md | 1 + 5 files changed, 1180 insertions(+) create mode 100644 samples/CommandsSandbox/README.md create mode 100644 samples/CommandsSandbox/pom.xml create mode 100644 samples/CommandsSandbox/src/main/java/commands/CommandsSandbox.java diff --git a/pom.xml b/pom.xml index 6233e5460..3f16b8ae3 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,7 @@ samples/Provisioning/Basic samples/Provisioning/Csr samples/JobsSandbox + samples/CommandsSandbox samples/ShadowSandbox samples/Mqtt5/PubSub samples/Mqtt5/SharedSubscription diff --git a/samples/CommandsSandbox/README.md b/samples/CommandsSandbox/README.md new file mode 100644 index 000000000..9a3cf390c --- /dev/null +++ b/samples/CommandsSandbox/README.md @@ -0,0 +1,434 @@ +# Commands Sandbox + +[**Return to main sample list**](../README.md) + +This is an interactive sample that allows you to use the AWS IoT [Commands](https://docs.aws.amazon.com/iot/latest/developerguide/iot-remote-command.html) +service to receive and process remote instructions. + +In a real use case, control plane commands (the actions performed by aws-sdk-java-v2) would be issued by another application +under control of the customer, while data plane operations (the actions performed by the IoT SDK Java v2) would be issued +by software running on the IoT device itself. + +Using the IoT Commands service and this sample requires an understanding of two closely-related but different service terms: +* **AWS IoT Command** - metadata describing a task that the user would like one or more devices to run. +* **AWS IoT Command Execution** - metadata describing the state of a single device's attempt to execute an AWS IoT Command. + +In particular, you can define an **AWS IoT Command** and then send it multiple times to the same device. The device will +try to execute each received **AWS IoT Command Execution**. + +AWS IoT command service uses different MQTT topics for different payload formats. This allows a device to choose which +AWS IoT command payload formats to receive. AWS IoT Commands service distinguishes the following payload formats: +- JSON +- CBOR +- generic (i.e. everything else) + +If your device wants to receive both JSON and CBOR payloads, it will need to subscribe to two topics using two separate +API calls. On the other hand, if your device needs to receive, for example, "plain/text" and "my-custom-format" payloads, +it has to subscribe to the generic MQTT topic and distinguish received IoT commands by the payload-type field. + +### Interaction with sample application + +> [!NOTE] +> In this sample, the term command can have two different meanings: +> - AWS IoT command - a description of a task defined in the AWS IoT Commands service. +> - sample command - an action that this sample application can perform, such as `open-thing-stream`. +> +> To avoid confusion, `sample command` is replaced with `sample instruction`. While this phrase might not be conventional, +> it helps maintain clarity throughout this document. + +Once connected, the sample supports the following instructions: + +Control Plane +* `list-commands` - list all AWS IoT commands available in the AWS account +* `create-command` - create a new AWS IoT command +* `delete-command` - delete an AWS IoT command +* `send-command-to-thing` - create an AWS IoT command execution targeted for the IoT thing +* `send-command-to-client` - create an AWS IoT command execution targeted for the MQTT client +* `get-command-execution`- get status of the AWS IoT command execution + +Data Plane +* `open-thing-stream ` - subscribe to a stream of AWS IoT command executions with a specified payload format + targeting the IoT Thing set on the application startup +* `open-client-stream ` - subscribe to a stream of AWS IoT command executions with a specified payload format + targeting the MQTT client ID set on the application startup +* `update-command-execution \ \[\] \[\]` - update status for specified + execution ID; + * status can be one of the following: IN_PROGRESS, SUCCEEDED, REJECTED, FAILED, TIMED_OUT + * reason-code and reason-description may be optionally provided for the REJECTED, FAILED, or TIMED_OUT statuses + +Miscellaneous +* `list-streams` - list all open streaming operations +* `close-stream ` - close a specified stream; is an internal ID that can be found with 'list-streams' +* `quit` - quit the sample application + +### Prerequisites +Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges +for this sample to connect, subscribe, publish, and receive in order to perform its data plane operations. Below is a sample +policy that can be used on your IoT Core Thing that will allow this sample to run as intended. + +
+Sample Policy +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": "iot:Publish",
+      "Resource": [
+        "arn:aws:iot:<region>:<account>:topic/$aws/commands/<device_type>/<device_id>/executions/*/response/json",
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Receive",
+      "Resource": [
+        "arn:aws:iot:<region>:<account>:topic/$aws/commands/<device_type>/<device_id>/executions/*/request/*",
+        "arn:aws:iot:<region>:<account>:topic/$aws/commands/<device_type>/<device_id>/executions/*/response/accepted/json",
+        "arn:aws:iot:<region>:<account>:topic/$aws/commands/<device_type>/<device_id>/executions/*/response/rejected/json"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Subscribe",
+      "Resource": [
+        "arn:aws:iot:<region>:<account>:topicfilter/$aws/commands/<device_type>/<device_id>/executions/*/request/*",
+        "arn:aws:iot:<region>:<account>:topicfilter/$aws/commands/<device_type>/<device_id>/executions/*/response/accepted/json",
+        "arn:aws:iot:<region>:<account>:topicfilter/$aws/commands/<device_type>/<device_id>/executions/*/response/rejected/json"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Connect",
+      "Resource": "arn:aws:iot:<region>:<account>:client/<mqtt_client_id>"
+    }
+  ]
+}
+
+ +Replace with the following with the data from your AWS account: +* ``: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`. +* ``: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website. +* ``: Can be either `things` or `clients`. +* ``: Depending on `` value, this is either IoT Thing name or MQTT client ID. Note that for a case + when `` is set to `clients`, `` will be the same as ``. +* ``: MQTT client ID used for connection. + +Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. +Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of +this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` +to send the client ID your policy supports. + +
+ +The triggered control plane operations in the walkthrough require AWS credentials with appropriate permissions to be +sourceable. The following permissions must be granted: +
+Sample Policy +
+{
+    "Version": "2012-10-17",
+    "Statement": [
+        {
+            "Action": "iot:ListCommands",
+            "Effect": "Allow",
+            "Resource": "arn:aws:iot:<region>:<account>:command/*"
+        },
+        {
+            "Action": "iot:CreateCommand",
+            "Effect": "Allow",
+            "Resource": "arn:aws:iot:<region>:<account>:command/<command_name>"
+        },
+        {
+            "Action": "iot:GetCommand",
+            "Effect": "Allow",
+            "Resource": "arn:aws:iot:<region>:<account>:command/<command_name>"
+        },
+        {
+            "Action": "iot:DeleteCommand",
+            "Effect": "Allow",
+            "Resource": "arn:aws:iot:<region>:<account>:command/<command_name>"
+        },
+        {
+            "Action": "iot:StartCommandExecution",
+            "Effect": "Allow",
+            "Resource": [
+                "arn:aws:iot:<region>:<account>:command/<command_name>",
+                "arn:aws:iot:<region>:<account>:<devices>/<device_id>"
+            ]
+        }
+    ]
+}
+
+ +Replace with the following with the data from your AWS account: +* ``: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. + For example `us-east-1`. +* ``: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name + when using the AWS IoT Core website. +* ``: The unique identifier for your AWS IoT command, such as `LockDoor`. If you want to use more than + one command, you can use `*` (e.g. `test-*`) or specify multiple commands under the Resource section in the IAM policy. +* ``: Must be either `thing` or `client` depending on whether your devices have been registered as AWS IoT things, + or are specified as MQTT clients. +* ``: Depending on `` value, this is either IoT Thing name or MQTT client ID. + +
+ +## Running the Sample + +To run the sample, use the following Shell command: + +```shell +mvn compile exec:java -pl samples/CommandsSandbox -Dexec.mainClass=commands.CommandsSandbox \ + -Dexec.args="--endpoint --cert --key --thing --client-id " +``` + +If an AWS IoT Thing resource with the given name does not exist, the sample will first create it. Once the thing exists, +the sample connects via MQTT, and you can issue instructions to the Command service and inspect the results. + +## Walkthrough + +### Creating AWS IoT Commands + +We'll start with the creation of a couple of AWS IoT commands. When creating the AWS IoT command, you must provide a payload. +The payload that you provide is base64 encoded. + +The first AWS IoT command contains JSON payload: +``` +create-command sample-json-command application/json { "message": "Hello IoT" } +``` + +The second AWS IoT command will be a plain text: +``` +create-command sample-text-command plain/text hello +``` + +You can examine the newly created AWS IoT commands in AWS Console or using the following sample instruction: + +``` +list-commands +``` + +yields output like the following: +``` +Command: + CommandSummary(CommandArn=arn:aws:iot:...:command/sample-text-command, CommandId=sample-text-command, Deprecated=false, CreatedAt=..., LastUpdatedAt=..., PendingDeletion=false) +Command: + CommandSummary(CommandArn=arn:aws:iot:...:command/sample-json-command, CommandId=sample-json-command, Deprecated=false, CreatedAt=..., LastUpdatedAt=..., PendingDeletion=false) +``` + +### Subscribing to AWS IoT Command Executions + +Now, let's subscribe to AWS IoT commands with JSON payloads using the following sample instruction: +``` +open-thing-stream application/json +``` + +Let's open another stream, this time for generic payloads and MQTT client: +``` +open-client-stream generic +``` + +To examine the open streaming operations use the `list-streams` sample instruction: +``` +list-streams +``` + +and you will see something like this +``` +Streams: + 1: device type 'things', device ID 'MyIotThing', payload type 'application/json' + 2: device type 'clients', device ID 'MyIotThing', payload type 'generic' +``` + +You can close a streaming operation using the `close-stream` sample instruction: +``` +close-stream +``` +, where `` is a sequence number of the operation. For our example, JSON operation has stream ID 1 and generic +operation has stream ID 2. + +For example, to close `generic` stream, execute this sample instruction: +``` +close-stream 2 +``` + +### Sending AWS IoT Command Executions + +AWS IoT command just defines a set of instructions. It cannot target any device. For sending an AWS IoT command to a device, +you need to create AWS IoT command execution. + +This can be done with the following sample instructions: + +``` +send-command-to-thing sample-json-command +``` +and +``` +send-command-to-client sample-text-command +``` + +When no timeout for an AWS IoT commands execution is specified, AWS IoT Core sets the timeout to a default value of 10 seconds. +So, the sample should receive these newly created AWS IoT command executions and output something similar to: + +``` +Received new command execution + execution ID: 11111111-1111-1111-1111-111111111111 + payload format: application/json + execution timeout: 9 + payload size: 26 + JSON payload: '{ "message": "Hello IoT" }' +``` +and, unless you closed the corresponding stream on the previous step, +``` +Received new command execution + execution ID: 22222222-2222-2222-2222-222222222222 + payload format: plain/text + execution timeout: 9 + payload size: 5 +``` + +> [!IMPORTANT] +> IoT Java SDK v2 does not parse the payload of the incoming AWS IoT commands. User code gets an object containing binary +> data for payload and additionally a payload format if it was specified for the AWS IoT command. User code is supposed +> to parse payload itself. +> This sample parses JSON data only, to demonstrate how it can be achieved. All other formats are left out for simplicity. + +Since we didn't specify a timeout for a newly created AWS IoT command execution, your device has only 9-10 seconds to +report back the execution status, which is not enough for an interactive application. +The AWS IoT command execution you sent will probably time out before you manage to send the status update. + +You can check the AWS IoT command execution status using the following sample instruction (remember to change execution ID +to the one you actually received): + +``` +get-command-execution 11111111-1111-1111-1111-111111111111 +``` + +will yield something like this: + +``` +Status of command execution '11111111-1111-1111-1111-111111111111' is TIMED_OUT + Reason code: $NO_RESPONSE_FROM_DEVICE + Reason description: null +``` + +Let's send another AWS IoT command execution, this time with a timeout more suitable for our sample. Notice that we use +the same AWS IoT command, the only thing that changed is the execution timeout value: + +``` +send-command-to-thing sample-json-command 300 +``` + +The running sample will receive another notification, with the new execution ID: +``` +Received new command execution + execution ID: 33333333-3333-3333-3333-333333333333 + payload format: application/json + execution timeout: 299 + payload size: 26 + JSON payload: '{ "message": "Hello IoT" }' +``` + +Let's proceed to the next section where we're going to update the status of an AWS IoT command execution. + +### Updating and monitoring AWS IoT command execution status + +The sample didn't yet update the status of the AWS IoT command execution, so the following sample instruction + +``` +get-command-execution 33333333-3333-3333-3333-333333333333 +``` + +should return `CREATED` status: + +``` +Status of command execution '33333333-3333-3333-3333-333333333333' is CREATED +``` + +To update the status of a received AWS IoT command execution, we should use the `update-command-execution` sample instruction. +Take an AWS IoT command execution ID your sample received at the end of the previous section and pass it to +`update-command-execution` along with the `IN_PROGRESS` status: +``` +update-command-execution 33333333-3333-3333-3333-333333333333 IN_PROGRESS +``` + +Then checking once again for the AWS IoT command execution status with +``` +get-command-execution 33333333-3333-3333-3333-333333333333 +``` + +should return + +``` +Status of Command execution '33333333-3333-3333-3333-333333333333' is IN_PROGRESS +``` + +`IN_PROGRESS` is an intermediary execution status, i.e. it's possible to change this status. +`SUCCEEDED`, `FAILED`, and `REJECTED` statuses are terminal - when you set the AWS IoT command execution status to one +of them, it's final. + +There is also the `TIMED_OUT` status. Though it's supposed to be set by the service side when there is no response from +the device in `timeout` time, your application may provide additional info by setting the `statusReason` field in the update +event. + +Let's set the AWS IoT command execution status to one of the terminal states with sample instruction: +``` +update-command-execution 33333333-3333-3333-3333-333333333333 SUCCEEDED +``` +or +``` +update-command-execution 33333333-3333-3333-3333-333333333333 FAILED SHORT_FAILURE_CODE A longer description +``` + +If you try to update the status of the same AWS IoT command execution to something else, it'll fail: +``` +update-command-execution 33333333-3333-3333-3333-333333333333 REJECTED +``` + +will yield +``` +update-command-execution ExecutionException! + update-command-execution source exception: Request-response operation failure + update-command-execution Modeled error: {"error":"TERMINAL_STATE_REACHED","errorMessage":"Command Execution status cannot be updated to REJECTED since execution has already completed with status SUCCEEDED.","executionId":"33333333-3333-3333-3333-333333333333"} +``` + +### Cleaning up + +When all executions for a given AWS IoT command have reached a terminal state (`SUCCEEDED`, `FAILED`, `REJECTED`), you +can delete the AWS IoT command itself with the following sample instruction: + +``` +delete-command sample-json-command +``` +and +``` +delete-command sample-text-command +``` + +Now the `list-commands` sample instruction will show that these two AWS IoT commands are pending deletion. + +### Misc Topics + +### What happens if I open the same stream twice? + +The Java AWS IoT Commands client **does** allow you to subscribe multiple times to the same stream of events. You can even +do this using this sample, just execute the same opening stream sample instruction few times. The client will receive event +for each opened subscription. + +A real-world application may prevent such situations by tracking which streams are open. The uniqueness of the AWS IoT +command executions stream is determined by `device type`, `device ID`, and `payload format`. Most probably, `device type` +and `device ID` will be constant, so the application needs to check `payload format`. Notice that Aws IoT Commands service +distinguishes only JSON and CBOR, all other payload formats will be routed to the generic stream. + +#### What is the proper generic architecture for a command-processing application running on a device? + +1. On startup, create and open streaming operations for the necessary AWS IoT command events using + `IotCommandsV2Client.createCommandExecutionsJsonPayloadStream`, `IotCommandsV2Client.CreateCommandExecutionsCborPayloadStream`, + and/or `IotCommandsV2Client.CreateCommandExecutionsGenericPayloadStream` functions. +2. **DO NOT** process received AWS IoT commands right in the callback passed to `CreateCommandExecutions*PayloadStream`. + As a general rule, **DO NOT** perform any time-consuming or blocking operations in the callback. One possible approache + is to put incoming IoT commands into a shared queue. Then the designated executor(s) should process them in separate + thread(s). +3. If your application is expected to receive a lot of AWS IoT commands, monitor the number of them enqueued for processing. + Consider introducing priorities based on AWS IoT command timeouts or another internal value. diff --git a/samples/CommandsSandbox/pom.xml b/samples/CommandsSandbox/pom.xml new file mode 100644 index 000000000..e1dfebcfd --- /dev/null +++ b/samples/CommandsSandbox/pom.xml @@ -0,0 +1,63 @@ + + 4.0.0 + software.amazon.awssdk.iotdevicesdk + CommandsSandbox + jar + 1.0-SNAPSHOT + ${project.groupId}:${project.artifactId} + Interactive sample using the MQTT Commands service + https://github.com/awslabs/aws-iot-device-sdk-java-v2 + + 1.8 + 1.8 + UTF-8 + + + + commons-cli + commons-cli + 1.9.0 + + + software.amazon.awssdk + iot + 2.31.44 + + + software.amazon.awssdk + iotjobsdataplane + 2.31.44 + + + software.amazon.awssdk + sts + 2.31.44 + + + + + latest-release + + + software.amazon.awssdk.iotdevicesdk + aws-iot-device-sdk + 1.25.1 + + + + + default + + true + + + + software.amazon.awssdk.iotdevicesdk + aws-iot-device-sdk + 1.0.0-SNAPSHOT + + + + + diff --git a/samples/CommandsSandbox/src/main/java/commands/CommandsSandbox.java b/samples/CommandsSandbox/src/main/java/commands/CommandsSandbox.java new file mode 100644 index 000000000..11973d008 --- /dev/null +++ b/samples/CommandsSandbox/src/main/java/commands/CommandsSandbox.java @@ -0,0 +1,681 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package commands; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.crt.CRT; +import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.iot.*; +import software.amazon.awssdk.crt.mqtt5.*; +import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket; +import software.amazon.awssdk.iot.AwsIotMqtt5ClientBuilder; +import software.amazon.awssdk.iot.iotcommands.IotCommandsV2Client; +import software.amazon.awssdk.iot.iotcommands.model.*; +import software.amazon.awssdk.iot.V2ClientStreamOptions; +import software.amazon.awssdk.iot.iotcommands.model.StatusReason; +import software.amazon.awssdk.services.iotjobsdataplane.IotJobsDataPlaneClient; +import software.amazon.awssdk.services.iotjobsdataplane.model.StartCommandExecutionRequest; +import software.amazon.awssdk.services.iotjobsdataplane.model.StartCommandExecutionResponse; + +import software.amazon.awssdk.iot.iotcommands.model.CommandExecutionStatus; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.iot.IotClient; +import software.amazon.awssdk.services.iot.model.*; +import software.amazon.awssdk.services.sts.StsClient; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import software.amazon.awssdk.services.sts.model.GetCallerIdentityRequest; +import software.amazon.awssdk.services.sts.model.GetCallerIdentityResponse; + + +public class CommandsSandbox { + + /** + * Auxiliary class to store a received command execution. + * It's used for the updating command execution status. + */ + static class CommandExecutionContext { + DeviceType deviceType; + String deviceId; + String deviceArn; + } + + /** + * Auxiliary class to store an opened streaming operation data. + * It's used to report on the list-streams command. + */ + static class StreamingOperationContext { + StreamingOperation operation; + DeviceType deviceType; + String deviceId; + String payloadType; + } + + static class ApplicationContext implements AutoCloseable { + public final Gson gson = createGson(); + public final CompletableFuture connectedFuture = new CompletableFuture<>(); + public final CompletableFuture stoppedFuture = new CompletableFuture<>(); + + int streamId; + private Map commandExecutionsStreams; + + private Map activeCommandExecutions; + + public String thingName; + public String thingArn; + public String mqttClientId; + public String mqttClientArn; + + public StsClient stsClient; + public IotClient controlPlaneClient; + public IotJobsDataPlaneClient iotJobsDataPlaneClient; + public Mqtt5Client protocolClient; + public IotCommandsV2Client commandsClient; + + public void close() { + if (this.commandExecutionsStreams != null) { + this.commandExecutionsStreams.values().forEach(context -> context.operation.close()); + } + + if (this.commandsClient != null) { + this.commandsClient.close(); + } + + if (this.iotJobsDataPlaneClient != null) { + this.iotJobsDataPlaneClient.close(); + } + + if (this.protocolClient != null) { + this.protocolClient.close(); + } + + if (this.controlPlaneClient != null) { + this.controlPlaneClient.close(); + } + + if (this.stsClient != null) { + this.stsClient.close(); + } + } + + public void setMqttClientArn(String region) { + GetCallerIdentityResponse response = stsClient.getCallerIdentity(GetCallerIdentityRequest.builder().build()); + String accountId = response.account(); + mqttClientArn = String.format("arn:aws:iot:%s:%s:client/%s", region, accountId, mqttClientId); + } + + private static Gson createGson() { + GsonBuilder builder = new GsonBuilder(); + builder.disableHtmlEscaping(); + return builder.create(); + } + } + + private static ApplicationContext buildSampleContext(String[] args) throws Exception { + ApplicationContext context = new ApplicationContext(); + + Options cliOptions = new Options(); + + cliOptions.addOption(Option.builder("c").longOpt("cert").desc("file path to an X509 certificate to use when establishing mTLS context").hasArg().required().build()); + cliOptions.addOption(Option.builder("k").longOpt("key").desc("file path to an X509 private key to use when establishing mTLS context").hasArg().required().build()); + cliOptions.addOption(Option.builder("t").longOpt("thing").desc("name of the AWS IoT thing resource to interact with").hasArg().required().build()); + cliOptions.addOption(Option.builder("i").longOpt("client-id").desc("ID of the MQTT client to interact with").hasArg().build()); + cliOptions.addOption(Option.builder("e").longOpt("endpoint").desc("AWS IoT endpoint to connect to").hasArg().required().build()); + cliOptions.addOption(Option.builder("r").longOpt("region").desc("AWS Region the AWS IoT endpoint is using").hasArg().build()); + cliOptions.addOption(Option.builder("h").longOpt("help").desc("Prints command line help").build()); + + CommandLineParser parser = new DefaultParser(); + CommandLine commandLine = parser.parse(cliOptions, args); + + if (commandLine.hasOption("help")) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("CommandsSandbox", cliOptions); + return null; + } + + String endpoint = commandLine.getOptionValue("endpoint"); + String region = null; + if (commandLine.hasOption("region")) { + region = commandLine.getOptionValue("region"); + } + + if (region == null) { + System.out.println("No region supplied on the command line, attempting to extract from endpoint"); + + Pattern standardRegionPattern = Pattern.compile(".*\\.iot.*\\.([^.]+)\\.amazonaws\\.com"); + Matcher standardMatch = standardRegionPattern.matcher(endpoint); + if (standardMatch.find()) { + region = standardMatch.group(1); + } else { + System.out.println("ERROR: could not determine region from endpoint"); + return null; + } + } + + System.out.println(String.format("Using region '%s'", region)); + + // needed to pull in STS to the class path so that profile-based STS lookups work correctly + context.stsClient = StsClient.builder() + .region(Region.of(region)) + .build(); + + context.controlPlaneClient = IotClient.builder() + .region(Region.of(region)) + .build(); + + DescribeEndpointRequest describeEndpointRequest = + DescribeEndpointRequest.builder().endpointType("iot:Jobs").build(); + DescribeEndpointResponse describeEndpointResponse = context.controlPlaneClient.describeEndpoint(describeEndpointRequest); + + context.iotJobsDataPlaneClient = IotJobsDataPlaneClient.builder() + .region(Region.of(region)) + .endpointOverride(URI.create("https://" + describeEndpointResponse.endpointAddress())) + .build(); + + context.thingName = commandLine.getOptionValue("thing"); + context.mqttClientId = commandLine.getOptionValue("client-id", String.format("test-%s", UUID.randomUUID())); + context.setMqttClientArn(region); + + try { + context.controlPlaneClient.describeThing(DescribeThingRequest.builder().thingName(context.thingName).build()); + } catch (ResourceNotFoundException ex) { + System.out.println(String.format("Thing '%s' does not exist. Creating it...", context.thingName)); + context.controlPlaneClient.createThing(CreateThingRequest.builder().thingName(context.thingName).build()); + } + + DescribeThingResponse describeResponse = context.controlPlaneClient.describeThing(DescribeThingRequest.builder().thingName(context.thingName).build()); + context.thingArn = describeResponse.thingArn(); + + Mqtt5ClientOptions.LifecycleEvents lifecycleEvents = new Mqtt5ClientOptions.LifecycleEvents() { + @Override + public void onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn) { + System.out.println("Attempting connection..."); + } + + @Override + public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn) { + System.out.println("Connection success"); + context.connectedFuture.complete(null); + } + + @Override + public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) { + String errorString = CRT.awsErrorString(onConnectionFailureReturn.getErrorCode()); + System.out.println("Connection failed with error: " + errorString); + context.connectedFuture.completeExceptionally(new Exception("Could not connect: " + errorString)); + } + + @Override + public void onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn) { + } + + @Override + public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) { + context.stoppedFuture.complete(null); + } + }; + + try (AwsIotMqtt5ClientBuilder builder = AwsIotMqtt5ClientBuilder.newDirectMqttBuilderWithMtlsFromPath( + commandLine.getOptionValue("endpoint"), commandLine.getOptionValue("cert"), commandLine.getOptionValue("key"))) { + builder.withLifeCycleEvents(lifecycleEvents); + + ConnectPacket.ConnectPacketBuilder connectProperties = new ConnectPacket.ConnectPacketBuilder(); + connectProperties.withClientId(context.mqttClientId); + builder.withConnectProperties(connectProperties); + + context.protocolClient = builder.build(); + } + + context.protocolClient.start(); + context.connectedFuture.get(); + + MqttRequestResponseClientOptions rrClientOptions = MqttRequestResponseClientOptions.builder() + .withMaxRequestResponseSubscriptions(5) + .withMaxStreamingSubscriptions(10) + .withOperationTimeoutSeconds(30) + .build(); + + context.streamId = 1; + context.commandExecutionsStreams = new HashMap(); + + context.activeCommandExecutions = new ConcurrentHashMap(); + + context.commandsClient = IotCommandsV2Client.newFromMqtt5(context.protocolClient, rrClientOptions); + + return context; + } + + private static void handleOperationException(String operationName, Exception ex, ApplicationContext context) { + if (ex instanceof ExecutionException) { + System.out.printf("%s ExecutionException!\n", operationName); + Throwable source = ex.getCause(); + if (source != null) { + System.out.printf(" %s source exception: %s\n", operationName, source.getMessage()); + if (source instanceof V2ErrorResponseException) { + V2ErrorResponseException v2exception = (V2ErrorResponseException) source; + if (v2exception.getModeledError() != null) { + System.out.printf(" %s Modeled error: %s\n", operationName, context.gson.toJson(v2exception.getModeledError())); + } + } + } + } else { + System.out.printf("%s Exception: %s\n", operationName, ex.getMessage()); + } + } + + private static void printCommandHelp() { + System.out.println("Usage\n"); + System.out.println(" IoT control plane commands:"); + System.out.println(" list-commands"); + System.out.println(" list all commands available in the AWS account"); + System.out.println(" create-command "); + System.out.println(" create a new AWS IoT command with the specified command ID and document;"); + System.out.println(" is a unique AWS IoT Command identifier"); + System.out.println(" a content type of the payload"); + System.out.println(" JSON and CBOR are handled specifically, see README for more information"); + System.out.println(" delete-command "); + System.out.println(" delete an AWS IoT command with the specified command ID"); + System.out.println(" send-command-to-thing []"); + System.out.println(" create an AWS IoT command execution targeted for the IoT thing specified"); + System.out.println(" at the application start"); + System.out.println(" send-command-to-client []"); + System.out.println(" create an AWS IoT command execution targeted for the MQTT client specified"); + System.out.println(" at the application start"); + System.out.println(" get-command-execution "); + System.out.println(" get status of the specified AWS IoT command execution\n"); + System.out.println(" MQTT Command service commands:"); + System.out.println(" open-thing-stream "); + System.out.println(" subscribe to a stream of command executions with a specified payload format"); + System.out.println(" targeting the IoT Thing set on the application startup"); + System.out.println(" is a string, with the following special values:"); + System.out.println(" application/json - subscribe to commands with JSON payload"); + System.out.println(" application/cbor - subscribe to commands with CBOR payload"); + System.out.println(" for any other value, subscribe to a generic topic"); + System.out.println(" open-client-stream "); + System.out.println(" subscribe to a stream of command executions with a specified payload format"); + System.out.println(" targeting the MQTT client ID set on the application startup"); + System.out.println(" is a string, with the following special values:"); + System.out.println(" application/json - subscribe to commands with JSON payload"); + System.out.println(" application/cbor - subscribe to commands with CBOR payload"); + System.out.println(" for any other value, subscribe to a generic topic"); + System.out.println(" update-command-execution [] []"); + System.out.println(" updates a command execution with a new status"); + System.out.println(" can be one of the following:"); + System.out.println(" IN_PROGRESS, SUCCEEDED, REJECTED, FAILED, TIMED_OUT"); + System.out.println(" and may be optionally provided for"); + System.out.println(" the REJECTED, FAILED, or TIMED_OUT statuses\n"); + System.out.println(" Miscellaneous commands:"); + System.out.println(" list-streams list all open streaming operations"); + System.out.println(" close-stream "); + System.out.println(" close a specified stream;"); + System.out.println(" is internal ID that can be found with 'list-streams' command"); + System.out.println(" quit exit the application\n"); + } + + private static void handleListCommands(ApplicationContext context) { + try { + ListCommandsResponse response = context.controlPlaneClient.listCommands(ListCommandsRequest.builder().build()); + response.commands().forEach(command -> { + System.out.printf("Command:\n %s\n", command.toString()); + }); + } catch (Exception ex) { + handleOperationException("list-commands", ex, context); + } + } + + private static void handleCreateCommand(ApplicationContext context, String arguments) { + String[] argumentSplit = arguments.trim().split(" ", 3); + if (argumentSplit.length < 2) { + printCommandHelp(); + return; + } + + try { + String commandId = argumentSplit[0]; + SdkBytes commandDocumentBytes = SdkBytes.fromUtf8String(argumentSplit[2]); + CommandPayload commandPayload = CommandPayload.builder() + .contentType(argumentSplit[1]) + .content(commandDocumentBytes) + .build(); + + CreateCommandResponse response = context.controlPlaneClient.createCommand(CreateCommandRequest.builder() + .commandId(commandId) + .payload(commandPayload) + .build()); + System.out.printf("CreateCommandResponse: \n %s\n%n", response.toString()); + } catch (Exception ex) { + handleOperationException("create-command", ex, context); + } + } + + private static void handleDeleteCommand(ApplicationContext context, String arguments) { + String commandId = arguments.trim(); + + try { + DeleteCommandResponse response = context.controlPlaneClient.deleteCommand(DeleteCommandRequest.builder().commandId(commandId).build()); + System.out.println(String.format("DeleteCommandResponse: \n %s\n", response.toString())); + } catch (Exception ex) { + handleOperationException("delete-command", ex, context); + } + } + + private static void handleSendCommand(ApplicationContext context, DeviceType deviceType, String arguments) { + String[] argumentSplit = arguments.trim().split(" ", 2); + + String commandId = argumentSplit[0]; + Long timeout = 10L; + if (argumentSplit.length > 1) { + timeout = Long.parseLong(argumentSplit[1]); + } + + String deviceArn; + if (deviceType == DeviceType.THING) { + deviceArn = context.thingArn; + } else { + deviceArn = context.mqttClientArn; + } + + try { + GetCommandRequest getCommandRequest = GetCommandRequest.builder().commandId(commandId).build(); + GetCommandResponse getCommandResponse = context.controlPlaneClient.getCommand(getCommandRequest); + + StartCommandExecutionRequest request = StartCommandExecutionRequest.builder() + .commandArn(getCommandResponse.commandArn()) + .executionTimeoutSeconds(timeout) + .targetArn(deviceArn) + .build(); + + context.iotJobsDataPlaneClient.startCommandExecution(request); + Thread.sleep(1000); + } catch (Exception ex) { + handleOperationException("send-command", ex, context); + } + } + + private static void handleGetCommandExecution(ApplicationContext context, String arguments) { + String commandExecutionId = arguments.trim(); + if (!context.activeCommandExecutions.containsKey(commandExecutionId)) { + System.out.printf("Failed to get command execution status: unknown command execution ID '%s'\n", commandExecutionId); + return; + } + + try { + CommandExecutionContext commandExecutionContext = context.activeCommandExecutions.get(commandExecutionId); + + GetCommandExecutionRequest getCommandExecutionRequest = GetCommandExecutionRequest.builder() + .executionId(commandExecutionId) + .targetArn(commandExecutionContext.deviceArn) + .build(); + GetCommandExecutionResponse getCommandExecutionResponse = context.controlPlaneClient.getCommandExecution(getCommandExecutionRequest); + System.out.printf("Status of command execution '%s' is %s\n", commandExecutionId, getCommandExecutionResponse.status()); + if (getCommandExecutionResponse.statusReason() != null) { + System.out.printf(" Reason code: %s\n", getCommandExecutionResponse.statusReason().reasonCode()); + System.out.printf(" Reason description: %s\n", getCommandExecutionResponse.statusReason().reasonDescription()); + } + } catch (Exception ex) { + handleOperationException("get-command-execution", ex, context); + } + } + + private static void handleUpdateCommandExecution(ApplicationContext context, String arguments) { + String[] argumentSplit = arguments.trim().split(" ", 4); + if (argumentSplit.length < 2) { + printCommandHelp(); + return; + } + + String commandExecutionId = argumentSplit[0]; + if (!context.activeCommandExecutions.containsKey(commandExecutionId)) { + System.out.printf("Failed to update command execution status: unknown command execution ID '%s'\n", commandExecutionId); + return; + } + + String statusStr = argumentSplit[1]; + + String reasonCode = null; + String reasonDescription = null; + if (argumentSplit.length > 3) { + reasonCode = argumentSplit[2]; + reasonDescription = argumentSplit[3]; + } + + try { + CommandExecutionContext commandExecutionContext = context.activeCommandExecutions.get(commandExecutionId); + UpdateCommandExecutionRequest request = new UpdateCommandExecutionRequest(); + request.executionId = commandExecutionId; + request.deviceType = commandExecutionContext.deviceType; + request.deviceId = commandExecutionContext.deviceId; + request.status = CommandExecutionStatus.valueOf(statusStr); + if (reasonCode != null && reasonDescription != null) { + request.statusReason = new StatusReason(); + request.statusReason.reasonCode = reasonCode; + request.statusReason.reasonDescription = reasonDescription; + } + + UpdateCommandExecutionResponse response = context.commandsClient.updateCommandExecution(request).get(); + System.out.printf("Successfully updated command execution '%s'\n", response.executionId); + } catch (Exception ex) { + handleOperationException("update-command-execution", ex, context); + } + } + + private static void handleOpenStream(ApplicationContext context, DeviceType deviceType, String payloadFormat) { + try { + String deviceId; + String deviceArn; + if (deviceType == DeviceType.THING) { + deviceId = context.thingName; + deviceArn = context.thingArn; + } else { + deviceId = context.mqttClientId; + deviceArn = context.mqttClientArn; + } + + int streamId = context.streamId++; + StreamingOperationContext streamingOperationContext = new StreamingOperationContext(); + context.commandExecutionsStreams.put(streamId, streamingOperationContext); + + V2ClientStreamOptions options = + V2ClientStreamOptions.builder().withStreamEventHandler(event -> { + System.out.println("Received new command execution"); + System.out.printf(" execution ID: %s\n", event.executionId); + System.out.printf(" payload format: %s\n", event.contentType); + System.out.printf(" execution timeout: %d\n", event.timeout); + System.out.printf(" payload size: %d\n", event.payload.length); + if (event.contentType.equals("application/json")) { + String payload = new String(event.payload, StandardCharsets.UTF_8); + System.out.printf(" JSON payload: '%s'\n", payload); + } + CommandExecutionContext commandExecutionContext = new CommandExecutionContext(); + commandExecutionContext.deviceId = deviceId; + commandExecutionContext.deviceType = deviceType; + commandExecutionContext.deviceArn = deviceArn; + context.activeCommandExecutions.put(event.executionId, commandExecutionContext); + }).withSubscriptionEventHandler( + event -> { + if (event.getError().isPresent()) { + System.out.printf("Error on opening stream: %d (%s)", event.getError().get(), CRT.awsErrorString(event.getError().get())); + context.commandExecutionsStreams.remove(streamId); + streamingOperationContext.operation.close(); + } + } + ).build(); + + CommandExecutionsSubscriptionRequest request = new CommandExecutionsSubscriptionRequest(); + + streamingOperationContext.deviceType = deviceType; + streamingOperationContext.deviceId = deviceId; + streamingOperationContext.payloadType = payloadFormat; + + request.deviceType = deviceType; + request.deviceId = deviceId; + + switch (payloadFormat) { + case "application/json": + streamingOperationContext.operation = context.commandsClient.createCommandExecutionsJsonPayloadStream(request, options); + break; + case "application/cbor": + streamingOperationContext.operation = context.commandsClient.createCommandExecutionsCborPayloadStream(request, options); + break; + default: + streamingOperationContext.operation = context.commandsClient.createCommandExecutionsGenericPayloadStream(request, options); + break; + } + + streamingOperationContext.operation.open(); + System.out.printf("Opened streaming operation with ID %d\n", streamId); + + + } catch (Exception ex) { + handleOperationException("open-command-stream", ex, context); + } + } + + static void handleListStreams(ApplicationContext context) { + System.out.println("Streams:"); + context.commandExecutionsStreams.entrySet().forEach(entry -> { + System.out.printf(" %d: device type '%s', device ID '%s', payload type '%s'\n", + entry.getKey(), entry.getValue().deviceType, entry.getValue().deviceId, entry.getValue().payloadType); + }); + } + + static void handleCloseStream(ApplicationContext context, String arguments) { + try { + int streamId = Integer.parseInt(arguments.trim()); + StreamingOperationContext streamingOperationContext = context.commandExecutionsStreams.get(streamId); + if (streamingOperationContext != null) { + streamingOperationContext.operation.close(); + context.commandExecutionsStreams.remove(streamId); + } + } catch (Exception ex) { + handleOperationException("close-stream", ex, context); + } + } + + private static boolean handleCommand(String commandLine, ApplicationContext context) { + String[] commandLineSplit = commandLine.trim().split(" ", 2); + if (commandLineSplit.length == 0) { + return false; + } + + String command = commandLineSplit[0]; + switch (command) { + case "list-commands": + handleListCommands(context); + return false; + case "create-command": + if (commandLineSplit.length == 2) { + handleCreateCommand(context, commandLineSplit[1]); + } + return false; + case "delete-command": + if (commandLineSplit.length == 2) { + handleDeleteCommand(context, commandLineSplit[1]); + } + return false; + case "send-command-to-thing": + if (commandLineSplit.length == 2) { + handleSendCommand(context, DeviceType.THING, commandLineSplit[1]); + } + return false; + case "send-command-to-client": + if (commandLineSplit.length == 2) { + handleSendCommand(context, DeviceType.CLIENT, commandLineSplit[1]); + } + return false; + case "get-command-execution": + if (commandLineSplit.length == 2) { + handleGetCommandExecution(context, commandLineSplit[1]); + } + return false; + case "open-thing-stream": + if (commandLineSplit.length == 2) { + handleOpenStream(context, DeviceType.THING, commandLineSplit[1]); + } + return false; + + case "open-client-stream": + if (commandLineSplit.length == 2) { + handleOpenStream(context, DeviceType.CLIENT, commandLineSplit[1]); + } + return false; + + case "update-command-execution": + if (commandLineSplit.length == 2) { + handleUpdateCommandExecution(context, commandLineSplit[1]); + } + return false; + + case "list-streams": + handleListStreams(context); + return false; + + case "close-stream": + if (commandLineSplit.length == 2) { + handleCloseStream(context, commandLineSplit[1]); + } + return false; + + case "quit": + return true; + + default: + break; + } + + printCommandHelp(); + return false; + } + + public static void main(String[] args) { + try (ApplicationContext context = buildSampleContext(args)) { + if (context == null) { + return; + } + + boolean done = false; + Scanner scanner = new Scanner(System.in); + while (!done) { + System.out.print("\nEnter command: > "); + if (scanner.hasNextLine()) { + String userInput = scanner.nextLine(); + done = handleCommand(userInput, context); + } else { + done = true; + } + } + scanner.close(); + + context.protocolClient.stop(null); + context.stoppedFuture.get(60, TimeUnit.SECONDS); + + System.out.println("Exiting application..."); + } catch (Exception ex) { + System.out.println("Exception encountered: " + ex.toString()); + System.exit(1); + } + + CrtResource.waitForNoResources(); + } +} diff --git a/samples/README.md b/samples/README.md index 30e76a8af..d9c34b228 100644 --- a/samples/README.md +++ b/samples/README.md @@ -28,6 +28,7 @@ ## Other Samples * [Shadow](./ShadowSandbox/README.md) * [Jobs](./JobsSandbox/README.md) +* [Commands](./CommandsSandbox/README.md) * [Fleet Provisioning](./Provisioning/Basic/README.md) * [Fleet Provisioning with CSR](./Provisioning/Csr/README.md) * [Android Sample](./Android/README.md)