Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>
<ItemGroup Label="Versions for direct package references">
<PackageVersion Include="Autofac" Version="8.2.0" />
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.1" />
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.6.1" />
<PackageVersion Include="AWSSDK.SecurityToken" Version="4.0.0.4" />
<PackageVersion Include="Azure.Identity" Version="1.13.2" />
<PackageVersion Include="Azure.Monitor.Query.Metrics" Version="1.0.0" />
Expand Down
54 changes: 35 additions & 19 deletions src/ServiceControl.Transports.SQS.Tests/AmazonSQSQueryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ class AmazonSQSQueryTests : TransportTestFixture
public void Initialise()
{
provider = new();
provider.SetUtcNow(DateTimeOffset.UtcNow);

var kiribati = TimeZoneInfo.FindSystemTimeZoneById("Pacific/Kiritimati");
var furthestAhead = TimeZoneInfo.ConvertTime(DateTimeOffset.UtcNow, kiribati);
provider.SetUtcNow(furthestAhead);
transportSettings = new TransportSettings
{
ConnectionString = configuration.ConnectionString,
Expand Down Expand Up @@ -94,11 +97,9 @@ public async Task TestConnectionWithValidSettings()
}

[Test]
[CancelAfter(2 * 60 * 1000)]
public async Task RunScenario()
{
// We need to wait a bit of time, to ensure AWS metrics are retrievable
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromMinutes(6));
CancellationToken token = cancellationTokenSource.Token;
const int numMessagesToIngest = 15;

await CreateTestQueue(transportSettings.EndpointName);
Expand All @@ -111,37 +112,52 @@ public async Task RunScenario()
{
dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.AccessKey, connectionString.AccessKey);
}

if (!string.IsNullOrEmpty(connectionString.SecretKey))
{
dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.SecretKey, connectionString.SecretKey);
}

if (!string.IsNullOrEmpty(connectionString.Region))
{
dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.Region, connectionString.Region);
}

query.Initialize(new ReadOnlyDictionary<string, string>(dictionary));
query.Initialize(dictionary.AsReadOnly());

await Task.Delay(TimeSpan.FromMinutes(2), token);
DateTime startDate = provider.GetUtcNow().DateTime;
provider.Advance(TimeSpan.FromDays(1));

var queueNames = new List<IBrokerQueue>();
await foreach (IBrokerQueue queueName in query.GetQueueNames(token))
while (!TestContext.CurrentContext.CancellationToken.IsCancellationRequested)
{
queueNames.Add(queueName);
}
await Task.Delay(TimeSpan.FromSeconds(5), TestContext.CurrentContext.CancellationToken);

IBrokerQueue queue = queueNames.Find(name => name.QueueName == $"{connectionString.QueueNamePrefix}{transportSettings.EndpointName}");
Assert.That(queue, Is.Not.Null);
var queueNames = new List<IBrokerQueue>();
await foreach (IBrokerQueue queueName in query.GetQueueNames(TestContext.CurrentContext.CancellationToken))
{
queueNames.Add(queueName);
}

long total = 0L;
IBrokerQueue queue = queueNames.Find(name => name.QueueName == $"{connectionString.QueueNamePrefix}{transportSettings.EndpointName}");

DateTime startDate = provider.GetUtcNow().DateTime;
provider.Advance(TimeSpan.FromDays(1));
await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, DateOnly.FromDateTime(startDate), token))
{
total += queueThroughput.TotalThroughput;
if (queue == null)
{
continue;
}

long total = 0L;

await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, DateOnly.FromDateTime(startDate), TestContext.CurrentContext.CancellationToken))
{
total += queueThroughput.TotalThroughput;
}

if (total == numMessagesToIngest)
{
return;
}
}

Assert.That(total, Is.EqualTo(numMessagesToIngest));
Assert.Fail("Timeout waiting for expected throughput to be report");
}
}
36 changes: 26 additions & 10 deletions src/ServiceControl.Transports.SQS/AmazonSQSQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,23 +197,33 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
DateOnly startDate,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var endDate = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime).AddDays(-1);
var utcNow = timeProvider.GetUtcNow();
var endDate = DateOnly.FromDateTime(utcNow.DateTime).AddDays(-1); // Query date up to but not including today
Comment thread
mauroservienti marked this conversation as resolved.

if (endDate < startDate)
{
yield break;
}

var startUtc = startDate.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc);
var endUtc = endDate.AddDays(1).ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc);

const int SecondsInDay = 24 * 60 * 60;

var req = new GetMetricStatisticsRequest
{
Namespace = "AWS/SQS",
MetricName = "NumberOfMessagesDeleted",
StartTime = startDate.ToDateTime(TimeOnly.MinValue),
EndTime = endDate.ToDateTime(TimeOnly.MaxValue),
Period = 24 * 60 * 60, // 1 day
StartTime = startUtc,
EndTime = endUtc, // exclusive
Period = SecondsInDay,
Statistics = ["Sum"],
Dimensions = [
new Dimension { Name = "QueueName", Value = brokerQueue.QueueName }
Dimensions =
[
new Dimension
{
Name = "QueueName", Value = brokerQueue.QueueName
}
]
};

Expand All @@ -228,14 +238,20 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
currentDate = currentDate.AddDays(1);
}

// Cloudwatch returns data points per 5 minutes in UTC
foreach (var datapoint in resp.Datapoints ?? [])
{
// There is a bug in the AWS SDK. The timestamp is actually UTC time, eventhough the DateTime returned type says Local
// See https://github.com/aws/aws-sdk-net/issues/167
// So do not convert the timestamp to UTC time!
logger.LogInformation("\tDatapoint {Timestamp:O} {Sum} {Unit}", datapoint.Timestamp, datapoint.Sum, datapoint.Unit);
if (datapoint.Timestamp.HasValue)
{
data[DateOnly.FromDateTime(datapoint.Timestamp.Value)].TotalThroughput = (long)datapoint.Sum.GetValueOrDefault(0);
if (data.TryGetValue(DateOnly.FromDateTime(datapoint.Timestamp.Value), out var queueThroughput))
{
queueThroughput.TotalThroughput = (long)datapoint.Sum.GetValueOrDefault(0);
}
else
{
logger.LogWarning("Datapoint for unknown date {Timestamp:O}", datapoint.Timestamp);
}
}
}

Expand Down