Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
eb975a4
asdf
timtay-microsoft Jun 26, 2025
4d27162
noodling so far
timtay-microsoft Jun 30, 2025
0163c76
sadf
timtay-microsoft Jun 30, 2025
73b12d7
maybe
timtay-microsoft Jun 30, 2025
4080104
doc
timtay-microsoft Jun 30, 2025
4ac4003
more
timtay-microsoft Jul 7, 2025
af7c829
asdf
timtay-microsoft Jul 7, 2025
215eb64
asdf
timtay-microsoft Jul 17, 2025
615ba26
thoughts
timtay-microsoft Jul 18, 2025
ad17af3
more
timtay-microsoft Jul 18, 2025
a2c2f6e
notes
timtay-microsoft Jul 22, 2025
6bf1bcc
caching
timtay-microsoft Jul 22, 2025
180d85e
more thoughts
timtay-microsoft Jul 24, 2025
599a431
Merge branch 'main' into timtay/streaming
timtay-microsoft Jul 25, 2025
a1d2e21
no new error code, some gRPC notes
timtay-microsoft Jul 25, 2025
e680263
save impl for later
timtay-microsoft Jul 25, 2025
629dc2f
ordering q
timtay-microsoft Jul 25, 2025
f2a2c60
wording
timtay-microsoft Jul 25, 2025
710a425
links
timtay-microsoft Jul 25, 2025
0006c02
backwards
timtay-microsoft Jul 25, 2025
b457888
first thoughts on cancellation, re-order doc a bit
timtay-microsoft Jul 25, 2025
f16ad43
more notes, more re-ordering
timtay-microsoft Jul 25, 2025
e7c98d5
cleanup
timtay-microsoft Jul 28, 2025
1c1964b
Only allow cancelling streaming commands
timtay-microsoft Jul 28, 2025
edecd39
Merge branch 'main' into timtay/streaming
timtay-microsoft Jul 28, 2025
61a5212
typo
timtay-microsoft Jul 28, 2025
79af301
code changes in another branch
timtay-microsoft Jul 28, 2025
c7fb69e
more
timtay-microsoft Jul 28, 2025
aa2dc81
more
timtay-microsoft Jul 28, 2025
0784a10
Update ExtendedResponse.cs
timtay-microsoft Jul 28, 2025
ff9efc9
Update AkriSystemProperties.cs
timtay-microsoft Jul 28, 2025
0ff4dbe
Update 0025-rpc-streaming.md
timtay-microsoft Jul 30, 2025
4aef0a4
Remove responseId concept. User will do this with their own user prop…
timtay-microsoft Aug 1, 2025
9dbb174
Incorporate a lot of feedback
timtay-microsoft Aug 1, 2025
49b57c7
cleanup
timtay-microsoft Aug 1, 2025
ac799bc
canceled error code instead of header
timtay-microsoft Aug 1, 2025
04f1ce9
timeout thoughts
timtay-microsoft Aug 1, 2025
14ca259
Merge branch 'main' into timtay/streaming
timtay-microsoft Aug 1, 2025
353a2fd
asdf
timtay-microsoft Aug 4, 2025
439c5f8
Merge branch 'timtay/streaming' of https://github.com/Azure/iot-opera…
timtay-microsoft Aug 4, 2025
e462a3f
reword
timtay-microsoft Aug 4, 2025
4c4cb0b
API fix
timtay-microsoft Aug 4, 2025
5050ada
not needed
timtay-microsoft Aug 4, 2025
9caa59d
non-req
timtay-microsoft Aug 4, 2025
58d8fc1
fix type
timtay-microsoft Aug 5, 2025
fead2ad
timeout musings
timtay-microsoft Aug 6, 2025
7c83f06
wording
timtay-microsoft Aug 7, 2025
fb9de69
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
29e1811
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
c15790c
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
98f8763
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
bf8c252
Address feedback
timtay-microsoft Sep 3, 2025
8e6a6d8
note
timtay-microsoft Sep 3, 2025
59afded
more
timtay-microsoft Sep 4, 2025
69d7781
More, complete streams at any time
timtay-microsoft Sep 5, 2025
4a5b22d
ExecutorId is mandatory
timtay-microsoft Sep 5, 2025
91f55f3
fix .NET APIs
timtay-microsoft Sep 6, 2025
0c7fd4a
disconnection considerations
timtay-microsoft Sep 6, 2025
66556a5
De-duping?
timtay-microsoft Sep 6, 2025
6b15e19
Revert "De-duping?"
timtay-microsoft Sep 6, 2025
bddf3f9
executorId in tests
timtay-microsoft Sep 8, 2025
f5dff71
Merge branch 'main' into timtay/streaming
timtay-microsoft Sep 8, 2025
c39c41a
fix
timtay-microsoft Sep 8, 2025
6c3f1ec
Merge branch 'timtay/streaming' of https://github.com/Azure/iot-opera…
timtay-microsoft Sep 8, 2025
abe8ae8
No more executor Id, just use $partition
timtay-microsoft Sep 10, 2025
e6cef75
de-dup + qos 1 clarification
timtay-microsoft Sep 10, 2025
f4929a2
fixup
timtay-microsoft Sep 10, 2025
8322019
Optionally delay acknowledgements
timtay-microsoft Sep 10, 2025
1c6c3ad
cancellation user properties so far
timtay-microsoft Sep 10, 2025
e8d75e4
more cancellation user properties support
timtay-microsoft Sep 12, 2025
645f306
message level timeout is back
timtay-microsoft Sep 15, 2025
3e0f863
fixup
timtay-microsoft Sep 15, 2025
886906c
fixup
timtay-microsoft Sep 15, 2025
040940d
timeout vs cancellation
timtay-microsoft Sep 15, 2025
c7cba20
more
timtay-microsoft Sep 15, 2025
8f72c15
isLast
timtay-microsoft Sep 16, 2025
5c5d4b3
unused
timtay-microsoft Sep 16, 2025
1e59bd9
expiry interval note
timtay-microsoft Sep 16, 2025
266a809
message expiry purpose
timtay-microsoft Sep 17, 2025
10f55dc
broker behavior
timtay-microsoft Sep 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions doc/dev/adr/0025-rpc-streaming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# ADR 25: RPC Streaming

