-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathIngestionMetrics.cs
More file actions
75 lines (59 loc) · 2.91 KB
/
IngestionMetrics.cs
File metadata and controls
75 lines (59 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
namespace ServiceControl.Audit.Auditing.Metrics;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading;
using EndpointPlugin.Messages.SagaState;
using NServiceBus;
using NServiceBus.Transport;
/// <summary>
/// Creates the audit ingestion instruments on the <see cref="MeterName"/> meter and
/// hands out the per-message, per-error and per-batch metric helpers that record into them.
/// </summary>
public class IngestionMetrics
{
    /// <summary>Name of the meter all instruments of this class are created on.</summary>
    public const string MeterName = "Particular.ServiceControl.Audit";

    /// <summary>Name of the histogram that receives batch durations.</summary>
    public static readonly string BatchDurationInstrumentName = $"{InstrumentPrefix}.batch_duration_seconds";

    /// <summary>Name of the histogram that receives single-message durations.</summary>
    public static readonly string MessageDurationInstrumentName = $"{InstrumentPrefix}.message_duration_seconds";

    /// <summary>
    /// Creates the meter (<see cref="MeterName"/>, version <c>0.1.0</c>) and registers the two duration
    /// histograms, the consecutive-batch-failure gauge and the failure counter on it.
    /// </summary>
    /// <param name="meterFactory">Factory used to create the meter.</param>
    public IngestionMetrics(IMeterFactory meterFactory)
    {
        var meter = meterFactory.Create(MeterName, MeterVersion);

        batchDuration = meter.CreateHistogram<double>(BatchDurationInstrumentName, unit: "seconds", description: "Message batch processing duration in seconds");
        ingestionDuration = meter.CreateHistogram<double>(MessageDurationInstrumentName, unit: "seconds", description: "Audit message processing duration in seconds");

        // The gauge callback is invoked by whatever collects the metrics, not by the ingestion code path,
        // so the counter is read atomically to pair with the interlocked writes in RecordBatchOutcome.
        consecutiveBatchFailureGauge = meter.CreateObservableGauge($"{InstrumentPrefix}.consecutive_batch_failures_total", () => Interlocked.Read(ref consecutiveBatchFailures), description: "Consecutive audit ingestion batch failures");
        failureCounter = meter.CreateCounter<long>($"{InstrumentPrefix}.failures_total", description: "Audit ingestion failure count");
    }

    /// <summary>Creates a <see cref="MessageMetrics"/> for the given message, bound to the message duration histogram.</summary>
    /// <param name="messageContext">Context of the message being ingested.</param>
    public MessageMetrics BeginIngestion(MessageContext messageContext) => new(messageContext, ingestionDuration);

    /// <summary>Creates an <see cref="ErrorMetrics"/> for the given error, bound to the failure counter.</summary>
    /// <param name="errorContext">Context of the failure being handled.</param>
    public ErrorMetrics BeginErrorHandling(ErrorContext errorContext) => new(errorContext, failureCounter);

    /// <summary>
    /// Creates a <see cref="BatchMetrics"/> bound to the batch duration histogram. The batch is also given
    /// the callback that maintains the consecutive-failure count.
    /// </summary>
    /// <param name="maxBatchSize">Maximum batch size, forwarded unchanged to <see cref="BatchMetrics"/>.</param>
    public BatchMetrics BeginBatch(int maxBatchSize) => new(maxBatchSize, batchDuration, RecordBatchOutcome);

    /// <summary>
    /// Builds the tag list describing a message based on its headers.
    /// </summary>
    /// <param name="headers">Headers of the message.</param>
    /// <returns>
    /// A tag list with a single <c>message.category</c> tag: <c>saga-update</c> when the enclosed message
    /// types header equals the full name of <see cref="SagaUpdatedMessage"/>, <c>audit-message</c> when the
    /// header is present with any other value, and <c>control-message</c> when the header is absent.
    /// </returns>
    public static TagList GetMessageTags(Dictionary<string, string> headers)
    {
        var tags = new TagList();

        if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType))
        {
            tags.Add("message.category", messageType == SagaUpdateMessageType ? "saga-update" : "audit-message");
        }
        else
        {
            tags.Add("message.category", "control-message");
        }

        return tags;
    }

    /// <summary>
    /// Updates the consecutive-failure count: a success resets it to zero, a failure adds one.
    /// </summary>
    /// <param name="success"><c>true</c> when the batch succeeded.</param>
    void RecordBatchOutcome(bool success)
    {
        // Interlocked operations keep the update atomic and visible to the gauge callback,
        // which may observe the value from a different thread.
        if (success)
        {
            Interlocked.Exchange(ref consecutiveBatchFailures, 0);
        }
        else
        {
            Interlocked.Increment(ref consecutiveBatchFailures);
        }
    }

    // Only accessed through Interlocked; reported by the consecutive batch failure gauge.
    long consecutiveBatchFailures;

    readonly Histogram<double> batchDuration;
#pragma warning disable IDE0052
    // this can be changed to Gauge<T> once we can use the latest version of System.Diagnostics.DiagnosticSource
    readonly ObservableGauge<long> consecutiveBatchFailureGauge;
#pragma warning restore IDE0052
    readonly Histogram<double> ingestionDuration;
    readonly Counter<long> failureCounter;

    const string MeterVersion = "0.1.0";
    const string InstrumentPrefix = "sc.audit.ingestion";

    // Value the enclosed message types header is compared against to detect saga update messages.
    static readonly string SagaUpdateMessageType = typeof(SagaUpdatedMessage).FullName;
}