diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index 3f35b40cef8..ead41a69509 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -32,8 +32,11 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
@@ -62,12 +65,14 @@
/**
 * This is the first implementation of a RegionServer coprocessor introduced by Phoenix.
*/
+@CoreCoprocessor
public class PhoenixRegionServerEndpoint extends
RegionServerEndpointProtos.RegionServerEndpointService implements RegionServerCoprocessor {
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
private MetricsMetadataCachingSource metricsSource;
protected Configuration conf;
protected ServerName serverName;
+ private Abortable abortable;
private ExecutorService prewarmExecutor;
// regionserver level thread pool used by Uncovered Indexes to scan data table rows
@@ -79,6 +84,9 @@ public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionServerCoprocessorEnvironment) {
this.serverName = ((RegionServerCoprocessorEnvironment) env).getServerName();
}
+ if (env instanceof HasRegionServerServices) {
+ this.abortable = ((HasRegionServerServices) env).getRegionServerServices();
+ }
this.metricsSource =
MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
initUncoveredIndexThreadPool(this.conf);
@@ -308,7 +316,7 @@ private void startHAGroupStoreClientPrewarming() {
manager.getClusterRoleRecord(haGroup);
if (shouldInitReplicationLogGroup) {
try {
- ReplicationLogGroup.get(conf, serverName, haGroup);
+ ReplicationLogGroup.get(conf, serverName, haGroup, abortable);
LOGGER.info("Eagerly initialized ReplicationLogGroup {} on server {}", haGroup,
serverName);
} catch (Exception e) {
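Note on the wiring above: @CoreCoprocessor is what makes HBase hand the coprocessor an environment implementing HasRegionServerServices, and RegionServerServices extends Abortable. A minimal sketch of the extraction, with a hypothetical helper name, assuming callers tolerate a null result in non-core deployments:

```java
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;

final class AbortableExtraction {
  // Mirrors the start() logic added in both coprocessors in this patch.
  static Abortable fromEnv(CoprocessorEnvironment env) {
    if (env instanceof HasRegionServerServices) {
      // RegionServerServices extends Abortable, so no further cast is needed.
      return ((HasRegionServerServices) env).getRegionServerServices();
    }
    return null; // non-core environment: callers must handle a null Abortable
  }
}
```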
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 1ad0579a451..16fd9455725 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -53,6 +53,7 @@
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellScanner;
@@ -70,6 +71,8 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -170,6 +173,7 @@
* does batch mutations.
*
*/
+@CoreCoprocessor
public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
private static final Logger LOG = LoggerFactory.getLogger(IndexRegionObserver.class);
@@ -439,6 +443,7 @@ public int getMaxPendingRowCount() {
private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100;
private byte[] encodedRegionName;
private boolean shouldReplicate;
+ private Abortable abortable;
// Don't replicate the mutation if this attribute is set
private static final Predicate<Mutation> IGNORE_REPLICATION =
@@ -541,6 +546,9 @@ public void start(CoprocessorEnvironment e) throws IOException {
if (this.shouldReplicate) {
this.ignoreReplicationFilter = getSynchronousReplicationFilter(tableName);
}
+ if (e instanceof HasRegionServerServices) {
+ this.abortable = ((HasRegionServerServices) e).getRegionServerServices();
+ }
} catch (NoSuchMethodError ex) {
disabled = true;
LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex);
@@ -689,7 +697,7 @@ private Optional<ReplicationLogGroup> getHAGroupFromBatch(RegionCoprocessorEnvironment
byte[] haGroupName = m.getAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
if (haGroupName != null) {
ReplicationLogGroup logGroup = ReplicationLogGroup.get(env.getConfiguration(),
- env.getServerName(), Bytes.toString(haGroupName));
+ env.getServerName(), Bytes.toString(haGroupName), abortable);
return Optional.of(logGroup);
}
}
@@ -707,7 +715,7 @@ private Optional<ReplicationLogGroup> getHAGroupFromWALKey(RegionCoprocessorEnvironment
logKey.getExtendedAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
if (haGroupName != null) {
ReplicationLogGroup logGroup = ReplicationLogGroup.get(env.getConfiguration(),
- env.getServerName(), Bytes.toString(haGroupName));
+ env.getServerName(), Bytes.toString(haGroupName), abortable);
return Optional.of(logGroup);
}
return Optional.empty();
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
index 3c64e9ec61e..ccd0e82caf4 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
@@ -125,6 +125,7 @@ public void init() throws IOException {
}
public void close() {
+ replicationLogTracker.close();
if (this.metrics != null) {
this.metrics.close();
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index ded1fec7b1b..1fe559b449c 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.jdbc.HAGroupStoreManager;
@@ -67,7 +68,6 @@
import org.slf4j.LoggerFactory;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.base.Throwables;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -171,6 +171,9 @@ public class ReplicationLogGroup {
public static final long DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS = 10_000L;
public static final String WAL_SYNC_TIMEOUT_MS_KEY = "hbase.regionserver.wal.sync.timeout";
public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L;
+ public static final String REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS_KEY =
+ "phoenix.replication.log.group.shutdown.timeout.ms";
+ public static final long DEFAULT_REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS = 30_000L;
public static final String STANDBY_DIR = "in";
public static final String FALLBACK_DIR = "out";
@@ -193,7 +196,9 @@ public class ReplicationLogGroup {
protected ReplicationLogDiscoveryForwarder logForwarder;
protected long syncTimeoutMs;
protected long peerInitTimeoutMs;
- protected volatile boolean closed = false;
+ protected final AtomicBoolean closed = new AtomicBoolean(false);
+ protected final Abortable abortable;
+ protected long shutdownTimeoutMs;
/**
* The replication mode determines how mutations are handled. Mode transitions occur automatically
@@ -331,8 +336,6 @@ public Record(String tableName, long commitId, Mutation mutation) {
protected Disruptor<LogEvent> disruptor;
protected RingBuffer<LogEvent> ringBuffer;
protected LogEventHandler eventHandler;
- // Used to inform the disruptor event thread whether this is a graceful or a forced shutdown
- private final AtomicBoolean gracefulShutdownEventHandlerFlag = new AtomicBoolean();
/**
* Get or create a ReplicationLogGroup instance for the given HA Group.
@@ -344,10 +347,25 @@ public Record(String tableName, long commitId, Mutation mutation) {
*/
public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
String haGroupName) throws IOException {
+ return get(conf, serverName, haGroupName, (Abortable) null);
+ }
+
+ /**
+ * Get or create a ReplicationLogGroup instance for the given HA Group.
+ * @param conf Configuration object
+ * @param serverName The server name
+ * @param haGroupName The HA Group name
+ * @param abortable Abortable to invoke on fatal errors (typically RegionServerServices)
+ * @return ReplicationLogGroup instance
+ * @throws IOException if initialization fails
+ */
+ public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
+ String haGroupName, Abortable abortable) throws IOException {
try {
return INSTANCES.computeIfAbsent(haGroupName, k -> {
try {
- ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName);
+ ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName,
+ HAGroupStoreManager.getInstance(conf), abortable);
group.init();
return group;
} catch (IOException e) {
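The computeIfAbsent body above tunnels the checked IOException out of the lambda; the unwrap half is cut off by the hunk boundary. A self-contained sketch of the idiom with hypothetical names, assuming the wrapper is unwrapped back to IOException for callers:

```java
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class InstanceCache<V> {
  interface Factory<V> { V create(String key) throws IOException; }

  private final ConcurrentMap<String, V> instances = new ConcurrentHashMap<>();

  V getOrCreate(String key, Factory<V> factory) throws IOException {
    try {
      return instances.computeIfAbsent(key, k -> {
        try {
          return factory.create(k); // create + init exactly once per key
        } catch (IOException e) {
          throw new RuntimeException(e); // tunnel the checked exception out
        }
      });
    } catch (RuntimeException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause(); // restore the declared exception type
      }
      throw e;
    }
  }
}
```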
@@ -390,12 +408,14 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
/**
* Protected constructor for ReplicationLogGroup.
- * @param conf Configuration object
- * @param serverName The server name
- * @param haGroupName The HA Group name
+ * @param conf Configuration object
+ * @param serverName The server name
+ * @param haGroupName The HA Group name
+ * @param haGroupStoreManager HA Group Store Manager instance
*/
- protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName) {
- this(conf, serverName, haGroupName, HAGroupStoreManager.getInstance(conf));
+ protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName,
+ HAGroupStoreManager haGroupStoreManager) {
+ this(conf, serverName, haGroupName, haGroupStoreManager, null);
}
/**
@@ -404,9 +424,10 @@ protected ReplicationLogGroup(Configuration conf, ServerName serverName, String
* @param serverName The server name
* @param haGroupName The HA Group name
* @param haGroupStoreManager HA Group Store Manager instance
+ * @param abortable Abortable to invoke on fatal errors (may be null)
*/
protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName,
- HAGroupStoreManager haGroupStoreManager) {
+ HAGroupStoreManager haGroupStoreManager, Abortable abortable) {
// conf object from coprocessor is instance of
// org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration and we need to modify it when
// we send rpc to namenode so copying it
@@ -420,6 +441,9 @@ protected ReplicationLogGroup(Configuration conf, ServerName serverName, String
this.serverName = serverName;
this.haGroupName = haGroupName;
this.haGroupStoreManager = haGroupStoreManager;
+ this.abortable = abortable;
+ this.shutdownTimeoutMs = clonedConf.getLong(REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS_KEY,
+ DEFAULT_REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS);
this.metrics = createMetricsSource();
this.mode = new AtomicReference<>(INIT);
}
@@ -517,9 +541,13 @@ public void append(String tableName, long commitId, Mutation mutation) throws IO
if (LOG.isTraceEnabled()) {
LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, commitId, mutation);
}
- if (closed) {
+ if (isClosed()) {
throw new IOException("Closed");
}
+ IOException fatal = eventHandler.getFatalException();
+ if (fatal != null) {
+ throw fatal;
+ }
long startTime = System.nanoTime();
try {
// ringBuffer.next() claims the next sequence number. Because we initialize the Disruptor
@@ -557,9 +585,13 @@ public void sync() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Sync");
}
- if (closed) {
+ if (isClosed()) {
throw new IOException("Closed");
}
+ IOException fatal = eventHandler.getFatalException();
+ if (fatal != null) {
+ abort("ReplicationLogGroup has a fatal exception", fatal);
+ }
long startTime = System.nanoTime();
try {
syncInternal();
@@ -589,16 +621,15 @@ protected void syncInternal() throws IOException {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Interrupted while waiting for sync");
} catch (ExecutionException e) {
- // After exhausting all attempts to sync to the standby cluster we switch mode
- // and then retry again. If that also fails, it is a fatal error
+ Throwable cause = e.getCause();
String message = String.format("HAGroup %s sync operation failed", this);
- LOG.error(message, e);
- abort(message, e);
+ LOG.error(message, cause);
+ abort(message, cause);
} catch (TimeoutException e) {
String message =
String.format("HAGroup %s replication log sync timed out after %d ms", this, syncTimeoutMs);
LOG.error(message, e);
- PhoenixWALSyncTimeoutException timeoutEx = new PhoenixWALSyncTimeoutException(message, e);
+ PhoenixWALSyncTimeoutException timeoutEx = new PhoenixWALSyncTimeoutException(message);
abort(message, timeoutEx);
}
}
@@ -608,67 +639,33 @@ protected void syncInternal() throws IOException {
* @return true if closed, false otherwise
*/
public boolean isClosed() {
- return closed;
- }
-
- /**
- * Force closes the log group upon an unrecoverable internal error. This is a fail-stop behavior:
- * once called, the log group is marked as closed, the Disruptor is halted, and all subsequent
- * append() and sync() calls will throw an IOException("Closed"). This ensures that no further
- * operations are attempted on a log group that has encountered a critical error.
- */
- protected void closeOnError() {
- if (closed) {
- return;
- }
- synchronized (this) {
- if (closed) {
- return;
- }
- // setting closed to true prevents future producers to add events to the ring buffer
- closed = true;
- }
- // Directly halt the disruptor. shutdown() would wait for events to drain. We are expecting
- // that will not work.
- gracefulShutdownEventHandlerFlag.set(false);
- disruptor.halt();
- metrics.close();
- LOG.info("HAGroup {} closed on error", this);
+ return closed.get();
}
/**
- * Close the ReplicationLogGroup and all associated resources. This method is thread-safe and can
- * be called multiple times.
+ * Close the ReplicationLogGroup. Drains pending events from the Disruptor with a bounded timeout
+ * (which triggers onShutdown → modeImpl.onExit → ReplicationLog.close), then cleans up all
+ * resources. When the event handler has a fatal exception, the drain completes almost
+ * immediately because each event takes the short-circuit path. The instance is removed from the
+ * INSTANCES cache and subsequent append()/sync() calls throw IOException.
*/
public void close() {
- if (closed) {
+ if (!closed.compareAndSet(false, true)) {
return;
}
- synchronized (this) {
- if (closed) {
- return;
- }
- LOG.info("Closing HAGroup {}", this);
- // setting closed to true prevents future producers to add events to the ring buffer
- closed = true;
- // Remove from instances cache
- INSTANCES.remove(haGroupName);
- // Sync before shutting down to flush all pending appends.
- try {
- syncInternal();
- gracefulShutdownEventHandlerFlag.set(true);
- // waits until all the events in the disruptor have been processed
- disruptor.shutdown();
- } catch (IOException e) {
- LOG.warn("Error during final sync on close", e);
- gracefulShutdownEventHandlerFlag.set(false);
- disruptor.halt(); // Go directly to halt.
- }
- // wait for the disruptor threads to finish
- shutdownDisruptorExecutor();
- metrics.close();
- LOG.info("HAGroup {} closed", this);
+ LOG.info("Closing HAGroup {}", this);
+ INSTANCES.remove(haGroupName);
+ try {
+ disruptor.shutdown(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (com.lmax.disruptor.TimeoutException e) {
+ LOG.warn("HAGroup {} shutdown timed out after {}ms, halting", this, shutdownTimeoutMs);
+ disruptor.halt();
}
+ shutdownDisruptorExecutor();
+ logForwarder.stop();
+ logForwarder.close();
+ metrics.close();
+ LOG.info("HAGroup {} closed", this);
}
/**
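The rewritten close() leans on the Disruptor's timed shutdown. A runnable sketch of the drain-then-halt pattern in isolation; the event type, ring size, and timeout are illustrative, and only shutdown(timeout, unit) and halt() correspond to the calls in the diff:

```java
import java.util.concurrent.TimeUnit;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

public final class BoundedShutdownExample {
  static final class Event { long value; }

  public static void main(String[] args) {
    Disruptor<Event> disruptor =
        new Disruptor<>(Event::new, 1024, DaemonThreadFactory.INSTANCE);
    disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
      // consumer work; a poisoned handler would short-circuit here
    });
    disruptor.start();
    try {
      // Blocks until every published event is consumed, up to the timeout.
      disruptor.shutdown(30_000L, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // Drain did not finish in time: stop the event processor without draining.
      disruptor.halt();
    }
  }
}
```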
@@ -902,20 +899,33 @@ protected long setHAGroupStatusToSync() throws Exception {
}
/**
- * Abort when we hit a fatal error
+ * Returns the given cause as an IOException. If it already is one, it is returned directly;
+ * otherwise it is wrapped.
*/
- protected void abort(String reason, Throwable cause) {
- // TODO better to use abort using RegionServerServices
- String msg = "***** ABORTING region server: " + reason + " *****";
- if (cause != null) {
- msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
+ static IOException asIOException(String msg, Throwable cause) {
+ if (cause instanceof IOException) {
+ return (IOException) cause;
}
- LOG.error(msg);
- if (cause != null) {
- throw new RuntimeException(msg, cause);
- } else {
- throw new RuntimeException(msg);
+ return cause != null ? new IOException(msg, cause) : new IOException(msg);
+ }
+
+ /**
+ * Abort the region server due to an unrecoverable replication failure. Must be called from a
+ * producer thread (never the event handler thread). Sets the fatal exception so remaining events
+ * drain instantly, then closes all resources and invokes the Abortable to trigger RS shutdown.
+ * Always throws so the caller unwinds.
+ */
+ protected void abort(String reason, Throwable cause) throws IOException {
+ String msg = "Aborting region server due to replication failure: " + reason;
+ LOG.error(msg, cause);
+ // Idempotent; may already be set by the event handler (ExecutionException path)
+ // but is needed here for the TimeoutException path where the event handler is still alive.
+ eventHandler.setFatalException(asIOException(msg, cause));
+ close();
+ if (abortable != null) {
+ abortable.abort(msg, cause);
}
+ throw asIOException(msg, cause);
}
/**
@@ -923,12 +933,24 @@ protected void abort(String reason, Throwable cause) {
*/
protected class LogEventHandler implements EventHandler<LogEvent>, LifecycleAware {
private final List<CompletableFuture<Void>> pendingSyncFutures = new ArrayList<>();
- // Current replication mode implementation which will handle the events
private ReplicationModeImpl currentModeImpl;
+ private volatile IOException fatalException;
public LogEventHandler() {
}
+ void setFatalException(IOException cause) {
+ if (fatalException != null) {
+ return;
+ }
+ LOG.error("HAGroup {} event handler hit fatal exception", ReplicationLogGroup.this, cause);
+ this.fatalException = cause;
+ }
+
+ IOException getFatalException() {
+ return fatalException;
+ }
+
public void init() throws IOException {
initializeMode(getMode());
}
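The fatal-exception plumbing amounts to a poison-pill handshake between the single consumer thread and producer threads: the first failure wins, and every later producer call fails fast. A minimal sketch with illustrative names; the unsynchronized check-then-set mirrors setFatalException above and is a benign race, since any recorded failure poisons the instance:

```java
import java.io.IOException;

final class PoisonPill {
  // Written on failure, read by every producer thread.
  private volatile IOException fatalException;

  void poison(IOException cause) {
    if (fatalException == null) { // best-effort first-wins, as in setFatalException
      fatalException = cause;
    }
  }

  void checkOpen() throws IOException {
    IOException fatal = fatalException; // single volatile read
    if (fatal != null) {
      throw fatal; // producers unwind instead of publishing more events
    }
  }
}
```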
@@ -996,7 +1018,8 @@ private void processPendingSyncs(long sequence) throws IOException {
// is not processing any event like append/sync because this is the only thread
// that is consuming the events from the ring buffer and handing them off to the
// mode
- currentModeImpl.onExit(true);
+ ReplicationModeImpl oldModeImpl = currentModeImpl;
+ disruptorExecutor.execute(() -> oldModeImpl.onExit(true));
initializeMode(newMode);
}
}
@@ -1086,25 +1109,27 @@ private void replayFailedEvent(LogEvent failedEvent, long sequence) throws IOExc
*/
@Override
public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception {
- // Calculate time spent in ring buffer
long currentTimeNs = System.nanoTime();
long ringBufferTimeNs = currentTimeNs - event.timestampNs;
metrics.updateRingBufferTime(ringBufferTimeNs);
+ if (fatalException != null) {
+ // Append events are ignored; sync futures are failed immediately
+ // so producer threads unblock without waiting for the sync timeout.
+ if (event.type == EVENT_TYPE_SYNC) {
+ event.syncFuture.completeExceptionally(fatalException);
+ }
+ return;
+ }
try {
switch (event.type) {
case EVENT_TYPE_DATA:
currentModeImpl.append(event.record);
- // Process any pending syncs at the end of batch.
if (endOfBatch) {
processPendingSyncs(sequence);
}
return;
case EVENT_TYPE_SYNC:
- // Add this sync future to the pending list
- // OK, to add the same future multiple times when we rewind the batch
- // as completing an already completed future is a no-op
pendingSyncFutures.add(event.syncFuture);
- // Process any pending syncs at the end of batch.
if (endOfBatch) {
processPendingSyncs(sequence);
}
@@ -1118,17 +1143,16 @@ public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Ex
e);
onFailure(event, sequence, e);
} catch (Exception fatalEx) {
- // Either we failed to switch the mode or we are in STORE_AND_FORWARD mode
- // and got an exception. This is a fatal exception so halt the disruptor
- // fail the pending sync events with the original exception
- failPendingSyncs(sequence, e);
- // Both SYNC and SAF writes failed. The local HBase WAL has the mutation but neither
- // replication pipeline does, so the SAF forwarder cannot reconcile it. Abort the RS
- // so region reassignment + preWALRestore re-ships the orphaned edits.
- abort("Both SYNC and SAF replication writes failed", fatalEx);
- // halt the disruptor with the fatal exception
- throw fatalEx;
+ IOException fatalIOE =
+ asIOException("Both SYNC and SAF replication writes failed", fatalEx);
+ setFatalException(fatalIOE);
+ failPendingSyncs(sequence, fatalIOE);
}
+ } catch (Throwable t) {
+ IOException wrapped =
+ new IOException("Unexpected error in event handler at sequence " + sequence, t);
+ setFatalException(wrapped);
+ failPendingSyncs(sequence, wrapped);
}
}
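The short-circuit branch exists so producers blocked in sync() are unblocked immediately rather than waiting out the WAL sync timeout; the mechanism is just completeExceptionally on the pending future. A standalone demonstration with illustrative names:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class ShortCircuitDemo {
  public static void main(String[] args) throws InterruptedException {
    CompletableFuture<Void> syncFuture = new CompletableFuture<>();
    // Consumer side: fail the future instead of letting it time out.
    syncFuture.completeExceptionally(new IOException("poisoned"));
    try {
      syncFuture.get(); // producer side: returns immediately
    } catch (ExecutionException e) {
      System.out.println("unblocked with: " + e.getCause().getMessage());
    }
  }
}
```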
@@ -1139,36 +1163,35 @@ public void onStart() {
@Override
public void onShutdown() {
- boolean isGracefulShutdown = gracefulShutdownEventHandlerFlag.get();
+ boolean graceful = (fatalException == null);
LOG.info("HAGroup {} shutting down event handler graceful={}", ReplicationLogGroup.this,
- isGracefulShutdown);
- currentModeImpl.onExit(isGracefulShutdown);
+ graceful);
+ currentModeImpl.onExit(graceful);
}
}
/**
- * Handler for critical errors during the Disruptor lifecycle that closes the writer to prevent
- * data loss.
+ * Safety-net handler for exceptions that escape the event handler. Should never fire because
+ * onEvent catches all Throwables. If it does fire, it records a fatal exception so subsequent
+ * sync futures fail fast.
*/
protected class LogExceptionHandler implements ExceptionHandler<LogEvent> {
@Override
public void handleEventException(Throwable e, long sequence, LogEvent event) {
- String message = "Exception processing sequence " + sequence + " for event " + event;
- LOG.error(message, e);
- closeOnError();
+ LOG.error("UNEXPECTED: Exception escaped onEvent at sequence {}", sequence, e);
+ eventHandler
+ .setFatalException(new IOException("Exception escaped to LogExceptionHandler", e));
}
@Override
public void handleOnStartException(Throwable e) {
LOG.error("Exception during Disruptor startup", e);
- closeOnError();
+ eventHandler.setFatalException(new IOException("Disruptor startup failed", e));
}
@Override
public void handleOnShutdownException(Throwable e) {
- // Should not happen, but if it does, the regionserver is aborting or shutting down.
LOG.error("Exception during Disruptor shutdown", e);
- closeOnError();
}
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
index 12385131353..114e519b428 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
@@ -20,6 +20,8 @@
import java.io.IOException;
import org.apache.phoenix.replication.ReplicationLogGroup.Record;
import org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -31,6 +33,7 @@
*
*/
public abstract class ReplicationModeImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationModeImpl.class);
protected final ReplicationLogGroup logGroup;
// The mode manages the underlying log to which the append and sync events will be sent
@@ -91,4 +94,17 @@ void closeReplicationLog(boolean graceful) {
log.close(graceful);
}
}
+
+ protected ReplicationLogGroup.ReplicationMode transitionToStoreAndForward() throws IOException {
+ logGroup.getMetrics().incrementSyncToSafTransitions();
+ try {
+ logGroup.setHAGroupStatusToStoreAndForward();
+ } catch (Exception ex) {
+ String message =
+ String.format("HAGroup %s could not update status to STORE_AND_FORWARD", logGroup);
+ LOG.error(message, ex);
+ throw ReplicationLogGroup.asIOException(message, ex);
+ }
+ return ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
+ }
}
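Taken together with the two refactored subclasses below, onFailure now has a uniform contract: return a mode to request a transition, throw IOException to mark the failure fatal. A hedged sketch of that contract (the interface is illustrative, not the diff's actual class hierarchy):

```java
import java.io.IOException;

interface ModeFailurePolicy {
  enum Mode { SYNC, SYNC_AND_FORWARD, STORE_AND_FORWARD }

  // Return value requests a mode transition; IOException signals a fatal error
  // that the event handler turns into a poison pill.
  Mode onFailure(Throwable e) throws IOException;
}
```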
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
index 7bcb4c89e09..8f205ea7673 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
@@ -110,12 +110,10 @@ void onExit(boolean gracefulShutdown) {
@Override
ReplicationMode onFailure(Throwable e) throws IOException {
- // Treating failures in STORE_AND_FORWARD mode as fatal errors
+ // Failures in STORE_AND_FORWARD mode are fatal; throw so the event handler records a fatal error
String message = String.format("HAGroup %s mode=%s got error", logGroup, this);
LOG.error(message, e);
- logGroup.abort(message, e);
- // unreachable, we remain in the same mode
- return STORE_AND_FORWARD;
+ throw ReplicationLogGroup.asIOException(message, e);
}
@Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
index 02c57f7f3fd..6c85ee27d47 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.replication;
-import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC_AND_FORWARD;
import java.io.IOException;
@@ -62,17 +61,7 @@ void onExit(boolean gracefulShutdown) {
@Override
ReplicationMode onFailure(Throwable e) throws IOException {
LOG.info("HAGroup {} mode={} got error", logGroup, this, e);
- logGroup.getMetrics().incrementSyncToSafTransitions();
- try {
- logGroup.setHAGroupStatusToStoreAndForward();
- } catch (Exception ex) {
- // Fatal error when we can't update the HAGroup status
- String message =
- String.format("HAGroup %s could not update status to STORE_AND_FORWARD", logGroup);
- LOG.error(message, ex);
- logGroup.abort(message, ex);
- }
- return STORE_AND_FORWARD;
+ return transitionToStoreAndForward();
}
@Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
index 0a5b5a48c66..ca258a5b760 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.replication;
-import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC;
import java.io.IOException;
@@ -57,18 +56,7 @@ void onExit(boolean gracefulShutdown) {
@Override
ReplicationMode onFailure(Throwable e) throws IOException {
LOG.info("HAGroup {} mode={} got error", logGroup, this, e);
- logGroup.getMetrics().incrementSyncToSafTransitions();
- try {
- // first update the HAGroupStore state
- logGroup.setHAGroupStatusToStoreAndForward();
- } catch (Exception ex) {
- // Fatal error when we can't update the HAGroup status
- String message =
- String.format("HAGroup %s could not update status to STORE_AND_FORWARD", logGroup);
- LOG.error(message, ex);
- logGroup.abort(message, ex);
- }
- return STORE_AND_FORWARD;
+ return transitionToStoreAndForward();
}
@Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
index 88eeb791e31..d062aa8c530 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
@@ -128,7 +128,6 @@ protected void init() throws IOException {
public void close() {
LOG.info("Closing ReplicationLogReplay for haGroup: {}", haGroupName);
- replicationLogDiscoveryReplay.getReplicationLogFileTracker().close();
replicationLogDiscoveryReplay.close();
// Remove the instance from cache
INSTANCES.remove(haGroupName);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 2bf723defd0..ea8fa451ed2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -263,13 +263,12 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
logGroup.append(tableName, commitId, put);
// sync on the writer will timeout — syncInternal wraps in PhoenixWALSyncTimeoutException
- // and calls abort() which throws RuntimeException
+ // and calls abort() which rethrows the IOException cause directly
try {
logGroup.sync();
- fail("Should have thrown RuntimeException because sync timed out");
- } catch (RuntimeException e) {
- assertTrue("Expected PhoenixWALSyncTimeoutException cause",
- e.getCause() instanceof PhoenixWALSyncTimeoutException);
+ fail("Should have thrown PhoenixWALSyncTimeoutException because sync timed out");
+ } catch (PhoenixWALSyncTimeoutException e) {
+ // expected
}
// reset
doNothing().when(innerWriter).sync();
@@ -623,27 +622,25 @@ public void testEventProcessingException() throws Exception {
doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(),
anyLong(), any(Mutation.class));
- // Append data. This should trigger the LogExceptionHandler, which will close logWriter.
- // The sync future times out, syncInternal wraps in PhoenixWALSyncTimeoutException and aborts.
+ // Append publishes to the ring buffer. The event handler catches the RuntimeException via
+ // catch(Throwable), poisons itself, and fails the sync future. The producer receives
+ // ExecutionException and calls abort().
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown RuntimeException because sync timed out");
- } catch (RuntimeException e) {
- assertTrue("Expected PhoenixWALSyncTimeoutException cause",
- e.getCause() instanceof PhoenixWALSyncTimeoutException);
+ fail("Should have thrown IOException from abort");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Unexpected error in event handler"));
}
- // Verify that subsequent operations fail because the log is closed
+ // Verify that subsequent operations fail
try {
logGroup.append(tableName, commitId + 1, put);
- fail("Should have thrown IOException because log is closed");
+ fail("Should have thrown IOException because log is poisoned/closed");
} catch (IOException e) {
- assertTrue("Expected an IOException because log is closed",
- e.getMessage().contains("Closed"));
+ // expected
}
- // Verify that the inner writer was closed by the LogExceptionHandler
verify(innerWriter, times(1)).close();
}
@@ -716,15 +713,14 @@ public void testFailToUpdateHAGroupStatusOnSwitchToStoreAndForward() throws Exce
// Append data
logGroup.append(tableName, commitId, put);
- // Try to sync. Should fail after exhausting retries and then switch to STORE_AND_FORWARD
+ // Sync fails, mode transition to SAF also fails because HAGroupStore update throws.
+ // The event handler poisons itself, and the producer calls abort().
try {
logGroup.sync();
- fail("Should have thrown exception because of failure to update mode");
- } catch (RuntimeException ex) {
- assertTrue(ex.getMessage().contains("Simulated sync failure"));
+ fail("Should have thrown IOException because of failure to update mode");
+ } catch (IOException ex) {
+ assertTrue(ex.getMessage().contains("Simulated failure to update HAGroupStore state"));
}
- // wait for the event processor thread to clean up
- Thread.sleep(3);
}
/**
@@ -970,33 +966,30 @@ public void testRuntimeExceptionDuringLengthCheck() throws Exception {
// Configure writer to throw RuntimeException on getLength()
doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).getLength();
- // Append data. This should trigger the LogExceptionHandler, which will close logWriter.
- // The sync future times out, syncInternal wraps in PhoenixWALSyncTimeoutException and aborts.
+ // Append publishes to the ring buffer. The event handler catches the RuntimeException,
+ // poisons itself, and fails the sync future. The producer calls abort().
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown RuntimeException because sync timed out");
- } catch (RuntimeException e) {
- assertTrue("Expected PhoenixWALSyncTimeoutException cause",
- e.getCause() instanceof PhoenixWALSyncTimeoutException);
+ fail("Should have thrown IOException from abort");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Unexpected error in event handler"));
}
- // Verify that subsequent operations fail because the log is closed
+ // Verify that subsequent operations fail
try {
logGroup.append(tableName, commitId + 1, put);
- fail("Should have thrown IOException because log is closed");
+ fail("Should have thrown IOException because log is poisoned/closed");
} catch (IOException e) {
- assertTrue("Expected an IOException because log is closed",
- e.getMessage().contains("Closed"));
+ // expected
}
- // Verify that the inner writer was closed by the LogExceptionHandler
verify(innerWriter, times(1)).close();
}
/**
- * Tests behavior when a RuntimeException occurs during append() after closeOnError() has been
- * called. Verifies that the system properly rejects sync operations after being closed.
+ * Tests behavior when a RuntimeException occurs during append and subsequent appends are
+ * rejected. Verifies that the system properly rejects operations after being poisoned/closed.
*/
@Test
public void testAppendAfterCloseOnError() throws Exception {
@@ -1012,32 +1005,30 @@ public void testAppendAfterCloseOnError() throws Exception {
doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(),
anyLong(), any(Mutation.class));
- // Append data to trigger closeOnError()
+ // Append publishes to the ring buffer. The event handler catches the RuntimeException,
+ // poisons itself, and fails the sync future. The producer calls abort().
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown RuntimeException because sync timed out");
- } catch (RuntimeException e) {
- assertTrue("Expected PhoenixWALSyncTimeoutException cause",
- e.getCause() instanceof PhoenixWALSyncTimeoutException);
+ fail("Should have thrown IOException from abort");
+ } catch (IOException e) {
+ // expected
}
- // Verify that subsequent append operations fail because the log is closed
+ // Verify that subsequent append operations fail because the log is poisoned/closed
try {
logGroup.append(tableName, commitId, put);
- fail("Should have thrown IOException because log is closed");
+ fail("Should have thrown IOException because log is poisoned/closed");
} catch (IOException e) {
- assertTrue("Expected an IOException because log is closed",
- e.getMessage().contains("Closed"));
+ // expected
}
- // Verify that the inner writer was closed by the LogExceptionHandler
verify(innerWriter, times(1)).close();
}
/**
- * Tests behavior when a RuntimeException occurs during sync() after closeOnError() has been
- * called. Verifies that the system properly rejects sync operations after being closed.
+ * Tests behavior when a RuntimeException occurs during append and subsequent syncs are rejected.
+ * Verifies that the system properly rejects operations after being poisoned/closed.
*/
@Test
public void testSyncAfterCloseOnError() throws Exception {
@@ -1053,26 +1044,24 @@ public void testSyncAfterCloseOnError() throws Exception {
doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(),
anyLong(), any(Mutation.class));
- // Append data to trigger closeOnError()
+ // Append publishes to the ring buffer. The event handler catches the RuntimeException,
+ // poisons itself, and fails the sync future. The producer calls abort().
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown RuntimeException because sync timed out");
- } catch (RuntimeException e) {
- assertTrue("Expected PhoenixWALSyncTimeoutException cause",
- e.getCause() instanceof PhoenixWALSyncTimeoutException);
+ fail("Should have thrown IOException from abort");
+ } catch (IOException e) {
+ // expected
}
- // Verify that subsequent sync operations fail because the log is closed
+ // Verify that subsequent sync operations fail because the log is poisoned/closed
try {
logGroup.sync();
- fail("Should have thrown IOException because log is closed");
+ fail("Should have thrown IOException because log is poisoned/closed");
} catch (IOException e) {
- assertTrue("Expected an IOException because log is closed",
- e.getMessage().contains("Closed"));
+ // expected
}
- // Verify that the inner writer was closed by the LogExceptionHandler
verify(innerWriter, times(1)).close();
}
@@ -1721,11 +1710,10 @@ public void testBothSyncAndSafFailuresTriggersAbort() throws Exception {
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown RuntimeException from abort");
- } catch (RuntimeException e) {
- assertTrue("Abort message should mention both SYNC and SAF",
- e.getMessage().contains("Both SYNC and SAF replication writes failed")
- || e.getMessage().contains("ABORTING"));
+ fail("Should have thrown IOException from abort");
+ } catch (IOException e) {
+ assertTrue("Abort message should contain sync failure",
+ e.getMessage().contains("Simulated sync failure"));
}
}
@@ -1763,10 +1751,10 @@ public void testSafBothAttemptsFailTriggersAbort() throws Exception {
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown RuntimeException from abort");
- } catch (RuntimeException e) {
+ fail("Should have thrown IOException from abort");
+ } catch (IOException e) {
assertTrue("Abort should fire from SAF failure path",
- e.getMessage().contains("ABORTING") || e.getMessage().contains("got error"));
+ e.getMessage().contains("Simulated SAF sync failure"));
}
}