Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RabbitMQ.Client v7 client integration package #6770

Merged
merged 11 commits into from
Jan 13, 2025
14 changes: 14 additions & 0 deletions Aspire.sln
Original file line number Diff line number Diff line change
@@ -631,6 +631,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DaprServiceC", "playground\
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Dashboard", "Dashboard", "{830F7CA9-8E51-4D62-832F-91F53F85B7AE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Aspire.RabbitMQ.Client.v7", "src\Components\Aspire.RabbitMQ.Client.v7\Aspire.RabbitMQ.Client.v7.csproj", "{604EBA18-AA84-A046-0CEF-9E2E61F711B3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Aspire.RabbitMQ.Client.v7.Tests", "tests\Aspire.RabbitMQ.Client.v7.Tests\Aspire.RabbitMQ.Client.v7.Tests.csproj", "{FEA86BE6-0533-4146-0084-F2940A64A258}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AzureServiceBus", "AzureServiceBus", "{D2938171-1DBB-4E8D-AF16-97F75F1AE6DE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceBusWorker", "playground\AzureServiceBus\ServiceBusWorker\ServiceBusWorker.csproj", "{162F0B66-E88F-4735-8CE0-BE8950F74CC6}"
@@ -1659,6 +1663,14 @@ Global
{B26653B9-439E-4850-A7F8-43C6E5121952}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B26653B9-439E-4850-A7F8-43C6E5121952}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B26653B9-439E-4850-A7F8-43C6E5121952}.Release|Any CPU.Build.0 = Release|Any CPU
{604EBA18-AA84-A046-0CEF-9E2E61F711B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{604EBA18-AA84-A046-0CEF-9E2E61F711B3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{604EBA18-AA84-A046-0CEF-9E2E61F711B3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{604EBA18-AA84-A046-0CEF-9E2E61F711B3}.Release|Any CPU.Build.0 = Release|Any CPU
{FEA86BE6-0533-4146-0084-F2940A64A258}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FEA86BE6-0533-4146-0084-F2940A64A258}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FEA86BE6-0533-4146-0084-F2940A64A258}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FEA86BE6-0533-4146-0084-F2940A64A258}.Release|Any CPU.Build.0 = Release|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -1970,6 +1982,8 @@ Global
{042DD8C6-A26C-4B06-80A1-FE7F8659C5BC} = {B7345F72-712F-436C-AE18-CAF7CDD4A990}
{B26653B9-439E-4850-A7F8-43C6E5121952} = {57A42144-739E-49A7-BADB-BB8F5F20FA17}
{830F7CA9-8E51-4D62-832F-91F53F85B7AE} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
{604EBA18-AA84-A046-0CEF-9E2E61F711B3} = {27381127-6C45-4B4C-8F18-41FF48DFE4B2}
{FEA86BE6-0533-4146-0084-F2940A64A258} = {C424395C-1235-41A4-BF55-07880A04368C}
{D2938171-1DBB-4E8D-AF16-97F75F1AE6DE} = {D173887B-AF42-4576-B9C1-96B9E9B3D9C0}
{162F0B66-E88F-4735-8CE0-BE8950F74CC6} = {D2938171-1DBB-4E8D-AF16-97F75F1AE6DE}
{A7EC9111-F3CC-46E8-B95E-3768481D67B4} = {D2938171-1DBB-4E8D-AF16-97F75F1AE6DE}
5 changes: 3 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -62,7 +62,8 @@
<PackageVersion Include="AspNetCore.HealthChecks.MySql" Version="8.0.1" />
<PackageVersion Include="AspNetCore.HealthChecks.NpgSql" Version="8.0.2" />
<PackageVersion Include="AspNetCore.HealthChecks.Oracle" Version="8.0.1" />
<PackageVersion Include="AspNetCore.HealthChecks.Rabbitmq" Version="8.0.2" />
<PackageVersion Include="AspNetCore.HealthChecks.Rabbitmq" Version="9.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.Rabbitmq.v6" Version="9.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.Redis" Version="8.0.1" />
<PackageVersion Include="AspNetCore.HealthChecks.SqlServer" Version="8.0.2" />
<PackageVersion Include="AspNetCore.HealthChecks.Uris" Version="8.0.1" />
@@ -103,7 +104,7 @@
<PackageVersion Include="Polly.Extensions" Version="8.5.0" />
<PackageVersion Include="Pomelo.EntityFrameworkCore.MySql" Version="8.0.2" />
<PackageVersion Include="Qdrant.Client" Version="1.12.0" />
<PackageVersion Include="RabbitMQ.Client" Version="[6.8.1,7.0.0)" />
<PackageVersion Include="RabbitMQ.Client" Version="7.0.0" />
<PackageVersion Include="StackExchange.Redis" Version="2.8.22" />
<PackageVersion Include="System.IO.Hashing" Version="8.0.0" />
<PackageVersion Include="Yarp.ReverseProxy" Version="2.2.0" />
11 changes: 6 additions & 5 deletions playground/TestShop/BasketService/BasketService.cs
Original file line number Diff line number Diff line change
@@ -75,14 +75,15 @@ public override async Task<CheckoutCustomerBasketResponse> CheckoutBasket(Checko
return new();
}

using var channel = _messageConnection.CreateModel();
channel.QueueDeclare(queueName, durable: true, exclusive: false);
using var channel = await _messageConnection.CreateChannelAsync();
await channel.QueueDeclareAsync(queueName, durable: true, exclusive: false);

var props = channel.CreateBasicProperties();
var props = new BasicProperties();
props.Persistent = true; // or props.DeliveryMode = 2;
channel.BasicPublish(
exchange: "",
await channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: queueName,
mandatory: true,
basicProperties: props,
body: JsonSerializer.SerializeToUtf8Bytes(order));
}
2 changes: 1 addition & 1 deletion playground/TestShop/BasketService/BasketService.csproj
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@

