-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathAuditIngestor.cs
More file actions
139 lines (122 loc) · 5.3 KB
/
AuditIngestor.cs
File metadata and controls
139 lines (122 loc) · 5.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
namespace ServiceControl.Audit.Auditing
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Threading.Tasks;
using Infrastructure.Settings;
using Monitoring;
using NServiceBus;
using NServiceBus.Logging;
using NServiceBus.Routing;
using NServiceBus.Transport;
using Persistence.UnitOfWork;
using Recoverability;
using SagaAudit;
using ServiceControl.Infrastructure;
using ServiceControl.Transports;
/// <summary>
/// Persists batches of audit messages through an <see cref="AuditPersister"/> and, when
/// <c>settings.ForwardAuditMessages</c> is enabled, forwards the stored messages to the
/// configured audit log queue.
/// </summary>
public class AuditIngestor
{
    /// <summary>
    /// Creates the ingestor, builds the enricher pipeline and resolves the forwarding queue address.
    /// </summary>
    /// <param name="settings">Provides <c>ForwardAuditMessages</c> and <c>AuditLogQueue</c>.</param>
    /// <param name="unitOfWorkFactory">Passed to the <see cref="AuditPersister"/>.</param>
    /// <param name="endpointInstanceMonitoring">Passed to the <see cref="DetectNewEndpointsFromAuditImportsEnricher"/>.</param>
    /// <param name="auditEnrichers">Additional enrichers (e.g. registered in the DI container) appended after the built-in ones.</param>
    /// <param name="messageSession">Passed to the <see cref="AuditPersister"/>.</param>
    /// <param name="messageDispatcher">Lazily resolved dispatcher used for forwarding; also passed to the <see cref="AuditPersister"/>.</param>
    /// <param name="transportCustomization">Translates the configured audit log queue into a transport-qualified queue name.</param>
    public AuditIngestor(
        Settings settings,
        IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
        EndpointInstanceMonitoring endpointInstanceMonitoring,
        IEnumerable<IEnrichImportedAuditMessages> auditEnrichers, // allows extending message enrichers with custom enrichers registered in the DI container
        IMessageSession messageSession,
        Lazy<IMessageDispatcher> messageDispatcher,
        ITransportCustomization transportCustomization
    )
    {
        this.settings = settings;
        this.messageDispatcher = messageDispatcher;

        // Built-in enrichers come first; custom enrichers are appended in the order supplied.
        var enrichers = new IEnrichImportedAuditMessages[]
        {
            new MessageTypeEnricher(),
            new EnrichWithTrackingIds(),
            new ProcessingStatisticsEnricher(),
            new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring),
            new DetectSuccessfulRetriesEnricher(),
            new SagaRelationshipsEnricher()
        }.Concat(auditEnrichers).ToArray();

        logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue);

        auditPersister = new AuditPersister(
            unitOfWorkFactory,
            enrichers,
            messageSession,
            messageDispatcher
        );
    }

    /// <summary>
    /// Persists the given batch, forwards the stored messages when forwarding is enabled, and then
    /// completes the task completion source of every context in the batch.
    /// </summary>
    /// <param name="contexts">The batch of incoming message contexts to ingest.</param>
    /// <remarks>
    /// Persisting happens outside the try block, so a persistence failure propagates without the
    /// warning log entry. Any exception raised while forwarding or completing the contexts is
    /// logged and rethrown.
    /// </remarks>
    public async Task Ingest(List<MessageContext> contexts)
    {
        var stored = await auditPersister.Persist(contexts);

        try
        {
            if (settings.ForwardAuditMessages)
            {
                await Forward(stored, logQueueAddress);
                // NOTE(review): this counts every stored context, including those Forward skips
                // because their "AuditType" is not "ProcessedMessage" — confirm that is intended.
                forwardedMessagesCounter.Add(stored.Count);
            }

            foreach (var context in contexts)
            {
                context.GetTaskCompletionSource().TrySetResult(true);
            }
        }
        catch (Exception e)
        {
            Log.Warn("Forwarding messages failed", e);

            // making sure to rethrow so that all messages get marked as failed
            throw;
        }
    }

    /// <summary>
    /// Dispatches a copy of each forwardable message context to <paramref name="forwardingAddress"/>.
    /// </summary>
    /// <param name="messageContexts">Candidate contexts; those carrying an "AuditType" extension other than "ProcessedMessage" are skipped.</param>
    /// <param name="forwardingAddress">Destination address for the forwarded messages.</param>
    /// <returns>
    /// The dispatch task, or a completed task when no context qualified for forwarding.
    /// </returns>
    Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forwardingAddress)
    {
        // Capacity is the upper bound; only operations that are actually created are added, so
        // skipped contexts no longer leave null placeholders in what is handed to TransportOperations.
        var transportOperations = new List<TransportOperation>(messageContexts.Count);
        MessageContext anyContext = null;

        foreach (var messageContext in messageContexts)
        {
            if (messageContext.Extensions.TryGet("AuditType", out string auditType)
                && auditType != "ProcessedMessage")
            {
                continue;
            }

            // The last forwardable context supplies the transport transaction used for the dispatch.
            anyContext = messageContext;

            var outgoingMessage = new OutgoingMessage(
                messageContext.NativeMessageId,
                messageContext.Headers,
                messageContext.Body);

            // Forwarded messages should last as long as possible
            // NOTE(review): the outgoing message is built from messageContext.Headers; if that
            // dictionary is shared rather than copied, this also alters the incoming context — confirm.
            outgoingMessage.Headers.Remove(Headers.TimeToBeReceived);

            transportOperations.Add(new TransportOperation(outgoingMessage, new UnicastAddressTag(forwardingAddress)));
        }

        return anyContext != null
            ? messageDispatcher.Value.Dispatch(
                new TransportOperations(transportOperations.ToArray()),
                anyContext.TransportTransaction
            )
            : Task.CompletedTask;
    }

    /// <summary>
    /// When forwarding is enabled, dispatches an empty probe message (id <c>Guid.Empty</c>, no headers,
    /// empty body) to the audit log queue to verify that it can be written to.
    /// </summary>
    /// <exception cref="Exception">
    /// Thrown, wrapping the original failure, when the probe message cannot be dispatched.
    /// </exception>
    public async Task VerifyCanReachForwardingAddress()
    {
        if (!settings.ForwardAuditMessages)
        {
            return;
        }

        try
        {
            var transportOperations = new TransportOperations(
                new TransportOperation(
                    new OutgoingMessage(Guid.Empty.ToString("N"),
                        [], Array.Empty<byte>()),
                    new UnicastAddressTag(logQueueAddress)
                )
            );

            await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction());
        }
        catch (Exception e)
        {
            throw new Exception($"Unable to write to forwarding queue {settings.AuditLogQueue}", e);
        }
    }

    // Persists batches and returns the contexts that were stored.
    readonly AuditPersister auditPersister;
    readonly Settings settings;
    // Lazy so the dispatcher is only resolved when first used.
    readonly Lazy<IMessageDispatcher> messageDispatcher;
    // Transport-qualified name of settings.AuditLogQueue, resolved once in the constructor.
    readonly string logQueueAddress;
    readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "forwarded"), description: "Audit ingestion forwarded message count");

    static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
}
}