diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6a8b91de03..57ceb9f956 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: strategy: matrix: os: [windows-latest, ubuntu-latest] - test-category: [ Default, SqlServer, AzureServiceBus, RabbitMQ, AzureStorageQueues, MSMQ, SQS, PrimaryRavenAcceptance, PrimaryRavenPersistence, PostgreSQL ] + test-category: [ Default, SqlServer, AzureServiceBus, RabbitMQ, AzureStorageQueues, MSMQ, SQS, PrimaryRavenAcceptance, PrimaryRavenPersistence, PostgreSQL, IBMMQ ] include: - os: windows-latest os-name: Windows @@ -27,6 +27,8 @@ jobs: exclude: - os: ubuntu-latest test-category: MSMQ + - os: windows-latest + test-category: IBMMQ fail-fast: false steps: - name: Check for secrets @@ -103,6 +105,18 @@ jobs: connection-string-name: ServiceControl_TransportTests_ASQ_ConnectionString azure-credentials: ${{ secrets.AZURE_ACI_CREDENTIALS }} tag: ServiceControl + - name: Setup IBM MQ + if: matrix.test-category == 'IBMMQ' + run: | + docker run --name ibmmq -d -p 1414:1414 -p 9443:9443 ` + --health-cmd "dspmq" --health-interval 10s --health-timeout 5s --health-retries 10 --health-start-period 30s ` + -e LICENSE=accept -e MQ_QMGR_NAME=QM1 -e MQ_ADMIN_PASSWORD=passw0rd ` + icr.io/ibm-messaging/mq:latest + # Wait for container health check to pass + while ((docker inspect --format '{{.State.Health.Status}}' ibmmq) -ne 'healthy') { + Start-Sleep -Seconds 2 + } + echo "ServiceControl_TransportTests_IBMMQ_ConnectionString=mq://admin:passw0rd@localhost:1414/QM1?channel=DEV.ADMIN.SVRCONN&topicprefix=DEV" | Out-File -FilePath $Env:GITHUB_ENV -Encoding utf-8 -Append - name: Setup SQS environment variables if: matrix.test-category == 'SQS' run: | diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index fa54c02e52..5ff1d79444 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -52,6 +52,7 @@ + diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/DeadLetterQueueCheckTests.cs b/src/ServiceControl.Transports.IBMMQ.Tests/DeadLetterQueueCheckTests.cs new file mode 100644 index 0000000000..b05f681e9c --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ.Tests/DeadLetterQueueCheckTests.cs @@ -0,0 +1,148 @@ +namespace ServiceControl.Transport.Tests; + +using System; +using System.Collections; +using System.Threading.Tasks; +using System.Web; +using IBM.WMQ; +using NUnit.Framework; +using Transports; +using Transports.IBMMQ; + +[TestFixture] +class DeadLetterQueueCheckTests +{ + [Test] + public async Task Should_pass_when_custom_checks_disabled() + { + var settings = new TransportSettings + { + ConnectionString = ConnectionString, + RunCustomChecks = false + }; + + var check = new DeadLetterQueueCheck(settings); + var result = await check.PerformCheck().ConfigureAwait(false); + + Assert.That(result.HasFailed, Is.False); + } + + [Test] + public async Task Should_pass_when_dlq_is_empty() + { + DrainDeadLetterQueue(); + + var settings = new TransportSettings + { + ConnectionString = ConnectionString, + RunCustomChecks = true + }; + + var check = new DeadLetterQueueCheck(settings); + var result = await check.PerformCheck().ConfigureAwait(false); + + Assert.That(result.HasFailed, Is.False); + } + + [Test] + public async Task Should_fail_when_dlq_has_messages() + { + DrainDeadLetterQueue(); + PutMessageOnDeadLetterQueue(); + + try + { + var settings = new TransportSettings + { + ConnectionString = ConnectionString, + RunCustomChecks = true + }; + + var check = new DeadLetterQueueCheck(settings); + var result = await check.PerformCheck().ConfigureAwait(false); + + Assert.That(result.HasFailed, Is.True); + Assert.That(result.FailureReason, Does.Contain("messages in the Dead Letter Queue")); + } + finally + { + DrainDeadLetterQueue(); + } + } + + [Test] + public async Task Should_fail_when_connection_is_invalid() + { + var settings = new TransportSettings + { + ConnectionString = "mq://admin:passw0rd@localhost:19999/BOGUS", + RunCustomChecks = true + }; + + var check = new DeadLetterQueueCheck(settings); + var result = await check.PerformCheck().ConfigureAwait(false); + + Assert.That(result.HasFailed, Is.True); + Assert.That(result.FailureReason, Does.Contain("Unable to check Dead Letter Queue")); + Assert.That(result.FailureReason, Does.Contain("RC=")); + } + + static void PutMessageOnDeadLetterQueue() + { + var (qmName, props) = ParseConnectionString(); + using var qm = new MQQueueManager(qmName, props); + var dlqName = qm.DeadLetterQueueName.Trim(); + using var dlq = qm.AccessQueue(dlqName, MQC.MQOO_OUTPUT); + var msg = new MQMessage(); + msg.WriteString("DLQ test message"); + dlq.Put(msg); + } + + static void DrainDeadLetterQueue() + { + var (qmName, props) = ParseConnectionString(); + using var qm = new MQQueueManager(qmName, props); + var dlqName = qm.DeadLetterQueueName.Trim(); + using var dlq = qm.AccessQueue(dlqName, MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING); + var gmo = new MQGetMessageOptions { WaitInterval = 0, Options = MQC.MQGMO_NO_WAIT }; + while (true) + { + try + { + dlq.Get(new MQMessage(), gmo); + } + catch (MQException e) when (e.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE) + { + break; + } + } + } + + static (string queueManagerName, Hashtable properties) ParseConnectionString() + { + var uri = new Uri(ConnectionString); + var query = HttpUtility.ParseQueryString(uri.Query); + + var qmName = uri.AbsolutePath.Trim('/') is { Length: > 0 } path + ? Uri.UnescapeDataString(path) + : "QM1"; + + var props = new Hashtable + { + [MQC.TRANSPORT_PROPERTY] = MQC.TRANSPORT_MQSERIES_MANAGED, + [MQC.HOST_NAME_PROPERTY] = uri.Host, + [MQC.PORT_PROPERTY] = uri.Port > 0 ? uri.Port : 1414, + [MQC.CHANNEL_PROPERTY] = query["channel"] ?? "DEV.ADMIN.SVRCONN", + [MQC.USE_MQCSP_AUTHENTICATION_PROPERTY] = true, + [MQC.USER_ID_PROPERTY] = Uri.UnescapeDataString(uri.UserInfo.Split(':')[0]), + [MQC.PASSWORD_PROPERTY] = Uri.UnescapeDataString(uri.UserInfo.Split(':')[1]) + }; + + return (qmName, props); + } + + static readonly string ConnectionString = + Environment.GetEnvironmentVariable("ServiceControl_TransportTests_IBMMQ_ConnectionString") + ?? Environment.GetEnvironmentVariable("SERVICECONTROL_TRANSPORTTESTS_IBMMQ_CONNECTIONSTRING") + ?? "mq://admin:passw0rd@localhost:1414"; +} diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControl.Transports.IBMMQ.Tests.csproj b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControl.Transports.IBMMQ.Tests.csproj new file mode 100644 index 0000000000..b8c5306c4e --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControl.Transports.IBMMQ.Tests.csproj @@ -0,0 +1,32 @@ + + + + net10.0 + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlAuditEndpointTests.cs b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlAuditEndpointTests.cs new file mode 100644 index 0000000000..f70a724658 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlAuditEndpointTests.cs @@ -0,0 +1,8 @@ +namespace ServiceControl.Transport.Tests; + +using System; + +partial class ServiceControlAuditEndpointTests +{ + private static partial int GetTransportDefaultConcurrency() => 32; +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlMonitoringEndpointTests.cs b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlMonitoringEndpointTests.cs new file mode 100644 index 0000000000..0a62a8e7d4 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlMonitoringEndpointTests.cs @@ -0,0 +1,8 @@ +namespace ServiceControl.Transport.Tests; + +using System; + +partial class ServiceControlMonitoringEndpointTests +{ + private static partial int GetTransportDefaultConcurrency() => 32; +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlPrimaryEndpointTests.cs b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlPrimaryEndpointTests.cs new file mode 100644 index 0000000000..56579e59cd --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlPrimaryEndpointTests.cs @@ -0,0 +1,8 @@ +namespace ServiceControl.Transport.Tests; + +using System; + +partial class ServiceControlPrimaryEndpointTests +{ + private static partial int GetTransportDefaultConcurrency() => 10; +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/TestsFilter.cs b/src/ServiceControl.Transports.IBMMQ.Tests/TestsFilter.cs new file mode 100644 index 0000000000..76ebf15442 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ.Tests/TestsFilter.cs @@ -0,0 +1 @@ +[assembly: IncludeInIBMMQTests()] \ No newline at end of file diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/TransportTestsConfiguration.cs b/src/ServiceControl.Transports.IBMMQ.Tests/TransportTestsConfiguration.cs new file mode 100644 index 0000000000..6d52918c91 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ.Tests/TransportTestsConfiguration.cs @@ -0,0 +1,53 @@ +namespace ServiceControl.Transport.Tests; + +using System; +using System.Threading.Tasks; +using Transports; +using Transports.IBMMQ; +using NServiceBus; +using NServiceBus.Transport.IBMMQ; +using NUnit.Framework; + +[SetUpFixture] +public class BootstrapFixture +{ + [OneTimeSetUp] + public void RunBeforeAnyTests() => TransportTestFixture.QueueNameSeparator = '.'; +} + +class TransportTestsConfiguration +{ + public string ConnectionString { get; private set; } + + public ITransportCustomization TransportCustomization { get; private set; } + + public Task Configure() + { + TransportCustomization = new TestIBMMQTransportCustomization(); + ConnectionString = Environment.GetEnvironmentVariable(ConnectionStringKey) + ?? Environment.GetEnvironmentVariable(ConnectionStringKey.ToUpperInvariant()); // Env keys are case sensitive, POSIX is all uppercase + + if (string.IsNullOrEmpty(ConnectionString)) + { + throw new Exception($"Environment variable {ConnectionStringKey} is required for IBM MQ transport tests to run"); + } + + return Task.CompletedTask; + } + + public Task Cleanup() => Task.CompletedTask; + + static string ConnectionStringKey = "ServiceControl_TransportTests_IBMMQ_ConnectionString"; +} + +sealed class TestIBMMQTransportCustomization : IBMMQTransportCustomization +{ + protected override IBMMQTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly) + { + transportSettings.Set>(o => o.ResourceNameSanitizer = name => name + .Replace("ServiceControlMonitoring", "SCM") // Mitigate max queue name length + .Replace("-", ".") // dash is an illegal char + ); + return base.CreateTransport(transportSettings, preferredTransactionMode); + } +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.IBMMQ/ConnectionProperties.cs b/src/ServiceControl.Transports.IBMMQ/ConnectionProperties.cs new file mode 100644 index 0000000000..4c3606b5e6 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ/ConnectionProperties.cs @@ -0,0 +1,66 @@ +namespace ServiceControl.Transports.IBMMQ; + +using System; +using System.Collections; +using System.Web; +using IBM.WMQ; + +static class ConnectionProperties +{ + public static (string queueManagerName, Hashtable properties) Parse(string connectionString) + { + var connectionUri = new Uri(connectionString); + var query = HttpUtility.ParseQueryString(connectionUri.Query); + + var queueManagerName = connectionUri.AbsolutePath.Trim('/') is { Length: > 0 } path + ? Uri.UnescapeDataString(path) + : "QM1"; + + var properties = new Hashtable + { + [MQC.TRANSPORT_PROPERTY] = MQC.TRANSPORT_MQSERIES_MANAGED, + [MQC.HOST_NAME_PROPERTY] = connectionUri.Host, + [MQC.PORT_PROPERTY] = connectionUri.Port > 0 ? connectionUri.Port : 1414, + [MQC.CHANNEL_PROPERTY] = query["channel"] ?? "DEV.ADMIN.SVRCONN" + }; + + var userInfo = connectionUri.UserInfo; + if (!string.IsNullOrEmpty(userInfo)) + { + var parts = userInfo.Split(':'); + var user = Uri.UnescapeDataString(parts[0]); + + if (!string.IsNullOrWhiteSpace(user)) + { + properties[MQC.USE_MQCSP_AUTHENTICATION_PROPERTY] = true; + properties[MQC.USER_ID_PROPERTY] = user; + } + + if (parts.Length > 1) + { + var password = Uri.UnescapeDataString(parts[1]); + if (!string.IsNullOrWhiteSpace(password)) + { + properties[MQC.PASSWORD_PROPERTY] = password; + } + } + } + + if (query["sslkeyrepo"] is { } sslKeyRepo) + { + properties[MQC.SSL_CERT_STORE_PROPERTY] = sslKeyRepo; + } + + if (query["cipherspec"] is { } cipherSpec) + { + properties[MQC.SSL_CIPHER_SPEC_PROPERTY] = cipherSpec; + } + + if (query["sslpeername"] is { } sslPeerName) + { + properties[MQC.SSL_PEER_NAME_PROPERTY] = sslPeerName; + } + + return (queueManagerName, properties); + } +} diff --git a/src/ServiceControl.Transports.IBMMQ/DeadLetterQueueCheck.cs b/src/ServiceControl.Transports.IBMMQ/DeadLetterQueueCheck.cs new file mode 100644 index 0000000000..5bc5041dd9 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ/DeadLetterQueueCheck.cs @@ -0,0 +1,66 @@ +namespace ServiceControl.Transports.IBMMQ; + +using System; +using System.Threading; +using System.Threading.Tasks; +using IBM.WMQ; +using Microsoft.Extensions.Logging; +using NServiceBus.CustomChecks; +using ServiceControl.Infrastructure; + +public class DeadLetterQueueCheck : CustomCheck +{ + public DeadLetterQueueCheck(TransportSettings settings) : base(id: "Dead Letter Queue", category: "Transport", repeatAfter: TimeSpan.FromHours(1)) + { + Logger.LogDebug("IBM MQ Dead Letter Queue custom check starting"); + + (queueManagerName, connectionProperties) = ConnectionProperties.Parse(settings.ConnectionString); + runCheck = settings.RunCustomChecks; + } + + public override Task PerformCheck(CancellationToken cancellationToken = default) + { + if (!runCheck) + { + return Task.FromResult(CheckResult.Pass); + } + + Logger.LogDebug("Checking Dead Letter Queue length"); + + try + { + using var queueManager = new MQQueueManager(queueManagerName, connectionProperties); + + var dlqName = queueManager.DeadLetterQueueName?.Trim(); + if (string.IsNullOrEmpty(dlqName)) + { + return Task.FromResult(CheckResult.Pass); + } + + using var dlq = queueManager.AccessQueue(dlqName, MQC.MQOO_INQUIRE | MQC.MQOO_FAIL_IF_QUIESCING); + var depth = dlq.CurrentDepth; + + if (depth > 0) + { + var message = $"{depth} messages in the Dead Letter Queue '{dlqName}' on queue manager '{queueManagerName}'. This could indicate a problem with ServiceControl's retries. Please submit a support ticket to Particular if you would like help from our engineers to ensure no message loss while resolving these dead letter messages."; + Logger.LogWarning("{DeadLetterMessageCount} messages in the Dead Letter Queue '{DeadLetterQueueName}' on queue manager '{QueueManagerName}'", depth, dlqName, queueManagerName); + return Task.FromResult(CheckResult.Failed(message)); + } + + Logger.LogDebug("No messages in Dead Letter Queue"); + return Task.FromResult(CheckResult.Pass); + } + catch (MQException e) + { + var message = $"Unable to check Dead Letter Queue on queue manager '{queueManagerName}'. Reason: {e.Message} (RC={e.ReasonCode})"; + Logger.LogWarning(e, "Unable to check Dead Letter Queue on queue manager '{QueueManagerName}'", queueManagerName); + return Task.FromResult(CheckResult.Failed(message)); + } + } + + readonly string queueManagerName; + readonly System.Collections.Hashtable connectionProperties; + readonly bool runCheck; + + static readonly ILogger Logger = LoggerUtil.CreateStaticLogger(typeof(DeadLetterQueueCheck)); +} diff --git a/src/ServiceControl.Transports.IBMMQ/IBMMQTransportCustomization.cs b/src/ServiceControl.Transports.IBMMQ/IBMMQTransportCustomization.cs new file mode 100644 index 0000000000..a818034d14 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ/IBMMQTransportCustomization.cs @@ -0,0 +1,43 @@ +namespace ServiceControl.Transports.IBMMQ; + +using System; +using System.Linq; +using Microsoft.Extensions.DependencyInjection; +using NServiceBus; +using NServiceBus.Transport.IBMMQ; + +public class IBMMQTransportCustomization : TransportCustomization +{ + protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, IBMMQTransport transportDefinition, TransportSettings transportSettings) => + transportDefinition.TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive; + + protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, IBMMQTransport transportDefinition, TransportSettings transportSettings) => + transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + + protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, IBMMQTransport transportDefinition, TransportSettings transportSettings) => + transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + + protected override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings) + { + services.AddSingleton(); + services.AddHostedService(provider => provider.GetRequiredService()); + } + + protected override IBMMQTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly) + { + var transport = new IBMMQTransport(o => + { + if (transportSettings.TryGet>(out var overrides)) + { + overrides(o); + } + + TestConnectionDetails.Apply(o); + }); + transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) + ? preferredTransactionMode + : TransportTransactionMode.ReceiveOnly; + + return transport; + } +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.IBMMQ/QueueLengthProvider.cs b/src/ServiceControl.Transports.IBMMQ/QueueLengthProvider.cs new file mode 100644 index 0000000000..7dcf53bcc2 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ/QueueLengthProvider.cs @@ -0,0 +1,161 @@ +#nullable enable +namespace ServiceControl.Transports.IBMMQ; + +using System; +using System.Collections; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using IBM.WMQ; +using Microsoft.Extensions.Logging; + +class QueueLengthProvider : AbstractQueueLengthProvider +{ + public QueueLengthProvider(TransportSettings settings, Action store, ILogger logger) + : base(settings, store) + { + this.logger = logger; + (queueManagerName, connectionProperties) = ConnectionProperties.Parse(ConnectionString); + } + + public override void TrackEndpointInputQueue(EndpointToQueueMapping queueToTrack) => + endpointQueues.AddOrUpdate(queueToTrack.EndpointName, _ => queueToTrack.InputQueue, (_, currentValue) => + { + if (currentValue != queueToTrack.InputQueue) + { + sizes.TryRemove(currentValue, out var _); + } + + return queueToTrack.InputQueue; + }); + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + FetchQueueLengths(); + + UpdateQueueLengthStore(); + + await Task.Delay(QueryDelayInterval, stoppingToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + // no-op + } + catch (Exception e) + { + logger.LogError(e, "Queue length query loop failure"); + CloseConnection(); + await Task.Delay(ReconnectDelayInterval, stoppingToken).ConfigureAwait(false); + } + } + + CloseConnection(); + } + + void UpdateQueueLengthStore() + { + var nowTicks = DateTime.UtcNow.Ticks; + + foreach (var endpointQueuePair in endpointQueues) + { + if (sizes.TryGetValue(endpointQueuePair.Value, out var size)) + { + Store( + [ + new QueueLengthEntry + { + DateTicks = nowTicks, + Value = size + } + ], + new EndpointToQueueMapping(endpointQueuePair.Key, endpointQueuePair.Value)); + } + } + } + + void FetchQueueLengths() + { + if (endpointQueues.IsEmpty) + { + return; + } + + var manager = EnsureConnected(); + + foreach (var endpointQueuePair in endpointQueues) + { + var queueName = endpointQueuePair.Value; + try + { + using var queue = manager.AccessQueue(queueName, MQC.MQOO_INQUIRE | MQC.MQOO_FAIL_IF_QUIESCING); + sizes[queueName] = queue.CurrentDepth; + } + catch (MQException e) when (IsConnectionError(e)) + { + logger.LogWarning(e, "Lost connection to queue manager while querying {QueueName}", queueName); + CloseConnection(); + throw; + } + catch (Exception e) + { + logger.LogWarning(e, "Error querying queue length for {QueueName}", queueName); + } + } + } + + MQQueueManager EnsureConnected() + { + if (queueManager is not null) + { + return queueManager; + } + + queueManager = new MQQueueManager(queueManagerName, connectionProperties); + logger.LogInformation("Connected to queue manager '{QueueManagerName}'", queueManagerName); + return queueManager; + } + + void CloseConnection() + { + if (queueManager is null) + { + return; + } + + try + { + queueManager.Disconnect(); + } + catch (Exception e) + { + logger.LogDebug(e, "Error disconnecting from queue manager"); + } + + queueManager = null; + } + + static bool IsConnectionError(MQException e) => e.ReasonCode + is MQC.MQRC_CONNECTION_BROKEN + or MQC.MQRC_CONNECTION_ERROR + or MQC.MQRC_CONNECTION_STOPPED + or MQC.MQRC_CONNECTION_QUIESCING + or MQC.MQRC_CONNECTION_NOT_AVAILABLE + or MQC.MQRC_Q_MGR_NOT_AVAILABLE + or MQC.MQRC_Q_MGR_NOT_ACTIVE; + + static readonly TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200); + static readonly TimeSpan ReconnectDelayInterval = TimeSpan.FromSeconds(10); + + MQQueueManager? queueManager; + + readonly ConcurrentDictionary endpointQueues = new(); + readonly ConcurrentDictionary sizes = new(); + + readonly string queueManagerName; + readonly Hashtable connectionProperties; + readonly ILogger logger; +} diff --git a/src/ServiceControl.Transports.IBMMQ/ServiceControl.Transports.IBMMQ.csproj b/src/ServiceControl.Transports.IBMMQ/ServiceControl.Transports.IBMMQ.csproj new file mode 100644 index 0000000000..2b2d9a9296 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ/ServiceControl.Transports.IBMMQ.csproj @@ -0,0 +1,26 @@ + + + + net10.0 + true + + + + + + + + + + + + + + + + + + + + + diff --git a/src/ServiceControl.Transports.IBMMQ/TestConnectionDetails.cs b/src/ServiceControl.Transports.IBMMQ/TestConnectionDetails.cs new file mode 100644 index 0000000000..573ac0e28d --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ/TestConnectionDetails.cs @@ -0,0 +1,59 @@ +#nullable enable +using System; +using System.Collections.Specialized; +using System.Web; +using NServiceBus.Transport.IBMMQ; + +/// +/// Copied directly from: +/// +/// https://github.com/ParticularLabs/NServiceBus.IBMMQ/blob/main/src/Testing/TestConnectionDetails.cs +/// +/// +static class TestConnectionDetails +{ + // mq://admin:passw0rd@localhost:1414/QM1?appname=&sslkeyrepo=&cipherspec=&sslpeername=&topicprefix=DEV&channel=DEV.ADMIN.SVRCONN + static readonly Uri ConnectionUri = new( + Environment.GetEnvironmentVariable("ServiceControl_TransportTests_IBMMQ_ConnectionString") + ?? Environment.GetEnvironmentVariable("SERVICECONTROL_TRANSPORTTESTS_IBMMQ_CONNECTIONSTRING") + ?? "mq://admin:passw0rd@localhost:1414" + ); + + static readonly NameValueCollection Query = HttpUtility.ParseQueryString(ConnectionUri.Query); + + public static string Host => ConnectionUri.Host; + public static int Port => ConnectionUri.Port > 0 ? ConnectionUri.Port : 1414; + public static string User => Uri.UnescapeDataString(ConnectionUri.UserInfo.Split(':')[0]); + public static string Password => Uri.UnescapeDataString(ConnectionUri.UserInfo.Split(':')[1]); + public static string QueueManagerName => ConnectionUri.AbsolutePath.Trim('/') is { Length: > 0 } path ? Uri.UnescapeDataString(path) : "QM1"; + public static string Channel => Query["channel"] ?? "DEV.ADMIN.SVRCONN"; + public static string TopicPrefix => Query["topicprefix"] ?? "DEV"; + + + public static void Apply(IBMMQTransportOptions options) + { + options.Host = Host; + options.Port = Port; + options.User = User; + options.Password = Password; + options.QueueManagerName = QueueManagerName; + options.Channel = Channel; + + if (Query["appname"] is { } appName) + { + options.ApplicationName = appName; + } + if (Query["sslkeyrepo"] is { } sslKeyRepo) + { + options.SslKeyRepository = sslKeyRepo; + } + if (Query["cipherspec"] is { } cipherSpec) + { + options.CipherSpec = cipherSpec; + } + if (Query["sslpeername"] is { } sslPeerName) + { + options.SslPeerName = sslPeerName; + } + } +} diff --git a/src/ServiceControl.Transports.IBMMQ/transport.manifest b/src/ServiceControl.Transports.IBMMQ/transport.manifest new file mode 100644 index 0000000000..0cb1b0f720 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ/transport.manifest @@ -0,0 +1,14 @@ +{ + "Definitions": [ + { + "Name": "IBMMQ", + "DisplayName": "IBM MQ", + "AssemblyName": "ServiceControl.Transports.IBMMQ", + "TypeName": "ServiceControl.Transports.IBMMQ.IBMMQTransportCustomization, ServiceControl.Transports.IBMMQ", + "SampleConnectionString": "mq://admin:passw0rd@localhost:1414/QM1?topicprefix=DEV&channel=DEV.ADMIN.SVRCONN&appname={APPNAME}&sslkeyrepo={SSLKEYREPO}&cipherspec={CIPHERSPEC}&sslpeername={SSLPEERNAME}", + "AvailableInSCMU": true, + "Aliases": [ + ] + } + ] +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt b/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt index 6d201c38cc..397809505d 100644 --- a/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt +++ b/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt @@ -1,6 +1,7 @@ [ "AmazonSQS", "AzureStorageQueue", + "IBMMQ", "LearningTransport", "LearningTransport", "MSMQ", diff --git a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs index de88f52cac..7c36817a98 100644 --- a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs +++ b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs @@ -21,6 +21,11 @@ abstract class TransportTestFixture { + /// + /// Not all transports support - as a separator, as this is not a source package this can be overridden. + /// + public static char QueueNameSeparator = '-'; + [SetUp] public virtual async Task Setup() { @@ -30,7 +35,7 @@ public virtual async Task Setup() configuration = new TransportTestsConfiguration(); testCancellationTokenSource = Debugger.IsAttached ? new CancellationTokenSource() : new CancellationTokenSource(TestTimeout); registrations = []; - queueSuffix = $"-{Path.GetRandomFileName().Replace(".", string.Empty)}"; + queueSuffix = $"{QueueNameSeparator}{Path.GetRandomFileName().Replace(".", string.Empty)}"; await configuration.Configure(); @@ -70,7 +75,7 @@ public virtual async Task Cleanup() protected IMessageDispatcher Dispatcher => dispatcherTransportInfrastructure.Dispatcher; - protected string GetTestQueueName(string name) => $"{name}-{queueSuffix}"; + protected string GetTestQueueName(string name) => $"{name}{queueSuffix}"; protected TaskCompletionSource CreateTaskCompletionSource() { diff --git a/src/ServiceControl.Transports/DevelopmentTransportLocations.cs b/src/ServiceControl.Transports/DevelopmentTransportLocations.cs index b2b81e72b6..95e7e8c021 100644 --- a/src/ServiceControl.Transports/DevelopmentTransportLocations.cs +++ b/src/ServiceControl.Transports/DevelopmentTransportLocations.cs @@ -19,6 +19,7 @@ static DevelopmentTransportLocations() { ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.ASBS")); ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.ASQ")); + ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.IBMMQ")); ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.Learning")); ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.Msmq", "net10.0-windows")); ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.RabbitMQ")); diff --git a/src/ServiceControl.slnx b/src/ServiceControl.slnx index 2ff353b79a..fc2f5dd384 100644 --- a/src/ServiceControl.slnx +++ b/src/ServiceControl.slnx @@ -13,6 +13,7 @@ + @@ -78,6 +79,7 @@ + @@ -89,6 +91,7 @@ + diff --git a/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt b/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt index bb2e392a4e..58499293d7 100644 --- a/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt +++ b/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt @@ -35,6 +35,11 @@ "ServiceControl.Transports.AzureStorageQueues.ServiceControlAzureStorageQueueTransport, ServiceControl.Transports.AzureStorageQueues" ] }, + { + "Name": "IBMMQ", + "DisplayName": "IBM MQ", + "Aliases": [] + }, { "Name": "LearningTransport", "DisplayName": "Learning Transport (Non-Production)", diff --git a/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs b/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs index 2a9390c7a3..b4fb53fd8e 100644 --- a/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs +++ b/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs @@ -88,7 +88,8 @@ public void Should_package_all_transports() "MSMQ", "AmazonSQS", "LearningTransport", - "PostgreSql"}; + "PostgreSql", + "IBMMQ"}; var bundledTransports = deploymentPackage.DeploymentUnits .Where(u => u.Category == "Transports") diff --git a/src/TestHelper/IncludeInIBMMQTestsAttribute.cs b/src/TestHelper/IncludeInIBMMQTestsAttribute.cs new file mode 100644 index 0000000000..2bb18aeb93 --- /dev/null +++ b/src/TestHelper/IncludeInIBMMQTestsAttribute.cs @@ -0,0 +1,4 @@ +public class IncludeInIBMMQTestsAttribute : IncludeInTestsAttribute +{ + protected override string Filter => "IBMMQ"; +}