Skip to content

Commit

Permalink
changes to make it work with LocalRuntime
Browse files Browse the repository at this point in the history
  • Loading branch information
esttenorio committed Jan 23, 2025
1 parent 70c3a29 commit df98ee0
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Microsoft.SemanticKernel;
/// An interface that provides a channel for emitting external messages from a step.
/// In addition provide common methods like initialization and Uninitialization
/// </summary>
public interface IExternalKernelProcessMessageChannel : IExternalKernelProcessMessageChannelEmitter
public interface IExternalKernelProcessMessageChannel
{
/// <summary>
/// Initialization of the external messaging channel used
Expand All @@ -21,4 +21,12 @@ public interface IExternalKernelProcessMessageChannel : IExternalKernelProcessMe
/// </summary>
/// <returns>A <see cref="ValueTask"/></returns>
public abstract ValueTask Uninitialize();

/// <summary>
/// Emits the specified event from the step outside the SK process
/// </summary>
/// <param name="externalTopicEvent">name of the topic to be used externally as the event name</param>
/// <param name="eventData">data to be transmitted externally</param>
/// <returns></returns>
public abstract Task EmitExternalEventAsync(string externalTopicEvent, object? eventData);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ public abstract class KernelProcessContext
/// </summary>
/// <returns>A <see cref="Task{T}"/> where T is <see cref="KernelProcess"/></returns>
public abstract Task<KernelProcess> GetStateAsync();

public abstract Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync();

Check failure on line 31 in dotnet/src/Experimental/Process.Abstractions/KernelProcessContext.cs

View workflow job for this annotation

GitHub Actions / dotnet-build-and-test (8.0, ubuntu-latest, Release, true, integration)

Missing XML comment for publicly visible type or member 'KernelProcessContext.GetExternalMessageChannelAsync()'
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ namespace Microsoft.SemanticKernel;
public sealed class KernelProcessStepContext
{
private readonly IKernelProcessMessageChannel _stepMessageChannel;
private readonly IExternalKernelProcessMessageChannelEmitter? _externalMessageChannel;
private readonly IExternalKernelProcessMessageChannel? _externalMessageChannel;

/// <summary>
/// Initializes a new instance of the <see cref="KernelProcessStepContext"/> class.
/// </summary>
/// <param name="channel">An instance of <see cref="IKernelProcessMessageChannel"/>.</param>
/// <param name="externalMessageChannel">An instance of <see cref="IExternalKernelProcessMessageChannelEmitter"/></param>
public KernelProcessStepContext(IKernelProcessMessageChannel channel, IExternalKernelProcessMessageChannelEmitter? externalMessageChannel = null)
/// <param name="externalMessageChannel">An instance of <see cref="IExternalKernelProcessMessageChannel"/></param>
public KernelProcessStepContext(IKernelProcessMessageChannel channel, IExternalKernelProcessMessageChannel? externalMessageChannel = null)
{
this._stepMessageChannel = channel;
this._externalMessageChannel = externalMessageChannel;
Expand Down Expand Up @@ -57,7 +57,7 @@ public ValueTask EmitEventAsync(
}

/// <summary>
/// Emit an external event to through a <see cref="IExternalKernelProcessMessageChannelEmitter"/>
/// Emit an external event to through a <see cref="IExternalKernelProcessMessageChannel"/>
/// component if connected from within the SK process
/// </summary>
/// <param name="externalTopicName"></param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public sealed class LocalKernelProcessContext : KernelProcessContext, IDisposabl
private readonly LocalProcess _localProcess;
private readonly Kernel _kernel;

internal LocalKernelProcessContext(KernelProcess process, Kernel kernel, ProcessEventProxy? eventProxy = null)
internal LocalKernelProcessContext(KernelProcess process, Kernel kernel, ProcessEventProxy? eventProxy = null, IExternalKernelProcessMessageChannel? externalMessageChannel = null)
{
Verify.NotNull(process, nameof(process));
Verify.NotNull(kernel, nameof(kernel));
Expand All @@ -22,7 +22,8 @@ internal LocalKernelProcessContext(KernelProcess process, Kernel kernel, Process
this._kernel = kernel;
this._localProcess = new LocalProcess(
process,
kernel)
kernel,
externalMessageChannel)
{
EventProxy = eventProxy
};
Expand Down Expand Up @@ -55,4 +56,9 @@ public override Task SendEventAsync(KernelProcessEvent processEvent) =>
/// Disposes of the resources used by the process.
/// </summary>
public void Dispose() => this._localProcess.Dispose();

public override Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync()

Check failure on line 60 in dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs

View workflow job for this annotation

GitHub Actions / dotnet-build-and-test (8.0, ubuntu-latest, Release, true, integration)

Missing XML comment for publicly visible type or member 'LocalKernelProcessContext.GetExternalMessageChannelAsync()'
{
return Task.FromResult(this._localProcess._externalMessageChannel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ public static class LocalKernelProcessFactory
/// <param name="kernel">Required: An instance of <see cref="Kernel"/></param>
/// <param name="initialEvent">Required: The initial event to start the process.</param>
/// <returns>An instance of <see cref="KernelProcess"/> that can be used to interrogate or stop the running process.</returns>
public static async Task<LocalKernelProcessContext> StartAsync(this KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent)
public static async Task<LocalKernelProcessContext> StartAsync(this KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent, IExternalKernelProcessMessageChannel? externalMessageChannel = null)

Check failure on line 19 in dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessFactory.cs

View workflow job for this annotation

GitHub Actions / dotnet-build-and-test (8.0, ubuntu-latest, Release, true, integration)

Parameter 'externalMessageChannel' has no matching param tag in the XML comment for 'LocalKernelProcessFactory.StartAsync(KernelProcess, Kernel, KernelProcessEvent, IExternalKernelProcessMessageChannel?)' (but other parameters do)
{
Verify.NotNull(initialEvent, nameof(initialEvent));

LocalKernelProcessContext processContext = new(process, kernel);
LocalKernelProcessContext processContext = new(process, kernel, null, externalMessageChannel);
await processContext.StartWithEventAsync(initialEvent).ConfigureAwait(false);
return processContext;
}
Expand Down
6 changes: 3 additions & 3 deletions dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ internal sealed class LocalProcess : LocalStep, IDisposable
/// </summary>
/// <param name="process">The <see cref="KernelProcess"/> instance.</param>
/// <param name="kernel">An instance of <see cref="Kernel"/></param>
internal LocalProcess(KernelProcess process, Kernel kernel)
: base(process, kernel)
internal LocalProcess(KernelProcess process, Kernel kernel, IExternalKernelProcessMessageChannel? externalMessageChannel = null)

Check failure on line 40 in dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs

View workflow job for this annotation

GitHub Actions / dotnet-build-and-test (8.0, ubuntu-latest, Release, true, integration)

Parameter 'externalMessageChannel' has no matching param tag in the XML comment for 'LocalProcess.LocalProcess(KernelProcess, Kernel, IExternalKernelProcessMessageChannel?)' (but other parameters do)
: base(process, kernel, null, externalMessageChannel)
{
Verify.NotNull(process.Steps);

Expand Down Expand Up @@ -207,7 +207,7 @@ private ValueTask InitializeProcessAsync()
Verify.NotNull(step.State?.Id);

localStep =
new LocalStep(step, this._kernel)
new LocalStep(step, this._kernel, null, this._externalMessageChannel)
{
ParentProcessId = this.Id,
EventProxy = this.EventProxy,
Expand Down
20 changes: 18 additions & 2 deletions dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ internal class LocalStep : IKernelProcessMessageChannel
protected Dictionary<string, Dictionary<string, object?>?>? _initialInputs = [];
protected Dictionary<string, List<KernelProcessEdge>> _outputEdges;

internal readonly IExternalKernelProcessMessageChannel? _externalMessageChannel;

/// <summary>
/// Represents a step in a process that is running in-process.
/// </summary>
/// <param name="stepInfo">An instance of <see cref="KernelProcessStepInfo"/></param>
/// <param name="kernel">Required. An instance of <see cref="Kernel"/>.</param>
/// <param name="parentProcessId">Optional. The Id of the parent process if one exists.</param>
public LocalStep(KernelProcessStepInfo stepInfo, Kernel kernel, string? parentProcessId = null)
public LocalStep(KernelProcessStepInfo stepInfo, Kernel kernel, string? parentProcessId = null, IExternalKernelProcessMessageChannel? externalMessageChannel = null)

Check failure on line 42 in dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs

View workflow job for this annotation

GitHub Actions / dotnet-build-and-test (8.0, ubuntu-latest, Release, true, integration)

Parameter 'externalMessageChannel' has no matching param tag in the XML comment for 'LocalStep.LocalStep(KernelProcessStepInfo, Kernel, string?, IExternalKernelProcessMessageChannel?)' (but other parameters do)
{
Verify.NotNull(kernel, nameof(kernel));
Verify.NotNull(stepInfo, nameof(stepInfo));
Expand All @@ -58,6 +60,13 @@ public LocalStep(KernelProcessStepInfo stepInfo, Kernel kernel, string? parentPr
this._logger = this._kernel.LoggerFactory?.CreateLogger(this._stepInfo.InnerStepType) ?? new NullLogger<LocalStep>();
this._outputEdges = this._stepInfo.Edges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToList());
this._eventNamespace = $"{this._stepInfo.State.Name}_{this._stepInfo.State.Id}";

this._externalMessageChannel = externalMessageChannel;
}

~LocalStep()
{
this._externalMessageChannel?.Uninitialize().GetAwaiter().GetResult();

Check failure on line 69 in dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs

View workflow job for this annotation

GitHub Actions / dotnet-build-and-test (8.0, ubuntu-latest, Release, true, integration)

ValueTask instances should not have their result directly accessed unless the instance has already completed. Unlike Tasks, calling Result or GetAwaiter().GetResult() on a ValueTask is not guaranteed to block until the operation completes. If you can't simply await the instance, consider first checking its IsCompleted property (or asserting it's true if you know that to be the case). (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca2012)

Check failure on line 69 in dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs

View workflow job for this annotation

GitHub Actions / dotnet-build-and-test (8.0, ubuntu-latest, Release, true, integration)

Synchronously waiting on tasks or awaiters may cause deadlocks. Use await or JoinableTaskFactory.Run instead. (https://github.com/Microsoft/vs-threading/blob/main/doc/analyzers/VSTHRD002.md)
}

/// <summary>
Expand Down Expand Up @@ -231,6 +240,13 @@ internal virtual async Task HandleMessageAsync(ProcessMessage message)
/// <exception cref="KernelException"></exception>
protected virtual async ValueTask InitializeStepAsync()
{
if (this._externalMessageChannel != null)
{
// initialize external message channel
// TODO: in LocalRuntime need to ensure initialization only happens once
await this._externalMessageChannel.Initialize().ConfigureAwait(false);
}

// Instantiate an instance of the inner step object
KernelProcessStep stepInstance = (KernelProcessStep)ActivatorUtilities.CreateInstance(this._kernel.Services, this._stepInfo.InnerStepType);
var kernelPlugin = KernelPluginFactory.CreateFromObject(stepInstance, pluginName: this._stepInfo.State.Name);
Expand All @@ -242,7 +258,7 @@ protected virtual async ValueTask InitializeStepAsync()
}

// Initialize the input channels
this._initialInputs = this.FindInputChannels(this._functions, this._logger);
this._initialInputs = this.FindInputChannels(this._functions, this._logger, this._externalMessageChannel);
this._inputs = this._initialInputs.ToDictionary(kvp => kvp.Key, kvp => kvp.Value?.ToDictionary(kvp => kvp.Key, kvp => kvp.Value));

// Activate the step with user-defined state if needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Microsoft.SemanticKernel;
/// Class used to allow using <see cref="IExternalEventBuffer"/> as <see cref="IExternalKernelProcessMessageChannelEmitter"/>
/// in SK Process shared abstractions
/// </summary>
public class ExternalMessageBufferActorWrapper : IExternalKernelProcessMessageChannelEmitter
public class ExternalMessageBufferActorWrapper : IExternalKernelProcessMessageChannel
{
private readonly IExternalMessageBuffer _actor;

Expand All @@ -26,4 +26,16 @@ public async Task EmitExternalEventAsync(string externalTopicEvent, object? even
{
await this._actor.EmitExternalEventAsync(externalTopicEvent, eventData).ConfigureAwait(false);
}

public ValueTask Initialize()
{
// When using Dapr initialization is already taken care of by Dapr Actors
throw new System.NotImplementedException();
}

public ValueTask Uninitialize()
{
// When using Dapr uninitialization is already taken care of by Dapr Actors
throw new System.NotImplementedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ protected virtual async ValueTask ActivateStepAsync()
}

// Creating external process channel actor to be used for external messaging by some steps
IExternalKernelProcessMessageChannelEmitter? externalMessageChannelActor = null;
IExternalKernelProcessMessageChannel? externalMessageChannelActor = null;
var scopedExternalMessageBufferId = this.ScopedActorId(new ActorId(this.Id.GetId()));
var actor = this.ProxyFactory.CreateActorProxy<IExternalMessageBuffer>(scopedExternalMessageBufferId, nameof(ExternalMessageBufferActor));
externalMessageChannelActor = new ExternalMessageBufferActorWrapper(actor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,9 @@ public override async Task<KernelProcess> GetStateAsync()
var daprProcessInfo = await this._daprProcess.GetProcessInfoAsync().ConfigureAwait(false);
return daprProcessInfo.ToKernelProcess();
}

public override Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync()
{
throw new NotImplementedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ public static void InitializeUserState(this KernelProcessStepState stateObject,
/// <param name="channel">The source channel to evaluate</param>
/// <param name="functions">A dictionary of KernelFunction instances.</param>
/// <param name="logger">An instance of <see cref="ILogger"/>.</param>
/// <param name="externalMessageChannel">An instance of <see cref="IExternalKernelProcessMessageChannelEmitter"/></param>
/// <param name="externalMessageChannel">An instance of <see cref="IExternalKernelProcessMessageChannel"/></param>
/// <returns><see cref="Dictionary{TKey, TValue}"/></returns>
/// <exception cref="InvalidOperationException"></exception>
public static Dictionary<string, Dictionary<string, object?>?> FindInputChannels(
this IKernelProcessMessageChannel channel,
Dictionary<string, KernelFunction> functions,
ILogger? logger,
IExternalKernelProcessMessageChannelEmitter? externalMessageChannel = null)
IExternalKernelProcessMessageChannel? externalMessageChannel = null)
{
if (functions is null)
{
Expand Down

0 comments on commit df98ee0

Please sign in to comment.