Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace ServiceControl.AcceptanceTests.RavenDB.Recoverability.MessageFailures
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.EndpointTemplates;
Expand Down Expand Up @@ -75,7 +76,7 @@ public async Task It_can_be_reimported()

class MessageFailedHandler(MyContext scenarioContext) : IDomainHandler<MessageFailed>
{
public Task Handle(MessageFailed domainEvent)
public Task Handle(MessageFailed domainEvent, CancellationToken cancellationToken)
{
scenarioContext.MessageFailedEventPublished = true;
return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing.BodyStorage;

Expand All @@ -15,7 +16,7 @@ public InMemoryAttachmentsBodyStorage()
messageBodies = [];
}

public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream)
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
{
var messageBody = messageBodies.FirstOrDefault(w => w.BodyId == bodyId);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace ServiceControl.Audit.Persistence.InMemory
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Auditing.BodyStorage;
using ServiceControl.Audit.Auditing;
Expand All @@ -15,21 +16,21 @@ class InMemoryAuditIngestionUnitOfWork(
{
public ValueTask DisposeAsync() => ValueTask.CompletedTask;

public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint)
public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken)
{
dataStore.knownEndpoints.Add(knownEndpoint);
return Task.CompletedTask;
}

public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body)
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
{
if (!body.IsEmpty)
{
await bodyStorageEnricher.StoreAuditMessageBody(body, processedMessage);
await bodyStorageEnricher.StoreAuditMessageBody(body, processedMessage, cancellationToken);
}
await dataStore.SaveProcessedMessage(processedMessage);
}

public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot) => dataStore.SaveSagaSnapshot(sagaSnapshot);
public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken) => dataStore.SaveSagaSnapshot(sagaSnapshot);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace ServiceControl.Audit.Persistence.InMemory
{
using System.Threading;
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing.BodyStorage;
using ServiceControl.Audit.Persistence.UnitOfWork;
Expand All @@ -12,7 +13,7 @@ public InMemoryAuditIngestionUnitOfWorkFactory(InMemoryAuditDataStore dataStore,
bodyStorageEnricher = new BodyStorageEnricher(bodyStorage, settings);
}

public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
{
//The batchSize argument is ignored: the in-memory storage implementation doesn't support batching.
return new ValueTask<IAuditIngestionUnitOfWork>(new InMemoryAuditIngestionUnitOfWork(dataStore, bodyStorageEnricher));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace ServiceControl.Audit.Persistence.RavenDB
{
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Auditing.BodyStorage;
using Raven.Client.Documents.BulkInsert;
Expand All @@ -11,15 +12,15 @@ class RavenAttachmentsBodyStorage(
int settingsMaxBodySizeToStore)
: IBodyStorage
{
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream)
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
{
if (bodySize > settingsMaxBodySizeToStore)
{
return Task.CompletedTask;
}

return bulkInsert.AttachmentsFor(bodyId)
.StoreAsync("body", bodyStream, contentType);
.StoreAsync("body", bodyStream, contentType, cancellationToken);
}

public async Task<StreamResult> TryFetch(string bodyId)
Comment thread
ramonsmits marked this conversation as resolved.
Outdated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class RavenAuditIngestionUnitOfWork(
IBodyStorage bodyStorage)
: IAuditIngestionUnitOfWork
{
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body)
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
{
processedMessage.MessageMetadata["ContentLength"] = body.Length;
if (!body.IsEmpty)
Expand All @@ -37,7 +37,7 @@ public async Task RecordProcessedMessage(ProcessedMessage processedMessage, Read
await using var stream = new ReadOnlyStream(body);
var contentType = processedMessage.Headers.GetValueOrDefault(Headers.ContentType, "text/xml");

await bodyStorage.Store(processedMessage.Id, contentType, body.Length, stream);
await bodyStorage.Store(processedMessage.Id, contentType, body.Length, stream, cancellationToken);
}
}

Expand All @@ -47,10 +47,10 @@ MetadataAsDictionary GetExpirationMetadata() =>
[Constants.Documents.Metadata.Expires] = DateTime.UtcNow.Add(auditRetentionPeriod)
};

public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot)
public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken)
=> bulkInsert.StoreAsync(sagaSnapshot, GetExpirationMetadata());

public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint)
public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken)
=> bulkInsert.StoreAsync(knownEndpoint, GetExpirationMetadata());

