Skip to content

[Blazor] Adds support for pausing and resuming circuits with support for pushing the state to the client #62271

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/Components/Server/src/Circuits/CircuitClientProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

namespace Microsoft.AspNetCore.Components.Server.Circuits;

internal sealed class CircuitClientProxy : IClientProxy
internal sealed class CircuitClientProxy : ISingleClientProxy
{
public CircuitClientProxy()
{
Connected = false;
}

public CircuitClientProxy(IClientProxy clientProxy, string connectionId)
public CircuitClientProxy(ISingleClientProxy clientProxy, string connectionId)
{
Transfer(clientProxy, connectionId);
}
Expand All @@ -21,9 +21,9 @@ public CircuitClientProxy(IClientProxy clientProxy, string connectionId)

public string ConnectionId { get; private set; }

public IClientProxy Client { get; private set; }
public ISingleClientProxy Client { get; private set; }

public void Transfer(IClientProxy clientProxy, string connectionId)
public void Transfer(ISingleClientProxy clientProxy, string connectionId)
{
Client = clientProxy ?? throw new ArgumentNullException(nameof(clientProxy));
ConnectionId = connectionId ?? throw new ArgumentNullException(nameof(connectionId));
Expand All @@ -44,4 +44,13 @@ public Task SendCoreAsync(string method, object[] args, CancellationToken cancel

return Client.SendCoreAsync(method, args, cancellationToken);
}

public Task<T> InvokeCoreAsync<T>(string method, object[] args, CancellationToken cancellationToken = default)
{
if (Client == null)
{
throw new InvalidOperationException($"{nameof(InvokeCoreAsync)} cannot be invoked with an offline client.");
}
return Client.InvokeCoreAsync<T>(method, args, cancellationToken);
}
}
22 changes: 22 additions & 0 deletions src/Components/Server/src/Circuits/CircuitHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,25 @@ internal PersistedCircuitState TakePersistedCircuitState()
return result;
}

internal async Task<bool> SendPersistedStateToClient(string rootComponents, string applicationState, CancellationToken cancellation)
{
try
{
var succeded = await Client.InvokeAsync<bool>(
"JS.SavePersistedState",
CircuitId.Secret,
rootComponents,
applicationState,
cancellationToken: cancellation);
return succeded;
}
catch (Exception ex)
{
Log.FailedToSaveStateToClient(_logger, CircuitId, ex);
return false;
}
}

private static partial class Log
{
// 100s used for lifecycle stuff
Expand Down Expand Up @@ -1048,5 +1067,8 @@ public static void BeginInvokeDotNetFailed(ILogger logger, string callId, string

[LoggerMessage(219, LogLevel.Error, "Location change to '{URI}' in circuit '{CircuitId}' failed.", EventName = "LocationChangeFailedInCircuit")]
public static partial void LocationChangeFailedInCircuit(ILogger logger, string uri, CircuitId circuitId, Exception exception);

[LoggerMessage(220, LogLevel.Debug, "Failed to save state to client in circuit '{CircuitId}'.", EventName = "FailedToSaveStateToClient")]
public static partial void FailedToSaveStateToClient(ILogger logger, CircuitId circuitId, Exception exception);
}
}
72 changes: 66 additions & 6 deletions src/Components/Server/src/Circuits/CircuitPersistenceManager.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.AspNetCore.Components.Endpoints;
using Microsoft.AspNetCore.Components.Infrastructure;
using Microsoft.AspNetCore.Components.Web;
using Microsoft.AspNetCore.DataProtection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

