Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions docs/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,19 @@ The following metrics are available:

### Ingestion

- `sc.audit.ingestion.count` - Successful ingested audit message count
- `sc.audit.ingestion.success` - Successful ingested audit message count
- `sc.audit.ingestion.retry` - Retried audit message count
- `sc.audit.ingestion.failed` - Failed audit message count
- `sc.audit.ingestion.duration` - Audit message processing duration (in milliseconds)
- `sc.audit.ingestion.message_size` - Audit message body size (in kilobytes)
- `sc.audit.ingestion.forwarded_count` - Forwarded audit messages count
- `sc.audit.ingestion.forwarded` - Forwarded audit messages count

### Batching

- `sc.audit.ingestion.batch_duration` - Batch processing duration (in milliseconds)
- `sc.audit.ingestion.batch_size` - Batch size (number of messages)
- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures

### Storage

- `sc.audit.ingestion.audits_count` - Stored audit message count
- `sc.audit.ingestion.sagas_count` - Stored sagas message count
- `sc.audit.ingestion.commit_duration` - Storage unit of work commit duration (in milliseconds)

## Monitoring

No telemetry is currently available.
16 changes: 8 additions & 8 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
await channel.Writer.WriteAsync(messageContext, cancellationToken);
await taskCompletionSource.Task;

ingestedMessagesCounter.Add(1);
successfulMessagesCounter.Add(1, Telemetry.GetIngestedMessageTags(messageContext.Headers));
messageSize.Record(messageContext.Body.Length / 1024.0);
Comment thread
andreasohlund marked this conversation as resolved.
Outdated
}
Comment thread
andreasohlund marked this conversation as resolved.
}
Expand All @@ -210,15 +210,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
try
{
// as long as there is something to read this will fetch up to MaximumConcurrency items
while (channel.Reader.TryRead(out var context))
using (new DurationRecorder(auditBatchDuration))
{
contexts.Add(context);
}
while (channel.Reader.TryRead(out var context))
{
contexts.Add(context);
}

auditBatchSize.Record(contexts.Count);
auditBatchSize.Record(contexts.Count);

using (new DurationRecorder(auditBatchDuration))
{
await auditIngestor.Ingest(contexts);
}

Expand Down Expand Up @@ -296,7 +296,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
readonly Histogram<long> auditBatchSize = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "batch_size"), description: "Audit ingestion average batch size");
readonly Histogram<double> auditBatchDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration");
readonly Histogram<double> messageSize = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "message_size"), unit: "kilobytes", description: "Average audit message body size");
readonly Counter<long> ingestedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "count"), description: "Successful ingested audit message count");
readonly Counter<long> successfulMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "success"), description: "Successful ingested audit message count");
readonly Histogram<long> consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure");
readonly Histogram<double> ingestionDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration");
readonly Watchdog watchdog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading;
using System.Threading.Tasks;
using Infrastructure;
using NServiceBus;
using NServiceBus.Logging;
using NServiceBus.Transport;
using ServiceControl.Audit.Persistence;
Expand All @@ -35,16 +36,19 @@ public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, Logging

public async Task<ErrorHandleResult> OnError(ErrorContext errorContext, CancellationToken cancellationToken = default)
{
var tags = Telemetry.GetIngestedMessageTags(errorContext.Message.Headers);

//Same as recoverability policy in NServiceBusFactory
if (errorContext.ImmediateProcessingFailures < 3)
{
retryCounter.Add(1);
retryCounter.Add(1, tags);
return ErrorHandleResult.RetryRequired;
}

await StoreFailedMessageDocument(errorContext, cancellationToken);

failedCounter.Add(1);
failedCounter.Add(1, tags);

return ErrorHandleResult.Handled;
}

Expand Down
2 changes: 1 addition & 1 deletion src/ServiceControl.Audit/Auditing/AuditIngestor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public async Task VerifyCanReachForwardingAddress()
readonly Settings settings;
readonly Lazy<IMessageDispatcher> messageDispatcher;
readonly string logQueueAddress;
readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "forwarded_count"), description: "Audit ingestion forwarded message count");
readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "forwarded"), description: "Audit ingestion forwarded message count");

static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
}
Expand Down
16 changes: 2 additions & 14 deletions src/ServiceControl.Audit/Auditing/AuditPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Text.Json;
using System.Threading.Tasks;
using Infrastructure;
Expand Down Expand Up @@ -61,14 +60,10 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
}

await unitOfWork.RecordProcessedMessage(processedMessage, context.Body);

storedAuditsCounter.Add(1);
}
else if (context.Extensions.TryGet(out SagaSnapshot sagaSnapshot))
{
await unitOfWork.RecordSagaSnapshot(sagaSnapshot);

storedSagasCounter.Add(1);
}

storedContexts.Add(context);
Expand Down Expand Up @@ -105,11 +100,8 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo

try
{
using (new DurationRecorder(commitDuration))
{
// this can throw even though dispose is never supposed to throw
await unitOfWork.DisposeAsync();
}
// this can throw even though dispose is never supposed to throw
await unitOfWork.DisposeAsync();
}
catch (Exception e)
{
Expand Down Expand Up @@ -252,10 +244,6 @@ await messageDispatcher.Value.Dispatch(new TransportOperations(messagesToEmit.To
}
}

readonly Counter<long> storedAuditsCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "audits_count"), description: "Stored audit message count");
readonly Counter<long> storedSagasCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "sagas_count"), description: "Stored saga state count");
readonly Histogram<double> commitDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "commit_duration"), unit: "ms", description: "Storage unit of work commit duration");

static readonly ILog Logger = LogManager.GetLogger<AuditPersister>();
}
}
15 changes: 15 additions & 0 deletions src/ServiceControl.Audit/Infrastructure/Telemetry.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
namespace ServiceControl.Audit;

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using NServiceBus;
using OpenTelemetry.Metrics;

static class Telemetry
Expand All @@ -14,4 +17,16 @@ public static void AddAuditIngestionMeters(this MeterProviderBuilder builder)
{
builder.AddMeter(MeterName);
}

public static TagList GetIngestedMessageTags(IDictionary<string, string> headers)
{
var tags = new TagList();

if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType))
{
tags.Add("nservicebus.message_type", messageType);
}

return tags;
}
}