Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RabbitMQ.Client v7 client integration package #6770

Merged
merged 11 commits into from
Jan 13, 2025
Next Next commit
Add RabbitMQ.Client v7 client integration package
Fix #3956
eerhardt committed Jan 13, 2025
commit 649267d53d60d0b5202917edfc0d3277776f962e
7 changes: 7 additions & 0 deletions Aspire.sln
Original file line number Diff line number Diff line change
@@ -631,6 +631,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DaprServiceC", "playground\
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Dashboard", "Dashboard", "{830F7CA9-8E51-4D62-832F-91F53F85B7AE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Aspire.RabbitMQ.Client.v7", "src\Components\Aspire.RabbitMQ.Client.v7\Aspire.RabbitMQ.Client.v7.csproj", "{604EBA18-AA84-A046-0CEF-9E2E61F711B3}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AzureServiceBus", "AzureServiceBus", "{D2938171-1DBB-4E8D-AF16-97F75F1AE6DE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceBusWorker", "playground\AzureServiceBus\ServiceBusWorker\ServiceBusWorker.csproj", "{162F0B66-E88F-4735-8CE0-BE8950F74CC6}"
@@ -1659,6 +1661,10 @@ Global
{B26653B9-439E-4850-A7F8-43C6E5121952}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B26653B9-439E-4850-A7F8-43C6E5121952}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B26653B9-439E-4850-A7F8-43C6E5121952}.Release|Any CPU.Build.0 = Release|Any CPU
{604EBA18-AA84-A046-0CEF-9E2E61F711B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{604EBA18-AA84-A046-0CEF-9E2E61F711B3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{604EBA18-AA84-A046-0CEF-9E2E61F711B3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{604EBA18-AA84-A046-0CEF-9E2E61F711B3}.Release|Any CPU.Build.0 = Release|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -1970,6 +1976,7 @@ Global
{042DD8C6-A26C-4B06-80A1-FE7F8659C5BC} = {B7345F72-712F-436C-AE18-CAF7CDD4A990}
{B26653B9-439E-4850-A7F8-43C6E5121952} = {57A42144-739E-49A7-BADB-BB8F5F20FA17}
{830F7CA9-8E51-4D62-832F-91F53F85B7AE} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
{604EBA18-AA84-A046-0CEF-9E2E61F711B3} = {27381127-6C45-4B4C-8F18-41FF48DFE4B2}
{D2938171-1DBB-4E8D-AF16-97F75F1AE6DE} = {D173887B-AF42-4576-B9C1-96B9E9B3D9C0}
{162F0B66-E88F-4735-8CE0-BE8950F74CC6} = {D2938171-1DBB-4E8D-AF16-97F75F1AE6DE}
{A7EC9111-F3CC-46E8-B95E-3768481D67B4} = {D2938171-1DBB-4E8D-AF16-97F75F1AE6DE}
11 changes: 6 additions & 5 deletions playground/TestShop/BasketService/BasketService.cs
Original file line number Diff line number Diff line change
@@ -75,14 +75,15 @@ public override async Task<CheckoutCustomerBasketResponse> CheckoutBasket(Checko
return new();
}

using var channel = _messageConnection.CreateModel();
channel.QueueDeclare(queueName, durable: true, exclusive: false);
using var channel = await _messageConnection.CreateChannelAsync();
await channel.QueueDeclareAsync(queueName, durable: true, exclusive: false);

var props = channel.CreateBasicProperties();
var props = new BasicProperties();
props.Persistent = true; // or props.DeliveryMode = 2;
channel.BasicPublish(
exchange: "",
await channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: queueName,
mandatory: true,
basicProperties: props,
body: JsonSerializer.SerializeToUtf8Bytes(order));
}
3 changes: 2 additions & 1 deletion playground/TestShop/BasketService/BasketService.csproj
Original file line number Diff line number Diff line change
@@ -18,7 +18,8 @@

<ItemGroup>
<AspireProjectOrPackageReference Include="Aspire.StackExchange.Redis" />
<AspireProjectOrPackageReference Include="Aspire.RabbitMQ.Client" />
<AspireProjectOrPackageReference Include="Aspire.RabbitMQ.Client.v7" />
<PackageReference Include="RabbitMQ.Client" VersionOverride="7.0.0" />
<ProjectReference Include="..\TestShop.ServiceDefaults\TestShop.ServiceDefaults.csproj" />
</ItemGroup>

23 changes: 13 additions & 10 deletions playground/TestShop/OrderProcessor/OrderProcessingWorker.cs
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ public class OrderProcessingWorker : BackgroundService
private readonly IConfiguration _config;
private readonly IServiceProvider _serviceProvider;
private IConnection? _messageConnection;
private IModel? _messageChannel;
private IChannel? _messageChannel;

public OrderProcessingWorker(ILogger<OrderProcessingWorker> logger, IConfiguration config, IServiceProvider serviceProvider)
{
@@ -23,22 +23,23 @@ public OrderProcessingWorker(ILogger<OrderProcessingWorker> logger, IConfigurati

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.Factory.StartNew(() =>
return Task.Factory.StartNew(async () =>
{
const string configKeyName = "Aspire:RabbitMQ:Client:OrderQueueName";
string queueName = _config[configKeyName] ?? "orders";

_messageConnection = _serviceProvider.GetRequiredService<IConnection>();

_messageChannel = _messageConnection.CreateModel();
_messageChannel.QueueDeclare(queueName, durable: true, exclusive: false);
_messageChannel = await _messageConnection.CreateChannelAsync();
await _messageChannel.QueueDeclareAsync(queueName, durable: true, exclusive: false);

var consumer = new EventingBasicConsumer(_messageChannel);
consumer.Received += ProcessMessageAsync;
var consumer = new AsyncEventingBasicConsumer(_messageChannel);
consumer.ReceivedAsync += ProcessMessageAsync;

_messageChannel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
await _messageChannel.BasicConsumeAsync(
queueName,
autoAck: true,
consumer);
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
}

@@ -49,7 +50,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
_messageChannel?.Dispose();
}

private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
private Task ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
{
_logger.LogInformation($"Processing Order at: {DateTime.UtcNow}");

@@ -72,5 +73,7 @@ private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
BuyerId:{BuyerId}
ProductCount:{Count}
""", order.Id, order.BuyerId, order.Items.Count);

return Task.CompletedTask;
}
}
3 changes: 2 additions & 1 deletion playground/TestShop/OrderProcessor/OrderProcessor.csproj
Original file line number Diff line number Diff line change
@@ -15,7 +15,8 @@
</ItemGroup>

<ItemGroup>
<AspireProjectOrPackageReference Include="Aspire.RabbitMQ.Client" />
<AspireProjectOrPackageReference Include="Aspire.RabbitMQ.Client.v7" />
<PackageReference Include="RabbitMQ.Client" VersionOverride="7.0.0" />
<ProjectReference Include="..\TestShop.ServiceDefaults\TestShop.ServiceDefaults.csproj" />
</ItemGroup>
</Project>
2 changes: 2 additions & 0 deletions playground/TestShop/OrderProcessor/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using OrderProcessor;

System.Diagnostics.Debugger.Launch();

var builder = Host.CreateApplicationBuilder(args);

builder.AddServiceDefaults();
1 change: 1 addition & 0 deletions playground/TestShop/TestShop.AppHost/Program.cs
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@

var messaging = builder.AddRabbitMQ("messaging")
.WithDataVolume()
.WithLifetime(ContainerLifetime.Persistent)
.WithManagementPlugin()
.PublishAsContainer();

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>$(DefaultTargetFramework)</TargetFramework>
<IsPackable>true</IsPackable>
<AssemblyName>Aspire.RabbitMQ.Client</AssemblyName>
<PackageTags>$(ComponentDatabasePackageTags) rabbitmq amqp</PackageTags>
<Description>A RabbitMQ client (v7+) that integrates with Aspire, including health checks, logging, and telemetry.</Description>
<NoWarn>$(NoWarn);SYSLIB1100;SYSLIB1101</NoWarn>
</PropertyGroup>

<PropertyGroup>
<MinCodeCoverage>80</MinCodeCoverage>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Common\ConfigurationSchemaAttributes.cs" Link="ConfigurationSchemaAttributes.cs" />
<Compile Include="..\Common\HealthChecksExtensions.cs" Link="HealthChecksExtensions.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Rabbitmq" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
<PackageReference Include="Polly.Core" />
<PackageReference Include="RabbitMQ.Client" VersionOverride="7.0.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="Aspire.RabbitMQ.Client.v7.Tests" />
</ItemGroup>

</Project>
231 changes: 231 additions & 0 deletions src/Components/Aspire.RabbitMQ.Client.v7/AspireRabbitMQExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Net.Sockets;
using Aspire;
using Aspire.RabbitMQ.Client;
using HealthChecks.RabbitMQ;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

namespace Microsoft.Extensions.Hosting;

/// <summary>
/// Extension methods for connecting to a RabbitMQ message broker.
/// </summary>
public static class AspireRabbitMQExtensions
{
private const string ActivitySourceName = "Aspire.RabbitMQ.Client";
private static readonly ActivitySource s_activitySource = new ActivitySource(ActivitySourceName);
private const string DefaultConfigSectionName = "Aspire:RabbitMQ:Client";

/// <summary>
/// Registers <see cref="IConnection"/> as a singleton in the services provided by the <paramref name="builder"/>.
/// Enables retries, corresponding health check, logging, and telemetry.
/// </summary>
/// <param name="builder">The <see cref="IHostApplicationBuilder" /> to read config from and add services to.</param>
/// <param name="connectionName">A name used to retrieve the connection string from the ConnectionStrings configuration section.</param>
/// <param name="configureSettings">An optional method that can be used for customizing the <see cref="RabbitMQClientSettings"/>. It's invoked after the settings are read from the configuration.</param>
/// <param name="configureConnectionFactory">An optional method that can be used for customizing the <see cref="ConnectionFactory"/>. It's invoked after the options are read from the configuration.</param>
/// <remarks>Reads the configuration from "Aspire:RabbitMQ:Client" section.</remarks>
public static void AddRabbitMQClient(
this IHostApplicationBuilder builder,
string connectionName,
Action<RabbitMQClientSettings>? configureSettings = null,
Action<ConnectionFactory>? configureConnectionFactory = null)
=> AddRabbitMQClient(builder, configureSettings, configureConnectionFactory, connectionName, serviceKey: null);

/// <summary>
/// Registers <see cref="IConnection"/> as a keyed singleton for the given <paramref name="name"/> in the services provided by the <paramref name="builder"/>.
/// Enables retries, corresponding health check, logging, and telemetry.
/// </summary>
/// <param name="builder">The <see cref="IHostApplicationBuilder" /> to read config from and add services to.</param>
/// <param name="name">The name of the component, which is used as the <see cref="ServiceDescriptor.ServiceKey"/> of the service and also to retrieve the connection string from the ConnectionStrings configuration section.</param>
/// <param name="configureSettings">An optional method that can be used for customizing the <see cref="RabbitMQClientSettings"/>. It's invoked after the settings are read from the configuration.</param>
/// <param name="configureConnectionFactory">An optional method that can be used for customizing the <see cref="ConnectionFactory"/>. It's invoked after the options are read from the configuration.</param>
/// <remarks>Reads the configuration from "Aspire:RabbitMQ:Client:{name}" section.</remarks>
public static void AddKeyedRabbitMQClient(
this IHostApplicationBuilder builder,
string name,
Action<RabbitMQClientSettings>? configureSettings = null,
Action<ConnectionFactory>? configureConnectionFactory = null)
{
ArgumentException.ThrowIfNullOrEmpty(name);

AddRabbitMQClient(builder, configureSettings, configureConnectionFactory, connectionName: name, serviceKey: name);
}

private static void AddRabbitMQClient(
IHostApplicationBuilder builder,
Action<RabbitMQClientSettings>? configureSettings,
Action<ConnectionFactory>? configureConnectionFactory,
string connectionName,
object? serviceKey)
{
ArgumentNullException.ThrowIfNull(builder);

var configSection = builder.Configuration.GetSection(DefaultConfigSectionName);
var namedConfigSection = configSection.GetSection(connectionName);

var settings = new RabbitMQClientSettings();
configSection.Bind(settings);
namedConfigSection.Bind(settings);

if (builder.Configuration.GetConnectionString(connectionName) is string connectionString)
{
settings.ConnectionString = connectionString;
}

configureSettings?.Invoke(settings);

IConnectionFactory CreateConnectionFactory(IServiceProvider sp)
{
// ensure the log forwarder is initialized
sp.GetRequiredService<RabbitMQEventSourceLogForwarder>().Start();

var factory = new ConnectionFactory();

var configurationOptionsSection = configSection.GetSection("ConnectionFactory");
var namedConfigurationOptionsSection = namedConfigSection.GetSection("ConnectionFactory");
configurationOptionsSection.Bind(factory);
namedConfigurationOptionsSection.Bind(factory);

// the connection string from settings should win over the one from the ConnectionFactory section
var connectionString = settings.ConnectionString;
if (!string.IsNullOrEmpty(connectionString))
{
factory.Uri = new(connectionString);
}

configureConnectionFactory?.Invoke(factory);

return factory;
}

if (serviceKey is null)
{
builder.Services.AddSingleton<IConnectionFactory>(CreateConnectionFactory);
builder.Services.AddSingleton<IConnection>(sp => CreateConnection(sp.GetRequiredService<IConnectionFactory>(), settings.MaxConnectRetryCount));
}
else
{
builder.Services.AddKeyedSingleton<IConnectionFactory>(serviceKey, (sp, _) => CreateConnectionFactory(sp));
builder.Services.AddKeyedSingleton<IConnection>(serviceKey, (sp, key) => CreateConnection(sp.GetRequiredKeyedService<IConnectionFactory>(key), settings.MaxConnectRetryCount));
}

builder.Services.AddSingleton<RabbitMQEventSourceLogForwarder>();

if (!settings.DisableTracing)
{
// Note that RabbitMQ.Client v6.6 doesn't have built-in support for tracing. See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1261

builder.Services.AddOpenTelemetry()
.WithTracing(traceBuilder =>
traceBuilder
.AddSource(ActivitySourceName)
.AddSource("RabbitMQ.Client.Publisher")
.AddSource("RabbitMQ.Client.Subscriber"));
}

if (!settings.DisableHealthChecks)
{
builder.TryAddHealthCheck(new HealthCheckRegistration(
serviceKey is null ? "RabbitMQ.Client" : $"RabbitMQ.Client_{connectionName}",
sp =>
{
try
{
// if the IConnection can't be resolved, make a health check that will fail
var options = new RabbitMQHealthCheckOptions();
options.Connection = serviceKey is null ? sp.GetRequiredService<IConnection>() : sp.GetRequiredKeyedService<IConnection>(serviceKey);
return new RabbitMQHealthCheck(options);
}
catch (Exception ex)
{
return new FailedHealthCheck(ex);
}
},
failureStatus: default,
tags: default));
}
}

private sealed class FailedHealthCheck(Exception ex) : IHealthCheck
{
public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
return Task.FromResult(new HealthCheckResult(context.Registration.FailureStatus, exception: ex));
}
}

private static IConnection CreateConnection(IConnectionFactory factory, int retryCount)
{
var resiliencePipelineBuilder = new ResiliencePipelineBuilder();
if (retryCount > 0)
{
resiliencePipelineBuilder.AddRetry(new RetryStrategyOptions
{
ShouldHandle = static args => args.Outcome is { Exception: SocketException or BrokerUnreachableException }
? PredicateResult.True()
: PredicateResult.False(),
BackoffType = DelayBackoffType.Exponential,
MaxRetryAttempts = retryCount,
Delay = TimeSpan.FromSeconds(1),
});
}
var resiliencePipeline = resiliencePipelineBuilder.Build();

using var activity = s_activitySource.StartActivity("rabbitmq connect", ActivityKind.Client);
AddRabbitMQTags(activity, factory.Uri);

#pragma warning disable CA2012 // Use ValueTasks correctly
return resiliencePipeline.ExecuteAsync(static async (factory, cancellationToken) =>
{
using var connectAttemptActivity = s_activitySource.StartActivity("rabbitmq connect attempt", ActivityKind.Client);
AddRabbitMQTags(connectAttemptActivity, factory.Uri, "connect");

try
{
return await factory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
if (connectAttemptActivity is not null)
{
connectAttemptActivity.AddTag("exception.message", ex.Message);
// Note that "exception.stacktrace" is the full exception detail, not just the StackTrace property.
// See https://opentelemetry.io/docs/specs/semconv/attributes-registry/exception/
// and https://github.com/open-telemetry/opentelemetry-specification/pull/697#discussion_r453662519
connectAttemptActivity.AddTag("exception.stacktrace", ex.ToString());
connectAttemptActivity.AddTag("exception.type", ex.GetType().FullName);
connectAttemptActivity.SetStatus(ActivityStatusCode.Error);
}
throw;
}
}, factory).GetAwaiter().GetResult();
#pragma warning restore CA2012 // Use ValueTasks correctly
}

private static void AddRabbitMQTags(Activity? activity, Uri address, string? operation = null)
{
if (activity is null)
{
return;
}

activity.AddTag("server.address", address.Host);
activity.AddTag("server.port", address.Port);
activity.AddTag("messaging.system", "rabbitmq");
if (operation is not null)
{
activity.AddTag("messaging.operation", operation);
}
}
}
11 changes: 11 additions & 0 deletions src/Components/Aspire.RabbitMQ.Client.v7/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire;
using Aspire.RabbitMQ.Client;
using RabbitMQ.Client;

[assembly: ConfigurationSchema("Aspire:RabbitMQ:Client", typeof(RabbitMQClientSettings))]
[assembly: ConfigurationSchema("Aspire:RabbitMQ:Client:ConnectionFactory", typeof(ConnectionFactory), exclusionPaths: ["ClientProperties"])]

[assembly: LoggingCategories("RabbitMQ.Client")]
354 changes: 354 additions & 0 deletions src/Components/Aspire.RabbitMQ.Client.v7/ConfigurationSchema.json

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions src/Components/Aspire.RabbitMQ.Client.v7/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#nullable enable
Aspire.RabbitMQ.Client.RabbitMQClientSettings
Aspire.RabbitMQ.Client.RabbitMQClientSettings.ConnectionString.get -> string?
Aspire.RabbitMQ.Client.RabbitMQClientSettings.ConnectionString.set -> void
Aspire.RabbitMQ.Client.RabbitMQClientSettings.DisableHealthChecks.get -> bool
Aspire.RabbitMQ.Client.RabbitMQClientSettings.DisableHealthChecks.set -> void
Aspire.RabbitMQ.Client.RabbitMQClientSettings.DisableTracing.get -> bool
Aspire.RabbitMQ.Client.RabbitMQClientSettings.DisableTracing.set -> void
Aspire.RabbitMQ.Client.RabbitMQClientSettings.MaxConnectRetryCount.get -> int
Aspire.RabbitMQ.Client.RabbitMQClientSettings.MaxConnectRetryCount.set -> void
Aspire.RabbitMQ.Client.RabbitMQClientSettings.RabbitMQClientSettings() -> void
Microsoft.Extensions.Hosting.AspireRabbitMQExtensions
static Microsoft.Extensions.Hosting.AspireRabbitMQExtensions.AddKeyedRabbitMQClient(this Microsoft.Extensions.Hosting.IHostApplicationBuilder! builder, string! name, System.Action<Aspire.RabbitMQ.Client.RabbitMQClientSettings!>? configureSettings = null, System.Action<RabbitMQ.Client.ConnectionFactory!>? configureConnectionFactory = null) -> void
static Microsoft.Extensions.Hosting.AspireRabbitMQExtensions.AddRabbitMQClient(this Microsoft.Extensions.Hosting.IHostApplicationBuilder! builder, string! connectionName, System.Action<Aspire.RabbitMQ.Client.RabbitMQClientSettings!>? configureSettings = null, System.Action<RabbitMQ.Client.ConnectionFactory!>? configureConnectionFactory = null) -> void
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#nullable enable

122 changes: 122 additions & 0 deletions src/Components/Aspire.RabbitMQ.Client.v7/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Aspire.RabbitMQ.Client library

Registers an [IConnection](https://rabbitmq.github.io/rabbitmq-dotnet-client/api/RabbitMQ.Client.IConnection.html) in the DI container for connecting to a RabbitMQ server. Enables corresponding health check, logging and telemetry.

## Getting started

### Prerequisites

- RabbitMQ server and the server hostname for connecting a client.

### Install the package

Install the .NET Aspire RabbitMQ library with [NuGet](https://www.nuget.org):

```dotnetcli
dotnet add package Aspire.RabbitMQ.Client
```

## Usage example

In the _Program.cs_ file of your project, call the `AddRabbitMQClient` extension method to register an `IConnection` for use via the dependency injection container. The method takes a connection name parameter.

```csharp
builder.AddRabbitMQClient("messaging");
```

You can then retrieve the `IConnection` instance using dependency injection. For example, to retrieve the connection from a Web API controller:

```csharp
private readonly IConnection _connection;

public ProductsController(IConnection connection)
{
_connection = connection;
}
```

## Configuration

The .NET Aspire RabbitMQ component provides multiple options to configure the connection based on the requirements and conventions of your project.

### Use a connection string

When using a connection string from the `ConnectionStrings` configuration section, you can provide the name of the connection string when calling `builder.AddRabbitMQClient()`:

```csharp
builder.AddRabbitMQClient("myConnection");
```

And then the connection string will be retrieved from the `ConnectionStrings` configuration section:

```json
{
"ConnectionStrings": {
"myConnection": "amqp://username:password@localhost:5672"
}
}
```

See the [ConnectionString documentation](https://www.rabbitmq.com/uri-spec.html) for more information on how to format this connection string.

### Use configuration providers

The .NET Aspire RabbitMQ component supports [Microsoft.Extensions.Configuration](https://learn.microsoft.com/dotnet/api/microsoft.extensions.configuration). It loads the `RabbitMQClientSettings` from configuration by using the `Aspire:RabbitMQ:Client` key. Example `appsettings.json` that configures some of the options:

```json
{
"Aspire": {
"RabbitMQ": {
"Client": {
"DisableHealthChecks": true
}
}
}
}
```

### Use inline delegates

Also you can pass the `Action<RabbitMQClientSettings> configureSettings` delegate to set up some or all the options inline, for example to disable health checks from code:

```csharp
builder.AddRabbitMQClient("messaging", settings => settings.DisableHealthChecks = true);
```

You can also setup the [ConnectionFactory](https://rabbitmq.github.io/rabbitmq-dotnet-client/api/RabbitMQ.Client.ConnectionFactory.html) using the `Action<ConnectionFactory> configureConnectionFactory` delegate parameter of the `AddRabbitMQClient` method. For example to set the client provided name for connections:

```csharp
builder.AddRabbitMQClient("messaging", configureConnectionFactory: factory => factory.ClientProvidedName = "MyApp");
```

## AppHost extensions

In your AppHost project, install the `Aspire.Hosting.RabbitMQ` library with [NuGet](https://www.nuget.org):

```dotnetcli
dotnet add package Aspire.Hosting.RabbitMQ
```

Then, in the _Program.cs_ file of `AppHost`, register a RabbitMQ server and consume the connection using the following methods:

```csharp
var messaging = builder.AddRabbitMQ("messaging");

var myService = builder.AddProject<Projects.MyService>()
.WithReference(messaging);
```

The `WithReference` method configures a connection in the `MyService` project named `messaging`. In the _Program.cs_ file of `MyService`, the RabbitMQ connection can be consumed using:

```csharp
builder.AddRabbitMQClient("messaging");
```

## Additional documentation

* https://rabbitmq.github.io/rabbitmq-dotnet-client/
* https://github.com/dotnet/aspire/tree/main/src/Components/README.md

## Feedback & contributing

https://github.com/dotnet/aspire
37 changes: 37 additions & 0 deletions src/Components/Aspire.RabbitMQ.Client.v7/RabbitMQClientSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Aspire.RabbitMQ.Client;

/// <summary>
/// Provides the client configuration settings for connecting to a RabbitMQ message broker.
/// </summary>
public sealed class RabbitMQClientSettings
{
/// <summary>
/// Gets or sets the connection string of the RabbitMQ server to connect to.
/// </summary>
public string? ConnectionString { get; set; }

/// <summary>
/// <para>Gets or sets the maximum number of connection retry attempts.</para>
/// <para>Default value is 5, set it to 0 to disable the retry mechanism.</para>
/// </summary>
public int MaxConnectRetryCount { get; set; } = 5;

/// <summary>
/// Gets or sets a boolean value that indicates whether the RabbitMQ health check is disabled or not.
/// </summary>
/// <value>
/// The default value is <see langword="false"/>.
/// </value>
public bool DisableHealthChecks { get; set; }

/// <summary>
/// Gets or sets a boolean value that indicates whether the OpenTelemetry tracing is disabled or not.
/// </summary>
/// <value>
/// The default value is <see langword="false"/>.
/// </value>
public bool DisableTracing { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using Microsoft.Extensions.Logging;

namespace Aspire.RabbitMQ.Client;

internal sealed class RabbitMQEventSourceLogForwarder : IDisposable
{
private static readonly Func<ErrorEventSourceEvent, Exception?, string> s_formatErrorEvent = FormatErrorEvent;
private static readonly Func<EventSourceEvent, Exception?, string> s_formatEvent = FormatEvent;

private readonly ILogger _logger;
private RabbitMQEventSourceListener? _listener;

public RabbitMQEventSourceLogForwarder(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger("RabbitMQ.Client");
}

/// <summary>
/// Initiates the log forwarding from the RabbitMQ event sources to a provided <see cref="ILoggerFactory"/>, call <see cref="Dispose"/> to stop forwarding.
/// </summary>
public void Start()
{
_listener ??= new RabbitMQEventSourceListener(LogEvent, EventLevel.Verbose);
}

private void LogEvent(EventWrittenEventArgs eventData)
{
var level = MapLevel(eventData.Level);
var eventId = new EventId(eventData.EventId, eventData.EventName);

// Special case the Error event so the Exception Details are written correctly
if (eventData.EventId == 3 &&
eventData.EventName == "Error" &&
eventData.PayloadNames?.Count == 2 &&
eventData.Payload?.Count == 2 &&
eventData.PayloadNames[0] == "message" &&
eventData.PayloadNames[1] == "ex")
{
_logger.Log(level, eventId, new ErrorEventSourceEvent(eventData), null, s_formatErrorEvent);
}
else
{
Debug.Assert(
(eventData.EventId == 1 && eventData.EventName == "Info") ||
(eventData.EventId == 2 && eventData.EventName == "Warn"));

_logger.Log(level, eventId, new EventSourceEvent(eventData), null, s_formatEvent);
}
}

private static string FormatErrorEvent(ErrorEventSourceEvent eventSourceEvent, Exception? ex) =>
eventSourceEvent.EventData.Payload?[0]?.ToString() ?? "<empty>";

private static string FormatEvent(EventSourceEvent eventSourceEvent, Exception? ex) =>
eventSourceEvent.EventData.Payload?[0]?.ToString() ?? "<empty>";

public void Dispose() => _listener?.Dispose();

private static LogLevel MapLevel(EventLevel level) => level switch
{
EventLevel.Critical => LogLevel.Critical,
EventLevel.Error => LogLevel.Error,
EventLevel.Informational => LogLevel.Information,
EventLevel.Verbose => LogLevel.Debug,
EventLevel.Warning => LogLevel.Warning,
EventLevel.LogAlways => LogLevel.Information,
_ => throw new ArgumentOutOfRangeException(nameof(level), level, null),
};

private readonly struct EventSourceEvent : IReadOnlyList<KeyValuePair<string, object?>>
{
public EventWrittenEventArgs EventData { get; }

public EventSourceEvent(EventWrittenEventArgs eventData)
{
// only Info and Warn events are expected, which always have 'message' as the only payload
Debug.Assert(eventData.PayloadNames?.Count == 1 && eventData.PayloadNames[0] == "message");

EventData = eventData;
}

public IEnumerator<KeyValuePair<string, object?>> GetEnumerator()
{
for (var i = 0; i < Count; i++)
{
yield return this[i];
}
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public int Count => EventData.PayloadNames?.Count ?? 0;

public KeyValuePair<string, object?> this[int index] => new(EventData.PayloadNames![index], EventData.Payload![index]);
}

private readonly struct ErrorEventSourceEvent : IReadOnlyList<KeyValuePair<string, object?>>
{
public EventWrittenEventArgs EventData { get; }

public ErrorEventSourceEvent(EventWrittenEventArgs eventData)
{
EventData = eventData;
}

public IEnumerator<KeyValuePair<string, object?>> GetEnumerator()
{
for (var i = 0; i < Count; i++)
{
yield return this[i];
}
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public int Count => 5;

public KeyValuePair<string, object?> this[int index]
{
get
{
Debug.Assert(EventData.PayloadNames?.Count == 2 && EventData.Payload?.Count == 2);
Debug.Assert(EventData.PayloadNames[0] == "message");
Debug.Assert(EventData.PayloadNames[1] == "ex");

ArgumentOutOfRangeException.ThrowIfGreaterThanOrEqual(index, 5);

return index switch
{
0 => new(EventData.PayloadNames[0], EventData.Payload[0]),
< 5 => GetExData(EventData, index),
_ => throw new UnreachableException()
};

static KeyValuePair<string, object?> GetExData(EventWrittenEventArgs eventData, int index)
{
Debug.Assert(index >= 1 && index <= 4);
Debug.Assert(eventData.Payload?.Count == 2);
var exData = eventData.Payload[1] as IDictionary<string, object?>;
Debug.Assert(exData is not null && exData.Count == 4);

return index switch
{
1 => new("exception.type", exData["Type"]),
2 => new("exception.message", exData["Message"]),
3 => new("exception.stacktrace", exData["StackTrace"]),
4 => new("exception.innerexception", exData["InnerException"]),
_ => throw new UnreachableException()
};
}
}
}
}

/// <summary>
/// Implementation of <see cref="EventListener"/> that listens to events produced by the RabbitMQ.Client library.
/// </summary>
private sealed class RabbitMQEventSourceListener : EventListener
{
private readonly List<EventSource> _eventSources = new List<EventSource>();

private readonly Action<EventWrittenEventArgs> _log;
private readonly EventLevel _level;

public RabbitMQEventSourceListener(Action<EventWrittenEventArgs> log, EventLevel level)
{
_log = log;
_level = level;

foreach (EventSource eventSource in _eventSources)
{
OnEventSourceCreated(eventSource);
}

_eventSources.Clear();
}

protected sealed override void OnEventSourceCreated(EventSource eventSource)
{
base.OnEventSourceCreated(eventSource);

if (_log == null)
{
_eventSources.Add(eventSource);
}

if (eventSource.Name == "rabbitmq-dotnet-client" || eventSource.Name == "rabbitmq-client")
{
EnableEvents(eventSource, _level);
}
}

protected sealed override void OnEventWritten(EventWrittenEventArgs eventData)
{
// Workaround https://github.com/dotnet/corefx/issues/42600
if (eventData.EventId == -1)
{
return;
}

// There is a very tight race during the listener creation where EnableEvents was called
// and the thread producing events not observing the `_log` field assignment
_log?.Invoke(eventData);
}
}
}