## Context

Users have expressed a desire to allow more than one response per RPC invocation. This would enable scenarios like:

- Execute long-running commands while still being responsive
-

## Requirements
- Allow for an arbitrary number of command responses for a single command invocation
- The total number of responses does not need to be known before the first response is sent
- When exposed to the user, each response includes an index of where it was in the stream
- Allow for multiple separate commands to be streamed simultaneously
- Even the same command can be executed in parallel to itself?

## Non-requirements
- Different payload shapes per command response

## State of the art

What does gRPC do?

## Decision

Our command invoker base class will now include a new method ```InvokeCommandWithStreaming``` to go with the existing ```InvokeCommand``` method.

This new method will take the same parameters as ```InvokeCommand``` but will return an asynchronously iterable list (or callback depending on language?) of command response objects.

```csharp
public abstract class CommandInvoker<TReq, TResp>
where TReq : class
where TResp : class
{
// Single response
public Task<ExtendedResponse<TResp>> InvokeCommandAsync(TReq request, ...) {...}

// Many responses, responses may be staggered
public IAsyncEnumerable<StreamingExtendedResponse<TResp>> InvokeStreamingCommandAsync(TReq request, ...) {...}
}
```

Additionally, this new method will return an extended version of the ```ExtendedResponse``` wrapper that will include the streaming-specific information about each response:

```csharp
public class StreamingExtendedResponse<TResp> : ExtendedResponse<TResp>
where TResp : class
{
/// <summary>
/// An optional Id for this response (relative to the other responses in this response stream)
/// </summary>
/// <remarks>
/// Users are allowed to provide Ids for each response, only for specific responses, or for none of the responses.
/// </remarks>
public string? StreamingResponseId { get; set; }

/// <summary>
/// The index of this response relative to the other responses in this response stream. Starts at 0.
/// </summary>
public int StreamingResponseIndex { get; set; }

/// <summary>
/// If true, this response is the final response in this response stream.
/// </summary>
public bool IsLastResponse { get; set; }
}
```

On the executor side, we will define a separate callback that executes whenever a streaming command is invoked. Instead of returning the single response, this callback will return the asynchronously iterable list of responses. Importantly, this iterable may still be added to by the user after this callback has finished.

