Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
Expand All @@ -11,7 +12,7 @@ public interface IConfigureTransportTestExecution
{
Func<CancellationToken, Task> ConfigureTransportForEndpoint(EndpointConfiguration endpointConfiguration, PublisherMetadata publisherMetadata);

void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator);
Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator);

void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration);
Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public void ApplyToContext(TestExecutionContext context)
var connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString");
if (string.IsNullOrWhiteSpace(connectionString))
{
Assert.Ignore("Ignoring because environment variable RabbitMQConnectionString is not available");
Assert.Ignore("Ignoring because environment variable RabbitMQTransport_ConnectionString is not available");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public Func<CancellationToken, Task> ConfigureTransportForEndpoint(EndpointConfi
return _ => Task.CompletedTask;
}

public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
{
var region = FallbackRegionFactory.GetRegionEndpoint().SystemName;

Expand All @@ -29,15 +29,17 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato

cfg.ConfigureEndpoints(context, new DefaultEndpointNameFormatter(NamePrefixGenerator.GetNamePrefix(), false));
});
return (_, _) => Task.CompletedTask;
}

public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
{
services.UsingAmazonSqs(transport =>
{
transport.QueueNamePrefix = NamePrefixGenerator.GetNamePrefix();
transport.TopicNamePrefix = NamePrefixGenerator.GetNamePrefix();
transport.QueueNameGenerator = TestNameHelper.GetSqsQueueName;
});
return (_, _) => Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@ public Func<CancellationToken, Task> ConfigureTransportForEndpoint(EndpointConfi
return Cleanup;
}

public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
{
configurator.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(connectionString);
cfg.ConfigureEndpoints(context);
});
return (_, _) => Task.CompletedTask;
}

public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
{
services.UsingAzureServiceBus(configuration, connectionString);
return (_, _) => Task.CompletedTask;
}

Task Cleanup(CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public Func<CancellationToken, Task> ConfigureTransportForEndpoint(EndpointConfi
return Cleanup;
}

public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
{
configurator.UsingAzureServiceBus((context, cfg) =>
{
Expand All @@ -29,11 +29,13 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato
sb.ConfigureDeadLetterQueueErrorTransport();
}
});
return (_, _) => Task.CompletedTask;
}

public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
{
services.UsingAzureServiceBus(configuration, connectionString, true);
return (_, _) => Task.CompletedTask;
}