public async ValueTask DisposeAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@ class RavenAuditIngestionUnitOfWorkFactory(
MinimumRequiredStorageState customCheckState)
: IAuditIngestionUnitOfWorkFactory
{
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
{
var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout);
var bulkInsert = (await documentStoreProvider.GetDocumentStore(timedCancellationSource.Token))
.BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, timedCancellationSource.Token);
// DO NOT USE using var, will be disposed by RavenAuditIngestionUnitOfWork
var lifetimeForwardedTimedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
lifetimeForwardedTimedCancellationSource.CancelAfter(databaseConfiguration.BulkInsertCommitTimeout);
var bulkInsert = (await documentStoreProvider.GetDocumentStore(lifetimeForwardedTimedCancellationSource.Token))
.BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, lifetimeForwardedTimedCancellationSource.Token);

return new RavenAuditIngestionUnitOfWork(
bulkInsert, timedCancellationSource, databaseConfiguration.AuditRetentionPeriod, new RavenAttachmentsBodyStorage(sessionProvider, bulkInsert, databaseConfiguration.MaxBodySizeToStore)
bulkInsert,
lifetimeForwardedTimedCancellationSource, // Transfer ownership for disposal
databaseConfiguration.AuditRetentionPeriod,
new RavenAttachmentsBodyStorage(sessionProvider, bulkInsert, databaseConfiguration.MaxBodySizeToStore)
);
// Intentionally not disposing CTS!
}

public bool CanIngestMore() => customCheckState.CanIngestMore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected string GetManifestPath()
configuration.AuditIngestionUnitOfWorkFactory;

protected ValueTask<IAuditIngestionUnitOfWork> StartAuditUnitOfWork(int batchSize) =>
AuditIngestionUnitOfWorkFactory.StartNew(batchSize);
AuditIngestionUnitOfWorkFactory.StartNew(batchSize, TestContext.CurrentContext.CancellationToken);

protected IServiceProvider ServiceProvider => configuration.ServiceProvider;

Expand Down
13 changes: 7 additions & 6 deletions src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;
Expand All @@ -11,7 +12,7 @@