<ItemGroup>
<AspireProjectOrPackageReference Include="Aspire.StackExchange.Redis" />
<AspireProjectOrPackageReference Include="Aspire.RabbitMQ.Client" />
<AspireProjectOrPackageReference Include="Aspire.RabbitMQ.Client.v7" />
<ProjectReference Include="..\TestShop.ServiceDefaults\TestShop.ServiceDefaults.csproj" />
</ItemGroup>

23 changes: 13 additions & 10 deletions playground/TestShop/OrderProcessor/OrderProcessingWorker.cs
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ public class OrderProcessingWorker : BackgroundService
private readonly IConfiguration _config;
private readonly IServiceProvider _serviceProvider;
private IConnection? _messageConnection;
private IModel? _messageChannel;
private IChannel? _messageChannel;

public OrderProcessingWorker(ILogger<OrderProcessingWorker> logger, IConfiguration config, IServiceProvider serviceProvider)
{
@@ -23,22 +23,23 @@ public OrderProcessingWorker(ILogger<OrderProcessingWorker> logger, IConfigurati

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.Factory.StartNew(() =>
return Task.Factory.StartNew(async () =>
{
const string configKeyName = "Aspire:RabbitMQ:Client:OrderQueueName";
string queueName = _config[configKeyName] ?? "orders";

_messageConnection = _serviceProvider.GetRequiredService<IConnection>();

_messageChannel = _messageConnection.CreateModel();
_messageChannel.QueueDeclare(queueName, durable: true, exclusive: false);
_messageChannel = await _messageConnection.CreateChannelAsync();
await _messageChannel.QueueDeclareAsync(queueName, durable: true, exclusive: false);

var consumer = new EventingBasicConsumer(_messageChannel);
consumer.Received += ProcessMessageAsync;
var consumer = new AsyncEventingBasicConsumer(_messageChannel);
consumer.ReceivedAsync += ProcessMessageAsync;

_messageChannel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
await _messageChannel.BasicConsumeAsync(
queueName,
autoAck: true,
consumer);
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
}

@@ -49,7 +50,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
_messageChannel?.Dispose();
}

private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
private Task ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
{
_logger.LogInformation($"Processing Order at: {DateTime.UtcNow}");

@@ -72,5 +73,7 @@ private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
BuyerId:{BuyerId}
ProductCount:{Count}
""", order.Id, order.BuyerId, order.Items.Count);

return Task.CompletedTask;
}
}
2 changes: 1 addition & 1 deletion playground/TestShop/OrderProcessor/OrderProcessor.csproj
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
</ItemGroup>

<ItemGroup>
<AspireProjectOrPackageReference Include="Aspire.RabbitMQ.Client" />
<AspireProjectOrPackageReference Include="Aspire.RabbitMQ.Client.v7" />
<ProjectReference Include="..\TestShop.ServiceDefaults\TestShop.ServiceDefaults.csproj" />
</ItemGroup>
</Project>
1 change: 1 addition & 0 deletions playground/TestShop/TestShop.AppHost/Program.cs
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@

var messaging = builder.AddRabbitMQ("messaging")
.WithDataVolume()
.WithLifetime(ContainerLifetime.Persistent)
.WithManagementPlugin()
.PublishAsContainer();

21 changes: 16 additions & 5 deletions src/Aspire.Hosting.RabbitMQ/RabbitMQBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
using Aspire.Hosting.RabbitMQ;
using Aspire.Hosting.Utils;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;

namespace Aspire.Hosting;

@@ -52,12 +53,22 @@ public static IResourceBuilder<RabbitMQServerResource> AddRabbitMQ(this IDistrib
});

var healthCheckKey = $"{name}_check";
builder.Services.AddHealthChecks().AddRabbitMQ((sp, options) =>
// cache the connection so it is reused on subsequent calls to the health check
IConnection? connection = null;
builder.Services.AddHealthChecks().AddRabbitMQ(async (sp) =>
{
// NOTE: This specific callback signature needs to be used to ensure
// that execution of this setup callback is deferred until after
// the container is build & started.
options.ConnectionUri = new Uri(connectionString!);
// NOTE: Ensure that execution of this setup callback is deferred until after
// the container is built & started.
return connection ??= await CreateConnection(connectionString!).ConfigureAwait(false);

static Task<IConnection> CreateConnection(string connectionString)
{
var factory = new ConnectionFactory
{
Uri = new Uri(connectionString)
};
return factory.CreateConnectionAsync();
}
}, healthCheckKey);

var rabbitmq = builder.AddResource(rabbitMq)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>$(DefaultTargetFramework)</TargetFramework>
<IsPackable>true</IsPackable>
<PackageTags>$(ComponentDatabasePackageTags) rabbitmq amqp</PackageTags>
<Description>A RabbitMQ client (version 7+) that integrates with Aspire, including health checks, logging, and telemetry.</Description>
<MinCodeCoverage>80</MinCodeCoverage>
<NoWarn>$(NoWarn);SYSLIB1100;SYSLIB1101</NoWarn>
<!-- Disable package validation as this package hasn't shipped yet. -->
<EnablePackageValidation>false</EnablePackageValidation>

<!-- Keep the same assembly name as the main library. -->
<AssemblyName>Aspire.RabbitMQ.Client</AssemblyName>
<!-- PackageId defaults to AssemblyName, so need to reset it. -->
<PackageId>$(MSBuildProjectName)</PackageId>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Aspire.RabbitMQ.Client\AspireRabbitMQExtensions.cs" />
<Compile Include="..\Aspire.RabbitMQ.Client\RabbitMQClientSettings.cs" />
<Compile Include="..\Aspire.RabbitMQ.Client\RabbitMQEventSourceLogForwarder.cs" />
<None Include="..\Aspire.RabbitMQ.Client\README.md" Pack="true" PackagePath="\" />

<Compile Include="..\Common\ConfigurationSchemaAttributes.cs" Link="ConfigurationSchemaAttributes.cs" />
<Compile Include="..\Common\HealthChecksExtensions.cs" Link="HealthChecksExtensions.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Rabbitmq" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
<PackageReference Include="Polly.Core" />
<PackageReference Include="RabbitMQ.Client" VersionOverride="7.0.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="Aspire.RabbitMQ.Client.v7.Tests" />
</ItemGroup>

</Project>
354 changes: 354 additions & 0 deletions src/Components/Aspire.RabbitMQ.Client.v7/ConfigurationSchema.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#nullable enable
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#nullable enable
Aspire.RabbitMQ.Client.RabbitMQClientSettings
Aspire.RabbitMQ.Client.RabbitMQClientSettings.ConnectionString.get -> string?
Aspire.RabbitMQ.Client.RabbitMQClientSettings.ConnectionString.set -> void
Aspire.RabbitMQ.Client.RabbitMQClientSettings.DisableHealthChecks.get -> bool
Aspire.RabbitMQ.Client.RabbitMQClientSettings.DisableHealthChecks.set -> void
Aspire.RabbitMQ.Client.RabbitMQClientSettings.DisableTracing.get -> bool
Aspire.RabbitMQ.Client.RabbitMQClientSettings.DisableTracing.set -> void
Aspire.RabbitMQ.Client.RabbitMQClientSettings.MaxConnectRetryCount.get -> int
Aspire.RabbitMQ.Client.RabbitMQClientSettings.MaxConnectRetryCount.set -> void
Aspire.RabbitMQ.Client.RabbitMQClientSettings.RabbitMQClientSettings() -> void
Microsoft.Extensions.Hosting.AspireRabbitMQExtensions
static Microsoft.Extensions.Hosting.AspireRabbitMQExtensions.AddKeyedRabbitMQClient(this Microsoft.Extensions.Hosting.IHostApplicationBuilder! builder, string! name, System.Action<Aspire.RabbitMQ.Client.RabbitMQClientSettings!>? configureSettings = null, System.Action<RabbitMQ.Client.ConnectionFactory!>? configureConnectionFactory = null) -> void
static Microsoft.Extensions.Hosting.AspireRabbitMQExtensions.AddRabbitMQClient(this Microsoft.Extensions.Hosting.IHostApplicationBuilder! builder, string! connectionName, System.Action<Aspire.RabbitMQ.Client.RabbitMQClientSettings!>? configureSettings = null, System.Action<RabbitMQ.Client.ConnectionFactory!>? configureConnectionFactory = null) -> void
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
<PackageTags>$(ComponentDatabasePackageTags) rabbitmq amqp</PackageTags>
<Description>A RabbitMQ client that integrates with Aspire, including health checks, logging, and telemetry.</Description>
<NoWarn>$(NoWarn);SYSLIB1100;SYSLIB1101</NoWarn>
<DefineConstants>$(DefineConstants);RABBITMQ_V6</DefineConstants>
</PropertyGroup>

<PropertyGroup>
@@ -18,11 +19,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Rabbitmq" />
<PackageReference Include="AspNetCore.HealthChecks.Rabbitmq.v6" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
<PackageReference Include="Polly.Core" />
<PackageReference Include="RabbitMQ.Client" />
<PackageReference Include="RabbitMQ.Client" VersionOverride="[6.8.1,7.0.0)" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" />
</ItemGroup>

64 changes: 50 additions & 14 deletions src/Components/Aspire.RabbitMQ.Client/AspireRabbitMQExtensions.cs
Original file line number Diff line number Diff line change
@@ -124,10 +124,16 @@ IConnectionFactory CreateConnectionFactory(IServiceProvider sp)

if (!settings.DisableTracing)
{
// Note that RabbitMQ.Client v6.6 doesn't have built-in support for tracing. See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1261

builder.Services.AddOpenTelemetry()
.WithTracing(traceBuilder => traceBuilder.AddSource(ActivitySourceName));
.WithTracing(traceBuilder =>
traceBuilder
.AddSource(ActivitySourceName)
#if RABBITMQ_V6
// Note that RabbitMQ.Client v6.x doesn't have built-in support for tracing. See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1261
#else
.AddSource("RabbitMQ.Client.*")
#endif
);
}

if (!settings.DisableHealthChecks)
@@ -139,9 +145,14 @@ IConnectionFactory CreateConnectionFactory(IServiceProvider sp)
try
{
// if the IConnection can't be resolved, make a health check that will fail
var connection = serviceKey is null ? sp.GetRequiredService<IConnection>() : sp.GetRequiredKeyedService<IConnection>(serviceKey);
#if RABBITMQ_V6
var options = new RabbitMQHealthCheckOptions();
options.Connection = serviceKey is null ? sp.GetRequiredService<IConnection>() : sp.GetRequiredKeyedService<IConnection>(serviceKey);
options.Connection = connection;
return new RabbitMQHealthCheck(options);
#else
return new RabbitMQHealthCheck(connection);
#endif
}
catch (Exception ex)
{
@@ -181,6 +192,7 @@ private static IConnection CreateConnection(IConnectionFactory factory, int retr
using var activity = s_activitySource.StartActivity("rabbitmq connect", ActivityKind.Client);
AddRabbitMQTags(activity, factory.Uri);

#if RABBITMQ_V6
return resiliencePipeline.Execute(static factory =>
{
using var connectAttemptActivity = s_activitySource.StartActivity("rabbitmq connect attempt", ActivityKind.Client);
@@ -192,19 +204,27 @@ private static IConnection CreateConnection(IConnectionFactory factory, int retr
}
catch (Exception ex)
{
if (connectAttemptActivity is not null)
{
connectAttemptActivity.AddTag("exception.message", ex.Message);
// Note that "exception.stacktrace" is the full exception detail, not just the StackTrace property.
// See https://opentelemetry.io/docs/specs/semconv/attributes-registry/exception/
// and https://github.com/open-telemetry/opentelemetry-specification/pull/697#discussion_r453662519
connectAttemptActivity.AddTag("exception.stacktrace", ex.ToString());
connectAttemptActivity.AddTag("exception.type", ex.GetType().FullName);
connectAttemptActivity.SetStatus(ActivityStatusCode.Error);
}
AddRabbitMQExceptionTags(connectAttemptActivity, ex);
throw;
}
}, factory);
#else
return resiliencePipeline.ExecuteAsync(static async (factory, cancellationToken) =>
{
using var connectAttemptActivity = s_activitySource.StartActivity("rabbitmq connect attempt", ActivityKind.Client);
AddRabbitMQTags(connectAttemptActivity, factory.Uri, "connect");

try
{
return await factory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
AddRabbitMQExceptionTags(connectAttemptActivity, ex);
throw;
}
}, factory).AsTask().GetAwaiter().GetResult(); // see https://github.com/dotnet/aspire/issues/565
#endif
}

private static void AddRabbitMQTags(Activity? activity, Uri address, string? operation = null)
@@ -222,4 +242,20 @@ private static void AddRabbitMQTags(Activity? activity, Uri address, string? ope
activity.AddTag("messaging.operation", operation);
}
}

private static void AddRabbitMQExceptionTags(Activity? connectAttemptActivity, Exception ex)
{
if (connectAttemptActivity is null)
{
return;
}

connectAttemptActivity.AddTag("exception.message", ex.Message);
// Note that "exception.stacktrace" is the full exception detail, not just the StackTrace property.
// See https://opentelemetry.io/docs/specs/semconv/attributes-registry/exception/
// and https://github.com/open-telemetry/opentelemetry-specification/pull/697#discussion_r453662519
connectAttemptActivity.AddTag("exception.stacktrace", ex.ToString());
connectAttemptActivity.AddTag("exception.type", ex.GetType().FullName);
connectAttemptActivity.SetStatus(ActivityStatusCode.Error);
}
}
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@
<ItemGroup>
<ProjectReference Include="..\..\src\Aspire.Hosting.AppHost\Aspire.Hosting.AppHost.csproj" />
<ProjectReference Include="..\..\src\Aspire.Hosting.RabbitMQ\Aspire.Hosting.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\Components\Aspire.RabbitMQ.Client\Aspire.RabbitMQ.Client.csproj" />
<ProjectReference Include="..\..\src\Components\Aspire.RabbitMQ.Client.v7\Aspire.RabbitMQ.Client.v7.csproj" />
<ProjectReference Include="..\Aspire.Hosting.Tests\Aspire.Hosting.Tests.csproj" />
</ItemGroup>

27 changes: 14 additions & 13 deletions tests/Aspire.Hosting.RabbitMQ.Tests/RabbitMQFunctionalTests.cs
Original file line number Diff line number Diff line change
@@ -80,17 +80,17 @@ public async Task VerifyRabbitMQResource()

var connection = host.Services.GetRequiredService<IConnection>();

using var channel = connection.CreateModel();
await using var channel = await connection.CreateChannelAsync();
const string queueName = "hello";
channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueDeclareAsync(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

const string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: string.Empty, routingKey: queueName, basicProperties: null, body: body);
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body);

var result = channel.BasicGet(queueName, true);
Assert.Equal(message, Encoding.UTF8.GetString(result.Body.Span));
var result = await channel.BasicGetAsync(queueName, true);
Assert.Equal(message, Encoding.UTF8.GetString(result!.Body.Span));
}

[Theory]
@@ -142,18 +142,19 @@ public async Task WithDataShouldPersistStateBetweenUsages(bool useVolume)

var connection = host.Services.GetRequiredService<IConnection>();

using var channel = connection.CreateModel();
await using var channel = await connection.CreateChannelAsync();
const string queueName = "hello";
channel.QueueDeclare(queueName, durable: true, exclusive: false);
await channel.QueueDeclareAsync(queueName, durable: true, exclusive: false);

const string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);

var props = channel.CreateBasicProperties();
var props = new BasicProperties();
props.Persistent = true; // or props.DeliveryMode = 2;
channel.BasicPublish(
await channel.BasicPublishAsync(
exchange: string.Empty,
queueName,
mandatory: true,
props,
body);
}
@@ -199,12 +200,12 @@ public async Task WithDataShouldPersistStateBetweenUsages(bool useVolume)

var connection = host.Services.GetRequiredService<IConnection>();

using var channel = connection.CreateModel();
await using var channel = await connection.CreateChannelAsync();
const string queueName = "hello";
channel.QueueDeclare(queueName, durable: true, exclusive: false);
await channel.QueueDeclareAsync(queueName, durable: true, exclusive: false);

var result = channel.BasicGet(queueName, true);
Assert.Equal("Hello World!", Encoding.UTF8.GetString(result.Body.Span));
var result = await channel.BasicGetAsync(queueName, true);
Assert.Equal("Hello World!", Encoding.UTF8.GetString(result!.Body.Span));
}
}
finally
Original file line number Diff line number Diff line change
@@ -2,13 +2,17 @@

<PropertyGroup>
<TargetFramework>$(DefaultTargetFramework)</TargetFramework>
<DefineConstants>$(DefineConstants);RABBITMQ_V6</DefineConstants>
</PropertyGroup>

<ItemGroup>
<None Include="$(RepoRoot)src\Components\Aspire.RabbitMQ.Client\ConfigurationSchema.json" CopyToOutputDirectory="PreserveNewest" />
<Compile Include="$(RepoRoot)src\Aspire.Hosting.RabbitMQ\RabbitMQContainerImageTags.cs" />

<ProjectReference Include="..\..\src\Components\Aspire.RabbitMQ.Client\Aspire.RabbitMQ.Client.csproj" />
<PackageReference Include="AspNetCore.HealthChecks.Rabbitmq" VersionOverride="[8.0.2,9.0.0)" />
<PackageReference Include="RabbitMQ.Client" VersionOverride="[6.8.1,7.0.0)" />

<ProjectReference Include="..\Aspire.Components.Common.Tests\Aspire.Components.Common.Tests.csproj" />

<PackageReference Include="Testcontainers.RabbitMq" />
Original file line number Diff line number Diff line change
@@ -130,7 +130,7 @@ public void ConnectionFactoryOptionsFromConfig()
"Enabled": true,
"Version": "Tls13"
},
"MaxMessageSize": 304,
"RequestedFrameMax": 304,
"ClientProvidedName": "aspire-app"
}
}
@@ -155,7 +155,7 @@ public void ConnectionFactoryOptionsFromConfig()
Assert.True(connectionFactory.Ssl.Enabled);
Assert.Equal(SslProtocols.Tls13, connectionFactory.Ssl.Version);
Assert.Equal(TimeSpan.FromSeconds(3), connectionFactory.SocketReadTimeout);
Assert.Equal((uint)304, connectionFactory.MaxMessageSize);
Assert.Equal((uint)304, connectionFactory.RequestedFrameMax);
Assert.Equal("aspire-app", connectionFactory.ClientProvidedName);
}

47 changes: 42 additions & 5 deletions tests/Aspire.RabbitMQ.Client.Tests/AspireRabbitMQLoggingTests.cs
Original file line number Diff line number Diff line change
@@ -10,10 +10,15 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using RabbitMQ.Client;
using RabbitMQ.Client.Logging;
using Testcontainers.RabbitMq;
using Xunit;

#if RABBITMQ_V6
using RabbitMQ.Client.Logging;
#else
using System.Reflection;
#endif

namespace Aspire.RabbitMQ.Client.Tests;

public class AspireRabbitMQLoggingTests
@@ -81,15 +86,15 @@ public void TestInfoAndWarn()
host.Services.GetRequiredService<RabbitMQEventSourceLogForwarder>().Start();

var message = "This is an informational message.";
RabbitMqClientEventSource.Log.Info(message);
LogInfo(message);

var logs = logger.Logs.ToArray();
Assert.Single(logs);
Assert.Equal(LogLevel.Information, logs[0].Level);
Assert.Equal(message, logs[0].Message);

var warningMessage = "This is a warning message.";
RabbitMqClientEventSource.Log.Warn(warningMessage);
LogWarn(warningMessage);

logs = logger.Logs.ToArray();
Assert.Equal(2, logs.Length);
@@ -122,7 +127,7 @@ public void TestExceptionWithoutInnerException()

Assert.NotNull(testException);
var logMessage = "This is an error message.";
RabbitMqClientEventSource.Log.Error(logMessage, testException);
LogError(logMessage, testException);

var logs = logger.Logs.ToArray();
Assert.Single(logs);
@@ -168,7 +173,7 @@ public void TestExceptionWithInnerException()

Assert.NotNull(testException);
var logMessage = "This is an error message.";
RabbitMqClientEventSource.Log.Error(logMessage, testException);
LogError(logMessage, testException);

var logs = logger.Logs.ToArray();
Assert.Single(logs);
@@ -191,6 +196,38 @@ public void TestExceptionWithInnerException()
Assert.Equal($"{innerException.GetType()}: {innerException.Message}", errorEvent[3].Value?.ToString());
}

#if !RABBITMQ_V6
private static readonly object s_log =
Type.GetType("RabbitMQ.Client.Logging.RabbitMqClientEventSource, RabbitMQ.Client")!
.GetField("Log", BindingFlags.Static | BindingFlags.Public)!
.GetValue(null)!;
#endif

private static void LogInfo(string message)
{
#if RABBITMQ_V6
RabbitMqClientEventSource.Log.Info(message);
#else
s_log.GetType().GetMethod("Info")!.Invoke(s_log, new object[] { message });
#endif
}
private static void LogWarn(string message)
{
#if RABBITMQ_V6
RabbitMqClientEventSource.Log.Warn(message);
#else
s_log.GetType().GetMethod("Warn")!.Invoke(s_log, new object[] { message });
#endif
}
private static void LogError(string message, Exception ex)
{
#if RABBITMQ_V6
RabbitMqClientEventSource.Log.Error(message, ex);
#else
s_log.GetType().GetMethod("Error", [typeof(string), typeof(Exception)])!.Invoke(s_log, new object[] { message, ex });
#endif
}

private sealed class LoggerProvider(TestLogger logger) : ILoggerProvider
{
public ILogger CreateLogger(string categoryName) => logger;
12 changes: 12 additions & 0 deletions tests/Aspire.RabbitMQ.Client.Tests/ConformanceTests.cs
Original file line number Diff line number Diff line change
@@ -108,13 +108,25 @@ protected override void SetMetrics(RabbitMQClientSettings options, bool enabled)

protected override void TriggerActivity(IConnection service)
{
#if RABBITMQ_V6
var channel = service.CreateModel();
channel.QueueDeclare("test-queue", exclusive: false);
channel.BasicPublish(
exchange: "",
routingKey: "test-queue",
basicProperties: null,
body: "hello world"u8.ToArray());
#else
Task.Run(async () =>
{
using var channel = await service.CreateChannelAsync();
await channel.QueueDeclareAsync("test-queue", exclusive: false);
await channel.BasicPublishAsync(
exchange: "",
routingKey: "test-queue",
body: "hello world"u8.ToArray());
}).Wait();
#endif
}

protected override void SetupConnectionInformationIsDelayValidated()
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>$(DefaultTargetFramework)</TargetFramework>
</PropertyGroup>

<ItemGroup>
<None Include="$(RepoRoot)src\Components\Aspire.RabbitMQ.Client.v7\ConfigurationSchema.json" CopyToOutputDirectory="PreserveNewest" />
<Compile Include="$(RepoRoot)src\Aspire.Hosting.RabbitMQ\RabbitMQContainerImageTags.cs" />

<Compile Include="..\Aspire.RabbitMQ.Client.Tests\AspireRabbitMQExtensionsTests.cs" />
<Compile Include="..\Aspire.RabbitMQ.Client.Tests\AspireRabbitMQLoggingTests.cs" />
<Compile Include="..\Aspire.RabbitMQ.Client.Tests\ConformanceTests.cs" />
<Compile Include="..\Aspire.RabbitMQ.Client.Tests\RabbitMQContainerFixture.cs" />

<ProjectReference Include="..\..\src\Components\Aspire.RabbitMQ.Client.v7\Aspire.RabbitMQ.Client.v7.csproj" />
<ProjectReference Include="..\Aspire.Components.Common.Tests\Aspire.Components.Common.Tests.csproj" />

<PackageReference Include="Testcontainers.RabbitMq" />
</ItemGroup>

</Project>
1 change: 1 addition & 0 deletions tests/Shared/RepoTesting/Directory.Packages.Helix.props
Original file line number Diff line number Diff line change
@@ -71,6 +71,7 @@
<PackageVersion Include="Aspire.Pomelo.EntityFrameworkCore.MySql" Version="$(PackageVersion)" />
<PackageVersion Include="Aspire.Qdrant.Client" Version="$(PackageVersion)" />
<PackageVersion Include="Aspire.RabbitMQ.Client" Version="$(PackageVersion)" />
<PackageVersion Include="Aspire.RabbitMQ.Client.v7" Version="$(PackageVersion)" />
<PackageVersion Include="Aspire.Seq" Version="$(PackageVersion)" />
<PackageVersion Include="Aspire.StackExchange.Redis" Version="$(PackageVersion)" />
<PackageVersion Include="Aspire.StackExchange.Redis.DistributedCaching" Version="$(PackageVersion)" />