Skip to content

AsyncAPI Sample #7611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 40 commits into from
Aug 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d69203b
first checkin
jpalac Aug 5, 2025
461108c
publish / subscribe work in basic form
jpalac Aug 6, 2025
f104d34
got featur working ok
jpalac Aug 6, 2025
1ded5ef
Add Open API + Minimal API
jasontaylordev Aug 6, 2025
ead7916
Add AsyncAPI
jasontaylordev Aug 6, 2025
539cf77
Remove Saunter
jasontaylordev Aug 6, 2025
7c6bd3c
AsyncAPI sample using reflection (#7619)
jpalac Aug 11, 2025
8ad8a3d
cleanup TODOs
jpalac Aug 11, 2025
259dad6
remove unused code
jpalac Aug 11, 2025
d3a389f
Clean-up namespaces
jasontaylordev Aug 11, 2025
67d4552
Fix async lambda in ForEach by using foreach with awaited operations
jasontaylordev Aug 11, 2025
d70d027
Stop application on other key press.
jasontaylordev Aug 11, 2025
68b5307
Describe the AsyncAPI concepts
jasontaylordev Aug 11, 2025
90da332
Apply standard approach (don't make me think)
jasontaylordev Aug 11, 2025
e9d82f5
Rename subscriber events and add more snippets and content to sample.md
jpalac Aug 11, 2025
bfe77cc
add asyncapi to shape the future
jpalac Aug 11, 2025
a2d8947
cleanup subscriber project doc
jpalac Aug 11, 2025
185a29d
move message conventions into a folder, add content to sample.md
jpalac Aug 12, 2025
57f436b
Remove unused method
jasontaylordev Aug 12, 2025
b24095f
Improve comments
jasontaylordev Aug 12, 2025
709d461
add wording about the message being sent locally
jpalac Aug 12, 2025
5efa66c
Merge branch 'asyncapi-demo' of https://github.com/Particular/docs.pa…
jpalac Aug 12, 2025
4d64b8d
Update sample docs and code
jasontaylordev Aug 12, 2025
5b40858
Remove bold
jasontaylordev Aug 12, 2025
64f70b2
shape the future wording
jpalac Aug 12, 2025
cd003fa
Merge branch 'asyncapi-demo' of https://github.com/Particular/docs.pa…
jpalac Aug 12, 2025
2600198
Nitpicks
andreasohlund Aug 12, 2025
5848ad5
AsyncAPI demo with simple and complex versions (#7731)
jpalac Aug 13, 2025
9843101
fix wording - removing 'you'
jpalac Aug 13, 2025
3ef4dd0
More logging and some minor nitpicks
andreasohlund Aug 13, 2025
b57399f
add link to asyncapi feature issue for comments
jpalac Aug 13, 2025
6a40d59
Add wwwroot to avoid warning on startup
jasontaylordev Aug 13, 2025
ed8d9d9
Revert "Add wwwroot to avoid warning on startup"
jasontaylordev Aug 13, 2025
234051d
Apply suggestions from code review
jpalac Aug 14, 2025
0149254
Update asyncapi.md with link to public issue
jpalac Aug 18, 2025
bb4beaa
Update samples/asyncapi/simple/sample.md
jpalac Aug 18, 2025
abefc7f
Update samples/asyncapi/simple/sample.md
jpalac Aug 18, 2025
e671d28
Update sample.md remove die comment
jpalac Aug 18, 2025
26fdf51
Update samples/asyncapi/custom-message-types/sample.md
jpalac Aug 18, 2025
e9521dc
Update sample.md remove ide comment
jpalac Aug 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions menu/menu.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1727,3 +1727,5 @@
Url: shape-the-future/redis
- Title: NServiceBus and Infrastructure as Code (IaC)
Url: shape-the-future/iac-provisioning
- Title: NServiceBus and AsyncAPI
Url: shape-the-future/asyncapi
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#nullable enable
using Json.Schema;
using Json.Schema.Generation;
using Microsoft.Extensions.DependencyInjection;
using Neuroglia.AsyncApi;
using Neuroglia.AsyncApi.FluentBuilders.v3;
using Neuroglia.AsyncApi.Generation;
using Neuroglia.AsyncApi.v3;

namespace AsyncAPI.Feature;

public class ApiDocumentGenerator(IServiceProvider serviceProvider) : IAsyncApiDocumentGenerator
{
public async Task<IEnumerable<IAsyncApiDocument>> GenerateAsync(IEnumerable<Type> markupTypes, AsyncApiDocumentGenerationOptions options, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(options);

//only creating one document for the one NSB endpoint
var documents = new List<IAsyncApiDocument>(1);

var document = serviceProvider.GetRequiredService<IV3AsyncApiDocumentBuilder>();
options.V3BuilderSetup?.Invoke(document);

await GenerateChannels(document, options, cancellationToken);

documents.Add(document.Build());

return documents;
}

async Task GenerateChannels(IV3AsyncApiDocumentBuilder document, AsyncApiDocumentGenerationOptions options, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(document);
ArgumentNullException.ThrowIfNull(options);
IV3ChannelDefinitionBuilder channelBuilder = null!;

var typeCache = serviceProvider.GetRequiredService<TypeCache>();

//get all published events
foreach (var (actualType, publishedType) in typeCache.PublishedEventCache.Select(kvp => (kvp.Key, kvp.Value)))
{
var channelName = $"{publishedType.FullName!}";
document.WithChannel(channelName, channel =>
{
channelBuilder = channel;
channel
.WithAddress(typeCache.EndpointName)
.WithDescription(actualType.FullName);
});
await GenerateV3OperationForAsync(document, channelName, channelBuilder, actualType, publishedType, options, cancellationToken);
}

//NOTE this is where more channels and operations can be defined, for example subscribed to events, sent/received commands and messages
}

Task GenerateV3OperationForAsync(IV3AsyncApiDocumentBuilder document, string channelName, IV3ChannelDefinitionBuilder channel, Type actualType, Type producedType, AsyncApiDocumentGenerationOptions options, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(document);
ArgumentException.ThrowIfNullOrWhiteSpace(channelName);
ArgumentNullException.ThrowIfNull(channel);
ArgumentNullException.ThrowIfNull(actualType);
ArgumentNullException.ThrowIfNull(producedType);
ArgumentNullException.ThrowIfNull(options);

var requestMessagePayloadSchema = new JsonSchemaBuilder().FromType(actualType, JsonSchemaGeneratorConfiguration.Default).Build();
var messageName = producedType.FullName!;

var messageChannelReference = $"#/channels/{channelName}/messages/{producedType.FullName!}";
channel.WithMessage(messageName, message =>
{
message
.WithName(messageName)
.WithPayloadSchema(schema => schema
.WithFormat("application/vnd.aai.asyncapi+json;version=3.0.0")
.WithSchema(requestMessagePayloadSchema));
});

var operationName = $"{producedType.FullName!}";
document.WithOperation(operationName, operation =>
{
operation
.WithAction(V3OperationAction.Send)
.WithChannel($"#/channels/{channelName}")
.WithMessage(messageChannelReference);
});
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<LangVersion>13.0</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NServiceBus" Version="9.*" />
<PackageReference Include="Neuroglia.AsyncApi.AspNetCore" Version="3.0.6" />
<PackageReference Include="Neuroglia.AsyncApi.AspNetCore.UI" Version="3.0.6" />
<PackageReference Include="Neuroglia.AsyncApi.Core" Version="3.0.6" />
<PackageReference Include="Neuroglia.AsyncApi.DependencyInjectionExtensions" Version="3.0.6" />
<PackageReference Include="Neuroglia.AsyncApi.FluentBuilders" Version="3.0.6" />
<PackageReference Include="Neuroglia.AsyncApi.Generation" Version="3.0.6" />
<PackageReference Include="Neuroglia.AsyncApi.Validation" Version="3.0.6" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using System.Reflection;
using Microsoft.Extensions.DependencyInjection;
using Neuroglia.AsyncApi.Generation;
using NServiceBus.Features;
using NServiceBus.Unicast.Messages;

namespace AsyncAPI.Feature;

public sealed class AsyncApiFeature : NServiceBus.Features.Feature
{
protected override void Setup(FeatureConfigurationContext context)
{
var conventions = context.Settings.Get<Conventions>();

var messageMetadataRegistry = context.Settings.Get<MessageMetadataRegistry>();

var proxyGenerator = new TypeProxyGenerator();

Dictionary<Type, Type> publishedEventCache = new();
Dictionary<string, (Type SubscribedType, Type ActualType)> subscribedEventCache = new();

foreach (var messageMetadata in messageMetadataRegistry.GetAllMessages())
{
//NOTE only events decorated with the "PublishedEvent" or "SubscribedEvent" are being stored so that they can be "translated" at publish and subscribe time from their concrete types
if (conventions.IsEventType(messageMetadata.MessageType))
{
var publishedEvent = messageMetadata.MessageType.GetCustomAttribute<PublishedEvent>();
if (publishedEvent != null)
{
publishedEventCache.Add(messageMetadata.MessageType,
proxyGenerator.CreateTypeFrom($"{publishedEvent.EventName}V{publishedEvent.Version}"));
}

var subscribedEvent = messageMetadata.MessageType.GetCustomAttribute<SubscribedEvent>();
if (subscribedEvent != null)
{
var subscribedType =
proxyGenerator.CreateTypeFrom($"{subscribedEvent.EventName}V{subscribedEvent.Version}");
subscribedEventCache.Add(subscribedType.FullName!,
(SubscribedType: subscribedType, ActualType: messageMetadata.MessageType));
}
}
}

if (context.Settings.GetOrDefault<bool>("Installers.Enable"))
{
context.RegisterStartupTask(static provider => new ManualSubscribe(provider.GetRequiredService<TypeCache>()
.SubscribedEventCache.Values.Select(x => x.SubscribedType).ToArray()));
}

//Registering the behaviors required to replace the outgoing and incoming message types based on the defined conventions
context.Pipeline.Register(
static provider =>
new ReplaceOutgoingEnclosedMessageTypeHeaderBehavior(provider.GetRequiredService<TypeCache>().PublishedEventCache),
"Replaces the outgoing enclosed message type header with the published event type fullname");
context.Pipeline.Register(
static provider => new ReplaceMulticastRoutingBehavior(provider.GetRequiredService<TypeCache>().PublishedEventCache),
"Replaces the multicast routing strategies that match the actual published event type with the published event type name");

if (!context.Settings.GetOrDefault<bool>("Endpoint.SendOnly"))
{
context.Pipeline.Register(
static provider =>
new ReplaceIncomingEnclosedMessageTypeHeaderBehavior(provider.GetRequiredService<TypeCache>()
.SubscribedEventCache), "Replaces the incoming published event type name with the actual local event type name");
}

#region RegisterEventMappings
context.Services.AddSingleton(new TypeCache
{
EndpointName = context.Settings.EndpointName(),
PublishedEventCache = publishedEventCache,
SubscribedEventCache = subscribedEventCache
});
#endregion

#region RegisterCustomDocumentGenerator
context.Services.AddTransient<IAsyncApiDocumentGenerator>(
provider => new ApiDocumentGenerator(provider));
#endregion
}

class ManualSubscribe(Type[] subscribedEvents) : FeatureStartupTask
{
protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken = default)
{
return Task.WhenAll(subscribedEvents.Select(subscribedEvent => session.Subscribe(subscribedEvent, cancellationToken: cancellationToken)));
}

protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using NServiceBus.Features;

namespace AsyncAPI.Feature;

public static class EndpointConfigurationExtensions
{
#region EnableAsyncApiSupport
public static void EnableAsyncApiSupport(
this EndpointConfiguration endpointConfiguration)
{
endpointConfiguration.DisableFeature<AutoSubscribe>();
endpointConfiguration.EnableFeature<AsyncApiFeature>();

var conventions = endpointConfiguration.Conventions();
conventions.Add(new PublishedEventsConvention());
conventions.Add(new SubscribedEventsConvention());
}
#endregion
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace AsyncAPI.Feature;

[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public sealed class PublishedEvent : Attribute
{
public string EventName { get; init; }
public int Version { get; init; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System.Reflection;

namespace AsyncAPI.Feature;

public class PublishedEventsConvention : IMessageConvention
{
public bool IsMessageType(Type type)
{
return false;
}

public bool IsCommandType(Type type)
{
return false;
}

public bool IsEventType(Type type)
{
return type.GetCustomAttribute<PublishedEvent>() != null;
}

public string Name { get; } = "AsyncAPI Sample Event Message Convention";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using NServiceBus.Pipeline;

namespace AsyncAPI.Feature;

class ReplaceIncomingEnclosedMessageTypeHeaderBehavior(Dictionary<string, (Type SubscribedType, Type ActualType)> subscribedEventCache)
: IBehavior<ITransportReceiveContext, ITransportReceiveContext>
{
public Task Invoke(ITransportReceiveContext context, Func<ITransportReceiveContext, Task> next)
{
if (context.Message.Headers.TryGetValue(Headers.EnclosedMessageTypes, out var enclosedMessageTypes) && subscribedEventCache.TryGetValue(enclosedMessageTypes, out var subscribedEventType))
{
// very blunt and might break with certain transports
context.Message.Headers[Headers.EnclosedMessageTypes] = subscribedEventType.ActualType.FullName;
}
return next(context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using NServiceBus.Pipeline;
using NServiceBus.Routing;

namespace AsyncAPI.Feature;

class ReplaceMulticastRoutingBehavior(Dictionary<Type, Type> publishedEventCache) : IBehavior<IRoutingContext, IRoutingContext>
{
public Task Invoke(IRoutingContext context, Func<IRoutingContext, Task> next)
{
var logicalMessage = context.Extensions.Get<OutgoingLogicalMessage>();
if (publishedEventCache.TryGetValue(logicalMessage.MessageType, out var publishedEvent))
{
var newStrategies = new List<RoutingStrategy>(context.RoutingStrategies.Count);
var strategies = context.RoutingStrategies;
foreach (var strategy in strategies)
{
if (strategy is MulticastRoutingStrategy multicastRoutingStrategy)
{
// we assume here a multi cast address tag will never do anything with the headers so we pass a static empty dictionary
var multicastAddressTag = (MulticastAddressTag) multicastRoutingStrategy.Apply(emptyHeaders);
if (multicastAddressTag.MessageType == logicalMessage.MessageType)
{
newStrategies.Add(new MulticastRoutingStrategy(publishedEvent));
}
}
else
{
newStrategies.Add(strategy);
}
}

context.RoutingStrategies = newStrategies;
}

return next(context);
}

private readonly Dictionary<string, string> emptyHeaders = new Dictionary<string, string>();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using NServiceBus.Pipeline;

namespace AsyncAPI.Feature;

class ReplaceOutgoingEnclosedMessageTypeHeaderBehavior(Dictionary<Type, Type> publishedEventCache) : IBehavior<IOutgoingPhysicalMessageContext,
IOutgoingPhysicalMessageContext>
{
public Task Invoke(IOutgoingPhysicalMessageContext context, Func<IOutgoingPhysicalMessageContext, Task> next)
{
var logicalMessage = context.Extensions.Get<OutgoingLogicalMessage>();
if (publishedEventCache.TryGetValue(logicalMessage.MessageType, out var publishedEvent))
{
// very blunt and might break with certain transports
context.Headers[Headers.EnclosedMessageTypes] = publishedEvent.FullName;
}

return next(context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace AsyncAPI.Feature;

[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public sealed class SubscribedEvent : Attribute
{
public string EventName { get; init; }
public int Version { get; init; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System.Reflection;

namespace AsyncAPI.Feature;

public class SubscribedEventsConvention : IMessageConvention
{
public bool IsMessageType(Type type)
{
return false;
}

public bool IsCommandType(Type type)
{
return false;
}

public bool IsEventType(Type type)
{
return type.GetCustomAttribute<SubscribedEvent>() != null;
}

public string Name => "AsyncAPI Sample Event Message Convention";
}
Loading
Loading