```csharp
public abstract class CommandExecutor<TReq, TResp> : IAsyncDisposable
where TReq : class
where TResp : class
{
/// <summary>
/// The callback to execute each time a non-streaming command request is received.
/// </summary>
/// <remarks>
/// This callback may be null if this command executor only supports commands that stream responses.
/// </remarks>
public Func<ExtendedRequest<TReq>, CancellationToken, Task<ExtendedResponse<TResp>>>? OnCommandReceived { get; set; }

/// <summary>
/// The callback to execute each time a command request that expects streamed responses is received.
/// </summary>
/// <remarks>
/// The callback provides the request itself and requires the user to return one to many responses. This callback may be null
/// if this command executors doesn't have any streaming commands.
/// </remarks>
public Func<ExtendedRequest<TReq>, CancellationToken, Task<IAsyncEnumerable<StreamingExtendedResponse<TResp>>>>? OnStreamingCommandReceived { get; set; }
}

```

With this design, commands that use streaming are defined at codegen time. Codegen layer changes will be defined in a separate ADR, though.

## Example with code gen

TODO which existing client works well for long-running commands? Mem mon ("Report usage for 10 seconds at 1 second intervals")?

### MQTT layer implementation

#### Command invoker side

- The command invoker's request message will include an MQTT user property with name "__isStream" and value "true".
- Otherwise, the request message will look the same as a non-streaming RPC request
- The command invoker will listen for command responses with the correlation data that matches the invoked method's correlation data until it receives a response with the "__isLastResp" flag
- The command invoker will acknowledge all messages it receives that match the correlation data of the command request

#### Command executor side

- All command responses will use the same MQTT message correlation data as the request provided so that the invoker can map responses to the appropriate command invocation.
- Each streamed response will contain an MQTT user property with name "__streamRespId" and value equal to that response's streaming response Id.
- The final command response will include an MQTT user property "__isLastResp" with value "true" to signal that it is the final response in the stream.
- A streaming command is allowed to have a single response. If the stream only has one response, it should include both the "__isStream" and "__isLastResp" flags set.
- All **completed** streamed command responses will be added to the command response cache
- If we cache incompleted commands, will the cache hit just wait on cache additions to get the remaining responses?

Check warning on line 118 in doc/dev/adr/0025-rpc-streaming.md

View workflow job for this annotation

GitHub Actions / CI-spelling

Unknown word (incompleted) Suggestions: (incomplete, incompletely, completed, oncomplete, incompetent)
- Cache exists for de-duplication, and we want that even for long-running RPC, right?
- Separate cache for data structure purposes?

### Protocol version update

