From 73064d2843d7db03518349ab2662bcaa2b78a86f Mon Sep 17 00:00:00 2001 From: Joakim Andersson <joakim.andersson@regalrexnord.com> Date: Fri, 21 Feb 2025 16:35:56 +0100 Subject: [PATCH] fix: properly cancel token passed to AsyncEventingBasicConsumer receiver event on channel close --- .../ConsumerDispatching/AsyncConsumerDispatcher.cs | 6 ++++-- projects/RabbitMQ.Client/IChannel.cs | 2 ++ projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs | 1 + projects/RabbitMQ.Client/Impl/Channel.cs | 5 +++++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs index d9958fae0..4000ba6a5 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -33,8 +33,10 @@ protected override async Task ProcessChannelAsync() work.DeliveryTag, work.BasicProperties!, work.Body.Size)) { await work.Consumer.HandleBasicDeliverAsync( - work.ConsumerTag!, work.DeliveryTag, work.Redelivered, - work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory) + work.ConsumerTag!, work.DeliveryTag, work.Redelivered, + work.Exchange!, work.RoutingKey!, work.BasicProperties!, + work.Body.Memory, + work.Consumer.Channel?.ShutdownCts.Token ?? default) .ConfigureAwait(false); } break; diff --git a/projects/RabbitMQ.Client/IChannel.cs b/projects/RabbitMQ.Client/IChannel.cs index b3d4cfb74..abfd869cf 100644 --- a/projects/RabbitMQ.Client/IChannel.cs +++ b/projects/RabbitMQ.Client/IChannel.cs @@ -57,6 +57,8 @@ public interface IChannel : IAsyncDisposable, IDisposable /// or the cause of its closure otherwise. /// </summary> ShutdownEventArgs? CloseReason { get; } + + CancellationTokenSource ShutdownCts { get; } /// <summary>Signalled when an unexpected message is delivered.</summary> /// diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 8cfb9553e..d6a6013e2 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -136,6 +136,7 @@ public IEnumerable<string> ConsumerTags public int ChannelNumber => InnerChannel.ChannelNumber; public ShutdownEventArgs? CloseReason => InnerChannel.CloseReason; + public CancellationTokenSource ShutdownCts => InnerChannel.ShutdownCts; public IAsyncBasicConsumer? DefaultConsumer { diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 8d373cea2..aac925511 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -66,6 +66,9 @@ internal partial class Channel : IChannel, IRecoverable private bool _disposed; private int _isDisposing; + private CancellationTokenSource _shutdownCts = new CancellationTokenSource(); + public CancellationTokenSource ShutdownCts => _shutdownCts; + public Channel(ISession session, CreateChannelOptions createChannelOptions) { ContinuationTimeout = createChannelOptions.ContinuationTimeout; @@ -208,6 +211,8 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort, public async Task CloseAsync(ShutdownEventArgs args, bool abort, CancellationToken cancellationToken) { + _shutdownCts.Cancel(); + bool enqueued = false; var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken);