Skip to content

Commit 803f66d

Browse files
committed
feat(Solution): Added means to suspend, resume and cancel workflow instances
Signed-off-by: Charles d'Avernas <[email protected]>
2 parents 62460d1 + 71d9adb commit 803f66d

File tree

27 files changed

+272
-119
lines changed

27 files changed

+272
-119
lines changed

src/api/Synapse.Api.Client.Core/Services/IClusterResourceApiClient.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,15 @@ public interface IClusterResourceApiClient<TResource>
3838
/// <param name="labelSelectors">Defines the expected labels, if any, of the resources to watch</param>
3939
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
4040
/// <returns>A new <see cref="IAsyncEnumerable{T}"/> used to asynchronously enumerate resulting <see cref="IResourceWatchEvent"/>s</returns>
41-
Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> WatchAsync(IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default);
41+
IAsyncEnumerable<IResourceWatchEvent<TResource>> WatchAsync(IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default);
4242

4343
/// <summary>
4444
/// Monitors the resource with the specified name
4545
/// </summary>
4646
/// <param name="name">The name of the resource to monitor</param>
4747
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
4848
/// <returns>A new <see cref="IAsyncEnumerable{T}"/> used to asynchronously enumerate resulting <see cref="IResourceWatchEvent"/>s</returns>
49-
Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> MonitorAsync(string name, CancellationToken cancellationToken = default);
49+
IAsyncEnumerable<IResourceWatchEvent<TResource>> MonitorAsync(string name, CancellationToken cancellationToken = default);
5050

5151
/// <summary>
5252
/// Gets the resource with the specified name

src/api/Synapse.Api.Client.Core/Services/INamespacedResourceApiClient.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public interface INamespacedResourceApiClient<TResource>
4040
/// <param name="labelSelectors">Defines the expected labels, if any, of the resources to watch</param>
4141
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
4242
/// <returns>A new <see cref="IAsyncEnumerable{T}"/> used to asynchronously enumerate resulting <see cref="IResourceWatchEvent"/>s</returns>
43-
Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> WatchAsync(string? @namespace = null, IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default);
43+
IAsyncEnumerable<IResourceWatchEvent<TResource>> WatchAsync(string? @namespace = null, IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default);
4444

4545
/// <summary>
4646
/// Monitors the resource with the specified name
@@ -49,7 +49,7 @@ public interface INamespacedResourceApiClient<TResource>
4949
/// <param name="namespace">The namespace the resource to monitor belongs to</param>
5050
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
5151
/// <returns>A new <see cref="IAsyncEnumerable{T}"/> used to asynchronously enumerate resulting <see cref="IResourceWatchEvent"/>s</returns>
52-
Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> MonitorAsync(string name, string @namespace, CancellationToken cancellationToken = default);
52+
IAsyncEnumerable<IResourceWatchEvent<TResource>> MonitorAsync(string name, string @namespace, CancellationToken cancellationToken = default);
5353

5454
/// <summary>
5555
/// Gets the resource with the specified name

src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs

+46-15
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14+
using System.Runtime.CompilerServices;
15+
1416
namespace Synapse.Api.Client.Services;
1517

1618
/// <summary>
@@ -105,60 +107,89 @@ public virtual async Task<IAsyncEnumerable<TResource>> ListAsync(IEnumerable<Lab
105107
}
106108

