Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -62,12 +65,14 @@
/**
* This is first implementation of RegionServer coprocessor introduced by Phoenix.
*/
@CoreCoprocessor
public class PhoenixRegionServerEndpoint extends
RegionServerEndpointProtos.RegionServerEndpointService implements RegionServerCoprocessor {
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
private MetricsMetadataCachingSource metricsSource;
protected Configuration conf;
protected ServerName serverName;
private Abortable abortable;
private ExecutorService prewarmExecutor;

// regionserver level thread pool used by Uncovered Indexes to scan data table rows
Expand All @@ -79,6 +84,9 @@ public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionServerCoprocessorEnvironment) {
this.serverName = ((RegionServerCoprocessorEnvironment) env).getServerName();
}
if (env instanceof HasRegionServerServices) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check necessary? Are there cases where env is not an instance of RegionServerCoprocessorEnvironment here or is this just defensive coding? Can we add a comment here to clarify?

this.abortable = ((HasRegionServerServices) env).getRegionServerServices();
}
this.metricsSource =
MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
initUncoveredIndexThreadPool(this.conf);
Expand Down Expand Up @@ -308,7 +316,7 @@ private void startHAGroupStoreClientPrewarming() {
manager.getClusterRoleRecord(haGroup);
if (shouldInitReplicationLogGroup) {
try {
ReplicationLogGroup.get(conf, serverName, haGroup);
ReplicationLogGroup.get(conf, serverName, haGroup, abortable);
LOGGER.info("Eagerly initialized ReplicationLogGroup {} on server {}", haGroup,
serverName);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellScanner;
Expand All @@ -70,6 +71,8 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
Expand Down Expand Up @@ -170,6 +173,7 @@
* does batch mutations.
* <p>
*/
@CoreCoprocessor
public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {

private static final Logger LOG = LoggerFactory.getLogger(IndexRegionObserver.class);
Expand Down Expand Up @@ -439,6 +443,7 @@ public int getMaxPendingRowCount() {
private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100;
private byte[] encodedRegionName;
private boolean shouldReplicate;
private Abortable abortable;

// Don't replicate the mutation if this attribute is set
private static final Predicate<Mutation> IGNORE_REPLICATION =
Expand Down Expand Up @@ -541,6 +546,9 @@ public void start(CoprocessorEnvironment e) throws IOException {
if (this.shouldReplicate) {
this.ignoreReplicationFilter = getSynchronousReplicationFilter(tableName);
}
if (e instanceof HasRegionServerServices) {
this.abortable = ((HasRegionServerServices) e).getRegionServerServices();
}
} catch (NoSuchMethodError ex) {
disabled = true;
LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex);
Expand Down Expand Up @@ -689,7 +697,7 @@ private Optional<ReplicationLogGroup> getHAGroupFromBatch(RegionCoprocessorEnvir
byte[] haGroupName = m.getAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
if (haGroupName != null) {
ReplicationLogGroup logGroup = ReplicationLogGroup.get(env.getConfiguration(),
env.getServerName(), Bytes.toString(haGroupName));
env.getServerName(), Bytes.toString(haGroupName), abortable);
return Optional.of(logGroup);
}
}
Expand All @@ -707,7 +715,7 @@ private Optional<ReplicationLogGroup> getHAGroupFromWALKey(RegionCoprocessorEnvi
logKey.getExtendedAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
if (haGroupName != null) {
ReplicationLogGroup logGroup = ReplicationLogGroup.get(env.getConfiguration(),
env.getServerName(), Bytes.toString(haGroupName));
env.getServerName(), Bytes.toString(haGroupName), abortable);
return Optional.of(logGroup);
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public void init() throws IOException {
}

public void close() {
replicationLogTracker.close();
if (this.metrics != null) {
this.metrics.close();
}
Expand Down
Loading