Skip to content

Commit cec3fad

Browse files
Christiancquirosj
authored andcommitted
Using editId colum value as the uniquemessage id
1 parent 4c70bd7 commit cec3fad

File tree

10 files changed

+214
-18
lines changed

10 files changed

+214
-18
lines changed

src/ServiceControl.AcceptanceTests/Recoverability/ExternalIntegration/ExternalIntegrationAcceptanceTest.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ public ErrorSender() =>
2020
EndpointSetup<DefaultServerWithoutAudit>(c =>
2121
{
2222
c.NoDelayedRetries();
23-
c.ReportSuccessfulRetriesToServiceControl();
23+
//TODO: Get back to this to determine whether or not this duplication for simulating the production code is really needed
24+
//c.ReportSuccessfulRetriesToServiceControl();
2425
});
2526

2627
public class AHandler : IHandleMessages<AMessage>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
namespace ServiceControl.AcceptanceTests.Recoverability.ExternalIntegration
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Threading.Tasks;
6+
using AcceptanceTesting;
7+
using AcceptanceTesting.EndpointTemplates;
8+
using Contracts;
9+
using NServiceBus;
10+
using NServiceBus.AcceptanceTesting;
11+
using NUnit.Framework;
12+
using ServiceBus.Management.Infrastructure.Settings;
13+
using ServiceControl.MessageFailures;
14+
using JsonSerializer = System.Text.Json.JsonSerializer;
15+
16+
class When_a_failed_message_is_resolved_by_edit_and_retry : ExternalIntegrationAcceptanceTest
17+
{
18+
[Test]
19+
public async Task Should_publish_notification()
20+
{
21+
CustomConfiguration = config => config.OnEndpointSubscribed<Context>((s, ctx) =>
22+
{
23+
ctx.ExternalProcessorSubscribed = s.SubscriberReturnAddress.Contains(nameof(ExternalProcessor));
24+
});
25+
26+
var context = await Define<Context>()
27+
.WithEndpoint<ErrorSender>(b => b.When(session => Task.CompletedTask).DoNotFailOnErrorMessages())
28+
.WithEndpoint<ExternalProcessor>(b => b.When(async (bus, c) =>
29+
{
30+
await bus.Subscribe<MessageFailureResolvedByRetry>();
31+
32+
if (c.HasNativePubSubSupport)
33+
{
34+
c.ExternalProcessorSubscribed = true;
35+
}
36+
}))
37+
.Do("WaitUntilErrorsContainsFailedMessage",
38+
async ctx => await this.TryGet<FailedMessage>($"/api/errors/{ctx.FailedMessageId}") != null)
39+
.Do("WaitForExternalProcessorToSubscribe",
40+
ctx => Task.FromResult(ctx.ExternalProcessorSubscribed))
41+
.Do("EditAndRetry", async ctx =>
42+
{
43+
// First retrieve the original failed message to get all its headers
44+
var originalFailedMessageResult = await this.TryGet<FailedMessage>($"/api/errors/{ctx.FailedMessageId}");
45+
var originalFailedMessage = originalFailedMessageResult.Item;
46+
47+
// Convert the original headers to Dictionary<string, string> for the edit payload
48+
var originalHeaders = new Dictionary<string, string>();
49+
foreach (var header in originalFailedMessage.ProcessingAttempts[0].Headers)
50+
{
51+
originalHeaders[header.Key] = header.Value;
52+
}
53+
54+
// Prepare the edit payload with all original headers (locked headers unchanged, others can be modified)
55+
var editPayload = new
56+
{
57+
message_body = "{}", // Empty JSON body for AMessage (ICommand with no properties)
58+
message_headers = originalHeaders // Use all original headers to satisfy controller validation
59+
};
60+
61+
await this.Post($"/api/edit/{ctx.FailedMessageId}", editPayload);
62+
})
63+
.Do("EnsureRetried", async ctx =>
64+
{
65+
return await this.TryGet<FailedMessage>($"/api/errors/{ctx.FailedMessageId}",
66+
e => e.Status == FailedMessageStatus.Resolved);
67+
})
68+
.Done(ctx => ctx.EventDelivered) //Done when sequence is finished
69+
.Run();
70+
71+
var deserializedEvent = JsonSerializer.Deserialize<MessageFailureResolvedByRetry>(context.Event);
72+
Assert.That(deserializedEvent?.FailedMessageId, Is.EqualTo(context.FailedMessageId.ToString()));
73+
}
74+
75+
public class ExternalProcessor : EndpointConfigurationBuilder
76+
{
77+
public ExternalProcessor() =>
78+
EndpointSetup<DefaultServerWithoutAudit>(c =>
79+
{
80+
var routing = c.ConfigureRouting();
81+
routing.RouteToEndpoint(typeof(MessageFailureResolvedByRetry).Assembly, Settings.DEFAULT_INSTANCE_NAME);
82+
}, publisherMetadata => { publisherMetadata.RegisterPublisherFor<MessageFailureResolvedByRetry>(Settings.DEFAULT_INSTANCE_NAME); });
83+
84+
public class FailureHandler(Context testContext) : IHandleMessages<MessageFailureResolvedByRetry>
85+
{
86+
public Task Handle(MessageFailureResolvedByRetry message, IMessageHandlerContext context)
87+
{
88+
var serializedMessage = JsonSerializer.Serialize(message);
89+
testContext.Event = serializedMessage;
90+
testContext.EventDelivered = true;
91+
return Task.CompletedTask;
92+
}
93+
}
94+
}
95+
}
96+
}

