Skip to content

Commit e35a7ac

Browse files
ritegargRitesh Garg
authored andcommitted
PHOENIX-7719 Prewarm HAGroupStore Client (#2313)
Co-authored-by: Ritesh Garg <ritesh.garg@riteshg-ltmd34g.internal.salesforce.com>
1 parent 2fb17d8 commit e35a7ac

4 files changed

Lines changed: 292 additions & 13 deletions

File tree

phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,9 @@ public interface QueryServices extends SQLCloseable {
450450
// Check HAGroup is Stale for mutations
451451
public static final String HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED =
452452
"phoenix.ha.group.stale.for.mutation.check.enabled";
453+
// Enable prewarming of HAGroupStoreClients at RegionServer startup
454+
String HA_GROUP_STORE_CLIENT_PREWARM_ENABLED
455+
= "phoenix.ha.group.store.client.prewarm.enabled";
453456
// Enable Thread Pool Creation in CQSI to be used for HBase Client.
454457
String CQSI_THREAD_POOL_ENABLED = "phoenix.cqsi.thread.pool.enabled";
455458
// CQSI Thread Pool Related Configuration.

phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
6161
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
6262
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
63+
import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED;
6364
import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_SYNC_INTERVAL_SECONDS;
6465
import static org.apache.phoenix.query.QueryServices.HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB;
6566
import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB;
@@ -493,6 +494,7 @@ public class QueryServicesOptions {
493494

494495
public static final Boolean DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false;
495496
public static final Boolean DEFAULT_HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED = true;
497+
public static final Boolean DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED = true;
496498
public static final Boolean DEFAULT_CQSI_THREAD_POOL_ENABLED = false;
497499
public static final int DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = 60;
498500
public static final int DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE = 25;
@@ -639,7 +641,9 @@ public static QueryServicesOptions withDefaults() {
639641
.setIfUnset(CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS, DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS)
640642
.setIfUnset(REPLICATION_LOG_ROTATION_TIME_MS_KEY, DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS)
641643
.setIfUnset(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS,
642-
DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS);
644+
DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS)
645+
.setIfUnset(HA_GROUP_STORE_CLIENT_PREWARM_ENABLED,
646+
DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED);
643647

644648
// HBase sets this to 1, so we reset it to something more appropriate.
645649
// Hopefully HBase will change this, because we can't know if a user set

phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java

Lines changed: 108 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@
2424
import com.google.protobuf.RpcController;
2525
import com.google.protobuf.Service;
2626
import java.io.IOException;
27+
import java.util.ArrayList;
2728
import java.util.Collections;
29+
import java.util.Iterator;
30+
import java.util.List;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Executors;
33+
2834
import org.apache.hadoop.conf.Configuration;
2935
import org.apache.hadoop.hbase.CoprocessorEnvironment;
3036
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
@@ -58,18 +64,26 @@ public class PhoenixRegionServerEndpoint extends
5864
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
5965
private MetricsMetadataCachingSource metricsSource;
6066
protected Configuration conf;
61-
private String zkUrl;
67+
private ExecutorService prewarmExecutor;
6268

6369
// regionserver level thread pool used by Uncovered Indexes to scan data table rows
6470
private static TaskRunner uncoveredIndexThreadPool;
6571

6672
@Override
6773
public void start(CoprocessorEnvironment env) throws IOException {
6874
this.conf = env.getConfiguration();
69-
this.metricsSource =
70-
MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
75+
this.metricsSource = MetricsPhoenixCoprocessorSourceFactory
76+
.getInstance().getMetadataCachingSource();
7177
initUncoveredIndexThreadPool(this.conf);
72-
this.zkUrl = getLocalZkUrl(conf);
78+
// Start async prewarming of HAGroupStoreClients if enabled
79+
if (conf.getBoolean(
80+
QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED,
81+
QueryServicesOptions
82+
.DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED)) {
83+
startHAGroupStoreClientPrewarming();
84+
} else {
85+
LOGGER.info("HAGroupStoreClient prewarming is disabled");
86+
}
7387
// Start replication log replay
7488
ReplicationLogReplayService.getInstance(conf).start();
7589
}
@@ -84,6 +98,10 @@ public void stop(CoprocessorEnvironment env) throws IOException {
8498
.stop("PhoenixRegionServerEndpoint is stopping. Shutting down uncovered index threadpool.");
8599
}
86100
ServerUtil.ConnectionFactory.shutdown();
101+
// Stop prewarming executor
102+
if (prewarmExecutor != null) {
103+
prewarmExecutor.shutdownNow();
104+
}
87105
}
88106

89107
@Override
@@ -215,4 +233,90 @@ private static void initUncoveredIndexThreadPool(Configuration conf) {
215233
LOGGER.info("Initialized region level thread pool for Uncovered Global Indexes.");
216234
}
217235

236+
/**
237+
* Prewarms HAGroupStoreClients in background thread with retry.
238+
* Initializes all HA group clients asynchronously at startup.
239+
* <p>
240+
* Phase 1 : Retry indefinitely until HAGroupStoreManager is initialized
241+
* and HAGroupNames are retrieved. If the SYSTEM.HA_GROUP table region
242+
* is not ready, manager.getHAGroupNames() would return an exception.
243+
* So we need to retry until the SYSTEM.HA_GROUP table region is ready
244+
* and then retrieve the HAGroupNames for prewarming.
245+
*
246+
* <p>
247+
* Phase 2 : Prewarm individual HAGroupStoreClients with retry.
248+
* If the HAGroupStoreClient is not ready/initialized,
249+
* manager.getClusterRoleRecord(haGroup) would throw an exception.
250+
* So we need to retry until the HAGroupStoreClient is ready/initialized.
251+
*/
252+
private void startHAGroupStoreClientPrewarming() {
253+
prewarmExecutor = Executors.newSingleThreadExecutor(r -> {
254+
Thread t = new Thread(r, "HAGroupStoreClient-Prewarm");
255+
t.setDaemon(true);
256+
return t;
257+
});
258+
259+
prewarmExecutor.submit(() -> {
260+
HAGroupStoreManager manager = null;
261+
List<String> pending = null;
262+
// Phase 1: Retry indefinitely until HAGroupStoreManager is initialized
263+
// and HAGroupNames are retrieved.
264+
while (pending == null) {
265+
try {
266+
manager = HAGroupStoreManager.getInstance(conf);
267+
if (manager != null) {
268+
pending = new ArrayList<>(manager.getHAGroupNames());
269+
LOGGER.info("Starting prewarming for {} HAGroupStoreClients",
270+
pending.size());
271+
} else {
272+
LOGGER.debug("HAGroupStoreManager is null, retrying in 2s...");
273+
Thread.sleep(2000);
274+
}
275+
} catch (InterruptedException e) {
276+
LOGGER.info("HAGroupStoreClient prewarming interrupted during "
277+
+ "initialization");
278+
Thread.currentThread().interrupt();
279+
return;
280+
} catch (Exception e) {
281+
LOGGER.debug("Failed to initialize HAGroupStoreManager, retrying in "
282+
+ "2s...", e);
283+
try {
284+
Thread.sleep(2000);
285+
} catch (InterruptedException ie) {
286+
LOGGER.info("HAGroupStoreClient prewarming interrupted");
287+
Thread.currentThread().interrupt();
288+
return;
289+
}
290+
}
291+
}
292+
293+
// Phase 2: Prewarm individual HAGroupStoreClients with retry
294+
try {
295+
while (!pending.isEmpty()) {
296+
Iterator<String> iterator = pending.iterator();
297+
while (iterator.hasNext()) {
298+
String haGroup = iterator.next();
299+
try {
300+
manager.getClusterRoleRecord(haGroup);
301+
iterator.remove();
302+
LOGGER.info("Prewarmed HAGroupStoreClient: {} ({} remaining)",
303+
haGroup, pending.size());
304+
} catch (Exception e) {
305+
LOGGER.debug("Failed to prewarm {}, will retry", haGroup, e);
306+
}
307+
}
308+
309+
if (!pending.isEmpty()) {
310+
Thread.sleep(2000);
311+
}
312+
}
313+
314+
LOGGER.info("Completed prewarming all HAGroupStoreClients");
315+
} catch (InterruptedException e) {
316+
LOGGER.info("HAGroupStoreClient prewarming interrupted during warmup");
317+
Thread.currentThread().interrupt();
318+
}
319+
});
320+
}
321+
218322
}

0 commit comments

Comments
 (0)