Expand All @@ -14,9 +16,10 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits;
internal partial class CircuitPersistenceManager(
IOptions<CircuitOptions> circuitOptions,
ServerComponentSerializer serverComponentSerializer,
ICircuitPersistenceProvider circuitPersistenceProvider)
ICircuitPersistenceProvider circuitPersistenceProvider,
IDataProtectionProvider dataProtectionProvider)
{
public async Task PauseCircuitAsync(CircuitHost circuit, CancellationToken cancellation = default)
public async Task PauseCircuitAsync(CircuitHost circuit, bool saveStateToClient = false, CancellationToken cancellation = default)
{
var renderer = circuit.Renderer;
var persistenceManager = circuit.Services.GetRequiredService<ComponentStatePersistenceManager>();
Expand All @@ -27,10 +30,67 @@ public async Task PauseCircuitAsync(CircuitHost circuit, CancellationToken cance

await persistenceManager.PersistStateAsync(collector, renderer);

await circuitPersistenceProvider.PersistCircuitAsync(
circuit.CircuitId,
collector.PersistedCircuitState,
cancellation);
if (saveStateToClient)
{
await SaveStateToClient(circuit, collector.PersistedCircuitState, cancellation);
}
else
{
await circuitPersistenceProvider.PersistCircuitAsync(
circuit.CircuitId,
collector.PersistedCircuitState,
cancellation);
}
}

internal async Task SaveStateToClient(CircuitHost circuit, PersistedCircuitState state, CancellationToken cancellation = default)
{
var (rootComponents, applicationState) = await ToProtectedStateAsync(state);
if (!await circuit.SendPersistedStateToClient(rootComponents, applicationState, cancellation))
{
try
{
await circuitPersistenceProvider.PersistCircuitAsync(
circuit.CircuitId,
state,
cancellation);
}
catch (Exception)
{
// At this point, we give up as we haven't been able to save the state to the client nor the server.
return;
}
}
}

internal async Task<(string rootComponents, string applicationState)> ToProtectedStateAsync(PersistedCircuitState state)
{
// Root components descriptors are already protected and serialized as JSON, we just convert the bytes to a string.
var rootComponents = Encoding.UTF8.GetString(state.RootComponents);
Comment on lines +68 to +69
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we avoid encoding/decoding root component state by serializing it directly to a string rather than to a byte[] first via PersistedComponentState.PersistAsJson()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PersistedComponentState.PersistAsJson()

This will serialize it into a byte array in any case.

I don't think it's super important given that this won't be a super common thing and that we already serialize/stringify 2 or 3 times more than necessary.

I feel that it's better to not do this right now and wait for a time to optimize serialization paths all the way through as a separate piece of work (post 10)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that it's better to not do this right now and wait for a time to optimize serialization paths all the way through as a separate piece of work (post 10)

Sounds good to me 🙂

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've filed #62312 to cover going through this stuff across the board


// The application state we protect in the same way we do for prerendering.
var store = new ProtectedPrerenderComponentApplicationStore(dataProtectionProvider);
await store.PersistStateAsync(state.ApplicationState);

return (rootComponents, store.PersistedState);
}

internal PersistedCircuitState FromProtectedState(string rootComponents, string applicationState)
{
var rootComponentsBytes = Encoding.UTF8.GetBytes(rootComponents);
var prerenderedState = new ProtectedPrerenderComponentApplicationStore(applicationState, dataProtectionProvider);
var state = new PersistedCircuitState
{
RootComponents = rootComponentsBytes,
ApplicationState = prerenderedState.ExistingState
};

return state;
}

internal ProtectedPrerenderComponentApplicationStore ToComponentApplicationStore(Dictionary<string, byte[]> applicationState)
{
return new ProtectedPrerenderComponentApplicationStore(applicationState, dataProtectionProvider);
}

public async Task<PersistedCircuitState> ResumeCircuitAsync(CircuitId circuitId, CancellationToken cancellation = default)
Expand Down
76 changes: 69 additions & 7 deletions src/Components/Server/src/Circuits/CircuitRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void RegisterDisconnectedCircuit(CircuitHost circuitHost)
// 1. If the circuit is not found return null
// 2. If the circuit is found, but fails to connect, we need to dispose it here and return null
// 3. If everything goes well, return the circuit.
public virtual async Task<CircuitHost> ConnectAsync(CircuitId circuitId, IClientProxy clientProxy, string connectionId, CancellationToken cancellationToken)
public virtual async Task<CircuitHost> ConnectAsync(CircuitId circuitId, ISingleClientProxy clientProxy, string connectionId, CancellationToken cancellationToken)
{
Log.CircuitConnectStarted(_logger, circuitId);

Expand Down Expand Up @@ -228,7 +228,7 @@ public virtual async Task<CircuitHost> ConnectAsync(CircuitId circuitId, IClient
}
}

protected virtual (CircuitHost circuitHost, bool previouslyConnected) ConnectCore(CircuitId circuitId, IClientProxy clientProxy, string connectionId)
protected virtual (CircuitHost circuitHost, bool previouslyConnected) ConnectCore(CircuitId circuitId, ISingleClientProxy clientProxy, string connectionId)
{
if (ConnectedCircuits.TryGetValue(circuitId, out var connectedCircuitHost))
{
Expand Down Expand Up @@ -281,7 +281,7 @@ protected virtual void OnEntryEvicted(object key, object value, EvictionReason r
}
}

private async Task PauseAndDisposeCircuitEntry(DisconnectedCircuitEntry entry)
private Task PauseAndDisposeCircuitEntry(DisconnectedCircuitEntry entry)
{
DisposeTokenSource(entry);

Expand All @@ -291,20 +291,76 @@ private async Task PauseAndDisposeCircuitEntry(DisconnectedCircuitEntry entry)
{
// Only pause and persist the circuit state if it has been active at some point,
// meaning that the client called UpdateRootComponents on it.
await _circuitPersistenceManager.PauseCircuitAsync(entry.CircuitHost);
var circuitHost = entry.CircuitHost;
return PauseAndDisposeCircuitHost(circuitHost, saveStateToClient: false);
}
else
{
Log.PersistedCircuitStateDiscarded(_logger, entry.CircuitHost.CircuitId);
}

entry.CircuitHost.UnhandledException -= CircuitHost_UnhandledException;
await entry.CircuitHost.DisposeAsync();
}
catch (Exception ex)
{
Log.UnhandledExceptionDisposingCircuitHost(_logger, ex);
}

return Task.CompletedTask;
}

private async Task PauseAndDisposeCircuitHost(CircuitHost circuitHost, bool saveStateToClient)
{
await _circuitPersistenceManager.PauseCircuitAsync(circuitHost, saveStateToClient);
circuitHost.UnhandledException -= CircuitHost_UnhandledException;
await circuitHost.DisposeAsync();
}

internal async Task PauseCircuitAsync(
CircuitHost circuitHost,
string connectionId)
{
try
{
Log.CircuitPauseStarted(_logger, circuitHost.CircuitId, connectionId);

Task pauseTask;
lock (CircuitRegistryLock)
{
pauseTask = PauseCore(circuitHost, connectionId);
}
await pauseTask;
}
catch (Exception)
{
Log.CircuitPauseFailed(_logger, circuitHost.CircuitId, connectionId);
}
}

internal virtual Task PauseCore(CircuitHost circuitHost, string connectionId)
{
var circuitId = circuitHost.CircuitId;
if (!ConnectedCircuits.TryGetValue(circuitId, out circuitHost))
{
Log.CircuitNotActive(_logger, circuitId);

// Circuit should be in the connected state for pausing.
return Task.CompletedTask;
}

if (!string.Equals(circuitHost.Client.ConnectionId, connectionId, StringComparison.Ordinal))
{
// Circuit should be connected to the same connection for pausing.
Log.CircuitConnectedToDifferentConnection(_logger, circuitId, circuitHost.Client.ConnectionId);

// The circuit is associated with a different connection. One way this could happen is when
// the client reconnects with a new connection before the OnDisconnect for the older
// connection is executed. Do nothing
return Task.CompletedTask;
}

var removeResult = ConnectedCircuits.TryRemove(circuitId, out _);
Debug.Assert(removeResult, "This operation operates inside of a lock. We expect the previously inspected value to be still here.");

return PauseAndDisposeCircuitHost(circuitHost, saveStateToClient: true);
}

private void DisposeTokenSource(DisconnectedCircuitEntry entry)
Expand Down Expand Up @@ -430,5 +486,11 @@ public static void ExceptionDisposingTokenSource(ILogger logger, Exception excep

[LoggerMessage(116, LogLevel.Debug, "Circuit {CircuitId} was not resumed. Persisted circuit state for {CircuitId} discarded.", EventName = "PersistedCircuitStateDiscarded")]
public static partial void PersistedCircuitStateDiscarded(ILogger logger, CircuitId circuitId);

[LoggerMessage(117, LogLevel.Debug, "Pausing circuit with id {CircuitId} from connection {ConnectionId}.", EventName = "CircuitPauseStarted")]
public static partial void CircuitPauseStarted(ILogger logger, CircuitId circuitId, string connectionId);

[LoggerMessage(118, LogLevel.Debug, "Failed to pause circuit with id {CircuitId} from connection {ConnectionId}.", EventName = "CircuitPauseFailed")]
public static partial void CircuitPauseFailed(ILogger logger, CircuitId circuitId, string connectionId);
}
}
53 changes: 46 additions & 7 deletions src/Components/Server/src/ComponentHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,19 @@ public async ValueTask<string> ResumeCircuit(
return null;
}
}
else if (!RootComponentIsEmpty(rootComponents) || !string.IsNullOrEmpty(applicationState))
else if (!RootComponentIsEmpty(rootComponents) && !string.IsNullOrEmpty(applicationState))
{
persistedCircuitState = _circuitPersistenceManager.FromProtectedState(rootComponents, applicationState);
if (persistedCircuitState == null)
{
// If we couldn't deserialize the persisted state, signal that.
Log.InvalidInputData(_logger);
await NotifyClientError(Clients.Caller, "The root components or application state provided are invalid.");
Context.Abort();
return null;
}
}
else
{
Log.InvalidInputData(_logger);
await NotifyClientError(
Expand All @@ -335,12 +347,6 @@ await NotifyClientError(
Context.Abort();
return null;
}
else
{
// For now abort, since we currently don't support resuming circuits persisted to the client.
Context.Abort();
return null;
}

try
{
Expand Down Expand Up @@ -389,6 +395,39 @@ static bool RootComponentIsEmpty(string rootComponents) =>
string.IsNullOrEmpty(rootComponents) || rootComponents == "[]";
}

// Client initiated pauses work as follows:
// * The client calls PauseCircuit, we dissasociate the circuit from the connection.
// * We trigger the circuit pause to collect the current root components and dispose the current circuit.
// * We push the current root components and application state to the client.
// * If that succeeds, the client receives the state and we are done.
// * If that fails, we will fall back to the server-side cache storage.
// * The client will disconnect after receiving the state or after a 30s timeout.
// * From that point on, it can choose to resume the circuit by calling ResumeCircuit with or without the state
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if client calls the resume() in the middle of pending pause->push to client workflow?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We push to the client when the client initiates the pause explicitly, so at that point the client must have called pause() which resume() is aware of and will await until pause() completes before trying the resume() operation.

// depending on whether the transfer was successful.
// * Most of the time we expect the state push to succeed, if that fails, the possibilites are:
// * Client tries to resume before the state has been saved to the server-side cache storage.
// * Resumption fails as the state is not there.
// * The state eventually makes it to the server-side cache storage, but the client will have already given up and
// the state will eventually go away by virtue of the cache expiration policy on it.
// * The state has been saved to the server-side cache storage. This is what we expect to happen most of the time in the
// rare event that the client push fails.
// * This case becomes equivalent to the "ungraceful pause" case, where the client has no state and the server has the state.
public async ValueTask<bool> PauseCircuit()
{
var circuitHost = await GetActiveCircuitAsync();
if (circuitHost == null)
{
return false;
}

_ = _circuitRegistry.PauseCircuitAsync(circuitHost, Context.ConnectionId);

// This only signals that pausing the circuit has started.
// The client will receive the root components and application state in a separate message
// from the server.
return true;
}

public async ValueTask BeginInvokeDotNetFromJS(string callId, string assemblyName, string methodIdentifier, long dotNetObjectId, string argsJson)
{
var circuitHost = await GetActiveCircuitAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ private static CircuitPersistenceManager CreatePersistenceManager()
var circuitPersistenceManager = new CircuitPersistenceManager(
Options.Create(new CircuitOptions()),
new Endpoints.ServerComponentSerializer(new EphemeralDataProtectionProvider()),
Mock.Of<ICircuitPersistenceProvider>());
Mock.Of<ICircuitPersistenceProvider>(),
new EphemeralDataProtectionProvider());
return circuitPersistenceManager;
}
}
Loading
Loading