diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index 3f35b40cef8..ead41a69509 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -32,8 +32,11 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -62,12 +65,14 @@
 /**
  * This is first implementation of RegionServer coprocessor introduced by Phoenix.
  */
+@CoreCoprocessor
 public class PhoenixRegionServerEndpoint extends
   RegionServerEndpointProtos.RegionServerEndpointService implements RegionServerCoprocessor {
   private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
   private MetricsMetadataCachingSource metricsSource;
   protected Configuration conf;
   protected ServerName serverName;
+  private Abortable abortable;
   private ExecutorService prewarmExecutor;

   // regionserver level thread pool used by Uncovered Indexes to scan data table rows
@@ -79,6 +84,9 @@ public void start(CoprocessorEnvironment env) throws IOException {
     if (env instanceof RegionServerCoprocessorEnvironment) {
       this.serverName = ((RegionServerCoprocessorEnvironment) env).getServerName();
     }
+    if (env instanceof HasRegionServerServices) {
+      this.abortable = ((HasRegionServerServices) env).getRegionServerServices();
+    }
     this.metricsSource =
       MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
     initUncoveredIndexThreadPool(this.conf);
@@ -308,7 +316,7 @@ private void startHAGroupStoreClientPrewarming() {
           manager.getClusterRoleRecord(haGroup);
         if (shouldInitReplicationLogGroup) {
           try {
-            ReplicationLogGroup.get(conf, serverName, haGroup);
+            ReplicationLogGroup.get(conf, serverName, haGroup, abortable);
             LOGGER.info("Eagerly initialized ReplicationLogGroup {} on server {}", haGroup,
               serverName);
           } catch (Exception e) {
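The `HasRegionServerServices` check above is the standard way for a `@CoreCoprocessor` to reach the region server's `Abortable`. For reference, a minimal self-contained sketch of the wiring (hypothetical class name, not part of this change):

```java
import java.io.IOException;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;

// Hypothetical example. HBase hands a HasRegionServerServices environment only to
// coprocessors marked @CoreCoprocessor; without the annotation the instanceof check
// below is simply false and abortable stays null, which ReplicationLogGroup tolerates.
@CoreCoprocessor
public class ExampleAbortAwareEndpoint implements RegionServerCoprocessor {
  private Abortable abortable;

  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof HasRegionServerServices) {
      // RegionServerServices extends Abortable, so it can be retained under that type.
      this.abortable = ((HasRegionServerServices) env).getRegionServerServices();
    }
  }
}
```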
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 1ad0579a451..16fd9455725 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -53,6 +53,7 @@ import java.util.function.Predicate;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellScanner;
@@ -70,6 +71,8 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -170,6 +173,7 @@
  * does batch mutations.
  *
  */
+@CoreCoprocessor
 public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {

   private static final Logger LOG = LoggerFactory.getLogger(IndexRegionObserver.class);
@@ -439,6 +443,7 @@ public int getMaxPendingRowCount() {
   private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100;
   private byte[] encodedRegionName;
   private boolean shouldReplicate;
+  private Abortable abortable;

   // Don't replicate the mutation if this attribute is set
   private static final Predicate<Mutation> IGNORE_REPLICATION =
@@ -541,6 +546,9 @@ public void start(CoprocessorEnvironment e) throws IOException {
       if (this.shouldReplicate) {
         this.ignoreReplicationFilter = getSynchronousReplicationFilter(tableName);
       }
+      if (e instanceof HasRegionServerServices) {
+        this.abortable = ((HasRegionServerServices) e).getRegionServerServices();
+      }
     } catch (NoSuchMethodError ex) {
       disabled = true;
       LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex);
@@ -689,7 +697,7 @@ private Optional<ReplicationLogGroup> getHAGroupFromBatch(RegionCoprocessorEnvironment
       byte[] haGroupName = m.getAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
       if (haGroupName != null) {
         ReplicationLogGroup logGroup = ReplicationLogGroup.get(env.getConfiguration(),
-          env.getServerName(), Bytes.toString(haGroupName));
+          env.getServerName(), Bytes.toString(haGroupName), abortable);
         return Optional.of(logGroup);
       }
     }
@@ -707,7 +715,7 @@ private Optional<ReplicationLogGroup> getHAGroupFromWALKey(RegionCoprocessorEnvironment
       logKey.getExtendedAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
     if (haGroupName != null) {
       ReplicationLogGroup logGroup = ReplicationLogGroup.get(env.getConfiguration(),
-        env.getServerName(), Bytes.toString(haGroupName));
+        env.getServerName(), Bytes.toString(haGroupName), abortable);
       return Optional.of(logGroup);
     }
     return Optional.empty();
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
index 3c64e9ec61e..ccd0e82caf4 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
@@ -125,6 +125,7 @@ public void init() throws IOException {
   }

   public void close() {
+    replicationLogTracker.close();
     if (this.metrics != null) {
       this.metrics.close();
     }
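Before the ReplicationLogGroup changes themselves, a usage sketch of the factory API they introduce. This is a hypothetical helper, not part of the patch; it relies only on behavior visible in the hunks below (the computeIfAbsent cache and the three-argument overload delegating with a null Abortable):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.phoenix.replication.ReplicationLogGroup;

public final class ReplicationLogGroupUsage {
  // "group1" and the helper itself are illustrative only.
  static ReplicationLogGroup obtain(Configuration conf, ServerName serverName,
    Abortable abortable) throws IOException {
    // The first call for an HA group constructs and caches the instance (computeIfAbsent),
    // so the abortable passed by later callers for the same group is not re-wired.
    ReplicationLogGroup group = ReplicationLogGroup.get(conf, serverName, "group1", abortable);
    // The legacy three-argument overload delegates with a null Abortable: fatal errors then
    // surface only as thrown IOExceptions, without requesting a region server abort.
    assert group == ReplicationLogGroup.get(conf, serverName, "group1"); // cached per group
    return group;
  }
}
```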
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index ded1fec7b1b..1fe559b449c 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.phoenix.jdbc.HAGroupStoreManager;
@@ -67,7 +68,6 @@
 import org.slf4j.LoggerFactory;

 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.base.Throwables;
 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -171,6 +171,9 @@ public class ReplicationLogGroup {
   public static final long DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS = 10_000L;
   public static final String WAL_SYNC_TIMEOUT_MS_KEY = "hbase.regionserver.wal.sync.timeout";
   public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L;
+  public static final String REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS_KEY =
+    "phoenix.replication.log.group.shutdown.timeout.ms";
+  public static final long DEFAULT_REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS = 30_000L;
   public static final String STANDBY_DIR = "in";
   public static final String FALLBACK_DIR = "out";
@@ -193,7 +196,9 @@ public class ReplicationLogGroup {
   protected ReplicationLogDiscoveryForwarder logForwarder;
   protected long syncTimeoutMs;
   protected long peerInitTimeoutMs;
-  protected volatile boolean closed = false;
+  protected final AtomicBoolean closed = new AtomicBoolean(false);
+  protected final Abortable abortable;
+  protected long shutdownTimeoutMs;

   /**
    * The replication mode determines how mutations are handled. Mode transitions occur automatically
@@ -331,8 +336,6 @@ public Record(String tableName, long commitId, Mutation mutation) {
   protected Disruptor<LogEvent> disruptor;
   protected RingBuffer<LogEvent> ringBuffer;
   protected LogEventHandler eventHandler;
-  // Used to inform the disruptor event thread whether this is a graceful or a forced shutdown
-  private final AtomicBoolean gracefulShutdownEventHandlerFlag = new AtomicBoolean();

   /**
    * Get or create a ReplicationLogGroup instance for the given HA Group.
@@ -344,10 +347,25 @@ public Record(String tableName, long commitId, Mutation mutation) {
    */
   public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
     String haGroupName) throws IOException {
+    return get(conf, serverName, haGroupName, (Abortable) null);
+  }
+
+  /**
+   * Get or create a ReplicationLogGroup instance for the given HA Group.
+   * @param conf        Configuration object
+   * @param serverName  The server name
+   * @param haGroupName The HA Group name
+   * @param abortable   Abortable to invoke on fatal errors (typically RegionServerServices)
+   * @return ReplicationLogGroup instance
+   * @throws IOException if initialization fails
+   */
+  public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
+    String haGroupName, Abortable abortable) throws IOException {
     try {
       return INSTANCES.computeIfAbsent(haGroupName, k -> {
         try {
-          ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName);
+          ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName,
+            HAGroupStoreManager.getInstance(conf), abortable);
           group.init();
           return group;
         } catch (IOException e) {
@@ -390,12 +408,14 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName,

   /**
    * Protected constructor for ReplicationLogGroup.
-   * @param conf Configuration object
-   * @param serverName The server name
-   * @param haGroupName The HA Group name
+   * @param conf                Configuration object
+   * @param serverName          The server name
+   * @param haGroupName         The HA Group name
+   * @param haGroupStoreManager HA Group Store Manager instance
    */
-  protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName) {
-    this(conf, serverName, haGroupName, HAGroupStoreManager.getInstance(conf));
+  protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName,
+    HAGroupStoreManager haGroupStoreManager) {
+    this(conf, serverName, haGroupName, haGroupStoreManager, null);
   }

   /**
@@ -404,9 +424,10 @@ protected ReplicationLogGroup(Configuration conf, ServerName serverName, String
    * @param serverName          The server name
    * @param haGroupName         The HA Group name
    * @param haGroupStoreManager HA Group Store Manager instance
+   * @param abortable           Abortable to invoke on fatal errors (may be null)
    */
   protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName,
-    HAGroupStoreManager haGroupStoreManager) {
+    HAGroupStoreManager haGroupStoreManager, Abortable abortable) {
     // conf object from coprocessor is instance of
     // org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration and we need to modify it when
     // we send rpc to namenode so copying it
@@ -420,6 +441,9 @@ protected ReplicationLogGroup(Configuration conf, ServerName serverName, String
     this.serverName = serverName;
     this.haGroupName = haGroupName;
     this.haGroupStoreManager = haGroupStoreManager;
+    this.abortable = abortable;
+    this.shutdownTimeoutMs = clonedConf.getLong(REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS_KEY,
+      DEFAULT_REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS);
     this.metrics = createMetricsSource();
     this.mode = new AtomicReference<>(INIT);
   }
@@ -517,9 +541,13 @@ public void append(String tableName, long commitId, Mutation mutation) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, commitId, mutation);
     }
-    if (closed) {
+    if (isClosed()) {
       throw new IOException("Closed");
     }
+    IOException fatal = eventHandler.getFatalException();
+    if (fatal != null) {
+      throw fatal;
+    }
     long startTime = System.nanoTime();
     try {
       // ringBuffer.next() claims the next sequence number. Because we initialize the Disruptor
@@ -557,9 +585,13 @@ public void sync() throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Sync");
     }
-    if (closed) {
+    if (isClosed()) {
       throw new IOException("Closed");
     }
+    IOException fatal = eventHandler.getFatalException();
+    if (fatal != null) {
+      abort("ReplicationLogGroup has a fatal exception", fatal);
+    }
     long startTime = System.nanoTime();
     try {
       syncInternal();
@@ -589,16 +621,15 @@ protected void syncInternal() throws IOException {
       Thread.currentThread().interrupt();
       throw new InterruptedIOException("Interrupted while waiting for sync");
     } catch (ExecutionException e) {
-      // After exhausting all attempts to sync to the standby cluster we switch mode
-      // and then retry again. If that also fails, it is a fatal error
+      Throwable cause = e.getCause();
       String message = String.format("HAGroup %s sync operation failed", this);
-      LOG.error(message, e);
-      abort(message, e);
+      LOG.error(message, cause);
+      abort(message, cause);
     } catch (TimeoutException e) {
       String message = String.format("HAGroup %s replication log sync timed out after %d ms",
         this, syncTimeoutMs);
       LOG.error(message, e);
-      PhoenixWALSyncTimeoutException timeoutEx = new PhoenixWALSyncTimeoutException(message, e);
+      PhoenixWALSyncTimeoutException timeoutEx = new PhoenixWALSyncTimeoutException(message);
       abort(message, timeoutEx);
     }
   }
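A condensed fragment of the wait-and-translate step in syncInternal() as it reads after this hunk. syncTimeoutMs, abort(), and PhoenixWALSyncTimeoutException are the names from the diff; the method wrapper is illustrative. The point is the unwrap: abort() now receives the real cause rather than the ExecutionException wrapper, and the timeout exception no longer chains a cause:

```java
void awaitSync(CompletableFuture<Void> syncFuture) throws IOException {
  try {
    syncFuture.get(syncTimeoutMs, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new InterruptedIOException("Interrupted while waiting for sync");
  } catch (ExecutionException e) {
    // Unwrap first, so abort() logs and rethrows the underlying failure.
    abort("sync operation failed", e.getCause());
  } catch (TimeoutException e) {
    // The timeout itself is the reported failure; no cause is chained anymore.
    abort("sync timed out", new PhoenixWALSyncTimeoutException("sync timed out"));
  }
}
```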
@@ -608,67 +639,33 @@ protected void syncInternal() throws IOException {
    * @return true if closed, false otherwise
    */
   public boolean isClosed() {
-    return closed;
-  }
-
-  /**
-   * Force closes the log group upon an unrecoverable internal error. This is a fail-stop behavior:
-   * once called, the log group is marked as closed, the Disruptor is halted, and all subsequent
-   * append() and sync() calls will throw an IOException("Closed"). This ensures that no further
-   * operations are attempted on a log group that has encountered a critical error.
-   */
-  protected void closeOnError() {
-    if (closed) {
-      return;
-    }
-    synchronized (this) {
-      if (closed) {
-        return;
-      }
-      // setting closed to true prevents future producers to add events to the ring buffer
-      closed = true;
-    }
-    // Directly halt the disruptor. shutdown() would wait for events to drain. We are expecting
-    // that will not work.
-    gracefulShutdownEventHandlerFlag.set(false);
-    disruptor.halt();
-    metrics.close();
-    LOG.info("HAGroup {} closed on error", this);
+    return closed.get();
   }

   /**
-   * Close the ReplicationLogGroup and all associated resources. This method is thread-safe and can
-   * be called multiple times.
+   * Close the ReplicationLogGroup. Drains pending events from the Disruptor with a bounded timeout
+   * (which triggers onShutdown → modeImpl.onExit → ReplicationLog.close), then cleans up all
+   * resources. When the event handler has a fatal exception the drain completes instantly because
+   * each event hits the short-circuit path. The instance is removed from the INSTANCES cache and
+   * subsequent append()/sync() calls throw IOException.
    */
   public void close() {
-    if (closed) {
+    if (!closed.compareAndSet(false, true)) {
       return;
     }
-    synchronized (this) {
-      if (closed) {
-        return;
-      }
-      LOG.info("Closing HAGroup {}", this);
-      // setting closed to true prevents future producers to add events to the ring buffer
-      closed = true;
-      // Remove from instances cache
-      INSTANCES.remove(haGroupName);
-      // Sync before shutting down to flush all pending appends.
-      try {
-        syncInternal();
-        gracefulShutdownEventHandlerFlag.set(true);
-        // waits until all the events in the disruptor have been processed
-        disruptor.shutdown();
-      } catch (IOException e) {
-        LOG.warn("Error during final sync on close", e);
-        gracefulShutdownEventHandlerFlag.set(false);
-        disruptor.halt(); // Go directly to halt.
-      }
-      // wait for the disruptor threads to finish
-      shutdownDisruptorExecutor();
-      metrics.close();
-      LOG.info("HAGroup {} closed", this);
+    LOG.info("Closing HAGroup {}", this);
+    INSTANCES.remove(haGroupName);
+    try {
+      disruptor.shutdown(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (com.lmax.disruptor.TimeoutException e) {
+      LOG.warn("HAGroup {} shutdown timed out after {}ms, halting", this, shutdownTimeoutMs);
+      disruptor.halt();
     }
+    shutdownDisruptorExecutor();
+    logForwarder.stop();
+    logForwarder.close();
+    metrics.close();
+    LOG.info("HAGroup {} closed", this);
  }
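The rewritten close() above replaces the old sync-then-shutdown sequence with a bounded drain. A minimal, self-contained sketch of that Disruptor 3.x pattern (toy event type and timeout, not Phoenix code):

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import com.lmax.disruptor.dsl.Disruptor;

public class BoundedShutdownExample {
  static final class Event {
    long value;
  }

  public static void main(String[] args) {
    Disruptor<Event> disruptor = new Disruptor<>(Event::new, 16, (ThreadFactory) Thread::new);
    // Handler sleeps to simulate a slow downstream writer.
    disruptor.handleEventsWith((event, sequence, endOfBatch) -> Thread.sleep(5));
    disruptor.start();
    disruptor.getRingBuffer().publishEvent((event, seq) -> event.value = seq);
    try {
      // Drain outstanding events, but give up after the deadline.
      disruptor.shutdown(1_000L, TimeUnit.MILLISECONDS);
    } catch (com.lmax.disruptor.TimeoutException e) {
      // Still busy past the deadline: stop immediately, dropping unprocessed events.
      disruptor.halt();
    }
  }
}
```

The tradeoff being made: an unbounded shutdown() can hang a closing region server forever, while the bounded variant either drains cleanly or falls back to halt() after the configured timeout.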
@@ -902,20 +899,33 @@ protected long setHAGroupStatusToSync() throws Exception {
   }

   /**
-   * Abort when we hit a fatal error
+   * Throws the given cause as an IOException. If it already is one, rethrows directly; otherwise
+   * wraps it.
    */
-  protected void abort(String reason, Throwable cause) {
-    // TODO better to use abort using RegionServerServices
-    String msg = "***** ABORTING region server: " + reason + " *****";
-    if (cause != null) {
-      msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
+  static IOException asIOException(String msg, Throwable cause) {
+    if (cause instanceof IOException) {
+      return (IOException) cause;
     }
-    LOG.error(msg);
-    if (cause != null) {
-      throw new RuntimeException(msg, cause);
-    } else {
-      throw new RuntimeException(msg);
+    return cause != null ? new IOException(msg, cause) : new IOException(msg);
+  }
+
+  /**
+   * Abort the region server due to an unrecoverable replication failure. Must be called from a
+   * producer thread (never the event handler thread). Sets the fatal exception so remaining events
+   * drain instantly, then closes all resources and invokes the Abortable to trigger RS shutdown.
+   * Always throws so the caller unwinds.
+   */
+  protected void abort(String reason, Throwable cause) throws IOException {
+    String msg = "Aborting region server due to replication failure: " + reason;
+    LOG.error(msg, cause);
+    // Idempotent; may already be set by the event handler (ExecutionException path)
+    // but is needed here for the TimeoutException path where the event handler is still alive.
+    eventHandler.setFatalException(asIOException(msg, cause));
+    close();
+    if (abortable != null) {
+      abortable.abort(msg, cause);
     }
+    throw asIOException(msg, cause);
   }

   /**
@@ -923,12 +933,24 @@ protected void abort(String reason, Throwable cause) {
    */
   protected class LogEventHandler implements EventHandler<LogEvent>, LifecycleAware {
     private final List<CompletableFuture<Void>> pendingSyncFutures = new ArrayList<>();
-    // Current replication mode implementation which will handle the events
     private ReplicationModeImpl currentModeImpl;
+    private volatile IOException fatalException;

     public LogEventHandler() {
     }

+    void setFatalException(IOException cause) {
+      if (fatalException != null) {
+        return;
+      }
+      LOG.error("HAGroup {} event handler hit fatal exception", ReplicationLogGroup.this, cause);
+      this.fatalException = cause;
+    }
+
+    IOException getFatalException() {
+      return fatalException;
+    }
+
     public void init() throws IOException {
       initializeMode(getMode());
     }
@@ -996,7 +1018,8 @@ private void processPendingSyncs(long sequence) throws IOException {
         // is not processing any event like append/sync because this is the only thread
         // that is consuming the events from the ring buffer and handing them off to the
         // mode
-        currentModeImpl.onExit(true);
+        ReplicationModeImpl oldModeImpl = currentModeImpl;
+        disruptorExecutor.execute(() -> oldModeImpl.onExit(true));
         initializeMode(newMode);
       }
     }
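Two constraints shape abort() and the onExit offload above, and both follow from the same fact: disruptor.shutdown() waits for the event handler to finish its backlog, so the handler thread must never block on its own completion. A sketch of the resulting call order (hypothetical method name; the fields mirror the diff):

```java
// Producer-thread-only: step 2 calls disruptor.shutdown(), which waits on the event
// handler, so running this on the handler thread would deadlock until the timeout.
protected void abortSketch(String reason, Throwable cause) throws IOException {
  IOException ioe = asIOException(reason, cause);
  eventHandler.setFatalException(ioe); // 1. poison: remaining events drain instantly
  close();                             // 2. bounded drain, then resource cleanup
  if (abortable != null) {
    abortable.abort(reason, cause);    // 3. request RS abort; this call returns normally
  }
  throw ioe;                           // 4. unwind the producer so the mutation fails
}
```

Note that Abortable.abort() only requests the shutdown and returns, which is why the method must still throw to fail the in-flight operation.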
@@ -1086,25 +1109,27 @@ private void replayFailedEvent(LogEvent failedEvent, long sequence) throws IOException {
      */
     @Override
     public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception {
-      // Calculate time spent in ring buffer
       long currentTimeNs = System.nanoTime();
       long ringBufferTimeNs = currentTimeNs - event.timestampNs;
       metrics.updateRingBufferTime(ringBufferTimeNs);
+      if (fatalException != null) {
+        // Append events are ignored; sync futures are failed immediately
+        // so producer threads unblock without waiting for the sync timeout.
+        if (event.type == EVENT_TYPE_SYNC) {
+          event.syncFuture.completeExceptionally(fatalException);
+        }
+        return;
+      }
       try {
         switch (event.type) {
           case EVENT_TYPE_DATA:
             currentModeImpl.append(event.record);
-            // Process any pending syncs at the end of batch.
             if (endOfBatch) {
               processPendingSyncs(sequence);
             }
             return;
           case EVENT_TYPE_SYNC:
-            // Add this sync future to the pending list
-            // OK, to add the same future multiple times when we rewind the batch
-            // as completing an already completed future is a no-op
             pendingSyncFutures.add(event.syncFuture);
-            // Process any pending syncs at the end of batch.
             if (endOfBatch) {
               processPendingSyncs(sequence);
             }
@@ -1118,17 +1143,16 @@ public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception {
             e);
           onFailure(event, sequence, e);
         } catch (Exception fatalEx) {
-          // Either we failed to switch the mode or we are in STORE_AND_FORWARD mode
-          // and got an exception. This is a fatal exception so halt the disruptor
-          // fail the pending sync events with the original exception
-          failPendingSyncs(sequence, e);
-          // Both SYNC and SAF writes failed. The local HBase WAL has the mutation but neither
-          // replication pipeline does, so the SAF forwarder cannot reconcile it. Abort the RS
-          // so region reassignment + preWALRestore re-ships the orphaned edits.
-          abort("Both SYNC and SAF replication writes failed", fatalEx);
-          // halt the disruptor with the fatal exception
-          throw fatalEx;
+          IOException fatalIOE =
+            asIOException("Both SYNC and SAF replication writes failed", fatalEx);
+          setFatalException(fatalIOE);
+          failPendingSyncs(sequence, fatalIOE);
         }
+      } catch (Throwable t) {
+        IOException wrapped =
+          new IOException("Unexpected error in event handler at sequence " + sequence, t);
+        setFatalException(wrapped);
+        failPendingSyncs(sequence, wrapped);
       }
     }
@@ -1139,36 +1163,35 @@ public void onStart() {

     @Override
     public void onShutdown() {
-      boolean isGracefulShutdown = gracefulShutdownEventHandlerFlag.get();
+      boolean graceful = (fatalException == null);
       LOG.info("HAGroup {} shutting down event handler graceful={}", ReplicationLogGroup.this,
-        isGracefulShutdown);
-      currentModeImpl.onExit(isGracefulShutdown);
+        graceful);
+      currentModeImpl.onExit(graceful);
     }
   }

   /**
-   * Handler for critical errors during the Disruptor lifecycle that closes the writer to prevent
-   * data loss.
+   * Safety-net handler for exceptions that escape the event handler. Should never fire because
+   * onEvent catches all Throwables. If it does fire, sets a fatal exception so pending futures
+   * fail.
    */
   protected class LogExceptionHandler implements ExceptionHandler<LogEvent> {
     @Override
     public void handleEventException(Throwable e, long sequence, LogEvent event) {
-      String message = "Exception processing sequence " + sequence + " for event " + event;
-      LOG.error(message, e);
-      closeOnError();
+      LOG.error("UNEXPECTED: Exception escaped onEvent at sequence {}", sequence, e);
+      eventHandler
+        .setFatalException(new IOException("Exception escaped to LogExceptionHandler", e));
     }

     @Override
     public void handleOnStartException(Throwable e) {
       LOG.error("Exception during Disruptor startup", e);
-      closeOnError();
+      eventHandler.setFatalException(new IOException("Disruptor startup failed", e));
     }

     @Override
     public void handleOnShutdownException(Throwable e) {
-      // Should not happen, but if it does, the regionserver is aborting or shutting down.
       LOG.error("Exception during Disruptor shutdown", e);
-      closeOnError();
     }
   }
 }
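The fatal-exception short-circuit in onEvent() is the core of the new fail-fast design. Distilled into a standalone form (toy event type and hypothetical names, not the Phoenix classes):

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import com.lmax.disruptor.EventHandler;

// Toy event with the two shapes used above: data, or a sync carrying a future.
final class Slot {
  CompletableFuture<Void> syncFuture; // non-null => sync event
  byte[] payload;                     // non-null => data event
}

final class PoisonableHandler implements EventHandler<Slot> {
  private volatile IOException fatal; // handler thread writes, producer threads read

  IOException getFatal() {
    return fatal;
  }

  @Override
  public void onEvent(Slot event, long sequence, boolean endOfBatch) {
    if (fatal != null) {
      // Short-circuit: fail syncs fast, ignore data, so the ring buffer drains instantly.
      if (event.syncFuture != null) {
        event.syncFuture.completeExceptionally(fatal);
      }
      return;
    }
    try {
      if (event.syncFuture != null) {
        event.syncFuture.complete(null); // all prior appends were written successfully
      } else {
        write(event.payload);
      }
    } catch (Throwable t) {
      fatal = new IOException("handler failed at sequence " + sequence, t);
      if (event.syncFuture != null) {
        event.syncFuture.completeExceptionally(fatal);
      }
    }
  }

  private void write(byte[] payload) throws IOException {
    // Placeholder for the real log write.
  }
}
```

Because the fatal field is volatile and only ever transitions from null to non-null, producers polling it see the failure without any locking.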
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
index 12385131353..114e519b428 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
@@ -20,6 +20,8 @@
 import java.io.IOException;
 import org.apache.phoenix.replication.ReplicationLogGroup.Record;
 import org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;

@@ -31,6 +33,7 @@
  *
  */
 public abstract class ReplicationModeImpl {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationModeImpl.class);
   protected final ReplicationLogGroup logGroup;

   // The mode manages the underlying log to which the append and sync events will be sent
@@ -91,4 +94,17 @@ void closeReplicationLog(boolean graceful) {
       log.close(graceful);
     }
   }
+
+  protected ReplicationLogGroup.ReplicationMode transitionToStoreAndForward() throws IOException {
+    logGroup.getMetrics().incrementSyncToSafTransitions();
+    try {
+      logGroup.setHAGroupStatusToStoreAndForward();
+    } catch (Exception ex) {
+      String message =
+        String.format("HAGroup %s could not update status to STORE_AND_FORWARD", logGroup);
+      LOG.error(message, ex);
+      throw ReplicationLogGroup.asIOException(message, ex);
+    }
+    return ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
+  }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
index 7bcb4c89e09..8f205ea7673 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
@@ -110,12 +110,10 @@ void onExit(boolean gracefulShutdown) {

   @Override
   ReplicationMode onFailure(Throwable e) throws IOException {
-    // Treating failures in STORE_AND_FORWARD mode as fatal errors
+    // Failures in STORE_AND_FORWARD mode are fatal — throw to let the event handler abort
     String message = String.format("HAGroup %s mode=%s got error", logGroup, this);
     LOG.error(message, e);
-    logGroup.abort(message, e);
-    // unreachable, we remain in the same mode
-    return STORE_AND_FORWARD;
+    throw ReplicationLogGroup.asIOException(message, e);
   }

   @Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
index 02c57f7f3fd..6c85ee27d47 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.replication;

-import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
 import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC_AND_FORWARD;

 import java.io.IOException;
@@ -62,17 +61,7 @@ void onExit(boolean gracefulShutdown) {
   @Override
   ReplicationMode onFailure(Throwable e) throws IOException {
     LOG.info("HAGroup {} mode={} got error", logGroup, this, e);
-    logGroup.getMetrics().incrementSyncToSafTransitions();
-    try {
-      logGroup.setHAGroupStatusToStoreAndForward();
-    } catch (Exception ex) {
-      // Fatal error when we can't update the HAGroup status
-      String message =
-        String.format("HAGroup %s could not update status to STORE_AND_FORWARD", logGroup);
-      LOG.error(message, ex);
-      logGroup.abort(message, ex);
-    }
-    return STORE_AND_FORWARD;
+    return transitionToStoreAndForward();
   }

   @Override
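The mode classes now share one failure path through the new helper. A compile-ready sketch of the resulting shape (hypothetical simplified names; the real helper lives in ReplicationModeImpl and flips HAGroupStore state before returning):

```java
import java.io.IOException;

// Distilled control flow after the refactor: onFailure() either returns the next mode
// or throws, and only the caller (the event handler) decides that a throw is fatal.
abstract class ModeSketch {
  abstract ModeSketch onFailure(Throwable cause) throws IOException;
}

final class SyncModeSketch extends ModeSketch {
  @Override
  ModeSketch onFailure(Throwable cause) throws IOException {
    // Shared helper: update HAGroupStore state, then fall back to store-and-forward.
    // If the state update fails it throws, and the event handler records the fatal
    // exception instead of aborting from inside the disruptor thread.
    return transitionToStoreAndForward();
  }

  private ModeSketch transitionToStoreAndForward() throws IOException {
    // Placeholder for setHAGroupStatusToStoreAndForward() plus the transition metric.
    return new StoreAndForwardModeSketch();
  }
}

final class StoreAndForwardModeSketch extends ModeSketch {
  @Override
  ModeSketch onFailure(Throwable cause) throws IOException {
    // No further fallback mode exists: surface the failure.
    throw new IOException("store-and-forward write failed", cause);
  }
}
```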
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
index 0a5b5a48c66..ca258a5b760 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.replication;

-import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
 import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC;

 import java.io.IOException;
@@ -57,18 +56,7 @@ void onExit(boolean gracefulShutdown) {
   @Override
   ReplicationMode onFailure(Throwable e) throws IOException {
     LOG.info("HAGroup {} mode={} got error", logGroup, this, e);
-    logGroup.getMetrics().incrementSyncToSafTransitions();
-    try {
-      // first update the HAGroupStore state
-      logGroup.setHAGroupStatusToStoreAndForward();
-    } catch (Exception ex) {
-      // Fatal error when we can't update the HAGroup status
-      String message =
-        String.format("HAGroup %s could not update status to STORE_AND_FORWARD", logGroup);
-      LOG.error(message, ex);
-      logGroup.abort(message, ex);
-    }
-    return STORE_AND_FORWARD;
+    return transitionToStoreAndForward();
   }

   @Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
index 88eeb791e31..d062aa8c530 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
@@ -128,7 +128,6 @@ protected void init() throws IOException {

   public void close() {
     LOG.info("Closing ReplicationLogReplay for haGroup: {}", haGroupName);
-    replicationLogDiscoveryReplay.getReplicationLogFileTracker().close();
     replicationLogDiscoveryReplay.close();
     // Remove the instance from cache
     INSTANCES.remove(haGroupName);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 2bf723defd0..ea8fa451ed2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -263,13 +263,12 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
     logGroup.append(tableName, commitId, put);

     // sync on the writer will timeout — syncInternal wraps in PhoenixWALSyncTimeoutException
-    // and calls abort() which throws RuntimeException
+    // and calls abort() which rethrows the IOException cause directly
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException because sync timed out");
-    } catch (RuntimeException e) {
-      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
-        e.getCause() instanceof PhoenixWALSyncTimeoutException);
+      fail("Should have thrown PhoenixWALSyncTimeoutException because sync timed out");
+    } catch (PhoenixWALSyncTimeoutException e) {
+      // expected
     }
     // reset
     doNothing().when(innerWriter).sync();
@@ -623,27 +622,25 @@ public void testEventProcessingException() throws Exception {
     doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(),
       anyLong(), any(Mutation.class));

-    // Append data. This should trigger the LogExceptionHandler, which will close logWriter.
-    // The sync future times out, syncInternal wraps in PhoenixWALSyncTimeoutException and aborts.
+    // Append publishes to the ring buffer. The event handler catches the RuntimeException via
+    // catch(Throwable), poisons itself, and fails the sync future. The producer receives
+    // ExecutionException and calls abort().
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException because sync timed out");
-    } catch (RuntimeException e) {
-      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
-        e.getCause() instanceof PhoenixWALSyncTimeoutException);
+      fail("Should have thrown IOException from abort");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Unexpected error in event handler"));
     }

-    // Verify that subsequent operations fail because the log is closed
+    // Verify that subsequent operations fail
     try {
       logGroup.append(tableName, commitId + 1, put);
-      fail("Should have thrown IOException because log is closed");
+      fail("Should have thrown IOException because log is poisoned/closed");
     } catch (IOException e) {
-      assertTrue("Expected an IOException because log is closed",
-        e.getMessage().contains("Closed"));
+      // expected
     }

-    // Verify that the inner writer was closed by the LogExceptionHandler
     verify(innerWriter, times(1)).close();
   }
@@ -716,15 +713,14 @@ public void testFailToUpdateHAGroupStatusOnSwitchToStoreAndForward() throws Exception {
     // Append data
     logGroup.append(tableName, commitId, put);

-    // Try to sync. Should fail after exhausting retries and then switch to STORE_AND_FORWARD
+    // Sync fails, mode transition to SAF also fails because HAGroupStore update throws.
+    // The event handler poisons itself, and the producer calls abort().
     try {
       logGroup.sync();
-      fail("Should have thrown exception because of failure to update mode");
-    } catch (RuntimeException ex) {
-      assertTrue(ex.getMessage().contains("Simulated sync failure"));
+      fail("Should have thrown IOException because of failure to update mode");
+    } catch (IOException ex) {
+      assertTrue(ex.getMessage().contains("Simulated failure to update HAGroupStore state"));
     }
-    // wait for the event processor thread to clean up
-    Thread.sleep(3);
   }

   /**
@@ -970,33 +966,30 @@ public void testRuntimeExceptionDuringLengthCheck() throws Exception {
     // Configure writer to throw RuntimeException on getLength()
     doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).getLength();

-    // Append data. This should trigger the LogExceptionHandler, which will close logWriter.
-    // The sync future times out, syncInternal wraps in PhoenixWALSyncTimeoutException and aborts.
+    // Append publishes to the ring buffer. The event handler catches the RuntimeException,
+    // poisons itself, and fails the sync future. The producer calls abort().
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException because sync timed out");
-    } catch (RuntimeException e) {
-      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
-        e.getCause() instanceof PhoenixWALSyncTimeoutException);
+      fail("Should have thrown IOException from abort");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Unexpected error in event handler"));
     }

-    // Verify that subsequent operations fail because the log is closed
+    // Verify that subsequent operations fail
     try {
       logGroup.append(tableName, commitId + 1, put);
-      fail("Should have thrown IOException because log is closed");
+      fail("Should have thrown IOException because log is poisoned/closed");
     } catch (IOException e) {
-      assertTrue("Expected an IOException because log is closed",
-        e.getMessage().contains("Closed"));
+      // expected
     }

-    // Verify that the inner writer was closed by the LogExceptionHandler
     verify(innerWriter, times(1)).close();
   }

   /**
-   * Tests behavior when a RuntimeException occurs during append() after closeOnError() has been
-   * called. Verifies that the system properly rejects sync operations after being closed.
+   * Tests behavior when a RuntimeException occurs during append and subsequent appends are
+   * rejected. Verifies that the system properly rejects operations after being poisoned/closed.
    */
   @Test
   public void testAppendAfterCloseOnError() throws Exception {
@@ -1012,32 +1005,30 @@ public void testAppendAfterCloseOnError() throws Exception {
     doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(),
       anyLong(), any(Mutation.class));

-    // Append data to trigger closeOnError()
+    // Append publishes to the ring buffer. The event handler catches the RuntimeException,
+    // poisons itself, and fails the sync future. The producer calls abort().
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException because sync timed out");
-    } catch (RuntimeException e) {
-      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
-        e.getCause() instanceof PhoenixWALSyncTimeoutException);
+      fail("Should have thrown IOException from abort");
+    } catch (IOException e) {
+      // expected
     }

-    // Verify that subsequent append operations fail because the log is closed
+    // Verify that subsequent append operations fail because the log is poisoned/closed
     try {
       logGroup.append(tableName, commitId, put);
-      fail("Should have thrown IOException because log is closed");
+      fail("Should have thrown IOException because log is poisoned/closed");
     } catch (IOException e) {
-      assertTrue("Expected an IOException because log is closed",
-        e.getMessage().contains("Closed"));
+      // expected
     }

-    // Verify that the inner writer was closed by the LogExceptionHandler
     verify(innerWriter, times(1)).close();
   }

   /**
-   * Tests behavior when a RuntimeException occurs during sync() after closeOnError() has been
-   * called. Verifies that the system properly rejects sync operations after being closed.
+   * Tests behavior when a RuntimeException occurs during append and subsequent syncs are rejected.
+   * Verifies that the system properly rejects operations after being poisoned/closed.
    */
   @Test
   public void testSyncAfterCloseOnError() throws Exception {
@@ -1053,26 +1044,24 @@ public void testSyncAfterCloseOnError() throws Exception {
     doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(),
       anyLong(), any(Mutation.class));

-    // Append data to trigger closeOnError()
+    // Append publishes to the ring buffer. The event handler catches the RuntimeException,
+    // poisons itself, and fails the sync future. The producer calls abort().
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException because sync timed out");
-    } catch (RuntimeException e) {
-      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
-        e.getCause() instanceof PhoenixWALSyncTimeoutException);
+      fail("Should have thrown IOException from abort");
+    } catch (IOException e) {
+      // expected
     }

-    // Verify that subsequent sync operations fail because the log is closed
+    // Verify that subsequent sync operations fail because the log is poisoned/closed
     try {
       logGroup.sync();
-      fail("Should have thrown IOException because log is closed");
+      fail("Should have thrown IOException because log is poisoned/closed");
     } catch (IOException e) {
-      assertTrue("Expected an IOException because log is closed",
-        e.getMessage().contains("Closed"));
+      // expected
     }

-    // Verify that the inner writer was closed by the LogExceptionHandler
     verify(innerWriter, times(1)).close();
   }
@@ -1721,11 +1710,10 @@ public void testBothSyncAndSafFailuresTriggersAbort() throws Exception {
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException from abort");
-    } catch (RuntimeException e) {
-      assertTrue("Abort message should mention both SYNC and SAF",
-        e.getMessage().contains("Both SYNC and SAF replication writes failed")
-          || e.getMessage().contains("ABORTING"));
+      fail("Should have thrown IOException from abort");
+    } catch (IOException e) {
+      assertTrue("Abort message should contain sync failure",
+        e.getMessage().contains("Simulated sync failure"));
     }
   }
@@ -1763,10 +1751,10 @@ public void testSafBothAttemptsFailTriggersAbort() throws Exception {
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException from abort");
-    } catch (RuntimeException e) {
+      fail("Should have thrown IOException from abort");
+    } catch (IOException e) {
       assertTrue("Abort should fire from SAF failure path",
-        e.getMessage().contains("ABORTING") || e.getMessage().contains("got error"));
+        e.getMessage().contains("Simulated SAF sync failure"));
     }
   }