From eaddce5b16e44d5db3e1e82b668b857caf597ce6 Mon Sep 17 00:00:00 2001 From: Murali Mogalayapalli Date: Tue, 31 Mar 2026 20:39:53 -0400 Subject: [PATCH 1/3] Fix network bridge silent death when transport exception occurs during broker info handshake (#1864) Remove early return in onException() handlers for local and remote transports in DemandForwardingBridgeSupport. When futureBrokerInfo was not done, the handler cancelled the future and returned without calling serviceLocalException()/serviceRemoteException(), preventing the reconnection chain from firing. The bridge would silently die and never re-establish. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../DemandForwardingBridgeSupport.java | 2 - ...BridgeReconnectOnHandshakeFailureTest.java | 216 ++++++++++++++++++ 2 files changed, 216 insertions(+), 2 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkBridgeReconnectOnHandshakeFailureTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 8d16445fb96..27d8bb1647e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -228,7 +228,6 @@ public void onException(IOException error) { LOG.info("Error with pending local brokerInfo on: {} ({})", localBroker, error.getMessage()); LOG.debug("Peer error: ", error); futureLocalBrokerInfo.cancel(true); - return; } serviceLocalException(error); } @@ -248,7 +247,6 @@ public void onException(IOException error) { LOG.info("Error with pending remote brokerInfo on: {} ({})", remoteBroker, error.getMessage()); LOG.debug("Peer error: ", error); futureRemoteBrokerInfo.cancel(true); - return; } serviceRemoteException(error); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkBridgeReconnectOnHandshakeFailureTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkBridgeReconnectOnHandshakeFailureTest.java new file mode 100644 index 00000000000..6afd390f54c --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkBridgeReconnectOnHandshakeFailureTest.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import jakarta.jms.Connection; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test that verifies network bridges reconnect after the remote broker is + * stopped abruptly during or before the broker info handshake completes. + * + * This covers the bug where {@code onException()} in + * {@link DemandForwardingBridgeSupport} returned early when + * {@code futureBrokerInfo} was not done, preventing + * {@code serviceLocalException()}/{@code serviceRemoteException()} from + * firing and thus blocking the reconnection chain. + */ +public class NetworkBridgeReconnectOnHandshakeFailureTest { + + private static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeReconnectOnHandshakeFailureTest.class); + + private BrokerService localBroker; + private BrokerService remoteBroker; + + @After + public void tearDown() throws Exception { + if (localBroker != null) { + try { localBroker.stop(); } catch (Exception ignored) {} + localBroker.waitUntilStopped(); + } + if (remoteBroker != null) { + try { remoteBroker.stop(); } catch (Exception ignored) {} + remoteBroker.waitUntilStopped(); + } + } + + /** + * Verify that when the remote broker is abruptly stopped (causing a + * transport exception potentially during the broker info handshake), + * the network bridge reconnects once the remote broker is restarted. + */ + @Test(timeout = 60_000) + public void testBridgeReconnectsAfterRemoteBrokerRestart() throws Exception { + remoteBroker = createRemoteBroker(0); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + int remotePort = remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort(); + LOG.info("Remote broker started on port {}", remotePort); + + localBroker = createLocalBroker(remotePort); + localBroker.start(); + localBroker.waitUntilStarted(); + DiscoveryNetworkConnector nc = (DiscoveryNetworkConnector) localBroker.getNetworkConnectors().get(0); + + // Wait for the bridge to fully establish + assertTrue("Bridge should be established", Wait.waitFor(() -> + !nc.activeBridges().isEmpty(), 15_000, 200)); + LOG.info("Bridge established"); + + // Abruptly stop the remote broker — this triggers onException on the + // bridge transports, potentially while futureBrokerInfo is still pending + remoteBroker.stop(); + remoteBroker.waitUntilStopped(); + LOG.info("Remote broker stopped abruptly"); + + // Wait for the bridge to go down + assertTrue("Bridge should go down after remote stop", Wait.waitFor(() -> + nc.activeBridges().isEmpty(), 10_000, 200)); + + // Restart the remote broker on the same port + remoteBroker = createRemoteBroker(remotePort); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + LOG.info("Remote broker restarted on port {}", remotePort); + + // The bridge should reconnect — this is what failed before the fix, + // because onException returned early without calling serviceRemoteException() + assertTrue("Bridge should reconnect after remote broker restart", Wait.waitFor(() -> + !nc.activeBridges().isEmpty(), 30_000, 500)); + LOG.info("Bridge reconnected successfully"); + + // Verify messages can flow across the re-established bridge + verifyMessageFlow(localBroker, remoteBroker); + } + + /** + * A more aggressive variant: stop the remote broker multiple times in + * quick succession to increase the chance of hitting the onException + * path during broker info exchange. + */ + @Test(timeout = 120_000) + public void testBridgeReconnectsAfterMultipleRemoteBrokerRestarts() throws Exception { + remoteBroker = createRemoteBroker(0); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + int remotePort = remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort(); + + localBroker = createLocalBroker(remotePort); + localBroker.start(); + localBroker.waitUntilStarted(); + DiscoveryNetworkConnector nc = (DiscoveryNetworkConnector) localBroker.getNetworkConnectors().get(0); + + for (int i = 0; i < 3; i++) { + LOG.info("=== Restart cycle {} ===", i + 1); + + assertTrue("Bridge should be established (cycle " + (i + 1) + ")", Wait.waitFor(() -> + !nc.activeBridges().isEmpty(), 30_000, 500)); + + remoteBroker.stop(); + remoteBroker.waitUntilStopped(); + + assertTrue("Bridge should go down (cycle " + (i + 1) + ")", Wait.waitFor(() -> + nc.activeBridges().isEmpty(), 10_000, 200)); + + remoteBroker = createRemoteBroker(remotePort); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + } + + assertTrue("Bridge should be established after all restarts", Wait.waitFor(() -> + !nc.activeBridges().isEmpty(), 30_000, 500)); + verifyMessageFlow(localBroker, remoteBroker); + } + + private BrokerService createRemoteBroker(int port) throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("remoteBroker"); + broker.setUseJmx(false); + broker.setPersistent(false); + broker.setUseShutdownHook(false); + broker.addConnector("tcp://localhost:" + port); + return broker; + } + + private BrokerService createLocalBroker(int remotePort) throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("localBroker"); + broker.setUseJmx(false); + broker.setPersistent(false); + broker.setUseShutdownHook(false); + DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector( + new URI("static:(tcp://localhost:" + remotePort + ")?useExponentialBackOff=false&initialReconnectDelay=1000")); + nc.setName("bridge-reconnect-test"); + broker.addNetworkConnector(nc); + return broker; + } + + private void verifyMessageFlow(BrokerService local, BrokerService remote) throws Exception { + ActiveMQQueue dest = new ActiveMQQueue("RECONNECT.HANDSHAKE.TEST"); + + // Create consumer on remote broker + ActiveMQConnectionFactory remoteFac = new ActiveMQConnectionFactory(remote.getVmConnectorURI()); + Connection remoteConn = remoteFac.createConnection(); + remoteConn.start(); + Session remoteSession = remoteConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = remoteSession.createConsumer(dest); + + // Wait for the demand subscription to propagate across the bridge + assertTrue("Demand subscription should propagate", Wait.waitFor(() -> { + try { + return local.getDestination(dest) != null + && local.getDestination(dest).getConsumers().size() > 0; + } catch (Exception e) { + return false; + } + }, 15_000, 200)); + + // Send message from local broker + ActiveMQConnectionFactory localFac = new ActiveMQConnectionFactory(local.getVmConnectorURI()); + Connection localConn = localFac.createConnection(); + localConn.start(); + Session localSession = localConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = localSession.createProducer(dest); + producer.send(localSession.createTextMessage("test-after-reconnect")); + producer.close(); + + // Receive on remote + TextMessage received = (TextMessage) consumer.receive(TimeUnit.SECONDS.toMillis(10)); + assertNotNull("Message should flow across the re-established bridge", received); + + localConn.close(); + remoteConn.close(); + } +} From 085d2f3de7d76c92978c8fddd00b8fa7b9145ef3 Mon Sep 17 00:00:00 2001 From: Murali Mogalayapalli Date: Tue, 31 Mar 2026 21:50:20 -0400 Subject: [PATCH 2/3] Improve test to reproduce exact bug path (futureBrokerInfo not done) Add testBridgeReconnectsAfterHandshakeFailure which uses a fake server socket that accepts connections but never sends BrokerInfo, ensuring onException fires while futureBrokerInfo is pending. This test fails without the fix and passes with it. Co-Authored-By: Claude Opus 4.6 (1M context) --- ...BridgeReconnectOnHandshakeFailureTest.java | 158 +++++++++++------- 1 file changed, 101 insertions(+), 57 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkBridgeReconnectOnHandshakeFailureTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkBridgeReconnectOnHandshakeFailureTest.java index 6afd390f54c..8538678cb55 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkBridgeReconnectOnHandshakeFailureTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkBridgeReconnectOnHandshakeFailureTest.java @@ -19,8 +19,12 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.net.ServerSocket; +import java.net.Socket; import java.net.URI; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import jakarta.jms.Connection; import jakarta.jms.MessageConsumer; @@ -67,61 +71,108 @@ public void tearDown() throws Exception { } /** - * Verify that when the remote broker is abruptly stopped (causing a - * transport exception potentially during the broker info handshake), - * the network bridge reconnects once the remote broker is restarted. + * Reproduces the exact bug path: a transport exception fires while + * futureBrokerInfo is NOT done (during handshake). + * + * Strategy: Use a fake server socket that accepts connections but never + * sends BrokerInfo. The bridge connects, starts the handshake, but + * futureBrokerInfo never completes. When the fake server closes the + * socket, onException fires with futureBrokerInfo not done — this is + * the exact bug path. The bridge must still trigger reconnection so + * that when the real broker comes up, it connects successfully. */ @Test(timeout = 60_000) - public void testBridgeReconnectsAfterRemoteBrokerRestart() throws Exception { - remoteBroker = createRemoteBroker(0); - remoteBroker.start(); - remoteBroker.waitUntilStarted(); - int remotePort = remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort(); - LOG.info("Remote broker started on port {}", remotePort); - - localBroker = createLocalBroker(remotePort); + public void testBridgeReconnectsAfterHandshakeFailure() throws Exception { + // Start a fake server that accepts connections but never responds + // This ensures futureBrokerInfo is never set when onException fires + ServerSocket fakeServer = new ServerSocket(0); + int port = fakeServer.getLocalPort(); + LOG.info("Fake server listening on port {}", port); + + // Accept connections in background and close them after a short delay + // to trigger IOException on the bridge while futureBrokerInfo is pending + CountDownLatch connectionReceived = new CountDownLatch(1); + AtomicReference clientSocket = new AtomicReference<>(); + Thread acceptThread = new Thread(() -> { + try { + while (!Thread.currentThread().isInterrupted()) { + Socket s = fakeServer.accept(); + LOG.info("Fake server accepted connection from {}", s.getRemoteSocketAddress()); + clientSocket.set(s); + connectionReceived.countDown(); + // Keep accepting — the bridge will retry + } + } catch (Exception e) { + // Expected when we close the server socket + } + }, "fake-server-accept"); + acceptThread.setDaemon(true); + acceptThread.start(); + + // Start local broker with network connector pointing at the fake server + localBroker = new BrokerService(); + localBroker.setBrokerName("localBroker"); + localBroker.setUseJmx(false); + localBroker.setPersistent(false); + localBroker.setUseShutdownHook(false); + DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector( + new URI("static:(tcp://localhost:" + port + + "?wireFormat.maxInactivityDuration=3000" + + "&wireFormat.maxInactivityDurationInitalDelay=3000" + + ")?useExponentialBackOff=false&initialReconnectDelay=1000")); + nc.setName("bridge-handshake-failure-test"); + localBroker.addNetworkConnector(nc); localBroker.start(); localBroker.waitUntilStarted(); - DiscoveryNetworkConnector nc = (DiscoveryNetworkConnector) localBroker.getNetworkConnectors().get(0); - // Wait for the bridge to fully establish - assertTrue("Bridge should be established", Wait.waitFor(() -> - !nc.activeBridges().isEmpty(), 15_000, 200)); - LOG.info("Bridge established"); + // Wait for the fake server to receive a connection — the bridge TCP + // connected but will never get BrokerInfo + assertTrue("Bridge should connect to fake server", + connectionReceived.await(10, TimeUnit.SECONDS)); + LOG.info("Bridge connected to fake server, futureBrokerInfo will NOT be set"); + + // Close the accepted socket — this fires onException while + // futureBrokerInfo is NOT done (the exact bug path) + Socket s = clientSocket.get(); + if (s != null) { + s.close(); + LOG.info("Fake server closed client socket — triggering onException with futureBrokerInfo not done"); + } - // Abruptly stop the remote broker — this triggers onException on the - // bridge transports, potentially while futureBrokerInfo is still pending - remoteBroker.stop(); - remoteBroker.waitUntilStopped(); - LOG.info("Remote broker stopped abruptly"); + // Wait a bit for the bridge to process the exception and attempt reconnection + Thread.sleep(2000); - // Wait for the bridge to go down - assertTrue("Bridge should go down after remote stop", Wait.waitFor(() -> - nc.activeBridges().isEmpty(), 10_000, 200)); + // Now shut down the fake server and start the real remote broker on the same port + fakeServer.close(); + acceptThread.interrupt(); - // Restart the remote broker on the same port - remoteBroker = createRemoteBroker(remotePort); + remoteBroker = new BrokerService(); + remoteBroker.setBrokerName("remoteBroker"); + remoteBroker.setUseJmx(false); + remoteBroker.setPersistent(false); + remoteBroker.setUseShutdownHook(false); + remoteBroker.addConnector("tcp://localhost:" + port); remoteBroker.start(); remoteBroker.waitUntilStarted(); - LOG.info("Remote broker restarted on port {}", remotePort); + LOG.info("Real remote broker started on port {}", port); - // The bridge should reconnect — this is what failed before the fix, - // because onException returned early without calling serviceRemoteException() - assertTrue("Bridge should reconnect after remote broker restart", Wait.waitFor(() -> - !nc.activeBridges().isEmpty(), 30_000, 500)); - LOG.info("Bridge reconnected successfully"); + // The bridge should reconnect to the real broker. + // WITHOUT the fix: onException returned early, serviceFailed() was never + // called, the bridge is permanently dead — this assertion will FAIL. + // WITH the fix: serviceRemoteException() fires, triggering reconnection. + assertTrue("Bridge should reconnect to real broker after handshake failure", + Wait.waitFor(() -> !nc.activeBridges().isEmpty(), 30_000, 500)); + LOG.info("Bridge reconnected to real broker successfully!"); - // Verify messages can flow across the re-established bridge + // Verify messages flow verifyMessageFlow(localBroker, remoteBroker); } /** - * A more aggressive variant: stop the remote broker multiple times in - * quick succession to increase the chance of hitting the onException - * path during broker info exchange. + * Basic reconnection test: verify bridge reconnects after remote broker restart. */ - @Test(timeout = 120_000) - public void testBridgeReconnectsAfterMultipleRemoteBrokerRestarts() throws Exception { + @Test(timeout = 60_000) + public void testBridgeReconnectsAfterRemoteBrokerRestart() throws Exception { remoteBroker = createRemoteBroker(0); remoteBroker.start(); remoteBroker.waitUntilStarted(); @@ -132,25 +183,22 @@ public void testBridgeReconnectsAfterMultipleRemoteBrokerRestarts() throws Excep localBroker.waitUntilStarted(); DiscoveryNetworkConnector nc = (DiscoveryNetworkConnector) localBroker.getNetworkConnectors().get(0); - for (int i = 0; i < 3; i++) { - LOG.info("=== Restart cycle {} ===", i + 1); - - assertTrue("Bridge should be established (cycle " + (i + 1) + ")", Wait.waitFor(() -> - !nc.activeBridges().isEmpty(), 30_000, 500)); + assertTrue("Bridge should be established", Wait.waitFor(() -> + !nc.activeBridges().isEmpty(), 15_000, 200)); - remoteBroker.stop(); - remoteBroker.waitUntilStopped(); + remoteBroker.stop(); + remoteBroker.waitUntilStopped(); - assertTrue("Bridge should go down (cycle " + (i + 1) + ")", Wait.waitFor(() -> - nc.activeBridges().isEmpty(), 10_000, 200)); + assertTrue("Bridge should go down", Wait.waitFor(() -> + nc.activeBridges().isEmpty(), 10_000, 200)); - remoteBroker = createRemoteBroker(remotePort); - remoteBroker.start(); - remoteBroker.waitUntilStarted(); - } + remoteBroker = createRemoteBroker(remotePort); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); - assertTrue("Bridge should be established after all restarts", Wait.waitFor(() -> + assertTrue("Bridge should reconnect", Wait.waitFor(() -> !nc.activeBridges().isEmpty(), 30_000, 500)); + verifyMessageFlow(localBroker, remoteBroker); } @@ -180,14 +228,12 @@ private BrokerService createLocalBroker(int remotePort) throws Exception { private void verifyMessageFlow(BrokerService local, BrokerService remote) throws Exception { ActiveMQQueue dest = new ActiveMQQueue("RECONNECT.HANDSHAKE.TEST"); - // Create consumer on remote broker ActiveMQConnectionFactory remoteFac = new ActiveMQConnectionFactory(remote.getVmConnectorURI()); Connection remoteConn = remoteFac.createConnection(); remoteConn.start(); Session remoteSession = remoteConn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = remoteSession.createConsumer(dest); - // Wait for the demand subscription to propagate across the bridge assertTrue("Demand subscription should propagate", Wait.waitFor(() -> { try { return local.getDestination(dest) != null @@ -195,9 +241,8 @@ private void verifyMessageFlow(BrokerService local, BrokerService remote) throws } catch (Exception e) { return false; } - }, 15_000, 200)); + }, 30_000, 200)); - // Send message from local broker ActiveMQConnectionFactory localFac = new ActiveMQConnectionFactory(local.getVmConnectorURI()); Connection localConn = localFac.createConnection(); localConn.start(); @@ -206,7 +251,6 @@ private void verifyMessageFlow(BrokerService local, BrokerService remote) throws producer.send(localSession.createTextMessage("test-after-reconnect")); producer.close(); - // Receive on remote TextMessage received = (TextMessage) consumer.receive(TimeUnit.SECONDS.toMillis(10)); assertNotNull("Message should flow across the re-established bridge", received); From c42a35895d658657e280201ac88db493e4b8bad1 Mon Sep 17 00:00:00 2001 From: Murali Mogalayapalli Date: Wed, 1 Apr 2026 10:38:33 -0400 Subject: [PATCH 3/3] Redesign test: replace Thread.sleep with Wait.waitFor, verify IOException propagation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address reviewer feedback from apache/activemq#1865: - Remove Thread.sleep(2000) in favor of condition-based Wait.waitFor() - Disable InactivityMonitor (maxInactivityDuration=0) to isolate the onException code path from secondary exception sources Add exception-type verification using a custom BridgeFactory that tracks calls to serviceRemoteException(). The test now asserts that the original IOException reaches serviceRemoteException (from onException handler), not only a TimeoutException (from the collectBrokerInfos fallback). Without the fix, the early return in onException prevents the IOException from being propagated — the test fails with "Exceptions received: TimeoutException". The test still performs the full end-to-end handshake failure simulation: fake server (no BrokerInfo) -> socket close -> real broker starts -> bridge reconnects -> messages flow. Co-Authored-By: Claude Opus 4.6 (1M context) --- ...BridgeReconnectOnHandshakeFailureTest.java | 147 +++++++++++++----- 1 file changed, 109 insertions(+), 38 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkBridgeReconnectOnHandshakeFailureTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkBridgeReconnectOnHandshakeFailureTest.java index 8538678cb55..fcfee0d2db5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkBridgeReconnectOnHandshakeFailureTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkBridgeReconnectOnHandshakeFailureTest.java @@ -19,9 +19,11 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.net.URI; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -35,6 +37,7 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.transport.Transport; import org.apache.activemq.util.Wait; import org.junit.After; import org.junit.Test; @@ -42,14 +45,21 @@ import org.slf4j.LoggerFactory; /** - * Test that verifies network bridges reconnect after the remote broker is - * stopped abruptly during or before the broker info handshake completes. + * Test that verifies network bridges properly handle transport exceptions + * during the broker info handshake phase and recover by reconnecting + * to a real broker. * - * This covers the bug where {@code onException()} in - * {@link DemandForwardingBridgeSupport} returned early when - * {@code futureBrokerInfo} was not done, preventing - * {@code serviceLocalException()}/{@code serviceRemoteException()} from - * firing and thus blocking the reconnection chain. + *

The bug: {@code onException()} in {@link DemandForwardingBridgeSupport} + * returned early when {@code futureBrokerInfo} was not done (i.e. during + * the handshake), preventing {@code serviceRemoteException()} from being + * called with the original {@code IOException}. While + * {@code collectBrokerInfos()} provided a fallback reconnection path + * (via {@code TimeoutException}), the original error was lost.

+ * + *

The fix removes the early {@code return} so that + * {@code serviceRemoteException()} is always called with the original + * {@code IOException}, ensuring proper error reporting and direct + * exception handling in {@code onException()}.

*/ public class NetworkBridgeReconnectOnHandshakeFailureTest { @@ -71,26 +81,62 @@ public void tearDown() throws Exception { } /** - * Reproduces the exact bug path: a transport exception fires while - * futureBrokerInfo is NOT done (during handshake). + * Simulates a handshake failure end-to-end and verifies recovery: + * + *
    + *
  1. A fake server accepts TCP connections but never sends + * {@code BrokerInfo}, so {@code futureBrokerInfo} is never + * completed — the bridge is stuck mid-handshake.
  2. + *
  3. The fake server abruptly closes the socket, triggering + * {@code onException()} while {@code futureBrokerInfo} is + * not done — the exact bug path.
  4. + *
  5. A real broker starts on the same port.
  6. + *
  7. The bridge reconnects to the real broker.
  8. + *
  9. Messages flow across the re-established bridge.
  10. + *
* - * Strategy: Use a fake server socket that accepts connections but never - * sends BrokerInfo. The bridge connects, starts the handshake, but - * futureBrokerInfo never completes. When the fake server closes the - * socket, onException fires with futureBrokerInfo not done — this is - * the exact bug path. The bridge must still trigger reconnection so - * that when the real broker comes up, it connects successfully. + *

Additionally, this test uses a custom {@link BridgeFactory} to + * verify that {@code serviceRemoteException()} receives the original + * {@code IOException} (from {@code onException()}), not only a + * {@code TimeoutException} (from the {@code collectBrokerInfos()} + * fallback). Without the fix, the early {@code return} prevents the + * {@code IOException} from reaching {@code serviceRemoteException()}. + *

*/ @Test(timeout = 60_000) public void testBridgeReconnectsAfterHandshakeFailure() throws Exception { - // Start a fake server that accepts connections but never responds - // This ensures futureBrokerInfo is never set when onException fires + // Track exceptions passed to serviceRemoteException to verify + // the original IOException is properly propagated + CopyOnWriteArrayList remoteExceptions = new CopyOnWriteArrayList<>(); + CountDownLatch exceptionLatch = new CountDownLatch(1); + + BridgeFactory trackingFactory = new BridgeFactory() { + @Override + public DemandForwardingBridge createNetworkBridge( + NetworkBridgeConfiguration configuration, + Transport localTransport, Transport remoteTransport, + NetworkBridgeListener listener) { + DemandForwardingBridge bridge = new DemandForwardingBridge(configuration, localTransport, remoteTransport) { + @Override + public void serviceRemoteException(Throwable error) { + LOG.info("serviceRemoteException called with: {} ({})", + error.getClass().getSimpleName(), error.getMessage()); + remoteExceptions.add(error); + exceptionLatch.countDown(); + super.serviceRemoteException(error); + } + }; + bridge.setNetworkBridgeListener(listener); + return bridge; + } + }; + + // Phase 1: Start a fake server that accepts connections but never + // sends BrokerInfo — this keeps futureBrokerInfo incomplete ServerSocket fakeServer = new ServerSocket(0); int port = fakeServer.getLocalPort(); LOG.info("Fake server listening on port {}", port); - // Accept connections in background and close them after a short delay - // to trigger IOException on the bridge while futureBrokerInfo is pending CountDownLatch connectionReceived = new CountDownLatch(1); AtomicReference clientSocket = new AtomicReference<>(); Thread acceptThread = new Thread(() -> { @@ -100,7 +146,6 @@ public void testBridgeReconnectsAfterHandshakeFailure() throws Exception { LOG.info("Fake server accepted connection from {}", s.getRemoteSocketAddress()); clientSocket.set(s); connectionReceived.countDown(); - // Keep accepting — the bridge will retry } } catch (Exception e) { // Expected when we close the server socket @@ -109,7 +154,7 @@ public void testBridgeReconnectsAfterHandshakeFailure() throws Exception { acceptThread.setDaemon(true); acceptThread.start(); - // Start local broker with network connector pointing at the fake server + // Start the local broker with network connector pointing at the fake server localBroker = new BrokerService(); localBroker.setBrokerName("localBroker"); localBroker.setUseJmx(false); @@ -117,32 +162,59 @@ public void testBridgeReconnectsAfterHandshakeFailure() throws Exception { localBroker.setUseShutdownHook(false); DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector( new URI("static:(tcp://localhost:" + port - + "?wireFormat.maxInactivityDuration=3000" - + "&wireFormat.maxInactivityDurationInitalDelay=3000" + + "?wireFormat.maxInactivityDuration=0" + ")?useExponentialBackOff=false&initialReconnectDelay=1000")); nc.setName("bridge-handshake-failure-test"); + nc.setBridgeFactory(trackingFactory); localBroker.addNetworkConnector(nc); localBroker.start(); localBroker.waitUntilStarted(); - // Wait for the fake server to receive a connection — the bridge TCP - // connected but will never get BrokerInfo + // Wait for the bridge to TCP-connect to the fake server. + // At this point futureBrokerInfo is NOT done — the handshake + // is stuck because the fake server never sends BrokerInfo. assertTrue("Bridge should connect to fake server", connectionReceived.await(10, TimeUnit.SECONDS)); LOG.info("Bridge connected to fake server, futureBrokerInfo will NOT be set"); - // Close the accepted socket — this fires onException while - // futureBrokerInfo is NOT done (the exact bug path) + // Phase 2: Simulate a handshake failure — close the socket to + // trigger onException() while futureBrokerInfo is not done Socket s = clientSocket.get(); if (s != null) { s.close(); - LOG.info("Fake server closed client socket — triggering onException with futureBrokerInfo not done"); + LOG.info("Closed fake server socket — simulating handshake failure"); } - // Wait a bit for the bridge to process the exception and attempt reconnection - Thread.sleep(2000); + // Verify serviceRemoteException is called with the original IOException. + // Without the fix, only a TimeoutException from collectBrokerInfos + // would reach serviceRemoteException. + assertTrue("serviceRemoteException should be called", + exceptionLatch.await(10, TimeUnit.SECONDS)); + + // Allow time for both code paths (onException and collectBrokerInfos) + // to call serviceRemoteException + assertTrue("Should receive exception(s)", Wait.waitFor(() -> + !remoteExceptions.isEmpty(), 5_000, 100)); + + for (int i = 0; i < remoteExceptions.size(); i++) { + Throwable ex = remoteExceptions.get(i); + LOG.info("serviceRemoteException call [{}]: {} ({})", + i, ex.getClass().getName(), ex.getMessage()); + } - // Now shut down the fake server and start the real remote broker on the same port + boolean hasIOException = remoteExceptions.stream() + .anyMatch(ex -> ex instanceof IOException); + assertTrue( + "serviceRemoteException should receive the original IOException " + + "(from onException handler), not only TimeoutException " + + "(from collectBrokerInfos fallback). Exceptions received: " + + remoteExceptions.stream() + .map(ex -> ex.getClass().getSimpleName()) + .reduce((a, b) -> a + ", " + b).orElse("none"), + hasIOException); + + // Phase 3: Shut down the fake server and start a real broker + // on the same port fakeServer.close(); acceptThread.interrupt(); @@ -156,20 +228,19 @@ public void testBridgeReconnectsAfterHandshakeFailure() throws Exception { remoteBroker.waitUntilStarted(); LOG.info("Real remote broker started on port {}", port); - // The bridge should reconnect to the real broker. - // WITHOUT the fix: onException returned early, serviceFailed() was never - // called, the bridge is permanently dead — this assertion will FAIL. - // WITH the fix: serviceRemoteException() fires, triggering reconnection. + // Phase 4: The bridge should reconnect to the real broker assertTrue("Bridge should reconnect to real broker after handshake failure", Wait.waitFor(() -> !nc.activeBridges().isEmpty(), 30_000, 500)); - LOG.info("Bridge reconnected to real broker successfully!"); + LOG.info("Bridge reconnected successfully after handshake failure"); - // Verify messages flow + // Phase 5: Verify messages flow across the re-established bridge verifyMessageFlow(localBroker, remoteBroker); } /** - * Basic reconnection test: verify bridge reconnects after remote broker restart. + * Verify that when the remote broker is abruptly stopped (causing a + * transport exception potentially during the broker info handshake), + * the network bridge reconnects once the remote broker is restarted. */ @Test(timeout = 60_000) public void testBridgeReconnectsAfterRemoteBrokerRestart() throws Exception {