-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathBodyStorageEnricher.cs
More file actions
119 lines (100 loc) · 5.1 KB
/
BodyStorageEnricher.cs
File metadata and controls
119 lines (100 loc) · 5.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
namespace ServiceControl.Audit.Auditing.BodyStorage
{
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NServiceBus;
using ServiceControl.Audit.Persistence;
using ServiceControl.Infrastructure;
public class BodyStorageEnricher(IBodyStorage bodyStorage, PersistenceSettings settings, ILogger<BodyStorageEnricher> logger)
{
public async ValueTask StoreAuditMessageBody(ReadOnlyMemory<byte> body, ProcessedMessage processedMessage, CancellationToken cancellationToken)
{
var bodySize = body.Length;
processedMessage.MessageMetadata.Add("ContentLength", bodySize);
if (bodySize == 0)
{
return;
}
var contentType = GetContentType(processedMessage.Headers, "text/plain");
processedMessage.MessageMetadata.Add("ContentType", contentType);
var stored = await TryStoreBody(body, processedMessage, bodySize, contentType, cancellationToken);
if (!stored)
{
processedMessage.MessageMetadata.Add("BodyNotStored", true);
}
}
static string GetContentType(IReadOnlyDictionary<string, string> headers, string defaultContentType)
=> headers.GetValueOrDefault(Headers.ContentType, defaultContentType);
async ValueTask<bool> TryStoreBody(ReadOnlyMemory<byte> body, ProcessedMessage processedMessage, int bodySize, string contentType, CancellationToken cancellationToken)
{
var bodyId = MessageId(processedMessage.Headers);
var bodyUrl = string.Format(BodyUrlFormatString, bodyId);
var isBelowMaxSize = bodySize <= settings.MaxBodySizeToStore;
var storedInBodyStorage = false;
if (isBelowMaxSize)
{
var avoidsLargeObjectHeap = bodySize < LargeObjectHeapThreshold;
var isBinary = IsBinary(processedMessage.Headers);
var useEmbeddedBody = avoidsLargeObjectHeap && !isBinary;
var useBodyStore = !useEmbeddedBody;
if (useEmbeddedBody)
{
try
{
if (settings.EnableFullTextSearchOnBodies)
{
processedMessage.MessageMetadata.Add("Body", enc.GetString(body.Span));
}
else
{
processedMessage.Body = enc.GetString(body.Span);
}
}
catch (DecoderFallbackException e)
{
useBodyStore = true;
logger.LogInformation("Body for {BodyId} could not be stored embedded, fallback to body storage ({ErrorMessage})", bodyId, e.Message);
}
}
if (useBodyStore)
{
await StoreBodyInBodyStorage(body, bodyId, contentType, bodySize, cancellationToken);
storedInBodyStorage = true;
}
}
processedMessage.MessageMetadata.Add("BodyUrl", bodyUrl);
return storedInBodyStorage;
}
async Task StoreBodyInBodyStorage(ReadOnlyMemory<byte> body, string bodyId, string contentType, int bodySize, CancellationToken cancellationToken)
{
await using var bodyStream = new ReadOnlyStream(body);
await bodyStorage.Store(bodyId, contentType, bodySize, bodyStream, cancellationToken);
}
static string MessageId(IReadOnlyDictionary<string, string> headers)
=> headers.GetValueOrDefault(Headers.MessageId);
static bool IsBinary(IReadOnlyDictionary<string, string> headers)
{
if (headers.TryGetValue(Headers.ContentType, out var contentType))
{
// Used by HTTP spec, presence indicates compressed binary payload:
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
var hasContentEncodingHeader = headers.ContainsKey("Content-Encoding");
// Checking for text, json and xml gets the job done. All other types are pretty much all binary
var isText = contentType.StartsWith("text/")
|| contentType.Contains("xml") // matches +xml and /xml
|| contentType.Contains("json"); // matches +json and /json;
isText = isText && !contentType.Contains("binary"); // Backwards compatibility with prior binary detection logic untill SC v4.22.0
return !isText || hasContentEncodingHeader;
}
return true;
}
static readonly Encoding enc = new UTF8Encoding(true, true);
// large object heap starts above 85000 bytes and not above 85 KB!
public const int LargeObjectHeapThreshold = 85 * 1000;
public const string BodyUrlFormatString = "/messages/{0}/body";
}
}