From a5cec7ef8e44816d0248f4310b4317522803be3b Mon Sep 17 00:00:00 2001 From: tkhurana Date: Mon, 11 May 2026 17:33:27 -0700 Subject: [PATCH 1/3] PHOENIX-7854 Use Abortable interface to trigger RS aborts Use @CoreCoprocessor + HasRegionServerServices to obtain an Abortable (RegionServerServices) in IndexRegionObserver and PhoenixRegionServerEndpoint, and thread it into ReplicationLogGroup so that abort() triggers a real RS shutdown instead of only throwing a RuntimeException. --- .../PhoenixRegionServerEndpoint.java | 10 +- .../hbase/index/IndexRegionObserver.java | 12 +- .../replication/ReplicationLogDiscovery.java | 1 + .../replication/ReplicationLogGroup.java | 189 +++++++++++------- .../replication/StoreAndForwardModeImpl.java | 6 +- .../replication/SyncAndForwardModeImpl.java | 4 +- .../phoenix/replication/SyncModeImpl.java | 5 +- .../reader/ReplicationLogReplay.java | 1 - .../replication/ReplicationLogGroupTest.java | 58 +++--- 9 files changed, 165 insertions(+), 121 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java index 3f35b40cef8..ead41a69509 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java @@ -32,8 +32,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; +import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; @@ -62,12 +65,14 @@ /** * This is first implementation of RegionServer coprocessor introduced by Phoenix. 
*/ +@CoreCoprocessor public class PhoenixRegionServerEndpoint extends RegionServerEndpointProtos.RegionServerEndpointService implements RegionServerCoprocessor { private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class); private MetricsMetadataCachingSource metricsSource; protected Configuration conf; protected ServerName serverName; + private Abortable abortable; private ExecutorService prewarmExecutor; // regionserver level thread pool used by Uncovered Indexes to scan data table rows @@ -79,6 +84,9 @@ public void start(CoprocessorEnvironment env) throws IOException { if (env instanceof RegionServerCoprocessorEnvironment) { this.serverName = ((RegionServerCoprocessorEnvironment) env).getServerName(); } + if (env instanceof HasRegionServerServices) { + this.abortable = ((HasRegionServerServices) env).getRegionServerServices(); + } this.metricsSource = MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource(); initUncoveredIndexThreadPool(this.conf); @@ -308,7 +316,7 @@ private void startHAGroupStoreClientPrewarming() { manager.getClusterRoleRecord(haGroup); if (shouldInitReplicationLogGroup) { try { - ReplicationLogGroup.get(conf, serverName, haGroup); + ReplicationLogGroup.get(conf, serverName, haGroup, abortable); LOGGER.info("Eagerly initialized ReplicationLogGroup {} on server {}", haGroup, serverName); } catch (Exception e) { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 1ad0579a451..16fd9455725 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -53,6 +53,7 @@ import java.util.function.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellScanner; @@ -70,6 +71,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; +import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -170,6 +173,7 @@ * does batch mutations. *
*/ +@CoreCoprocessor public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { private static final Logger LOG = LoggerFactory.getLogger(IndexRegionObserver.class); @@ -439,6 +443,7 @@ public int getMaxPendingRowCount() { private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100; private byte[] encodedRegionName; private boolean shouldReplicate; + private Abortable abortable; // Don't replicate the mutation if this attribute is set private static final Predicate IGNORE_REPLICATION = @@ -541,6 +546,9 @@ public void start(CoprocessorEnvironment e) throws IOException { if (this.shouldReplicate) { this.ignoreReplicationFilter = getSynchronousReplicationFilter(tableName); } + if (e instanceof HasRegionServerServices) { + this.abortable = ((HasRegionServerServices) e).getRegionServerServices(); + } } catch (NoSuchMethodError ex) { disabled = true; LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex); @@ -689,7 +697,7 @@ private Optional getHAGroupFromBatch(RegionCoprocessorEnvir byte[] haGroupName = m.getAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB); if (haGroupName != null) { ReplicationLogGroup logGroup = ReplicationLogGroup.get(env.getConfiguration(), - env.getServerName(), Bytes.toString(haGroupName)); + env.getServerName(), Bytes.toString(haGroupName), abortable); return Optional.of(logGroup); } } @@ -707,7 +715,7 @@ private Optional getHAGroupFromWALKey(RegionCoprocessorEnvi logKey.getExtendedAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB); if (haGroupName != null) { ReplicationLogGroup logGroup = ReplicationLogGroup.get(env.getConfiguration(), - env.getServerName(), Bytes.toString(haGroupName)); + env.getServerName(), Bytes.toString(haGroupName), abortable); return Optional.of(logGroup); } return Optional.empty(); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java index 3c64e9ec61e..ccd0e82caf4 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java @@ -125,6 +125,7 @@ public void init() throws IOException { } public void close() { + replicationLogTracker.close(); if (this.metrics != null) { this.metrics.close(); } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java index ded1fec7b1b..77564b999af 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java @@ -56,6 +56,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Mutation; import org.apache.phoenix.jdbc.HAGroupStoreManager; @@ -67,7 +68,6 @@ import org.slf4j.LoggerFactory; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.phoenix.thirdparty.com.google.common.base.Throwables; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import 
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; @@ -171,6 +171,9 @@ public class ReplicationLogGroup { public static final long DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS = 10_000L; public static final String WAL_SYNC_TIMEOUT_MS_KEY = "hbase.regionserver.wal.sync.timeout"; public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L; + public static final String REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS_KEY = + "phoenix.replication.log.group.shutdown.timeout.ms"; + public static final long DEFAULT_REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS = 30_000L; public static final String STANDBY_DIR = "in"; public static final String FALLBACK_DIR = "out"; @@ -193,7 +196,9 @@ public class ReplicationLogGroup { protected ReplicationLogDiscoveryForwarder logForwarder; protected long syncTimeoutMs; protected long peerInitTimeoutMs; - protected volatile boolean closed = false; + protected final AtomicBoolean closed = new AtomicBoolean(false); + protected final Abortable abortable; + protected long shutdownTimeoutMs; /** * The replication mode determines how mutations are handled. Mode transitions occur automatically @@ -344,10 +349,25 @@ public Record(String tableName, long commitId, Mutation mutation) { */ public static ReplicationLogGroup get(Configuration conf, ServerName serverName, String haGroupName) throws IOException { + return get(conf, serverName, haGroupName, (Abortable) null); + } + + /** + * Get or create a ReplicationLogGroup instance for the given HA Group. + * @param conf Configuration object + * @param serverName The server name + * @param haGroupName The HA Group name + * @param abortable Abortable to invoke on fatal errors (typically RegionServerServices) + * @return ReplicationLogGroup instance + * @throws IOException if initialization fails + */ + public static ReplicationLogGroup get(Configuration conf, ServerName serverName, + String haGroupName, Abortable abortable) throws IOException { try { return INSTANCES.computeIfAbsent(haGroupName, k -> { try { - ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName); + ReplicationLogGroup group = + new ReplicationLogGroup(conf, serverName, haGroupName, abortable); group.init(); return group; } catch (IOException e) { @@ -395,7 +415,19 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName, * @param haGroupName The HA Group name */ protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName) { - this(conf, serverName, haGroupName, HAGroupStoreManager.getInstance(conf)); + this(conf, serverName, haGroupName, HAGroupStoreManager.getInstance(conf), null); + } + + /** + * Protected constructor for ReplicationLogGroup. + * @param conf Configuration object + * @param serverName The server name + * @param haGroupName The HA Group name + * @param abortable Abortable to invoke on fatal errors (may be null) + */ + protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName, + Abortable abortable) { + this(conf, serverName, haGroupName, HAGroupStoreManager.getInstance(conf), abortable); } /** @@ -407,6 +439,19 @@ protected ReplicationLogGroup(Configuration conf, ServerName serverName, String */ protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName, HAGroupStoreManager haGroupStoreManager) { + this(conf, serverName, haGroupName, haGroupStoreManager, null); + } + + /** + * Protected constructor for ReplicationLogGroup. 
+ * @param conf Configuration object + * @param serverName The server name + * @param haGroupName The HA Group name + * @param haGroupStoreManager HA Group Store Manager instance + * @param abortable Abortable to invoke on fatal errors (may be null) + */ + protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName, + HAGroupStoreManager haGroupStoreManager, Abortable abortable) { // conf object from coprocessor is instance of // org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration and we need to modify it when // we send rpc to namenode so copying it @@ -420,6 +465,9 @@ protected ReplicationLogGroup(Configuration conf, ServerName serverName, String this.serverName = serverName; this.haGroupName = haGroupName; this.haGroupStoreManager = haGroupStoreManager; + this.abortable = abortable; + this.shutdownTimeoutMs = clonedConf.getLong(REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS_KEY, + DEFAULT_REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS); this.metrics = createMetricsSource(); this.mode = new AtomicReference<>(INIT); } @@ -517,7 +565,7 @@ public void append(String tableName, long commitId, Mutation mutation) throws IO if (LOG.isTraceEnabled()) { LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, commitId, mutation); } - if (closed) { + if (isClosed()) { throw new IOException("Closed"); } long startTime = System.nanoTime(); @@ -557,7 +605,7 @@ public void sync() throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Sync"); } - if (closed) { + if (isClosed()) { throw new IOException("Closed"); } long startTime = System.nanoTime(); @@ -589,16 +637,16 @@ protected void syncInternal() throws IOException { Thread.currentThread().interrupt(); throw new InterruptedIOException("Interrupted while waiting for sync"); } catch (ExecutionException e) { - // After exhausting all attempts to sync to the standby cluster we switch mode - // and then retry again. If that also fails, it is a fatal error + // The consumer thread already called abort(); just propagate the error to the client + Throwable cause = e.getCause(); String message = String.format("HAGroup %s sync operation failed", this); - LOG.error(message, e); - abort(message, e); + LOG.error(message, cause); + throw asIOException(message, cause); } catch (TimeoutException e) { String message = String.format("HAGroup %s replication log sync timed out after %d ms", this, syncTimeoutMs); LOG.error(message, e); - PhoenixWALSyncTimeoutException timeoutEx = new PhoenixWALSyncTimeoutException(message, e); + PhoenixWALSyncTimeoutException timeoutEx = new PhoenixWALSyncTimeoutException(message); abort(message, timeoutEx); } } @@ -608,67 +656,50 @@ protected void syncInternal() throws IOException { * @return true if closed, false otherwise */ public boolean isClosed() { - return closed; + return closed.get(); } /** - * Force closes the log group upon an unrecoverable internal error. This is a fail-stop behavior: - * once called, the log group is marked as closed, the Disruptor is halted, and all subsequent - * append() and sync() calls will throw an IOException("Closed"). This ensures that no further - * operations are attempted on a log group that has encountered a critical error. + * Close the ReplicationLogGroup. When graceful, drains pending events from the Disruptor (which + * triggers onShutdown → modeImpl.onExit → ReplicationLog.close) with a bounded timeout; when + * non-graceful, halts the Disruptor immediately. 
In both cases the instance is removed from the + * INSTANCES cache and subsequent append()/sync() calls throw IOException("Closed"). + * @param graceful true for orderly RS shutdown, false for fatal error / abort */ - protected void closeOnError() { - if (closed) { + protected void close(boolean graceful) { + if (!closed.compareAndSet(false, true)) { return; } - synchronized (this) { - if (closed) { - return; + LOG.info("Closing HAGroup {} graceful={}", this, graceful); + INSTANCES.remove(haGroupName); + if (graceful) { + gracefulShutdownEventHandlerFlag.set(true); + try { + disruptor.shutdown(shutdownTimeoutMs, TimeUnit.MILLISECONDS); + } catch (com.lmax.disruptor.TimeoutException e) { + LOG.warn("HAGroup {} graceful shutdown timed out after {}ms, halting", this, + shutdownTimeoutMs); + gracefulShutdownEventHandlerFlag.set(false); + disruptor.halt(); } - // setting closed to true prevents future producers to add events to the ring buffer - closed = true; + shutdownDisruptorExecutor(); + } else { + gracefulShutdownEventHandlerFlag.set(false); + disruptor.halt(); } - // Directly halt the disruptor. shutdown() would wait for events to drain. We are expecting - // that will not work. - gracefulShutdownEventHandlerFlag.set(false); - disruptor.halt(); + // stop the forwarder if running + logForwarder.stop(); + logForwarder.close(); metrics.close(); - LOG.info("HAGroup {} closed on error", this); + LOG.info("HAGroup {} closed graceful={}", this, graceful); } /** - * Close the ReplicationLogGroup and all associated resources. This method is thread-safe and can - * be called multiple times. + * Gracefully close the ReplicationLogGroup. Convenience method that delegates to + * {@link #close(boolean)} with graceful=true. */ public void close() { - if (closed) { - return; - } - synchronized (this) { - if (closed) { - return; - } - LOG.info("Closing HAGroup {}", this); - // setting closed to true prevents future producers to add events to the ring buffer - closed = true; - // Remove from instances cache - INSTANCES.remove(haGroupName); - // Sync before shutting down to flush all pending appends. - try { - syncInternal(); - gracefulShutdownEventHandlerFlag.set(true); - // waits until all the events in the disruptor have been processed - disruptor.shutdown(); - } catch (IOException e) { - LOG.warn("Error during final sync on close", e); - gracefulShutdownEventHandlerFlag.set(false); - disruptor.halt(); // Go directly to halt. - } - // wait for the disruptor threads to finish - shutdownDisruptorExecutor(); - metrics.close(); - LOG.info("HAGroup {} closed", this); - } + close(true); } /** @@ -902,20 +933,29 @@ protected long setHAGroupStatusToSync() throws Exception { } /** - * Abort when we hit a fatal error + * Throws the given cause as an IOException. If it already is one, rethrows directly; otherwise + * wraps it. */ - protected void abort(String reason, Throwable cause) { - // TODO better to use abort using RegionServerServices - String msg = "***** ABORTING region server: " + reason + " *****"; - if (cause != null) { - msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); + static IOException asIOException(String msg, Throwable cause) { + if (cause instanceof IOException) { + return (IOException) cause; } - LOG.error(msg); - if (cause != null) { - throw new RuntimeException(msg, cause); - } else { - throw new RuntimeException(msg); + return cause != null ? new IOException(msg, cause) : new IOException(msg); + } + + /** + * Abort the region server due to an unrecoverable replication failure. 
Halts the Disruptor via + * close(false) first, then invokes the Abortable (RegionServerServices) to trigger an + * asynchronous RS shutdown. Still throws so the current call path unwinds with a fatal exception. + */ + protected void abort(String reason, Throwable cause) throws IOException { + String msg = "Aborting region server due to replication failure: " + reason; + LOG.error(msg, cause); + close(false); + if (abortable != null) { + abortable.abort(msg, cause); } + throw asIOException(msg, cause); } /** @@ -996,7 +1036,8 @@ private void processPendingSyncs(long sequence) throws IOException { // is not processing any event like append/sync because this is the only thread // that is consuming the events from the ring buffer and handing them off to the // mode - currentModeImpl.onExit(true); + ReplicationModeImpl oldModeImpl = currentModeImpl; + disruptorExecutor.execute(() -> oldModeImpl.onExit(true)); initializeMode(newMode); } } @@ -1126,8 +1167,6 @@ public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Ex // replication pipeline does, so the SAF forwarder cannot reconcile it. Abort the RS // so region reassignment + preWALRestore re-ships the orphaned edits. abort("Both SYNC and SAF replication writes failed", fatalEx); - // halt the disruptor with the fatal exception - throw fatalEx; } } } @@ -1155,20 +1194,20 @@ protected class LogExceptionHandler implements ExceptionHandler { public void handleEventException(Throwable e, long sequence, LogEvent event) { String message = "Exception processing sequence " + sequence + " for event " + event; LOG.error(message, e); - closeOnError(); + close(false); } @Override public void handleOnStartException(Throwable e) { LOG.error("Exception during Disruptor startup", e); - closeOnError(); + close(false); } @Override public void handleOnShutdownException(Throwable e) { // Should not happen, but if it does, the regionserver is aborting or shutting down. 
LOG.error("Exception during Disruptor shutdown", e); - closeOnError(); + close(false); } } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java index 7bcb4c89e09..8f205ea7673 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java @@ -110,12 +110,10 @@ void onExit(boolean gracefulShutdown) { @Override ReplicationMode onFailure(Throwable e) throws IOException { - // Treating failures in STORE_AND_FORWARD mode as fatal errors + // Failures in STORE_AND_FORWARD mode are fatal — throw to let the event handler abort String message = String.format("HAGroup %s mode=%s got error", logGroup, this); LOG.error(message, e); - logGroup.abort(message, e); - // unreachable, we remain in the same mode - return STORE_AND_FORWARD; + throw ReplicationLogGroup.asIOException(message, e); } @Override diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java index 02c57f7f3fd..941cde59773 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java @@ -66,11 +66,11 @@ ReplicationMode onFailure(Throwable e) throws IOException { try { logGroup.setHAGroupStatusToStoreAndForward(); } catch (Exception ex) { - // Fatal error when we can't update the HAGroup status + // Fatal — can't transition to SAF; throw to let the event handler abort String message = String.format("HAGroup %s could not update status to STORE_AND_FORWARD", logGroup); LOG.error(message, ex); - logGroup.abort(message, ex); + throw ReplicationLogGroup.asIOException(message, ex); } return STORE_AND_FORWARD; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java index 0a5b5a48c66..1378d70fe9b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java @@ -59,14 +59,13 @@ ReplicationMode onFailure(Throwable e) throws IOException { LOG.info("HAGroup {} mode={} got error", logGroup, this, e); logGroup.getMetrics().incrementSyncToSafTransitions(); try { - // first update the HAGroupStore state logGroup.setHAGroupStatusToStoreAndForward(); } catch (Exception ex) { - // Fatal error when we can't update the HAGroup status + // Fatal — can't transition to SAF; throw to let the event handler abort String message = String.format("HAGroup %s could not update status to STORE_AND_FORWARD", logGroup); LOG.error(message, ex); - logGroup.abort(message, ex); + throw ReplicationLogGroup.asIOException(message, ex); } return STORE_AND_FORWARD; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java index 88eeb791e31..d062aa8c530 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java +++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java @@ -128,7 +128,6 @@ protected void init() throws IOException { public void close() { LOG.info("Closing ReplicationLogReplay for haGroup: {}", haGroupName); - replicationLogDiscoveryReplay.getReplicationLogFileTracker().close(); replicationLogDiscoveryReplay.close(); // Remove the instance from cache INSTANCES.remove(haGroupName); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java index 2bf723defd0..fb8ca7a9537 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java @@ -263,13 +263,12 @@ public Object answer(InvocationOnMock invocation) throws Throwable { logGroup.append(tableName, commitId, put); // sync on the writer will timeout — syncInternal wraps in PhoenixWALSyncTimeoutException - // and calls abort() which throws RuntimeException + // and calls abort() which rethrows the IOException cause directly try { logGroup.sync(); - fail("Should have thrown RuntimeException because sync timed out"); - } catch (RuntimeException e) { - assertTrue("Expected PhoenixWALSyncTimeoutException cause", - e.getCause() instanceof PhoenixWALSyncTimeoutException); + fail("Should have thrown PhoenixWALSyncTimeoutException because sync timed out"); + } catch (PhoenixWALSyncTimeoutException e) { + // expected } // reset doNothing().when(innerWriter).sync(); @@ -628,10 +627,9 @@ public void testEventProcessingException() throws Exception { logGroup.append(tableName, commitId, put); try { logGroup.sync(); - fail("Should have thrown RuntimeException because sync timed out"); - } catch (RuntimeException e) { - assertTrue("Expected PhoenixWALSyncTimeoutException cause", - e.getCause() instanceof PhoenixWALSyncTimeoutException); + fail("Should have thrown PhoenixWALSyncTimeoutException because sync timed out"); + } catch (PhoenixWALSyncTimeoutException e) { + // expected } // Verify that subsequent operations fail because the log is closed @@ -719,12 +717,10 @@ public void testFailToUpdateHAGroupStatusOnSwitchToStoreAndForward() throws Exce // Try to sync. 
Should fail after exhausting retries and then switch to STORE_AND_FORWARD try { logGroup.sync(); - fail("Should have thrown exception because of failure to update mode"); - } catch (RuntimeException ex) { + fail("Should have thrown IOException because of failure to update mode"); + } catch (IOException ex) { assertTrue(ex.getMessage().contains("Simulated sync failure")); } - // wait for the event processor thread to clean up - Thread.sleep(3); } /** @@ -975,10 +971,9 @@ public void testRuntimeExceptionDuringLengthCheck() throws Exception { logGroup.append(tableName, commitId, put); try { logGroup.sync(); - fail("Should have thrown RuntimeException because sync timed out"); - } catch (RuntimeException e) { - assertTrue("Expected PhoenixWALSyncTimeoutException cause", - e.getCause() instanceof PhoenixWALSyncTimeoutException); + fail("Should have thrown PhoenixWALSyncTimeoutException because sync timed out"); + } catch (PhoenixWALSyncTimeoutException e) { + // expected } // Verify that subsequent operations fail because the log is closed @@ -1016,10 +1011,9 @@ public void testAppendAfterCloseOnError() throws Exception { logGroup.append(tableName, commitId, put); try { logGroup.sync(); - fail("Should have thrown RuntimeException because sync timed out"); - } catch (RuntimeException e) { - assertTrue("Expected PhoenixWALSyncTimeoutException cause", - e.getCause() instanceof PhoenixWALSyncTimeoutException); + fail("Should have thrown PhoenixWALSyncTimeoutException because sync timed out"); + } catch (PhoenixWALSyncTimeoutException e) { + // expected } // Verify that subsequent append operations fail because the log is closed @@ -1057,10 +1051,9 @@ public void testSyncAfterCloseOnError() throws Exception { logGroup.append(tableName, commitId, put); try { logGroup.sync(); - fail("Should have thrown RuntimeException because sync timed out"); - } catch (RuntimeException e) { - assertTrue("Expected PhoenixWALSyncTimeoutException cause", - e.getCause() instanceof PhoenixWALSyncTimeoutException); + fail("Should have thrown PhoenixWALSyncTimeoutException because sync timed out"); + } catch (PhoenixWALSyncTimeoutException e) { + // expected } // Verify that subsequent sync operations fail because the log is closed @@ -1721,11 +1714,10 @@ public void testBothSyncAndSafFailuresTriggersAbort() throws Exception { logGroup.append(tableName, commitId, put); try { logGroup.sync(); - fail("Should have thrown RuntimeException from abort"); - } catch (RuntimeException e) { - assertTrue("Abort message should mention both SYNC and SAF", - e.getMessage().contains("Both SYNC and SAF replication writes failed") - || e.getMessage().contains("ABORTING")); + fail("Should have thrown IOException from abort"); + } catch (IOException e) { + assertTrue("Abort message should contain sync failure", + e.getMessage().contains("Simulated sync failure")); } } @@ -1763,10 +1755,10 @@ public void testSafBothAttemptsFailTriggersAbort() throws Exception { logGroup.append(tableName, commitId, put); try { logGroup.sync(); - fail("Should have thrown RuntimeException from abort"); - } catch (RuntimeException e) { + fail("Should have thrown IOException from abort"); + } catch (IOException e) { assertTrue("Abort should fire from SAF failure path", - e.getMessage().contains("ABORTING") || e.getMessage().contains("got error")); + e.getMessage().contains("Simulated SAF sync failure")); } } From 00d53f569d673604a985899931062d87f416b4b1 Mon Sep 17 00:00:00 2001 From: tkhurana Date: Wed, 13 May 2026 15:15:03 -0700 Subject: [PATCH 2/3] Simplify --- 
.../replication/ReplicationLogGroup.java | 26 ++----------------- .../replication/ReplicationModeImpl.java | 16 ++++++++++++ .../replication/SyncAndForwardModeImpl.java | 13 +--------- .../phoenix/replication/SyncModeImpl.java | 13 +--------- 4 files changed, 20 insertions(+), 48 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java index 77564b999af..6fb43328f24 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java @@ -366,8 +366,8 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName, try { return INSTANCES.computeIfAbsent(haGroupName, k -> { try { - ReplicationLogGroup group = - new ReplicationLogGroup(conf, serverName, haGroupName, abortable); + ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName, + HAGroupStoreManager.getInstance(conf), abortable); group.init(); return group; } catch (IOException e) { @@ -408,28 +408,6 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName, } } - /** - * Protected constructor for ReplicationLogGroup. - * @param conf Configuration object - * @param serverName The server name - * @param haGroupName The HA Group name - */ - protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName) { - this(conf, serverName, haGroupName, HAGroupStoreManager.getInstance(conf), null); - } - - /** - * Protected constructor for ReplicationLogGroup. - * @param conf Configuration object - * @param serverName The server name - * @param haGroupName The HA Group name - * @param abortable Abortable to invoke on fatal errors (may be null) - */ - protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName, - Abortable abortable) { - this(conf, serverName, haGroupName, HAGroupStoreManager.getInstance(conf), abortable); - } - /** * Protected constructor for ReplicationLogGroup. * @param conf Configuration object diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java index 12385131353..114e519b428 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java @@ -20,6 +20,8 @@ import java.io.IOException; import org.apache.phoenix.replication.ReplicationLogGroup.Record; import org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -31,6 +33,7 @@ *
*/ public abstract class ReplicationModeImpl { + private static final Logger LOG = LoggerFactory.getLogger(ReplicationModeImpl.class); protected final ReplicationLogGroup logGroup; // The mode manages the underlying log to which the append and sync events will be sent @@ -91,4 +94,17 @@ void closeReplicationLog(boolean graceful) { log.close(graceful); } } + + protected ReplicationLogGroup.ReplicationMode transitionToStoreAndForward() throws IOException { + logGroup.getMetrics().incrementSyncToSafTransitions(); + try { + logGroup.setHAGroupStatusToStoreAndForward(); + } catch (Exception ex) { + String message = + String.format("HAGroup %s could not update status to STORE_AND_FORWARD", logGroup); + LOG.error(message, ex); + throw ReplicationLogGroup.asIOException(message, ex); + } + return ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD; + } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java index 941cde59773..6c85ee27d47 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java @@ -17,7 +17,6 @@ */ package org.apache.phoenix.replication; -import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD; import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC_AND_FORWARD; import java.io.IOException; @@ -62,17 +61,7 @@ void onExit(boolean gracefulShutdown) { @Override ReplicationMode onFailure(Throwable e) throws IOException { LOG.info("HAGroup {} mode={} got error", logGroup, this, e); - logGroup.getMetrics().incrementSyncToSafTransitions(); - try { - logGroup.setHAGroupStatusToStoreAndForward(); - } catch (Exception ex) { - // Fatal — can't transition to SAF; throw to let the event handler abort - String message = - String.format("HAGroup %s could not update status to STORE_AND_FORWARD", logGroup); - LOG.error(message, ex); - throw ReplicationLogGroup.asIOException(message, ex); - } - return STORE_AND_FORWARD; + return transitionToStoreAndForward(); } @Override diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java index 1378d70fe9b..ca258a5b760 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java @@ -17,7 +17,6 @@ */ package org.apache.phoenix.replication; -import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD; import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC; import java.io.IOException; @@ -57,17 +56,7 @@ void onExit(boolean gracefulShutdown) { @Override ReplicationMode onFailure(Throwable e) throws IOException { LOG.info("HAGroup {} mode={} got error", logGroup, this, e); - logGroup.getMetrics().incrementSyncToSafTransitions(); - try { - logGroup.setHAGroupStatusToStoreAndForward(); - } catch (Exception ex) { - // Fatal — can't transition to SAF; throw to let the event handler abort - String message = - String.format("HAGroup %s could not update status to STORE_AND_FORWARD", logGroup); - LOG.error(message, ex); - throw ReplicationLogGroup.asIOException(message, ex); - } - return 
STORE_AND_FORWARD; + return transitionToStoreAndForward(); } @Override From bec9e9445ad614df2b6167fafd886a505f48a094 Mon Sep 17 00:00:00 2001 From: tkhurana Date: Wed, 13 May 2026 17:02:11 -0700 Subject: [PATCH 3/3] PHOENIX-7854 Adopt fatal-exception pattern for Disruptor event handler lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The event handler thread no longer calls abort() or close() on itself. On fatal errors it sets a fatalException field, fails pending sync futures, and keeps consuming events (so the ring buffer doesn't block producers). The producer thread that receives the failed future calls abort(), which closes resources and invokes Abortable on its own thread. This eliminates the close(boolean graceful) split — a single close() always drains with a timeout and falls back to halt. When the handler has a fatal exception the drain completes instantly. The onShutdown callback derives graceful vs non-graceful from fatalException == null, removing the gracefulShutdownEventHandlerFlag field entirely. --- .../replication/ReplicationLogGroup.java | 134 +++++++++--------- .../replication/ReplicationLogGroupTest.java | 80 +++++------ 2 files changed, 108 insertions(+), 106 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java index 6fb43328f24..1fe559b449c 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java @@ -336,8 +336,6 @@ public Record(String tableName, long commitId, Mutation mutation) { protected Disruptor disruptor; protected RingBuffer ringBuffer; protected LogEventHandler eventHandler; - // Used to inform the disruptor event thread whether this is a graceful or a forced shutdown - private final AtomicBoolean gracefulShutdownEventHandlerFlag = new AtomicBoolean(); /** * Get or create a ReplicationLogGroup instance for the given HA Group. @@ -546,6 +544,10 @@ public void append(String tableName, long commitId, Mutation mutation) throws IO if (isClosed()) { throw new IOException("Closed"); } + IOException fatal = eventHandler.getFatalException(); + if (fatal != null) { + throw fatal; + } long startTime = System.nanoTime(); try { // ringBuffer.next() claims the next sequence number. 
Because we initialize the Disruptor @@ -586,6 +588,10 @@ public void sync() throws IOException { if (isClosed()) { throw new IOException("Closed"); } + IOException fatal = eventHandler.getFatalException(); + if (fatal != null) { + abort("ReplicationLogGroup has a fatal exception", fatal); + } long startTime = System.nanoTime(); try { syncInternal(); @@ -615,11 +621,10 @@ protected void syncInternal() throws IOException { Thread.currentThread().interrupt(); throw new InterruptedIOException("Interrupted while waiting for sync"); } catch (ExecutionException e) { - // The consumer thread already called abort(); just propagate the error to the client Throwable cause = e.getCause(); String message = String.format("HAGroup %s sync operation failed", this); LOG.error(message, cause); - throw asIOException(message, cause); + abort(message, cause); } catch (TimeoutException e) { String message = String.format("HAGroup %s replication log sync timed out after %d ms", this, syncTimeoutMs); @@ -638,46 +643,29 @@ public boolean isClosed() { } /** - * Close the ReplicationLogGroup. When graceful, drains pending events from the Disruptor (which - * triggers onShutdown → modeImpl.onExit → ReplicationLog.close) with a bounded timeout; when - * non-graceful, halts the Disruptor immediately. In both cases the instance is removed from the - * INSTANCES cache and subsequent append()/sync() calls throw IOException("Closed"). - * @param graceful true for orderly RS shutdown, false for fatal error / abort + * Close the ReplicationLogGroup. Drains pending events from the Disruptor with a bounded timeout + * (which triggers onShutdown → modeImpl.onExit → ReplicationLog.close), then cleans up all + * resources. When the event handler has a fatal exception the drain completes instantly because + * each event hits the short-circuit path. The instance is removed from the INSTANCES cache and + * subsequent append()/sync() calls throw IOException. */ - protected void close(boolean graceful) { + public void close() { if (!closed.compareAndSet(false, true)) { return; } - LOG.info("Closing HAGroup {} graceful={}", this, graceful); + LOG.info("Closing HAGroup {}", this); INSTANCES.remove(haGroupName); - if (graceful) { - gracefulShutdownEventHandlerFlag.set(true); - try { - disruptor.shutdown(shutdownTimeoutMs, TimeUnit.MILLISECONDS); - } catch (com.lmax.disruptor.TimeoutException e) { - LOG.warn("HAGroup {} graceful shutdown timed out after {}ms, halting", this, - shutdownTimeoutMs); - gracefulShutdownEventHandlerFlag.set(false); - disruptor.halt(); - } - shutdownDisruptorExecutor(); - } else { - gracefulShutdownEventHandlerFlag.set(false); + try { + disruptor.shutdown(shutdownTimeoutMs, TimeUnit.MILLISECONDS); + } catch (com.lmax.disruptor.TimeoutException e) { + LOG.warn("HAGroup {} shutdown timed out after {}ms, halting", this, shutdownTimeoutMs); disruptor.halt(); } - // stop the forwarder if running + shutdownDisruptorExecutor(); logForwarder.stop(); logForwarder.close(); metrics.close(); - LOG.info("HAGroup {} closed graceful={}", this, graceful); - } - - /** - * Gracefully close the ReplicationLogGroup. Convenience method that delegates to - * {@link #close(boolean)} with graceful=true. - */ - public void close() { - close(true); + LOG.info("HAGroup {} closed", this); } /** @@ -922,14 +910,18 @@ static IOException asIOException(String msg, Throwable cause) { } /** - * Abort the region server due to an unrecoverable replication failure. 
Halts the Disruptor via - * close(false) first, then invokes the Abortable (RegionServerServices) to trigger an - * asynchronous RS shutdown. Still throws so the current call path unwinds with a fatal exception. + * Abort the region server due to an unrecoverable replication failure. Must be called from a + * producer thread (never the event handler thread). Sets the fatal exception so remaining events + * drain instantly, then closes all resources and invokes the Abortable to trigger RS shutdown. + * Always throws so the caller unwinds. */ protected void abort(String reason, Throwable cause) throws IOException { String msg = "Aborting region server due to replication failure: " + reason; LOG.error(msg, cause); - close(false); + // Idempotent; may already be set by the event handler (ExecutionException path) + // but is needed here for the TimeoutException path where the event handler is still alive. + eventHandler.setFatalException(asIOException(msg, cause)); + close(); if (abortable != null) { abortable.abort(msg, cause); } @@ -941,12 +933,24 @@ protected void abort(String reason, Throwable cause) throws IOException { */ protected class LogEventHandler implements EventHandler, LifecycleAware { private final List> pendingSyncFutures = new ArrayList<>(); - // Current replication mode implementation which will handle the events private ReplicationModeImpl currentModeImpl; + private volatile IOException fatalException; public LogEventHandler() { } + void setFatalException(IOException cause) { + if (fatalException != null) { + return; + } + LOG.error("HAGroup {} event handler hit fatal exception", ReplicationLogGroup.this, cause); + this.fatalException = cause; + } + + IOException getFatalException() { + return fatalException; + } + public void init() throws IOException { initializeMode(getMode()); } @@ -1105,25 +1109,27 @@ private void replayFailedEvent(LogEvent failedEvent, long sequence) throws IOExc */ @Override public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception { - // Calculate time spent in ring buffer long currentTimeNs = System.nanoTime(); long ringBufferTimeNs = currentTimeNs - event.timestampNs; metrics.updateRingBufferTime(ringBufferTimeNs); + if (fatalException != null) { + // Append events are ignored; sync futures are failed immediately + // so producer threads unblock without waiting for the sync timeout. + if (event.type == EVENT_TYPE_SYNC) { + event.syncFuture.completeExceptionally(fatalException); + } + return; + } try { switch (event.type) { case EVENT_TYPE_DATA: currentModeImpl.append(event.record); - // Process any pending syncs at the end of batch. if (endOfBatch) { processPendingSyncs(sequence); } return; case EVENT_TYPE_SYNC: - // Add this sync future to the pending list - // OK, to add the same future multiple times when we rewind the batch - // as completing an already completed future is a no-op pendingSyncFutures.add(event.syncFuture); - // Process any pending syncs at the end of batch. if (endOfBatch) { processPendingSyncs(sequence); } @@ -1137,15 +1143,16 @@ public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Ex e); onFailure(event, sequence, e); } catch (Exception fatalEx) { - // Either we failed to switch the mode or we are in STORE_AND_FORWARD mode - // and got an exception. This is a fatal exception so halt the disruptor - // fail the pending sync events with the original exception - failPendingSyncs(sequence, e); - // Both SYNC and SAF writes failed. 
The local HBase WAL has the mutation but neither - // replication pipeline does, so the SAF forwarder cannot reconcile it. Abort the RS - // so region reassignment + preWALRestore re-ships the orphaned edits. - abort("Both SYNC and SAF replication writes failed", fatalEx); + IOException fatalIOE = + asIOException("Both SYNC and SAF replication writes failed", fatalEx); + setFatalException(fatalIOE); + failPendingSyncs(sequence, fatalIOE); } + } catch (Throwable t) { + IOException wrapped = + new IOException("Unexpected error in event handler at sequence " + sequence, t); + setFatalException(wrapped); + failPendingSyncs(sequence, wrapped); } } @@ -1156,36 +1163,35 @@ public void onStart() { @Override public void onShutdown() { - boolean isGracefulShutdown = gracefulShutdownEventHandlerFlag.get(); + boolean graceful = (fatalException == null); LOG.info("HAGroup {} shutting down event handler graceful={}", ReplicationLogGroup.this, - isGracefulShutdown); - currentModeImpl.onExit(isGracefulShutdown); + graceful); + currentModeImpl.onExit(graceful); } } /** - * Handler for critical errors during the Disruptor lifecycle that closes the writer to prevent - * data loss. + * Safety-net handler for exceptions that escape the event handler. Should never fire because + * onEvent catches all Throwables. If it does fire, sets a fatal exception so pending futures + * fail. */ protected class LogExceptionHandler implements ExceptionHandler { @Override public void handleEventException(Throwable e, long sequence, LogEvent event) { - String message = "Exception processing sequence " + sequence + " for event " + event; - LOG.error(message, e); - close(false); + LOG.error("UNEXPECTED: Exception escaped onEvent at sequence {}", sequence, e); + eventHandler + .setFatalException(new IOException("Exception escaped to LogExceptionHandler", e)); } @Override public void handleOnStartException(Throwable e) { LOG.error("Exception during Disruptor startup", e); - close(false); + eventHandler.setFatalException(new IOException("Disruptor startup failed", e)); } @Override public void handleOnShutdownException(Throwable e) { - // Should not happen, but if it does, the regionserver is aborting or shutting down. LOG.error("Exception during Disruptor shutdown", e); - close(false); } } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java index fb8ca7a9537..ea8fa451ed2 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java @@ -622,26 +622,25 @@ public void testEventProcessingException() throws Exception { doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(), anyLong(), any(Mutation.class)); - // Append data. This should trigger the LogExceptionHandler, which will close logWriter. - // The sync future times out, syncInternal wraps in PhoenixWALSyncTimeoutException and aborts. + // Append publishes to the ring buffer. The event handler catches the RuntimeException via + // catch(Throwable), poisons itself, and fails the sync future. The producer receives + // ExecutionException and calls abort(). 
logGroup.append(tableName, commitId, put); try { logGroup.sync(); - fail("Should have thrown PhoenixWALSyncTimeoutException because sync timed out"); - } catch (PhoenixWALSyncTimeoutException e) { - // expected + fail("Should have thrown IOException from abort"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Unexpected error in event handler")); } - // Verify that subsequent operations fail because the log is closed + // Verify that subsequent operations fail try { logGroup.append(tableName, commitId + 1, put); - fail("Should have thrown IOException because log is closed"); + fail("Should have thrown IOException because log is poisoned/closed"); } catch (IOException e) { - assertTrue("Expected an IOException because log is closed", - e.getMessage().contains("Closed")); + // expected } - // Verify that the inner writer was closed by the LogExceptionHandler verify(innerWriter, times(1)).close(); } @@ -714,12 +713,13 @@ public void testFailToUpdateHAGroupStatusOnSwitchToStoreAndForward() throws Exce // Append data logGroup.append(tableName, commitId, put); - // Try to sync. Should fail after exhausting retries and then switch to STORE_AND_FORWARD + // Sync fails, mode transition to SAF also fails because HAGroupStore update throws. + // The event handler poisons itself, and the producer calls abort(). try { logGroup.sync(); fail("Should have thrown IOException because of failure to update mode"); } catch (IOException ex) { - assertTrue(ex.getMessage().contains("Simulated sync failure")); + assertTrue(ex.getMessage().contains("Simulated failure to update HAGroupStore state")); } } @@ -966,32 +966,30 @@ public void testRuntimeExceptionDuringLengthCheck() throws Exception { // Configure writer to throw RuntimeException on getLength() doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).getLength(); - // Append data. This should trigger the LogExceptionHandler, which will close logWriter. - // The sync future times out, syncInternal wraps in PhoenixWALSyncTimeoutException and aborts. + // Append publishes to the ring buffer. The event handler catches the RuntimeException, + // poisons itself, and fails the sync future. The producer calls abort(). logGroup.append(tableName, commitId, put); try { logGroup.sync(); - fail("Should have thrown PhoenixWALSyncTimeoutException because sync timed out"); - } catch (PhoenixWALSyncTimeoutException e) { - // expected + fail("Should have thrown IOException from abort"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Unexpected error in event handler")); } - // Verify that subsequent operations fail because the log is closed + // Verify that subsequent operations fail try { logGroup.append(tableName, commitId + 1, put); - fail("Should have thrown IOException because log is closed"); + fail("Should have thrown IOException because log is poisoned/closed"); } catch (IOException e) { - assertTrue("Expected an IOException because log is closed", - e.getMessage().contains("Closed")); + // expected } - // Verify that the inner writer was closed by the LogExceptionHandler verify(innerWriter, times(1)).close(); } /** - * Tests behavior when a RuntimeException occurs during append() after closeOnError() has been - * called. Verifies that the system properly rejects sync operations after being closed. + * Tests behavior when a RuntimeException occurs during append and subsequent appends are + * rejected. Verifies that the system properly rejects operations after being poisoned/closed. 
*/ @Test public void testAppendAfterCloseOnError() throws Exception { @@ -1007,31 +1005,30 @@ public void testAppendAfterCloseOnError() throws Exception { doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(), anyLong(), any(Mutation.class)); - // Append data to trigger closeOnError() + // Append publishes to the ring buffer. The event handler catches the RuntimeException, + // poisons itself, and fails the sync future. The producer calls abort(). logGroup.append(tableName, commitId, put); try { logGroup.sync(); - fail("Should have thrown PhoenixWALSyncTimeoutException because sync timed out"); - } catch (PhoenixWALSyncTimeoutException e) { + fail("Should have thrown IOException from abort"); + } catch (IOException e) { // expected } - // Verify that subsequent append operations fail because the log is closed + // Verify that subsequent append operations fail because the log is poisoned/closed try { logGroup.append(tableName, commitId, put); - fail("Should have thrown IOException because log is closed"); + fail("Should have thrown IOException because log is poisoned/closed"); } catch (IOException e) { - assertTrue("Expected an IOException because log is closed", - e.getMessage().contains("Closed")); + // expected } - // Verify that the inner writer was closed by the LogExceptionHandler verify(innerWriter, times(1)).close(); } /** - * Tests behavior when a RuntimeException occurs during sync() after closeOnError() has been - * called. Verifies that the system properly rejects sync operations after being closed. + * Tests behavior when a RuntimeException occurs during append and subsequent syncs are rejected. + * Verifies that the system properly rejects operations after being poisoned/closed. */ @Test public void testSyncAfterCloseOnError() throws Exception { @@ -1047,25 +1044,24 @@ public void testSyncAfterCloseOnError() throws Exception { doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(), anyLong(), any(Mutation.class)); - // Append data to trigger closeOnError() + // Append publishes to the ring buffer. The event handler catches the RuntimeException, + // poisons itself, and fails the sync future. The producer calls abort(). logGroup.append(tableName, commitId, put); try { logGroup.sync(); - fail("Should have thrown PhoenixWALSyncTimeoutException because sync timed out"); - } catch (PhoenixWALSyncTimeoutException e) { + fail("Should have thrown IOException from abort"); + } catch (IOException e) { // expected } - // Verify that subsequent sync operations fail because the log is closed + // Verify that subsequent sync operations fail because the log is poisoned/closed try { logGroup.sync(); - fail("Should have thrown IOException because log is closed"); + fail("Should have thrown IOException because log is poisoned/closed"); } catch (IOException e) { - assertTrue("Expected an IOException because log is closed", - e.getMessage().contains("Closed")); + // expected } - // Verify that the inner writer was closed by the LogExceptionHandler verify(innerWriter, times(1)).close(); }
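
The @CoreCoprocessor wiring in PATCH 1 can be hard to follow from the diff alone. Below is a minimal standalone sketch of the pattern, assuming only the HBase coprocessor API already imported in the patch; the class name, helper method, and fallback behavior are illustrative, not Phoenix code.

import java.io.IOException;

import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;

@CoreCoprocessor
public class AbortAwareObserver implements RegionCoprocessor {

  private Abortable abortable;

  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    // HBase only hands a HasRegionServerServices environment to coprocessors
    // annotated @CoreCoprocessor; third-party coprocessors get a restricted
    // environment and this instanceof check simply fails.
    if (env instanceof HasRegionServerServices) {
      this.abortable = ((HasRegionServerServices) env).getRegionServerServices();
    }
  }

  // Hypothetical helper showing how a fatal error would use the Abortable.
  protected void fatal(String reason, Throwable cause) {
    if (abortable != null) {
      abortable.abort(reason, cause); // asynchronous, real RS shutdown
    } else {
      // In this sketch: fallback when loaded without a live region server,
      // e.g. in unit tests that construct the observer directly.
      throw new RuntimeException(reason, cause);
    }
  }
}

Only environments created for core (system) coprocessors implement HasRegionServerServices, which is why both IndexRegionObserver and PhoenixRegionServerEndpoint gain the annotation; without it the instanceof check fails and abortable stays null.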
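
Likewise, the fatal-exception ("poisoning") lifecycle from PATCH 3 in sketch form, assuming the LMAX Disruptor EventHandler interface; LogEvent, PoisoningHandler, and flush() are hypothetical stand-ins for the real event type and log I/O, not the Phoenix implementation.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import com.lmax.disruptor.EventHandler;

final class LogEvent {
  // non-null only for SYNC events; APPEND events carry mutation payloads instead
  CompletableFuture<Void> syncFuture;
}

final class PoisoningHandler implements EventHandler<LogEvent> {

  private final List<CompletableFuture<Void>> pending = new ArrayList<>();
  private volatile IOException fatal; // read by producer threads before publishing

  IOException getFatal() {
    return fatal;
  }

  @Override
  public void onEvent(LogEvent event, long sequence, boolean endOfBatch) {
    if (fatal != null) {
      // Poisoned: keep draining so the ring buffer never blocks producers,
      // but fail sync futures immediately rather than letting them time out.
      if (event.syncFuture != null) {
        event.syncFuture.completeExceptionally(fatal);
      }
      return;
    }
    try {
      if (event.syncFuture != null) {
        pending.add(event.syncFuture);
      }
      if (endOfBatch) {
        flush(); // the real handler appends to and syncs the replication log here
        pending.forEach(f -> f.complete(null));
        pending.clear();
      }
    } catch (Throwable t) {
      // Never tear ourselves down from the handler thread; record the error
      // and let the producer that observes the failed future call abort().
      fatal = new IOException("event handler poisoned at sequence " + sequence, t);
      pending.forEach(f -> f.completeExceptionally(fatal));
      pending.clear();
    }
  }

  private void flush() throws IOException {
    // placeholder for the real append/sync work
  }
}

The producer side then unwinds exactly as the new syncInternal() does: future.get() surfaces an ExecutionException, and the producer thread (never the handler thread) calls abort(), which records the fatal exception idempotently, closes the group, and invokes the Abortable.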