Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ public synchronized Caches getCaches() {
ensureOpen();
if (caches == null) {
caches = Caches.getInstance();
if (micrometer != null
if (micrometer != null && getConfiguration().getBoolean(Property.GENERAL_MICROMETER_ENABLED)
&& getConfiguration().getBoolean(Property.GENERAL_MICROMETER_CACHE_METRICS_ENABLED)) {
caches.registerMetrics(micrometer);
}
Expand Down
12 changes: 8 additions & 4 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -353,10 +353,14 @@ temporary files (for example, when creating a pre-split table). \
"The maximum amount of time that a Scanner should wait before retrying a failed RPC.",
"1.7.3"),
GENERAL_MICROMETER_CACHE_METRICS_ENABLED("general.micrometer.cache.metrics.enabled", "false",
PropertyType.BOOLEAN, "Enables Caffeine Cache metrics functionality using Micrometer.",
PropertyType.BOOLEAN,
"Enables Caffeine Cache metrics functionality using Micrometer. Requires "
+ " property 'general.micrometer.enabled' to be set to 'true' to take effect.",
"4.0.0"),
GENERAL_MICROMETER_ENABLED("general.micrometer.enabled", "false", PropertyType.BOOLEAN,
"Enables metrics collection and reporting functionality using Micrometer.", "2.1.0"),
GENERAL_MICROMETER_ENABLED("general.micrometer.enabled", "true", PropertyType.BOOLEAN,
"Enables metrics collection and reporting functionality using Micrometer. The Monitor"
+ " is dependent on metrics being enabled to function correctly.",
"2.1.0"),
GENERAL_MICROMETER_JVM_METRICS_ENABLED("general.micrometer.jvm.metrics.enabled", "false",
PropertyType.BOOLEAN,
"Enables additional JVM metrics collection and reporting using Micrometer. Requires "
Expand All @@ -369,7 +373,7 @@ temporary files (for example, when creating a pre-split table). \
'log4j2' or 'logback'.
""", "2.1.4"),
GENERAL_MICROMETER_FACTORY("general.micrometer.factory",
"org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory",
"org.apache.accumulo.core.spi.metrics.AccumuloMonitorMeterRegistryFactory",
PropertyType.CLASSNAMELIST,
"""
A comma separated list of one or more class names that implements \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.net.HostAndPort;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;

public interface MetricsInfo {
Expand Down Expand Up @@ -91,6 +92,17 @@ static List<Tag> addressTags(final HostAndPort hostAndPort) {

boolean isMetricsEnabled();

/**
* @return true if the MonitorMeterRegistry has been enabled.
*/
public boolean isMonitorRegistryEnabled();

/**
*
* @return Monitor MeterRegistry, or null
*/
public MeterRegistry getMonitorRegistry();

/**
* Common tags for all services.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.spi.metrics;

import org.apache.accumulo.core.metrics.MonitorMeterRegistry;

import io.micrometer.core.instrument.MeterRegistry;

public class AccumuloMonitorMeterRegistryFactory implements MeterRegistryFactory {

@Override
public MeterRegistry create(InitParameters params) {
return new MonitorMeterRegistry();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,10 @@ public void testIsValidTablePropertyKey() {
@Test
public void testFixedPropertiesNonNull() {
Property.FIXED_PROPERTIES.forEach(p -> {
assertNotNull(p.getDefaultValue());
assertFalse(p.getDefaultValue().isBlank());
assertNotNull(p.getDefaultValue(), "Default value is null: " + p.getKey());
if (!p.equals(Property.GENERAL_MICROMETER_FACTORY)) {
assertFalse(p.getDefaultValue().isBlank(), "Default value is blank: " + p.getKey());
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,11 @@ MiniAccumuloConfigImpl initialize() {
mergeProp(Property.INSTANCE_SECRET.getKey(), DEFAULT_INSTANCE_SECRET);
}

// enable metrics reporting - by default will appear in standard log files.
mergeProp(Property.GENERAL_MICROMETER_ENABLED.getKey(), "true");
// Disable metrics for MiniAccumulo if not specifically enabled
if (!siteConfig.containsKey(Property.GENERAL_MICROMETER_ENABLED.getKey())) {
setProperty(Property.GENERAL_MICROMETER_ENABLED, "false");
setProperty(Property.GENERAL_MICROMETER_FACTORY, "");
}

mergeProp(Property.TSERV_DATACACHE_SIZE.getKey(), "10M");
mergeProp(Property.TSERV_INDEXCACHE_SIZE.getKey(), "10M");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.accumulo.core.data.ResourceGroupId;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metrics.Metric;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.core.process.thrift.MetricSource;
Expand All @@ -53,7 +54,6 @@
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.mem.LowMemoryDetector;
import org.apache.accumulo.server.metrics.MetricResponseWrapper;
import org.apache.accumulo.server.metrics.MetricsInfoImpl;
import org.apache.accumulo.server.metrics.ProcessMetrics;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.security.SecurityUtil;
Expand Down Expand Up @@ -405,9 +405,9 @@ public MetricResponse getMetrics(TInfo tinfo, TCredentials credentials) throws T
response.setResourceGroup(getResourceGroup().canonical());
response.setTimestamp(System.currentTimeMillis());

var registry = MetricsInfoImpl.MONITOR_REGISTRY.get();
if (registry != null) {
registry.getMeters().forEach(m -> {
final MetricsInfo mi = getContext().getMetricsInfo();
if (mi.isMonitorRegistryEnabled()) {
mi.getMonitorRegistry().getMeters().forEach(m -> {
if (m.getId().getName().startsWith("accumulo.")
|| m.getId().getName().equals(Metric.EXECUTOR_COMPLETED.getName())
|| m.getId().getName().equals(Metric.EXECUTOR_QUEUED.getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ public class MetricsInfoImpl implements MetricsInfo {
private AutoCloseable logMetrics;

private final boolean metricsEnabled;
private final AtomicReference<MeterRegistry> monitorRegistry = new AtomicReference<>();

private final List<MetricsProducer> producers = new ArrayList<>();

public static final AtomicReference<MeterRegistry> MONITOR_REGISTRY = new AtomicReference<>();

public MetricsInfoImpl(final ServerContext context) {
this.context = context;
metricsEnabled = context.getConfiguration().getBoolean(Property.GENERAL_MICROMETER_ENABLED);
Expand All @@ -97,6 +96,16 @@ public boolean isMetricsEnabled() {
return metricsEnabled;
}

@Override
public boolean isMonitorRegistryEnabled() {
return monitorRegistry.get() != null;
}

@Override
public MeterRegistry getMonitorRegistry() {
return monitorRegistry.get();
}

@Override
public synchronized void addMetricsProducers(MetricsProducer... producer) {
if (!metricsEnabled) {
Expand Down Expand Up @@ -162,16 +171,16 @@ public synchronized void init(Collection<Tag> tags) {
for (String factoryName : getTrimmedStrings(userRegistryFactories)) {
try {
MeterRegistry registry = getRegistryFromFactory(factoryName, context);
if (registry.getClass().equals(MonitorMeterRegistry.class)) {
monitorRegistry.compareAndSet(null, registry);
}
registry.config().commonTags(commonTags);
Metrics.globalRegistry.add(registry);
} catch (ReflectiveOperationException ex) {
LOG.warn("Could not load registry {}", factoryName, ex);
}
}

MONITOR_REGISTRY.set(new MonitorMeterRegistry());
Metrics.globalRegistry.add(MONITOR_REGISTRY.get());

// Set the MeterRegistry on the ThreadPools
ThreadPools.getServerThreadPools().setMeterRegistry(Metrics.globalRegistry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -904,17 +904,19 @@ private void setupAssistantMetrics(MetricsProducer... producers) {
// This is called after getting the primary manager lock
private void setupPrimaryMetrics() {
MetricsInfo metricsInfo = getContext().getMetricsInfo();
metricsInfo.addMetricsProducers(balanceManager.getMetrics());
// ensure all tablet group watchers are setup
Preconditions.checkState(watchers.size() == DataLevel.values().length);
watchers.forEach(watcher -> metricsInfo.addMetricsProducers(watcher.getMetrics()));
metricsInfo.addMetricsProducers(requireNonNull(compactionCoordinator));
// ensure fate is completely setup
metricsInfo.addMetricsProducers(new MetaFateMetrics(getContext(),
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
metricsInfo.addMetricsProducers(new UserFateMetrics(getContext(),
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
metricsInfo.addMetricsProducers(this);
if (metricsInfo.isMetricsEnabled()) {
metricsInfo.addMetricsProducers(balanceManager.getMetrics());
// ensure all tablet group watchers are setup
Preconditions.checkState(watchers.size() == DataLevel.values().length);
watchers.forEach(watcher -> metricsInfo.addMetricsProducers(watcher.getMetrics()));
metricsInfo.addMetricsProducers(requireNonNull(compactionCoordinator));
// ensure fate is completely setup
metricsInfo.addMetricsProducers(new MetaFateMetrics(getContext(),
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
metricsInfo.addMetricsProducers(new UserFateMetrics(getContext(),
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
metricsInfo.addMetricsProducers(this);
}
}

@Override
Expand Down
2 changes: 0 additions & 2 deletions test/src/main/java/org/apache/accumulo/test/BalanceIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ public class BalanceIT extends ConfigurableMacBase {
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
Map<String,String> siteConfig = cfg.getSiteConfig();
siteConfig.put(Property.TSERV_MAXMEM.getKey(), "10K");
siteConfig.put(Property.GENERAL_MICROMETER_ENABLED.getKey(), "true");
siteConfig.put("general.custom.metrics.opts.logging.step", "0.5s");
cfg.setSiteConfig(siteConfig);
cfg.getClusterServerConfiguration().setNumDefaultTabletServers(2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ public class BalanceInPresenceOfOfflineTableIT extends AccumuloClusterHarness {
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
Map<String,String> siteConfig = cfg.getSiteConfig();
siteConfig.put(Property.TSERV_MAXMEM.getKey(), "10K");
siteConfig.put(Property.GENERAL_MICROMETER_ENABLED.getKey(), "true");
siteConfig.put("general.custom.metrics.opts.logging.step", "0.5s");
cfg.setSiteConfig(siteConfig);
// ensure we have two tservers
if (cfg.getClusterServerConfiguration().getTabletServerConfiguration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ public static void beforeAll() throws Exception {

// Tell the server processes to use a StatsDMeterRegistry that will be configured
// to push all metrics to the sink we started.

cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
TestStatsDRegistryFactory.class.getName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.metrics;

import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.fate.FateTestUtil;
import org.apache.accumulo.test.fate.SlowFateSplitManager;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import io.micrometer.core.instrument.MeterRegistry;

public class MetricsDisabledIT extends ConfigurableMacBase implements MetricsProducer {
private static TestStatsDSink sink;
private static final int numFateThreadsPool1 = 5;
private static final String allOpsFateExecutorName = "pool1";

@Override
protected Duration defaultTimeout() {
return Duration.ofMinutes(3);
}

@BeforeAll
public static void before() throws Exception {
sink = new TestStatsDSink();
}

@AfterAll
public static void after() throws Exception {
sink.close();
}

@Override
protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setProperty(Property.GC_CYCLE_START, "1s");
cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
cfg.setProperty(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL, "1s");
// Tell the server processes to use a StatsDMeterRegistry and the simple logging registry
// that will be configured to push all metrics to the sink we started.
cfg.setProperty(Property.GENERAL_MICROMETER_USER_TAGS, "tag1=value1,tag2=value2");
cfg.setProperty(Property.GENERAL_MICROMETER_CACHE_METRICS_ENABLED, "true");
cfg.setProperty(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED, "true");
cfg.setProperty("general.custom.metrics.opts.logging.step", "10s");
String clazzList = LoggingMeterRegistryFactory.class.getName() + ","
+ TestStatsDRegistryFactory.class.getName();
cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList);
Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort()));
cfg.setSystemProperties(sysProps);
// custom config for the fate thread pools.
var fatePoolsConfig = FateTestUtil.updateFateConfig(new ConfigurationCopy(),
numFateThreadsPool1, allOpsFateExecutorName);
cfg.setProperty(Property.MANAGER_FATE_USER_CONFIG.getKey(),
fatePoolsConfig.get(Property.MANAGER_FATE_USER_CONFIG));
cfg.setProperty(Property.MANAGER_FATE_META_CONFIG.getKey(),
fatePoolsConfig.get(Property.MANAGER_FATE_META_CONFIG));
// Make splits run slowly, used for testing the fate metrics
cfg.setServerClass(ServerType.MANAGER, r -> SlowFateSplitManager.class);
}

@Test
public void confirmNoMetricsPublished() throws Exception {

AtomicReference<Exception> error = new AtomicReference<>();
Thread workerThread = new Thread(() -> {
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
MetricsIT.doWorkToGenerateMetrics(client, getClass());
} catch (Exception e) {
error.set(e);
}
});
workerThread.start();

Timer t = Timer.startNew();

// GENERAL_MICROMETER_ENABLED is set to false in MiniAccumuloConfigImpl
// We should not see any metrics.
while (t.elapsed(TimeUnit.SECONDS) < 60) {
List<String> lines = sink.getLines();
assertTrue(lines.isEmpty(), "Encountered the following metrics when disabled: " + lines);
}
workerThread.join();
assertNull(error.get());
}

@Override
public void registerMetrics(MeterRegistry registry) {
// unused; this class only extends MetricsProducer to easily reference its methods/constants
}
}
Loading