Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ public async Task UpdateUserIndicatorOnEndpoints(List<UpdateUserIndicator> userI
.Where(document => document.SanitizedName.In(updates.Keys) || document.EndpointId.Name.In(updates.Keys));

var documents = await query.ToListAsync(cancellationToken);

// Collect sanitized names needing sibling propagation to avoid issuing a query per document in the loop below.
var sanitizedNameToUserIndicator = new Dictionary<string, string>();

foreach (var document in documents)
{
if (updates.TryGetValue(document.SanitizedName, out var newValueFromSanitizedName))
Expand All @@ -199,14 +203,25 @@ public async Task UpdateUserIndicatorOnEndpoints(List<UpdateUserIndicator> userI
else if (updates.TryGetValue(document.EndpointId.Name, out var newValueFromEndpoint))
{
document.UserIndicator = newValueFromEndpoint;
//update all that match this sanitized name
var sanitizedMatchingQuery = session.Query<EndpointDocument>()
.Where(sanitizedDocument => sanitizedDocument.SanitizedName == document.SanitizedName && sanitizedDocument.EndpointId.Name != document.EndpointId.Name);
var sanitizedMatchingDocuments = await sanitizedMatchingQuery.ToListAsync(cancellationToken);
sanitizedNameToUserIndicator[document.SanitizedName] = newValueFromEndpoint;
}
}

foreach (var matchingDocumentOnSanitizedName in sanitizedMatchingDocuments)
if (sanitizedNameToUserIndicator.Count > 0)
{
// One batched query for all sibling documents, instead of one query per document.
var sanitizedNames = sanitizedNameToUserIndicator.Keys.ToList();
var alreadyLoadedIds = documents.Select(d => d.Id).ToHashSet();

var siblingDocuments = await session.Query<EndpointDocument>()
.Where(d => d.SanitizedName.In(sanitizedNames))
.ToListAsync(cancellationToken);

foreach (var sibling in siblingDocuments.Where(d => !alreadyLoadedIds.Contains(d.Id)))
{
if (sanitizedNameToUserIndicator.TryGetValue(sibling.SanitizedName, out var indicator))
{
matchingDocumentOnSanitizedName.UserIndicator = newValueFromEndpoint;
sibling.UserIndicator = indicator;
}
}
}
Expand Down
32 changes: 32 additions & 0 deletions src/ServiceControl.Persistence.Tests/Throughput/EndpointsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,38 @@ public async Task Should_update_indicators_on_all_endpoint_sources_when_updated_
Assert.That(foundEndpointMonitoring.UserIndicator, Is.EqualTo(userIndicator));
}

[Test]
public async Task Should_update_user_indicators_on_more_than_30_endpoints_without_hitting_session_request_limit()
{
// Arrange
// Each pair shares a sanitized name but has different raw names.
// Updating by raw name (not sanitized name) triggers a sibling propagation query.
// In the original code, that was one DB query per endpoint, exceeding RavenDB's
// default limit of 30 requests per session when 30+ endpoints are updated at once.
const int endpointCount = 30;
var userIndicator = "someIndicator";

for (var i = 0; i < endpointCount; i++)
{
var sanitizedName = $"Endpoint{i}";
await LicensingDataStore.SaveEndpoint(new Endpoint(sanitizedName, ThroughputSource.Audit) { SanitizedName = sanitizedName }, default);
await LicensingDataStore.SaveEndpoint(new Endpoint($"schema.{sanitizedName}", ThroughputSource.Monitoring) { SanitizedName = sanitizedName }, default);
}

var updates = Enumerable.Range(0, endpointCount)
.Select(i => new UpdateUserIndicator { Name = $"schema.Endpoint{i}", UserIndicator = userIndicator })
.ToList();

// Act - must not throw InvalidOperationException due to exceeding session request limit
await LicensingDataStore.UpdateUserIndicatorOnEndpoints(updates, default);

// Assert
var allEndpoints = (await LicensingDataStore.GetAllEndpoints(true, default)).ToList();

Assert.That(allEndpoints, Has.Count.EqualTo(endpointCount * 2));
Assert.That(allEndpoints, Has.All.Matches<Endpoint>(e => e.UserIndicator == userIndicator));
}

[TestCase(10, 5, false)]
[TestCase(10, 20, true)]
public async Task Should_correctly_report_throughput_existence_for_X_days(int daysSinceLastThroughputEntry, int timeFrameToCheck, bool expectedValue)
Expand Down
Loading