public class BodyStorageEnricher(IBodyStorage bodyStorage, PersistenceSettings settings)
{
public async ValueTask StoreAuditMessageBody(ReadOnlyMemory<byte> body, ProcessedMessage processedMessage)
public async ValueTask StoreAuditMessageBody(ReadOnlyMemory<byte> body, ProcessedMessage processedMessage, CancellationToken cancellationToken)
{
var bodySize = body.Length;
processedMessage.MessageMetadata.Add("ContentLength", bodySize);
Expand All @@ -23,7 +24,7 @@ public async ValueTask StoreAuditMessageBody(ReadOnlyMemory<byte> body, Processe
var contentType = GetContentType(processedMessage.Headers, "text/xml");
processedMessage.MessageMetadata.Add("ContentType", contentType);

var stored = await TryStoreBody(body, processedMessage, bodySize, contentType);
var stored = await TryStoreBody(body, processedMessage, bodySize, contentType, cancellationToken);
if (!stored)
{
processedMessage.MessageMetadata.Add("BodyNotStored", true);
Expand All @@ -33,7 +34,7 @@ public async ValueTask StoreAuditMessageBody(ReadOnlyMemory<byte> body, Processe
static string GetContentType(IReadOnlyDictionary<string, string> headers, string defaultContentType)
=> headers.GetValueOrDefault(Headers.ContentType, defaultContentType);

async ValueTask<bool> TryStoreBody(ReadOnlyMemory<byte> body, ProcessedMessage processedMessage, int bodySize, string contentType)
async ValueTask<bool> TryStoreBody(ReadOnlyMemory<byte> body, ProcessedMessage processedMessage, int bodySize, string contentType, CancellationToken cancellationToken)
{
var bodyId = MessageId(processedMessage.Headers);
var bodyUrl = string.Format(BodyUrlFormatString, bodyId);
Expand Down Expand Up @@ -71,7 +72,7 @@ async ValueTask<bool> TryStoreBody(ReadOnlyMemory<byte> body, ProcessedMessage p

if (useBodyStore)
{
await StoreBodyInBodyStorage(body, bodyId, contentType, bodySize);
await StoreBodyInBodyStorage(body, bodyId, contentType, bodySize, cancellationToken);
storedInBodyStorage = true;
}
}
Expand All @@ -80,10 +81,10 @@ async ValueTask<bool> TryStoreBody(ReadOnlyMemory<byte> body, ProcessedMessage p
return storedInBodyStorage;
}

async Task StoreBodyInBodyStorage(ReadOnlyMemory<byte> body, string bodyId, string contentType, int bodySize)
async Task StoreBodyInBodyStorage(ReadOnlyMemory<byte> body, string bodyId, string contentType, int bodySize, CancellationToken cancellationToken)
{
await using var bodyStream = new ReadOnlyStream(body);
await bodyStorage.Store(bodyId, contentType, bodySize, bodyStream);
await bodyStorage.Store(bodyId, contentType, bodySize, bodyStream, cancellationToken);
}

static string MessageId(IReadOnlyDictionary<string, string> headers)
Expand Down
3 changes: 2 additions & 1 deletion src/ServiceControl.Audit.Persistence/IBodyStorage.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
namespace ServiceControl.Audit.Auditing.BodyStorage
{
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public interface IBodyStorage
{
Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream);
Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken);
Task<StreamResult> TryFetch(string bodyId);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
namespace ServiceControl.Audit.Persistence.UnitOfWork
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Auditing;
using Monitoring;
using ServiceControl.SagaAudit;

public interface IAuditIngestionUnitOfWork : IAsyncDisposable
{
Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body = default);
Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot);
Task RecordKnownEndpoint(KnownEndpoint knownEndpoint);
Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body = default, CancellationToken cancellationToken = default);
Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken = default);
Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
namespace ServiceControl.Audit.Persistence.UnitOfWork
{
using System.Threading;
using System.Threading.Tasks;

public interface IAuditIngestionUnitOfWorkFactory
{
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize); //Throws if not enough space or some other problem preventing from writing data
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken); //Throws if not enough space or some other problem preventing from writing data
bool CanIngestMore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace ServiceControl.UnitTests.BodyStorage
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Audit.Auditing;
using Audit.Auditing.BodyStorage;
Expand Down Expand Up @@ -33,7 +34,7 @@ public async Task Should_remove_body_when_above_threshold()

var message = new ProcessedMessage(headers, metadata);

await enricher.StoreAuditMessageBody(body, message);
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);

Assert.Multiple(() =>
{
Expand Down Expand Up @@ -63,7 +64,7 @@ public async Task Should_remove_body_when_above_threshold_and_binary()

var message = new ProcessedMessage(headers, metadata);

await enricher.StoreAuditMessageBody(body, message);
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);

Assert.Multiple(() =>
{
Expand Down Expand Up @@ -95,7 +96,7 @@ public async Task Should_store_body_in_metadata_when_below_large_object_heap_and

var message = new ProcessedMessage(headers, metadata);

await enricher.StoreAuditMessageBody(body, message);
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);

Assert.Multiple(() =>
{
Expand Down Expand Up @@ -127,7 +128,7 @@ public async Task Should_store_body_in_body_property_when_full_text_disabled_and

var message = new ProcessedMessage(headers, metadata);

await enricher.StoreAuditMessageBody(body, message);
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);

Assert.Multiple(() =>
{
Expand Down Expand Up @@ -159,7 +160,7 @@ public async Task Should_store_body_in_storage_when_above_large_object_heap_but_

var message = new ProcessedMessage(headers, metadata);

await enricher.StoreAuditMessageBody(body, message);
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);

Assert.Multiple(() =>
{
Expand Down Expand Up @@ -190,7 +191,7 @@ public async Task Should_store_body_in_storage_when_below_threshold_and_binary()

var message = new ProcessedMessage(headers, metadata);

await enricher.StoreAuditMessageBody(body, message);
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);

Assert.Multiple(() =>
{
Expand Down Expand Up @@ -221,7 +222,7 @@ public async Task Should_store_body_in_storage_when_below_threshold()

var message = new ProcessedMessage(headers, metadata);

await enricher.StoreAuditMessageBody(body, message);
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);

Assert.Multiple(() =>
{
Expand Down Expand Up @@ -251,7 +252,7 @@ public async Task Should_store_body_in_storage_when_encoding_fails()

var message = new ProcessedMessage(headers, metadata);

await enricher.StoreAuditMessageBody(body, message);
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);

Assert.That(fakeStorage.StoredBodySize, Is.GreaterThan(0));
}
Expand All @@ -260,7 +261,7 @@ class FakeBodyStorage : IBodyStorage
{
public int StoredBodySize { get; set; }

public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream)
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
{
StoredBodySize = bodySize;
return Task.CompletedTask;
Expand Down
Loading