Task Cleanup(CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[*.cs]

# Justification: Test project
dotnet_diagnostic.CA2007.severity = none
dotnet_diagnostic.PS0004.severity = none # A parameter of type CancellationToken on a private delegate or method should be required
dotnet_diagnostic.PS0018.severity = none # Add a CancellationToken parameter

# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken
dotnet_diagnostic.NSB0002.severity = suggestion
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\ServiceControl.Connector.MassTransit.AcceptanceTesting\ServiceControl.Connector.MassTransit.AcceptanceTesting.csproj" />
<ProjectReference Include="..\ServiceControl.Connector.MassTransit.RabbitMQ\ServiceControl.Connector.MassTransit.RabbitMQ.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="3.0.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="18.3.0" />
<PackageReference Include="NUnit" Version="4.5.1" />
<PackageReference Include="NUnit.Analyzers" Version="4.12.0" />
<PackageReference Include="NUnit3TestAdapter" Version="6.1.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="MassTransit.RabbitMQ" Version="8.3.1" />
</ItemGroup>

<ItemGroup>
<Compile Include="..\ServiceControl.Connector.MassTransit.AcceptanceTests\Shared\**\*.cs" LinkBase="Shared" />
<Compile Include="..\ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ\**\*.cs" Exclude="..\ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ\obj\**\*"/>
<Compile Remove="..\ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ\TestSuiteConfiguration.cs" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[assembly: RabbitMQTest]

public partial class TestSuiteConfiguration
{
public IConfigureTransportTestExecution CreateTransportConfiguration() => new ConfigureRabbitMQTransportTestExecution(QueueType.Classic);
public Task Cleanup() => Task.CompletedTask;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.AcceptanceTesting.Support;
using NServiceBus.Transport;
using ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ;

class ConfigureRabbitMQTransportTestExecution : IConfigureTransportTestExecution
class ConfigureRabbitMQTransportTestExecution(QueueType queueType = QueueType.Quorum) : IConfigureTransportTestExecution
{
TestRabbitMQTransport? transport;

public Func<CancellationToken, Task> ConfigureTransportForEndpoint(EndpointConfiguration endpointConfiguration, PublisherMetadata publisherMetadata)
{
var transport = new RabbitMQTransport(
RoutingTopology.Conventional(QueueType.Quorum), "host=localhost", false);
transport = new TestRabbitMQTransport(RoutingTopology.Conventional(queueType), "host=localhost", false);
endpointConfiguration.UseTransport(transport);
return Cleanup;
}

public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
{
configurator.UsingRabbitMq((context, cfg) =>
{
Expand All @@ -32,19 +35,72 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
{
rmq.SetQuorumQueue();
if (queueType == QueueType.Quorum)
{
rmq.SetQuorumQueue();
}
}
});

return (queuesToDelete, _) =>
{
DeleteQueues(queuesToDelete);
return Task.CompletedTask;
};
}

public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
{
services.UsingRabbitMQ("host=localhost", new Uri("http://localhost:15672/"), "guest", "guest");
services.UsingRabbitMQ("host=localhost", new Uri("http://localhost:15672/"), "guest", "guest", queueType);
return (queuesToDelete, _) =>
{
DeleteQueues(queuesToDelete);
return Task.CompletedTask;
};
}

Task Cleanup(CancellationToken cancellationToken)
{
//TODO?
PurgeQueues();
return Task.CompletedTask;
}

void PurgeQueues()
{
if (transport == null)
{
return;
}

DeleteQueues(transport.QueuesToCleanup.ToHashSet());
}

static void DeleteQueues(IReadOnlyCollection<string> queues)
{
using var connection = ConnectionHelper.ConnectionFactory.CreateConnection("Test Queue Purger");
using var channel = connection.CreateModel();
foreach (var queue in queues)
{
try
{
channel.QueueDelete(queue, false, false);
}
catch (Exception ex)
{
Console.WriteLine("Unable to clear queue {0}: {1}", queue, ex);
}
}
}

class TestRabbitMQTransport(RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) : RabbitMQTransport(routingTopology, connectionString, enableDelayedDelivery)
{
public override async Task<TransportInfrastructure> Initialize(HostSettings hostSettings, ReceiveSettings[] receivers, string[] sendingAddresses, CancellationToken cancellationToken = default)
{
var infrastructure = await base.Initialize(hostSettings, receivers, sendingAddresses, cancellationToken);
QueuesToCleanup.AddRange(infrastructure.Receivers.Select(x => x.Value.ReceiveAddress).Concat(sendingAddresses).Distinct());
return infrastructure;
}

public List<string> QueuesToCleanup { get; } = [];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ;

using System;
using System.Security.Authentication;
using global::RabbitMQ.Client;

public static class ConnectionHelper
{
static Lazy<ConnectionFactory> connectionFactory = new(() =>
{
var connectionConfiguration = AdapterRabbitMqConfiguration.ConnectionConfiguration.Create("host=localhost", "AcceptanceTests");

var factory = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
HostName = connectionConfiguration.Host,
Port = connectionConfiguration.Port,
VirtualHost = connectionConfiguration.VirtualHost,
UserName = connectionConfiguration.UserName ?? "guest",
Password = connectionConfiguration.Password ?? "guest"
};

factory.Ssl.ServerName = factory.HostName;
factory.Ssl.Certs = null;
factory.Ssl.Version = SslProtocols.Tls12;
factory.Ssl.Enabled = connectionConfiguration.UseTls;

return factory;
});

public static ConnectionFactory ConnectionFactory => connectionFactory.Value;
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public override async Task Start(CancellationToken cancellationToken = default)
configuration,
provider.GetRequiredService<IHostApplicationLifetime>()));
}
transportConfig.ConfigureTransportForConnector(services, hostContext.Configuration);
cleanup = transportConfig.ConfigureTransportForConnector(services, hostContext.Configuration);
});

host = builder.Build();
Expand All @@ -92,9 +92,14 @@ public override async Task Stop(CancellationToken cancellationToken = default)
finally
{
host.Dispose();
if (cleanup != null)
{
await cleanup([errorQueue, serviceControlQueue ?? "Particular.ServiceControl", returnQueue, $"{returnQueue}.poison", .. queueNamesToMonitor], cancellationToken);
}
}
}

IHost? host;
Func<string[], CancellationToken, Task>? cleanup;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using MassTransit;
using MassTransit.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -30,7 +31,7 @@ public override async Task Start(CancellationToken cancellationToken = default)
services.AddMassTransit(x =>
{
busConfig(x);
transportConfig.ConfigureTransportForMassTransitEndpoint(x);
cleanup = transportConfig.ConfigureTransportForMassTransitEndpoint(x);
});
hostConfig(hostContext, services);
services.AddSingleton((TContext)scenarioContext);
Expand All @@ -47,16 +48,28 @@ public override async Task Stop(CancellationToken cancellationToken = default)
return;
}

var queuesToDelete = new List<string>();
try
{
var consumerRegistrations = host.Services.GetServices<IConsumerRegistration>();
foreach (var registration in consumerRegistrations)
{
// we assume the endpoint name is already set and formatted
queuesToDelete.Add(registration.GetDefinition(null!).GetEndpointName(null!));
}
await host.StopAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
host.Dispose();
if (cleanup != null)
{
await cleanup(queuesToDelete, cancellationToken).ConfigureAwait(false);
}
}
}

IHost? host;
Func<IReadOnlyCollection<string>, CancellationToken, Task>? cleanup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ public static IScenarioWithEndpointBehavior<TContext> WithMassTransit<TContext>(
string name,
Action<IBusRegistrationConfigurator> busConfig,
Action<HostBuilderContext, IServiceCollection>? hostConfig = null)
where TContext : ScenarioContext
{
return scenario.WithComponent(new MassTransitComponent<TContext>(name, busConfig, hostConfig ?? ((_, _) =>
{
})));
}
where TContext : ScenarioContext =>
scenario.WithComponent(new MassTransitComponent<TContext>(name, busConfig, hostConfig ?? ((_, _) => { })));
}
Loading