107109
/// <inheritdoc/>
108-
public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> WatchAsync(string? @namespace = null, IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default)
110+
public virtual async IAsyncEnumerable<IResourceWatchEvent<TResource>> WatchAsync(string? @namespace = null, IEnumerable<LabelSelector>? labelSelectors = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
109111
{
110112
var resource = new TResource();
111-
var uri = string.IsNullOrWhiteSpace(@namespace) ? $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/watch" : $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/watch";
113+
var uri = string.IsNullOrWhiteSpace(@namespace) ? $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/watch/sse" : $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/watch";
112114
var queryStringArguments = new Dictionary<string, string>();
113115
if (labelSelectors?.Any() == true) queryStringArguments.Add("labelSelector", labelSelectors.Select(s => s.ToString()).Join(','));
114116
if (queryStringArguments.Count != 0) uri += $"?{queryStringArguments.Select(kvp => $"{kvp.Key}={kvp.Value}").Join('&')}";
115117
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false);
116-
request.EnableWebAssemblyStreamingResponse();
117118
var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
118119
var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
119-
return this.JsonSerializer.DeserializeAsyncEnumerable<ResourceWatchEvent<TResource>>(responseStream, cancellationToken)!;
120+
using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false));
121+
while (!streamReader.EndOfStream)
122+
{
123+
var sseMessage = await streamReader.ReadLineAsync(cancellationToken).ConfigureAwait(false);
124+
if (string.IsNullOrWhiteSpace(sseMessage)) continue;
125+
var json = sseMessage["data: ".Length..].Trim();
126+
var e = JsonSerializer.Deserialize<ResourceWatchEvent<TResource>>(json)!;
127+
yield return e;
128+
}
120129
}
121130

122131
/// <inheritdoc/>
123-
public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> WatchAsync(IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default)
132+
public virtual async IAsyncEnumerable<IResourceWatchEvent<TResource>> WatchAsync(IEnumerable<LabelSelector>? labelSelectors = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
124133
{
125134
var resource = new TResource();
126-
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/watch";
135+
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/watch/sse";
127136
var queryStringArguments = new Dictionary<string, string>();
128137
if (labelSelectors?.Any() == true) queryStringArguments.Add("labelSelector", labelSelectors.Select(s => s.ToString()).Join(','));
129138
if (queryStringArguments.Count != 0) uri += $"?{queryStringArguments.Select(kvp => $"{kvp.Key}={kvp.Value}").Join('&')}";
130139
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false);
131140
request.EnableWebAssemblyStreamingResponse();
132141
var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
133142
var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
134-
return this.JsonSerializer.DeserializeAsyncEnumerable<ResourceWatchEvent<TResource>>(responseStream, cancellationToken)!;
143+
using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false));
144+
while (!streamReader.EndOfStream)
145+
{
146+
var sseMessage = await streamReader.ReadLineAsync(cancellationToken).ConfigureAwait(false);
147+
if (string.IsNullOrWhiteSpace(sseMessage)) continue;
148+
var json = sseMessage["data: ".Length..].Trim();
149+
var e = JsonSerializer.Deserialize<ResourceWatchEvent<TResource>>(json)!;
150+
yield return e;
151+
}
135152
}
136153

137154
/// <inheritdoc/>
138-
public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> MonitorAsync(string name, string @namespace, CancellationToken cancellationToken = default)
155+
public virtual async IAsyncEnumerable<IResourceWatchEvent<TResource>> MonitorAsync(string name, string @namespace, [EnumeratorCancellation]CancellationToken cancellationToken = default)
139156
{
140157
ArgumentException.ThrowIfNullOrWhiteSpace(name);
141158
ArgumentException.ThrowIfNullOrWhiteSpace(@namespace);
142159
var resource = new TResource();
143-
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/monitor";
160+
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/monitor/sse";
144161
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false);
145-
request.EnableWebAssemblyStreamingResponse();
146162
var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
147163
var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
148-
return this.JsonSerializer.DeserializeAsyncEnumerable<ResourceWatchEvent<TResource>>(responseStream, cancellationToken)!;
164+
using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false));
165+
while (!streamReader.EndOfStream)
166+
{
167+
var sseMessage = await streamReader.ReadLineAsync(cancellationToken).ConfigureAwait(false);
168+
if (string.IsNullOrWhiteSpace(sseMessage)) continue;
169+
var json = sseMessage["data: ".Length..].Trim();
170+
var e = JsonSerializer.Deserialize<ResourceWatchEvent<TResource>>(json)!;
171+
yield return e;
172+
}
149173
}
150174

