-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathAuditIngestor.cs
More file actions
134 lines (116 loc) · 5 KB
/
AuditIngestor.cs
File metadata and controls
134 lines (116 loc) · 5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
namespace ServiceControl.Audit.Auditing
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Infrastructure.Settings;
using Monitoring;
using NServiceBus;
using NServiceBus.Logging;
using NServiceBus.Routing;
using NServiceBus.Transport;
using Persistence.UnitOfWork;
using Recoverability;
using SagaAudit;
using Transports;
public class AuditIngestor
{
public AuditIngestor(
Settings settings,
IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
EndpointInstanceMonitoring endpointInstanceMonitoring,
IEnumerable<IEnrichImportedAuditMessages> auditEnrichers, // allows extending message enrichers with custom enrichers registered in the DI container
IMessageSession messageSession,
Lazy<IMessageDispatcher> messageDispatcher,
ITransportCustomization transportCustomization
)
{
this.settings = settings;
this.messageDispatcher = messageDispatcher;
var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray();
logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue);
auditPersister = new AuditPersister(
unitOfWorkFactory,
enrichers,
messageSession,
messageDispatcher
);
}
public async Task Ingest(List<MessageContext> contexts, CancellationToken cancellationToken)
{
var stored = await auditPersister.Persist(contexts, cancellationToken);
try
{
if (settings.ForwardAuditMessages)
{
await Forward(stored, logQueueAddress, cancellationToken);
}
foreach (var context in contexts)
{
context.GetTaskCompletionSource().TrySetResult(true);
}
}
catch (Exception e)
{
Log.Warn("Forwarding messages failed", e);
// making sure to rethrow so that all messages get marked as failed
throw;
}
}
Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forwardingAddress, CancellationToken cancellationToken)
{
var transportOperations = new List<TransportOperation>(messageContexts.Count);
MessageContext anyContext = null;
foreach (var messageContext in messageContexts)
{
if (messageContext.Extensions.TryGet("AuditType", out string auditType)
&& auditType != "ProcessedMessage")
{
continue;
}
anyContext = messageContext;
var outgoingMessage = new OutgoingMessage(
messageContext.NativeMessageId,
messageContext.Headers,
messageContext.Body);
// Forwarded messages should last as long as possible
outgoingMessage.Headers.Remove(Headers.TimeToBeReceived);
transportOperations.Add(new TransportOperation(outgoingMessage, new UnicastAddressTag(forwardingAddress)));
}
return anyContext != null
? messageDispatcher.Value.Dispatch(
new TransportOperations([.. transportOperations]),
anyContext.TransportTransaction, cancellationToken)
: Task.CompletedTask;
}
public async Task VerifyCanReachForwardingAddress(CancellationToken cancellationToken)
{
if (!settings.ForwardAuditMessages)
{
return;
}
try
{
var transportOperations = new TransportOperations(
new TransportOperation(
new OutgoingMessage(Guid.Empty.ToString("N"),
[], Array.Empty<byte>()),
new UnicastAddressTag(logQueueAddress)
)
);
await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction(), cancellationToken);
}
catch (Exception e)
{
throw new Exception($"Unable to write to forwarding queue {settings.AuditLogQueue}", e);
}
}
readonly AuditPersister auditPersister;
readonly Settings settings;
readonly Lazy<IMessageDispatcher> messageDispatcher;
readonly string logQueueAddress;
static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
}
}