Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public async Task TimeSent_should_not_be_casted()

var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z");

await Define<MyContext>(ctx => { ctx.TimeSent = sentTime; })
await Define<MyContext>(ctx => ctx.TimeSent = sentTime)
.WithEndpoint<Failing>()
.Done(async c =>
{
var result = await this.TryGet<FailedMessageView>($"/api/errors/last/{c.UniqueMessageId}");
failure = result;
return c.UniqueMessageId != null & result;
return (c.UniqueMessageId != null) & result;
})
.Run();

Expand All @@ -61,22 +61,31 @@ public async Task Should_be_able_to_get_the_message_by_id()
{
FailedMessageView failure = null;

await Define<MyContext>()
var testStartTime = DateTime.UtcNow;

var context = await Define<MyContext>()
.WithEndpoint<Failing>()
.Done(async c =>
{
var result = await this.TryGet<FailedMessageView>($"/api/errors/last/{c.UniqueMessageId}");
failure = result;
return c.UniqueMessageId != null & result;
return (c.UniqueMessageId != null) & result;
})
.Run();

Assert.That(failure, Is.Not.Null);

//No failure time will result in utc now being used
Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime));

// ServicePulse assumes that the receiving endpoint name is present
Assert.That(failure.ReceivingEndpoint, Is.Not.Null);
Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint));
}

public class Failing : EndpointConfigurationBuilder
class Failing : EndpointConfigurationBuilder
{
public Failing() => EndpointSetup<DefaultServerWithoutAudit>(c => { c.Recoverability().Delayed(x => x.NumberOfRetries(0)); });
public Failing() => EndpointSetup<DefaultServerWithoutAudit>(c => c.Recoverability().Delayed(x => x.NumberOfRetries(0)));

class SendFailedMessage : DispatchRawMessages<MyContext>
{
Expand All @@ -89,30 +98,23 @@ protected override TransportOperations CreateMessage(MyContext context)
var headers = new Dictionary<string, string>
{
[Headers.MessageId] = context.MessageId,
[Headers.ProcessingEndpoint] = context.EndpointNameOfReceivingEndpoint,
Comment thread
andreasohlund marked this conversation as resolved.
["NServiceBus.ExceptionInfo.ExceptionType"] = "2014-11-11 02:26:57:767462 Z",
["NServiceBus.ExceptionInfo.Message"] = "An error occurred while attempting to extract logical messages from transport message NServiceBus.TransportMessage",
["NServiceBus.ExceptionInfo.InnerExceptionType"] = "System.Exception",
["NServiceBus.ExceptionInfo.Source"] = "NServiceBus.Core",
["NServiceBus.ExceptionInfo.StackTrace"] = string.Empty,
["NServiceBus.FailedQ"] = Conventions.EndpointNamingConvention(typeof(Failing)),
["NServiceBus.TimeOfFailure"] = "2014-11-11 02:26:58:000462 Z"
[Headers.ProcessingMachine] = "unknown", // This is needed for endpoint detection to work since "host" is required, endpoint name is detected from the FailedQ header
};

if (context.TimeSent.HasValue)
{
headers["NServiceBus.TimeSent"] = DateTimeOffsetHelper.ToWireFormattedString(context.TimeSent.Value);
}

var outgoingMessage = new OutgoingMessage(context.MessageId, headers, new byte[0]);
var outgoingMessage = new OutgoingMessage(context.MessageId, headers, Array.Empty<byte>());

return new TransportOperations(
new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))
);
return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error")));
}
}
}

