From 385d78c9f49c935eb5e9ac04b25e9f5a569d450e Mon Sep 17 00:00:00 2001 From: aminsrz <46673894+aminsrz@users.noreply.github.com> Date: Thu, 31 Oct 2024 13:06:19 +0100 Subject: [PATCH 1/3] Support for BasicNack in RabbitMQBasicConsumer --- .../RabbitMQBasicConsumer.cs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs index 66815e16d..b4940eef7 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs @@ -15,7 +15,7 @@ namespace DotNetCore.CAP.RabbitMQ; public class RabbitMQBasicConsumer : AsyncDefaultBasicConsumer { - private readonly SemaphoreSlim _semaphore; + protected readonly SemaphoreSlim _semaphore; private readonly string _groupName; private readonly bool _usingTaskRun; private readonly Func _msgCallback; @@ -85,18 +85,26 @@ Task Consume() } } - public void BasicAck(ulong deliveryTag) + public virtual void BasicReject(ulong deliveryTag) { if (Model.IsOpen) - Model.BasicAck(deliveryTag, false); + Model.BasicReject(deliveryTag, true); _semaphore.Release(); } - public void BasicReject(ulong deliveryTag) + public virtual void BasicNack(ulong deliveryTag) { if (Model.IsOpen) - Model.BasicReject(deliveryTag, true); + Model.BasicNack(deliveryTag, false, true); + + _semaphore.Release(); + } + + public void BasicNack(ulong deliveryTag) + { + if (Model.IsOpen) + Model.BasicNack(deliveryTag, false, true); _semaphore.Release(); } From 298ea37fcb939d518cf836219b6d480799e4722f Mon Sep 17 00:00:00 2001 From: aminsrz <46673894+aminsrz@users.noreply.github.com> Date: Thu, 31 Oct 2024 13:22:10 +0100 Subject: [PATCH 2/3] Making the _semaphore in the RabbitMQBasicConsumer protected --- .../RabbitMQBasicConsumer.cs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs index b4940eef7..63e00b232 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs @@ -15,7 +15,6 @@ namespace DotNetCore.CAP.RabbitMQ; public class RabbitMQBasicConsumer : AsyncDefaultBasicConsumer { - protected readonly SemaphoreSlim _semaphore; private readonly string _groupName; private readonly bool _usingTaskRun; private readonly Func _msgCallback; @@ -23,6 +22,8 @@ public class RabbitMQBasicConsumer : AsyncDefaultBasicConsumer private readonly Func>>? _customHeadersBuilder; private readonly IServiceProvider _serviceProvider; + protected SemaphoreSlim Semaphore { get; } + public RabbitMQBasicConsumer(IModel? model, byte concurrent, string groupName, Func msgCallback, @@ -31,7 +32,7 @@ public RabbitMQBasicConsumer(IModel? model, IServiceProvider serviceProvider) : base(model) { - _semaphore = new SemaphoreSlim(concurrent); + Semaphore = new SemaphoreSlim(concurrent); _groupName = groupName; _usingTaskRun = concurrent > 0; _msgCallback = msgCallback; @@ -45,7 +46,7 @@ public override async Task HandleBasicDeliver(string consumerTag, ulong delivery { if (_usingTaskRun) { - await _semaphore.WaitAsync(); + await Semaphore.WaitAsync(); _ = Task.Run(Consume).ConfigureAwait(false); } @@ -90,7 +91,7 @@ public virtual void BasicReject(ulong deliveryTag) if (Model.IsOpen) Model.BasicReject(deliveryTag, true); - _semaphore.Release(); + Semaphore.Release(); } public virtual void BasicNack(ulong deliveryTag) @@ -98,7 +99,7 @@ public virtual void BasicNack(ulong deliveryTag) if (Model.IsOpen) Model.BasicNack(deliveryTag, false, true); - _semaphore.Release(); + Semaphore.Release(); } public void BasicNack(ulong deliveryTag) @@ -106,7 +107,7 @@ public void BasicNack(ulong deliveryTag) if (Model.IsOpen) Model.BasicNack(deliveryTag, false, true); - _semaphore.Release(); + Semaphore.Release(); } public override async Task OnCancel(params string[] consumerTags) From 8e469304b6f3d113062405d9b4604f11e6b142f1 Mon Sep 17 00:00:00 2001 From: aminsrz <46673894+aminsrz@users.noreply.github.com> Date: Thu, 31 Oct 2024 13:25:53 +0100 Subject: [PATCH 3/3] Bring back the mistakenly removed BasicAck method --- src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs index 63e00b232..0ed2e4a3b 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs @@ -86,23 +86,23 @@ Task Consume() } } - public virtual void BasicReject(ulong deliveryTag) + public virtual void BasicAck(ulong deliveryTag) { if (Model.IsOpen) - Model.BasicReject(deliveryTag, true); + Model.BasicAck(deliveryTag, false); Semaphore.Release(); } - public virtual void BasicNack(ulong deliveryTag) + public virtual void BasicReject(ulong deliveryTag) { if (Model.IsOpen) - Model.BasicNack(deliveryTag, false, true); + Model.BasicReject(deliveryTag, true); Semaphore.Release(); } - public void BasicNack(ulong deliveryTag) + public virtual void BasicNack(ulong deliveryTag) { if (Model.IsOpen) Model.BasicNack(deliveryTag, false, true);