Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RabbitMQ.Client v7 client integration package #6770

Merged
merged 11 commits into from
Jan 13, 2025
14 changes: 14 additions & 0 deletions Aspire.sln
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DaprServiceC", "playground\
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Dashboard", "Dashboard", "{830F7CA9-8E51-4D62-832F-91F53F85B7AE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Aspire.RabbitMQ.Client.v7", "src\Components\Aspire.RabbitMQ.Client.v7\Aspire.RabbitMQ.Client.v7.csproj", "{604EBA18-AA84-A046-0CEF-9E2E61F711B3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Aspire.RabbitMQ.Client.v7.Tests", "tests\Aspire.RabbitMQ.Client.v7.Tests\Aspire.RabbitMQ.Client.v7.Tests.csproj", "{FEA86BE6-0533-4146-0084-F2940A64A258}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AzureServiceBus", "AzureServiceBus", "{D2938171-1DBB-4E8D-AF16-97F75F1AE6DE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceBusWorker", "playground\AzureServiceBus\ServiceBusWorker\ServiceBusWorker.csproj", "{162F0B66-E88F-4735-8CE0-BE8950F74CC6}"
Expand Down Expand Up @@ -1659,6 +1663,14 @@ Global
{B26653B9-439E-4850-A7F8-43C6E5121952}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B26653B9-439E-4850-A7F8-43C6E5121952}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B26653B9-439E-4850-A7F8-43C6E5121952}.Release|Any CPU.Build.0 = Release|Any CPU
{604EBA18-AA84-A046-0CEF-9E2E61F711B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{604EBA18-AA84-A046-0CEF-9E2E61F711B3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{604EBA18-AA84-A046-0CEF-9E2E61F711B3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{604EBA18-AA84-A046-0CEF-9E2E61F711B3}.Release|Any CPU.Build.0 = Release|Any CPU
{FEA86BE6-0533-4146-0084-F2940A64A258}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FEA86BE6-0533-4146-0084-F2940A64A258}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FEA86BE6-0533-4146-0084-F2940A64A258}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FEA86BE6-0533-4146-0084-F2940A64A258}.Release|Any CPU.Build.0 = Release|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{162F0B66-E88F-4735-8CE0-BE8950F74CC6}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -1970,6 +1982,8 @@ Global
{042DD8C6-A26C-4B06-80A1-FE7F8659C5BC} = {B7345F72-712F-436C-AE18-CAF7CDD4A990}
{B26653B9-439E-4850-A7F8-43C6E5121952} = {57A42144-739E-49A7-BADB-BB8F5F20FA17}
{830F7CA9-8E51-4D62-832F-91F53F85B7AE} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
{604EBA18-AA84-A046-0CEF-9E2E61F711B3} = {27381127-6C45-4B4C-8F18-41FF48DFE4B2}
{FEA86BE6-0533-4146-0084-F2940A64A258} = {C424395C-1235-41A4-BF55-07880A04368C}
{D2938171-1DBB-4E8D-AF16-97F75F1AE6DE} = {D173887B-AF42-4576-B9C1-96B9E9B3D9C0}
{162F0B66-E88F-4735-8CE0-BE8950F74CC6} = {D2938171-1DBB-4E8D-AF16-97F75F1AE6DE}
{A7EC9111-F3CC-46E8-B95E-3768481D67B4} = {D2938171-1DBB-4E8D-AF16-97F75F1AE6DE}
Expand Down
5 changes: 3 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
<PackageVersion Include="AspNetCore.HealthChecks.MySql" Version="8.0.1" />
<PackageVersion Include="AspNetCore.HealthChecks.NpgSql" Version="8.0.2" />
<PackageVersion Include="AspNetCore.HealthChecks.Oracle" Version="8.0.1" />
<PackageVersion Include="AspNetCore.HealthChecks.Rabbitmq" Version="8.0.2" />
<PackageVersion Include="AspNetCore.HealthChecks.Rabbitmq" Version="9.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.Rabbitmq.v6" Version="9.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.Redis" Version="8.0.1" />
<PackageVersion Include="AspNetCore.HealthChecks.SqlServer" Version="8.0.2" />
<PackageVersion Include="AspNetCore.HealthChecks.Uris" Version="8.0.1" />
Expand Down Expand Up @@ -103,7 +104,7 @@
<PackageVersion Include="Polly.Extensions" Version="8.5.0" />
<PackageVersion Include="Pomelo.EntityFrameworkCore.MySql" Version="8.0.2" />
<PackageVersion Include="Qdrant.Client" Version="1.12.0" />
<PackageVersion Include="RabbitMQ.Client" Version="[6.8.1,7.0.0)" />
<PackageVersion Include="RabbitMQ.Client" Version="7.0.0" />
<PackageVersion Include="StackExchange.Redis" Version="2.8.22" />
<PackageVersion Include="System.IO.Hashing" Version="8.0.0" />
<PackageVersion Include="Yarp.ReverseProxy" Version="2.2.0" />
Expand Down
11 changes: 6 additions & 5 deletions playground/TestShop/BasketService/BasketService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,15 @@ public override async Task<CheckoutCustomerBasketResponse> CheckoutBasket(Checko
return new();
}

using var channel = _messageConnection.CreateModel();
channel.QueueDeclare(queueName, durable: true, exclusive: false);
using var channel = await _messageConnection.CreateChannelAsync();
await channel.QueueDeclareAsync(queueName, durable: true, exclusive: false);

var props = channel.CreateBasicProperties();
var props = new BasicProperties();
props.Persistent = true; // or props.DeliveryMode = 2;
channel.BasicPublish(
exchange: "",
await channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: queueName,
mandatory: true,
basicProperties: props,
body: JsonSerializer.SerializeToUtf8Bytes(order));
}
Expand Down
2 changes: 1 addition & 1 deletion playground/TestShop/BasketService/BasketService.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<ItemGroup>
<AspireProjectOrPackageReference Include="Aspire.StackExchange.Redis" />
<AspireProjectOrPackageReference Include="Aspire.RabbitMQ.Client" />
<AspireProjectOrPackageReference Include="Aspire.RabbitMQ.Client.v7" />
<ProjectReference Include="..\TestShop.ServiceDefaults\TestShop.ServiceDefaults.csproj" />
</ItemGroup>

