Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
<CentralPackageTransitivePinningEnabled>true</CentralPackageTransitivePinningEnabled>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="AWSSDK.SQS" Version="4.0.0.4" />
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="4.0.0.4" />
<PackageVersion Include="AWSSDK.SQS" Version="4.0.0.6" />
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="4.0.0.5" />
<PackageVersion Include="Azure.Identity" Version="1.14.0" />
<PackageVersion Include="Azure.Messaging.EventGrid" Version="4.31.0" />
<PackageVersion Include="Azure.Storage.Queues" Version="12.22.0" />
<PackageVersion Include="Codebelt.Extensions.Newtonsoft.Json" Version="9.0.3" />
<PackageVersion Include="Codebelt.Extensions.Xunit.App" Version="10.0.1" />
<PackageVersion Include="Codebelt.Extensions.Xunit.App" Version="10.0.2" />
<PackageVersion Include="Codebelt.Extensions.YamlDotNet" Version="9.0.3" />
<PackageVersion Include="Cuemon.Extensions.Collections.Generic" Version="9.0.5" />
<PackageVersion Include="Cuemon.Extensions.Core" Version="9.0.5" />
Expand All @@ -22,17 +22,18 @@
<PackageVersion Include="Dapper.StrongName" Version="2.1.66" />
<PackageVersion Include="DapperExtensions.StrongNameReference" Version="1.10.0" />
<PackageVersion Include="Microsoft.Data.Sqlite" Version="9.0.5" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.14.0" />
<PackageVersion Include="Microsoft.TestPlatform.ObjectModel" Version="17.14.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageVersion Include="Microsoft.TestPlatform.ObjectModel" Version="17.14.1" />
<PackageVersion Include="MinVer" Version="6.0.0" />
<PackageVersion Include="RabbitMQ.Client" Version="7.1.2" />
<PackageVersion Include="System.Linq.Async" Version="6.0.1" />
<PackageVersion Include="System.Text.RegularExpressions" Version="4.3.1" />
<PackageVersion Include="Xunit.Priority" Version="1.1.6" />
<PackageVersion Include="coverlet.collector" Version="6.0.4" />
<PackageVersion Include="coverlet.msbuild" Version="6.0.4" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.runner.console" Version="2.9.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.1" />
</ItemGroup>
<ItemGroup Condition="$(TargetFramework.StartsWith('net9'))">
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="9.0.5" />
Expand Down
14 changes: 14 additions & 0 deletions Savvyio.sln
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Savvyio.Extensions.QueueSto
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Savvyio.Extensions.DependencyInjection.QueueStorage", "src\Savvyio.Extensions.DependencyInjection.QueueStorage\Savvyio.Extensions.DependencyInjection.QueueStorage.csproj", "{36520BC5-4343-4CF3-A8AE-B7A04B91DDB2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Savvyio.Extensions.RabbitMQ", "src\Savvyio.Extensions.RabbitMQ\Savvyio.Extensions.RabbitMQ.csproj", "{0AB0316F-76C4-413F-8C96-DB3489E318E6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Savvyio.Extensions.RabbitMQ.FunctionalTests", "test\Savvyio.Extensions.RabbitMQ.FunctionalTests\Savvyio.Extensions.RabbitMQ.FunctionalTests.csproj", "{C15021DF-E983-4B40-8F65-16A8966429AF}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -349,6 +353,14 @@ Global
{36520BC5-4343-4CF3-A8AE-B7A04B91DDB2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{36520BC5-4343-4CF3-A8AE-B7A04B91DDB2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{36520BC5-4343-4CF3-A8AE-B7A04B91DDB2}.Release|Any CPU.Build.0 = Release|Any CPU
{0AB0316F-76C4-413F-8C96-DB3489E318E6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0AB0316F-76C4-413F-8C96-DB3489E318E6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0AB0316F-76C4-413F-8C96-DB3489E318E6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0AB0316F-76C4-413F-8C96-DB3489E318E6}.Release|Any CPU.Build.0 = Release|Any CPU
{C15021DF-E983-4B40-8F65-16A8966429AF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C15021DF-E983-4B40-8F65-16A8966429AF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C15021DF-E983-4B40-8F65-16A8966429AF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C15021DF-E983-4B40-8F65-16A8966429AF}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -410,6 +422,8 @@ Global
{04804591-413B-4199-90E6-723097091DF9} = {407762FB-26D8-435F-AE16-C76791EC9FEC}
{0F265E4A-04AD-48F0-B309-22492822D91C} = {407762FB-26D8-435F-AE16-C76791EC9FEC}
{36520BC5-4343-4CF3-A8AE-B7A04B91DDB2} = {0E75F0C5-DBEB-4B6D-AC52-98656CA79A7D}
{0AB0316F-76C4-413F-8C96-DB3489E318E6} = {0E75F0C5-DBEB-4B6D-AC52-98656CA79A7D}
{C15021DF-E983-4B40-8F65-16A8966429AF} = {407762FB-26D8-435F-AE16-C76791EC9FEC}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D8DDAD51-08E6-42B5-970B-2DC88D44297B}
Expand Down
121 changes: 121 additions & 0 deletions src/Savvyio.Extensions.RabbitMQ/Commands/RabbitMqCommandQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using Cuemon;
using Cuemon.Extensions;
using Cuemon.Extensions.IO;
using Cuemon.Extensions.Reflection;
using Cuemon.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Savvyio.Commands;
using Savvyio.Messaging;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Savvyio.Extensions.RabbitMQ.Commands
{
/// <summary>
/// Represents a RabbitMQ-based implementation of a point-to-point command queue.
/// </summary>
public class RabbitMqCommandQueue : RabbitMqMessage, IPointToPointChannel<ICommand>
{
private readonly RabbitMqCommandQueueOptions _options;

/// <summary>
/// Initializes a new instance of the <see cref="RabbitMqCommandQueue"/> class.
/// </summary>
/// <param name="marshaller">The marshaller used for serializing and deserializing messages.</param>
/// <param name="options">The options used to configure the RabbitMQ command queue.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="marshaller"/> cannot be null -or-
/// <paramref name="options"/> cannot be null.
/// </exception>
/// <exception cref="ArgumentException">
/// <paramref name="options"/> are not in a valid state.
/// </exception>
public RabbitMqCommandQueue(IMarshaller marshaller, RabbitMqCommandQueueOptions options) : base(marshaller, options)
{
Validator.ThrowIfInvalidOptions(options);
_options = options;
}

/// <summary>
/// Sends the specified command messages asynchronously to the configured RabbitMQ queue.
/// </summary>
/// <param name="messages">The messages to send.</param>
/// <param name="setup">The <see cref="AsyncOptions"/> which may be configured.</param>
/// <returns>A <see cref="Task"/> that represents the asynchronous operation.</returns>
public async Task SendAsync(IEnumerable<IMessage<ICommand>> messages, Action<AsyncOptions> setup = null)
{
Validator.ThrowIfInvalidConfigurator(setup, out var options);

await EnsureConnectivityAsync(options.CancellationToken).ConfigureAwait(false);

await RabbitMqChannel.QueueDeclareAsync(_options.QueueName, false, false, false, cancellationToken: options.CancellationToken).ConfigureAwait(false);

foreach (var message in messages)
{
await RabbitMqChannel.BasicPublishAsync("", _options.QueueName, true, basicProperties: new BasicProperties()
{
//Persistent = true,
Headers = new Dictionary<string, object>()
{
{ MessageType, message.GetType().ToFullNameIncludingAssemblyName() }
}
}, body: await Marshaller.Serialize(message).ToByteArrayAsync(o => o.CancellationToken = options.CancellationToken).ConfigureAwait(false), options.CancellationToken).ConfigureAwait(false);
}
}

/// <summary>
/// Receives command messages asynchronously from the configured RabbitMQ queue.
/// </summary>
/// <param name="setup">The <see cref="AsyncOptions"/> which may be configured.</param>
/// <returns>
/// An <see cref="IAsyncEnumerable{T}"/> that yields <see cref="IMessage{ICommand}"/> instances as they are received.
/// </returns>
public async IAsyncEnumerable<IMessage<ICommand>> ReceiveAsync(Action<AsyncOptions> setup = null)
{
Validator.ThrowIfInvalidConfigurator(setup, out var options);

await EnsureConnectivityAsync(options.CancellationToken).ConfigureAwait(false);

await RabbitMqChannel.QueueDeclareAsync(_options.QueueName, false, false, false, cancellationToken: options.CancellationToken).ConfigureAwait(false);

var buffer = Channel.CreateUnbounded<IMessage<ICommand>>();
var consumer = new AsyncEventingBasicConsumer(RabbitMqChannel);
consumer.ReceivedAsync += async (_, e) =>
{
var messageType = Type.GetType((e.BasicProperties.Headers[MessageType] as byte[]).ToEncodedString());
var deserialized = Marshaller.Deserialize(e.Body.ToArray().ToStream(), messageType) as IMessage<ICommand>;
deserialized!.Properties.Add(nameof(BasicDeliverEventArgs.DeliveryTag), e.DeliveryTag);
deserialized.Properties.Add(nameof(CancellationToken), options.CancellationToken);
deserialized.Properties.Add(nameof(QueueDeclareOk.QueueName), _options.QueueName);
await buffer.Writer.WriteAsync(deserialized, options.CancellationToken).ConfigureAwait(false);
};

await RabbitMqChannel.BasicConsumeAsync(_options.QueueName, autoAck: false, consumer: consumer, cancellationToken: options.CancellationToken).ConfigureAwait(false);

while (await buffer.Reader.WaitToReadAsync(options.CancellationToken).ConfigureAwait(false))
{
while (buffer.Reader.TryRead(out var message))
{
message.Acknowledged += OnMessageAcknowledgedAsync;
if (_options.AutoAcknowledge)
{
await message.AcknowledgeAsync().ConfigureAwait(false);
}
yield return message;
}
}
}

private async Task OnMessageAcknowledgedAsync(object sender, AcknowledgedEventArgs e)
{
var ct = (CancellationToken)e.Properties[nameof(CancellationToken)];
var queueName = e.Properties[nameof(QueueDeclareOk.QueueName)] as string;
await RabbitMqChannel.QueueDeclareAsync(queueName, false, false, false, cancellationToken: ct).ConfigureAwait(false);
await RabbitMqChannel.BasicAckAsync((ulong)e.Properties[nameof(BasicDeliverEventArgs.DeliveryTag)], false, ct).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using Cuemon;

namespace Savvyio.Extensions.RabbitMQ.Commands
{
/// <summary>
/// Configuration options for <see cref="RabbitMqCommandQueue"/>.
/// </summary>
/// <seealso cref="RabbitMqMessageOptions"/>
public class RabbitMqCommandQueueOptions : RabbitMqMessageOptions
{
/// <summary>
/// Initializes a new instance of the <see cref="RabbitMqCommandQueueOptions"/> class with default values.
/// </summary>
/// <remarks>
/// The following table shows the initial property values for an instance of <see cref="RabbitMqCommandQueueOptions"/>.
/// <list type="table">
/// <listheader>
/// <term>Property</term>
/// <description>Initial Value</description>
/// </listheader>
/// <item>
/// <term><see cref="QueueName"/></term>
/// <description><c>null</c></description>
/// </item>
/// <item>
/// <term><see cref="AutoAcknowledge"/></term>
/// <description><c>false</c></description>
/// </item>
/// </list>
/// </remarks>
public RabbitMqCommandQueueOptions()
{
}

/// <summary>
/// Gets or sets the name of the queue.
/// </summary>
/// <value>
/// The name of the RabbitMQ queue to be used for command messages.
/// </value>
public string QueueName { get; set; }

/// <summary>
/// Gets or sets a value indicating whether messages should be automatically acknowledged.
/// </summary>
/// <value>
/// <c>true</c> if messages are automatically acknowledged; otherwise, <c>false</c>.
/// </value>
public bool AutoAcknowledge { get; set; }

/// <summary>
/// Determines whether the public read-write properties of this instance are in a valid state.
/// </summary>
/// <remarks>This method is expected to throw exceptions when one or more conditions fails to be in a valid state.</remarks>
/// <exception cref="InvalidOperationException">
/// <see cref="QueueName"/> cannot be null or empty.
/// </exception>
public override void ValidateOptions()
{
Validator.ThrowIfInvalidState(string.IsNullOrEmpty(QueueName));
base.ValidateOptions();
}
}
}
108 changes: 108 additions & 0 deletions src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
using Cuemon;
using Cuemon.Extensions;
using Cuemon.Extensions.IO;
using Cuemon.Extensions.Reflection;
using Cuemon.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Savvyio.EventDriven;
using Savvyio.Messaging;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Savvyio.Extensions.RabbitMQ.EventDriven
{
/// <summary>
/// Represents a RabbitMQ-based implementation of a publish-subscribe event bus for integration events.
/// </summary>
public class RabbitMqEventBus : RabbitMqMessage, IPublishSubscribeChannel<IIntegrationEvent>
{
private readonly ConnectionFactory _factory;

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-build (Release) / 🛠️ Build

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-build (Release) / 🛠️ Build

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-build (Release) / 🛠️ Build

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-build (Release) / 🛠️ Build

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-build (Debug) / 🛠️ Build

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-build (Debug) / 🛠️ Build

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-build (Debug) / 🛠️ Build

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-build (Debug) / 🛠️ Build

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-test (ubuntu-24.04, Debug, test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Savvyio.Extensio... / 🧪 Test

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-test (ubuntu-24.04, Debug, test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Savvyio.Extensio... / 🧪 Test

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-test (windows-2022, Release, test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Savvyio.Extens... / 🧪 Test

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-test (windows-2022, Release, test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Savvyio.Extens... / 🧪 Test

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-test (windows-2022, Debug, test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Savvyio.Extensio... / 🧪 Test

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-test (windows-2022, Debug, test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Savvyio.Extensio... / 🧪 Test

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-test (ubuntu-24.04, Release, test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Savvyio.Extens... / 🧪 Test

The field 'RabbitMqEventBus._factory' is never used

Check warning on line 23 in src/Savvyio.Extensions.RabbitMQ/EventDriven/RabbitMqEventBus.cs

View workflow job for this annotation

GitHub Actions / call-test (ubuntu-24.04, Release, test/Savvyio.Extensions.RabbitMQ.FunctionalTests/Savvyio.Extens... / 🧪 Test

The field 'RabbitMqEventBus._factory' is never used
private readonly RabbitMqEventBusOptions _options;

/// <summary>
/// Initializes a new instance of the <see cref="RabbitMqEventBus"/> class.
/// </summary>
/// <param name="marshaller">The marshaller used for serializing and deserializing messages.</param>
/// <param name="options">The options used to configure the RabbitMQ event bus.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="marshaller"/> cannot be null -or-
/// <paramref name="options"/> cannot be null.
/// </exception>
/// <exception cref="ArgumentException">
/// <paramref name="options"/> are not in a valid state.
/// </exception>
public RabbitMqEventBus(IMarshaller marshaller, RabbitMqEventBusOptions options) : base(marshaller, options)
{
Validator.ThrowIfInvalidOptions(options);
_options = options;
}

/// <summary>
/// Publishes the specified integration event message asynchronously to the configured RabbitMQ exchange.
/// </summary>
/// <param name="message">The message to publish.</param>
/// <param name="setup">The <see cref="AsyncOptions"/> which may be configured.</param>
/// <returns>A <see cref="Task"/> that represents the asynchronous operation.</returns>
public async Task PublishAsync(IMessage<IIntegrationEvent> message, Action<AsyncOptions> setup = null)
{
Validator.ThrowIfInvalidConfigurator(setup, out var options);

await EnsureConnectivityAsync(options.CancellationToken).ConfigureAwait(false);

await RabbitMqChannel.ExchangeDeclareAsync(_options.ExchangeName, ExchangeType.Fanout, cancellationToken: options.CancellationToken).ConfigureAwait(false);

await RabbitMqChannel.BasicPublishAsync(_options.ExchangeName, "", false, basicProperties: new BasicProperties()
{
Headers = new Dictionary<string, object>()
{
{ MessageType, message.GetType().ToFullNameIncludingAssemblyName() }
}
}, body: await Marshaller.Serialize(message).ToByteArrayAsync(o => o.CancellationToken = options.CancellationToken).ConfigureAwait(false), options.CancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Subscribes to integration event messages from the configured RabbitMQ exchange and invokes the specified asynchronous handler for each received message.
/// </summary>
/// <param name="asyncHandler">The function delegate that will handle the message.</param>
/// <param name="setup">The <see cref="SubscribeAsyncOptions"/> which may be configured.</param>
/// <returns>A <see cref="Task"/> that represents the asynchronous operation.</returns>
public async Task SubscribeAsync(Func<IMessage<IIntegrationEvent>, CancellationToken, Task> asyncHandler, Action<SubscribeAsyncOptions> setup = null)
{
Validator.ThrowIfInvalidConfigurator(setup, out var options);

await EnsureConnectivityAsync(options.CancellationToken).ConfigureAwait(false);

await RabbitMqChannel.ExchangeDeclareAsync(_options.ExchangeName, ExchangeType.Fanout, cancellationToken: options.CancellationToken).ConfigureAwait(false);

var queue = await RabbitMqChannel.QueueDeclareAsync(cancellationToken: options.CancellationToken).ConfigureAwait(false);

await RabbitMqChannel.QueueBindAsync(queue.QueueName, _options.ExchangeName, "", cancellationToken: options.CancellationToken).ConfigureAwait(false);

var buffer = Channel.CreateUnbounded<IMessage<IIntegrationEvent>>();
var consumer = new AsyncEventingBasicConsumer(RabbitMqChannel);
consumer.ReceivedAsync += async (_, e) =>
{
var messageType = Type.GetType((e.BasicProperties.Headers[MessageType] as byte[]).ToEncodedString());
var deserialized = Marshaller.Deserialize(e.Body.ToArray().ToStream(), messageType) as IMessage<IIntegrationEvent>;
deserialized!.Properties.Add(nameof(BasicDeliverEventArgs.DeliveryTag), e.DeliveryTag);
deserialized.Properties.Add(nameof(CancellationToken), options.CancellationToken);
deserialized.Properties.Add(nameof(QueueDeclareOk.QueueName), queue.QueueName);
await buffer.Writer.WriteAsync(deserialized, options.CancellationToken).ConfigureAwait(false);
};

await RabbitMqChannel.BasicConsumeAsync(queue.QueueName, autoAck: true, consumer: consumer, options.CancellationToken).ConfigureAwait(false);

while (await buffer.Reader.WaitToReadAsync(options.CancellationToken).ConfigureAwait(false))
{
while (buffer.Reader.TryRead(out var message))
{
await asyncHandler(message, options.CancellationToken).ConfigureAwait(false);
}
}
}
}
}
Loading
Loading