151175
/// <inheritdoc/>
152-
public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> MonitorAsync(string name, CancellationToken cancellationToken = default)
176+
public virtual async IAsyncEnumerable<IResourceWatchEvent<TResource>> MonitorAsync(string name, [EnumeratorCancellation]CancellationToken cancellationToken = default)
153177
{
154178
ArgumentException.ThrowIfNullOrWhiteSpace(name);
155179
var resource = new TResource();
156-
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{name}/monitor";
180+
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{name}/monitor/sse";
157181
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false);
158-
request.EnableWebAssemblyStreamingResponse();
159182
var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
160183
var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
161-
return this.JsonSerializer.DeserializeAsyncEnumerable<ResourceWatchEvent<TResource>>(responseStream, cancellationToken)!;
184+
using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false));
185+
while (!streamReader.EndOfStream)
186+
{
187+
var sseMessage = await streamReader.ReadLineAsync(cancellationToken).ConfigureAwait(false);
188+
if (string.IsNullOrWhiteSpace(sseMessage)) continue;
189+
var json = sseMessage["data: ".Length..].Trim();
190+
var e = JsonSerializer.Deserialize<ResourceWatchEvent<TResource>>(json)!;
191+
yield return e;
192+
}
162193
}
163194

164195
/// <inheritdoc/>

src/api/Synapse.Api.Http/ClusterResourceController.cs

