diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 6a8b91de03..57ceb9f956 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -18,7 +18,7 @@ jobs:
strategy:
matrix:
os: [windows-latest, ubuntu-latest]
- test-category: [ Default, SqlServer, AzureServiceBus, RabbitMQ, AzureStorageQueues, MSMQ, SQS, PrimaryRavenAcceptance, PrimaryRavenPersistence, PostgreSQL ]
+ test-category: [ Default, SqlServer, AzureServiceBus, RabbitMQ, AzureStorageQueues, MSMQ, SQS, PrimaryRavenAcceptance, PrimaryRavenPersistence, PostgreSQL, IBMMQ ]
include:
- os: windows-latest
os-name: Windows
@@ -27,6 +27,8 @@ jobs:
exclude:
- os: ubuntu-latest
test-category: MSMQ
+ - os: windows-latest
+ test-category: IBMMQ
fail-fast: false
steps:
- name: Check for secrets
@@ -103,6 +105,18 @@ jobs:
connection-string-name: ServiceControl_TransportTests_ASQ_ConnectionString
azure-credentials: ${{ secrets.AZURE_ACI_CREDENTIALS }}
tag: ServiceControl
+ - name: Setup IBM MQ
+ if: matrix.test-category == 'IBMMQ'
+ run: |
+ docker run --name ibmmq -d -p 1414:1414 -p 9443:9443 `
+ --health-cmd "dspmq" --health-interval 10s --health-timeout 5s --health-retries 10 --health-start-period 30s `
+ -e LICENSE=accept -e MQ_QMGR_NAME=QM1 -e MQ_ADMIN_PASSWORD=passw0rd `
+ icr.io/ibm-messaging/mq:latest
+ # Wait for container health check to pass
+ while ((docker inspect --format '{{.State.Health.Status}}' ibmmq) -ne 'healthy') {
+ Start-Sleep -Seconds 2
+ }
+ echo "ServiceControl_TransportTests_IBMMQ_ConnectionString=mq://admin:passw0rd@localhost:1414/QM1?channel=DEV.ADMIN.SVRCONN&topicprefix=DEV" | Out-File -FilePath $Env:GITHUB_ENV -Encoding utf-8 -Append
- name: Setup SQS environment variables
if: matrix.test-category == 'SQS'
run: |
diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index fa54c02e52..5ff1d79444 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -52,6 +52,7 @@
+
diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/DeadLetterQueueCheckTests.cs b/src/ServiceControl.Transports.IBMMQ.Tests/DeadLetterQueueCheckTests.cs
new file mode 100644
index 0000000000..b05f681e9c
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ.Tests/DeadLetterQueueCheckTests.cs
@@ -0,0 +1,148 @@
+namespace ServiceControl.Transport.Tests;
+
+using System;
+using System.Collections;
+using System.Threading.Tasks;
+using System.Web;
+using IBM.WMQ;
+using NUnit.Framework;
+using Transports;
+using Transports.IBMMQ;
+
+[TestFixture]
+class DeadLetterQueueCheckTests
+{
+ [Test]
+ public async Task Should_pass_when_custom_checks_disabled()
+ {
+ var settings = new TransportSettings
+ {
+ ConnectionString = ConnectionString,
+ RunCustomChecks = false
+ };
+
+ var check = new DeadLetterQueueCheck(settings);
+ var result = await check.PerformCheck().ConfigureAwait(false);
+
+ Assert.That(result.HasFailed, Is.False);
+ }
+
+ [Test]
+ public async Task Should_pass_when_dlq_is_empty()
+ {
+ DrainDeadLetterQueue();
+
+ var settings = new TransportSettings
+ {
+ ConnectionString = ConnectionString,
+ RunCustomChecks = true
+ };
+
+ var check = new DeadLetterQueueCheck(settings);
+ var result = await check.PerformCheck().ConfigureAwait(false);
+
+ Assert.That(result.HasFailed, Is.False);
+ }
+
+ [Test]
+ public async Task Should_fail_when_dlq_has_messages()
+ {
+ DrainDeadLetterQueue();
+ PutMessageOnDeadLetterQueue();
+
+ try
+ {
+ var settings = new TransportSettings
+ {
+ ConnectionString = ConnectionString,
+ RunCustomChecks = true
+ };
+
+ var check = new DeadLetterQueueCheck(settings);
+ var result = await check.PerformCheck().ConfigureAwait(false);
+
+ Assert.That(result.HasFailed, Is.True);
+ Assert.That(result.FailureReason, Does.Contain("messages in the Dead Letter Queue"));
+ }
+ finally
+ {
+ DrainDeadLetterQueue();
+ }
+ }
+
+ [Test]
+ public async Task Should_fail_when_connection_is_invalid()
+ {
+ var settings = new TransportSettings
+ {
+ ConnectionString = "mq://admin:passw0rd@localhost:19999/BOGUS",
+ RunCustomChecks = true
+ };
+
+ var check = new DeadLetterQueueCheck(settings);
+ var result = await check.PerformCheck().ConfigureAwait(false);
+
+ Assert.That(result.HasFailed, Is.True);
+ Assert.That(result.FailureReason, Does.Contain("Unable to check Dead Letter Queue"));
+ Assert.That(result.FailureReason, Does.Contain("RC="));
+ }
+
+ static void PutMessageOnDeadLetterQueue()
+ {
+ var (qmName, props) = ParseConnectionString();
+ using var qm = new MQQueueManager(qmName, props);
+ var dlqName = qm.DeadLetterQueueName.Trim();
+ using var dlq = qm.AccessQueue(dlqName, MQC.MQOO_OUTPUT);
+ var msg = new MQMessage();
+ msg.WriteString("DLQ test message");
+ dlq.Put(msg);
+ }
+
+ static void DrainDeadLetterQueue()
+ {
+ var (qmName, props) = ParseConnectionString();
+ using var qm = new MQQueueManager(qmName, props);
+ var dlqName = qm.DeadLetterQueueName.Trim();
+ using var dlq = qm.AccessQueue(dlqName, MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING);
+ var gmo = new MQGetMessageOptions { WaitInterval = 0, Options = MQC.MQGMO_NO_WAIT };
+ while (true)
+ {
+ try
+ {
+ dlq.Get(new MQMessage(), gmo);
+ }
+ catch (MQException e) when (e.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE)
+ {
+ break;
+ }
+ }
+ }
+
+ static (string queueManagerName, Hashtable properties) ParseConnectionString()
+ {
+ var uri = new Uri(ConnectionString);
+ var query = HttpUtility.ParseQueryString(uri.Query);
+
+ var qmName = uri.AbsolutePath.Trim('/') is { Length: > 0 } path
+ ? Uri.UnescapeDataString(path)
+ : "QM1";
+
+ var props = new Hashtable
+ {
+ [MQC.TRANSPORT_PROPERTY] = MQC.TRANSPORT_MQSERIES_MANAGED,
+ [MQC.HOST_NAME_PROPERTY] = uri.Host,
+ [MQC.PORT_PROPERTY] = uri.Port > 0 ? uri.Port : 1414,
+ [MQC.CHANNEL_PROPERTY] = query["channel"] ?? "DEV.ADMIN.SVRCONN",
+ [MQC.USE_MQCSP_AUTHENTICATION_PROPERTY] = true,
+ [MQC.USER_ID_PROPERTY] = Uri.UnescapeDataString(uri.UserInfo.Split(':')[0]),
+ [MQC.PASSWORD_PROPERTY] = Uri.UnescapeDataString(uri.UserInfo.Split(':')[1])
+ };
+
+ return (qmName, props);
+ }
+
+ static readonly string ConnectionString =
+ Environment.GetEnvironmentVariable("ServiceControl_TransportTests_IBMMQ_ConnectionString")
+ ?? Environment.GetEnvironmentVariable("SERVICECONTROL_TRANSPORTTESTS_IBMMQ_CONNECTIONSTRING")
+ ?? "mq://admin:passw0rd@localhost:1414";
+}
diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControl.Transports.IBMMQ.Tests.csproj b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControl.Transports.IBMMQ.Tests.csproj
new file mode 100644
index 0000000000..b8c5306c4e
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControl.Transports.IBMMQ.Tests.csproj
@@ -0,0 +1,32 @@
+
+
+
+ net10.0
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlAuditEndpointTests.cs b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlAuditEndpointTests.cs
new file mode 100644
index 0000000000..f70a724658
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlAuditEndpointTests.cs
@@ -0,0 +1,8 @@
+namespace ServiceControl.Transport.Tests;
+
+using System;
+
+partial class ServiceControlAuditEndpointTests
+{
+ private static partial int GetTransportDefaultConcurrency() => 32;
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlMonitoringEndpointTests.cs b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlMonitoringEndpointTests.cs
new file mode 100644
index 0000000000..0a62a8e7d4
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlMonitoringEndpointTests.cs
@@ -0,0 +1,8 @@
+namespace ServiceControl.Transport.Tests;
+
+using System;
+
+partial class ServiceControlMonitoringEndpointTests
+{
+ private static partial int GetTransportDefaultConcurrency() => 32;
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlPrimaryEndpointTests.cs b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlPrimaryEndpointTests.cs
new file mode 100644
index 0000000000..56579e59cd
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ.Tests/ServiceControlPrimaryEndpointTests.cs
@@ -0,0 +1,8 @@
+namespace ServiceControl.Transport.Tests;
+
+using System;
+
+partial class ServiceControlPrimaryEndpointTests
+{
+ private static partial int GetTransportDefaultConcurrency() => 10;
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/TestsFilter.cs b/src/ServiceControl.Transports.IBMMQ.Tests/TestsFilter.cs
new file mode 100644
index 0000000000..76ebf15442
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ.Tests/TestsFilter.cs
@@ -0,0 +1 @@
+[assembly: IncludeInIBMMQTests()]
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/TransportTestsConfiguration.cs b/src/ServiceControl.Transports.IBMMQ.Tests/TransportTestsConfiguration.cs
new file mode 100644
index 0000000000..6d52918c91
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ.Tests/TransportTestsConfiguration.cs
@@ -0,0 +1,53 @@
+namespace ServiceControl.Transport.Tests;
+
+using System;
+using System.Threading.Tasks;
+using Transports;
+using Transports.IBMMQ;
+using NServiceBus;
+using NServiceBus.Transport.IBMMQ;
+using NUnit.Framework;
+
+[SetUpFixture]
+public class BootstrapFixture
+{
+ [OneTimeSetUp]
+ public void RunBeforeAnyTests() => TransportTestFixture.QueueNameSeparator = '.';
+}
+
+class TransportTestsConfiguration
+{
+ public string ConnectionString { get; private set; }
+
+ public ITransportCustomization TransportCustomization { get; private set; }
+
+ public Task Configure()
+ {
+ TransportCustomization = new TestIBMMQTransportCustomization();
+ ConnectionString = Environment.GetEnvironmentVariable(ConnectionStringKey)
+ ?? Environment.GetEnvironmentVariable(ConnectionStringKey.ToUpperInvariant()); // Env keys are case sensitive, POSIX is all uppercase
+
+ if (string.IsNullOrEmpty(ConnectionString))
+ {
+ throw new Exception($"Environment variable {ConnectionStringKey} is required for IBM MQ transport tests to run");
+ }
+
+ return Task.CompletedTask;
+ }
+
+ public Task Cleanup() => Task.CompletedTask;
+
+ static string ConnectionStringKey = "ServiceControl_TransportTests_IBMMQ_ConnectionString";
+}
+
+sealed class TestIBMMQTransportCustomization : IBMMQTransportCustomization
+{
+ protected override IBMMQTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly)
+ {
+ transportSettings.Set>(o => o.ResourceNameSanitizer = name => name
+ .Replace("ServiceControlMonitoring", "SCM") // Mitigate max queue name length
+ .Replace("-", ".") // dash is an illegal char
+ );
+ return base.CreateTransport(transportSettings, preferredTransactionMode);
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.IBMMQ/ConnectionProperties.cs b/src/ServiceControl.Transports.IBMMQ/ConnectionProperties.cs
new file mode 100644
index 0000000000..4c3606b5e6
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ/ConnectionProperties.cs
@@ -0,0 +1,66 @@
+namespace ServiceControl.Transports.IBMMQ;
+
+using System;
+using System.Collections;
+using System.Web;
+using IBM.WMQ;
+
+static class ConnectionProperties
+{
+ public static (string queueManagerName, Hashtable properties) Parse(string connectionString)
+ {
+ var connectionUri = new Uri(connectionString);
+ var query = HttpUtility.ParseQueryString(connectionUri.Query);
+
+ var queueManagerName = connectionUri.AbsolutePath.Trim('/') is { Length: > 0 } path
+ ? Uri.UnescapeDataString(path)
+ : "QM1";
+
+ var properties = new Hashtable
+ {
+ [MQC.TRANSPORT_PROPERTY] = MQC.TRANSPORT_MQSERIES_MANAGED,
+ [MQC.HOST_NAME_PROPERTY] = connectionUri.Host,
+ [MQC.PORT_PROPERTY] = connectionUri.Port > 0 ? connectionUri.Port : 1414,
+ [MQC.CHANNEL_PROPERTY] = query["channel"] ?? "DEV.ADMIN.SVRCONN"
+ };
+
+ var userInfo = connectionUri.UserInfo;
+ if (!string.IsNullOrEmpty(userInfo))
+ {
+ var parts = userInfo.Split(':');
+ var user = Uri.UnescapeDataString(parts[0]);
+
+ if (!string.IsNullOrWhiteSpace(user))
+ {
+ properties[MQC.USE_MQCSP_AUTHENTICATION_PROPERTY] = true;
+ properties[MQC.USER_ID_PROPERTY] = user;
+ }
+
+ if (parts.Length > 1)
+ {
+ var password = Uri.UnescapeDataString(parts[1]);
+ if (!string.IsNullOrWhiteSpace(password))
+ {
+ properties[MQC.PASSWORD_PROPERTY] = password;
+ }
+ }
+ }
+
+ if (query["sslkeyrepo"] is { } sslKeyRepo)
+ {
+ properties[MQC.SSL_CERT_STORE_PROPERTY] = sslKeyRepo;
+ }
+
+ if (query["cipherspec"] is { } cipherSpec)
+ {
+ properties[MQC.SSL_CIPHER_SPEC_PROPERTY] = cipherSpec;
+ }
+
+ if (query["sslpeername"] is { } sslPeerName)
+ {
+ properties[MQC.SSL_PEER_NAME_PROPERTY] = sslPeerName;
+ }
+
+ return (queueManagerName, properties);
+ }
+}
diff --git a/src/ServiceControl.Transports.IBMMQ/DeadLetterQueueCheck.cs b/src/ServiceControl.Transports.IBMMQ/DeadLetterQueueCheck.cs
new file mode 100644
index 0000000000..5bc5041dd9
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ/DeadLetterQueueCheck.cs
@@ -0,0 +1,66 @@
+namespace ServiceControl.Transports.IBMMQ;
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using IBM.WMQ;
+using Microsoft.Extensions.Logging;
+using NServiceBus.CustomChecks;
+using ServiceControl.Infrastructure;
+
+public class DeadLetterQueueCheck : CustomCheck
+{
+ public DeadLetterQueueCheck(TransportSettings settings) : base(id: "Dead Letter Queue", category: "Transport", repeatAfter: TimeSpan.FromHours(1))
+ {
+ Logger.LogDebug("IBM MQ Dead Letter Queue custom check starting");
+
+ (queueManagerName, connectionProperties) = ConnectionProperties.Parse(settings.ConnectionString);
+ runCheck = settings.RunCustomChecks;
+ }
+
+ public override Task PerformCheck(CancellationToken cancellationToken = default)
+ {
+ if (!runCheck)
+ {
+ return Task.FromResult(CheckResult.Pass);
+ }
+
+ Logger.LogDebug("Checking Dead Letter Queue length");
+
+ try
+ {
+ using var queueManager = new MQQueueManager(queueManagerName, connectionProperties);
+
+ var dlqName = queueManager.DeadLetterQueueName?.Trim();
+ if (string.IsNullOrEmpty(dlqName))
+ {
+ return Task.FromResult(CheckResult.Pass);
+ }
+
+ using var dlq = queueManager.AccessQueue(dlqName, MQC.MQOO_INQUIRE | MQC.MQOO_FAIL_IF_QUIESCING);
+ var depth = dlq.CurrentDepth;
+
+ if (depth > 0)
+ {
+ var message = $"{depth} messages in the Dead Letter Queue '{dlqName}' on queue manager '{queueManagerName}'. This could indicate a problem with ServiceControl's retries. Please submit a support ticket to Particular if you would like help from our engineers to ensure no message loss while resolving these dead letter messages.";
+ Logger.LogWarning("{DeadLetterMessageCount} messages in the Dead Letter Queue '{DeadLetterQueueName}' on queue manager '{QueueManagerName}'", depth, dlqName, queueManagerName);
+ return Task.FromResult(CheckResult.Failed(message));
+ }
+
+ Logger.LogDebug("No messages in Dead Letter Queue");
+ return Task.FromResult(CheckResult.Pass);
+ }
+ catch (MQException e)
+ {
+ var message = $"Unable to check Dead Letter Queue on queue manager '{queueManagerName}'. Reason: {e.Message} (RC={e.ReasonCode})";
+ Logger.LogWarning(e, "Unable to check Dead Letter Queue on queue manager '{QueueManagerName}'", queueManagerName);
+ return Task.FromResult(CheckResult.Failed(message));
+ }
+ }
+
+ readonly string queueManagerName;
+ readonly System.Collections.Hashtable connectionProperties;
+ readonly bool runCheck;
+
+ static readonly ILogger Logger = LoggerUtil.CreateStaticLogger(typeof(DeadLetterQueueCheck));
+}
diff --git a/src/ServiceControl.Transports.IBMMQ/IBMMQTransportCustomization.cs b/src/ServiceControl.Transports.IBMMQ/IBMMQTransportCustomization.cs
new file mode 100644
index 0000000000..a818034d14
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ/IBMMQTransportCustomization.cs
@@ -0,0 +1,43 @@
+namespace ServiceControl.Transports.IBMMQ;
+
+using System;
+using System.Linq;
+using Microsoft.Extensions.DependencyInjection;
+using NServiceBus;
+using NServiceBus.Transport.IBMMQ;
+
+public class IBMMQTransportCustomization : TransportCustomization
+{
+ protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, IBMMQTransport transportDefinition, TransportSettings transportSettings) =>
+ transportDefinition.TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive;
+
+ protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, IBMMQTransport transportDefinition, TransportSettings transportSettings) =>
+ transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
+
+ protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, IBMMQTransport transportDefinition, TransportSettings transportSettings) =>
+ transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
+
+ protected override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)
+ {
+ services.AddSingleton();
+ services.AddHostedService(provider => provider.GetRequiredService());
+ }
+
+ protected override IBMMQTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly)
+ {
+ var transport = new IBMMQTransport(o =>
+ {
+ if (transportSettings.TryGet>(out var overrides))
+ {
+ overrides(o);
+ }
+
+ TestConnectionDetails.Apply(o);
+ });
+ transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode)
+ ? preferredTransactionMode
+ : TransportTransactionMode.ReceiveOnly;
+
+ return transport;
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.IBMMQ/QueueLengthProvider.cs b/src/ServiceControl.Transports.IBMMQ/QueueLengthProvider.cs
new file mode 100644
index 0000000000..7dcf53bcc2
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ/QueueLengthProvider.cs
@@ -0,0 +1,161 @@
+#nullable enable
+namespace ServiceControl.Transports.IBMMQ;
+
+using System;
+using System.Collections;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+using IBM.WMQ;
+using Microsoft.Extensions.Logging;
+
+class QueueLengthProvider : AbstractQueueLengthProvider
+{
+ public QueueLengthProvider(TransportSettings settings, Action store, ILogger logger)
+ : base(settings, store)
+ {
+ this.logger = logger;
+ (queueManagerName, connectionProperties) = ConnectionProperties.Parse(ConnectionString);
+ }
+
+ public override void TrackEndpointInputQueue(EndpointToQueueMapping queueToTrack) =>
+ endpointQueues.AddOrUpdate(queueToTrack.EndpointName, _ => queueToTrack.InputQueue, (_, currentValue) =>
+ {
+ if (currentValue != queueToTrack.InputQueue)
+ {
+ sizes.TryRemove(currentValue, out var _);
+ }
+
+ return queueToTrack.InputQueue;
+ });
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ try
+ {
+ FetchQueueLengths();
+
+ UpdateQueueLengthStore();
+
+ await Task.Delay(QueryDelayInterval, stoppingToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+ {
+ // no-op
+ }
+ catch (Exception e)
+ {
+ logger.LogError(e, "Queue length query loop failure");
+ CloseConnection();
+ await Task.Delay(ReconnectDelayInterval, stoppingToken).ConfigureAwait(false);
+ }
+ }
+
+ CloseConnection();
+ }
+
+ void UpdateQueueLengthStore()
+ {
+ var nowTicks = DateTime.UtcNow.Ticks;
+
+ foreach (var endpointQueuePair in endpointQueues)
+ {
+ if (sizes.TryGetValue(endpointQueuePair.Value, out var size))
+ {
+ Store(
+ [
+ new QueueLengthEntry
+ {
+ DateTicks = nowTicks,
+ Value = size
+ }
+ ],
+ new EndpointToQueueMapping(endpointQueuePair.Key, endpointQueuePair.Value));
+ }
+ }
+ }
+
+ void FetchQueueLengths()
+ {
+ if (endpointQueues.IsEmpty)
+ {
+ return;
+ }
+
+ var manager = EnsureConnected();
+
+ foreach (var endpointQueuePair in endpointQueues)
+ {
+ var queueName = endpointQueuePair.Value;
+ try
+ {
+ using var queue = manager.AccessQueue(queueName, MQC.MQOO_INQUIRE | MQC.MQOO_FAIL_IF_QUIESCING);
+ sizes[queueName] = queue.CurrentDepth;
+ }
+ catch (MQException e) when (IsConnectionError(e))
+ {
+ logger.LogWarning(e, "Lost connection to queue manager while querying {QueueName}", queueName);
+ CloseConnection();
+ throw;
+ }
+ catch (Exception e)
+ {
+ logger.LogWarning(e, "Error querying queue length for {QueueName}", queueName);
+ }
+ }
+ }
+
+ MQQueueManager EnsureConnected()
+ {
+ if (queueManager is not null)
+ {
+ return queueManager;
+ }
+
+ queueManager = new MQQueueManager(queueManagerName, connectionProperties);
+ logger.LogInformation("Connected to queue manager '{QueueManagerName}'", queueManagerName);
+ return queueManager;
+ }
+
+ void CloseConnection()
+ {
+ if (queueManager is null)
+ {
+ return;
+ }
+
+ try
+ {
+ queueManager.Disconnect();
+ }
+ catch (Exception e)
+ {
+ logger.LogDebug(e, "Error disconnecting from queue manager");
+ }
+
+ queueManager = null;
+ }
+
+ static bool IsConnectionError(MQException e) => e.ReasonCode
+ is MQC.MQRC_CONNECTION_BROKEN
+ or MQC.MQRC_CONNECTION_ERROR
+ or MQC.MQRC_CONNECTION_STOPPED
+ or MQC.MQRC_CONNECTION_QUIESCING
+ or MQC.MQRC_CONNECTION_NOT_AVAILABLE
+ or MQC.MQRC_Q_MGR_NOT_AVAILABLE
+ or MQC.MQRC_Q_MGR_NOT_ACTIVE;
+
+ static readonly TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200);
+ static readonly TimeSpan ReconnectDelayInterval = TimeSpan.FromSeconds(10);
+
+ MQQueueManager? queueManager;
+
+ readonly ConcurrentDictionary endpointQueues = new();
+ readonly ConcurrentDictionary sizes = new();
+
+ readonly string queueManagerName;
+ readonly Hashtable connectionProperties;
+ readonly ILogger logger;
+}
diff --git a/src/ServiceControl.Transports.IBMMQ/ServiceControl.Transports.IBMMQ.csproj b/src/ServiceControl.Transports.IBMMQ/ServiceControl.Transports.IBMMQ.csproj
new file mode 100644
index 0000000000..2b2d9a9296
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ/ServiceControl.Transports.IBMMQ.csproj
@@ -0,0 +1,26 @@
+
+
+
+ net10.0
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/ServiceControl.Transports.IBMMQ/TestConnectionDetails.cs b/src/ServiceControl.Transports.IBMMQ/TestConnectionDetails.cs
new file mode 100644
index 0000000000..573ac0e28d
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ/TestConnectionDetails.cs
@@ -0,0 +1,59 @@
+#nullable enable
+using System;
+using System.Collections.Specialized;
+using System.Web;
+using NServiceBus.Transport.IBMMQ;
+
+///
+/// Copied directly from:
+///
+/// https://github.com/ParticularLabs/NServiceBus.IBMMQ/blob/main/src/Testing/TestConnectionDetails.cs
+///
+///
+static class TestConnectionDetails
+{
+ // mq://admin:passw0rd@localhost:1414/QM1?appname=&sslkeyrepo=&cipherspec=&sslpeername=&topicprefix=DEV&channel=DEV.ADMIN.SVRCONN
+ static readonly Uri ConnectionUri = new(
+ Environment.GetEnvironmentVariable("ServiceControl_TransportTests_IBMMQ_ConnectionString")
+ ?? Environment.GetEnvironmentVariable("SERVICECONTROL_TRANSPORTTESTS_IBMMQ_CONNECTIONSTRING")
+ ?? "mq://admin:passw0rd@localhost:1414"
+ );
+
+ static readonly NameValueCollection Query = HttpUtility.ParseQueryString(ConnectionUri.Query);
+
+ public static string Host => ConnectionUri.Host;
+ public static int Port => ConnectionUri.Port > 0 ? ConnectionUri.Port : 1414;
+ public static string User => Uri.UnescapeDataString(ConnectionUri.UserInfo.Split(':')[0]);
+ public static string Password => Uri.UnescapeDataString(ConnectionUri.UserInfo.Split(':')[1]);
+ public static string QueueManagerName => ConnectionUri.AbsolutePath.Trim('/') is { Length: > 0 } path ? Uri.UnescapeDataString(path) : "QM1";
+ public static string Channel => Query["channel"] ?? "DEV.ADMIN.SVRCONN";
+ public static string TopicPrefix => Query["topicprefix"] ?? "DEV";
+
+
+ public static void Apply(IBMMQTransportOptions options)
+ {
+ options.Host = Host;
+ options.Port = Port;
+ options.User = User;
+ options.Password = Password;
+ options.QueueManagerName = QueueManagerName;
+ options.Channel = Channel;
+
+ if (Query["appname"] is { } appName)
+ {
+ options.ApplicationName = appName;
+ }
+ if (Query["sslkeyrepo"] is { } sslKeyRepo)
+ {
+ options.SslKeyRepository = sslKeyRepo;
+ }
+ if (Query["cipherspec"] is { } cipherSpec)
+ {
+ options.CipherSpec = cipherSpec;
+ }
+ if (Query["sslpeername"] is { } sslPeerName)
+ {
+ options.SslPeerName = sslPeerName;
+ }
+ }
+}
diff --git a/src/ServiceControl.Transports.IBMMQ/transport.manifest b/src/ServiceControl.Transports.IBMMQ/transport.manifest
new file mode 100644
index 0000000000..0cb1b0f720
--- /dev/null
+++ b/src/ServiceControl.Transports.IBMMQ/transport.manifest
@@ -0,0 +1,14 @@
+{
+ "Definitions": [
+ {
+ "Name": "IBMMQ",
+ "DisplayName": "IBM MQ",
+ "AssemblyName": "ServiceControl.Transports.IBMMQ",
+ "TypeName": "ServiceControl.Transports.IBMMQ.IBMMQTransportCustomization, ServiceControl.Transports.IBMMQ",
+ "SampleConnectionString": "mq://admin:passw0rd@localhost:1414/QM1?topicprefix=DEV&channel=DEV.ADMIN.SVRCONN&appname={APPNAME}&sslkeyrepo={SSLKEYREPO}&cipherspec={CIPHERSPEC}&sslpeername={SSLPEERNAME}",
+ "AvailableInSCMU": true,
+ "Aliases": [
+ ]
+ }
+ ]
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt b/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt
index 6d201c38cc..397809505d 100644
--- a/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt
+++ b/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt
@@ -1,6 +1,7 @@
[
"AmazonSQS",
"AzureStorageQueue",
+ "IBMMQ",
"LearningTransport",
"LearningTransport",
"MSMQ",
diff --git a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs
index de88f52cac..7c36817a98 100644
--- a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs
+++ b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs
@@ -21,6 +21,11 @@
abstract class TransportTestFixture
{
+ ///
+ /// Not all transports support - as a separator, as this is not a source package this can be overridden.
+ ///
+ public static char QueueNameSeparator = '-';
+
[SetUp]
public virtual async Task Setup()
{
@@ -30,7 +35,7 @@ public virtual async Task Setup()
configuration = new TransportTestsConfiguration();
testCancellationTokenSource = Debugger.IsAttached ? new CancellationTokenSource() : new CancellationTokenSource(TestTimeout);
registrations = [];
- queueSuffix = $"-{Path.GetRandomFileName().Replace(".", string.Empty)}";
+ queueSuffix = $"{QueueNameSeparator}{Path.GetRandomFileName().Replace(".", string.Empty)}";
await configuration.Configure();
@@ -70,7 +75,7 @@ public virtual async Task Cleanup()
protected IMessageDispatcher Dispatcher => dispatcherTransportInfrastructure.Dispatcher;
- protected string GetTestQueueName(string name) => $"{name}-{queueSuffix}";
+ protected string GetTestQueueName(string name) => $"{name}{queueSuffix}";
protected TaskCompletionSource CreateTaskCompletionSource()
{
diff --git a/src/ServiceControl.Transports/DevelopmentTransportLocations.cs b/src/ServiceControl.Transports/DevelopmentTransportLocations.cs
index b2b81e72b6..95e7e8c021 100644
--- a/src/ServiceControl.Transports/DevelopmentTransportLocations.cs
+++ b/src/ServiceControl.Transports/DevelopmentTransportLocations.cs
@@ -19,6 +19,7 @@ static DevelopmentTransportLocations()
{
ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.ASBS"));
ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.ASQ"));
+ ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.IBMMQ"));
ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.Learning"));
ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.Msmq", "net10.0-windows"));
ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.RabbitMQ"));
diff --git a/src/ServiceControl.slnx b/src/ServiceControl.slnx
index 2ff353b79a..fc2f5dd384 100644
--- a/src/ServiceControl.slnx
+++ b/src/ServiceControl.slnx
@@ -13,6 +13,7 @@
+
@@ -78,6 +79,7 @@
+
@@ -89,6 +91,7 @@
+
diff --git a/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt b/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt
index bb2e392a4e..58499293d7 100644
--- a/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt
+++ b/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt
@@ -35,6 +35,11 @@
"ServiceControl.Transports.AzureStorageQueues.ServiceControlAzureStorageQueueTransport, ServiceControl.Transports.AzureStorageQueues"
]
},
+ {
+ "Name": "IBMMQ",
+ "DisplayName": "IBM MQ",
+ "Aliases": []
+ },
{
"Name": "LearningTransport",
"DisplayName": "Learning Transport (Non-Production)",
diff --git a/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs b/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs
index 2a9390c7a3..b4fb53fd8e 100644
--- a/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs
+++ b/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs
@@ -88,7 +88,8 @@ public void Should_package_all_transports()
"MSMQ",
"AmazonSQS",
"LearningTransport",
- "PostgreSql"};
+ "PostgreSql",
+ "IBMMQ"};
var bundledTransports = deploymentPackage.DeploymentUnits
.Where(u => u.Category == "Transports")
diff --git a/src/TestHelper/IncludeInIBMMQTestsAttribute.cs b/src/TestHelper/IncludeInIBMMQTestsAttribute.cs
new file mode 100644
index 0000000000..2bb18aeb93
--- /dev/null
+++ b/src/TestHelper/IncludeInIBMMQTestsAttribute.cs
@@ -0,0 +1,4 @@
+public class IncludeInIBMMQTestsAttribute : IncludeInTestsAttribute
+{
+ protected override string Filter => "IBMMQ";
+}