Skip to content

Commit d57ecae

Browse files
authored
Merge pull request #394 from serverlessworkflow/feat-cloud-event-publisher
Add a new ICloudEventPublisher service
2 parents 5bc3a56 + 60a8dd1 commit d57ecae

File tree

6 files changed

+221
-26
lines changed

6 files changed

+221
-26
lines changed

src/api/Synapse.Api.Application/Commands/Events/PublishCloudEventCommand.cs

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,8 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14-
using Microsoft.Extensions.Logging;
15-
using Microsoft.Extensions.Options;
1614
using Neuroglia.Eventing.CloudEvents;
17-
using Neuroglia.Serialization;
18-
using Synapse.Api.Application.Configuration;
19-
using System.Text;
15+
using Synapse.Api.Application.Services;
2016

2117
namespace Synapse.Api.Application.Commands.Events;
2218

@@ -38,32 +34,15 @@ public class PublishCloudEventCommand(CloudEvent e)
3834
/// <summary>
3935
/// Represents the service used to handle <see cref="PublishCloudEventCommand"/>s
4036
/// </summary>
41-
/// <param name="logger">The service used to perform logging</param>
42-
/// <param name="options">The service used to access the current <see cref="ApiServerOptions"/></param>
43-
/// <param name="jsonSerializer">The service used to serialize/deserialize data to/from JSON</param>
44-
/// <param name="httpClient">The service used to perform HTTP requests</param>
45-
public class PublishCloudEventCommandHandler(ILogger<PublishCloudEventCommandHandler> logger, IOptions<ApiServerOptions> options, IJsonSerializer jsonSerializer, HttpClient httpClient)
37+
/// <param name="cloudEventPublisher">The service used to publish <see cref="CloudEvent"/>s</param>
38+
public class PublishCloudEventCommandHandler(ICloudEventPublisher cloudEventPublisher)
4639
: ICommandHandler<PublishCloudEventCommand>
4740
{
4841

4942
/// <inheritdoc/>
5043
public virtual async Task<IOperationResult> HandleAsync(PublishCloudEventCommand command, CancellationToken cancellationToken = default)
5144
{
52-
if (options.Value.CloudEvents.Endpoint == null)
53-
{
54-
logger.LogWarning("No endpoint configured for cloud events. Event will not be published.");
55-
return this.Ok();
56-
}
57-
var json = jsonSerializer.SerializeToText(command.CloudEvent);
58-
using var content = new StringContent(json, Encoding.UTF8, CloudEventContentType.Json);
59-
using var request = new HttpRequestMessage(HttpMethod.Post, options.Value.CloudEvents.Endpoint) { Content = content };
60-
using var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false);
61-
json = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
62-
if (!response.IsSuccessStatusCode)
63-
{
64-
logger.LogError("An error occurred while publishing the cloud event with id '{eventId}' to the configure endpoint '{endpoint}': {ex}", command.CloudEvent.Id, options.Value.CloudEvents.Endpoint, json);
65-
response.EnsureSuccessStatusCode();
66-
}
45+
await cloudEventPublisher.PublishAsync(command.CloudEvent, cancellationToken).ConfigureAwait(false);
6746
return this.Ok();
6847
}
6948

src/api/Synapse.Api.Application/Extensions/IServiceCollectionExtensions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// limitations under the License.
1313

1414
using Microsoft.Extensions.DependencyInjection;
15+
using Microsoft.Extensions.Hosting;
1516
using Neuroglia.Data.Infrastructure;
1617
using Neuroglia.Security;
1718
using Synapse.Api.Application.Commands.Documents;
@@ -21,6 +22,7 @@
2122
using Synapse.Api.Application.Queries.Resources.Generic;
2223
using Synapse.Api.Application.Queries.Users;
2324
using Synapse.Api.Application.Queries.WorkflowInstances;
25+
using Synapse.Api.Application.Services;
2426
using Synapse.Resources;
2527

2628
namespace Synapse.Api.Application;
@@ -41,6 +43,10 @@ public static IServiceCollection AddSynapseApi(this IServiceCollection services)
4143
services.AddApiCommands();
4244
services.AddApiQueries();
4345