+6-4
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,10 @@ public virtual async Task<IActionResult> WatchResourcesUsingSSE(string? labelSel
108108
this.Response.Headers.ContentType = "text/event-stream";
109109
this.Response.Headers.CacheControl = "no-cache";
110110
this.Response.Headers.Connection = "keep-alive";
111-
await foreach (var e in response.Data!)
111+
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
112+
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
112113
{
113-
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n";
114+
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
114115
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
115116
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
116117
}
@@ -147,9 +148,10 @@ public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, Ca
147148
this.Response.Headers.ContentType = "text/event-stream";
148149
this.Response.Headers.CacheControl = "no-cache";
149150
this.Response.Headers.Connection = "keep-alive";
150-
await foreach (var e in response.Data!)
151+
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
152+
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
151153
{
152-
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n";
154+
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
153155
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
154156
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
155157
}

src/api/Synapse.Api.Http/NamespacedResourceController.cs

+11-16
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14+
using Neuroglia.Data.Infrastructure.ResourceOriented;
15+
1416
namespace Synapse.Api.Http;
1517

1618
/// <summary>
@@ -120,20 +122,11 @@ public virtual async Task<IActionResult> ListResources(string @namespace, string
120122
[HttpGet("watch")]
121123
[ProducesResponseType(typeof(IAsyncEnumerable<ResourceWatchEvent>), (int)HttpStatusCode.OK)]
122124
[ProducesErrorResponseType(typeof(Neuroglia.ProblemDetails))]
123-
public virtual async Task<IActionResult> WatchResources(string? labelSelector = null, CancellationToken cancellationToken = default)
125+
public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> WatchResources(string? labelSelector = null, CancellationToken cancellationToken = default)
124126
{
125-
if (!this.TryParseLabelSelectors(labelSelector, out var labelSelectors)) return this.InvalidLabelSelector(labelSelector!);
127+
if (!this.TryParseLabelSelectors(labelSelector, out var labelSelectors)) throw new Exception($"Invalid label selector '{labelSelector}'");
126128
var response = await this.Mediator.ExecuteAsync(new WatchResourcesQuery<TResource>(null, labelSelectors), cancellationToken).ConfigureAwait(false);
127-
this.Response.Headers.ContentType = "text/event-stream";
128-
this.Response.Headers.CacheControl = "no-cache";
129-
this.Response.Headers.Connection = "keep-alive";
130-
await foreach (var e in response.Data!)
131-
{
132-
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n";
133-
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
134-
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
135-
}
136-
return this.Ok();
129+
return response.Data!;
137130
}
138131

139132
/// <summary>
@@ -170,9 +163,10 @@ public virtual async Task<IActionResult> WatchResourcesUsingSSE(string @namespac
170163
this.Response.Headers.ContentType = "text/event-stream";
171164
this.Response.Headers.CacheControl = "no-cache";
172165
this.Response.Headers.Connection = "keep-alive";
173-
await foreach (var e in response.Data!)
166+
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
167+
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
174168
{
175-
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n";
169+
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
176170
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
177171
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
178172
}
@@ -211,9 +205,10 @@ public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, st
211205
this.Response.Headers.ContentType = "text/event-stream";
212206
this.Response.Headers.CacheControl = "no-cache";
213207
this.Response.Headers.Connection = "keep-alive";
214-
await foreach(var e in response.Data!)
208+
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
209+
await foreach(var e in response.Data!.WithCancellation(cancellationToken))
215210
{
216-
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n";
211+
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
217212
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
218213
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
219214
}

src/cli/Synapse.Cli/Commands/WorkflowInstances/MonitorWorkflowInstancesCommand.cs

+1-2
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ public MonitorWorkflowInstancesCommand(IServiceProvider serviceProvider, ILogger
6464
public async Task HandleAsync(string name, string @namespace, string output)
6565
{
6666
this.EnsureConfigured();
67-
var enumerable = await this.Api.WorkflowInstances.MonitorAsync(name, @namespace);
68-
await foreach (var e in enumerable)
67+
await foreach (var e in this.Api.WorkflowInstances.MonitorAsync(name, @namespace))
6968
{
7069
string outputText = output.ToLowerInvariant() switch
7170
{

src/dashboard/Synapse.Dashboard/Components/DocumentDetails/Store.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ public async Task OnCopyToClipboard()
372372
try
373373
{
374374
await this.JSRuntime.InvokeVoidAsync("navigator.clipboard.writeText", text);
375-
this.ToastService.Notify(new(ToastType.Success, "Definition copied to the clipboard!"));
375+
this.ToastService.Notify(new(ToastType.Success, "Copied to the clipboard!"));
376376
}
377377
catch (Exception ex)
378378
{

src/dashboard/Synapse.Dashboard/Components/MonacoEditor/MonacoEditorHelper.cs

-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ public Func<StandaloneCodeEditor, StandaloneEditorConstructionOptions> GetStanda
4242
{
4343
Other = "true",
4444
Strings = "true",
45-
Comments = "fasle"
4645
}
4746
};
4847
}

src/dashboard/Synapse.Dashboard/Components/MonacoEditor/Store.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ public async Task OnCopyToClipboard()
290290
try
291291
{
292292
await this.JSRuntime.InvokeVoidAsync("navigator.clipboard.writeText", text);
293-
this.ToastService.Notify(new(ToastType.Success, "Definition copied to the clipboard!"));
293+
this.ToastService.Notify(new(ToastType.Success, "Copied to the clipboard!"));
294294
}
295295
catch (Exception ex)
296296
{

src/dashboard/Synapse.Dashboard/Components/ResourceEditor/ResourceEditor.razor

+2-2
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,9 @@
256256
try
257257
{
258258
await this.JSRuntime.InvokeVoidAsync("navigator.clipboard.writeText", text);
259-
this.ToastService.Notify(new(ToastType.Success, "Definition copied to the clipboard!"));
259+
this.ToastService.Notify(new(ToastType.Success, "Copied to the clipboard!"));
260260
}
261-
catch (Exception ex)
261+
catch
262262
{
263263
this.ToastService.Notify(new(ToastType.Danger, "Failed to copy the definition to the clipboard."));
264264
}

0 commit comments

Comments
 (0)