From 5892dc9138d48a856b1a736e3c6ed2279c1c484c Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Thu, 14 May 2026 14:24:23 -0700 Subject: [PATCH 1/5] PHOENIX-7787 Make CCF HAGroupStore ZK Updates backward compatible with existing ZK based client Co-authored-by: Cursor --- ...taleClusterRoleRecordVersionException.java | 47 ++ .../phoenix/jdbc/ClusterRoleRecord.java | 78 ++- .../phoenix/jdbc/HAGroupStoreClient.java | 187 ++++++- .../apache/phoenix/jdbc/PhoenixHAAdmin.java | 119 ++++- .../apache/phoenix/query/QueryServices.java | 9 + .../phoenix/query/QueryServicesOptions.java | 14 +- .../apache/phoenix/schema/MetaDataClient.java | 6 +- .../phoenix/jdbc/HAGroupStoreClientIT.java | 500 ++++++++++++++++++ 8 files changed, 916 insertions(+), 44 deletions(-) create mode 100644 phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java new file mode 100644 index 00000000000..01e898960a1 --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.exception; + +/** + * Exception thrown when a CAS write to the legacy {@code /phoenix/ha} {@code ClusterRoleRecord} + * znode fails because another writer modified it (BadVersion) or created it concurrently + * (NodeExists). Indicates the caller should re-read and retry. + *
<p>
+ * This is the {@code ClusterRoleRecord} analog of {@link StaleHAGroupStoreRecordVersionException}. + * The two exceptions intentionally live in parallel so callers can disambiguate which znode + * (consistentHA vs ha) the CAS conflict occurred on. + */ +public class StaleClusterRoleRecordVersionException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * @param msg reason for the exception + */ + public StaleClusterRoleRecordVersionException(String msg) { + super(msg); + } + + /** + * @param msg reason for the exception + * @param cause the underlying cause + */ + public StaleClusterRoleRecordVersionException(String msg, Throwable cause) { + super(msg, cause); + } + +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java index 0ba6b312d7b..aee742d290d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Objects; import java.util.Optional; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -126,40 +127,45 @@ public enum RegistryType { private final long version; /** - * To handle backward compatibility with old ClusterRoleRecords which had zk1 and zk2 as keys for - * zk urls, This constructor is only being used {@link ClusterRoleRecord#fromJson} when we - * deserialize Cluster Role Record read from ZooKeeper ZNode. 
If CRR is in old format we will read - * zk1 and zk2 and url1 and url2 will be null and if it is in new format zk1 and zk2 will be null - * in both cases final url is being stored in url1 and url2 url will be stored in normalized forms - * which looks like zk1\\:port1,zk2\\:port2,zk3\\:port3, zk4\\:port4,zk5\\:port5::znode or - * master1\\:port1,master2\\:port2,master3\\:port3, master4\\:port4,master5\\:port5 + * Convenience constructor that defaults {@code registryType} to + * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). Most call sites in production should use + * this. The legacy {@code /phoenix/ha} sync path uses the explicit-{@link RegistryType} overload + * below to write {@link RegistryType#ZK} records for backward compatibility with pre-consistentHA + * ZK-registry clients. * @param haGroupName HighAvailability Group name / CRR name - * @param policy Policy used by give CRR - * @param url1 ZK/HMaster url based on registry type for first cluster - * @param role1 {@link ClusterRole} which describes the current state of first cluster - * @param url2 ZK/HMaster url based on registry type for second cluster - * @param role2 {@link ClusterRole} which describes the current state of second cluster - * @param version version of a given CRR + * @param policy Policy used by given CRR + * @param url1 URL for the first cluster (canonicalized per {@code registryType}) + * @param role1 {@link ClusterRole} describing the current state of the first cluster + * @param url2 URL for the second cluster (canonicalized per {@code registryType}) + * @param role2 {@link ClusterRole} describing the current state of the second cluster + * @param version monotonic version of this CRR + */ + public ClusterRoleRecord(String haGroupName, HighAvailabilityPolicy policy, String url1, + ClusterRole role1, String url2, ClusterRole role2, long version) { + this(haGroupName, policy, DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE, url1, role1, url2, role2, + version); + } + + /** + * Canonical 
constructor; also the {@code @JsonCreator} entry point so the persisted + * {@code registryType} round-trips correctly. Records written prior to this change have no + * {@code registryType} field in their JSON; for those, Jackson passes {@code null} here and we + * default to {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). + *
<p>
+ * Outside of the legacy {@code /phoenix/ha} sync path, prefer the no-registry convenience + * constructor above. */ @JsonCreator public ClusterRoleRecord(@JsonProperty("haGroupName") String haGroupName, - @JsonProperty("policy") HighAvailabilityPolicy policy, @JsonProperty("url1") String url1, + @JsonProperty("policy") HighAvailabilityPolicy policy, + @JsonProperty("registryType") RegistryType registryType, @JsonProperty("url1") String url1, @JsonProperty("role1") ClusterRole role1, @JsonProperty("url2") String url2, @JsonProperty("role2") ClusterRole role2, @JsonProperty("version") long version) { this.haGroupName = haGroupName; this.policy = policy; + this.registryType = registryType != null ? registryType : DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE; - // Default registry type is RPC from Consistent Cluster Failover onwards - this.registryType = DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE; - - // Do we really need to normalize here ? - // We are normalizing to have urls in specific formats for each registryType for getting - // accurate comparisons. We are passing registryType as these url most probably won't have - // protocol in url, and it might be normalized based to wrong registry type, as we normalize - // w.r.t {@link ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY}, - // As we are expecting only master URLs as Consistent Cluster Failover onwards we only will - // allow RPC registry type url will be in form :- - // master1\\:port1,master2\\:port2,master3\\:port3,master4\\:port4,master5\\:port5 + // Normalize URLs to the registry-specific canonical form for accurate comparisons. 
url1 = JDBCUtil.formatUrl(url1, this.registryType); url2 = JDBCUtil.formatUrl(url2, this.registryType); @@ -221,6 +227,28 @@ public boolean hasSameInfo(ClusterRoleRecord other) { return haGroupName.equals(other.haGroupName) && policy.equals(other.policy); } + /** + * Returns true if {@code other} is logically equivalent to this record, ignoring {@code version} + * and {@code registryType}. Compares only the six fields that determine whether the persisted + * record needs to be rewritten: {@code haGroupName, policy, url1, url2, role1, role2}. + *
<p>
+ * Used by the legacy {@code /phoenix/ha} sync path as a short-circuit: if the existing znode is + * logically equal to what we'd write, skip the CAS round-trip. {@code version} is excluded + * because every desired CRR bumps it; {@code registryType} is excluded so a pre-existing + * RPC-stamped record converges to ZK on the next role change rather than thrashing on every sync + * cycle. + *
<p>
+ * Returns {@code false} if {@code other} is {@code null}. + */ + public boolean isLogicallyEqualIgnoringVersionAndRegistry(ClusterRoleRecord other) { + if (other == null) { + return false; + } + return Objects.equals(haGroupName, other.haGroupName) && Objects.equals(policy, other.policy) + && Objects.equals(url1, other.url1) && Objects.equals(url2, other.url2) + && role1 == other.role1 && role2 == other.role2; + } + /** Returns true if CRR has any url in UNKNOWN role/state. */ public boolean hasUnknownRole() { return role1 == ClusterRole.UNKNOWN || role2 == ClusterRole.UNKNOWN; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java index fe299d14fbb..fd498f4b92a 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_1; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_2; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_1; @@ -34,7 +35,11 @@ import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_SYNC_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK; @@ -62,11 +67,13 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.exception.InvalidClusterRoleTransitionException; +import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException; import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException; import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; import org.apache.phoenix.jdbc.ClusterRoleRecord.RegistryType; @@ -80,6 +87,7 @@ import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.MoreExecutors; /** * Main implementation of HAGroupStoreClient with peer support. 
Write-through cache for HAGroupStore @@ -98,8 +106,14 @@ public class HAGroupStoreClient implements Closeable { public static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1; // Maximum jitter in seconds for sync job start time (10 seconds) private static final long SYNC_JOB_MAX_JITTER_SECONDS = 10; + // 0-30s initial jitter on the periodic reconciler (nextLong is exclusive on the upper bound). + private static final long LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS = 31; private PhoenixHAAdmin phoenixHaAdmin; private PhoenixHAAdmin peerPhoenixHaAdmin; + // Admin on /phoenix/ha; null when feature disabled. volatile: read across event/periodic threads. + private volatile PhoenixHAAdmin legacyHaAdmin; + // In-memory cache of /phoenix/ha/; avoids a ZK read per sync. Null when feature disabled. + private volatile NodeCache legacyCrrNodeCache; private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class); // Map of > private static final Map> instances = @@ -132,6 +146,12 @@ public class HAGroupStoreClient implements Closeable { CopyOnWriteArraySet> targetStateSubscribers = new ConcurrentHashMap<>(); // Scheduled executor for periodic sync job private ScheduledExecutorService syncExecutor; + // Feature flag captured at construction. + private final boolean legacyCrrSyncEnabled; + // Serializes read-decide-write of the legacy znode across ctor/event/periodic threads. + private final Object legacyCrrSyncLock = new Object(); + // Periodic reconciler; null when feature disabled or interval<=0. 
+ private volatile ScheduledExecutorService legacyCrrSyncExecutor; public static HAGroupStoreClient getInstance(Configuration conf, String haGroupName) throws SQLException { @@ -224,6 +244,8 @@ public static List getHAGroupNames(String zkUrl) throws SQLException { conf.getLong(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT) * ZK_SESSION_TIMEOUT_MULTIPLIER); this.rotationTimeMs = conf.getLong(QueryServices.REPLICATION_LOG_ROTATION_TIME_MS_KEY, QueryServicesOptions.DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS); + this.legacyCrrSyncEnabled = conf.getBoolean(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, + DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED); // Custom Event Listener this.peerCustomPathChildrenCacheListener = peerPathChildrenCacheListener; try { @@ -246,6 +268,21 @@ public static List getHAGroupNames(String zkUrl) throws SQLException { // Start periodic sync job startPeriodicSyncJob(); + // Opt-in legacy /phoenix/ha sync. If the operator enabled this feature, a setup failure + // here propagates so the whole client is marked unhealthy — silently degrading to "no + // sync" would leave pre-consistentHA ZK-registry clients reading a stale legacy znode + // with no signal. The initial sync call itself swallows exceptions internally; only the + // framework setup (admin, NodeCache start) gates client health. 
+ if (legacyCrrSyncEnabled && this.isHealthy) { + this.legacyHaAdmin = + new PhoenixHAAdmin(this.zkUrl, conf, PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE); + this.legacyCrrNodeCache = + new NodeCache(this.legacyHaAdmin.getCurator(), toPath(haGroupName)); + this.legacyCrrNodeCache.start(true); // synchronous initial build + syncLegacyCRRIfRoleChanged(); + startLegacyCrrReconciliation(); + } + } catch (Exception e) { this.isHealthy = false; close(); @@ -886,9 +923,13 @@ private PathChildrenCacheListener createCacheListener(CountDownLatch latch, if (cacheType == ClusterType.LOCAL) { maybeInitializePeerPathChildrenCache(); } + // Propagate combined view to legacy /phoenix/ha; no-op when feature disabled. + syncLegacyCRRIfRoleChanged(); } break; case CHILD_REMOVED: + // No-op for legacy /phoenix/ha: operator owns its lifecycle. Peer disappearance is + // naturally reflected as role2=UNKNOWN by the next sync. break; case INITIALIZED: latch.countDown(); @@ -985,30 +1026,42 @@ private void closePeerConnection() { */ private void shutdownSyncExecutor() { if (syncExecutor != null) { - syncExecutor.shutdown(); - try { - if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) { - syncExecutor.shutdownNow(); - } - } catch (InterruptedException e) { - syncExecutor.shutdownNow(); - Thread.currentThread().interrupt(); - } + MoreExecutors.shutdownAndAwaitTermination(syncExecutor, 5, TimeUnit.SECONDS); syncExecutor = null; } } + /** Shuts down the legacy CRR reconciliation executor gracefully (5s grace period). */ + private void shutdownLegacyCrrSyncExecutor() { + if (legacyCrrSyncExecutor != null) { + MoreExecutors.shutdownAndAwaitTermination(legacyCrrSyncExecutor, 5, TimeUnit.SECONDS); + legacyCrrSyncExecutor = null; + } + } + @Override public void close() { try { LOGGER.info("Closing HAGroupStoreClient"); - // Shutdown sync executor + // Executors first, then caches, then admins. 
Null-before-close on the last two so an + // in-flight event listener observes either an open resource or null, never half-closed. shutdownSyncExecutor(); + shutdownLegacyCrrSyncExecutor(); if (pathChildrenCache != null) { pathChildrenCache.close(); pathChildrenCache = null; } closePeerConnection(); + NodeCache nodeCache = this.legacyCrrNodeCache; + this.legacyCrrNodeCache = null; + if (nodeCache != null) { + nodeCache.close(); + } + PhoenixHAAdmin admin = this.legacyHaAdmin; + this.legacyHaAdmin = null; + if (admin != null) { + admin.close(); + } LOGGER.info("Closed HAGroupStoreClient"); } catch (IOException e) { LOGGER.error("Exception closing HAGroupStoreClient", e); @@ -1047,6 +1100,120 @@ private long validateTransitionAndGetWaitTime(HAGroupStoreRecord.HAGroupState cu return Math.max(0, remainingTime); } + // ========== Legacy /phoenix/ha CRR Sync ========== + + /** + * Derives the combined {@link ClusterRoleRecord} from local + peer consistentHA records and + * CAS-writes it to {@code /phoenix/ha} via {@link #legacyCrrNodeCache}. On {@code BadVersion} we + * log and bail out; the NodeCache watcher plus the next event/periodic cycle reconverge. 
+ */ + private void syncLegacyCRRIfRoleChanged() { + if (!legacyCrrSyncEnabled) { + return; + } + if (!isHealthy || legacyHaAdmin == null || legacyCrrNodeCache == null) { + LOGGER.debug( + "Skipping legacy CRR sync for HA group {}: isHealthy={}, legacyHaAdmin={}, " + + "legacyCrrNodeCache={}", + haGroupName, isHealthy, legacyHaAdmin != null, legacyCrrNodeCache != null); + return; + } + synchronized (legacyCrrSyncLock) { + try { + HAGroupStoreRecord local = getHAGroupStoreRecord(); + if (local == null) { + LOGGER.debug("Skipping legacy CRR sync for HA group {}: no local consistentHA record", + haGroupName); + return; + } + HAGroupStoreRecord peer = getHAGroupStoreRecordFromPeer(); + Pair snapshot = readLegacyCrrFromCache(); + ClusterRoleRecord existing = snapshot.getLeft(); + ClusterRoleRecord desired = buildDesiredLegacyCrr(local, peer, existing); + if (desired.isLogicallyEqualIgnoringVersionAndRegistry(existing)) { + LOGGER.debug("Legacy CRR for HA group {} already up to date at version {}", haGroupName, + existing.getVersion()); + return; + } + try { + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired, + expectedVersionFor(snapshot.getRight())); + LOGGER.info("Synced legacy CRR for HA group {} (version {} -> {})", haGroupName, + existing != null ? existing.getVersion() : -1L, desired.getVersion()); + } catch (StaleClusterRoleRecordVersionException stale) { + // Another writer won. NodeCache will refresh via its watcher; next cycle reconverges. + LOGGER.debug( + "Legacy CRR CAS lost race for HA group {} at expected version {}; " + + "will reconverge on next event/periodic cycle", + haGroupName, snapshot.getRight() != null ? snapshot.getRight().getVersion() : -1); + } + } catch (Exception e) { + LOGGER.warn( + "Legacy CRR sync failed for HA group {}; will be retried by next event/periodic cycle", + haGroupName, e); + } + } + } + + /** + * Builds the desired legacy CRR. 
Always stamps {@link RegistryType#ZK} for pre-consistentHA + * ZK-registry consumers. {@code existing} may be null (no prior legacy znode). + */ + private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local, HAGroupStoreRecord peer, + ClusterRoleRecord existing) { + ClusterRole role2 = (peer != null) ? peer.getClusterRole() : ClusterRole.UNKNOWN; + long peerAdminVersion = (peer != null) ? peer.getAdminCRRVersion() : 0L; + long baseVersion = Math.max(existing != null ? existing.getVersion() : 0L, + Math.max(local.getAdminCRRVersion(), peerAdminVersion)); + return new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.valueOf(local.getPolicy()), + RegistryType.ZK, this.zkUrl, local.getClusterRole(), local.getPeerZKUrl(), role2, + baseVersion + 1); + } + + /** In-memory read of the cached legacy CRR. Returns {@code (null, null)} if absent. */ + private Pair readLegacyCrrFromCache() { + ChildData current = legacyCrrNodeCache.getCurrentData(); + if (current == null) { + return Pair.of(null, null); + } + ClusterRoleRecord record = ClusterRoleRecord.fromJson(current.getData()).orElse(null); + return Pair.of(record, current.getStat()); + } + + /** Maps a null/non-null {@link Stat} to the CAS {@code expectedStatVersion} argument. */ + private static int expectedVersionFor(Stat stat) { + return (stat != null) ? stat.getVersion() : PhoenixHAAdmin.CREATE_NEW_RECORD_STAT_VERSION; + } + + /** + * Schedules the periodic reconciler; no-op when interval <= 0 (event-driven sync still runs). 
+ */ + private void startLegacyCrrReconciliation() { + long intervalSec = conf.getLong(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS, + DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS); + if (intervalSec <= 0) { + LOGGER.info("Legacy CRR periodic reconciliation disabled (interval={}s) for HA group {}", + intervalSec, haGroupName); + return; + } + legacyCrrSyncExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "HAGroupStoreClient-LegacyCRRSync-" + haGroupName); + t.setDaemon(true); + return t; + }); + long jitterSec = + ThreadLocalRandom.current().nextLong(0, LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS); + LOGGER.info("Starting legacy CRR reconciliation for HA group {} with initial delay {}s, " + + "then every {}s", haGroupName, jitterSec, intervalSec); + legacyCrrSyncExecutor.scheduleAtFixedRate(() -> { + try { + syncLegacyCRRIfRoleChanged(); + } catch (Throwable t) { + LOGGER.warn("Periodic legacy CRR reconciliation failed for HA group {}", haGroupName, t); + } + }, jitterSec, intervalSec, TimeUnit.SECONDS); + } + // ========== HA Group State Change Subscription Methods ========== /** diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java index f0fa7189520..416aff9e303 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java @@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException; import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException; import org.apache.phoenix.util.JDBCUtil; import org.apache.zookeeper.CreateMode; @@ -83,6 +84,22 @@ public CuratorFramework getCurator(String zkUrl, 
Properties properties, String n private static final Logger LOG = LoggerFactory.getLogger(PhoenixHAAdmin.class); + /** + * Sentinel for {@code expectedStatVersion} meaning "no existing znode; create it". Routed to a + * {@code create()} call rather than {@code setData()}. Intentionally distinct from + * {@link #ZK_MATCH_ANY_VERSION} so the two semantics can't be confused. + * @see #createOrUpdateClusterRoleRecordWithCAS(String, ClusterRoleRecord, int) + */ + public static final int CREATE_NEW_RECORD_STAT_VERSION = -2; + + /** + * ZooKeeper's documented wildcard value for {@code setData} / {@code delete}: when passed, the + * operation skips version-checking and matches any current version (i.e. bypasses CAS). Use only + * for deliberate force-overwrite paths; never for the normal CAS update flow. + * @see #createOrUpdateClusterRoleRecordWithCAS(String, ClusterRoleRecord, int) + */ + public static final int ZK_MATCH_ANY_VERSION = -1; + /** The fully qualified ZK URL for an HBase cluster in format host:port:/hbase */ private final String zkUrl; /** Configuration of this command line tool. */ @@ -524,17 +541,22 @@ public void updateHAGroupStoreRecordInZooKeeper(String haGroupName, } /** - * Gets the HAGroupStoreRecord and Stat from ZooKeeper. + * Atomically reads the {@link HAGroupStoreRecord} and its {@link Stat} from ZooKeeper using + * {@code getData().storingStatIn(stat)}, so the returned stat version always corresponds to the + * returned bytes. This closes a pre-existing TOCTOU window where the previous two-call + * implementation ({@code getData()} + a separate {@code checkExists()}) could return a stat whose + * {@code version} did not match the {@code data} bytes if another writer updated the znode in + * between. 
* @param haGroupName the HA group name - * @return a pair of HAGroupStoreRecord and Stat - * @throws IOException if any error occurs during the retrieval + * @return a pair of (record, stat); both null if the znode does not exist + * @throws IOException on unexpected ZK errors */ public Pair getHAGroupStoreRecordInZooKeeper(String haGroupName) throws IOException { try { - byte[] data = getCurator().getData().forPath(toPath(haGroupName)); + Stat stat = new Stat(); + byte[] data = getCurator().getData().storingStatIn(stat).forPath(toPath(haGroupName)); HAGroupStoreRecord record = HAGroupStoreRecord.fromJson(data).orElse(null); - Stat stat = getCurator().checkExists().forPath(toPath(haGroupName)); return Pair.of(record, stat); } catch (KeeperException.NoNodeException nne) { LOG.warn("No HAGroupStoreRecord for HA group {} in ZK", haGroupName, nne); @@ -559,6 +581,93 @@ public void deleteHAGroupStoreRecordInZooKeeper(String haGroupName) throws IOExc } } + // ----- Helpers for the legacy /phoenix/ha ClusterRoleRecord sync path. ----- + // These mirror the HAGroupStoreRecord helpers above but operate on ClusterRoleRecord + // and read data + stat atomically. + + /** + * Atomically reads the {@link ClusterRoleRecord} and its {@link Stat} from ZooKeeper using + * {@code getData().storingStatIn(stat)}, so the returned stat version always corresponds to the + * returned bytes. 
+ * @param haGroupName the HA group name + * @return a pair of (record, stat); both null if the znode does not exist + * @throws IOException on unexpected ZK errors + */ + public Pair getClusterRoleRecordAndStatInZooKeeper(String haGroupName) + throws IOException { + try { + Stat stat = new Stat(); + byte[] data = getCurator().getData().storingStatIn(stat).forPath(toPath(haGroupName)); + ClusterRoleRecord record = ClusterRoleRecord.fromJson(data).orElse(null); + return Pair.of(record, stat); + } catch (KeeperException.NoNodeException nne) { + return Pair.of(null, null); + } catch (Exception e) { + LOG.error("Failed to get ClusterRoleRecord for HA group {}", haGroupName, e); + throw new IOException("Failed to get ClusterRoleRecord for HA group " + haGroupName, e); + } + } + + /** + * Writes {@code newRecord} to the legacy znode. Dispatch is determined solely by + * {@code expectedStatVersion}, which must be exactly one of: + *
<ul>
+ * <li>{@link #CREATE_NEW_RECORD_STAT_VERSION} &mdash; create the znode (it must not already exist);</li>
+ * <li>{@link #ZK_MATCH_ANY_VERSION} &mdash; unconditional overwrite, bypassing the version check;</li>
+ * <li>a non-negative value &mdash; CAS update against that exact stat version.</li>
+ * </ul>
+ * Any other value (e.g. {@code -3}, {@code Integer.MIN_VALUE}) is rejected with + * {@link IllegalArgumentException} so a stray sentinel can't silently fall through into one of + * the above semantics. + * @throws StaleClusterRoleRecordVersionException if CAS fails with BadVersion or the create races + * with a concurrent NodeExists + * @throws IOException on other errors + * @throws IllegalArgumentException if {@code expectedStatVersion} is not one of the + * three documented categories + */ + public void createOrUpdateClusterRoleRecordWithCAS(String haGroupName, + ClusterRoleRecord newRecord, int expectedStatVersion) + throws IOException, StaleClusterRoleRecordVersionException { + // Validate caller intent BEFORE entering the try/catch so the IllegalArgumentException is + // not accidentally wrapped in IOException by the catch-all below. + boolean isCreate = expectedStatVersion == CREATE_NEW_RECORD_STAT_VERSION; + boolean isWildcardOverwrite = expectedStatVersion == ZK_MATCH_ANY_VERSION; + boolean isCasUpdate = expectedStatVersion >= 0; + Preconditions.checkArgument(isCreate || isWildcardOverwrite || isCasUpdate, + "expectedStatVersion must be CREATE_NEW_RECORD_STAT_VERSION (create), ZK_MATCH_ANY_VERSION " + + "(unconditional overwrite), or a non-negative stat version (CAS update); got " + + expectedStatVersion); + try { + byte[] data = ClusterRoleRecord.toJson(newRecord); + if (isCreate) { + getCurator().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .forPath(toPath(haGroupName), data); + } else { + // Both ZK_MATCH_ANY_VERSION (-1, wildcard) and >= 0 (CAS) are valid arguments to + // setData().withVersion(); ZK applies the appropriate semantics server-side. 
+ getCurator().setData().withVersion(expectedStatVersion).forPath(toPath(haGroupName), data); + } + } catch (KeeperException.BadVersionException e) { + throw new StaleClusterRoleRecordVersionException( + "Failed to set ClusterRoleRecord for HA group " + haGroupName + " with cached stat version " + + expectedStatVersion, + e); + } catch (KeeperException.NodeExistsException e) { + // Race with another writer creating the node; surface as stale-version so caller retries. + throw new StaleClusterRoleRecordVersionException( + "Failed to create ClusterRoleRecord for HA group " + haGroupName + ": node already exists", + e); + } catch (Exception e) { + LOG.error("Failed to write ClusterRoleRecord for HA group {}", haGroupName, e); + throw new IOException("Failed to write ClusterRoleRecord for HA group " + haGroupName, e); + } + } + public String getZkUrl() { return zkUrl; } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 72075853aa8..8d4a8e4c7ff 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -647,6 +647,15 @@ public interface QueryServices extends SQLCloseable { // HA Group Store sync job interval in seconds String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = "phoenix.ha.group.store.sync.interval.seconds"; + // Master switch for syncing the legacy /phoenix/ha CRR from /phoenix/consistentHA. + // When false, no legacy znode is read, written, or deleted by HAGroupStoreClient. + String PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED = "phoenix.ha.legacy.crr.sync.enabled"; + + // Periodic reconciliation interval for the legacy /phoenix/ha CRR sync, in seconds. + // 0 disables the periodic loop only; event-driven sync still runs. 
+ String PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS = + "phoenix.ha.legacy.crr.reconciliation.interval.seconds"; + String REPLICATION_LOG_ROTATION_TIME_MS_KEY = "phoenix.replication.log.rotation.time.ms"; /** diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 339b6763a45..3e99b9fff68 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -92,6 +92,8 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK; import static org.apache.phoenix.query.QueryServices.PHOENIX_ACLS_ENABLED; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME; import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB; @@ -517,6 +519,13 @@ public class QueryServicesOptions { // Default HA Group Store sync job interval in seconds (15 minutes = 900 seconds) public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900; + // Legacy /phoenix/ha CRR sync is opt-in (default off). + public static final boolean DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED = false; + + // Periodic reconciliation interval for legacy /phoenix/ha CRR sync, in seconds. + // 0 disables the periodic loop only; event-driven sync still runs. 
+ public static final long DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS = 60L; + public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L; private final Configuration config; @@ -643,7 +652,10 @@ public static QueryServicesOptions withDefaults() { .setIfUnset(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS, DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS) .setIfUnset(HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, - DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED); + DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED) + .setIfUnset(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, DEFAULT_PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED) + .setIfUnset(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS, + DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index edb38da970c..1f57187a2ca 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -4738,9 +4738,9 @@ public MutationState addColumn(PTable table, List origColumnDefs, /** * To check if TTL is defined at any of the child below we are checking it at * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List, ColumnMutator, int, PTable, PTable, boolean)} - * level where in function - * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], byte[], List, int)} - * we are already traversing through allDescendantViews. 
+ * level where in function {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# + * validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], + * byte[], List, int)} we are already traversing through allDescendantViews. */ } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java index ba667876740..1a809370116 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java @@ -18,9 +18,12 @@ package org.apache.phoenix.jdbc; import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED; import static org.apache.phoenix.replication.reader.ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK; @@ -47,13 +50,16 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.hadoop.conf.Configuration; import 
org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.exception.InvalidClusterRoleTransitionException; +import org.apache.phoenix.exception.StaleClusterRoleRecordVersionException; import org.apache.phoenix.util.HAGroupStoreTestUtil; +import org.apache.phoenix.util.JDBCUtil; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; @@ -72,6 +78,8 @@ public class HAGroupStoreClientIT extends HABaseIT { private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 5000L; private PhoenixHAAdmin haAdmin; private PhoenixHAAdmin peerHaAdmin; + // Admin on the legacy /phoenix/ha namespace; used to inspect/seed/corrupt the legacy znode. + private PhoenixHAAdmin legacyHaAdmin; private String zkUrl; private String peerZKUrl; private String masterUrl; @@ -95,8 +103,11 @@ public void before() throws Exception { ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE); peerHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE); + legacyHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), + PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE); haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); + legacyHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); zkUrl = getLocalZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration()); // Clean existing records in system table List haGroupNames = HAGroupStoreClient.getHAGroupNames(zkUrl); @@ -117,8 +128,10 @@ public void before() throws Exception { public void after() throws Exception { haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); + legacyHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); haAdmin.close(); peerHaAdmin.close(); + 
legacyHaAdmin.close(); } @Test @@ -1203,4 +1216,491 @@ public void testPeriodicSyncJobExecutorStartsAndSyncsData() throws Exception { assertTrue("Sync executor should be shutdown after close", syncExecutor.isShutdown()); } } + + // ============================================================================================ + // Legacy /phoenix/ha CRR sync tests + // Verify feature-flag gating, derivation, monotonic version, registry-type preservation, + // deletion mirroring, and short-circuit behavior of HAGroupStoreClient's legacy sync path. + // ============================================================================================ + + @Test + public void testLegacyCrrSyncFeatureOffByDefault_NoLegacyZnodeWritten() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(/* legacyEnabled */ false, /* periodicSec */ 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + Pair legacy = readLegacyCrr(haGroupName); + assertNull("Legacy CRR must not exist when feature is off", legacy.getLeft()); + assertNull("Legacy znode stat must be null when feature is off", legacy.getRight()); + } + + @Test + public void testLegacyCrrSyncFeatureOn_InitialSyncCreatesZkRegistryLegacyZnode() + throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair legacy = awaitLegacyCrrPresent(haGroupName); + ClusterRoleRecord crr = legacy.getLeft(); + assertEquals("Legacy CRR must use ZK registry type for backward compatibility", + ClusterRoleRecord.RegistryType.ZK, crr.getRegistryType()); + assertEquals(haGroupName, crr.getHaGroupName()); + assertEquals(HighAvailabilityPolicy.FAILOVER, crr.getPolicy()); + // Local cluster role is 
ACTIVE per System Table seed. + assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, + crr.getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue("CRR version must be > 0 after initial sync", crr.getVersion() > 0); + } + + @Test + public void testLegacyCrrSyncRoleChangePropagatesAndIsNewerThanWorks() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + ClusterRoleRecord initial = awaitLegacyCrrPresent(haGroupName).getLeft(); + long initialVersion = initial.getVersion(); + + // ACTIVE_IN_SYNC -> ACTIVE_IN_SYNC_TO_STANDBY (role change ACTIVE -> ACTIVE_TO_STANDBY). + client.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY); + Pair updated = awaitLegacyCrrRole(haGroupName, ClusterType.LOCAL, + ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY); + ClusterRoleRecord updatedCRR = updated.getLeft(); + assertTrue("Version must monotonically increase after role change", + updatedCRR.getVersion() > initialVersion); + assertTrue("isNewerThan must return true for the updated record", + updatedCRR.isNewerThan(initial)); + assertEquals(ClusterRoleRecord.RegistryType.ZK, updatedCRR.getRegistryType()); + } + + @Test + public void testLegacyCrrSyncStateOnlyChangeDoesNotRewriteLegacy() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair initial = awaitLegacyCrrPresent(haGroupName); + int initialZkVersion = initial.getRight().getVersion(); + long initialCrrVersion = initial.getLeft().getVersion(); + + // ACTIVE_IN_SYNC -> ACTIVE_NOT_IN_SYNC: ClusterRole stays ACTIVE. 
+ client.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + Pair after = readLegacyCrr(haGroupName); + assertNotNull(after.getLeft()); + assertEquals("Legacy CRR ZK stat version must not change on state-only transitions", + initialZkVersion, after.getRight().getVersion()); + assertEquals("Legacy CRR logical version must not change on state-only transitions", + initialCrrVersion, after.getLeft().getVersion()); + } + + /** Matrix #10: LOCAL CHILD_REMOVED on consistentHA does not delete the legacy znode. */ + @Test + public void testLegacyCrrSyncLocalChildRemovedDoesNotDeleteLegacy() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 60); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + Pair initial = awaitLegacyCrrPresent(haGroupName); + long initialVersion = initial.getLeft().getVersion(); + + haAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName); + + // Wait long enough for any potential event-driven delete to have fired. 
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + Pair after = readLegacyCrr(haGroupName); + assertNotNull("Legacy znode must NOT be deleted on LOCAL CHILD_REMOVED", after.getLeft()); + assertTrue("Legacy CRR version must not regress after LOCAL CHILD_REMOVED", + after.getLeft().getVersion() >= initialVersion); + } + + @Test + public void testLegacyCrrSyncPeriodicDisabledStillSyncsViaEvents() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 0); // periodic disabled + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + ClusterRoleRecord initial = awaitLegacyCrrPresent(haGroupName).getLeft(); + assertEquals("Initial local role should be ACTIVE per @Before seed", + ClusterRoleRecord.ClusterRole.ACTIVE, initial.getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertEquals("Initial registry type must be ZK", ClusterRoleRecord.RegistryType.ZK, + initial.getRegistryType()); + + client.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY); + ClusterRoleRecord updated = awaitLegacyCrrRole(haGroupName, ClusterType.LOCAL, + ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).getLeft(); + assertTrue("Updated version must monotonically advance past the initial version", + updated.getVersion() > initial.getVersion()); + assertTrue("isNewerThan must return true for the post-event record", + updated.isNewerThan(initial)); + assertEquals("Registry type must remain ZK after an event-driven sync", + ClusterRoleRecord.RegistryType.ZK, updated.getRegistryType()); + } + + /** Matrix #11: PEER CHILD_REMOVED on consistentHA does not delete the legacy znode. */ + @Test + public void testLegacyCrrSyncPeerChildRemovedDoesNotDeleteLegacy() throws Exception { + String haGroupName = testName.getMethodName(); + // Seed a peer record so that PEER cache initializes and PEER CHILD_REMOVED can fire later. 
+ HAGroupStoreRecord peerRecord = + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerRecord); + + Configuration conf = legacyCrrConf(true, 0); // periodic disabled to isolate event behavior + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair initial = awaitLegacyCrrPresent(haGroupName); + long initialCrrVersion = initial.getLeft().getVersion(); + + peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + Pair after = readLegacyCrr(haGroupName); + assertNotNull("Legacy znode must NOT be deleted on PEER CHILD_REMOVED", after.getLeft()); + assertTrue("Legacy CRR version must not regress after PEER CHILD_REMOVED", + after.getLeft().getVersion() >= initialCrrVersion); + } + + /** Matrix #7 (helper-level): CAS keeps one writer per attempt; loser sees stale exception. */ + @Test + public void testLegacyCrrCASWithConcurrentWriters_OneSucceedsOneStales() throws Exception { + String haGroupName = testName.getMethodName(); + + // Step 1: create the legacy znode using the explicit create sentinel (NOT -1, which ZK + // reserves as the match-any-version wildcard). + ClusterRoleRecord initial = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.ACTIVE, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 1L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, initial, + PhoenixHAAdmin.CREATE_NEW_RECORD_STAT_VERSION); + + // Step 2: both "writers" snapshot the same stat version. 
+ Pair existing = + legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + assertNotNull(existing.getLeft()); + int sharedVersion = existing.getRight().getVersion(); + + // Step 3: writer A wins the race. + ClusterRoleRecord writerA = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, + ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, this.peerZKUrl, + ClusterRoleRecord.ClusterRole.STANDBY, 2L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerA, sharedVersion); + + // Step 4: writer B uses the now-stale stat version → BadVersion → mapped to + // StaleClusterRoleRecordVersionException (exactly what the client's retry path catches). + ClusterRoleRecord writerB = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, 2L); + assertThrows(StaleClusterRoleRecordVersionException.class, () -> legacyHaAdmin + .createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerB, sharedVersion)); + + // Step 5: final znode state reflects writer A's update; loser observed BadVersion. + Pair winner = + legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + assertNotNull(winner.getLeft()); + assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + winner.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue("Stat version must advance exactly once for the single winning CAS", + winner.getRight().getVersion() > sharedVersion); + + // Step 6: concurrent create races map to the same exception (NodeExistsException is also + // surfaced as StaleClusterRoleRecordVersionException so the client retry path is uniform). 
+ ClusterRoleRecord raceCreate = + new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 3L); + assertThrows(StaleClusterRoleRecordVersionException.class, + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, raceCreate, + PhoenixHAAdmin.CREATE_NEW_RECORD_STAT_VERSION)); + + // Step 7: ZK_MATCH_ANY_VERSION is allowed (operator/migration tooling may want a + // force-overwrite path), but any negative value that is neither the wildcard nor + // CREATE_NEW_RECORD_STAT_VERSION must fail fast so a stray sentinel doesn't silently take + // on one of the defined semantics. + Stat statBeforeWildcard = + legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName).getRight(); + ClusterRoleRecord wildcardOverwrite = + new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 4L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, wildcardOverwrite, + PhoenixHAAdmin.ZK_MATCH_ANY_VERSION); + Pair afterWildcard = + legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + assertEquals("ZK_MATCH_ANY_VERSION must unconditionally overwrite", + ClusterRoleRecord.ClusterRole.STANDBY, + afterWildcard.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue("Wildcard overwrite must still bump the stat version", + afterWildcard.getRight().getVersion() > statBeforeWildcard.getVersion()); + + // Negative values that are neither the create sentinel nor the wildcard must be rejected + // up front with IllegalArgumentException, before any ZK call is attempted. 
+ ClusterRoleRecord illegalArgumentRecord = + new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.OFFLINE, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.OFFLINE, 5L); + assertThrows(IllegalArgumentException.class, () -> legacyHaAdmin + .createOrUpdateClusterRoleRecordWithCAS(haGroupName, illegalArgumentRecord, -3)); + assertThrows(IllegalArgumentException.class, + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, illegalArgumentRecord, + Integer.MIN_VALUE)); + } + + /** Matrix #4: peer-side role flip propagates to role2 in the local legacy CRR. */ + @Test + public void testLegacyCrrSyncPeerRoleFlipUpdatesLegacyRole2() throws Exception { + String haGroupName = testName.getMethodName(); + // Seed peer with STANDBY before client starts so the initial sync sees role2=STANDBY. + HAGroupStoreRecord peerStandby = + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerStandby); + + Configuration conf = legacyCrrConf(true, 0); // periodic disabled to isolate event-driven path + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, ClusterRoleRecord.ClusterRole.STANDBY); + + // Flip the peer record to a state whose cluster role is ACTIVE_TO_STANDBY. 
+ HAGroupStoreRecord peerFlipped = new HAGroupStoreRecord("v1.0", haGroupName, + HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerFlipped); + + Pair after = awaitLegacyCrrRole(haGroupName, ClusterType.PEER, + ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY); + assertEquals("Registry type must remain ZK after a peer-driven role flip", + ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + } + + /** Matrix #6: absent peer record yields role2=UNKNOWN; converges when the peer record appears. */ + @Test + public void testLegacyCrrSyncPeerAbsentYieldsUnknownAndConvergesOnRecovery() throws Exception { + String haGroupName = testName.getMethodName(); + // No peer record seeded: peer cache is empty so getHAGroupStoreRecordFromPeer() returns null + // and role2 falls through to UNKNOWN. + Configuration conf = legacyCrrConf(true, 0); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + Pair initial = + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, ClusterRoleRecord.ClusterRole.UNKNOWN); + long initialVersion = initial.getLeft().getVersion(); + + // Peer "recovers" by writing its consistentHA record. The PEER CHILD_ADDED event triggers + // the legacy sync to update role2. 
+ HAGroupStoreRecord peerRecord = + new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peerRecord); + + Pair recovered = + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, ClusterRoleRecord.ClusterRole.STANDBY); + assertTrue("Version must bump when role2 transitions UNKNOWN -> STANDBY", + recovered.getLeft().getVersion() > initialVersion); + } + + /** Matrix #12: registryType stays ZK across multiple sync cycles (never reverts to RPC). */ + @Test + public void testLegacyCrrSyncRegistryTypePreservedAcrossMultipleCycles() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 0); + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + Pair initial = awaitLegacyCrrPresent(haGroupName); + assertEquals(ClusterRoleRecord.RegistryType.ZK, initial.getLeft().getRegistryType()); + assertEquals("Initial local role should be ACTIVE per @Before seed", + ClusterRoleRecord.ClusterRole.ACTIVE, + initial.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + long lastVersion = initial.getLeft().getVersion(); + + // Drive a sequence of distinct peer states; each event drives a sync that rewrites the + // legacy znode (or short-circuits if logically equal). Direct ZK writes intentionally + // bypass setHAGroupStatusIfNeeded's transition guard. 
+ HAGroupStoreRecord.HAGroupState[] cycle = + new HAGroupStoreRecord.HAGroupState[] { HAGroupStoreRecord.HAGroupState.STANDBY, + HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, + HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE, HAGroupStoreRecord.HAGroupState.OFFLINE, + HAGroupStoreRecord.HAGroupState.STANDBY }; + for (HAGroupStoreRecord.HAGroupState state : cycle) { + HAGroupStoreRecord peer = new HAGroupStoreRecord("v1.0", haGroupName, state, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), this.zkUrl, this.peerMasterUrl, this.masterUrl, + CLUSTERS.getHdfsUrl2(), CLUSTERS.getHdfsUrl1(), 0L); + createOrUpdateHAGroupStoreRecordOnZookeeper(peerHaAdmin, haGroupName, peer); + Pair after = + awaitLegacyCrrRole(haGroupName, ClusterType.PEER, state.getClusterRole()); + assertEquals( + "Local role must remain ACTIVE across peer-driven cycles (peer state=" + state + ")", + ClusterRoleRecord.ClusterRole.ACTIVE, + after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertEquals("Registry type must remain ZK after a sync cycle (peer state=" + state + ")", + ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + assertTrue( + "Logical version must monotonically increase across distinct sync cycles (peer state=" + + state + ")", + after.getLeft().getVersion() > lastVersion); + lastVersion = after.getLeft().getVersion(); + } + } + + /** Matrix #9: periodic loop repairs an external divergence with no consistentHA event. 
*/ + @Test + public void testLegacyCrrSyncPeriodicReconciliationRecoversAfterDivergence() throws Exception { + String haGroupName = testName.getMethodName(); + Configuration conf = legacyCrrConf(true, 2); // 2s interval; jitter is 0-30s on first run + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + Pair initial = awaitLegacyCrrPresent(haGroupName); + assertEquals("Sanity: local role1 should be ACTIVE per @Before seed", + ClusterRoleRecord.ClusterRole.ACTIVE, + initial.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + + // Externally overwrite the legacy znode with role1=STANDBY (logically different from desired). + // This does NOT trigger any /phoenix/consistentHA event, so the only path to recovery is + // the periodic reconciliation loop. + ClusterRoleRecord corrupt = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, initial.getLeft().getVersion() + 10); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, corrupt, + initial.getRight().getVersion()); + Pair corrupted = readLegacyCrr(haGroupName); + assertEquals("Sanity: corruption took (role1 now STANDBY)", + ClusterRoleRecord.ClusterRole.STANDBY, + corrupted.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + + // Wait for the periodic loop to fire. Worst case: jitter=30s, so allow 40s. 
+ long deadline = System.currentTimeMillis() + 40_000L; + Pair after = readLegacyCrr(haGroupName); + while ( + (after.getLeft() == null || after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL)) + != ClusterRoleRecord.ClusterRole.ACTIVE) + && System.currentTimeMillis() < deadline + ) { + Thread.sleep(500); + after = readLegacyCrr(haGroupName); + } + assertNotNull("Legacy znode missing after periodic reconciliation window", after.getLeft()); + assertEquals("Periodic reconciliation must restore role1 to ACTIVE", + ClusterRoleRecord.ClusterRole.ACTIVE, + after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue("Version must monotonically advance past the externally-bumped version", + after.getLeft().getVersion() > corrupted.getLeft().getVersion()); + assertEquals("Registry type must remain ZK after periodic recovery", + ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + } + + /** Matrix #7/#12 e2e: client overwrites a pre-seeded logically-stale legacy znode. */ + @Test + public void testLegacyCrrSyncOverwritesPreSeededLogicallyStaleZnode() throws Exception { + String haGroupName = testName.getMethodName(); + // Pre-seed the legacy znode with role2=OFFLINE so the desired CRR (role2=UNKNOWN by default, + // since no peer record exists yet) is logically different. 
+ ClusterRoleRecord preSeed = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, + ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.ACTIVE, + this.peerZKUrl, ClusterRoleRecord.ClusterRole.OFFLINE, 5L); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, preSeed, + PhoenixHAAdmin.CREATE_NEW_RECORD_STAT_VERSION); + Pair seeded = readLegacyCrr(haGroupName); + assertNotNull(seeded.getLeft()); + assertEquals(ClusterRoleRecord.ClusterRole.OFFLINE, + seeded.getLeft().getRole(formattedZkUrlFor(ClusterType.PEER))); + + Configuration conf = legacyCrrConf(true, 0); // periodic disabled; rely on initial sync only + HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); + assertNotNull(client); + + // Wait for the initial sync to converge role2 away from OFFLINE. + long deadline = System.currentTimeMillis() + ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS; + Pair after = readLegacyCrr(haGroupName); + while ( + after.getLeft() != null && after.getLeft().getRole(formattedZkUrlFor(ClusterType.PEER)) + == ClusterRoleRecord.ClusterRole.OFFLINE + && System.currentTimeMillis() < deadline + ) { + Thread.sleep(100); + after = readLegacyCrr(haGroupName); + } + assertNotNull(after.getLeft()); + assertEquals("Legacy CRR registry type must round-trip as ZK after overwrite", + ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + assertTrue("Logical version must advance past the pre-seeded version", + after.getLeft().getVersion() > seeded.getLeft().getVersion()); + // role2 must no longer be OFFLINE (we don't assert UNKNOWN vs STANDBY explicitly because + // either is acceptable depending on peer cache initialization timing). 
+ assertFalse("role2 must be overwritten from OFFLINE", + after.getLeft().getRole(formattedZkUrlFor(ClusterType.PEER)) + == ClusterRoleRecord.ClusterRole.OFFLINE); + } + + // ---------- Legacy CRR sync test helpers ---------- + + /** Per-test Configuration clone with the legacy CRR flag and reconciliation interval. */ + private Configuration legacyCrrConf(boolean legacyEnabled, long periodicSec) { + Configuration src = CLUSTERS.getHBaseCluster1().getConfiguration(); + Configuration cloned = new Configuration(src); + cloned.setBoolean(PHOENIX_HA_LEGACY_CRR_SYNC_ENABLED, legacyEnabled); + cloned.setLong(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS, periodicSec); + return cloned; + } + + private Pair readLegacyCrr(String haGroupName) throws IOException { + return legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + } + + /** Waits up to the propagation deadline for the cached legacy CRR to match {@code condition}. */ + private Pair awaitLegacyCrr(String haGroupName, + Predicate condition, String description) throws Exception { + long deadline = System.currentTimeMillis() + ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS; + Pair legacy = readLegacyCrr(haGroupName); + while ( + (legacy.getLeft() == null || !condition.test(legacy.getLeft())) + && System.currentTimeMillis() < deadline + ) { + Thread.sleep(100); + legacy = readLegacyCrr(haGroupName); + } + assertNotNull("Legacy znode missing while awaiting: " + description, legacy.getLeft()); + assertTrue("Legacy CRR condition not met within timeout: " + description, + condition.test(legacy.getLeft())); + return legacy; + } + + private Pair awaitLegacyCrrPresent(String haGroupName) throws Exception { + return awaitLegacyCrr(haGroupName, crr -> true, "znode present"); + } + + /** + * Polls the legacy CRR until the role at the local ({@link ClusterType#LOCAL}) or peer + * ({@link ClusterType#PEER}) slot matches {@code expectedRole}. 
+ */ + private Pair awaitLegacyCrrRole(String haGroupName, + ClusterType clusterType, ClusterRoleRecord.ClusterRole expectedRole) throws Exception { + String url = formattedZkUrlFor(clusterType); + return awaitLegacyCrr(haGroupName, crr -> crr.getRole(url) == expectedRole, + clusterType + " role == " + expectedRole); + } + + /** + * Returns the LOCAL or PEER ZK URL in the canonical form used by {@link ClusterRoleRecord} (i.e. + * via {@link JDBCUtil#formatUrl(String, ClusterRoleRecord.RegistryType)}). Required when looking + * up roles by URL on records that the legacy sync wrote. + */ + private String formattedZkUrlFor(ClusterType clusterType) { + String raw = (clusterType == ClusterType.LOCAL) ? zkUrl : peerZKUrl; + return JDBCUtil.formatUrl(raw, ClusterRoleRecord.RegistryType.ZK); + } } From 9c87392e1a6a3dab788f7328c845de803a953d81 Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Thu, 14 May 2026 22:06:54 -0700 Subject: [PATCH 2/5] PHOENIX-7787 Addendum: review feedback on legacy /phoenix/ha CRR sync - Replace raw-int sentinels in PhoenixHAAdmin.createOrUpdateClusterRoleRecordWithCAS with a typed PhoenixHAAdmin.LegacyCrrWriteMode { CREATE_NEW, FORCE_OVERWRITE, CAS_WITH_VERSION } and (mode, expectedStatVersion) signature. Negative versions are rejected only for CAS_WITH_VERSION. - In HAGroupStoreClient.syncLegacyCRRIfRoleChanged, when the NodeCache snapshot shows the legacy znode as absent, fall back to an authoritative ZK read before the logical-equality check and CAS. Prevents an unnecessary rewrite when the cache lags behind the actual znode (avoids CREATE_NEW vs CAS misroute and the stat-version bump that follows). - Rename testLegacyCrrCASWithConcurrentWriters_OneSucceedsOneStales to testLegacyCrrCasErrorMappingAndModeDispatch and update its Javadoc to reflect what it actually tests (sequential CAS error mapping and mode dispatch). 
- Add backward-compat JSON tests in ClusterRoleRecordTest covering: missing registryType -> defaults to RPC; explicit RPC round-trips; explicit ZK round-trips; full ZK toJson/fromJson round-trip preserves registryType. Backed by three new JSON fixtures. - Bump "CAS lost" log from DEBUG to INFO so sustained CAS contention is visible in standard operational logs. - Trim Javadoc and inline comments across the touched files for brevity. Generated-by: Cursor (Claude). Co-authored-by: Cursor --- ...taleClusterRoleRecordVersionException.java | 17 +- .../phoenix/jdbc/ClusterRoleRecord.java | 38 +---- .../phoenix/jdbc/HAGroupStoreClient.java | 78 ++++----- .../apache/phoenix/jdbc/PhoenixHAAdmin.java | 115 +++++-------- .../phoenix/jdbc/HAGroupStoreClientIT.java | 152 ++++++++---------- .../phoenix/jdbc/ClusterRoleRecordTest.java | 47 ++++++ .../json/test_role_record_explicit_rpc.json | 10 ++ .../json/test_role_record_explicit_zk.json | 10 ++ .../test_role_record_no_registry_type.json | 9 ++ 9 files changed, 229 insertions(+), 247 deletions(-) create mode 100644 phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json create mode 100644 phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json create mode 100644 phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java index 01e898960a1..35d85258794 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java @@ -18,30 +18,17 @@ package org.apache.phoenix.exception; /** - * Exception thrown when a CAS write to the legacy {@code /phoenix/ha} {@code ClusterRoleRecord} - * znode fails 
because another writer modified it (BadVersion) or created it concurrently - * (NodeExists). Indicates the caller should re-read and retry. - *

- * This is the {@code ClusterRoleRecord} analog of {@link StaleHAGroupStoreRecordVersionException}. - * The two exceptions intentionally live in parallel so callers can disambiguate which znode - * (consistentHA vs ha) the CAS conflict occurred on. + * CAS write to the legacy {@code /phoenix/ha} CRR znode failed (BadVersion or NodeExists); the + * caller should re-read and retry. Analog of {@link StaleHAGroupStoreRecordVersionException}. */ public class StaleClusterRoleRecordVersionException extends Exception { private static final long serialVersionUID = 1L; - /** - * @param msg reason for the exception - */ public StaleClusterRoleRecordVersionException(String msg) { super(msg); } - /** - * @param msg reason for the exception - * @param cause the underlying cause - */ public StaleClusterRoleRecordVersionException(String msg, Throwable cause) { super(msg, cause); } - } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java index aee742d290d..dd6914a19f7 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java @@ -127,18 +127,9 @@ public enum RegistryType { private final long version; /** - * Convenience constructor that defaults {@code registryType} to - * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). Most call sites in production should use - * this. The legacy {@code /phoenix/ha} sync path uses the explicit-{@link RegistryType} overload - * below to write {@link RegistryType#ZK} records for backward compatibility with pre-consistentHA - * ZK-registry clients. 
- * @param haGroupName HighAvailability Group name / CRR name - * @param policy Policy used by given CRR - * @param url1 URL for the first cluster (canonicalized per {@code registryType}) - * @param role1 {@link ClusterRole} describing the current state of the first cluster - * @param url2 URL for the second cluster (canonicalized per {@code registryType}) - * @param role2 {@link ClusterRole} describing the current state of the second cluster - * @param version monotonic version of this CRR + * Convenience constructor: defaults {@code registryType} to + * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). Use the explicit overload below to write + * {@link RegistryType#ZK} records for the legacy {@code /phoenix/ha} znode. */ public ClusterRoleRecord(String haGroupName, HighAvailabilityPolicy policy, String url1, ClusterRole role1, String url2, ClusterRole role2, long version) { @@ -147,13 +138,8 @@ public ClusterRoleRecord(String haGroupName, HighAvailabilityPolicy policy, Stri } /** - * Canonical constructor; also the {@code @JsonCreator} entry point so the persisted - * {@code registryType} round-trips correctly. Records written prior to this change have no - * {@code registryType} field in their JSON; for those, Jackson passes {@code null} here and we - * default to {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). - *

- * Outside of the legacy {@code /phoenix/ha} sync path, prefer the no-registry convenience - * constructor above. + * Canonical / {@code @JsonCreator} constructor. A {@code null} {@code registryType} defaults to + * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). */ @JsonCreator public ClusterRoleRecord(@JsonProperty("haGroupName") String haGroupName, @@ -228,17 +214,9 @@ public boolean hasSameInfo(ClusterRoleRecord other) { } /** - * Returns true if {@code other} is logically equivalent to this record, ignoring {@code version} - * and {@code registryType}. Compares only the six fields that determine whether the persisted - * record needs to be rewritten: {@code haGroupName, policy, url1, url2, role1, role2}. - *

- * Used by the legacy {@code /phoenix/ha} sync path as a short-circuit: if the existing znode is - * logically equal to what we'd write, skip the CAS round-trip. {@code version} is excluded - * because every desired CRR bumps it; {@code registryType} is excluded so a pre-existing - * RPC-stamped record converges to ZK on the next role change rather than thrashing on every sync - * cycle. - *

- * Returns {@code false} if {@code other} is {@code null}. + * Equality on the six identity/role fields ({@code haGroupName, policy, url1, url2, role1, + * role2}); ignores {@code version} (always bumps) and {@code registryType} (avoids RPC->ZK + * thrash). Returns {@code false} if {@code other} is {@code null}. */ public boolean isLogicallyEqualIgnoringVersionAndRegistry(ClusterRoleRecord other) { if (other == null) { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java index fd498f4b92a..c25336a3d7d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java @@ -106,13 +106,12 @@ public class HAGroupStoreClient implements Closeable { public static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1; // Maximum jitter in seconds for sync job start time (10 seconds) private static final long SYNC_JOB_MAX_JITTER_SECONDS = 10; - // 0-30s initial jitter on the periodic reconciler (nextLong is exclusive on the upper bound). + // Exclusive upper bound for initial-delay jitter on the periodic reconciler (0..30s). private static final long LEGACY_CRR_SYNC_JOB_MAX_JITTER_SECONDS = 31; private PhoenixHAAdmin phoenixHaAdmin; private PhoenixHAAdmin peerPhoenixHaAdmin; - // Admin on /phoenix/ha; null when feature disabled. volatile: read across event/periodic threads. + // Admin + NodeCache on /phoenix/ha; null when feature disabled. private volatile PhoenixHAAdmin legacyHaAdmin; - // In-memory cache of /phoenix/ha/; avoids a ZK read per sync. Null when feature disabled. 
private volatile NodeCache legacyCrrNodeCache; private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class); // Map of > @@ -146,11 +145,9 @@ public class HAGroupStoreClient implements Closeable { CopyOnWriteArraySet> targetStateSubscribers = new ConcurrentHashMap<>(); // Scheduled executor for periodic sync job private ScheduledExecutorService syncExecutor; - // Feature flag captured at construction. + // Legacy CRR sync state. private final boolean legacyCrrSyncEnabled; - // Serializes read-decide-write of the legacy znode across ctor/event/periodic threads. private final Object legacyCrrSyncLock = new Object(); - // Periodic reconciler; null when feature disabled or interval<=0. private volatile ScheduledExecutorService legacyCrrSyncExecutor; public static HAGroupStoreClient getInstance(Configuration conf, String haGroupName) @@ -268,17 +265,14 @@ public static List getHAGroupNames(String zkUrl) throws SQLException { // Start periodic sync job startPeriodicSyncJob(); - // Opt-in legacy /phoenix/ha sync. If the operator enabled this feature, a setup failure - // here propagates so the whole client is marked unhealthy — silently degrading to "no - // sync" would leave pre-consistentHA ZK-registry clients reading a stale legacy znode - // with no signal. The initial sync call itself swallows exceptions internally; only the - // framework setup (admin, NodeCache start) gates client health. + // Opt-in legacy /phoenix/ha sync. Setup failures propagate so the client is marked + // unhealthy rather than silently dropping the legacy znode out of sync. 
if (legacyCrrSyncEnabled && this.isHealthy) { this.legacyHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE); this.legacyCrrNodeCache = new NodeCache(this.legacyHaAdmin.getCurator(), toPath(haGroupName)); - this.legacyCrrNodeCache.start(true); // synchronous initial build + this.legacyCrrNodeCache.start(true); syncLegacyCRRIfRoleChanged(); startLegacyCrrReconciliation(); } @@ -923,13 +917,11 @@ private PathChildrenCacheListener createCacheListener(CountDownLatch latch, if (cacheType == ClusterType.LOCAL) { maybeInitializePeerPathChildrenCache(); } - // Propagate combined view to legacy /phoenix/ha; no-op when feature disabled. syncLegacyCRRIfRoleChanged(); } break; case CHILD_REMOVED: - // No-op for legacy /phoenix/ha: operator owns its lifecycle. Peer disappearance is - // naturally reflected as role2=UNKNOWN by the next sync. + // No-op: the legacy /phoenix/ha znode is never deleted by this client. break; case INITIALIZED: latch.countDown(); @@ -1031,7 +1023,6 @@ private void shutdownSyncExecutor() { } } - /** Shuts down the legacy CRR reconciliation executor gracefully (5s grace period). */ private void shutdownLegacyCrrSyncExecutor() { if (legacyCrrSyncExecutor != null) { MoreExecutors.shutdownAndAwaitTermination(legacyCrrSyncExecutor, 5, TimeUnit.SECONDS); @@ -1043,8 +1034,8 @@ private void shutdownLegacyCrrSyncExecutor() { public void close() { try { LOGGER.info("Closing HAGroupStoreClient"); - // Executors first, then caches, then admins. Null-before-close on the last two so an - // in-flight event listener observes either an open resource or null, never half-closed. + // Executors -> caches -> admins. Null-before-close on legacy resources so a racing + // listener sees either a live or null reference, never half-closed. 
shutdownSyncExecutor(); shutdownLegacyCrrSyncExecutor(); if (pathChildrenCache != null) { @@ -1103,9 +1094,8 @@ private long validateTransitionAndGetWaitTime(HAGroupStoreRecord.HAGroupState cu // ========== Legacy /phoenix/ha CRR Sync ========== /** - * Derives the combined {@link ClusterRoleRecord} from local + peer consistentHA records and - * CAS-writes it to {@code /phoenix/ha} via {@link #legacyCrrNodeCache}. On {@code BadVersion} we - * log and bail out; the NodeCache watcher plus the next event/periodic cycle reconverge. + * Derives the combined CRR from local + peer records and CAS-writes it to {@code /phoenix/ha}. + * CAS losses are logged and skipped; the NodeCache watcher and next periodic cycle reconverge. */ private void syncLegacyCRRIfRoleChanged() { if (!legacyCrrSyncEnabled) { @@ -1127,8 +1117,14 @@ private void syncLegacyCRRIfRoleChanged() { return; } HAGroupStoreRecord peer = getHAGroupStoreRecordFromPeer(); - Pair snapshot = readLegacyCrrFromCache(); + // NodeCache is eventually consistent; on apparent absence, fall back to an + // authoritative ZK read so the equality check and CAS both see consistent state. 
+ Pair snapshot = readLegacyCrrSnapshot(); + if (snapshot.getRight() == null) { + snapshot = legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); + } ClusterRoleRecord existing = snapshot.getLeft(); + Stat existingStat = snapshot.getRight(); ClusterRoleRecord desired = buildDesiredLegacyCrr(local, peer, existing); if (desired.isLogicallyEqualIgnoringVersionAndRegistry(existing)) { LOGGER.debug("Legacy CRR for HA group {} already up to date at version {}", haGroupName, @@ -1136,16 +1132,19 @@ private void syncLegacyCRRIfRoleChanged() { return; } try { - legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired, - expectedVersionFor(snapshot.getRight())); + if (existingStat == null) { + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired, + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, /* ignored */ 0); + } else { + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, existingStat.getVersion()); + } LOGGER.info("Synced legacy CRR for HA group {} (version {} -> {})", haGroupName, existing != null ? existing.getVersion() : -1L, desired.getVersion()); } catch (StaleClusterRoleRecordVersionException stale) { - // Another writer won. NodeCache will refresh via its watcher; next cycle reconverges. - LOGGER.debug( - "Legacy CRR CAS lost race for HA group {} at expected version {}; " - + "will reconverge on next event/periodic cycle", - haGroupName, snapshot.getRight() != null ? snapshot.getRight().getVersion() : -1); + // CAS lost; next event/periodic cycle reconverges. + LOGGER.info("Legacy CRR CAS lost for HA group {} at expected stat version {}", + haGroupName, existingStat != null ? existingStat.getVersion() : -1); } } catch (Exception e) { LOGGER.warn( @@ -1155,10 +1154,7 @@ private void syncLegacyCRRIfRoleChanged() { } } - /** - * Builds the desired legacy CRR. 
Always stamps {@link RegistryType#ZK} for pre-consistentHA - * ZK-registry consumers. {@code existing} may be null (no prior legacy znode). - */ + /** Builds the desired legacy CRR, always stamped {@link RegistryType#ZK}. */ private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local, HAGroupStoreRecord peer, ClusterRoleRecord existing) { ClusterRole role2 = (peer != null) ? peer.getClusterRole() : ClusterRole.UNKNOWN; @@ -1170,8 +1166,11 @@ private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local, HAGrou baseVersion + 1); } - /** In-memory read of the cached legacy CRR. Returns {@code (null, null)} if absent. */ - private Pair readLegacyCrrFromCache() { + /** + * NodeCache snapshot of the legacy CRR. {@code (null, null)} on cache miss; callers must confirm + * absence with an authoritative ZK read since the cache is eventually consistent. + */ + private Pair readLegacyCrrSnapshot() { ChildData current = legacyCrrNodeCache.getCurrentData(); if (current == null) { return Pair.of(null, null); @@ -1180,14 +1179,7 @@ private Pair readLegacyCrrFromCache() { return Pair.of(record, current.getStat()); } - /** Maps a null/non-null {@link Stat} to the CAS {@code expectedStatVersion} argument. */ - private static int expectedVersionFor(Stat stat) { - return (stat != null) ? stat.getVersion() : PhoenixHAAdmin.CREATE_NEW_RECORD_STAT_VERSION; - } - - /** - * Schedules the periodic reconciler; no-op when interval <= 0 (event-driven sync still runs). - */ + /** Schedules the periodic reconciler; no-op when {@code intervalSec <= 0}. 
*/ private void startLegacyCrrReconciliation() { long intervalSec = conf.getLong(PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS, DEFAULT_PHOENIX_HA_LEGACY_CRR_RECONCILIATION_INTERVAL_SECONDS); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java index 416aff9e303..727ba8dadd9 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java @@ -84,21 +84,21 @@ public CuratorFramework getCurator(String zkUrl, Properties properties, String n private static final Logger LOG = LoggerFactory.getLogger(PhoenixHAAdmin.class); - /** - * Sentinel for {@code expectedStatVersion} meaning "no existing znode; create it". Routed to a - * {@code create()} call rather than {@code setData()}. Intentionally distinct from - * {@link #ZK_MATCH_ANY_VERSION} so the two semantics can't be confused. - * @see #createOrUpdateClusterRoleRecordWithCAS(String, ClusterRoleRecord, int) - */ - public static final int CREATE_NEW_RECORD_STAT_VERSION = -2; + /** ZK's wildcard for {@code setData()/delete()}: bypasses version check. */ + static final int ZK_MATCH_ANY_VERSION = -1; /** - * ZooKeeper's documented wildcard value for {@code setData} / {@code delete}: when passed, the - * operation skips version-checking and matches any current version (i.e. bypasses CAS). Use only - * for deliberate force-overwrite paths; never for the normal CAS update flow. - * @see #createOrUpdateClusterRoleRecordWithCAS(String, ClusterRoleRecord, int) + * Write mode for {@link #createOrUpdateClusterRoleRecordWithCAS}. The accompanying + * {@code expectedStatVersion} argument is interpreted only for {@link #CAS_WITH_VERSION}. */ - public static final int ZK_MATCH_ANY_VERSION = -1; + public enum LegacyCrrWriteMode { + /** Create the znode; no prior version expected. 
*/ + CREATE_NEW, + /** Unconditional overwrite (no CAS). For operator/migration tooling only. */ + FORCE_OVERWRITE, + /** CAS update; {@code expectedStatVersion} must be {@code >= 0}. */ + CAS_WITH_VERSION + } /** The fully qualified ZK URL for an HBase cluster in format host:port:/hbase */ private final String zkUrl; @@ -541,15 +541,8 @@ public void updateHAGroupStoreRecordInZooKeeper(String haGroupName, } /** - * Atomically reads the {@link HAGroupStoreRecord} and its {@link Stat} from ZooKeeper using - * {@code getData().storingStatIn(stat)}, so the returned stat version always corresponds to the - * returned bytes. This closes a pre-existing TOCTOU window where the previous two-call - * implementation ({@code getData()} + a separate {@code checkExists()}) could return a stat whose - * {@code version} did not match the {@code data} bytes if another writer updated the znode in - * between. - * @param haGroupName the HA group name - * @return a pair of (record, stat); both null if the znode does not exist - * @throws IOException on unexpected ZK errors + * Atomic read of (record, stat) via {@code storingStatIn}. Returns {@code (null, null)} if the + * znode does not exist. */ public Pair getHAGroupStoreRecordInZooKeeper(String haGroupName) throws IOException { @@ -581,17 +574,11 @@ public void deleteHAGroupStoreRecordInZooKeeper(String haGroupName) throws IOExc } } - // ----- Helpers for the legacy /phoenix/ha ClusterRoleRecord sync path. ----- - // These mirror the HAGroupStoreRecord helpers above but operate on ClusterRoleRecord - // and read data + stat atomically. + // ----- Legacy /phoenix/ha ClusterRoleRecord sync helpers ----- /** - * Atomically reads the {@link ClusterRoleRecord} and its {@link Stat} from ZooKeeper using - * {@code getData().storingStatIn(stat)}, so the returned stat version always corresponds to the - * returned bytes. 
- * @param haGroupName the HA group name - * @return a pair of (record, stat); both null if the znode does not exist - * @throws IOException on unexpected ZK errors + * Atomic read of (record, stat) on the legacy CRR znode. Returns {@code (null, null)} if the + * znode does not exist. */ public Pair getClusterRoleRecordAndStatInZooKeeper(String haGroupName) throws IOException { @@ -609,59 +596,43 @@ public Pair getClusterRoleRecordAndStatInZooKeeper(Stri } /** - * Writes {@code newRecord} to the legacy znode. Dispatch is determined solely by - * {@code expectedStatVersion}, which must be exactly one of: - *

    - *
  • {@link #CREATE_NEW_RECORD_STAT_VERSION}: create the znode (no prior version expected). A - * concurrent {@code NodeExists} surfaces as {@link StaleClusterRoleRecordVersionException} so - * callers can retry uniformly with the BadVersion path.
  • - *
  • {@link #ZK_MATCH_ANY_VERSION}: ZooKeeper's wildcard; forwarded to - * {@code setData().withVersion(-1)} which overwrites unconditionally (no CAS). For deliberate - * force-overwrite paths only.
  • - *
  • Any {@code int >= 0}: CAS update with the given expected ZK stat version. A mismatch - * surfaces as {@link StaleClusterRoleRecordVersionException}.
  • - *
- * Any other value (e.g. {@code -3}, {@code Integer.MIN_VALUE}) is rejected with - * {@link IllegalArgumentException} so a stray sentinel can't silently fall through into one of - * the above semantics. - * @throws StaleClusterRoleRecordVersionException if CAS fails with BadVersion or the create races - * with a concurrent NodeExists - * @throws IOException on other errors - * @throws IllegalArgumentException if {@code expectedStatVersion} is not one of the - * three documented categories + * Writes {@code newRecord} per {@code mode}. {@code expectedStatVersion} is used only for + * {@link LegacyCrrWriteMode#CAS_WITH_VERSION} (must be {@code >= 0}). Both BadVersion and + * NodeExists surface as {@link StaleClusterRoleRecordVersionException}. */ public void createOrUpdateClusterRoleRecordWithCAS(String haGroupName, - ClusterRoleRecord newRecord, int expectedStatVersion) + ClusterRoleRecord newRecord, LegacyCrrWriteMode mode, int expectedStatVersion) throws IOException, StaleClusterRoleRecordVersionException { - // Validate caller intent BEFORE entering the try/catch so the IllegalArgumentException is - // not accidentally wrapped in IOException by the catch-all below. 
- boolean isCreate = expectedStatVersion == CREATE_NEW_RECORD_STAT_VERSION; - boolean isWildcardOverwrite = expectedStatVersion == ZK_MATCH_ANY_VERSION; - boolean isCasUpdate = expectedStatVersion >= 0; - Preconditions.checkArgument(isCreate || isWildcardOverwrite || isCasUpdate, - "expectedStatVersion must be CREATE_NEW_RECORD_STAT_VERSION (create), ZK_MATCH_ANY_VERSION " - + "(unconditional overwrite), or a non-negative stat version (CAS update); got " - + expectedStatVersion); + Preconditions.checkNotNull(mode, "mode"); + if (mode == LegacyCrrWriteMode.CAS_WITH_VERSION) { + Preconditions.checkArgument(expectedStatVersion >= 0, + "CAS_WITH_VERSION requires expectedStatVersion >= 0; got " + expectedStatVersion); + } try { byte[] data = ClusterRoleRecord.toJson(newRecord); - if (isCreate) { - getCurator().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) - .forPath(toPath(haGroupName), data); - } else { - // Both ZK_MATCH_ANY_VERSION (-1, wildcard) and >= 0 (CAS) are valid arguments to - // setData().withVersion(); ZK applies the appropriate semantics server-side. 
- getCurator().setData().withVersion(expectedStatVersion).forPath(toPath(haGroupName), data); + switch (mode) { + case CREATE_NEW: + getCurator().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .forPath(toPath(haGroupName), data); + break; + case FORCE_OVERWRITE: + getCurator().setData().withVersion(ZK_MATCH_ANY_VERSION).forPath(toPath(haGroupName), + data); + break; + case CAS_WITH_VERSION: + getCurator().setData().withVersion(expectedStatVersion).forPath(toPath(haGroupName), + data); + break; + default: + throw new IllegalStateException("Unhandled LegacyCrrWriteMode: " + mode); } } catch (KeeperException.BadVersionException e) { throw new StaleClusterRoleRecordVersionException( - "Failed to set ClusterRoleRecord for HA group " + haGroupName + " with cached stat version " - + expectedStatVersion, + "CAS failed for HA group " + haGroupName + " at expectedStatVersion " + expectedStatVersion, e); } catch (KeeperException.NodeExistsException e) { - // Race with another writer creating the node; surface as stale-version so caller retries. 
throw new StaleClusterRoleRecordVersionException( - "Failed to create ClusterRoleRecord for HA group " + haGroupName + ": node already exists", - e); + "Create failed for HA group " + haGroupName + ": node already exists", e); } catch (Exception e) { LOG.error("Failed to write ClusterRoleRecord for HA group {}", haGroupName, e); throw new IOException("Failed to write ClusterRoleRecord for HA group " + haGroupName, e); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java index 1a809370116..a532cc5f174 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java @@ -1300,7 +1300,7 @@ public void testLegacyCrrSyncStateOnlyChangeDoesNotRewriteLegacy() throws Except initialCrrVersion, after.getLeft().getVersion()); } - /** Matrix #10: LOCAL CHILD_REMOVED on consistentHA does not delete the legacy znode. */ + /** LOCAL CHILD_REMOVED on consistentHA does not delete the legacy znode. */ @Test public void testLegacyCrrSyncLocalChildRemovedDoesNotDeleteLegacy() throws Exception { String haGroupName = testName.getMethodName(); @@ -1344,7 +1344,7 @@ public void testLegacyCrrSyncPeriodicDisabledStillSyncsViaEvents() throws Except ClusterRoleRecord.RegistryType.ZK, updated.getRegistryType()); } - /** Matrix #11: PEER CHILD_REMOVED on consistentHA does not delete the legacy znode. */ + /** PEER CHILD_REMOVED on consistentHA does not delete the legacy znode. */ @Test public void testLegacyCrrSyncPeerChildRemovedDoesNotDeleteLegacy() throws Exception { String haGroupName = testName.getMethodName(); @@ -1371,93 +1371,88 @@ public void testLegacyCrrSyncPeerChildRemovedDoesNotDeleteLegacy() throws Except after.getLeft().getVersion() >= initialCrrVersion); } - /** Matrix #7 (helper-level): CAS keeps one writer per attempt; loser sees stale exception. 
*/ + /** + * Each {@link PhoenixHAAdmin.LegacyCrrWriteMode}: error mapping (BadVersion + NodeExists -> + * {@link StaleClusterRoleRecordVersionException}), unconditional FORCE_OVERWRITE, and + * CAS_WITH_VERSION rejecting negative versions. Sequential: ZK serializes versioned writes + * server-side, so the client retry path is identical to a real race. + */ @Test - public void testLegacyCrrCASWithConcurrentWriters_OneSucceedsOneStales() throws Exception { + public void testLegacyCrrCasErrorMappingAndModeDispatch() throws Exception { String haGroupName = testName.getMethodName(); - // Step 1: create the legacy znode using the explicit create sentinel (NOT -1, which ZK - // reserves as the match-any-version wildcard). + // Create. ClusterRoleRecord initial = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.ACTIVE, this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 1L); legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, initial, - PhoenixHAAdmin.CREATE_NEW_RECORD_STAT_VERSION); + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0); - // Step 2: both "writers" snapshot the same stat version. Pair existing = legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); assertNotNull(existing.getLeft()); int sharedVersion = existing.getRight().getVersion(); - // Step 3: writer A wins the race. + // CAS winner. 
ClusterRoleRecord writerA = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 2L); - legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerA, sharedVersion); + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerA, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, sharedVersion); - // Step 4: writer B uses the now-stale stat version → BadVersion → mapped to - // StaleClusterRoleRecordVersionException (exactly what the client's retry path catches). + // CAS loser: same expected version -> BadVersion -> StaleClusterRoleRecordVersionException. ClusterRoleRecord writerB = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, this.peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, 2L); - assertThrows(StaleClusterRoleRecordVersionException.class, () -> legacyHaAdmin - .createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerB, sharedVersion)); + assertThrows(StaleClusterRoleRecordVersionException.class, + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, writerB, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, sharedVersion)); - // Step 5: final znode state reflects writer A's update; loser observed BadVersion. 
Pair winner = legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); - assertNotNull(winner.getLeft()); assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, winner.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); - assertTrue("Stat version must advance exactly once for the single winning CAS", - winner.getRight().getVersion() > sharedVersion); + assertTrue(winner.getRight().getVersion() > sharedVersion); - // Step 6: concurrent create races map to the same exception (NodeExistsException is also - // surfaced as StaleClusterRoleRecordVersionException so the client retry path is uniform). + // CREATE_NEW on an existing znode -> NodeExists -> StaleClusterRoleRecordVersionException. ClusterRoleRecord raceCreate = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 3L); assertThrows(StaleClusterRoleRecordVersionException.class, () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, raceCreate, - PhoenixHAAdmin.CREATE_NEW_RECORD_STAT_VERSION)); + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0)); - // Step 7: ZK_MATCH_ANY_VERSION is allowed (operator/migration tooling may want a - // force-overwrite path), but any negative value that is neither the wildcard nor - // CREATE_NEW_RECORD_STAT_VERSION must fail fast so a stray sentinel doesn't silently take - // on one of the defined semantics. - Stat statBeforeWildcard = + // FORCE_OVERWRITE bypasses CAS and bumps the stat version. 
+ Stat statBeforeOverwrite = legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName).getRight(); - ClusterRoleRecord wildcardOverwrite = + ClusterRoleRecord overwrite = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, this.peerZKUrl, ClusterRoleRecord.ClusterRole.STANDBY, 4L); - legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, wildcardOverwrite, - PhoenixHAAdmin.ZK_MATCH_ANY_VERSION); - Pair afterWildcard = + legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, overwrite, + PhoenixHAAdmin.LegacyCrrWriteMode.FORCE_OVERWRITE, 0); + Pair afterOverwrite = legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); - assertEquals("ZK_MATCH_ANY_VERSION must unconditionally overwrite", - ClusterRoleRecord.ClusterRole.STANDBY, - afterWildcard.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); - assertTrue("Wildcard overwrite must still bump the stat version", - afterWildcard.getRight().getVersion() > statBeforeWildcard.getVersion()); - - // Negative values that are neither the create sentinel nor the wildcard must be rejected - // up front with IllegalArgumentException, before any ZK call is attempted. - ClusterRoleRecord illegalArgumentRecord = + assertEquals(ClusterRoleRecord.ClusterRole.STANDBY, + afterOverwrite.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); + assertTrue(afterOverwrite.getRight().getVersion() > statBeforeOverwrite.getVersion()); + + // CAS_WITH_VERSION rejects negative expectedStatVersion before any ZK call. 
+ ClusterRoleRecord illegalRecord = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.OFFLINE, this.peerZKUrl, ClusterRoleRecord.ClusterRole.OFFLINE, 5L); - assertThrows(IllegalArgumentException.class, () -> legacyHaAdmin - .createOrUpdateClusterRoleRecordWithCAS(haGroupName, illegalArgumentRecord, -3)); assertThrows(IllegalArgumentException.class, - () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, illegalArgumentRecord, - Integer.MIN_VALUE)); + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, illegalRecord, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, -1)); + assertThrows(IllegalArgumentException.class, + () -> legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, illegalRecord, + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, Integer.MIN_VALUE)); } - /** Matrix #4: peer-side role flip propagates to role2 in the local legacy CRR. */ + /** Peer-side role flip propagates to role2 in the local legacy CRR. */ @Test public void testLegacyCrrSyncPeerRoleFlipUpdatesLegacyRole2() throws Exception { String haGroupName = testName.getMethodName(); @@ -1486,7 +1481,7 @@ public void testLegacyCrrSyncPeerRoleFlipUpdatesLegacyRole2() throws Exception { ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); } - /** Matrix #6: absent peer record yields role2=UNKNOWN; converges when the peer record appears. */ + /** Absent peer record yields role2=UNKNOWN; converges when the peer record appears. */ @Test public void testLegacyCrrSyncPeerAbsentYieldsUnknownAndConvergesOnRecovery() throws Exception { String haGroupName = testName.getMethodName(); @@ -1513,7 +1508,7 @@ public void testLegacyCrrSyncPeerAbsentYieldsUnknownAndConvergesOnRecovery() thr recovered.getLeft().getVersion() > initialVersion); } - /** Matrix #12: registryType stays ZK across multiple sync cycles (never reverts to RPC). 
*/ + /** registryType stays ZK across multiple sync cycles (never reverts to RPC). */ @Test public void testLegacyCrrSyncRegistryTypePreservedAcrossMultipleCycles() throws Exception { String haGroupName = testName.getMethodName(); @@ -1557,7 +1552,7 @@ public void testLegacyCrrSyncRegistryTypePreservedAcrossMultipleCycles() throws } } - /** Matrix #9: periodic loop repairs an external divergence with no consistentHA event. */ + /** Periodic loop repairs an external divergence with no consistentHA event. */ @Test public void testLegacyCrrSyncPeriodicReconciliationRecoversAfterDivergence() throws Exception { String haGroupName = testName.getMethodName(); @@ -1565,24 +1560,21 @@ public void testLegacyCrrSyncPeriodicReconciliationRecoversAfterDivergence() thr HAGroupStoreClient client = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); assertNotNull(client); Pair initial = awaitLegacyCrrPresent(haGroupName); - assertEquals("Sanity: local role1 should be ACTIVE per @Before seed", - ClusterRoleRecord.ClusterRole.ACTIVE, + assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, initial.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); - // Externally overwrite the legacy znode with role1=STANDBY (logically different from desired). - // This does NOT trigger any /phoenix/consistentHA event, so the only path to recovery is - // the periodic reconciliation loop. + // Externally corrupt the legacy znode; no consistentHA event fires, so only the periodic + // reconciler can recover. 
ClusterRoleRecord corrupt = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.STANDBY, this.peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, initial.getLeft().getVersion() + 10); legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, corrupt, - initial.getRight().getVersion()); + PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, initial.getRight().getVersion()); Pair corrupted = readLegacyCrr(haGroupName); - assertEquals("Sanity: corruption took (role1 now STANDBY)", - ClusterRoleRecord.ClusterRole.STANDBY, + assertEquals(ClusterRoleRecord.ClusterRole.STANDBY, corrupted.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); - // Wait for the periodic loop to fire. Worst case: jitter=30s, so allow 40s. + // Worst-case wait: jitter up to 30s + 2s interval; allow 40s. long deadline = System.currentTimeMillis() + 40_000L; Pair after = readLegacyCrr(haGroupName); while ( @@ -1593,27 +1585,23 @@ public void testLegacyCrrSyncPeriodicReconciliationRecoversAfterDivergence() thr Thread.sleep(500); after = readLegacyCrr(haGroupName); } - assertNotNull("Legacy znode missing after periodic reconciliation window", after.getLeft()); - assertEquals("Periodic reconciliation must restore role1 to ACTIVE", - ClusterRoleRecord.ClusterRole.ACTIVE, + assertNotNull(after.getLeft()); + assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, after.getLeft().getRole(formattedZkUrlFor(ClusterType.LOCAL))); - assertTrue("Version must monotonically advance past the externally-bumped version", - after.getLeft().getVersion() > corrupted.getLeft().getVersion()); - assertEquals("Registry type must remain ZK after periodic recovery", - ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + assertTrue(after.getLeft().getVersion() > corrupted.getLeft().getVersion()); + assertEquals(ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); } - /** Matrix #7/#12 e2e: 
client overwrites a pre-seeded logically-stale legacy znode. */ + /** Client overwrites a pre-seeded, logically-stale legacy znode on initial sync. */ @Test public void testLegacyCrrSyncOverwritesPreSeededLogicallyStaleZnode() throws Exception { String haGroupName = testName.getMethodName(); - // Pre-seed the legacy znode with role2=OFFLINE so the desired CRR (role2=UNKNOWN by default, - // since no peer record exists yet) is logically different. + // Pre-seed role2=OFFLINE; desired CRR is role2=UNKNOWN (no peer record), so logically stale. ClusterRoleRecord preSeed = new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, this.zkUrl, ClusterRoleRecord.ClusterRole.ACTIVE, this.peerZKUrl, ClusterRoleRecord.ClusterRole.OFFLINE, 5L); legacyHaAdmin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, preSeed, - PhoenixHAAdmin.CREATE_NEW_RECORD_STAT_VERSION); + PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, 0); Pair seeded = readLegacyCrr(haGroupName); assertNotNull(seeded.getLeft()); assertEquals(ClusterRoleRecord.ClusterRole.OFFLINE, @@ -1635,20 +1623,17 @@ public void testLegacyCrrSyncOverwritesPreSeededLogicallyStaleZnode() throws Exc after = readLegacyCrr(haGroupName); } assertNotNull(after.getLeft()); - assertEquals("Legacy CRR registry type must round-trip as ZK after overwrite", - ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); - assertTrue("Logical version must advance past the pre-seeded version", - after.getLeft().getVersion() > seeded.getLeft().getVersion()); - // role2 must no longer be OFFLINE (we don't assert UNKNOWN vs STANDBY explicitly because - // either is acceptable depending on peer cache initialization timing). 
- assertFalse("role2 must be overwritten from OFFLINE", - after.getLeft().getRole(formattedZkUrlFor(ClusterType.PEER)) - == ClusterRoleRecord.ClusterRole.OFFLINE); + assertEquals(ClusterRoleRecord.RegistryType.ZK, after.getLeft().getRegistryType()); + assertTrue(after.getLeft().getVersion() > seeded.getLeft().getVersion()); + // role2 must no longer be OFFLINE (UNKNOWN vs STANDBY both acceptable; depends on peer + // cache init timing). + assertFalse(after.getLeft().getRole(formattedZkUrlFor(ClusterType.PEER)) + == ClusterRoleRecord.ClusterRole.OFFLINE); } // ---------- Legacy CRR sync test helpers ---------- - /** Per-test Configuration clone with the legacy CRR flag and reconciliation interval. */ + /** Configuration clone with the legacy CRR flag and reconciliation interval set. */ private Configuration legacyCrrConf(boolean legacyEnabled, long periodicSec) { Configuration src = CLUSTERS.getHBaseCluster1().getConfiguration(); Configuration cloned = new Configuration(src); @@ -1661,7 +1646,7 @@ private Pair readLegacyCrr(String haGroupName) throws I return legacyHaAdmin.getClusterRoleRecordAndStatInZooKeeper(haGroupName); } - /** Waits up to the propagation deadline for the cached legacy CRR to match {@code condition}. */ + /** Polls the legacy CRR until {@code condition} matches or the propagation deadline elapses. */ private Pair awaitLegacyCrr(String haGroupName, Predicate condition, String description) throws Exception { long deadline = System.currentTimeMillis() + ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS; @@ -1683,10 +1668,7 @@ private Pair awaitLegacyCrrPresent(String haGroupName) return awaitLegacyCrr(haGroupName, crr -> true, "znode present"); } - /** - * Polls the legacy CRR until the role at the local ({@link ClusterType#LOCAL}) or peer - * ({@link ClusterType#PEER}) slot matches {@code expectedRole}. - */ + /** Polls until the LOCAL or PEER role in the legacy CRR matches {@code expectedRole}. 
*/ private Pair awaitLegacyCrrRole(String haGroupName, ClusterType clusterType, ClusterRoleRecord.ClusterRole expectedRole) throws Exception { String url = formattedZkUrlFor(clusterType); @@ -1694,11 +1676,7 @@ private Pair awaitLegacyCrrRole(String haGroupName, clusterType + " role == " + expectedRole); } - /** - * Returns the LOCAL or PEER ZK URL in the canonical form used by {@link ClusterRoleRecord} (i.e. - * via {@link JDBCUtil#formatUrl(String, ClusterRoleRecord.RegistryType)}). Required when looking - * up roles by URL on records that the legacy sync wrote. - */ + /** LOCAL or PEER ZK URL in the canonical ZK-registry form used by the legacy sync. */ private String formattedZkUrlFor(ClusterType clusterType) { String raw = (clusterType == ClusterType.LOCAL) ? zkUrl : peerZKUrl; return JDBCUtil.formatUrl(raw, ClusterRoleRecord.RegistryType.ZK); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java index e6e71d86a3f..1dfe3dae960 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java @@ -229,6 +229,53 @@ public void testClusterRoleFromInvalidBytes() { assertEquals(ClusterRole.UNKNOWN, role); } + /** JSON without a {@code registryType} field defaults to RPC on deserialization. 
*/ + @Test + public void testFromJsonWithoutRegistryTypeDefaultsToRpc() throws IOException { + byte[] data = readFile("json/test_role_record_no_registry_type.json"); + Optional opt = ClusterRoleRecord.fromJson(data); + assertTrue(opt.isPresent()); + ClusterRoleRecord record = opt.get(); + assertEquals(ClusterRoleRecord.RegistryType.RPC, record.getRegistryType()); + assertEquals(HighAvailabilityPolicy.FAILOVER, record.getPolicy()); + assertEquals(7L, record.getVersion()); + assertEquals(ClusterRole.ACTIVE, record.getRole1()); + assertEquals(ClusterRole.STANDBY, record.getRole2()); + } + + /** Explicit {@code registryType=RPC} must round-trip as RPC. */ + @Test + public void testFromJsonExplicitRpcRegistryTypeRoundTrips() throws IOException { + Optional opt = + ClusterRoleRecord.fromJson(readFile("json/test_role_record_explicit_rpc.json")); + assertTrue(opt.isPresent()); + assertEquals(ClusterRoleRecord.RegistryType.RPC, opt.get().getRegistryType()); + assertEquals(11L, opt.get().getVersion()); + } + + /** Explicit {@code registryType=ZK} round-trips as ZK. */ + @Test + public void testFromJsonExplicitZkRegistryTypeRoundTrips() throws IOException { + Optional opt = + ClusterRoleRecord.fromJson(readFile("json/test_role_record_explicit_zk.json")); + assertTrue(opt.isPresent()); + assertEquals(ClusterRoleRecord.RegistryType.ZK, opt.get().getRegistryType()); + assertEquals(13L, opt.get().getVersion()); + } + + /** Round-trip: ZK-registry CRR -> JSON -> CRR preserves {@code registryType}. 
*/ + @Test + public void testToFromJsonPreservesZkRegistryTypeAcrossRoundTrip() throws IOException { + ClusterRoleRecord written = new ClusterRoleRecord(testName.getMethodName(), + HighAvailabilityPolicy.FAILOVER, ClusterRoleRecord.RegistryType.ZK, "zk1\\:2181::/hbase", + ClusterRole.ACTIVE, "zk2\\:2181::/hbase", ClusterRole.STANDBY, 42L); + Optional read = + ClusterRoleRecord.fromJson(ClusterRoleRecord.toJson(written)); + assertTrue(read.isPresent()); + assertEquals(ClusterRoleRecord.RegistryType.ZK, read.get().getRegistryType()); + assertEquals(written, read.get()); + } + // Private Helper Methods private ClusterRoleRecord getClusterRoleRecord(String name, HighAvailabilityPolicy policy, diff --git a/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json b/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json new file mode 100644 index 00000000000..e2b7269b48b --- /dev/null +++ b/phoenix-core/src/test/resources/json/test_role_record_explicit_rpc.json @@ -0,0 +1,10 @@ +{ + "haGroupName" : "testFromJsonExplicitRpcRoundTrips", + "policy" : "FAILOVER", + "registryType" : "RPC", + "url1" : "url1\\:2181", + "role1" : "ACTIVE", + "url2" : "url2\\:2181", + "role2" : "STANDBY", + "version" : 11 +} diff --git a/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json b/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json new file mode 100644 index 00000000000..1e9ce174ef7 --- /dev/null +++ b/phoenix-core/src/test/resources/json/test_role_record_explicit_zk.json @@ -0,0 +1,10 @@ +{ + "haGroupName" : "testFromJsonExplicitZkRoundTrips", + "policy" : "FAILOVER", + "registryType" : "ZK", + "url1" : "zk1\\:2181::/hbase", + "role1" : "ACTIVE", + "url2" : "zk2\\:2181::/hbase", + "role2" : "STANDBY", + "version" : 13 +} diff --git a/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json b/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json new file mode 100644 index 
00000000000..517e061b654 --- /dev/null +++ b/phoenix-core/src/test/resources/json/test_role_record_no_registry_type.json @@ -0,0 +1,9 @@ +{ + "haGroupName" : "testFromJsonWithoutRegistryTypeDefaultsToRpc", + "policy" : "FAILOVER", + "url1" : "url1\\:2181", + "role1" : "ACTIVE", + "url2" : "url2\\:2181", + "role2" : "STANDBY", + "version" : 7 +} From b97d511f59363d8b93d07e525f510449e22124a8 Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Thu, 14 May 2026 23:01:42 -0700 Subject: [PATCH 3/5] PHOENIX-7787 Restore pre-PR Javadoc detail on two trimmed methods - PhoenixHAAdmin.getHAGroupStoreRecordInZooKeeper: restore @param/@return/@throws from the pre-PR Javadoc, keeping the atomic-read note. - ClusterRoleRecord canonical and convenience constructors: restore per-parameter @param tags; on the @JsonCreator constructor also restore the URL-normalization note and document the registryType backward-compat default. No behavior change. Generated-by: Cursor (Claude). Co-authored-by: Cursor --- .../phoenix/jdbc/ClusterRoleRecord.java | 25 +++++++++++++++++-- .../apache/phoenix/jdbc/PhoenixHAAdmin.java | 7 ++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java index dd6914a19f7..9ca7ca23288 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java @@ -130,6 +130,13 @@ public enum RegistryType { * Convenience constructor: defaults {@code registryType} to * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). Use the explicit overload below to write * {@link RegistryType#ZK} records for the legacy {@code /phoenix/ha} znode. 
+ * @param haGroupName HighAvailability Group name / CRR name + * @param policy Policy used by the given CRR + * @param url1 ZK/HMaster url for the first cluster + * @param role1 {@link ClusterRole} describing the current state of the first cluster + * @param url2 ZK/HMaster url for the second cluster + * @param role2 {@link ClusterRole} describing the current state of the second cluster + * @param version monotonic version of this CRR */ public ClusterRoleRecord(String haGroupName, HighAvailabilityPolicy policy, String url1, ClusterRole role1, String url2, ClusterRole role2, long version) { @@ -138,8 +145,22 @@ public ClusterRoleRecord(String haGroupName, HighAvailabilityPolicy policy, Stri } /** - * Canonical / {@code @JsonCreator} constructor. A {@code null} {@code registryType} defaults to - * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). + * Canonical constructor; also the {@code @JsonCreator} entry point so the persisted + * {@code registryType} round-trips correctly. Records persisted before {@code registryType} was + * added as a JSON field pass {@code null} here and default to + * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC). URLs are normalized to the + * {@code registryType}-specific canonical form for accurate comparisons; the resulting + * {@code url1}/{@code url2} are stored as {@code zk1\:port1,zk2\:port2,...::znode} for ZK or + * {@code master1\:port1,master2\:port2,...} for RPC/MASTER. 
+ * @param haGroupName HighAvailability Group name / CRR name + * @param policy Policy used by the given CRR + * @param registryType {@link RegistryType} for URL normalization; {@code null} defaults to + * {@link #DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE} (RPC) + * @param url1 ZK/HMaster url for the first cluster + * @param role1 {@link ClusterRole} describing the current state of the first cluster + * @param url2 ZK/HMaster url for the second cluster + * @param role2 {@link ClusterRole} describing the current state of the second cluster + * @param version monotonic version of this CRR */ @JsonCreator public ClusterRoleRecord(@JsonProperty("haGroupName") String haGroupName, diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java index 727ba8dadd9..ad15607201c 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java @@ -541,8 +541,11 @@ public void updateHAGroupStoreRecordInZooKeeper(String haGroupName, } /** - * Atomic read of (record, stat) via {@code storingStatIn}. Returns {@code (null, null)} if the - * znode does not exist. + * Gets the HAGroupStoreRecord and Stat from ZooKeeper. Reads (record, stat) atomically via + * {@code storingStatIn} so the returned stat version always corresponds to the returned bytes. 
+ * @param haGroupName the HA group name + * @return a pair of HAGroupStoreRecord and Stat; both {@code null} if the znode does not exist + * @throws IOException if any error occurs during the retrieval */ public Pair getHAGroupStoreRecordInZooKeeper(String haGroupName) throws IOException { From 54c87f074ea59f4cb443126dcf4a9405b710ade2 Mon Sep 17 00:00:00 2001 From: ritegarg <58840065+ritegarg@users.noreply.github.com> Date: Thu, 14 May 2026 23:20:57 -0700 Subject: [PATCH 4/5] Update StaleClusterRoleRecordVersionException.java --- .../exception/StaleClusterRoleRecordVersionException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java index 35d85258794..0b20a53a7bc 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordVersionException.java @@ -19,7 +19,7 @@ /** * CAS write to the legacy {@code /phoenix/ha} CRR znode failed (BadVersion or NodeExists); the - * caller should re-read and retry. Analog of {@link StaleHAGroupStoreRecordVersionException}. + * caller can re-read and retry if needed. Analog of {@link StaleHAGroupStoreRecordVersionException}. */ public class StaleClusterRoleRecordVersionException extends Exception { private static final long serialVersionUID = 1L; From bf925cc1cf7bb45b017a8919a6ecce9a49922670 Mon Sep 17 00:00:00 2001 From: Ritesh Garg Date: Thu, 14 May 2026 23:24:10 -0700 Subject: [PATCH 5/5] PHOENIX-7787 Restore pre-PR URL-normalization comment in ClusterRoleRecord ctor Restores L155-L162 of the pre-PR ClusterRoleRecord.java verbatim (8-line URL-normalization rationale). No behavior change. Generated-by: Cursor (Claude). 
Co-authored-by: Cursor --- .../java/org/apache/phoenix/jdbc/ClusterRoleRecord.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java index 9ca7ca23288..327db3eaa5e 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java @@ -172,7 +172,14 @@ public ClusterRoleRecord(@JsonProperty("haGroupName") String haGroupName, this.policy = policy; this.registryType = registryType != null ? registryType : DEFAULT_PHOENIX_HA_CRR_REGISTRY_TYPE; - // Normalize URLs to the registry-specific canonical form for accurate comparisons. + // Do we really need to normalize here ? + // We are normalizing to have urls in specific formats for each registryType for getting + // accurate comparisons. We are passing registryType as these url most probably won't have + // protocol in url, and it might be normalized based to wrong registry type, as we normalize + // w.r.t {@link ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY}, + // As we are expecting only master URLs as Consistent Cluster Failover onwards we only will + // allow RPC registry type url will be in form :- + // master1\\:port1,master2\\:port2,master3\\:port3,master4\\:port4,master5\\:port5 url1 = JDBCUtil.formatUrl(url1, this.registryType); url2 = JDBCUtil.formatUrl(url2, this.registryType);