46+
services.AddSingleton<CloudEventPublisher>();
47+
services.AddSingleton<ICloudEventPublisher>(provider => provider.GetRequiredService<CloudEventPublisher>());
48+
services.AddSingleton<IHostedService>(provider => provider.GetRequiredService<CloudEventPublisher>());
49+
4450
return services;
4551
}
4652

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// Copyright © 2024-Present The Synapse Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
using Microsoft.Extensions.Hosting;
15+
using Microsoft.Extensions.Logging;
16+
using Microsoft.Extensions.Options;
17+
using Neuroglia.Eventing.CloudEvents;
18+
using Neuroglia.Serialization;
19+
using Polly;
20+
using Synapse.Api.Application.Configuration;
21+
using System.Text;
22+
using System.Threading.Channels;
23+
24+
namespace Synapse.Api.Application.Services;
25+
26+
/// <summary>
27+
/// Represents the default implementation of the <see cref="ICloudEventPublisher"/> interface
28+
/// </summary>
29+
/// <param name="logger">The service used to perform logging</param>
30+
/// <param name="options">The service used to access the current <see cref="ApiServerOptions"/></param>
31+
/// <param name="jsonSerializer">The service used to serialize/deserialize data to/from JSON</param>
32+
/// <param name="httpClient">The service used to perform HTTP requests</param>
33+
public class CloudEventPublisher(ILogger<CloudEventPublisher> logger, IJsonSerializer jsonSerializer, IOptions<ApiServerOptions> options, HttpClient httpClient)
34+
: IHostedService, ICloudEventPublisher, IDisposable, IAsyncDisposable
35+
{
36+
37+
bool _disposed;
38+
39+
/// <summary>
40+
/// Gets the service used to perform logging
41+
/// </summary>
42+
protected ILogger Logger { get; } = logger;
43+
44+
/// <summary>
45+
/// Gets the service used to serialize/deserialize data to/from JSON
46+
/// </summary>
47+
protected IJsonSerializer JsonSerializer { get; } = jsonSerializer;
48+
49+
/// <summary>
50+
/// Gets the current <see cref="ApiServerOptions"/>
51+
/// </summary>
52+
protected ApiServerOptions Options { get; } = options.Value;
53+
54+
/// <summary>
55+
/// Gets the service used to perform HTTP requests
56+
/// </summary>
57+
protected HttpClient HttpClient { get; } = httpClient;
58+
59+
/// <summary>
60+
/// Gets the <see cref="CloudEventPublisher"/>'s <see cref="System.Threading.CancellationTokenSource"/>
61+
/// </summary>
62+
protected CancellationTokenSource CancellationTokenSource { get; } = new();
63+
64+
/// <summary>
65+
/// Gets the <see cref="Channel{T}"/> used to enqueue <see cref="CloudEvent"/>s to publish
66+
/// </summary>
67+
protected Channel<CloudEvent> Channel { get; } = System.Threading.Channels.Channel.CreateUnbounded<CloudEvent>();
68+
69+
/// <inheritdoc/>
70+
public virtual Task StartAsync(CancellationToken cancellationToken)
71+
{
72+
if (options.Value.CloudEvents.Endpoint == null) logger.LogWarning("No endpoint configured for cloud events. Events will not be published.");
73+
else _ = this.PublishEnqueuedEventsAsync();
74+
return Task.CompletedTask;
75+
}
76+
77+
/// <inheritdoc/>
78+
public virtual async Task PublishAsync(CloudEvent e, CancellationToken cancellationToken = default)
79+
{
80+
ArgumentNullException.ThrowIfNull(e);
81+
await this.Channel.Writer.WriteAsync(e, cancellationToken).ConfigureAwait(false);
82+
}
83+
84+
/// <inheritdoc/>
85+
public virtual Task StopAsync(CancellationToken cancellationToken)
86+
{
87+
this.CancellationTokenSource.Cancel();
88+
return Task.CompletedTask;
89+
}
90+
91+
/// <summary>
92+
/// Publishes enqueued <see cref="CloudEvent"/>s
93+
/// </summary>
94+
/// <returns>A new awaitable <see cref="Task"/></returns>
95+
protected virtual async Task PublishEnqueuedEventsAsync()
96+
{
97+
var policy = Policy
98+
.Handle<HttpRequestException>()
99+
.OrResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
100+
.WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)))
101+
.WrapAsync(Policy
102+
.Handle<HttpRequestException>()
103+
.OrResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
104+
.CircuitBreakerAsync(3, TimeSpan.FromSeconds(5)));
105+
while (!this.CancellationTokenSource.IsCancellationRequested)
106+
{
107+
try
108+
{
109+
var e = await this.Channel.Reader.ReadAsync(this.CancellationTokenSource.Token).ConfigureAwait(false);
110+
if (e == null)
111+
{
112+
await Task.Delay(10);
113+
continue;
114+
}
115+
var json = this.JsonSerializer.SerializeToText(e);
116+
using var content = new StringContent(json, Encoding.UTF8, CloudEventContentType.Json);
117+
using var request = new HttpRequestMessage(HttpMethod.Post, options.Value.CloudEvents.Endpoint) { Content = content };
118+
using var response = await policy.ExecuteAsync(async () => await this.HttpClient.SendAsync(request, this.CancellationTokenSource.Token).ConfigureAwait(false));
119+
json = await response.Content.ReadAsStringAsync(this.CancellationTokenSource.Token).ConfigureAwait(false);
120+
if (!response.IsSuccessStatusCode)
121+
{
122+
logger.LogError("An error occurred while publishing the cloud event with id '{eventId}' to the configure endpoint '{endpoint}': {ex}", e.Id, options.Value.CloudEvents.Endpoint, json);
123+
response.EnsureSuccessStatusCode();
124+
}
125+
}
126+
catch(Exception ex)
127+
{
128+
logger.LogError(ex, "An exception occurred while publishing cloud events");
129+
//todo: start persisting following events to disk
130+
}
131+
}
132+
}
133+
134+
/// <summary>
135+
/// Disposes of the <see cref="CloudEventPublisher"/>
136+
/// </summary>
137+
/// <param name="disposing">A boolean indicating whether or not the <see cref="CloudEventPublisher"/> is being disposed of</param>
138+
protected virtual async ValueTask DisposeAsync(bool disposing)
139+
{
140+
if (!this._disposed) return;
141+
if (disposing)
142+
{
143+
this.CancellationTokenSource.Dispose();
144+
}
145+
this._disposed = true;
146+
await Task.CompletedTask.ConfigureAwait(false);
147+
}
148+
149+
/// <inheritdoc/>
150+
public async ValueTask DisposeAsync()
151+
{
152+
await this.DisposeAsync(disposing: true).ConfigureAwait(false);
153+
GC.SuppressFinalize(this);
154+
}
155+
156+
/// <summary>
157+
/// Disposes of the <see cref="CloudEventPublisher"/>
158+
/// </summary>
159+
/// <param name="disposing">A boolean indicating whether or not the <see cref="CloudEventPublisher"/> is being disposed of</param>
160+
protected virtual void Dispose(bool disposing)
161+
{
162+
if (!this._disposed) return;
163+
if (disposing)
164+
{
165+
this.CancellationTokenSource.Dispose();
166+
}
167+
this._disposed = true;
168+
}
169+
170+
/// <inheritdoc/>
171+
public void Dispose()
172+
{
173+
this.Dispose(disposing: true);
174+
GC.SuppressFinalize(this);
175+
}
176+
177+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright © 2024-Present The Synapse Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
using Neuroglia.Eventing.CloudEvents;
15+
16+
namespace Synapse.Api.Application.Services;
17+
18+
/// <summary>
19+
/// Defines the fundamentals of a service used to publish <see cref="CloudEvent"/>s
20+
/// </summary>
21+
public interface ICloudEventPublisher
22+
{
23+
24+
/// <summary>
25+
/// Published the specified <see cref="CloudEvent"/>
26+
/// </summary>
27+
/// <param name="e">The <see cref="CloudEvent"/> to publish</param>
28+
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
29+
/// <returns>A new awaitable <see cref="Task"/></returns>
30+
Task PublishAsync(CloudEvent e, CancellationToken cancellationToken = default);
31+
32+
}

src/api/Synapse.Api.Application/Services/StaticBearerAuthenticationHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,4 @@ protected override Task<AuthenticateResult> HandleAuthenticateAsync()
4444
}
4545
return Task.FromResult(AuthenticateResult.Fail("Invalid token"));
4646
}
47-
}
47+
}

src/api/Synapse.Api.Application/Synapse.Api.Application.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
<ItemGroup>
3131
<PackageReference Include="IdentityServer4" Version="4.1.2" />
3232
<PackageReference Include="IdentityServer4.Storage" Version="4.1.2" />
33+
<PackageReference Include="Polly" Version="8.4.0" />
3334
</ItemGroup>
3435

3536
<ItemGroup>

0 commit comments

Comments
 (0)