public class MyContext : ScenarioContext
Comment thread
andreasohlund marked this conversation as resolved.
class MyContext : ScenarioContext
{
public string MessageId { get; set; }

Expand Down
7 changes: 1 addition & 6 deletions src/ServiceControl.Persistence/FailureDetails.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@ namespace ServiceControl.Contracts.Operations

public class FailureDetails
{
public FailureDetails()
{
TimeOfFailure = DateTime.UtcNow;
}

public string AddressOfFailingEndpoint { get; set; }

public DateTime TimeOfFailure { get; set; }
public DateTime TimeOfFailure { get; set; } = DateTime.UtcNow;

public ExceptionDetails Exception { get; set; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,8 @@
using ServiceControl.Contracts.Operations;
using ServiceControl.Persistence;

class DetectNewEndpointsFromErrorImportsEnricher : IEnrichImportedErrorMessages
class DetectNewEndpointsFromErrorImportsEnricher(IEndpointInstanceMonitoring monitoring) : IEnrichImportedErrorMessages
{
public DetectNewEndpointsFromErrorImportsEnricher(IEndpointInstanceMonitoring monitoring)
{
this.monitoring = monitoring;
}

public void Enrich(ErrorEnricherContext context)
{
var sendingEndpoint = EndpointDetailsParser.SendingEndpoint(context.Headers);
Expand Down Expand Up @@ -47,7 +42,5 @@ void TryAddEndpoint(EndpointDetails endpointDetails, ErrorEnricherContext contex
context.Add(endpointDetails);
}
}

IEndpointInstanceMonitoring monitoring;
}
}
12 changes: 4 additions & 8 deletions src/ServiceControl/Operations/ErrorIngestor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,7 @@ public ErrorIngestor(Metrics metrics,
bulkInsertDurationMeter = metrics.GetMeter("Error ingestion - bulk insert duration", FrequencyInMilliseconds);
var ingestedMeter = metrics.GetCounter("Error ingestion - ingested");

var enrichers = new IEnrichImportedErrorMessages[]
{
new MessageTypeEnricher(),
new EnrichWithTrackingIds(),
new ProcessingStatisticsEnricher()

}.Concat(errorEnrichers).ToArray();
var enrichers = new IEnrichImportedErrorMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher() }.Concat(errorEnrichers).ToArray();
Comment thread
andreasohlund marked this conversation as resolved.
Outdated

errorProcessor = new ErrorProcessor(enrichers, failedMessageEnrichers.ToArray(), domainEvents, ingestedMeter);
retryConfirmationProcessor = new RetryConfirmationProcessor(domainEvents);
Expand All @@ -67,7 +61,6 @@ public async Task Ingest(List<MessageContext> contexts, CancellationToken cancel
}
}


var storedFailed = await PersistFailedMessages(failedMessages, retriedMessages, cancellationToken);

try
Expand All @@ -77,6 +70,7 @@ public async Task Ingest(List<MessageContext> contexts, CancellationToken cancel
{
announcerTasks.Add(errorProcessor.Announce(context));
}

foreach (var context in retriedMessages)
{
announcerTasks.Add(retryConfirmationProcessor.Announce(context));
Expand All @@ -90,6 +84,7 @@ public async Task Ingest(List<MessageContext> contexts, CancellationToken cancel
{
Logger.Debug($"Forwarding {storedFailed.Count} messages");
}

await Forward(storedFailed, cancellationToken);
if (Logger.IsDebugEnabled)
{
Expand Down Expand Up @@ -133,6 +128,7 @@ async Task<IReadOnlyList<MessageContext>> PersistFailedMessages(List<MessageCont
{
await unitOfWork.Complete(cancellationToken);
}

return storedFailedMessageContexts;
}
catch (Exception e)
Expand Down
19 changes: 5 additions & 14 deletions src/ServiceControl/Operations/ErrorProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,11 @@
using ServiceControl.Persistence;
using ServiceControl.Persistence.UnitOfWork;

class ErrorProcessor
class ErrorProcessor(IEnrichImportedErrorMessages[] enrichers,
IFailedMessageEnricher[] failedMessageEnrichers,
IDomainEvents domainEvents,
Counter ingestedCounter)
{
public ErrorProcessor(IEnrichImportedErrorMessages[] enrichers, IFailedMessageEnricher[] failedMessageEnrichers, IDomainEvents domainEvents,
Counter ingestedCounter)
{
this.enrichers = enrichers;
this.domainEvents = domainEvents;
this.ingestedCounter = ingestedCounter;
failedMessageFactory = new FailedMessageFactory(failedMessageEnrichers);
}

public async Task<IReadOnlyList<MessageContext>> Process(IReadOnlyList<MessageContext> contexts, IIngestionUnitOfWork unitOfWork)
{
var storedContexts = new List<MessageContext>(contexts.Count);
Expand Down Expand Up @@ -169,10 +163,7 @@ static void RecordKnownEndpoints(EndpointDetails observedEndpoint, Dictionary<st
}
}

readonly IEnrichImportedErrorMessages[] enrichers;
readonly IDomainEvents domainEvents;
readonly Counter ingestedCounter;
readonly FailedMessageFactory failedMessageFactory;
readonly FailedMessageFactory failedMessageFactory = new(failedMessageEnrichers);
static readonly ILog Logger = LogManager.GetLogger<ErrorProcessor>();
}
}