Changes from 9 commits
80 commits
eb975a4
asdf
timtay-microsoft Jun 26, 2025
4d27162
noodling so far
timtay-microsoft Jun 30, 2025
0163c76
sadf
timtay-microsoft Jun 30, 2025
73b12d7
maybe
timtay-microsoft Jun 30, 2025
4080104
doc
timtay-microsoft Jun 30, 2025
4ac4003
more
timtay-microsoft Jul 7, 2025
af7c829
asdf
timtay-microsoft Jul 7, 2025
215eb64
asdf
timtay-microsoft Jul 17, 2025
615ba26
thoughts
timtay-microsoft Jul 18, 2025
ad17af3
more
timtay-microsoft Jul 18, 2025
a2c2f6e
notes
timtay-microsoft Jul 22, 2025
6bf1bcc
caching
timtay-microsoft Jul 22, 2025
180d85e
more thoughts
timtay-microsoft Jul 24, 2025
599a431
Merge branch 'main' into timtay/streaming
timtay-microsoft Jul 25, 2025
a1d2e21
no new error code, some gRPC notes
timtay-microsoft Jul 25, 2025
e680263
save impl for later
timtay-microsoft Jul 25, 2025
629dc2f
ordering q
timtay-microsoft Jul 25, 2025
f2a2c60
wording
timtay-microsoft Jul 25, 2025
710a425
links
timtay-microsoft Jul 25, 2025
0006c02
backwards
timtay-microsoft Jul 25, 2025
b457888
first thoughts on cancellation, re-order doc a bit
timtay-microsoft Jul 25, 2025
f16ad43
more notes, more re-ordering
timtay-microsoft Jul 25, 2025
e7c98d5
cleanup
timtay-microsoft Jul 28, 2025
1c1964b
Only allow cancelling streaming commands
timtay-microsoft Jul 28, 2025
edecd39
Merge branch 'main' into timtay/streaming
timtay-microsoft Jul 28, 2025
61a5212
typo
timtay-microsoft Jul 28, 2025
79af301
code changes in another branch
timtay-microsoft Jul 28, 2025
c7fb69e
more
timtay-microsoft Jul 28, 2025
aa2dc81
more
timtay-microsoft Jul 28, 2025
0784a10
Update ExtendedResponse.cs
timtay-microsoft Jul 28, 2025
ff9efc9
Update AkriSystemProperties.cs
timtay-microsoft Jul 28, 2025
0ff4dbe
Update 0025-rpc-streaming.md
timtay-microsoft Jul 30, 2025
4aef0a4
Remove responseId concept. User will do this with their own user prop…
timtay-microsoft Aug 1, 2025
9dbb174
Incorporate a lot of feedback
timtay-microsoft Aug 1, 2025
49b57c7
cleanup
timtay-microsoft Aug 1, 2025
ac799bc
canceled error code instead of header
timtay-microsoft Aug 1, 2025
04f1ce9
timeout thoughts
timtay-microsoft Aug 1, 2025
14ca259
Merge branch 'main' into timtay/streaming
timtay-microsoft Aug 1, 2025
353a2fd
asdf
timtay-microsoft Aug 4, 2025
439c5f8
Merge branch 'timtay/streaming' of https://github.com/Azure/iot-opera…
timtay-microsoft Aug 4, 2025
e462a3f
reword
timtay-microsoft Aug 4, 2025
4c4cb0b
API fix
timtay-microsoft Aug 4, 2025
5050ada
not needed
timtay-microsoft Aug 4, 2025
9caa59d
non-req
timtay-microsoft Aug 4, 2025
58d8fc1
fix type
timtay-microsoft Aug 5, 2025
fead2ad
timeout musings
timtay-microsoft Aug 6, 2025
7c83f06
wording
timtay-microsoft Aug 7, 2025
fb9de69
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
29e1811
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
c15790c
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
98f8763
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
bf8c252
Address feedback
timtay-microsoft Sep 3, 2025
8e6a6d8
note
timtay-microsoft Sep 3, 2025
59afded
more
timtay-microsoft Sep 4, 2025
69d7781
More, complete streams at any time
timtay-microsoft Sep 5, 2025
4a5b22d
ExecutorId is mandatory
timtay-microsoft Sep 5, 2025
91f55f3
fix .NET APIs
timtay-microsoft Sep 6, 2025
0c7fd4a
disconnection considerations
timtay-microsoft Sep 6, 2025
66556a5
De-duping?
timtay-microsoft Sep 6, 2025
6b15e19
Revert "De-duping?"
timtay-microsoft Sep 6, 2025
bddf3f9
executorId in tests
timtay-microsoft Sep 8, 2025
f5dff71
Merge branch 'main' into timtay/streaming
timtay-microsoft Sep 8, 2025
c39c41a
fix
timtay-microsoft Sep 8, 2025
6c3f1ec
Merge branch 'timtay/streaming' of https://github.com/Azure/iot-opera…
timtay-microsoft Sep 8, 2025
abe8ae8
No more executor Id, just use $partition
timtay-microsoft Sep 10, 2025
e6cef75
de-dup + qos 1 clarification
timtay-microsoft Sep 10, 2025
f4929a2
fixup
timtay-microsoft Sep 10, 2025
8322019
Optionally delay acknowledgements
timtay-microsoft Sep 10, 2025
1c6c3ad
cancellation user properties so far
timtay-microsoft Sep 10, 2025
e8d75e4
more cancellation user properties support
timtay-microsoft Sep 12, 2025
645f306
message level timeout is back
timtay-microsoft Sep 15, 2025
3e0f863
fixup
timtay-microsoft Sep 15, 2025
886906c
fixup
timtay-microsoft Sep 15, 2025
040940d
timeout vs cancellation
timtay-microsoft Sep 15, 2025
c7cba20
more
timtay-microsoft Sep 15, 2025
8f72c15
isLast
timtay-microsoft Sep 16, 2025
5c5d4b3
unused
timtay-microsoft Sep 16, 2025
1e59bd9
expiry interval note
timtay-microsoft Sep 16, 2025
266a809
message expiry purpose
timtay-microsoft Sep 17, 2025
10f55dc
broker behavior
timtay-microsoft Sep 19, 2025
68 changes: 49 additions & 19 deletions doc/dev/adr/0025-rpc-streaming.md
@@ -13,16 +13,30 @@
- When exposed to the user, each response includes an index of where it was in the stream
- Allow for multiple separate commands to be streamed simultaneously
- Even the same command can be executed in parallel to itself?
- Allow for invoker to cancel streamed responses mid-stream

## Non-requirements
- Different payload shapes per command response
- "Client Streaming" RPC (multiple requests -> one command response)
- Bi-directional streaming RPC (multiple requests -> multiple responses)
- Allow for executor to cancel streamed responses mid-stream

## State of the art

What does gRPC do?
gRPC supports these patterns for RPC:
- [Unary RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#unary-rpc) (1 request message, 1 response message)
- [Server streaming RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc) (1 request message, many response messages)
- [Client streaming RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#client-streaming-rpc) (many request messages, one response message)
- [Bi-directional streaming RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#bidirectional-streaming-rpc) (many request messages, many response messages)

gRPC relies on HTTP/2 streaming to delineate each message in the stream and to indicate the end of the stream.

[gRPC also allows either the client or the server to cancel an RPC at any time](https://grpc.io/docs/what-is-grpc/core-concepts/#cancelling-an-rpc).

## Decision

### API design, .NET

Our command invoker base class will now include a new method ```InvokeCommandWithStreaming``` to go with the existing ```InvokeCommand``` method.

This new method will take the same parameters as ```InvokeCommand``` but will return an asynchronously iterable list (or callback depending on language?) of command response objects.
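
A minimal sketch of what consuming such an asynchronously iterable result could look like, written in Python for brevity. `StreamedResponse`, `invoke_command_with_streaming`, and the fake three-response executor are hypothetical stand-ins for illustration, not the SDK's actual API.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List


@dataclass
class StreamedResponse:
    index: int      # position of this response within the stream
    payload: str
    is_last: bool   # True only on the final response of the stream


async def invoke_command_with_streaming(request: str) -> AsyncIterator[StreamedResponse]:
    # Hypothetical stand-in for the invoker: fakes an executor that streams three responses.
    total = 3
    for i in range(total):
        await asyncio.sleep(0)  # yield control, as a real network receive would
        yield StreamedResponse(index=i, payload=f"{request}-{i}", is_last=(i == total - 1))


async def collect(request: str) -> List[StreamedResponse]:
    # The caller iterates responses as they arrive instead of awaiting a single result.
    responses = []
    async for response in invoke_command_with_streaming(request):
        responses.append(response)
    return responses
```

Calling `asyncio.run(collect("usage"))` drives the stream to completion; a real caller would more likely process each response inside the `async for` loop as it arrives.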
@@ -95,11 +109,7 @@

With this design, commands that use streaming are defined at codegen time. Codegen layer changes will be defined in a separate ADR, though.

## Example with code gen

TODO which existing client works well for long-running commands? Mem mon ("Report usage for 10 seconds at 1 second intervals")?

### MQTT layer implementation
### MQTT layer protocol

#### Command invoker side

@@ -117,24 +127,47 @@
- Each streamed response may contain an MQTT user property with name "__streamRespId" and value equal to that response's streaming response Id. This is an optional and user-provided value.
- The final command response will include an MQTT user property "__isLastResp" with value "true" to signal that it is the final response in the stream.
- A streaming command is allowed to have a single response. It must include the "__isLastResp" flag in that first/final response
- All **completed** streamed command responses will be added to the command response cache
- If we cache incomplete commands, will the cache hit just wait on cache additions to get the remaining responses?
- Cache exists for de-duplication, and we want that even for long-running RPC, right?
- Separate cache for data structure purposes?
- Cache is only updated once the stream has completed and it is updated to include all of the responses (in order) for the command so they can be re-played if the streaming command is invoked again by the same client

- The command executor receives a command **without** "__streamResp" flag set to "true"
- The command must be responded to without streaming
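
The executor-side rules above for the "__streamRespId" and "__isLastResp" user properties can be sketched as follows. This is an illustrative Python sketch only: the function name and the representation of user properties as plain dictionaries are assumptions, and only the two property names come from this document.

```python
from typing import Dict, List, Optional

IS_LAST_RESPONSE = "__isLastResp"
STREAM_RESPONSE_ID = "__streamRespId"


def build_stream_user_properties(response_ids: List[Optional[str]]) -> List[Dict[str, str]]:
    """Return the user properties for each response of one stream, in order."""
    if not response_ids:
        # A stream may have a single response, but never zero.
        raise ValueError("a streaming command must send at least one response")
    all_properties = []
    for position, response_id in enumerate(response_ids):
        props: Dict[str, str] = {}
        if response_id is not None:
            props[STREAM_RESPONSE_ID] = response_id  # optional, user-provided
        if position == len(response_ids) - 1:
            props[IS_LAST_RESPONSE] = "true"  # only the final response carries this flag
        all_properties.append(props)
    return all_properties
```

A single-response stream therefore produces exactly one set of properties, and that set includes "__isLastResp".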

### Cancellation support

To avoid scenarios where long-running streaming responses are no longer wanted, we will want to support cancelling RPC calls. This feature is more applicable to RPC streaming, but the design allows it to work for non-streaming RPC as well.

#### Invoker side

- The command invoker may cancel a normal or streaming RPC call at an arbitrary time by sending an MQTT message with:
  - The same MQTT topic as the invoked method
  - The same correlation data as the invoked method
  - The user property "__cancelRpc" set to "true"
  - No payload
- TODO what would API look like? gRPC uses cancellation token
- The command invoker should still listen on the response topic for a response from the executor which may still contain a successful response (if cancellation was received after the command completed successfully)
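
As a rough illustration of the cancellation message described above, the following Python sketch derives a cancel message from the original request. `MqttMessage` and `build_cancel_message` are hypothetical names, not part of any MQTT library or of the SDK; only the "__cancelRpc" property name is taken from this document.

```python
from dataclasses import dataclass, field
from typing import Dict

CANCEL_RPC = "__cancelRpc"


@dataclass(frozen=True)
class MqttMessage:
    # Hypothetical, simplified view of an MQTT publish.
    topic: str
    correlation_data: bytes
    user_properties: Dict[str, str] = field(default_factory=dict)
    payload: bytes = b""


def build_cancel_message(request: MqttMessage) -> MqttMessage:
    """Build the cancellation message for a previously sent request."""
    return MqttMessage(
        topic=request.topic,                        # same topic as the invoked method
        correlation_data=request.correlation_data,  # same correlation data
        user_properties={CANCEL_RPC: "true"},       # marks this message as a cancellation
        payload=b"",                                # no payload
    )
```

The invoker would keep its response-topic subscription active after publishing this message, since a successful response may still arrive.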

#### Executor side

Regardless of whether an RPC is streaming, upon receiving an MQTT message with the "__cancelRpc" flag set to "true", the command executor should:
- Notify the application layer that the RPC has been canceled if it is still running
- Send an MQTT message to the appropriate response topic with error code "canceled" to notify the invoker that the RPC has stopped and no further responses will be sent.

If the executor receives a cancellation request for a command that has already completed, then the cancellation request should be ignored.
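
One way the executor-side behavior could be organized is sketched below in Python. `CancellationTracker`, its methods, and the `send_response` callback are assumptions made for illustration; the document does not prescribe this structure.

```python
from typing import Callable, Dict


class CancellationTracker:
    """Tracks in-flight commands by correlation data so cancellations can be routed."""

    def __init__(self, send_response: Callable[[bytes, str], None]) -> None:
        self._running: Dict[bytes, Callable[[], None]] = {}
        self._send_response = send_response  # hypothetical hook that publishes a response

    def start(self, correlation_data: bytes, on_cancel: Callable[[], None]) -> None:
        self._running[correlation_data] = on_cancel

    def complete(self, correlation_data: bytes) -> None:
        self._running.pop(correlation_data, None)

    def handle_cancel(self, correlation_data: bytes) -> bool:
        on_cancel = self._running.pop(correlation_data, None)
        if on_cancel is None:
            return False  # command already completed (or unknown): ignore the request
        on_cancel()  # notify the application layer
        self._send_response(correlation_data, "canceled")  # tell the invoker the RPC stopped
        return True
```

Removing the entry before notifying means a duplicate cancellation for the same command is ignored rather than answered twice.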

### Protocol version update

This feature is not backwards compatible (old invoker can't communicate with new executor that may try to stream a response), so it requires a bump in our RPC protocol version from "1.0" to "2.0".
This RPC streaming feature is not backwards compatible (new invoker can't initiate what it believes is a streaming RPC call on an old executor), so it requires a bump in our RPC protocol version from "1.0" to "2.0".

TODO: Start defining a doc in our repo that defines what features are present in what protocol version.

## Example with code gen

TODO which existing client works well for long-running commands? Mem mon ("Report usage for 10 seconds at 1 second intervals")?

## Alternative designs considered

- Allow the command executor to decide at run time of each command if it will stream responses independent of the command invoker's request
- This would force users to call the ```InvokeCommandWithStreaming``` API on the command invoker side and that returned object isn't as easy to use for single responses
- This would force users to always call the ```InvokeCommandWithStreaming``` API on the command invoker side and that returned object isn't as easy to use for single responses
- Treat streaming RPC as a separate protocol from RPC, give it its own client like ```CommandInvoker``` and ```TelemetrySender```
- There is a lot of code re-use between RPC and streaming RPC so this would make implementation very inconvenient
- This would introduce another protocol to version. Future RPC changes would likely be relevant to RPC streaming anyways, so this feels redundant.
@@ -149,15 +182,12 @@
- RPC executor treats it like a non-streaming command, but adds the "__isLastResp" flag to the one and only response
- RPC invoker tries to invoke a non-streaming command that the executor requires streaming on
- Atypical case since codegen will prevent this
- But, for the sake of non-codegen users, a new error code "StreamingRequired" would be returned by the executor
- Or should this just be "invalid header" error since the executor expects the "__streamResp" header?
- timeout per response vs overall?
- But, for the sake of non-codegen users, executor returns "invalid header" error pointing to the "__streamResp" header
- Invoker understands that, if the "invalid header" value is "__streamResp", it attempted to invoke a streaming method
- timeout per response vs overall? Both?
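
The two mismatch cases in this list (a streaming request for a non-streaming command, and a non-streaming request for a streaming-only command) could be checked on the executor roughly as follows. This is a hedged Python sketch: `check_streaming_header` and its return shape are hypothetical, and only the "__streamResp" name and the "invalid header" error come from this document.

```python
from typing import Dict, Optional, Tuple

STREAM_RESPONSE = "__streamResp"


def check_streaming_header(
    user_properties: Dict[str, str], command_requires_streaming: bool
) -> Optional[Tuple[str, str]]:
    """Return None if the request is acceptable, else (error kind, offending header name)."""
    requested_streaming = user_properties.get(STREAM_RESPONSE) == "true"
    if command_requires_streaming and not requested_streaming:
        # The invoker can infer from the header name that the command requires streaming.
        return ("invalid header", STREAM_RESPONSE)
    # A streaming request for a non-streaming command is not an error:
    # the executor answers with a single response flagged as the last one.
    return None
```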

## Open Questions

- Do we need to include response index user property on each streamed response?
- MQTT message ordering suggests this information can just be inferred by the command invoker
- Command timeout/cancellation tokens in single vs streaming?
- When to ack the streaming request?
- In normal RPC, request is Ack'd only after the method finishes invocation, but this would likely clog up Acks since streaming requests can take a while.
- In normal RPC, request is Ack'd only after the method finishes invocation. Waiting until a streamed RPC finishes could clog up Acks since streaming requests can take a while.
- Ack after first response is generated?
12 changes: 6 additions & 6 deletions doc/reference/error-model.md
@@ -151,7 +151,7 @@
ExecutionException,
MqttError,
UnsupportedVersion,
StreamingRequired
Canceled,
}
```

@@ -264,7 +264,7 @@
EXECUTION_EXCEPTION,
MQTT_ERROR,
UNSUPPORTED_VERSION,
STREAMING_REQUIRED,
CANCELED,
}
```

@@ -329,7 +329,7 @@
ExecutionException,
MqttError,
UnsupportedVersion,
StreamingRequired,
Canceled,
}
```

@@ -403,7 +403,7 @@
ExecutionError
MqttError
UnsupportedVersion
StreamingRequired
Canceled
}
```

@@ -428,7 +428,7 @@
}
```

The `AkriMqttError` struct must provide an `Error()` method, but the details are omitted from this document:

```go
func (err AkriMqttError) Error() string {
@@ -459,7 +459,7 @@
EXECUTION_EXCEPTION = 10
MQTT_ERROR = 11
UNSUPPORTED_VERSION = 12
STREAMING_REQUIRED = 13
CANCELED = 13
```

The Akri.Mqtt error type is defined as follows:
@@ -575,7 +575,7 @@
| 400 | Bad Request | false | no | | invalid payload |
| 408 | Request Timeout | false | yes | yes | timeout |
| 415 | Unsupported Media Type | false | yes | yes | invalid header |
| 452 | Streaming Required | false | no | no | streaming required |
| 452 | Request Cancelled | false | no | no | canceled |
| 500 | Internal Server Error | false | no | | unknown error |
| 500 | Internal Server Error | false | yes | | internal logic error |
| 500 | Internal Server Error | true | maybe | | execution error |
@@ -80,9 +80,14 @@
internal const string CommandInvokerId = ReservedPrefix + "invId";

/// <summary>
/// Indicates that an RPC request expects the executor to
/// Indicates that an RPC request expects the executor to stream one or many responses.
/// </summary>
internal const string IsStreamingCommand = ReservedPrefix + "stream";
internal const string IsStreamingCommand = ReservedPrefix + "streamResp";

/// <summary>
/// Indicates that an RPC request should be cancelled if it is still executing
/// </summary>
internal const string CancelCommand = ReservedPrefix + "cancelRpc";

internal static bool IsReservedUserProperty(string name)
{

This file was deleted.
