101 changes: 101 additions & 0 deletions src/Dapr.AI/Conversation/ConversationStreamProcessor.cs
@@ -0,0 +1,101 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Grpc.Core;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;

namespace Dapr.AI.Conversation;

/// <summary>
/// Provides the implementation to process the streamed response from a conversation endpoint invocation.
/// </summary>
internal sealed class ConversationStreamProcessor : IDisposable
{
private bool disposed;
private readonly Channel<string> outputChannel = Channel.CreateUnbounded<string>();

/// <summary>
/// Surfaces any exceptions encountered while asynchronously processing the response stream into the output channel.
/// </summary>
internal event EventHandler<Exception>? OnException;

/// <summary>
/// Asynchronously reads chunks from the streaming source into the output channel.
/// </summary>
/// <param name="call">The call made to the Dapr sidecar to process the response from.</param>
/// <param name="cancellationToken">Token used to cancel the ongoing request.</param>
public Task ProcessStreamAsync(
AsyncServerStreamingCall<Autogenerated.ConversationStreamResponse> call,
CancellationToken cancellationToken)
{
// Start reading from the gRPC call and writing to the output channel.
_ = Task.Run(async () =>
{
try
{
await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken))
{
await outputChannel.Writer.WriteAsync(response.Chunk.Content, cancellationToken);
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Expected cancellation exception
}
catch (Exception ex)
{
OnException?.Invoke(this, ex);
}
finally
{
outputChannel.Writer.Complete();
}
}, cancellationToken);
return Task.CompletedTask;
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

private void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
outputChannel.Writer.TryComplete();
}

disposed = true;
}
}

/// <summary>
/// Retrieves the processed content produced by the Dapr sidecar operation and returns it as an
/// asynchronous stream.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>An asynchronous stream of the processed content chunks.</returns>
public async IAsyncEnumerable<string> GetProcessedDataAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var data in outputChannel.Reader.ReadAllAsync(cancellationToken))
{
yield return data;
}
}
}
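For orientation, the sketch below shows the producer/consumer pattern this processor relies on: a background task writes items into an unbounded channel and a consumer drains it as an IAsyncEnumerable. It is a minimal, self-contained illustration only; the FakeSource method stands in for the gRPC response stream and every name in it is hypothetical rather than part of this PR.

using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

internal static class ChannelPumpSketch
{
    // Illustrative stand-in for the gRPC response stream.
    private static async IAsyncEnumerable<string> FakeSource()
    {
        foreach (var chunk in new[] { "Hello", ", ", "world" })
        {
            await Task.Delay(10);
            yield return chunk;
        }
    }

    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<string>();

        // Producer: mirrors ProcessStreamAsync - read everything from the source,
        // then complete the writer so the consumer's loop terminates.
        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var chunk in FakeSource())
                {
                    await channel.Writer.WriteAsync(chunk);
                }
            }
            finally
            {
                channel.Writer.Complete();
            }
        });

        // Consumer: mirrors GetProcessedDataAsync.
        await foreach (var chunk in channel.Reader.ReadAllAsync())
        {
            Console.Write(chunk);
        }
    }
}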
15 changes: 15 additions & 0 deletions src/Dapr.AI/Conversation/DaprConversationClient.cs
@@ -80,4 +80,19 @@ protected DaprConversationClient(Autogenerated.DaprClient client,
public abstract Task<DaprConversationResponse> ConverseAsync(string daprConversationComponentName,
IReadOnlyList<DaprConversationInput> inputs, ConversationOptions? options = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Sends various inputs to the large language model via the Conversational building block on the Dapr sidecar
/// and streams the response back.
/// </summary>
/// <param name="daprConversationComponentName">The name of the Dapr conversation component.</param>
/// <param name="inputs">The input values to send.</param>
/// <param name="options">Optional options used to configure the conversation.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The response provided as a stream by the LLM provider.</returns>
public abstract IAsyncEnumerable<string> ConverseAsStreamAsync(
string daprConversationComponentName,
IReadOnlyList<DaprConversationInput> inputs,
ConversationOptions? options = null,
CancellationToken cancellationToken = default);
}
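A hedged usage sketch for the new streaming API follows. The component name "my-llm-component", the client variable, and the exact DaprConversationInput/DaprConversationRole shapes are illustrative assumptions rather than values taken from this PR; the client instance is presumed to come from the existing DI registration.

// Illustrative consumption of ConverseAsStreamAsync; names and input construction are assumptions.
await foreach (var chunk in conversationClient.ConverseAsStreamAsync(
                   "my-llm-component",
                   new List<DaprConversationInput> { new("What is Dapr?", DaprConversationRole.User) },
                   cancellationToken: cancellationToken))
{
    Console.Write(chunk);
}
Console.WriteLine();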
71 changes: 62 additions & 9 deletions src/Dapr.AI/Conversation/DaprConversationGrpcClient.cs
@@ -11,8 +11,10 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System.Runtime.CompilerServices;
using Dapr.Common;
using Dapr.Common.Extensions;
using Grpc.Core;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;

namespace Dapr.AI.Conversation;
@@ -35,6 +37,63 @@ internal sealed class DaprConversationGrpcClient(Autogenerated.Dapr.DaprClient c
/// <returns>The response(s) provided by the LLM provider.</returns>
public override async Task<DaprConversationResponse> ConverseAsync(string daprConversationComponentName, IReadOnlyList<DaprConversationInput> inputs, ConversationOptions? options = null,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrEmpty(daprConversationComponentName);

// Build out the common request and gRPC options to the endpoint
var (request, grpcCallOptions) = BuildRequest(daprConversationComponentName, inputs, options, cancellationToken);

var result = await Client.ConverseAlpha1Async(request, grpcCallOptions).ConfigureAwait(false);
var outputs = result.Outputs.Select(output => new DaprConversationResult(output.Result)
{
Parameters = output.Parameters.ToDictionary(kvp => kvp.Key, parameter => parameter.Value)
}).ToList();

return new DaprConversationResponse(outputs);
}

/// <summary>
/// Sends various inputs to the large language model via the Conversational building block on the Dapr sidecar
/// and streams the response back.
/// </summary>
/// <param name="daprConversationComponentName">The name of the Dapr conversation component.</param>
/// <param name="inputs">The input values to send.</param>
/// <param name="options">Optional options used to configure the conversation.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The response provided as a stream by the LLM provider.</returns>
public override async IAsyncEnumerable<string> ConverseAsStreamAsync(
string daprConversationComponentName,
IReadOnlyList<DaprConversationInput> inputs,
ConversationOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrEmpty(daprConversationComponentName);

EventHandler<Exception> exceptionHandler = (_, ex) => throw ex;

// Build out the common request and gRPC options to the endpoint
var (request, grpcCallOptions) = BuildRequest(daprConversationComponentName, inputs, options, cancellationToken);

var streamResponse = Client.ConverseStreamAlpha1(request, grpcCallOptions);
using var streamProcessor = new ConversationStreamProcessor();
try
{
streamProcessor.OnException += exceptionHandler;
await streamProcessor.ProcessStreamAsync(streamResponse, cancellationToken);

await foreach (var content in streamProcessor.GetProcessedDataAsync(cancellationToken))
{
yield return content;
}
}
finally
{
streamProcessor.OnException -= exceptionHandler;
}
}

private (Autogenerated.ConversationRequest request, CallOptions grpcCallOptions) BuildRequest(string daprConversationComponentName,
IReadOnlyList<DaprConversationInput> inputs, ConversationOptions? options, CancellationToken cancellationToken)
{
var request = new Autogenerated.ConversationRequest
{
@@ -70,18 +129,12 @@ public override async Task<DaprConversationResponse> ConverseAsync(string daprCo
Role = input.Role.GetValueFromEnumMember()
});
}

var grpCCallOptions =
var grpcCallOptions =
DaprClientUtilities.ConfigureGrpcCallOptions(typeof(DaprConversationClient).Assembly, this.DaprApiToken,
cancellationToken);

var result = await Client.ConverseAlpha1Async(request, grpCCallOptions).ConfigureAwait(false);
var outputs = result.Outputs.Select(output => new DaprConversationResult(output.Result)
{
Parameters = output.Parameters.ToDictionary(kvp => kvp.Key, parameter => parameter.Value)
}).ToList();

return new DaprConversationResponse(outputs);
return (request, grpcCallOptions);
}

/// <inheritdoc />
35 changes: 35 additions & 0 deletions src/Dapr.AI/Conversation/DaprConversationUsage.cs
@@ -0,0 +1,35 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.AI.Conversation;

/// <summary>
/// Represents token usage statistics.
/// </summary>
public record DaprConversationUsage
{
/// <summary>
/// The number of tokens in the prompt.
/// </summary>
public int? PromptTokens { get; init; }

/// <summary>
/// The number of tokens in the completion.
/// </summary>
public int? CompletionTokens { get; init; }

/// <summary>
/// The total number of tokens used.
/// </summary>
public int? TotalTokens { get; init; }
}
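This record mirrors the optional token counters on the ConversationUsage proto message added below. As a reference point, here is a minimal mapping sketch; the helper class and method names are hypothetical and not part of this PR, and it assumes standard protobuf C# codegen in which proto3 optional scalars expose Has* presence flags, using the same Autogenerated alias as the other files in this change.

using Autogenerated = Dapr.Client.Autogen.Grpc.v1;

// Hypothetical mapping helper (not part of this PR).
internal static class ConversationUsageMapper
{
    public static DaprConversationUsage ToUsage(Autogenerated.ConversationUsage usage) => new()
    {
        PromptTokens = usage.HasPromptTokens ? usage.PromptTokens : (int?)null,
        CompletionTokens = usage.HasCompletionTokens ? usage.CompletionTokens : (int?)null,
        TotalTokens = usage.HasTotalTokens ? usage.TotalTokens : (int?)null,
    };
}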
40 changes: 40 additions & 0 deletions src/Dapr.Protos/Protos/dapr/proto/runtime/v1/dapr.proto
@@ -220,6 +220,9 @@ service Dapr {

// Converse with an LLM service
rpc ConverseAlpha1(ConversationRequest) returns (ConversationResponse) {}

// Converse with an LLM service using streaming
rpc ConverseStreamAlpha1(ConversationRequest) returns (stream ConversationStreamResponse) {}
}

// InvokeServiceRequest represents the request message for Service invocation.
@@ -1357,4 +1360,41 @@ message ConversationResponse {

// An array of results.
repeated ConversationResult outputs = 2;

// Usage statistics if available
optional ConversationUsage usage = 3;
}

// ConversationStreamResponse is the streaming response for Conversation.
message ConversationStreamResponse {
oneof response_type {
ConversationStreamChunk chunk = 1;
ConversationStreamComplete complete = 2;
}
}

// ConversationStreamChunk represents a streaming content chunk.
message ConversationStreamChunk {
// Streaming content chunk
string content = 1;
}

// ConversationStreamComplete indicates the streaming conversation has completed.
message ConversationStreamComplete {
// Final context ID
optional string contextID = 1;
// Usage statistics if available
optional ConversationUsage usage = 2;
}



// ConversationUsage represents token usage statistics.
message ConversationUsage {
// Number of tokens in the prompt
optional int32 prompt_tokens = 1 [json_name = "promptTokens"];
// Number of tokens in the completion
optional int32 completion_tokens = 2 [json_name = "completionTokens"];
// Total number of tokens used
optional int32 total_tokens = 3 [json_name = "totalTokens"];
}