diff --git a/doc/dev/adr/0025-rpc-streaming.md b/doc/dev/adr/0025-rpc-streaming.md new file mode 100644 index 0000000000..f8b3a6a9c9 --- /dev/null +++ b/doc/dev/adr/0025-rpc-streaming.md @@ -0,0 +1,400 @@ +# ADR 25: RPC Streaming + +## Context + +Users have expressed a desire to allow more than one request and/or more than one response per RPC invocation. + +## Requirements + + - Allow for an arbitrary number of command requests and responses for a single command invocation + - The total number of requests and responses does not need to be known before the first request/response is sent + - The total number of entries in a stream is allowed to be 1 + - When exposed to the user, each request and response includes an index of where it was in the stream + - Allow for multiple separate commands to be streamed simultaneously + - Allow for invoker and/or executor to cancel a streamed request and/or streamed response at any time + - Allow for invoker + executor to send their requests/responses at arbitrary* times + - For instance, executor may send 1 response upon receiving 1 request, or it may wait for the request stream to finish before sending the first response + - Alternatively, this allows the invoker to send a request upon receiving a response + - *The only limitation is that the invoker must initiate the RPC streaming with a request + - Allow for invoker/executor to end their own request/response stream gracefully at any time + - For instance, if the executor doesn't know if a response will be the last one prior to sending it, the executor should still be capable of ending the response stream later without sending another fully-fledged payload + +## Non-requirements + + - Different payload shapes per command response + - The API of the receiving side of a stream will provide the user the streamed requests/responses in their **intended** order rather than their **received** order + - If the stream's Nth message is lost due to message expiry (or other circumstances), our API should still notify the user when the N+1th stream message is received + - This may be added as a feature later if requested by customers + +## State of the art + +gRPC supports these patterns for RPC: +- [Unary RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#unary-rpc) (1 request message, 1 response message) +- [Server streaming RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc) (1 request message, many response messages) +- [Client streaming RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc) (many request messages, one response message) +- [Bi-directional streaming RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#bidirectional-streaming-rpc) (many request messages, many response messages. Request and response stream may send concurrently and/or in any order) + +[gRPC also allows for either the client or server to cancel an RPC at any time](https://grpc.io/docs/what-is-grpc/core-concepts/#cancelling-an-rpc) + +## Decision + +### API design, .NET + +While RPC streaming shares a lot of similarities to normal RPC, we will define a new communication pattern to handle this scenario with two corresponding base classes: ```StreamingCommandInvoker``` and ```StreamingCommandExecutor```. + +These new base classes will use similar versions of the ```ExtendedRequest``` and ```ExtendedResponse``` RPC classes to include the streaming-specific information about each request and response: + +```csharp +public class StreamingExtendedRequest + where TReq : class +{ + /// + /// The request payload + /// + public TReq Request { get; set; } + + /// + /// The metadata specific to this message in the stream + /// + public StreamMessageMetadata Metadata { get; set; } +} + +public class StreamingExtendedResponse + where TResp : class +{ + /// + /// The response payload + /// + public TResp Response { get; set; } + + /// + /// The metadata specific to this message in the stream + /// + public StreamMessageMetadata Metadata { get; set; } +} +``` + +#### Invoker side + +The new API will ask users to provide a stream of request payloads + metadata, the target streaming command executor, any stream-level metadata and timeout/cancellation tokens. It will return the +stream of responses as well as a cancellation function that allows the user to terminate the stream exchange at any time. + +```csharp +public abstract class StreamingCommandInvoker + where TReq : class + where TResp : class +{ + /// + /// Invoke a streaming command on a particular streaming command executor + /// + /// The stream of requests to send. This stream must contain at least one request. + /// The metadata for the request stream as a whole. + /// Topic tokens to substitute in the request topic. + /// The timeout between the beginning of the request stream and the end of both the request and response stream. + /// Cancellation token. Signalling this will also make a single attempt to notify the executor of the cancellation. + /// The stream of responses. + public async Task> InvokeStreamingCommandAsync( + IAsyncEnumerable> requests, + StreamRequestMetadata? streamRequestMetadata = null, + Dictionary? additionalTopicTokenMap = null, + TimeSpan? commandTimeout = default, + CancellationToken cancellationToken = default) {...} +} +``` + +#### Executor side + +The new ```StreamingCommandExecutor``` will largely look like the existing ```CommandExecutor```, but the callback to notify users that a command was received will include a stream of requests and return a stream of responses. + +```csharp +public abstract class StreamingCommandExecutor : IAsyncDisposable + where TReq : class + where TResp : class +{ + /// + /// A streaming command was invoked + /// + /// + /// The callback provides the stream of requests and requires the user to return one to many responses. + /// + public required Func, StreamRequestMetadata, CancellationToken, IAsyncEnumerable>> OnStreamingCommandReceived { get; set; } +} + +``` + +With this design, commands that use streaming are defined at codegen time. Codegen layer changes will be defined in a separate ADR, though. + +### MQTT layer protocol + +#### Streaming user property + +To convey streaming context in a request/response stream, we will put this information in the "__stream" MQTT user property with a value that looks like: + +```:::``` + +with data types + +```:::``` + +where the field ```:``` is only present in request stream messages and may be omitted if the RPC has no timeout. + +For example: + +```0:false:false:10000```: The first (and not last) message in a request stream where the RPC should timeout beyond 10 seconds + +```3:true:false```: The third and final message in a response stream + +```0:true:false:1000```: The first and final message in a request stream where the RPC should timeout beyond 1 second + +```0:true:true:0```: This request stream has been canceled. Note that the values for ```index```, ```isLast```, and `````` are ignored here. + +```0:true:true```: This response stream has been canceled. Note that the values for ```index``` and ```isLast``` are ignored here. + +[see cancellation support for more details on cancellation scenarios](#cancellation-support) + +[see timeout support for more details on timeout scenarios](#timeout-support) + +#### Invoker side + +The streaming command invoker will first subscribe to the appropriate response topic prior to sending any requests + +Once the user invokes a streaming command, the streaming command invoker will send one to many MQTT messages with: + - The same response topic + - This response topic must be prefixed with 'clients/{mqtt client id of invoker}' like in vanilla RPC + - The same correlation data + - The user property "$partition" set to a value of the client Id of the MQTT client sending this invocation + - This ensures that the broker always routes the messages in the stream to the same executor + - The appropriate streaming metadata [see above](#streaming-user-property) + - The serialized payload as provided by the user's request object + - Any user-definied metadata as specified in the ```ExtendedStreamingRequest``` + - QoS 1 + +Once the stream of requests has started sending, the streaming command invoker should expect the stream of responses to arrive on the provided response topic with the provided correlation data and the streaming user property. + +Once the user-supplied stream of request messages has ended, the streaming command invoker should send one final message to the same topic/with the same correlation data with no payload and with the 'isLast' flag set in the '__stream' metadata bundle. + +Upon receiving an MQTT message in the response stream with the 'isLast' flag set in the '__stream' metadata, the streaming command invoker should notify the user that the stream of responses has ended. This particular message should not contain any payload or other user properties, so the message _should not_ be propagated to the user as if it were part of the response stream. [See here for more details on why this ```isLast``` flag is an independent message](#islast-message-being-its-own-message). + +By default, the streaming command invoker will acknowledge all request messages it receives as soon as they are given to the user. Users may opt into manual acknowledgements, though. Opting into manual acknowledgements allows the user time to "process" each response as necessary before forgoing re-delivery from the broker if the invoker crashes unexpectedly. + +The streaming command invoker will provide de-dupe caching of received responses to account for QoS 1 messages potentially being re-delivered. The streaming command invoker will de-dup using a the combination of the correlationId of the stream and the index of the message within that stream. The de-dup cache entries for a stream should be cleared once the stream has finished (gracefully or otherwise). + +#### Executor side + +A streaming command executor should start by subscribing to the expected command topic + - Even though the streaming command classes are separate from the existing RPC classes, they should also offer the same features around topic string pre/suffixing, custom topic token support, etc. + - The executor should use a shared subscription so that, if there are multiple executors, only one of them receives each stream + +Upon receiving a MQTT message that contains a streaming request, the streaming executor should notify the application layer that the first message in a request stream was received. Once the executor has notified the user that the first message in a request stream was received, the user should be able to provide a stream of responses. Upon receiving each response in that stream from the user, the executor will send an MQTT message for each streamed response with: + - The same correlation data as the original request + - The topic as specified by the original request's response topic field + - The appropriate streaming metadata [see above](#streaming-user-property) + - The serialized payload as provided by the user's response object + - Any user-definied metadata as specified in the ```ExtendedStreamingResponse``` + - QoS 1 + +Upon receiving an MQTT message in the request stream with the 'isLast' flag set in the '__stream' metadata, the streaming executor should notify the user that the stream of requests has ended. This particular message should not contain any payload or other user properties, so the message _should not_ be propagated to the user as if it were part of the request stream. [See here for more details on why this ```isLast``` flag is an independent message](#islast-message-being-its-own-message). + +If a streaming command executor receives an MQTT message with the 'isLast' flag set but has not received any other messages in that request stream, the executor should log an error, acknowledge the message, but otherwise ignore it. A stream of requests must have at least one entry. + +By default, the streaming command executor will acknowledge all response messages it receives as soon as they are given to the user. Users may opt into manual acknowledgements, though. Opting into manual acknowledgements allows the user time to "process" each response as necessary before forgoing re-delivery from the broker if the executor crashes unexpectedly. + +Also unlike normal RPC, the streaming command executor will not provide any re-play cache support. This is because streams may grow indefinitely in length and size so re-playing a response stream isn't feasible. + +The streaming command executor will provide de-dupe caching of received requests to account for QoS 1 messages potentially being re-delivered. The streaming command invoker will de-dup using a the combination of the correlationId of the stream and the index of the message within that stream. The de-dup cache entries for a stream should be cleared once the stream has finished (gracefully or otherwise). + +### Timeout support + +We need to provide timeout support for our streaming APIs to avoid scenarios such as: + +- The invoker side is stuck waiting for the final response in a stream because it was lost or the executor side crashed before sending it. +- The executor side is stuck waiting for the final request in a stream because it was lost or the invoker side crashed before sending it. + +#### Decision + +We will allow configuration on the invoker's side of a timeout for the RPC as a whole and a timeout of each message in the request and/or response stream. + +##### RPC level timeout + +To enable this, each message in the request stream will include a value in the `````` portion of the ```__stream``` user property. This header should be sent in all request stream messages in case the first N request messages are lost due to timeout or otherwise. + +The invoker side will start a countdown from this value after receiving the first PUBACK that ends with throwing a timeout exception to the user if the final stream response has not been received yet. The invoker should not send any further messages beyond this timeout. + +The executor side will start a countdown from this value after receiving the first PUBLISH in the request stream. At the end of the countdown, if the executor has not sent the final response in the response stream, the executor should return the ```timeout``` error code back to the invoker. The executor should also notify the user callback to stop. + +Any request stream or response stream messages that are received by the executor/invoker after they have ended the timeout countdown should be acknowledged but otherwise ignored. This will require both parties to track correlationIds for timed out streams for a period of time beyond the expected end of the RPC so that any post-timeout messages are not treated as initiating a new stream. + +If the request stream omits the timeout value in the ```__stream``` user property, the invoker and executor should treat the RPC as not having a timeout. + +This design does make the invoker start the countdown sooner than the executor, but the time difference is negligible in most circumstances. + +##### Message level timeout + +We will allow users to set the message expiry interval of each message in a request/response stream. By default, though, we will set each message expiry interval equal to the RPC level timeout value. + +Both the invoker and executor stream messages _must_ include a message expiry interval. The receiving end will use this value as the de-dup cache length for each cached message. Vanilla RPC has the same requirement as explained [here](../../reference/command-timeouts.md#input-values). + +#### Alternative timeout designs considered + +- The above approach, but trying to calculate time spent on broker side (using message expiry interval) so that invoker and executor timeout at the same exact time + - This would require additional metadata in the ```__stream``` user property (intended vs received message expiry interval) and is only helpful + in the uncommon scenario where a message spends extended periods of time at the broker +- Specify the number of milliseconds allowed between the executor receiving the final command request and delivering the final command response. + - This is the approach that gRPC takes, but... + - It doesn't account for scenarios where the invoker/executor dies unexpectedly (since gRPC relies on a direct connection between invoker and executor) +- Use the message expiry interval of the first received message in a stream to indicate the RPC level timeout + - Misuses the message expiry interval's purpose and could lead to broker storing messages for extended periods of time unintentionally + +### Cancellation support + +To avoid scenarios where long-running streaming requests/responses are no longer wanted, we will want to support cancelling streaming RPC calls. + +Since sending a cancellation request may fail (message expiry on broker side), the SDK API design should allow for the user to repeatedly call "cancel" and should return successfully once the other party has responded appropriately. + +Additionally, cancellation requests may include user properties. This allows users to provide additional context on why the cancellation is happening. + +#### .NET API design + +The proposed cancellation support would come from the return type on the invoker side and the provided type on the executor side: + +```csharp +public interface IStreamContext + where T : class +{ + /// + /// The asynchronously readable entries in the stream. + /// + IAsyncEnumerable Entries { get; set; } + + /// + /// Cancel this received RPC streaming request. + /// + /// The optional user properties to include + /// Cancellation token for this cancellation request + /// + /// This method may be called by the streaming executor at any time. For instance, if the request stream + /// stalls unexpectedly, the executor can call this method to notify the invoker to stop sending requests. + /// Additionally, the executor can call this method if its response stream has stalled unexpectedly. + /// + Task CancelAsync(Dictionary? userProperties = null, CancellationToken cancellationToken = default); + + /// + /// The token that tracks if the streaming exchange has been cancelled by the other party and/or timed out. + /// + /// + /// For instance, if the invoker side cancels the streaming exchange, the executor side callback's + /// will be triggered. If the executor side cancels the streaming exchange, the invoker side's returned + /// will be triggered. + /// + /// To see if this was triggered because the stream exchange was cancelled, see . To see if it was triggered because + /// the stream exchange timed out, see . + /// + CancellationToken CancellationToken { get; } + + /// + /// Get the user properties associated with a cancellation request started with . + /// + /// The user properties associated with a cancellation request + /// + /// If the stream has not been cancelled, this will return null. If the stream has been cancelled, but no user properties were + /// provided in that cancellation request, this will return null. + /// + Dictionary? GetCancellationRequestUserProperties(); + + /// + /// True if this stream exchange has timed out. If a stream has timed out, will trigger as well. + /// + bool HasTimedOut { get; internal set; } + + /// + /// True if this stream exchange has been canceled by the other party. If a stream has been cancelled, will trigger as well. + /// + bool IsCanceled { get; internal set; } +} +``` + +With this design, we can cancel a stream from either side at any time and check for received user properties on any received cancellation requests. For detailed examples, see the integration tests written [here](../../../dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs). + +### Protocol layer details + +#### Invoker side + +- The command invoker may cancel a streaming command while streaming the request or receiving the stream of responses by sending an MQTT message with: + - The same MQTT topic as the invoked method + - The same correlation data as the invoked method + - Streaming metadata with the ["cancel" flag set](#streaming-user-property) + - No payload +- The command invoker should still listen on the response topic for a response from the executor which may still contain a successful response (if cancellation was received after the command completed successfully) or a response signalling that cancellation succeeded ("Canceled" error code) + +As detailed below, the executor may also cancel the stream at any time. In response to receiving a cancellation request from the executor, the invoker should send an MQTT message with: + - The same topic as the command itself + - The same correlation data as the command itself + - The "Canceled" error code + +After receiving an acknowledgement from the executor side that the stream has been canceled, any further received messages should be acknowledged but not given to the user. + +#### Executor side + +Upon receiving an MQTT message with the stream "cancel" flag set to "true" that correlates to an actively executing streaming command, the command executor should: + - Notify the application layer that that RPC has been canceled if it is still running + - Send an MQTT message to the appropriate response topic with error code "canceled" to notify the invoker that the RPC has stopped and no further responses will be sent. + +If the executor receives a cancellation request for a streaming command that has already completed, then the cancellation request should be ignored. + +The executor may cancel receiving a stream of requests or cancel sending a stream of responses as well. It does so by sending an MQTT message to the invoker with: + - The same MQTT topic as command response + - The same correlation data as the invoked method + - Streaming metadata with the ["cancel" flag set](#streaming-user-property) + - No payload + +The command invoker should then send a message on the same command topic with the same correlation data with the "stream canceled successfully" flag set. + +Any received MQTT messages pertaining to a command that was already canceled should still be acknowledged. They should not be given to the user, though. + +### Disconnection scenario considerations + +- Invoker side disconnects unexpectedly while sending requests + - Upon reconnection, the request messages queued in the session client should send as expected + - If no reconnection, the streaming RPC will timeout +- Invoker side disconnects unexpectedly while receiving responses + - The broker should hold all published responses for as long as the invoker's session lives and send them upon reconnection + - If the invoker's session is lost, then the RPC will timeout +- Executor side isn't connected when invoker sends first request + - Depending on broker behavior, invoker will receive a "no matching subscribers" puback + - Seems like a scenario we would want to retry? + - If the broker returns a successful puback, then the invoker side will eventually time out +- Executor side disconnects unexpectedly while receiving requests + - Broker should hold all published requests for as long as the executor's session lives and send them upon reconnection + - If the executor's session is lost, the RPC will timeout +- Executor side disconnects unexpectedly while sending responses + - Upon reconnection, the response messages queued in the session client should send as expected + - If no reconnection, the streaming RPC will timeout + +### Protocol versioning + +By maintaining RPC streaming as a separate communication pattern from normal RPC, we will need to introduce an independent protocol version for RPC streaming. It will start at ```1.0``` and should follow the same protocol versioning rules as the protocol versions used by telemetry and normal RPC. + +## Alternative designs considered + + - Allow the command executor to decide at run time of each command if it will stream responses independent of the command invoker's request + - This would force users to always call the ```InvokeCommandWithStreaming``` API on the command invoker side and that returned object isn't as easy to use for single responses + - Treat streaming RPC as the same protocol as RPC + - This introduces a handful of error cases such as: + - Invoker invokes a method that it thinks is non-streaming, but the executor tries streaming responses + - Executor receives a streaming command but the user did not set the streaming command handler callback (which must be optional since not every command executor has streaming commands) + - API design is messy because a command invoker/executor should not expose streaming command APIs if they have no streaming commands + - Caching behavior of normal RPC doesn't fit well with streamed RPCs which may grow indefinitely large + + +## Appendix + +### IsLast message being its own message + +There are three possible approaches to marking the final message in a stream that have been considered. Below are the approaches and the reasons why that approach doesn't work + +- Require the ```isLast``` flag to be set on a message that carries a fully-fledged stream message (i.e. has a user-provided payload and/or user properties) + - We must support ending streams at an arbitrary time even if a fully-fledged stream message can't be sent and this approach doesn't allow for that +- Allow the ```isLast``` flag to be set on either a fully-fledged stream message or as a standalone message with no user payload and no user properties + - This approach does not allow the receiving end to distinguish between "The stream is over" and "This is the final message in the stream" in cases where the user may provide no payload or user properties on streamed messages. + +Because the two above approaches either don't support our requirements or have ambiguities in corner cases, we should require the ```isLast``` flag be set on a standalone message with no uesr payload and no user properties. \ No newline at end of file diff --git a/doc/reference/error-model.md b/doc/reference/error-model.md index b711488d61..3fb8b14fba 100644 --- a/doc/reference/error-model.md +++ b/doc/reference/error-model.md @@ -151,6 +151,7 @@ public enum AkriMqttErrorKind ExecutionException, MqttError, UnsupportedVersion, + Canceled, } ``` @@ -263,6 +264,7 @@ public enum AkriMqttErrorKind { EXECUTION_EXCEPTION, MQTT_ERROR, UNSUPPORTED_VERSION, + CANCELED, } ``` @@ -327,6 +329,7 @@ pub enum AIOProtocolErrorKind { ExecutionException, MqttError, UnsupportedVersion, + Canceled, } ``` @@ -399,6 +402,7 @@ const { ExecutionError MqttError UnsupportedVersion + Canceled } ``` @@ -454,6 +458,7 @@ class AkriMqttErrorKind(Enum): EXECUTION_EXCEPTION = 10 MQTT_ERROR = 11 UNSUPPORTED_VERSION = 12 + CANCELED = 13 ``` The Akri.Mqtt error type is defined as follows: @@ -569,6 +574,7 @@ The following table lists the HTTP status codes, conditions on other fields in t | 400 | Bad Request | false | no | | invalid payload | | 408 | Request Timeout | false | yes | yes | timeout | | 415 | Unsupported Media Type | false | yes | yes | invalid header | +| 452 | Request Cancelled | false | no | no | canceled | | 500 | Internal Server Error | false | no | | unknown error | | 500 | Internal Server Error | false | yes | | internal logic error | | 500 | Internal Server Error | true | maybe | | execution error | diff --git a/dotnet/src/Azure.Iot.Operations.Protocol/AkriSystemProperties.cs b/dotnet/src/Azure.Iot.Operations.Protocol/AkriSystemProperties.cs index 740650e017..e55cd7fc7d 100644 --- a/dotnet/src/Azure.Iot.Operations.Protocol/AkriSystemProperties.cs +++ b/dotnet/src/Azure.Iot.Operations.Protocol/AkriSystemProperties.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft Corporation. +// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. using System; diff --git a/dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs b/dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs index d639b76a0e..33a5babdef 100644 --- a/dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs +++ b/dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft Corporation. +// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. using System; diff --git a/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/IStreamContext.cs b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/IStreamContext.cs new file mode 100644 index 0000000000..a948e33e52 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/IStreamContext.cs @@ -0,0 +1,76 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Azure.Iot.Operations.Protocol.Streaming +{ + /// + /// A stream of requests or responses that can be gracefully ended or canceled (with confirmation) at any time. + /// + /// The type of the payload of the request/response stream + public interface IStreamContext + where T : class + { + /// + /// The asynchronously readable entries in the stream + /// + IAsyncEnumerable Entries { get; set; } + + /// + /// Cancel this RPC streaming exchange. + /// + /// + /// The optional user properties to include in this cancellation request. the receiving side of this cancellation request + /// will be given these properties alongside the notification that the streaming exchange has been canceled. + /// + /// Cancellation token to wait for confirmation from the receiving side that the cancellation succeeded. + /// + /// When called by the invoker, the executor will be notified about this cancellation and the executor will attempt + /// to stop any user-defined handling of the streaming request. When called by the executor, the invoker will be notified + /// and will cease sending requests and will throw an with + /// of . + /// + /// This method may be called by the streaming invoker or executor at any time. For instance, if the request stream + /// stalls unexpectedly, the executor can call this method to notify the invoker to stop sending requests. + /// Additionally, the invoker can call this method if its response stream has stalled unexpectedly. + /// + Task CancelAsync(Dictionary? userData = null, CancellationToken cancellationToken = default); + + /// + /// The token that tracks if the streaming exchange has been cancelled by the other party and/or timed out. + /// + /// + /// For instance, if the invoker side cancels the streaming exchange, the executor side callback's + /// will be triggered. If the executor side cancels the streaming exchange, the invoker side's returned + /// will be triggered. + /// + /// To see if this was triggered because the stream exchange was cancelled, see . To see if it was triggered because + /// the stream exchange timed out, see . + /// + CancellationToken CancellationToken { get; } + + /// + /// Get the user properties associated with a cancellation request started with . + /// + /// The user properties associated with a cancellation request + /// + /// If the stream has not been cancelled, this will return null. If the stream has been cancelled, but no user properties were + /// provided in that cancellation request, this will return null. + /// + Dictionary? GetCancellationRequestUserProperties(); + + /// + /// True if this stream exchange has timed out. If a stream has timed out, will trigger as well. + /// + bool HasTimedOut { get; internal set; } + + /// + /// True if this stream exchange has been canceled by the other party. If a stream has been cancelled, will trigger as well. + /// + bool IsCanceled { get; internal set; } + } +} diff --git a/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/ReceivedStreamingExtendedRequest.cs b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/ReceivedStreamingExtendedRequest.cs new file mode 100644 index 0000000000..02886fcd40 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/ReceivedStreamingExtendedRequest.cs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Threading.Tasks; + +namespace Azure.Iot.Operations.Protocol.Streaming +{ + /// + /// The payload and metadata associated with a single request in a request stream. + /// + /// The type of the payload of the request + public class ReceivedStreamingExtendedRequest : StreamingExtendedRequest + where TReq : class + { + private readonly Task _acknowledgementFunc; + + internal ReceivedStreamingExtendedRequest(TReq request, StreamMessageMetadata metadata, Task acknowledgementFunc) + : base(request, metadata) + { + _acknowledgementFunc = acknowledgementFunc; + } + + public async Task AcknowledgeAsync() + { + await _acknowledgementFunc; + } + } +} diff --git a/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/ReceivedStreamingExtendedResponse.cs b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/ReceivedStreamingExtendedResponse.cs new file mode 100644 index 0000000000..8d7b4d834f --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/ReceivedStreamingExtendedResponse.cs @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Threading.Tasks; + +namespace Azure.Iot.Operations.Protocol.Streaming +{ + /// + /// The payload and metadata associated with a single response in a response stream. + /// + /// The type of the payload of the response + public class ReceivedStreamingExtendedResponse : StreamingExtendedResponse + where TResp : class + { + private readonly Task _acknowledgementFunc; + + internal ReceivedStreamingExtendedResponse(TResp response, StreamMessageMetadata metadata, Task acknowledgementFunc) + : base(response, metadata) + { + _acknowledgementFunc = acknowledgementFunc; + } + + public async Task AcknowledgeAsync() + { + await _acknowledgementFunc; + } + } +} diff --git a/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/RequestStreamMetadata.cs b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/RequestStreamMetadata.cs new file mode 100644 index 0000000000..32703b7185 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/RequestStreamMetadata.cs @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using Azure.Iot.Operations.Protocol.Models; + +namespace Azure.Iot.Operations.Protocol.Streaming +{ + /// + /// Metadata for a request stream as a whole. + /// + public class RequestStreamMetadata + { + /// + /// The correlationId for tracking this streaming request + /// + public Guid CorrelationId { get; set; } + + /// + /// The Id of the client that invoked this streaming request + /// + public string? InvokerClientId { get; set; } + + /// + /// The MQTT topic tokens used in this streaming request. + /// + public Dictionary TopicTokens { get; } = new(); + + /// + /// The partition associated with this streaming request. + /// + public string? Partition { get; } + + /// + /// The content type of all messages sent in this request stream. + /// + public string? ContentType { get; set; } + + /// + /// The payload format indicator for all messages sent in this request stream. + /// + public MqttPayloadFormatIndicator PayloadFormatIndicator { get; set; } + } +} diff --git a/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/ResponseStreamMetadata.cs b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/ResponseStreamMetadata.cs new file mode 100644 index 0000000000..a88630ec25 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/ResponseStreamMetadata.cs @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Azure.Iot.Operations.Protocol.Models; + +namespace Azure.Iot.Operations.Protocol.Streaming +{ + /// + /// Metadata for a response stream as a whole. + /// + public class ResponseStreamMetadata + { + /// + /// The content type of all messages in this response stream + /// + public string? ContentType { get; set; } + + /// + /// The payload format indicator for all messages in this response stream + /// + public MqttPayloadFormatIndicator PayloadFormatIndicator { get; set; } + } +} diff --git a/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamMessageMetadata.cs b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamMessageMetadata.cs new file mode 100644 index 0000000000..176424d8d9 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamMessageMetadata.cs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Generic; + +namespace Azure.Iot.Operations.Protocol.Streaming +{ + /// + /// Metadata for a specific message within a request stream + /// + public class StreamMessageMetadata + { + /// + /// The timestamp attached to this particular message + /// + public HybridLogicalClock? Timestamp { get; internal set; } + + /// + /// User properties associated with this particular message + /// + public Dictionary UserData { get; init; } = new(); + + /// + /// The index of this message within the stream as a whole + /// + /// This value is automatically assigned when sending messages in a request/response stream and cannot be overriden. + public int Index { get; internal set; } + } +} diff --git a/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs new file mode 100644 index 0000000000..532c24ea05 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +#pragma warning disable IDE0060 // Remove unused parameter +#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. + +namespace Azure.Iot.Operations.Protocol.Streaming +{ + public abstract class StreamingCommandExecutor : IAsyncDisposable + where TReq : class + where TResp : class + { + /// + /// A streaming command was invoked + /// + /// + /// The callback provides the stream of requests and requires the user to return one to many responses. + /// + public required Func>, RequestStreamMetadata, IAsyncEnumerable>> OnStreamingCommandReceived { get; set; } + + public string RequestTopicPattern { get; init; } + + /// + /// The topic token replacement map that this executor will use by default. Generally, this will include the token values + /// for topic tokens such as "executorId" which should be the same for the duration of this command executor's lifetime. + /// + /// + /// Tokens replacement values can also be specified when starting the executor by specifying the additionalTopicToken map in . + /// + public Dictionary TopicTokenMap { get; protected set; } + + /// + /// If true, this executor will acknowledge the MQTT message associated with each streaming request as soon as it arrives. + /// If false, the user must call once they are done processing + /// each request message. + /// + /// + /// Generally, delaying acknowledgement allows for re-delivery by the broker in cases where the executor crashes or restarts unexpectedly. + /// However, MQTT acknowledgements must be delivered in order, so delaying these acknowledgements may affect the flow of acknowledgements + /// being sent by other processes using this same MQTT client. Additionally, the MQTT broker has a limit on the number of un-acknowledged messages + /// that are allowed to be in-flight at a single moment, so delaying too many acknowledgements may halt all further MQTT traffic on the underlying + /// MQTT client. + /// + public bool AutomaticallyAcknowledgeRequests { get; set; } = true; + + public Task StartAsync(CancellationToken cancellationToken = default) + { + // TODO: derive the expected request topic (like command executor does) + + // TODO: subscribe to the shared subscription prefixed request topic + + throw new NotImplementedException(); + } + + public Task StopAsync(CancellationToken cancellationToken = default) + { + // TODO: Unsubscribe from the request topic derived in StartAsync + + throw new NotImplementedException(); + } + + public async ValueTask DisposeAsync() + { + await StopAsync(); + + GC.SuppressFinalize(this); + } + } +} +#pragma warning restore IDE0060 // Remove unused parameter +#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. diff --git a/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandInvoker.cs b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandInvoker.cs new file mode 100644 index 0000000000..07d329bafd --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandInvoker.cs @@ -0,0 +1,124 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +#pragma warning disable IDE0060 // Remove unused parameter +#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. +#pragma warning disable CS0168 // Variable is declared but never used + +namespace Azure.Iot.Operations.Protocol.Streaming +{ + public abstract class StreamingCommandInvoker : IAsyncDisposable + where TReq : class + where TResp : class + { + /// + /// The topic token replacement map that this streaming command invoker will use by default. Generally, this will include the token values + /// for topic tokens such as "modelId" which should be the same for the duration of this command invoker's lifetime. + /// + /// + /// Tokens replacement values can also be specified per-method invocation by specifying the additionalTopicToken map in . + /// + public Dictionary TopicTokenMap { get; protected set; } + + public string RequestTopicPattern { get; init; } + + public string? TopicNamespace { get; set; } + + /// + /// The prefix to use in the command response topic. This value is ignored if is set. + /// + /// + /// If no prefix or suffix is specified, and no value is provided in , then this + /// value will default to "clients/{invokerClientId}" for security purposes. + /// + /// If a prefix and/or suffix are provided, then the response topic will use the format: + /// {prefix}/{command request topic}/{suffix}. + /// + public string? ResponseTopicPrefix { get; set; } + + /// + /// The suffix to use in the command response topic. This value is ignored if is set. + /// + /// + /// If no suffix is specified, then the command response topic won't include a suffix. + /// + /// If a prefix and/or suffix are provided, then the response topic will use the format: + /// {prefix}/{command request topic}/{suffix}. + /// + public string? ResponseTopicSuffix { get; set; } + + /// + /// If provided, this topic pattern will be used for command response topic. + /// + /// + /// If not provided, and no value is provided for or , the default pattern used will be clients/{mqtt client id}/{request topic pattern}. + /// + public string? ResponseTopicPattern { get; set; } + + /// + /// If true, this invoker will acknowledge the MQTT message associated with each streaming response as soon as it arrives. + /// If false, the user must call once they are done processing + /// each response message. + /// + /// + /// Generally, delaying acknowledgement allows for re-delivery by the broker in cases where the invoker crashes or restarts unexpectedly. + /// However, MQTT acknowledgements must be delivered in order, so delaying these acknowledgements may affect the flow of acknowledgements + /// being sent by other processes using this same MQTT client. Additionally, the MQTT broker has a limit on the number of un-acknowledged messages + /// that are allowed to be in-flight at a single moment, so delaying too many acknowledgements may halt all further MQTT traffic on the underlying + /// MQTT client. + /// + public bool AutomaticallyAcknowledgeResponses { get; set; } = true; + + /// + /// Invoke a streaming command on a particular streaming command executor + /// + /// The stream of requests to send. This stream must contain at least one request. + /// The metadata for the request stream as a whole. + /// Topic tokens to substitute in the request topic. + /// The timeout between the beginning of the request stream and the end of both the request and response stream. + /// + /// Cancellation token. Signalling this will also make a single attempt to notify the executor of the cancellation. To make multiple attempts to cancel and/or + /// check that this cancellation succeeded, use instead. + /// + /// The stream of responses. + public async Task>> InvokeStreamingCommandAsync( + IAsyncEnumerable> requests, + RequestStreamMetadata? streamMetadata = null, + Dictionary? additionalTopicTokenMap = null, + TimeSpan? streamExchangeTimeout = null, + CancellationToken cancellationToken = default) + { + // TODO: Derive the request topic (like commandInvoker does) + + // TODO: Subscribe to the expected response topic + + // TODO: construct the IAsyncEnumerable of responses to capture the stream of responses prior to sending the first request. + IAsyncEnumerable> responses; + IStreamContext>> streamContext; + + await foreach (var streamMessage in requests) + { + // TODO: Construct and send an MQTT message to the executor. Attach properties from both streamMetadata and streamMessage.Metadata + } + + // TODO: Send the "end of stream" MQTT message now that all request messages have been sent + + throw new NotImplementedException(); + } + + public ValueTask DisposeAsync() + { + GC.SuppressFinalize(this); + return ValueTask.CompletedTask; + } +#pragma warning restore IDE0060 // Remove unused parameter +#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. +#pragma warning restore CS0168 // Variable is declared but never used + + } +} diff --git a/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingExtendedRequest.cs b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingExtendedRequest.cs new file mode 100644 index 0000000000..be174de3e0 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingExtendedRequest.cs @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; + +namespace Azure.Iot.Operations.Protocol.Streaming +{ + /// + /// The payload and metadata associated with a single request in a request stream. + /// + /// The type of the payload of the request + public class StreamingExtendedRequest + where TReq : class + { + /// + /// The request payload + /// + public TReq Payload { get; set; } + + /// + /// The metadata specific to this message in the stream + /// + public StreamMessageMetadata Metadata { get; set; } + + /// + /// How long the message will be persisted by the MQTT broker if the executor side is not connected to receive it. + /// + /// + /// By default, this value will be set equal to the stream-level timeout specified in . + /// Generally, this value should be strictly less than or equal to the stream-level timeout. + /// Setting shorter timespans here allows for streamed messages to expire if they are no longer relevant beyond a certain point. + /// + public TimeSpan? MessageExpiry { get; set; } + + public StreamingExtendedRequest(TReq request, StreamMessageMetadata? metadata = null) + { + Payload = request; + Metadata = metadata ?? new(); + } + } +} diff --git a/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingExtendedResponse.cs b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingExtendedResponse.cs new file mode 100644 index 0000000000..c33c1bfd71 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingExtendedResponse.cs @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; + +namespace Azure.Iot.Operations.Protocol.Streaming +{ + /// + /// The payload and metadata associated with a single response in a response stream. + /// + /// The type of the payload of the response + public class StreamingExtendedResponse + where TResp : class + { + /// + /// The response payload + /// + public TResp Payload { get; set; } + + /// + /// The metadata specific to this message in the stream + /// + public StreamMessageMetadata Metadata { get; set; } + + /// + /// How long the message will be persisted by the MQTT broker if the invoker side is not connected to receive it. + /// + /// + /// By default, this value will be set equal to the stream-level timeout specified in . + /// Generally, this value should be strictly less than or equal to the stream-level timeout. + /// Setting shorter timespans here allows for streamed messages to expire if they are no longer relevant beyond a certain point. + /// + public TimeSpan? MessageExpiry { get; set; } + + public StreamingExtendedResponse(TResp response, StreamMessageMetadata? metadata = null) + { + Payload = response; + Metadata = metadata ?? new(); + } + } +} diff --git a/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs b/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs new file mode 100644 index 0000000000..1da01ba0ec --- /dev/null +++ b/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs @@ -0,0 +1,625 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Concurrent; +using Azure.Iot.Operations.Mqtt.Session; +using Azure.Iot.Operations.Protocol.Streaming; + +namespace Azure.Iot.Operations.Protocol.IntegrationTests +{ + public class StreamingIntegrationTests + { + // shared across all tests in this file, but each test should use a unique GUID as the correlationId of the stream + private readonly ConcurrentDictionary>> _receivedRequests = new(); + private readonly ConcurrentDictionary>> _sentResponses = new(); + +#pragma warning disable CS9113 // Parameter is unread. +#pragma warning disable IDE0060 // Remove unused parameter + internal class StringStreamingCommandInvoker(ApplicationContext applicationContext, IMqttPubSubClient mqttClient) + : StreamingCommandInvoker() + { } + + internal class EchoStringStreamingCommandExecutor : StreamingCommandExecutor + { + internal EchoStringStreamingCommandExecutor(ApplicationContext applicationContext, IMqttPubSubClient mqttClient, string commandName = "echo") +#pragma warning restore IDE0060 // Remove unused parameter + : base() +#pragma warning restore CS9113 // Parameter is unread. + + { + + } + } + + [Theory] + [InlineData(false, false)] + [InlineData(true, false)] + [InlineData(false, true)] + [InlineData(true, true)] + public async Task StreamRequestsAndResponsesInSerial(bool multipleRequests, bool multipleResponses) + { + await using MqttSessionClient invokerMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + await using MqttSessionClient executorMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + + int requestCount = multipleRequests ? 3 : 1; + int responseCount = multipleResponses ? 3 : 1; + + await using EchoStringStreamingCommandExecutor executor = multipleResponses + ? new(new(), executorMqttClient) + { + OnStreamingCommandReceived = SerialHandlerMultipleResponses + } + : new(new(), executorMqttClient) + { + OnStreamingCommandReceived = SerialHandlerSingleResponse + }; + + await executor.StartAsync(); + + await using StringStreamingCommandInvoker invoker = new(new(), invokerMqttClient); + + RequestStreamMetadata requestMetadata = new(); + var responseStreamContext = await invoker.InvokeStreamingCommandAsync(GetStringRequestStream(requestCount), requestMetadata); + + List> receivedResponses = new(); + await foreach (StreamingExtendedResponse response in responseStreamContext.Entries) + { + receivedResponses.Add(response); + } + + List> expectedRequests = new(); + await foreach (var request in GetStringRequestStream(requestCount)) + { + expectedRequests.Add(request); + } + + if (!_receivedRequests.TryGetValue(requestMetadata.CorrelationId, out var receivedRequests)) + { + Assert.Fail("Executor did not receive any requests"); + } + + Assert.Equal(expectedRequests.Count, receivedRequests.Count); + for (int i = 0; i < expectedRequests.Count; i++) + { + Assert.Equal(expectedRequests[i].Payload, receivedRequests[i].Payload); + Assert.Equal(i, receivedRequests[i].Metadata!.Index); + } + + if (!_sentResponses.TryGetValue(requestMetadata.CorrelationId, out var sentResponses)) + { + Assert.Fail("Executor did not send any responses"); + } + + Assert.Equal(receivedResponses.Count, sentResponses.Count); + for (int i = 0; i < expectedRequests.Count; i++) + { + Assert.Equal(sentResponses[i].Payload, receivedResponses[i].Payload); + Assert.Equal(i, receivedResponses[i].Metadata!.Index); + } + } + + //TODO add user properties to these tests + [Fact] + public async Task InvokerCanCancelWhileStreamingRequests() //TODO does cancellation token trigger on executor side? Add to other tests as well + { + await using MqttSessionClient invokerMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + await using MqttSessionClient executorMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + + await using EchoStringStreamingCommandExecutor executor = + new(new(), executorMqttClient) + { + OnStreamingCommandReceived = SerialHandlerExpectsCancellationWhileStreamingRequests + }; + + await executor.StartAsync(); + + await using StringStreamingCommandInvoker invoker = new(new(), invokerMqttClient); + + var stream = await invoker.InvokeStreamingCommandAsync(GetStringRequestStreamWithDelay()); + + Dictionary cancellationCustomUserProperties = new() + { + { "someUserPropertyKey", "someUserPropertyValue"} + }; + + await stream.CancelAsync(cancellationCustomUserProperties); + + //TODO assert the executor received cancellation + user properties + } + + [Fact] + public async Task InvokerCanCancelWhileStreamingResponses() + { + await using MqttSessionClient invokerMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + await using MqttSessionClient executorMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + + await using EchoStringStreamingCommandExecutor executor = new(new(), executorMqttClient) + { + OnStreamingCommandReceived = SerialHandlerMultipleResponsesWithDelay + }; + + await executor.StartAsync(); + + await using StringStreamingCommandInvoker invoker = new(new(), invokerMqttClient); + + var responseStreamContext = await invoker.InvokeStreamingCommandAsync(GetStringRequestStream(1)); + + await foreach (var response in responseStreamContext.Entries) + { + Dictionary cancellationCustomUserProperties = new() + { + { "someUserPropertyKey", "someUserPropertyValue"} + }; + + await responseStreamContext.CancelAsync(cancellationCustomUserProperties); + break; + } + } + + [Fact] + public async Task ExecutorCanCancelWhileStreamingRequests() + { + await using MqttSessionClient invokerMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + await using MqttSessionClient executorMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + + await using EchoStringStreamingCommandExecutor executor = + new(new(), executorMqttClient) + { + OnStreamingCommandReceived = SerialHandlerThatCancelsWhileStreamingRequests + }; + + await executor.StartAsync(); + + await using StringStreamingCommandInvoker invoker = new(new(), invokerMqttClient); + + var responseStreamContext = await invoker.InvokeStreamingCommandAsync(GetStringRequestStreamWithDelay()); + + bool receivedCancellation = false; + try + { + await foreach (var response in responseStreamContext.Entries) + { + // Executor should send cancellation request prior to sending any responses + } + } + catch (AkriMqttException ame) when (ame.Kind is AkriMqttErrorKind.Cancellation) + { + receivedCancellation = true; + Assert.True(responseStreamContext.CancellationToken.IsCancellationRequested); // TODO timing on exception thrown vs cancellation token triggered? + Dictionary? cancellationRequestUserProperties = responseStreamContext.GetCancellationRequestUserProperties(); + Assert.NotNull(cancellationRequestUserProperties); + Assert.NotEmpty(cancellationRequestUserProperties.Keys); //TODO actually validate the values match + } + + Assert.True(receivedCancellation); + } + + [Fact] + public async Task ExecutorCanCancelWhileStreamingResponses() + { + await using MqttSessionClient invokerMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + await using MqttSessionClient executorMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + + await using EchoStringStreamingCommandExecutor executor = + new(new(), executorMqttClient) + { + OnStreamingCommandReceived = SerialHandlerThatCancelsStreamingWhileStreamingResponses + }; + + await executor.StartAsync(); + + await using StringStreamingCommandInvoker invoker = new(new(), invokerMqttClient); + + var responseStreamContext = await invoker.InvokeStreamingCommandAsync(GetStringRequestStream(1)); + + bool receivedCancellation = false; + try + { + await foreach (var response in responseStreamContext.Entries) + { + // Read responses until the executor sends a cancellation request + } + } + catch (AkriMqttException ame) when (ame.Kind is AkriMqttErrorKind.Cancellation) + { + receivedCancellation = true; + Assert.True(responseStreamContext.CancellationToken.IsCancellationRequested); // TODO timing on exception thrown vs cancellation token triggered? + Dictionary? cancellationRequestUserProperties = responseStreamContext.GetCancellationRequestUserProperties(); + Assert.NotNull(cancellationRequestUserProperties); + Assert.NotEmpty(cancellationRequestUserProperties.Keys); //TODO actually validate the values match + } + + Assert.True(receivedCancellation); + } + + // Can configure the executor to send a response for each request and the invoker to only send the nth request after receiving the n-1th response + [Fact] + public async Task CanStreamRequestsAndResponsesSimultaneously() + { + await using MqttSessionClient invokerMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + await using MqttSessionClient executorMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + + await using EchoStringStreamingCommandExecutor executor = new(new(), executorMqttClient) + { + OnStreamingCommandReceived = ParallelHandlerEchoResponses + }; + + await executor.StartAsync(); + + await using StringStreamingCommandInvoker invoker = new(new(), invokerMqttClient); + + RequestStreamMetadata requestMetadata = new(); + TaskCompletionSource tcs1 = new(); // the delay to impose before sending the first request in the request stream + TaskCompletionSource tcs2 = new(); // the delay to impose before sending the second request in the request stream + TaskCompletionSource tcs3 = new(); // the delay to impose before sending the third request in the request stream + + tcs1.TrySetResult(); // Don't need to delay the first message + + var responseStreamContext = await invoker.InvokeStreamingCommandAsync(GetStringRequestStreamWithDelay(tcs1, tcs2, tcs3), requestMetadata); + + List> receivedResponses = new(); + await foreach (StreamingExtendedResponse response in responseStreamContext.Entries) + { + receivedResponses.Add(response); + + //TOOD metadata will never be null when received, but may be null when assigned + if (response.Metadata!.Index == 0) + { + // The first response has been received, so allow the second request to be sent + tcs2.TrySetResult(); + } + + if (response.Metadata!.Index == 1) + { + // The second response has been received, so allow the third request to be sent + tcs2.TrySetResult(); + } + } + + if (!_receivedRequests.TryGetValue(requestMetadata.CorrelationId, out var receivedRequests)) + { + Assert.Fail("Executor did not receive any requests"); + } + + // Executor should echo back each request as a response + Assert.Equal(receivedResponses.Count, receivedRequests.Count); + for (int i = 0; i < receivedResponses.Count; i++) + { + Assert.Equal(receivedResponses[i].Payload, receivedRequests[i].Payload); + } + } + + [Fact] + public Task CanAddUserPropertiesToSpecificToMessagesInRequestAndstreamContexts() + { + throw new NotImplementedException(); + } + + [Fact] + public Task CanCancelFromInvokerSideWithCancellationToken() + { + throw new NotImplementedException(); + } + + // In cases where the IAsyncEnumerable isn't sure if a given entry will be the last, users can "escape" by using the keyword + // "yield break" to signal the IAsyncEnumerable has ended without providing a fully-fledged final entry + [Fact] + public async Task InvokerCanCompleteRequestStreamWithYieldBreak() + { + await using MqttSessionClient invokerMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + await using MqttSessionClient executorMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + + await using EchoStringStreamingCommandExecutor executor = + new(new(), executorMqttClient) + { + OnStreamingCommandReceived = SerialHandlerSingleResponse + }; + + await executor.StartAsync(); + + await using StringStreamingCommandInvoker invoker = new(new(), invokerMqttClient); + + var stream = await invoker.InvokeStreamingCommandAsync(GetStringRequestStreamWithYieldBreak()); + + await foreach (var response in stream.Entries) + { + // TODO verify expected responses + } + } + + // See 'InvokerCanCompleteRequestStreamWithYieldBreak' but on the executor side + [Fact] + public async Task ExecutorCanCompleteResponseStreamWithYieldBreak() + { + await using MqttSessionClient invokerMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + await using MqttSessionClient executorMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + + await using EchoStringStreamingCommandExecutor executor = + new(new(), executorMqttClient) + { + OnStreamingCommandReceived = SerialHandlerMultipleResponsesWithYieldBreakAfterFirstResponse + }; + + await executor.StartAsync(); + + await using StringStreamingCommandInvoker invoker = new(new(), invokerMqttClient); + + var stream = await invoker.InvokeStreamingCommandAsync(GetStringRequestStream(3)); + + await foreach (var response in stream.Entries) + { + // TODO verify expected responses + } + } + + [Fact] + public async Task InvokerAndExecutorCanDelayAcknowledgements() + { + await using MqttSessionClient invokerMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + await using MqttSessionClient executorMqttClient = await ClientFactory.CreateSessionClientFromEnvAsync(); + + await using EchoStringStreamingCommandExecutor executor = + new(new(), executorMqttClient) + { + OnStreamingCommandReceived = SerialHandlerSingleResponseManualAcks + }; + + executor.AutomaticallyAcknowledgeRequests = false; + + await executor.StartAsync(); + + await using StringStreamingCommandInvoker invoker = new(new(), invokerMqttClient); + + invoker.AutomaticallyAcknowledgeResponses = false; + + var stream = await invoker.InvokeStreamingCommandAsync(GetStringRequestStream(3)); + + await foreach (var response in stream.Entries) + { + await response.AcknowledgeAsync(); + } + } + + private async IAsyncEnumerable> GetStringRequestStream(int requestCount) + { + for (int i = 0; i < requestCount; i++) + { + await Task.Delay(TimeSpan.FromMicroseconds(1)); // Simulate asynchronous work + yield return new($"Message {i}"); + } + } + + // send N requests after each provided TCS is triggered. This allows for testing scenarios like "only send a request once a response has been received" + private async IAsyncEnumerable> GetStringRequestStreamWithDelay(params TaskCompletionSource[] delays) + { + int index = 0; + foreach (TaskCompletionSource delay in delays) + { + await delay.Task; // Simulate asynchronous work + yield return new($"Message {index++}"); + } + } + + private async IAsyncEnumerable> GetStringStreamContext(int responseCount) + { + for (int i = 0; i < responseCount; i++) + { + await Task.Delay(TimeSpan.FromMicroseconds(1)); // Simulate asynchronous work + yield return new($"Message {i}"); + } + } + + private async IAsyncEnumerable> GetStringRequestStreamWithDelay() + { + for (int i = 0; i <= 10; i++) + { + yield return new($"Message {i}"); + + await Task.Delay(TimeSpan.FromHours(1)); // Simulate asynchronous work that is stuck after the first request is sent + } + } + + // Simulate a request stream that decides between entries to close gracefully + private static async IAsyncEnumerable> GetStringRequestStreamWithYieldBreak() + { + for (int i = 0; true; i++) + { + await Task.Delay(TimeSpan.FromMicroseconds(1)); // Simulate asynchronous work + + if (i > 5) + { + yield break; + } + + yield return new($"Message {i}"); + } + } + + private async IAsyncEnumerable> SerialHandlerSingleResponse(IStreamContext> stream, RequestStreamMetadata streamMetadata) + { + await SaveReceivedRequests(stream, streamMetadata, false, stream.CancellationToken); + + await foreach (var response in GetStringStreamContext(3).WithCancellation(stream.CancellationToken)) + { + yield return response; + } + } + + private async IAsyncEnumerable> SerialHandlerExpectsCancellationWhileStreamingRequests(IStreamContext> stream, RequestStreamMetadata streamMetadata) + { + try + { + await foreach (var request in stream.Entries.WithCancellation(stream.CancellationToken)) + { + } + } + catch (OperationCanceledException) + { + // The stream was cancelled by the invoker while it streamed requests + Dictionary? cancellationUserProperties = stream.GetCancellationRequestUserProperties(); + + //TODO assert received user properties in the cancellation request + } + + yield return new("should never be reached"); + } + + private async IAsyncEnumerable> SerialHandlerMultipleResponses(IStreamContext> stream, RequestStreamMetadata streamMetadata) + { + await SaveReceivedRequests(stream, streamMetadata, false, stream.CancellationToken); + + await foreach (var response in GetStringStreamContext(3).WithCancellation(stream.CancellationToken)) + { + _sentResponses.TryAdd(streamMetadata.CorrelationId, new()); + if (_sentResponses.TryGetValue(streamMetadata.CorrelationId, out var sentResponses)) + { + sentResponses.Add(response); + } + + yield return response; + } + } + + private async IAsyncEnumerable> SerialHandlerMultipleResponsesWithYieldBreakAfterFirstResponse(IStreamContext> stream, RequestStreamMetadata streamMetadata) + { + await SaveReceivedRequests(stream, streamMetadata, false, stream.CancellationToken); + + await foreach (var response in GetStringStreamContext(3).WithCancellation(stream.CancellationToken)) + { + _sentResponses.TryAdd(streamMetadata.CorrelationId, new()); + if (_sentResponses.TryGetValue(streamMetadata.CorrelationId, out var sentResponses)) + { + sentResponses.Add(response); + } + + yield return response; + yield break; // Break after sending the first response + } + } + + private async IAsyncEnumerable> ParallelHandlerEchoResponses(IStreamContext> stream, RequestStreamMetadata streamMetadata) + { + await foreach (StreamingExtendedRequest requestStreamEntry in stream.Entries.WithCancellation(stream.CancellationToken)) + { + // doesn't overwrite if the correlationId already exists in the dictionary + _receivedRequests.TryAdd(streamMetadata.CorrelationId, new()); + + if (_receivedRequests.TryGetValue(streamMetadata.CorrelationId, out var requestsReceived)) + { + requestsReceived.Add(requestStreamEntry); + } + + yield return new(requestStreamEntry.Payload); + } + } + + private async IAsyncEnumerable> SerialHandlerMultipleResponsesWithDelay(IStreamContext> stream, RequestStreamMetadata streamMetadata) + { + await SaveReceivedRequests(stream, streamMetadata, false, stream.CancellationToken); + + var asyncEnumeratorWithCancellation = GetStringRequestStreamWithDelay().WithCancellation(stream.CancellationToken).GetAsyncEnumerator(); + + bool readingRequestStream = true; + while (readingRequestStream) + { + StreamingExtendedRequest request; + try + { + readingRequestStream = await asyncEnumeratorWithCancellation.MoveNextAsync(); + request = asyncEnumeratorWithCancellation.Current; + } + catch (OperationCanceledException) + { + // The invoker side will cancel this stream of responses (via the provided cancellation token) since it takes too long + + Dictionary? cancellationUserProperties = stream.GetCancellationRequestUserProperties(); + + //TODO assert these match the user properties sent by the invoker + + yield break; + } + + yield return new(request.Payload); + } + } + + private static async IAsyncEnumerable> SerialHandlerThatCancelsWhileStreamingRequests(IStreamContext> stream, RequestStreamMetadata streamMetadata) + { + CancellationTokenSource requestTimeoutCancellationTokenSource = new CancellationTokenSource(); + requestTimeoutCancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(1)); + + var asyncEnumeratorWithCancellation = stream.Entries.WithCancellation(requestTimeoutCancellationTokenSource.Token).GetAsyncEnumerator(); + + bool readingRequestStream = true; + while (readingRequestStream) + { + StreamingExtendedRequest request; + try + { + readingRequestStream = await asyncEnumeratorWithCancellation.MoveNextAsync(); + request = asyncEnumeratorWithCancellation.Current; + } + catch (OperationCanceledException) + { + // simulates timing out while waiting on an entry in the stream and the executor deciding to cancel the stream as a result + await stream.CancelAsync(); + yield break; + } + + yield return new(request.Payload); + } + } + + private async IAsyncEnumerable> SerialHandlerThatCancelsStreamingWhileStreamingResponses(IStreamContext> stream, RequestStreamMetadata streamMetadata) + { + await SaveReceivedRequests(stream, streamMetadata, false, stream.CancellationToken); + + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(1)); + for (int responseCount = 0; responseCount < 5; responseCount++) + { + if (responseCount == 3) + { + Dictionary cancellationCustomUserProperties = new() + { + { "someUserPropertyKey", "someUserPropertyValue"} + }; + + await stream.CancelAsync(cancellationCustomUserProperties); + yield break; + } + + yield return new StreamingExtendedResponse("some response"); + } + } + + private async IAsyncEnumerable> SerialHandlerSingleResponseManualAcks(IStreamContext> stream, RequestStreamMetadata streamMetadata) + { + await SaveReceivedRequests(stream, streamMetadata, true, stream.CancellationToken); + + await foreach (var response in GetStringStreamContext(3).WithCancellation(stream.CancellationToken)) + { + yield return response; + } + } + + private async Task SaveReceivedRequests(IStreamContext> stream, RequestStreamMetadata streamMetadata, bool manualAcks, CancellationToken cancellationToken) + { + await foreach (ReceivedStreamingExtendedRequest requestStreamEntry in stream.Entries.WithCancellation(cancellationToken)) + { + // doesn't overwrite if the correlationId already exists in the dictionary + _receivedRequests.TryAdd(streamMetadata.CorrelationId, new()); + + if (_receivedRequests.TryGetValue(streamMetadata.CorrelationId, out var requestsReceived)) + { + requestsReceived.Add(requestStreamEntry); + } + + if (manualAcks) + { + await requestStreamEntry.AcknowledgeAsync(); + } + } + } + } +}