Expand Down
23 changes: 13 additions & 10 deletions playground/TestShop/OrderProcessor/OrderProcessingWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class OrderProcessingWorker : BackgroundService
private readonly IConfiguration _config;
private readonly IServiceProvider _serviceProvider;
private IConnection? _messageConnection;
private IModel? _messageChannel;
private IChannel? _messageChannel;

public OrderProcessingWorker(ILogger<OrderProcessingWorker> logger, IConfiguration config, IServiceProvider serviceProvider)
{
Expand All @@ -23,22 +23,23 @@ public OrderProcessingWorker(ILogger<OrderProcessingWorker> logger, IConfigurati

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.Factory.StartNew(() =>
return Task.Factory.StartNew(async () =>
{
const string configKeyName = "Aspire:RabbitMQ:Client:OrderQueueName";
string queueName = _config[configKeyName] ?? "orders";

_messageConnection = _serviceProvider.GetRequiredService<IConnection>();

_messageChannel = _messageConnection.CreateModel();
_messageChannel.QueueDeclare(queueName, durable: true, exclusive: false);
_messageChannel = await _messageConnection.CreateChannelAsync();
await _messageChannel.QueueDeclareAsync(queueName, durable: true, exclusive: false);

var consumer = new EventingBasicConsumer(_messageChannel);
consumer.Received += ProcessMessageAsync;
var consumer = new AsyncEventingBasicConsumer(_messageChannel);
consumer.ReceivedAsync += ProcessMessageAsync;

_messageChannel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
await _messageChannel.BasicConsumeAsync(
queueName,
autoAck: true,
consumer);
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
}

Expand All @@ -49,7 +50,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
_messageChannel?.Dispose();
}

private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
private Task ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
{
_logger.LogInformation($"Processing Order at: {DateTime.UtcNow}");

Expand All @@ -72,5 +73,7 @@ private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
BuyerId:{BuyerId}
ProductCount:{Count}
""", order.Id, order.BuyerId, order.Items.Count);

return Task.CompletedTask;
}
}
2 changes: 1 addition & 1 deletion playground/TestShop/OrderProcessor/OrderProcessor.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</ItemGroup>

<ItemGroup>
<AspireProjectOrPackageReference Include="Aspire.RabbitMQ.Client" />
<AspireProjectOrPackageReference Include="Aspire.RabbitMQ.Client.v7" />
<ProjectReference Include="..\TestShop.ServiceDefaults\TestShop.ServiceDefaults.csproj" />
</ItemGroup>
</Project>
1 change: 1 addition & 0 deletions playground/TestShop/TestShop.AppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

var messaging = builder.AddRabbitMQ("messaging")
.WithDataVolume()
.WithLifetime(ContainerLifetime.Persistent)
.WithManagementPlugin()
.PublishAsContainer();

Expand Down
21 changes: 16 additions & 5 deletions src/Aspire.Hosting.RabbitMQ/RabbitMQBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Aspire.Hosting.RabbitMQ;
using Aspire.Hosting.Utils;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;

namespace Aspire.Hosting;

Expand Down Expand Up @@ -52,12 +53,22 @@ public static IResourceBuilder<RabbitMQServerResource> AddRabbitMQ(this IDistrib
});

var healthCheckKey = $"{name}_check";
builder.Services.AddHealthChecks().AddRabbitMQ((sp, options) =>
// cache the connection so it is reused on subsequent calls to the health check
IConnection? connection = null;
builder.Services.AddHealthChecks().AddRabbitMQ(async (sp) =>
{
// NOTE: This specific callback signature needs to be used to ensure
// that execution of this setup callback is deferred until after
// the container is build & started.
options.ConnectionUri = new Uri(connectionString!);
// NOTE: Ensure that execution of this setup callback is deferred until after
// the container is built & started.
return connection ??= await CreateConnection(connectionString!).ConfigureAwait(false);

static Task<IConnection> CreateConnection(string connectionString)
{
var factory = new ConnectionFactory
{
Uri = new Uri(connectionString)
};
return factory.CreateConnectionAsync();
}
}, healthCheckKey);

var rabbitmq = builder.AddResource(rabbitMq)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>$(DefaultTargetFramework)</TargetFramework>
<IsPackable>true</IsPackable>
<PackageTags>$(ComponentDatabasePackageTags) rabbitmq amqp</PackageTags>
<Description>A RabbitMQ client (version 7+) that integrates with Aspire, including health checks, logging, and telemetry.</Description>
<MinCodeCoverage>80</MinCodeCoverage>
<NoWarn>$(NoWarn);SYSLIB1100;SYSLIB1101</NoWarn>
<!-- Disable package validation as this package hasn't shipped yet. -->
<EnablePackageValidation>false</EnablePackageValidation>

<!-- Keep the same assembly name as the main library. -->
<AssemblyName>Aspire.RabbitMQ.Client</AssemblyName>
<!-- PackageId defaults to AssemblyName, so need to reset it. -->
<PackageId>$(MSBuildProjectName)</PackageId>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Aspire.RabbitMQ.Client\AspireRabbitMQExtensions.cs" />
<Compile Include="..\Aspire.RabbitMQ.Client\RabbitMQClientSettings.cs" />
<Compile Include="..\Aspire.RabbitMQ.Client\RabbitMQEventSourceLogForwarder.cs" />
<None Include="..\Aspire.RabbitMQ.Client\README.md" Pack="true" PackagePath="\" />

<Compile Include="..\Common\ConfigurationSchemaAttributes.cs" Link="ConfigurationSchemaAttributes.cs" />
<Compile Include="..\Common\HealthChecksExtensions.cs" Link="HealthChecksExtensions.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Rabbitmq" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
<PackageReference Include="Polly.Core" />
<PackageReference Include="RabbitMQ.Client" VersionOverride="7.0.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="Aspire.RabbitMQ.Client.v7.Tests" />
</ItemGroup>

</Project>
Loading
Loading