-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathSQSTransportCustomization.cs
More file actions
126 lines (105 loc) · 5.96 KB
/
SQSTransportCustomization.cs
File metadata and controls
126 lines (105 loc) · 5.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
namespace ServiceControl.Transports.SQS
{
using System;
using System.Linq;
using System.Runtime.CompilerServices;
using Amazon;
using Amazon.Runtime;
using Amazon.S3;
using Amazon.SimpleNotificationService;
using Amazon.SQS;
using BrokerThroughput;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;
using NServiceBus.Configuration.AdvancedExtensibility;
using NServiceBus.Logging;
public class SQSTransportCustomization : TransportCustomization<SqsTransport>
{
protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, SqsTransport transportDefinition, TransportSettings transportSettings)
{
var routing = new RoutingSettings(endpointConfiguration.GetSettings());
routing.EnableMessageDrivenPubSubCompatibilityMode();
}
//Do not ConfigurePubSub for send-only endpoint
protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, SqsTransport transportDefinition, TransportSettings transportSettings) { }
protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, SqsTransport transportDefinition, TransportSettings transportSettings) { }
protected override void AddTransportForPrimaryCore(IServiceCollection services,
TransportSettings transportSettings)
{
services.AddSingleton<IBrokerThroughputQuery, AmazonSQSQuery>();
}
protected override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)
{
services.AddSingleton<IProvideQueueLength, QueueLengthProvider>();
services.AddHostedService(provider => provider.GetRequiredService<IProvideQueueLength>());
}
protected override SqsTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly)
{
var builder = new SQSTransportConnectionString(transportSettings.ConnectionString);
IAmazonSQS sqsClient;
IAmazonSimpleNotificationService snsClient;
bool alwaysLoadFromEnvironmentVariable = false;
if (builder.AccessKey != null || builder.SecretKey != null)
{
PromoteEnvironmentVariableFromConnectionString(builder.AccessKey, "AWS_ACCESS_KEY_ID");
PromoteEnvironmentVariableFromConnectionString(builder.SecretKey, "AWS_SECRET_ACCESS_KEY");
PromoteEnvironmentVariableFromConnectionString(builder.Region, "AWS_REGION");
_ = RegionEndpoint.EnumerableAllRegions
.SingleOrDefault(x => x.SystemName == builder.Region) ??
throw new ArgumentException($"Unknown region: \"{builder.Region}\"");
// if the user provided the access key and secret access key they should always be loaded from environment credentials
alwaysLoadFromEnvironmentVariable = true;
sqsClient = new AmazonSQSClient(new EnvironmentVariablesAWSCredentials());
snsClient = new AmazonSimpleNotificationServiceClient(new EnvironmentVariablesAWSCredentials());
}
else
{
//See https://docs.aws.amazon.com/sdk-for-net/v3/developer-guide/net-dg-config-creds.html#creds-assign
log.Info(
"BasicAWSCredentials have not been supplied in the connection string. Attempting to use existing environment or IAM role credentials for SQS Client.");
sqsClient = new AmazonSQSClient();
snsClient = new AmazonSimpleNotificationServiceClient();
}
var transport = new SqsTransport(sqsClient, snsClient, disableUnrestrictedDelayedDelivery: true)
{
MaxAutoMessageVisibilityRenewalDuration = TimeSpan.Zero
};
if (!string.IsNullOrEmpty(builder.QueueNamePrefix))
{
transport.QueueNamePrefix = builder.QueueNamePrefix;
}
if (!string.IsNullOrEmpty(builder.TopicNamePrefix))
{
transport.TopicNamePrefix = builder.TopicNamePrefix;
}
if (!string.IsNullOrEmpty(builder.S3BucketForLargeMessages))
{
string keyPrefixAsString = string.Empty;
if (builder.S3KeyPrefix != null)
{
keyPrefixAsString = builder.S3KeyPrefix;
}
IAmazonS3 s3Client;
if (alwaysLoadFromEnvironmentVariable)
{
s3Client = new AmazonS3Client(new EnvironmentVariablesAWSCredentials());
}
else
{
log.Info(
"BasicAWSCredentials have not been supplied in the connection string. Attempting to use existing environment or IAM role credentials for S3 Client.");
s3Client = new AmazonS3Client();
}
transport.S3 = new S3Settings(builder.S3BucketForLargeMessages, keyPrefixAsString, s3Client);
}
transport.DoNotWrapOutgoingMessages = builder.DoNotWrapOutgoingMessages;
transport.ReserveBytesInMessageSizeCalculation = builder.ReservedBytesInMessageSize;
transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) ? preferredTransactionMode : TransportTransactionMode.ReceiveOnly;
return transport;
}
static void
PromoteEnvironmentVariableFromConnectionString(string value, string environmentVariableName) =>
Environment.SetEnvironmentVariable(environmentVariableName, value, EnvironmentVariableTarget.Process);
static readonly ILog log = LogManager.GetLogger<SQSTransportCustomization>();
}
}