From 8b887183abfd09f21bcab63b86b2c0c5def9ec27 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Mon, 9 Mar 2026 13:52:39 +0100 Subject: [PATCH 1/8] Add command line parsing and quorum queue support --- .../Commands/ConnectorCommandOptions.cs | 95 +++++++++ .../Commands/HealthCheckCommand.cs | 9 +- .../Commands/QueuesCommand.cs | 8 +- .../Commands/StartupCommand.cs | 8 +- .../ConnectorOptions.cs | 32 +++ .../ConnectorOptionsValidator.cs | 47 +++++ .../HostApplicationBuilderExtensions.cs | 189 +++++++++++++----- ...eControl.Connector.MassTransit.Host.csproj | 1 + .../AdapterRabbitMqConfiguration.cs | 4 +- 9 files changed, 337 insertions(+), 56 deletions(-) create mode 100644 src/ServiceControl.Connector.MassTransit.Host/Commands/ConnectorCommandOptions.cs create mode 100644 src/ServiceControl.Connector.MassTransit.Host/ConnectorOptions.cs create mode 100644 src/ServiceControl.Connector.MassTransit.Host/ConnectorOptionsValidator.cs diff --git a/src/ServiceControl.Connector.MassTransit.Host/Commands/ConnectorCommandOptions.cs b/src/ServiceControl.Connector.MassTransit.Host/Commands/ConnectorCommandOptions.cs new file mode 100644 index 00000000..a4c6e032 --- /dev/null +++ b/src/ServiceControl.Connector.MassTransit.Host/Commands/ConnectorCommandOptions.cs @@ -0,0 +1,95 @@ +namespace ServiceControl.Connector.MassTransit.Host.Commands; + +using System.CommandLine; +using System.CommandLine.Parsing; + +public static class ConnectorCommandOptions +{ + static Option ConnectionString { get; } = new( + "--connection-string", + "The connection string for the transport. Can also be set via CONNECTION_STRING environment variable."); + + static Option TransportType { get; } = new( + "--transport-type", + "The transport type (AzureServiceBus, AzureServiceBusWithDeadLetter, RabbitMQ, AmazonSQS). Can also be set via TRANSPORT_TYPE environment variable."); + + static Option QueuesFile { get; } = new( + "--queues-file", + "Path to the file containing the list of queues to monitor. Can also be set via QUEUES_FILE environment variable."); + + static Option ReturnQueue { get; } = new( + "--return-queue", + "The return queue name. Defaults to 'Particular.ServiceControl.Connector.MassTransit_return'. Can also be set via RETURN_QUEUE environment variable."); + + static Option ErrorQueue { get; } = new( + "--error-queue", + "The error queue name. Defaults to 'error'. Can also be set via ERROR_QUEUE environment variable."); + + static Option ServiceControlQueue { get; } = new( + "--servicecontrol-queue", + "The ServiceControl queue name. Defaults to 'Particular.ServiceControl'. Can also be set via SERVICECONTROL_QUEUE environment variable."); + + static Option RabbitMqManagementApiUrl { get; } = new( + "--rabbitmq-management-api-url", + "The RabbitMQ management API URL (required for RabbitMQ transport). Can also be set via RABBITMQ_MANAGEMENT_API_URL environment variable."); + + static Option RabbitMqManagementApiUsername { get; } = new( + "--rabbitmq-management-api-username", + "The RabbitMQ management API username. Can also be set via RABBITMQ_MANAGEMENT_API_USERNAME environment variable."); + + static Option RabbitMqManagementApiPassword { get; } = new( + "--rabbitmq-management-api-password", + "The RabbitMQ management API password. Can also be set via RABBITMQ_MANAGEMENT_API_PASSWORD environment variable."); + + static Option RabbitMqQueueType { get; } = new( + "--rabbitmq-queue-type", + "The RabbitMQ queue type (Classic or Quorum). Defaults to Quorum. Can also be set via RABBITMQ_QUEUE_TYPE environment variable."); + + public static void AddConnectorOptions(this Command command) + { + command.AddOption(ConnectionString); + command.AddOption(TransportType); + command.AddOption(QueuesFile); + command.AddOption(ReturnQueue); + command.AddOption(ErrorQueue); + command.AddOption(ServiceControlQueue); + command.AddOption(RabbitMqManagementApiUrl); + command.AddOption(RabbitMqManagementApiUsername); + command.AddOption(RabbitMqManagementApiPassword); + command.AddOption(RabbitMqQueueType); + } + + public static string[] BuildArgs(ParseResult parseResult) + { + var args = new List(); + + AddArgIfExplicit(ConnectionString, "ConnectionString"); + AddArgIfExplicit(TransportType, "TransportType"); + AddArgIfExplicit(QueuesFile, "QueuesFile"); + AddArgIfExplicit(ReturnQueue, "ReturnQueue"); + AddArgIfExplicit(ErrorQueue, "ErrorQueue"); + AddArgIfExplicit(ServiceControlQueue, "ServiceControlQueue"); + AddArgIfExplicit(RabbitMqManagementApiUrl, "RabbitMqManagementApiUrl"); + AddArgIfExplicit(RabbitMqManagementApiUsername, "RabbitMqManagementApiUsername"); + AddArgIfExplicit(RabbitMqManagementApiPassword, "RabbitMqManagementApiPassword"); + AddArgIfExplicit(RabbitMqQueueType, "RabbitMqQueueType"); + + return [.. args]; + + void AddArgIfExplicit(Option option, string configKey) + { + // Only forward options that the user explicitly provided on the command line. + // Omitting defaults lets environment variables take effect through normal config precedence. + if (parseResult.FindResultFor(option) == null) + { + return; + } + + var value = parseResult.GetValueForOption(option); + if (!string.IsNullOrEmpty(value)) + { + args.Add($"--{configKey}={value}"); + } + } + } +} diff --git a/src/ServiceControl.Connector.MassTransit.Host/Commands/HealthCheckCommand.cs b/src/ServiceControl.Connector.MassTransit.Host/Commands/HealthCheckCommand.cs index 3f90e83e..fbfbdfe2 100644 --- a/src/ServiceControl.Connector.MassTransit.Host/Commands/HealthCheckCommand.cs +++ b/src/ServiceControl.Connector.MassTransit.Host/Commands/HealthCheckCommand.cs @@ -9,16 +9,21 @@ public class HealthCheckCommand : Command { public HealthCheckCommand() : base("health-check", "Performs a validation that the connector is able to connect to the broker") { + this.AddConnectorOptions(); + this.SetHandler(async context => { - context.ExitCode = await InternalHandler(context.GetCancellationToken()); + var connectorArgs = ConnectorCommandOptions.BuildArgs(context.ParseResult); + + context.ExitCode = await InternalHandler(connectorArgs, context.GetCancellationToken()); }); } - async Task InternalHandler(CancellationToken cancellationToken) + async Task InternalHandler(string[] connectorArgs, CancellationToken cancellationToken) { var builder = Host.CreateEmptyApplicationBuilder(null); builder.Configuration.AddEnvironmentVariables(); + builder.Configuration.AddCommandLine(connectorArgs); builder.UseMassTransitConnector(true); var host = builder.Build(); diff --git a/src/ServiceControl.Connector.MassTransit.Host/Commands/QueuesCommand.cs b/src/ServiceControl.Connector.MassTransit.Host/Commands/QueuesCommand.cs index fbe40752..30073f0a 100644 --- a/src/ServiceControl.Connector.MassTransit.Host/Commands/QueuesCommand.cs +++ b/src/ServiceControl.Connector.MassTransit.Host/Commands/QueuesCommand.cs @@ -13,18 +13,22 @@ public QueuesCommand() : base("queues-list", "List queues") var filterOption = new Option("--filter", () => ".*_error$", "Use a regex to filter queues by name."); AddOption(filterOption); + this.AddConnectorOptions(); + this.SetHandler(async context => { var filter = context.ParseResult.GetValueForOption(filterOption); + var connectorArgs = ConnectorCommandOptions.BuildArgs(context.ParseResult); - context.ExitCode = await InternalHandler(filter!, context.GetCancellationToken()); + context.ExitCode = await InternalHandler(filter!, connectorArgs, context.GetCancellationToken()); }); } - async Task InternalHandler(string filter, CancellationToken cancellationToken) + async Task InternalHandler(string filter, string[] connectorArgs, CancellationToken cancellationToken) { var builder = Host.CreateEmptyApplicationBuilder(null); builder.Configuration.AddEnvironmentVariables(); + builder.Configuration.AddCommandLine(connectorArgs); builder.UseMassTransitConnector(true); var host = builder.Build(); diff --git a/src/ServiceControl.Connector.MassTransit.Host/Commands/StartupCommand.cs b/src/ServiceControl.Connector.MassTransit.Host/Commands/StartupCommand.cs index 9187d760..3cf1da6f 100644 --- a/src/ServiceControl.Connector.MassTransit.Host/Commands/StartupCommand.cs +++ b/src/ServiceControl.Connector.MassTransit.Host/Commands/StartupCommand.cs @@ -24,19 +24,21 @@ public StartupCommand(string[] args) : base("Particular Software ServiceControl AddOption(consoleOption); AddOption(runModeOption); + this.AddConnectorOptions(); this.SetHandler(async context => { var isConsole = context.ParseResult.GetValueForOption(consoleOption); var runMode = context.ParseResult.GetValueForOption(runModeOption); + var connectorArgs = ConnectorCommandOptions.BuildArgs(context.ParseResult); - context.ExitCode = await InternalHandler(runMode, isConsole, args, context.GetCancellationToken()); + context.ExitCode = await InternalHandler(runMode, isConsole, connectorArgs, context.GetCancellationToken()); }); } - async Task InternalHandler(RunMode runMode, bool isConsole, string[] args, CancellationToken cancellationToken) + async Task InternalHandler(RunMode runMode, bool isConsole, string[] connectorArgs, CancellationToken cancellationToken) { - var builder = Host.CreateApplicationBuilder(args); + var builder = Host.CreateApplicationBuilder(connectorArgs); builder.UseMassTransitConnector(runMode == RunMode.Setup); if (isConsole) diff --git a/src/ServiceControl.Connector.MassTransit.Host/ConnectorOptions.cs b/src/ServiceControl.Connector.MassTransit.Host/ConnectorOptions.cs new file mode 100644 index 00000000..52163251 --- /dev/null +++ b/src/ServiceControl.Connector.MassTransit.Host/ConnectorOptions.cs @@ -0,0 +1,32 @@ +namespace ServiceControl.Connector.MassTransit.Host; + +using System.ComponentModel.DataAnnotations; + +public class ConnectorOptions +{ + // Not [Required]: AmazonSQS uses IAM credentials and does not need a connection string. + // Validated conditionally in ConnectorOptionsValidator for other transports. + public string ConnectionString { get; set; } = string.Empty; + + [Required(ErrorMessage = "TRANSPORT_TYPE is required")] + [RegularExpression("^(AzureServiceBus|AzureServiceBusWithDeadLetter|RabbitMQ|AmazonSQS)$", ErrorMessage = "TRANSPORT_TYPE must be one of: AzureServiceBus, AzureServiceBusWithDeadLetter, RabbitMQ, AmazonSQS")] + public string TransportType { get; set; } = string.Empty; + + // Not [Required]: setup-only mode does not need a queues file. + // Validated conditionally in HostApplicationBuilderExtensions when not in setup-only mode. + public string QueuesFile { get; set; } = string.Empty; + + public string ReturnQueue { get; set; } = "Particular.ServiceControl.Connector.MassTransit_return"; + + public string ErrorQueue { get; set; } = "error"; + + public string ServiceControlQueue { get; set; } = "Particular.ServiceControl"; + + public string? RabbitMqManagementApiUrl { get; set; } + + public string? RabbitMqManagementApiUsername { get; set; } + + public string? RabbitMqManagementApiPassword { get; set; } + + public string? RabbitMqQueueType { get; set; } = "Quorum"; +} diff --git a/src/ServiceControl.Connector.MassTransit.Host/ConnectorOptionsValidator.cs b/src/ServiceControl.Connector.MassTransit.Host/ConnectorOptionsValidator.cs new file mode 100644 index 00000000..0912092c --- /dev/null +++ b/src/ServiceControl.Connector.MassTransit.Host/ConnectorOptionsValidator.cs @@ -0,0 +1,47 @@ +namespace ServiceControl.Connector.MassTransit.Host; + +using Microsoft.Extensions.Options; + +public class ConnectorOptionsValidator : IValidateOptions +{ + public ValidateOptionsResult Validate(string? name, ConnectorOptions options) + { + var failures = new List(); + + // ConnectionString is required for all transports except AmazonSQS (which uses IAM credentials). + // Note: [Required] is intentionally absent from ConnectorOptions.ConnectionString. + if (options.TransportType != "AmazonSQS" && string.IsNullOrWhiteSpace(options.ConnectionString)) + { + failures.Add("CONNECTION_STRING is required for this transport. Use --connection-string or set the CONNECTION_STRING environment variable."); + } + + // RabbitMQ-specific validation — DataAnnotations can't handle conditional validation. + if (options.TransportType == "RabbitMQ") + { + if (string.IsNullOrWhiteSpace(options.RabbitMqManagementApiUrl)) + { + failures.Add("RABBITMQ_MANAGEMENT_API_URL is required when using RabbitMQ transport. Use --rabbitmq-management-api-url or set the RABBITMQ_MANAGEMENT_API_URL environment variable."); + } + else if (!Uri.TryCreate(options.RabbitMqManagementApiUrl, UriKind.Absolute, out _)) + { + failures.Add("RABBITMQ_MANAGEMENT_API_URL is invalid. Ensure the value is a valid URL without any quotes (e.g., http://localhost:15672)."); + } + + if (!IsValidQueueType(options.RabbitMqQueueType)) + { + failures.Add($"RABBITMQ_QUEUE_TYPE '{options.RabbitMqQueueType}' is invalid. Must be one of: Classic, Quorum"); + } + } + + return failures.Count > 0 ? ValidateOptionsResult.Fail(failures) : ValidateOptionsResult.Success; + } + + static bool IsValidQueueType(string? queueType) => + queueType?.ToLowerInvariant() switch + { + "classic" => true, + "quorum" => true, + null => true, // Default will be used + _ => false + }; +} diff --git a/src/ServiceControl.Connector.MassTransit.Host/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Connector.MassTransit.Host/HostApplicationBuilderExtensions.cs index 80c0bfd2..f46bb8bd 100644 --- a/src/ServiceControl.Connector.MassTransit.Host/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Connector.MassTransit.Host/HostApplicationBuilderExtensions.cs @@ -3,104 +3,199 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using ServiceControl.Connector.MassTransit; +using ServiceControl.Connector.MassTransit.Host; static class HostApplicationBuilderExtensions { public static void UseMassTransitConnector(this HostApplicationBuilder builder, bool isSetupOnly) { - var returnQueue = builder.Configuration.GetValue("RETURN_QUEUE") ?? - "Particular.ServiceControl.Connector.MassTransit_return"; - var errorQueue = builder.Configuration.GetValue("ERROR_QUEUE") ?? "error"; - var customChecksQueue = builder.Configuration.GetValue("SERVICECONTROL_QUEUE") ?? "Particular.ServiceControl"; + // Bind to root configuration to match command line args format (e.g., --ConnectionString=value) + builder.Services.AddOptions() + .BindConfiguration(string.Empty) + .PostConfigure(options => ApplyEnvironmentVariableFallbacks(options, builder.Configuration)) + .ValidateDataAnnotations() + .ValidateOnStart(); + + builder.Services.AddSingleton, ConnectorOptionsValidator>(); + + // Read options at composition time for eager validation and transport setup + var options = ReadOptions(builder.Configuration); + + if (!string.IsNullOrEmpty(options.ConnectionString)) + { + try + { + new DbConnectionStringBuilder { ConnectionString = options.ConnectionString }; + } + catch (Exception) + { + throw new Exception("CONNECTION_STRING contains an invalid connection string. Please check the value and try again"); + } + } + var services = builder.Services; - services - .AddSingleton(new Configuration + services.AddSingleton(sp => + { + var opts = sp.GetRequiredService>().Value; + return new Configuration { - ReturnQueue = returnQueue, - ErrorQueue = errorQueue, - ServiceControlQueue = customChecksQueue - }) + ReturnQueue = opts.ReturnQueue, + ErrorQueue = opts.ErrorQueue, + ServiceControlQueue = opts.ServiceControlQueue + }; + }); + + services .AddSingleton() .AddSingleton() .AddSingleton() .AddSingleton() .AddSingleton(TimeProvider.System); - var configuration = builder.Configuration; - var staticQueueList = string.Empty; + services.AddSingleton( + new FileBasedQueueInformationProvider(isSetupOnly ? string.Empty : options.QueuesFile)); if (!isSetupOnly) { - staticQueueList = configuration.GetValue("QUEUES_FILE"); - - if (staticQueueList == null) + if (string.IsNullOrEmpty(options.QueuesFile)) { - throw new Exception("QUEUES_FILE environment variable not set. Please set this to the path of a file containing a list of queues to bridge. You can use the `queues-list` cli command to populate this file."); + throw new Exception("QUEUES_FILE is not set. Please use --queues-file or set the QUEUES_FILE environment variable."); } - if (!File.Exists(staticQueueList)) + if (!File.Exists(options.QueuesFile)) { - throw new Exception($"Queues file ({staticQueueList}) specified does not exist"); + throw new Exception($"Queues file ({options.QueuesFile}) specified does not exist"); } - var content = File.ReadAllText(staticQueueList); + var content = File.ReadAllText(options.QueuesFile); if (string.IsNullOrWhiteSpace(content)) { - throw new Exception($"Queues file ({staticQueueList}) specified is empty. In order for the connector to bridge error queues, you need to specify some queues! You can use the `queues-list` cli command to populate this file."); + throw new Exception($"Queues file ({options.QueuesFile}) specified is empty. In order for the connector to bridge error queues, you need to specify some queues! You can use the `queues-list` cli command to populate this file."); } - services - .AddHostedService() - .AddHostedService() - .AddHostedService(); + services.AddHostedService(); + services.AddHostedService(); + services.AddHostedService(); var diagnosticsData = new DiagnosticsData(); services.AddSingleton(diagnosticsData); builder.Logging.AddProvider(new LastLogEntriesProvider(diagnosticsData)); } - var transportType = configuration.GetValue("TRANSPORT_TYPE"); - var connectionString = configuration.GetValue("CONNECTION_STRING"); - - if (string.IsNullOrEmpty(connectionString)) - { - try - { - new DbConnectionStringBuilder { ConnectionString = connectionString }; - } - catch (Exception) - { - throw new Exception("CONNECTION_STRING environment variable contains an invalid connection string. Please check the value and try again"); - } - } + ConfigureTransport(services, builder.Configuration, options); + } - switch (transportType) + static void ConfigureTransport(IServiceCollection services, IConfiguration configuration, ConnectorOptions options) + { + switch (options.TransportType) { case "AmazonSQS": services.UsingAmazonSqs(); break; case "AzureServiceBus": - services.UsingAzureServiceBus(configuration, - connectionString ?? throw new Exception("CONNECTION_STRING environment variable not set")); + services.UsingAzureServiceBus(configuration, options.ConnectionString, false); break; case "AzureServiceBusWithDeadLetter": - services.UsingAzureServiceBus(configuration, - connectionString ?? throw new Exception("CONNECTION_STRING environment variable not set"), true); + services.UsingAzureServiceBus(configuration, options.ConnectionString, true); break; case "RabbitMQ": - var managementApiValue = configuration.GetValue("RABBITMQ_MANAGEMENT_API_URL") ?? throw new Exception("RABBITMQ_MANAGEMENT_API_URL environment variable not set"); + var managementApiValue = options.RabbitMqManagementApiUrl + ?? throw new Exception("RABBITMQ_MANAGEMENT_API_URL is required for RabbitMQ transport. Use --rabbitmq-management-api-url or set the RABBITMQ_MANAGEMENT_API_URL environment variable."); if (!Uri.TryCreate(managementApiValue, UriKind.Absolute, out var managementApi)) { throw new Exception("RABBITMQ_MANAGEMENT_API_URL is invalid. Ensure the value is a valid url without any quotes e.g. http://localhost:15672"); } - services.UsingRabbitMQ(connectionString ?? throw new Exception("CONNECTION_STRING environment variable not set"), managementApi, configuration.GetValue("RABBITMQ_MANAGEMENT_API_USERNAME"), configuration.GetValue("RABBITMQ_MANAGEMENT_API_PASSWORD")); + var queueType = Enum.TryParse(typeof(QueueType), options.RabbitMqQueueType, true, out var queueTypeOut) + ? (QueueType)queueTypeOut + : QueueType.Quorum; + services.UsingRabbitMQ(options.ConnectionString, managementApi, options.RabbitMqManagementApiUsername, options.RabbitMqManagementApiPassword, queueType); break; default: - throw new NotSupportedException($"TRANSPORT_TYPE environment variable specified has an invalid value ({transportType}). Please use one of the following: AzureServiceBus, AzureServiceBusWithDeadLetter, RabbitMQ"); + throw new NotSupportedException($"TRANSPORT_TYPE specified has an invalid value ({options.TransportType}). Please use one of the following: AzureServiceBus, AzureServiceBusWithDeadLetter, RabbitMQ, AmazonSQS"); } + } + + // Reads configuration at composition time, applying CLI-arg-first then env-var fallback. + // CLI args are mapped to PascalCase keys (e.g. --ConnectionString=value), + // env vars use SCREAMING_SNAKE_CASE (e.g. CONNECTION_STRING). + static ConnectorOptions ReadOptions(IConfiguration config) => new() + { + ConnectionString = config["ConnectionString"] ?? config["CONNECTION_STRING"] ?? string.Empty, + TransportType = config["TransportType"] ?? config["TRANSPORT_TYPE"] ?? string.Empty, + QueuesFile = config["QueuesFile"] ?? config["QUEUES_FILE"] ?? string.Empty, + ReturnQueue = config["ReturnQueue"] ?? config["RETURN_QUEUE"] ?? "Particular.ServiceControl.Connector.MassTransit_return", + ErrorQueue = config["ErrorQueue"] ?? config["ERROR_QUEUE"] ?? "error", + ServiceControlQueue = config["ServiceControlQueue"] ?? config["SERVICECONTROL_QUEUE"] ?? "Particular.ServiceControl", + RabbitMqManagementApiUrl = config["RabbitMqManagementApiUrl"] ?? config["RABBITMQ_MANAGEMENT_API_URL"], + RabbitMqManagementApiUsername = config["RabbitMqManagementApiUsername"] ?? config["RABBITMQ_MANAGEMENT_API_USERNAME"], + RabbitMqManagementApiPassword = config["RabbitMqManagementApiPassword"] ?? config["RABBITMQ_MANAGEMENT_API_PASSWORD"], + RabbitMqQueueType = config["RabbitMqQueueType"] ?? config["RABBITMQ_QUEUE_TYPE"] ?? "Quorum", + }; - services.AddSingleton(new FileBasedQueueInformationProvider(staticQueueList)); + // Applied as PostConfigure so IOptions consumers get the same precedence: + // CLI arg (PascalCase key bound by BindConfiguration) > env var > class default. + // + // For required fields (ConnectionString, TransportType, QueuesFile): the class default is + // empty string, so string.IsNullOrEmpty correctly detects "not set via CLI arg". + // + // For optional fields with non-empty class defaults (ReturnQueue, ErrorQueue, etc.): + // string.IsNullOrEmpty would never fire because BindConfiguration leaves the property at + // its class default. Instead, check whether a CLI arg key was present in config directly; + // if not, apply the env var override when one is set. + static void ApplyEnvironmentVariableFallbacks(ConnectorOptions options, IConfiguration config) + { + if (string.IsNullOrEmpty(options.ConnectionString)) + { + options.ConnectionString = config.GetValue("CONNECTION_STRING") ?? string.Empty; + } + + if (string.IsNullOrEmpty(options.TransportType)) + { + options.TransportType = config.GetValue("TRANSPORT_TYPE") ?? string.Empty; + } + + if (string.IsNullOrEmpty(options.QueuesFile)) + { + options.QueuesFile = config.GetValue("QUEUES_FILE") ?? string.Empty; + } + + // For optional settings with non-empty defaults: only apply env var when no CLI arg was supplied. + if (config["ReturnQueue"] == null) + { + options.ReturnQueue = config.GetValue("RETURN_QUEUE") ?? options.ReturnQueue; + } + + if (config["ErrorQueue"] == null) + { + options.ErrorQueue = config.GetValue("ERROR_QUEUE") ?? options.ErrorQueue; + } + + if (config["ServiceControlQueue"] == null) + { + options.ServiceControlQueue = config.GetValue("SERVICECONTROL_QUEUE") ?? options.ServiceControlQueue; + } + + if (string.IsNullOrEmpty(options.RabbitMqManagementApiUrl)) + { + options.RabbitMqManagementApiUrl = config.GetValue("RABBITMQ_MANAGEMENT_API_URL"); + } + + if (string.IsNullOrEmpty(options.RabbitMqManagementApiUsername)) + { + options.RabbitMqManagementApiUsername = config.GetValue("RABBITMQ_MANAGEMENT_API_USERNAME"); + } + + if (string.IsNullOrEmpty(options.RabbitMqManagementApiPassword)) + { + options.RabbitMqManagementApiPassword = config.GetValue("RABBITMQ_MANAGEMENT_API_PASSWORD"); + } + + if (config["RabbitMqQueueType"] == null) + { + options.RabbitMqQueueType = config.GetValue("RABBITMQ_QUEUE_TYPE") ?? options.RabbitMqQueueType; + } } } \ No newline at end of file diff --git a/src/ServiceControl.Connector.MassTransit.Host/ServiceControl.Connector.MassTransit.Host.csproj b/src/ServiceControl.Connector.MassTransit.Host/ServiceControl.Connector.MassTransit.Host.csproj index 5de3cccd..ec096663 100644 --- a/src/ServiceControl.Connector.MassTransit.Host/ServiceControl.Connector.MassTransit.Host.csproj +++ b/src/ServiceControl.Connector.MassTransit.Host/ServiceControl.Connector.MassTransit.Host.csproj @@ -15,6 +15,7 @@ + diff --git a/src/ServiceControl.Connector.MassTransit.RabbitMQ/AdapterRabbitMqConfiguration.cs b/src/ServiceControl.Connector.MassTransit.RabbitMQ/AdapterRabbitMqConfiguration.cs index 8125b771..175aef21 100644 --- a/src/ServiceControl.Connector.MassTransit.RabbitMQ/AdapterRabbitMqConfiguration.cs +++ b/src/ServiceControl.Connector.MassTransit.RabbitMQ/AdapterRabbitMqConfiguration.cs @@ -8,7 +8,7 @@ public static class AdapterRabbitMqConfiguration { - public static void UsingRabbitMQ(this IServiceCollection services, string connectionString, Uri managementApi, string? username, string? password) + public static void UsingRabbitMQ(this IServiceCollection services, string connectionString, Uri managementApi, string? username, string? password, QueueType queueType = QueueType.Quorum) { var connectionConfiguration = ConnectionConfiguration.Create(connectionString, string.Empty); var defaultCredential = new NetworkCredential(username ?? connectionConfiguration.UserName, password ?? connectionConfiguration.Password); @@ -19,7 +19,7 @@ public static void UsingRabbitMQ(this IServiceCollection services, string connec services.AddSingleton(rabbitMqHelper); services.AddSingleton(); services.AddTransient(_ => new RabbitMQTransport( - RoutingTopology.Conventional(QueueType.Quorum), + RoutingTopology.Conventional(queueType), connectionString, enableDelayedDelivery: false ) From 846a5bec6bf0f6d43c4a5705d6b12f2026c2129c Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Mon, 9 Mar 2026 15:59:42 +0100 Subject: [PATCH 2/8] Acceptance tests --- .../.editorconfig | 9 +++ ...it.AcceptanceTests.RabbitMQ.Classic.csproj | 32 +++++++++++ .../TestSuiteConfiguration.cs | 7 +++ ...ConfigureRabbitMQTransportTestExecution.cs | 57 ++++++++++++++++--- .../ConnectionHelper.cs | 32 +++++++++++ .../AdapterRabbitMqConfiguration.cs | 2 +- ...trol.Connector.MassTransit.RabbitMQ.csproj | 5 ++ src/ServiceControl.Connector.MassTransit.sln | 7 +++ 8 files changed, 141 insertions(+), 10 deletions(-) create mode 100644 src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic/.editorconfig create mode 100644 src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic.csproj create mode 100644 src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic/TestSuiteConfiguration.cs create mode 100644 src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConnectionHelper.cs diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic/.editorconfig b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic/.editorconfig new file mode 100644 index 00000000..bfa0f29e --- /dev/null +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic/.editorconfig @@ -0,0 +1,9 @@ +[*.cs] + +# Justification: Test project +dotnet_diagnostic.CA2007.severity = none +dotnet_diagnostic.PS0004.severity = none # A parameter of type CancellationToken on a private delegate or method should be required +dotnet_diagnostic.PS0018.severity = none # Add a CancellationToken parameter + +# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken +dotnet_diagnostic.NSB0002.severity = suggestion \ No newline at end of file diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic.csproj b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic.csproj new file mode 100644 index 00000000..8b21e8a6 --- /dev/null +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic.csproj @@ -0,0 +1,32 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic/TestSuiteConfiguration.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic/TestSuiteConfiguration.cs new file mode 100644 index 00000000..d552be4a --- /dev/null +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic/TestSuiteConfiguration.cs @@ -0,0 +1,7 @@ +[assembly: RabbitMQTest] + +public partial class TestSuiteConfiguration +{ + public IConfigureTransportTestExecution CreateTransportConfiguration() => new ConfigureRabbitMQTransportTestExecution(QueueType.Classic); + public Task Cleanup() => Task.CompletedTask; +} \ No newline at end of file diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs index 9b675595..858b774a 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs @@ -2,13 +2,16 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using NServiceBus.AcceptanceTesting.Support; +using NServiceBus.Transport; +using ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ; -class ConfigureRabbitMQTransportTestExecution : IConfigureTransportTestExecution +class ConfigureRabbitMQTransportTestExecution(QueueType queueType = QueueType.Quorum) : IConfigureTransportTestExecution { + TestRabbitMQTransport? transport; + public Func ConfigureTransportForEndpoint(EndpointConfiguration endpointConfiguration, PublisherMetadata publisherMetadata) { - var transport = new RabbitMQTransport( - RoutingTopology.Conventional(QueueType.Quorum), "host=localhost", false); + transport = new TestRabbitMQTransport(RoutingTopology.Conventional(queueType), "host=localhost", false); endpointConfiguration.UseTransport(transport); return Cleanup; } @@ -32,19 +35,55 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato { if (cfg is IRabbitMqReceiveEndpointConfigurator rmq) { - rmq.SetQuorumQueue(); + if (queueType == QueueType.Quorum) + { + rmq.SetQuorumQueue(); + } } }); } - public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) - { - services.UsingRabbitMQ("host=localhost", new Uri("http://localhost:15672/"), "guest", "guest"); - } + public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) => services.UsingRabbitMQ("host=localhost", new Uri("http://localhost:15672/"), "guest", "guest"); Task Cleanup(CancellationToken cancellationToken) { - //TODO? + PurgeQueues(); return Task.CompletedTask; } + + void PurgeQueues() + { + if (transport == null) + { + return; + } + + var queues = transport.QueuesToCleanup.ToHashSet(); + + using var connection = ConnectionHelper.ConnectionFactory.CreateConnection("Test Queue Purger"); + using var channel = connection.CreateModel(); + foreach (var queue in queues) + { + try + { + channel.QueuePurge(queue); + } + catch (Exception ex) + { + Console.WriteLine("Unable to clear queue {0}: {1}", queue, ex); + } + } + } + + class TestRabbitMQTransport(RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) : RabbitMQTransport(routingTopology, connectionString, enableDelayedDelivery) + { + public override async Task Initialize(HostSettings hostSettings, ReceiveSettings[] receivers, string[] sendingAddresses, CancellationToken cancellationToken = default) + { + var infrastructure = await base.Initialize(hostSettings, receivers, sendingAddresses, cancellationToken); + QueuesToCleanup.AddRange(infrastructure.Receivers.Select(x => x.Value.ReceiveAddress).Concat(sendingAddresses).Distinct()); + return infrastructure; + } + + public List QueuesToCleanup { get; } = []; + } } \ No newline at end of file diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConnectionHelper.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConnectionHelper.cs new file mode 100644 index 00000000..871db9fe --- /dev/null +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConnectionHelper.cs @@ -0,0 +1,32 @@ +namespace ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ; + +using System; +using System.Security.Authentication; +using global::RabbitMQ.Client; + +public static class ConnectionHelper +{ + static Lazy connectionFactory = new(() => + { + var connectionConfiguration = AdapterRabbitMqConfiguration.ConnectionConfiguration.Create("host=localhost", "AcceptanceTests"); + + var factory = new ConnectionFactory + { + AutomaticRecoveryEnabled = true, + HostName = connectionConfiguration.Host, + Port = connectionConfiguration.Port, + VirtualHost = connectionConfiguration.VirtualHost, + UserName = connectionConfiguration.UserName ?? "guest", + Password = connectionConfiguration.Password ?? "guest" + }; + + factory.Ssl.ServerName = factory.HostName; + factory.Ssl.Certs = null; + factory.Ssl.Version = SslProtocols.Tls12; + factory.Ssl.Enabled = connectionConfiguration.UseTls; + + return factory; + }); + + public static ConnectionFactory ConnectionFactory => connectionFactory.Value; +} \ No newline at end of file diff --git a/src/ServiceControl.Connector.MassTransit.RabbitMQ/AdapterRabbitMqConfiguration.cs b/src/ServiceControl.Connector.MassTransit.RabbitMQ/AdapterRabbitMqConfiguration.cs index 175aef21..25b0efae 100644 --- a/src/ServiceControl.Connector.MassTransit.RabbitMQ/AdapterRabbitMqConfiguration.cs +++ b/src/ServiceControl.Connector.MassTransit.RabbitMQ/AdapterRabbitMqConfiguration.cs @@ -41,7 +41,7 @@ public static void UsingRabbitMQ(this IServiceCollection services, string connec } // Copied from the NServiceBus.Rabbit repository - https://github.com/Particular/NServiceBus.RabbitMQ/blob/master/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs - class ConnectionConfiguration + internal class ConnectionConfiguration { const bool defaultUseTls = false; const int defaultPort = 5672; diff --git a/src/ServiceControl.Connector.MassTransit.RabbitMQ/ServiceControl.Connector.MassTransit.RabbitMQ.csproj b/src/ServiceControl.Connector.MassTransit.RabbitMQ/ServiceControl.Connector.MassTransit.RabbitMQ.csproj index 1482cb15..bb583adf 100644 --- a/src/ServiceControl.Connector.MassTransit.RabbitMQ/ServiceControl.Connector.MassTransit.RabbitMQ.csproj +++ b/src/ServiceControl.Connector.MassTransit.RabbitMQ/ServiceControl.Connector.MassTransit.RabbitMQ.csproj @@ -14,4 +14,9 @@ + + + + + diff --git a/src/ServiceControl.Connector.MassTransit.sln b/src/ServiceControl.Connector.MassTransit.sln index 5d07aeda..4400cc17 100644 --- a/src/ServiceControl.Connector.MassTransit.sln +++ b/src/ServiceControl.Connector.MassTransit.sln @@ -41,6 +41,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBusDeadLetter", "ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBusDeadLetter\ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBusDeadLetter.csproj", "{B76A74EB-2FA7-4052-8CB9-96AAFDAD5AC9}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic", "ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic\ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ.Classic.csproj", "{00F20BD2-3EB9-4D8F-9B76-EE335F5B9F93}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -95,6 +97,10 @@ Global {B76A74EB-2FA7-4052-8CB9-96AAFDAD5AC9}.Debug|Any CPU.Build.0 = Debug|Any CPU {B76A74EB-2FA7-4052-8CB9-96AAFDAD5AC9}.Release|Any CPU.ActiveCfg = Release|Any CPU {B76A74EB-2FA7-4052-8CB9-96AAFDAD5AC9}.Release|Any CPU.Build.0 = Release|Any CPU + {00F20BD2-3EB9-4D8F-9B76-EE335F5B9F93}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {00F20BD2-3EB9-4D8F-9B76-EE335F5B9F93}.Debug|Any CPU.Build.0 = Debug|Any CPU + {00F20BD2-3EB9-4D8F-9B76-EE335F5B9F93}.Release|Any CPU.ActiveCfg = Release|Any CPU + {00F20BD2-3EB9-4D8F-9B76-EE335F5B9F93}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -110,6 +116,7 @@ Global {93E5D84C-5CC7-4628-804C-A29B62C0C7E1} = {F5A3D35F-95A6-41F6-857F-866CA96E06FD} {D1335BCA-C2A5-415D-8877-61FA146391AC} = {F5A3D35F-95A6-41F6-857F-866CA96E06FD} {B76A74EB-2FA7-4052-8CB9-96AAFDAD5AC9} = {F5A3D35F-95A6-41F6-857F-866CA96E06FD} + {00F20BD2-3EB9-4D8F-9B76-EE335F5B9F93} = {F5A3D35F-95A6-41F6-857F-866CA96E06FD} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E5F2F88-CFD8-439C-98E9-52803B9F4641} From ce5b0460a5c12c1434fffa6c26b71ba632d725ba Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Mon, 9 Mar 2026 16:39:50 +0100 Subject: [PATCH 3/8] Add queueType parameter to RabbitMQ transport configuration in acceptance tests --- .../ConfigureRabbitMQTransportTestExecution.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs index 858b774a..a13ebd1f 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs @@ -43,7 +43,7 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato }); } - public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) => services.UsingRabbitMQ("host=localhost", new Uri("http://localhost:15672/"), "guest", "guest"); + public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) => services.UsingRabbitMQ("host=localhost", new Uri("http://localhost:15672/"), "guest", "guest", queueType); Task Cleanup(CancellationToken cancellationToken) { From e6b83e5a6bb51fb93fca409fbd03195f1839c03d Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Mon, 9 Mar 2026 16:40:09 +0100 Subject: [PATCH 4/8] Fix misleading assert.ignore --- .../TestAttributes/RabbitMQTestAttribute.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTesting/TestAttributes/RabbitMQTestAttribute.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTesting/TestAttributes/RabbitMQTestAttribute.cs index a39bff8a..f018157e 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTesting/TestAttributes/RabbitMQTestAttribute.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTesting/TestAttributes/RabbitMQTestAttribute.cs @@ -11,7 +11,7 @@ public void ApplyToContext(TestExecutionContext context) var connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString"); if (string.IsNullOrWhiteSpace(connectionString)) { - Assert.Ignore("Ignoring because environment variable RabbitMQConnectionString is not available"); + Assert.Ignore("Ignoring because environment variable RabbitMQTransport_ConnectionString is not available"); } } } \ No newline at end of file From 25dff1cc99b52900cfb764f83a9c7a63d59f8f99 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 10 Mar 2026 09:53:52 +0100 Subject: [PATCH 5/8] Refactor transport configuration to support asynchronous cleanup delegation in acceptance tests --- .../IConfigureTransportTestExecution.cs | 3 ++- .../ConfigureAmazonSQSTransportTestExecution.cs | 3 ++- ...gureAzureServiceBusTransportTestExecution.cs | 3 ++- ...gureAzureServiceBusTransportTestExecution.cs | 3 ++- .../ConfigureRabbitMQTransportTestExecution.cs | 17 ++++++++++++++--- .../Shared/Support/ConnectorComponent.cs | 7 ++++++- .../Support/MassTransitComponentExtensions.cs | 8 ++------ 7 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTesting/IConfigureTransportTestExecution.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTesting/IConfigureTransportTestExecution.cs index 65c9af18..3941b627 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTesting/IConfigureTransportTestExecution.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTesting/IConfigureTransportTestExecution.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MassTransit; @@ -13,5 +14,5 @@ public interface IConfigureTransportTestExecution void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator); - void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration); + Func, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration); } \ No newline at end of file diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AmazonSQS/ConfigureAmazonSQSTransportTestExecution.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AmazonSQS/ConfigureAmazonSQSTransportTestExecution.cs index 9771d2d8..ecfd7bf0 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AmazonSQS/ConfigureAmazonSQSTransportTestExecution.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AmazonSQS/ConfigureAmazonSQSTransportTestExecution.cs @@ -31,7 +31,7 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato }); } - public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) + public Func, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) { services.UsingAmazonSqs(transport => { @@ -39,5 +39,6 @@ public void ConfigureTransportForConnector(IServiceCollection services, IConfigu transport.TopicNamePrefix = NamePrefixGenerator.GetNamePrefix(); transport.QueueNameGenerator = TestNameHelper.GetSqsQueueName; }); + return (_, _) => Task.CompletedTask; } } \ No newline at end of file diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBus/ConfigureAzureServiceBusTransportTestExecution.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBus/ConfigureAzureServiceBusTransportTestExecution.cs index 4fcec794..218eb843 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBus/ConfigureAzureServiceBusTransportTestExecution.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBus/ConfigureAzureServiceBusTransportTestExecution.cs @@ -23,9 +23,10 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato }); } - public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) + public Func, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) { services.UsingAzureServiceBus(configuration, connectionString); + return (_, _) => Task.CompletedTask; } Task Cleanup(CancellationToken cancellationToken) diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBusDeadLetter/ConfigureAzureServiceBusTransportTestExecution.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBusDeadLetter/ConfigureAzureServiceBusTransportTestExecution.cs index ba1fd6ac..8ff8c392 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBusDeadLetter/ConfigureAzureServiceBusTransportTestExecution.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBusDeadLetter/ConfigureAzureServiceBusTransportTestExecution.cs @@ -31,9 +31,10 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato }); } - public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) + public Func, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) { services.UsingAzureServiceBus(configuration, connectionString, true); + return (_, _) => Task.CompletedTask; } Task Cleanup(CancellationToken cancellationToken) diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs index a13ebd1f..8db0a01b 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs @@ -43,7 +43,15 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato }); } - public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) => services.UsingRabbitMQ("host=localhost", new Uri("http://localhost:15672/"), "guest", "guest", queueType); + public Func, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) + { + services.UsingRabbitMQ("host=localhost", new Uri("http://localhost:15672/"), "guest", "guest", queueType); + return (queuesToDelete, _) => + { + DeleteQueues(queuesToDelete); + return Task.CompletedTask; + }; + } Task Cleanup(CancellationToken cancellationToken) { @@ -58,15 +66,18 @@ void PurgeQueues() return; } - var queues = transport.QueuesToCleanup.ToHashSet(); + DeleteQueues(transport.QueuesToCleanup.ToHashSet()); + } + static void DeleteQueues(IReadOnlyCollection queues) + { using var connection = ConnectionHelper.ConnectionFactory.CreateConnection("Test Queue Purger"); using var channel = connection.CreateModel(); foreach (var queue in queues) { try { - channel.QueuePurge(queue); + channel.QueueDelete(queue, false, false); } catch (Exception ex) { diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/ConnectorComponent.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/ConnectorComponent.cs index 0cf12b5b..66ed695c 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/ConnectorComponent.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/ConnectorComponent.cs @@ -67,7 +67,7 @@ public override async Task Start(CancellationToken cancellationToken = default) configuration, provider.GetRequiredService())); } - transportConfig.ConfigureTransportForConnector(services, hostContext.Configuration); + cleanup = transportConfig.ConfigureTransportForConnector(services, hostContext.Configuration); }); host = builder.Build(); @@ -92,9 +92,14 @@ public override async Task Stop(CancellationToken cancellationToken = default) finally { host.Dispose(); + if (cleanup != null) + { + await cleanup([errorQueue, serviceControlQueue ?? "Particular.ServiceControl", returnQueue, $"{returnQueue}.poison", .. queueNamesToMonitor], cancellationToken); + } } } IHost? host; + Func? cleanup; } } \ No newline at end of file diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/MassTransitComponentExtensions.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/MassTransitComponentExtensions.cs index abde3334..ddbabd90 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/MassTransitComponentExtensions.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/MassTransitComponentExtensions.cs @@ -11,10 +11,6 @@ public static IScenarioWithEndpointBehavior WithMassTransit( string name, Action busConfig, Action? hostConfig = null) - where TContext : ScenarioContext - { - return scenario.WithComponent(new MassTransitComponent(name, busConfig, hostConfig ?? ((_, _) => - { - }))); - } + where TContext : ScenarioContext => + scenario.WithComponent(new MassTransitComponent(name, busConfig, hostConfig ?? ((_, _) => { }))); } \ No newline at end of file From 1943ccff38b3a9011612137875fea46f57b0e3c9 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 10 Mar 2026 10:00:27 +0100 Subject: [PATCH 6/8] Simplify pattern Co-authored-by: Sean Feldman --- .../ConnectorOptionsValidator.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/ServiceControl.Connector.MassTransit.Host/ConnectorOptionsValidator.cs b/src/ServiceControl.Connector.MassTransit.Host/ConnectorOptionsValidator.cs index 0912092c..9089357f 100644 --- a/src/ServiceControl.Connector.MassTransit.Host/ConnectorOptionsValidator.cs +++ b/src/ServiceControl.Connector.MassTransit.Host/ConnectorOptionsValidator.cs @@ -39,8 +39,7 @@ public ValidateOptionsResult Validate(string? name, ConnectorOptions options) static bool IsValidQueueType(string? queueType) => queueType?.ToLowerInvariant() switch { - "classic" => true, - "quorum" => true, + "classic" or "quorum" or null => true, // Default will be used _ => false }; From 788d60ec9eefc340f6a5f1f4ae5002533efe883c Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 10 Mar 2026 10:37:12 +0100 Subject: [PATCH 7/8] Also cleanup consumer queues in acceptance tests --- .../IConfigureTransportTestExecution.cs | 2 +- .../ConfigureAmazonSQSTransportTestExecution.cs | 3 ++- ...igureAzureServiceBusTransportTestExecution.cs | 3 ++- ...igureAzureServiceBusTransportTestExecution.cs | 3 ++- .../ConfigureRabbitMQTransportTestExecution.cs | 8 +++++++- .../Shared/Support/MassTransitComponent.cs | 16 +++++++++++++++- 6 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTesting/IConfigureTransportTestExecution.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTesting/IConfigureTransportTestExecution.cs index 3941b627..01e841e2 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTesting/IConfigureTransportTestExecution.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTesting/IConfigureTransportTestExecution.cs @@ -12,7 +12,7 @@ public interface IConfigureTransportTestExecution { Func ConfigureTransportForEndpoint(EndpointConfiguration endpointConfiguration, PublisherMetadata publisherMetadata); - void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator); + Func, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator); Func, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration); } \ No newline at end of file diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AmazonSQS/ConfigureAmazonSQSTransportTestExecution.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AmazonSQS/ConfigureAmazonSQSTransportTestExecution.cs index ecfd7bf0..ed2035e0 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AmazonSQS/ConfigureAmazonSQSTransportTestExecution.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AmazonSQS/ConfigureAmazonSQSTransportTestExecution.cs @@ -14,7 +14,7 @@ public Func ConfigureTransportForEndpoint(EndpointConfi return _ => Task.CompletedTask; } - public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator) + public Func, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator) { var region = FallbackRegionFactory.GetRegionEndpoint().SystemName; @@ -29,6 +29,7 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato cfg.ConfigureEndpoints(context, new DefaultEndpointNameFormatter(NamePrefixGenerator.GetNamePrefix(), false)); }); + return (_, _) => Task.CompletedTask; } public Func, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBus/ConfigureAzureServiceBusTransportTestExecution.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBus/ConfigureAzureServiceBusTransportTestExecution.cs index 218eb843..36ab5d1e 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBus/ConfigureAzureServiceBusTransportTestExecution.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBus/ConfigureAzureServiceBusTransportTestExecution.cs @@ -14,13 +14,14 @@ public Func ConfigureTransportForEndpoint(EndpointConfi return Cleanup; } - public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator) + public Func, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator) { configurator.UsingAzureServiceBus((context, cfg) => { cfg.Host(connectionString); cfg.ConfigureEndpoints(context); }); + return (_, _) => Task.CompletedTask; } public Func, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBusDeadLetter/ConfigureAzureServiceBusTransportTestExecution.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBusDeadLetter/ConfigureAzureServiceBusTransportTestExecution.cs index 8ff8c392..8123b158 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBusDeadLetter/ConfigureAzureServiceBusTransportTestExecution.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBusDeadLetter/ConfigureAzureServiceBusTransportTestExecution.cs @@ -14,7 +14,7 @@ public Func ConfigureTransportForEndpoint(EndpointConfi return Cleanup; } - public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator) + public Func, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator) { configurator.UsingAzureServiceBus((context, cfg) => { @@ -29,6 +29,7 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato sb.ConfigureDeadLetterQueueErrorTransport(); } }); + return (_, _) => Task.CompletedTask; } public Func, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs index 8db0a01b..8ab85e73 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs @@ -16,7 +16,7 @@ public Func ConfigureTransportForEndpoint(EndpointConfi return Cleanup; } - public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator) + public Func, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator) { configurator.UsingRabbitMq((context, cfg) => { @@ -41,6 +41,12 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato } } }); + + return (queuesToDelete, _) => + { + DeleteQueues(queuesToDelete); + return Task.CompletedTask; + }; } public Func, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration) diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/MassTransitComponent.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/MassTransitComponent.cs index 8f156556..4a40c849 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/MassTransitComponent.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/MassTransitComponent.cs @@ -1,4 +1,5 @@ using MassTransit; +using MassTransit.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -30,7 +31,7 @@ public override async Task Start(CancellationToken cancellationToken = default) services.AddMassTransit(x => { busConfig(x); - transportConfig.ConfigureTransportForMassTransitEndpoint(x); + cleanup = transportConfig.ConfigureTransportForMassTransitEndpoint(x); }); hostConfig(hostContext, services); services.AddSingleton((TContext)scenarioContext); @@ -47,16 +48,29 @@ public override async Task Stop(CancellationToken cancellationToken = default) return; } + var queuesToDelete = new List(); try { + var consumerRegistrations = host.Services.GetServices(); + foreach (var registration in consumerRegistrations) + { + var consumerDefinition = registration.GetDefinition(null); + // we assume the endpoint name is already set and formatted + queuesToDelete.Add(consumerDefinition.GetEndpointName(null)); + } await host.StopAsync(cancellationToken).ConfigureAwait(false); } finally { host.Dispose(); + if (cleanup != null) + { + await cleanup(queuesToDelete, cancellationToken).ConfigureAwait(false); + } } } IHost? host; + Func, CancellationToken, Task>? cleanup; } } \ No newline at end of file From 784b6fd2bfd00e26c45c6f6e4891e924f4c06461 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 10 Mar 2026 10:40:24 +0100 Subject: [PATCH 8/8] Null value handling --- .../Shared/Support/MassTransitComponent.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/MassTransitComponent.cs b/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/MassTransitComponent.cs index 4a40c849..0674304d 100644 --- a/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/MassTransitComponent.cs +++ b/src/ServiceControl.Connector.MassTransit.AcceptanceTests/Shared/Support/MassTransitComponent.cs @@ -54,9 +54,8 @@ public override async Task Stop(CancellationToken cancellationToken = default) var consumerRegistrations = host.Services.GetServices(); foreach (var registration in consumerRegistrations) { - var consumerDefinition = registration.GetDefinition(null); // we assume the endpoint name is already set and formatted - queuesToDelete.Add(consumerDefinition.GetEndpointName(null)); + queuesToDelete.Add(registration.GetDefinition(null!).GetEndpointName(null!)); } await host.StopAsync(cancellationToken).ConfigureAwait(false); }