This feature is not backwards compatible (old invoker can't communicate with new executor that may try to stream a response), so it requires a bump in our RPC protocol version from "1.0" to "2.0".

TODO: Start defining a doc in our repo that defines what features are present in what protocol version.

## Alternative designs considered

- Allow the command executor to decide at run time of each command if it will stream responses
- This would force users to call the ```InvokeCommandWithStreaming``` API on the command invoker side and that returned object isn't as easy to use for single responses
- Treat streaming RPC as a separate protocol from RPC, give it its own client like ```CommandInvoker``` and ```TelemetrySender```
- There is a lot of code re-use between RPC and streaming RPC so this would make implementation very inconvenient
- This would introduce another protocol to version. Future RPC changes would likely be relevant to RPC streaming anyways, so this feels redundant.

## Error cases

- RPC executor dies after sending X out of Y responses. Just time out waiting on X+1'th reply?
- RPC executor doesn't support streaming but receives a streaming request
- RPC executor responds with "NotSupportedVersion" error code
- RPC invoker tries to invoke a command that the executor requires streaming on
- timeout per response vs overall?

## Open Questions

- Do we need to include response index user property on each streamed response?
- MQTT message ordering suggests this information can just be inferred by the command invoker
- Command timeout/cancellation tokens in single vs streaming?
- When to ack the streaming request?
- In normal RPC, request is Ack'd only after the method finishes invocation, but this would likely clog up Acks since streaming requests can take a while.
- Ack after first response is generated?
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public static class AkriSystemProperties
// TODO remove this once akri service is code gen'd to expect srcId instead of invId
internal const string CommandInvokerId = ReservedPrefix + "invId";

/// <summary>
/// Inidicates that an RPC request expects the executor to
/// </summary>
internal const string IsStreamingCommand = ReservedPrefix + "stream";

internal static bool IsReservedUserProperty(string name)
{
return name.Equals(Timestamp, StringComparison.Ordinal)
Expand All @@ -88,7 +93,8 @@ internal static bool IsReservedUserProperty(string name)
|| name.Equals(SupportedMajorProtocolVersions, StringComparison.Ordinal)
|| name.Equals(RequestedProtocolVersion, StringComparison.Ordinal)
|| name.Equals(SourceId, StringComparison.Ordinal)
|| name.Equals(CommandInvokerId, StringComparison.Ordinal);
|| name.Equals(CommandInvokerId, StringComparison.Ordinal)
|| name.Equals(IsStreamingCommand, StringComparison.Ordinal);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Azure.Iot.Operations.Protocol;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Iot.Operations.Protocol.RPC
{
/// <summary>
/// A blocking queue that is thread safe.
/// </summary>
/// <typeparam name="T">The type of all the elements in the blocking queue.</typeparam>
/// <remarks>Note that this is a copy of the "BlockingConcurrentDelayableQueue" defined in the MQTT package, but without the "delayable" feature</remarks>
internal class BlockingConcurrentQueue<T> : IDisposable
{
private readonly ConcurrentQueue<T> _queue;
private readonly SemaphoreSlim _gate;

public BlockingConcurrentQueue()
{
_queue = new ConcurrentQueue<T>();
_gate = new(0, 1);
}

/// <summary>
/// Delete all entries from this queue.
/// </summary>
public void Clear()
{
_queue.Clear();
}

public int Count => _queue.Count;

/// <summary>
/// Enqueue the provided item.
/// </summary>
/// <param name="item">The item to enqueue.</param>
public void Enqueue(T item)
{
_queue.Enqueue(item);
_gate.Release();
}

/// <summary>
/// Block until there is a first element in the queue and that element is ready to be dequeued then dequeue and
/// return that element.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The first element in the queue.</returns>
public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
{
while (true)
{
if (_queue.IsEmpty)
{
await _gate.WaitAsync(cancellationToken);
continue;
}
else
{
if (_queue.TryPeek(out T? item)
&& _queue.TryDequeue(out T? dequeuedItem))
{
return dequeuedItem;
}
else
{
await _gate.WaitAsync(cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
continue;
}
}
}
}

/// <summary>
/// Wakeup any blocking calls not because a new element was added to the queue, but because
/// one or more elements in the queue is now ready.
/// </summary>
/// <remarks>
/// Generally, this method should be called every time an item in this queue is marked as ready.
/// </remarks>
public void Signal()
{
_gate.Release();
}

public void Dispose()
{
_gate.Dispose();
}
}
}
17 changes: 16 additions & 1 deletion dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,22 @@ public abstract class CommandExecutor<TReq, TResp> : IAsyncDisposable
/// </remarks>
public TimeSpan ExecutionTimeout { get; set; }

public required Func<ExtendedRequest<TReq>, CancellationToken, Task<ExtendedResponse<TResp>>> OnCommandReceived { get; set; }
/// <summary>
/// The callback to execute each time a non-streaming command request is received.
/// </summary>
/// <remarks>
/// This callback may be null if this command executor only supports commands that stream responses.
/// </remarks>
public Func<ExtendedRequest<TReq>, CancellationToken, Task<ExtendedResponse<TResp>>>? OnCommandReceived { get; set; }

/// <summary>
/// The callback to execute each time a command request that expects streamed responses is received.
/// </summary>
/// <remarks>
/// The callback provides the request itself and requires the user to return one to many responses. This callback may be null
/// if this command executors doesn't have any streaming commands.
/// </remarks>
public Func<ExtendedRequest<TReq>, CancellationToken, Task<IAsyncEnumerable<StreamingExtendedResponse<TResp>>>>? OnStreamingCommandReceived { get; set; }

public string? ExecutorId { get; init; }

Expand Down
Loading
Loading