src/ServiceControl.Persistence.RavenDB/Editing/EditFailedMessageManager.cs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
namespace ServiceControl.Persistence.RavenDB
22
{
33
using System;
4+
using System.Linq;
45
using System.Threading.Tasks;
6+
using Raven.Client.Documents;
57
using Raven.Client.Documents.Session;
68
using ServiceControl.MessageFailures;
79
using ServiceControl.Persistence.Recoverability.Editing;
10+
using ServiceControl.Persistence.RavenDB.Indexes;
811

912
class EditFailedMessageManager : AbstractSessionManager, IEditFailedMessagesManager
1013
{
@@ -25,13 +28,13 @@ public async Task<FailedMessage> GetFailedMessage(string failedMessageId)
2528
return failedMessage;
2629
}
2730

28-
public async Task<string> GetCurrentEditingMessageId(string failedMessageId)
31+
public async Task<string> GetCurrentEditingRequestId(string failedMessageId)
2932
{
3033
var edit = await session.LoadAsync<FailedMessageEdit>(FailedMessageEdit.MakeDocumentId(failedMessageId));
3134
return edit?.EditId;
3235
}
3336

34-
public Task SetCurrentEditingMessageId(string editingMessageId)
37+
public Task SetCurrentEditingRequestId(string editingMessageId)
3538
{
3639
if (failedMessage == null)
3740
{
@@ -54,5 +57,19 @@ public Task SetFailedMessageAsResolved()
5457

5558
return Task.CompletedTask;
5659
}
60+
61+
public async Task<string> GetFailedMessageIdByEditId(string editId)
62+
{
63+
var edit = await session.Query<FailedMessageEdit, FailedMessageEditIndex>()
64+
.Where(x => x.EditId == editId)
65+
.FirstOrDefaultAsync();
66+
67+
if (edit?.FailedMessageId != null)
68+
{
69+
return FailedMessageIdGenerator.GetMessageIdFromDocumentId(edit.FailedMessageId);
70+
}
71+
72+
return null;
73+
}
5774
}
5875
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
namespace ServiceControl.Persistence.RavenDB.Indexes
2+
{
3+
using System.Linq;
4+
using Raven.Client.Documents.Indexes;
5+
using ServiceControl.Persistence.Recoverability.Editing;
6+
7+
class FailedMessageEditIndex : AbstractIndexCreationTask<FailedMessageEdit>
8+
{
9+
public FailedMessageEditIndex()
10+
{
11+
Map = edits =>
12+
from edit in edits
13+
select new
14+
{
15+
edit.EditId,
16+
edit.FailedMessageId
17+
};
18+
}
19+
20+
public class SortAndFilterOptions
21+
{
22+
public string EditId { get; set; }
23+
public string FailedMessageId { get; set; }
24+
}
25+
}
26+
}

src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public async Task Should_discard_edit_if_edited_message_not_unresolved(FailedMes
5858
var failedMessage = await ErrorMessageDataStore.ErrorBy(failedMessageId);
5959

6060
var editFailedMessagesManager = await ErrorMessageDataStore.CreateEditFailedMessageManager();
61-
var editOperation = await editFailedMessagesManager.GetCurrentEditingMessageId(failedMessageId);
61+
var editOperation = await editFailedMessagesManager.GetCurrentEditingRequestId(failedMessageId);
6262

6363
Assert.Multiple(() =>
6464
{
@@ -79,7 +79,7 @@ public async Task Should_discard_edit_when_different_edit_already_exists()
7979
using (var editFailedMessagesManager = await ErrorMessageDataStore.CreateEditFailedMessageManager())
8080
{
8181
_ = await editFailedMessagesManager.GetFailedMessage(failedMessageId);
82-
await editFailedMessagesManager.SetCurrentEditingMessageId(previousEdit);
82+
await editFailedMessagesManager.SetCurrentEditingRequestId(previousEdit);
8383
await editFailedMessagesManager.SaveChanges();
8484
}
8585

@@ -91,7 +91,7 @@ public async Task Should_discard_edit_when_different_edit_already_exists()
9191
using (var editFailedMessagesManagerAssert = await ErrorMessageDataStore.CreateEditFailedMessageManager())
9292
{
9393
var failedMessage = await editFailedMessagesManagerAssert.GetFailedMessage(failedMessageId);
94-
var editId = await editFailedMessagesManagerAssert.GetCurrentEditingMessageId(failedMessageId);
94+
var editId = await editFailedMessagesManagerAssert.GetCurrentEditingRequestId(failedMessageId);
9595

9696
Assert.Multiple(() =>
9797
{
@@ -130,7 +130,7 @@ public async Task Should_dispatch_edited_message_when_first_edit()
130130
var failedMessage2 = await x.GetFailedMessage(failedMessage.UniqueMessageId);
131131
Assert.That(failedMessage2, Is.Not.Null, "Edited failed message");
132132

133-
var editId = await x.GetCurrentEditingMessageId(failedMessage2.UniqueMessageId);
133+
var editId = await x.GetCurrentEditingRequestId(failedMessage2.UniqueMessageId);
134134

135135
Assert.Multiple(() =>
136136
{

src/ServiceControl.Persistence/IEditFailedMessagesManager.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
public interface IEditFailedMessagesManager : IDataSessionManager
77
{
88
Task<FailedMessage> GetFailedMessage(string failedMessageId);
9-
Task<string> GetCurrentEditingMessageId(string failedMessageId);
10-
Task SetCurrentEditingMessageId(string editingMessageId);
9+
Task<string> GetCurrentEditingRequestId(string failedMessageId);
10+
Task SetCurrentEditingRequestId(string editingMessageId);
1111
Task SetFailedMessageAsResolved();
12+
Task<string> GetFailedMessageIdByEditId(string editId);
1213
}
1314
}

src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public async Task<ActionResult<EditRetryResponse>> Edit(string failedMessageId,
3737

3838
//HINT: This validation is the first one because we want to minimize the chance of two users concurrently execute an edit-retry.
3939
var editManager = await store.CreateEditFailedMessageManager();
40-
var editId = await editManager.GetCurrentEditingMessageId(failedMessageId);
40+
var editId = await editManager.GetCurrentEditingRequestId(failedMessageId);
4141
if (editId != null)
4242
{
4343
logger.LogWarning("Cannot edit message {FailedMessageId} because it has already been edited", failedMessageId);

src/ServiceControl/Operations/ErrorIngestor.cs

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,26 @@
1515
using Recoverability;
1616
using ServiceBus.Management.Infrastructure.Settings;
1717
using ServiceControl.Persistence.UnitOfWork;
18+
using ServiceControl.Persistence;
1819
using ServiceControl.Transports;
1920

2021
public class ErrorIngestor
2122
{
2223
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
23-
2424
public ErrorIngestor(Metrics metrics,
2525
IEnumerable<IEnrichImportedErrorMessages> errorEnrichers,
2626
IEnumerable<IFailedMessageEnricher> failedMessageEnrichers,
2727
IDomainEvents domainEvents,
2828
IIngestionUnitOfWorkFactory unitOfWorkFactory,
2929
Lazy<IMessageDispatcher> messageDispatcher,
3030
ITransportCustomization transportCustomization,
31+
IErrorMessageDataStore errorMessageDataStore,
3132
Settings settings,
3233
ILogger<ErrorIngestor> logger)
3334
{
3435
this.unitOfWorkFactory = unitOfWorkFactory;
3536
this.messageDispatcher = messageDispatcher;
37+
this.errorMessageDataStore = errorMessageDataStore;
3638
this.settings = settings;
3739
this.logger = logger;
3840
bulkInsertDurationMeter = metrics.GetMeter("Error ingestion - bulk insert duration", FrequencyInMilliseconds);
@@ -80,7 +82,17 @@ public async Task Ingest(List<MessageContext> contexts, CancellationToken cancel
8082
}
8183
foreach (var context in retriedMessages)
8284
{
83-
announcerTasks.Add(retryConfirmationProcessor.Announce(context));
85+
var failedMessageId = await GetFailedMessageId(context);
86+
announcerTasks.Add(retryConfirmationProcessor.Announce(failedMessageId));
87+
88+
//OPTIONAL: raise events for the entire chain of edits and retries
89+
// var retryUniqueMessageId = context.Headers["ServiceControl.Retry.UniqueMessageId"];
90+
// var failedMessageIds = await GetFailedMessageIdChain(retryUniqueMessageId);
91+
92+
// foreach (var failedMessageId in failedMessageIds)
93+
// {
94+
// announcerTasks.Add(retryConfirmationProcessor.Announce(failedMessageId));
95+
// }
8496
}
8597

8698
await Task.WhenAll(announcerTasks);
@@ -107,6 +119,46 @@ public async Task Ingest(List<MessageContext> contexts, CancellationToken cancel
107119
throw;
108120
}
109121
}
122+
async Task<string> GetFailedMessageId(MessageContext context)
123+
{
124+
var retryUniqueMessageId = context.Headers["ServiceControl.Retry.UniqueMessageId"];
125+
126+
127+
//Check if this retry was recorded as an edit and retry in order to locate the original failedMessageId;
128+
using var editFailedMessagesManager = await errorMessageDataStore.CreateEditFailedMessageManager();
129+
var failedMessageId = await editFailedMessagesManager.GetFailedMessageIdByEditId(retryUniqueMessageId);
130+
131+
// If not found, this is a regular retry, so return retryUniqueMessageId
132+
return failedMessageId ?? retryUniqueMessageId;
133+
}
134+
135+
// async Task<List<string>> GetFailedMessageIdChain(string retryUniqueMessageId)
136+
// {
137+
// var failedMessageIds = new List<string>();
138+
// var currentId = retryUniqueMessageId;
139+
140+
// using var editFailedMessagesManager = await errorMessageDataStore.CreateEditFailedMessageManager();
141+
142+
// while (true)
143+
// {
144+
// var failedMessageId = await editFailedMessagesManager.GetFailedMessageIdByEditId(currentId);
145+
146+
// if (failedMessageId == null)
147+
// {
148+
// // If not found, this is a regular retry, so include the current ID
149+
// failedMessageIds.Add(currentId);
150+
// break;
151+
// }
152+
153+
// // Found an edit - add the original failed message ID to our chain
154+
// failedMessageIds.Add(failedMessageId);
155+
156+
// // Continue navigating the chain by looking for edits of this failed message
157+
// currentId = failedMessageId;
158+
// }
159+
160+
// return failedMessageIds;
161+
// }
110162

111163
async Task<IReadOnlyList<MessageContext>> PersistFailedMessages(List<MessageContext> failedMessageContexts, List<MessageContext> retriedMessageContexts, CancellationToken cancellationToken)
112164
{
@@ -193,6 +245,7 @@ public async Task VerifyCanReachForwardingAddress(CancellationToken cancellation
193245
readonly Settings settings;
194246
readonly ErrorProcessor errorProcessor;
195247
readonly Lazy<IMessageDispatcher> messageDispatcher;
248+
readonly IErrorMessageDataStore errorMessageDataStore;
196249
readonly RetryConfirmationProcessor retryConfirmationProcessor;
197250
readonly UnicastAddressTag logQueueAddress;
198251

src/ServiceControl/Operations/RetryConfirmationProcessor.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class RetryConfirmationProcessor
1212
public const string SuccessfulRetryHeader = "ServiceControl.Retry.Successful";
1313
const string RetryUniqueMessageIdHeader = "ServiceControl.Retry.UniqueMessageId";
1414

15+
1516
public RetryConfirmationProcessor(IDomainEvents domainEvents)
1617
{
1718
this.domainEvents = domainEvents;
@@ -26,11 +27,11 @@ public async Task Process(List<MessageContext> contexts, IIngestionUnitOfWork un
2627
}
2728
}
2829

29-
public Task Announce(MessageContext messageContext)
30+
public Task Announce(string failedMessageId)
3031
{
3132
return domainEvents.Raise(new MessageFailureResolvedByRetry
3233
{
33-
FailedMessageId = messageContext.Headers[RetryUniqueMessageIdHeader],
34+
FailedMessageId = failedMessageId
3435
});
3536
}
3637

0 commit comments

Comments
 (0)