From e4e7772147d397a7751680c47737e4cd18e3d6d5 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 13 Mar 2026 11:43:23 +0000 Subject: [PATCH 1/5] wip --- .../accumulo/core/metrics/MetricsInfo.java | 3 +- .../server/metrics/MetricsInfoImpl.java | 9 +- .../org/apache/accumulo/manager/Manager.java | 20 ++- .../test/MultipleManagerMetricsIT.java | 155 ++++++++++++++++++ 4 files changed, 183 insertions(+), 4 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/MultipleManagerMetricsIT.java diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java index ba79378a29e..99cd42e99f5 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java @@ -30,6 +30,7 @@ import com.google.common.net.HostAndPort; import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.config.MeterFilter; public interface MetricsInfo { @@ -113,7 +114,7 @@ static Collection serviceTags(final String instanceName, final String appli * Initialize the metrics system. This sets the list of common tags that are emitted with the * metrics. */ - void init(Collection commonTags); + void init(Collection commonTags, MeterFilter... filters); /** * Close the underlying registry and release resources. The registry will not accept new meters diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java index 60f48b6b292..7bd297fc75b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java @@ -48,6 +48,7 @@ import io.micrometer.core.instrument.binder.logging.Log4j2Metrics; import io.micrometer.core.instrument.binder.logging.LogbackMetrics; import io.micrometer.core.instrument.binder.system.ProcessorMetrics; +import io.micrometer.core.instrument.config.MeterFilter; public class MetricsInfoImpl implements MetricsInfo { @@ -111,7 +112,7 @@ public synchronized void addMetricsProducers(MetricsProducer... producer) { } @Override - public synchronized void init(Collection tags) { + public synchronized void init(Collection tags, MeterFilter... filters) { Objects.requireNonNull(tags); if (!metricsEnabled) { @@ -197,7 +198,11 @@ public synchronized void init(Collection tags) { Property.GENERAL_MICROMETER_LOG_METRICS.getKey()); } - LOG.info("Metrics initialization. Register producers: {}", producers); + for (MeterFilter f : filters) { + Metrics.globalRegistry.config().meterFilter(f); + } + + LOG.info("Metrics initialization. Registered producers: {}", producers); producers.forEach(p -> p.registerMetrics(Metrics.globalRegistry)); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 5ff232765c4..c81f9b6eb0a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -170,6 +170,8 @@ import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.config.MeterFilter; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -184,6 +186,9 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, FateEnv, PrimaryManagerThriftService { static final Logger log = LoggerFactory.getLogger(Manager.class); + + // visible for testing + public static final String PRIMARY_TAG_KEY = "manager.primary"; // When in safe mode totalAssignedOrHosted() is called every 10s // which logs 3 messages about assigned tablets, 1 message @@ -945,8 +950,21 @@ private void checkForHeldServer(SortedMap ts private void setupAssistantMetrics(MetricsProducer... producers) { MetricsInfo metricsInfo = getContext().getMetricsInfo(); metricsInfo.addMetricsProducers(producers); - metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(), + List tags = new ArrayList<>(); + tags.addAll(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(), getAdvertiseAddress(), getResourceGroup())); + // Add the primary tag + tags.add(Tag.of(PRIMARY_TAG_KEY, "false")); + // Create a MeterFilter that will replace the primary + // tag value with a true if this Manager holds the primary lock + var primaryMeterFilter = MeterFilter.replaceTagValues(PRIMARY_TAG_KEY, (currentValue) -> { + if (isPrimaryManager()) { + return "true"; + } else { + return "false"; + } + }); + metricsInfo.init(tags, primaryMeterFilter); } // This is called after getting the primary manager lock diff --git a/test/src/main/java/org/apache/accumulo/test/MultipleManagerMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/MultipleManagerMetricsIT.java new file mode 100644 index 00000000000..9c43a74fe38 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/MultipleManagerMetricsIT.java @@ -0,0 +1,155 @@ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.core.client.admin.servers.ServerId; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.metrics.MetricsInfo; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; +import org.apache.accumulo.test.metrics.TestStatsDSink; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.conf.Configuration; +import org.apache.zookeeper.KeeperException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.net.HostAndPort; + +public class MultipleManagerMetricsIT extends ConfigurableMacBase { + + private static final Logger LOG = LoggerFactory.getLogger(MultipleManagerMetricsIT.class); + private static TestStatsDSink sink; + + private final AtomicReference currentPrimary = new AtomicReference<>(); + private final AtomicBoolean stopThread = new AtomicBoolean(false); + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + // Set this lower so that locks timeout faster + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); + cfg.setProperty(Property.MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT, "2"); + cfg.setProperty(Property.MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT, "10s"); + cfg.getClusterServerConfiguration().setNumManagers(2); + + // Tell the server processes to use a StatsDMeterRegistry and the simple logging registry + // that will be configured to push all metrics to the sink we started. + cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_CACHE_METRICS_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, TestStatsDRegistryFactory.class.getName()); + Map sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", + TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); + cfg.setSystemProperties(sysProps); + + super.configure(cfg, hadoopCoreSite); + } + + @BeforeAll + public static void before() throws Exception { + sink = new TestStatsDSink(); + } + + @AfterAll + public static void after() throws Exception { + sink.close(); + } + + private String getPrimary() { + Set primaries = getCluster().getServerContext().instanceOperations().getServers(ServerId.Type.MANAGER); + assertEquals(1, primaries.size()); + ServerId primary = primaries.iterator().next(); + return primary.toHostPortString(); + } + +// private Set getManagers() { +// Set results = new HashSet<>(); +// Set managers = getCluster().getServerContext().getServerPaths().getAssistantManagers(AddressSelector.all(), true); +// managers.forEach(m -> results.add(m.getServer())); +// return results; +// } + + private void killPrimary() throws KeeperException, InterruptedException { + ServiceLockPath slp = getCluster().getServerContext().getServerPaths().getManager(true); + getCluster().getServerContext().getZooSession().asReaderWriter().recursiveDelete(slp.toString(), NodeMissingPolicy.SKIP); + } + + @Test + public void testPrimaryManagerTagChanges() throws Exception { + + AtomicReference threadError = new AtomicReference<>(); + + Thread t = new Thread(() -> { + List statsDMetrics; + while (!stopThread.get()) { + if (!(statsDMetrics = sink.getLines()).isEmpty()) { + List metrics = statsDMetrics.stream().filter(line -> line.startsWith("accumulo")) + .map(TestStatsDSink::parseStatsDMetric) + .filter(m -> m.getTags().get(MetricsInfo.PROCESS_NAME_TAG_KEY).equals(ServerId.Type.MANAGER.name())) + .filter(m -> m.getTags().get(Manager.PRIMARY_TAG_KEY).equals("true")) + .toList(); + if (metrics.size() > 0) { + TestStatsDSink.Metric last = metrics.get(metrics.size() - 1); + String host = last.getTags().get(MetricsInfo.HOST_TAG_KEY); + String port = last.getTags().get(MetricsInfo.PORT_TAG_KEY); + if (host != null && port != null) { + currentPrimary.set(HostAndPort.fromParts(host, Integer.parseInt(port)).toString()); + } + } + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + threadError.set(e); + } + } + }); + + t.start(); + try { + String previousPrimary = null; + + for (int i = 0; i < 5; i++) { + // Wait for primary lock to be acquired + Wait.waitFor(() -> getCluster().getServerContext().getServerPaths().getManager(true) != null, 60_000); + + String primary = getPrimary(); + LOG.info("Primary manager at: {}", primary); + + // Check that the primary address switched + if (previousPrimary != null) { + assertNotEquals(previousPrimary, primary); + getCluster().start(); + } + + // Wait for primary metric to match current primary + final String tmp = primary; + Wait.waitFor(() -> currentPrimary.get() != null && currentPrimary.get().equals(tmp), 60_000); + + // Kill the Primary + previousPrimary = primary; + killPrimary(); + } + } finally { + stopThread.set(true); + t.join(); + } + assertNull(threadError.get()); + } + +} From efd174c040ce933632541be34673ee2987d20236 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 13 Mar 2026 20:37:59 +0000 Subject: [PATCH 2/5] Added reinit to MetricsInfoImpl --- .../accumulo/core/metrics/MetricsInfo.java | 13 +- .../core/util/threads/ThreadPools.java | 4 + .../server/metrics/MetricsInfoImpl.java | 60 ++- .../org/apache/accumulo/manager/Manager.java | 17 +- .../test/MultipleManagerMetricsIT.java | 374 ++++++++++++++---- .../accumulo/test/metrics/MetricsIT.java | 51 +-- 6 files changed, 383 insertions(+), 136 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java index 99cd42e99f5..bbb3801efd8 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java @@ -30,7 +30,6 @@ import com.google.common.net.HostAndPort; import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.config.MeterFilter; public interface MetricsInfo { @@ -110,11 +109,21 @@ static Collection serviceTags(final String instanceName, final String appli void addMetricsProducers(MetricsProducer... producer); + /** + * @return true if {@code #init(Collection)} has been called, false otherwise + */ + boolean isInitialized(); + /** * Initialize the metrics system. This sets the list of common tags that are emitted with the * metrics. */ - void init(Collection commonTags, MeterFilter... filters); + void init(Collection commonTags); + + /** + * Clears all Meters and re-registers all MetricsProducers + */ + void reinit(Collection replacementTags); /** * Close the underlying registry and release resources. The registry will not accept new meters diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 5a80b380f73..285793d859f 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -763,6 +763,10 @@ public void setMeterRegistry(MeterRegistry r) { } } + public void clearMeterRegistry() { + registry.set(null); + } + /** * Called by MetricsInfoImpl.init on the server side if metrics are disabled. ClientContext calls * {@code #getClientThreadPools(AccumuloConfiguration, UncaughtExceptionHandler)} above. diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java index 7bd297fc75b..8eeae38a21d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.conf.Property; @@ -48,7 +49,6 @@ import io.micrometer.core.instrument.binder.logging.Log4j2Metrics; import io.micrometer.core.instrument.binder.logging.LogbackMetrics; import io.micrometer.core.instrument.binder.system.ProcessorMetrics; -import io.micrometer.core.instrument.config.MeterFilter; public class MetricsInfoImpl implements MetricsInfo { @@ -56,8 +56,6 @@ public class MetricsInfoImpl implements MetricsInfo { private final ServerContext context; - private List commonTags = null; - // JvmGcMetrics are declared with AutoCloseable - keep reference to use with close() private JvmGcMetrics jvmGcMetrics; // Log4j2Metrics and LogbackMetrics are declared with AutoCloseable - keep reference to use with @@ -66,7 +64,9 @@ public class MetricsInfoImpl implements MetricsInfo { private final boolean metricsEnabled; - private final List producers = new ArrayList<>(); + private List commonTags = null; + private final List initializedProducers = new ArrayList<>(); + private final AtomicReference configuration = new AtomicReference<>(); public MetricsInfoImpl(final ServerContext context) { this.context = context; @@ -104,15 +104,18 @@ public synchronized void addMetricsProducers(MetricsProducer... producer) { return; } - if (commonTags == null) { - producers.addAll(Arrays.asList(producer)); - } else { + initializedProducers.addAll(Arrays.asList(producer)); + if (isInitialized()) { Arrays.stream(producer).forEach(p -> p.registerMetrics(Metrics.globalRegistry)); } } + public synchronized boolean isInitialized() { + return configuration.get() != null; + } + @Override - public synchronized void init(Collection tags, MeterFilter... filters) { + public synchronized void init(Collection tags) { Objects.requireNonNull(tags); if (!metricsEnabled) { @@ -121,7 +124,7 @@ public synchronized void init(Collection tags, MeterFilter... filters) { return; } - if (commonTags != null) { + if (isInitialized()) { LOG.warn("metrics registry has already been initialized"); return; } @@ -142,10 +145,7 @@ public synchronized void init(Collection tags, MeterFilter... filters) { } } - commonTags = List.copyOf(tags); - - LOG.info("Metrics initialization. common tags: {}", commonTags); - + commonTags = new ArrayList<>(tags); Metrics.globalRegistry.config().commonTags(commonTags); boolean jvmMetricsEnabled = @@ -198,12 +198,30 @@ public synchronized void init(Collection tags, MeterFilter... filters) { Property.GENERAL_MICROMETER_LOG_METRICS.getKey()); } - for (MeterFilter f : filters) { - Metrics.globalRegistry.config().meterFilter(f); - } + initializedProducers.forEach(p -> p.registerMetrics(Metrics.globalRegistry)); + configuration.set("Producers: " + initializedProducers + ", common tags: " + commonTags); + LOG.info("Metrics initialized. " + configuration.get()); + } - LOG.info("Metrics initialization. Registered producers: {}", producers); - producers.forEach(p -> p.registerMetrics(Metrics.globalRegistry)); + @Override + public void reinit(Collection replacementTags) { + LOG.info("Reinitializing Metrics"); + List removals = new ArrayList<>(); + for (Tag r : replacementTags) { + for (Tag t : commonTags) { + if (t.getKey().equals(r.getKey())) { + removals.add(t); + } + } + } + commonTags.removeAll(removals); + commonTags.addAll(replacementTags); + Metrics.globalRegistry.getRegistries().forEach(r -> r.close()); + Metrics.globalRegistry.getRegistries().forEach(r -> Metrics.globalRegistry.remove(r)); + Metrics.globalRegistry.clear(); + configuration.set(null); + ThreadPools.getServerThreadPools().clearMeterRegistry(); + init(commonTags); } @VisibleForTesting @@ -244,6 +262,10 @@ public synchronized void close() { @Override public synchronized String toString() { - return "MetricsCommonTags{tags=" + commonTags + '}'; + String msg = configuration.get(); + if (msg == null) { + return "MetricsInfo not initialized yet."; + } + return msg; } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index c81f9b6eb0a..c66cf367c1f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -171,7 +171,6 @@ import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.config.MeterFilter; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -186,7 +185,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, FateEnv, PrimaryManagerThriftService { static final Logger log = LoggerFactory.getLogger(Manager.class); - + // visible for testing public static final String PRIMARY_TAG_KEY = "manager.primary"; @@ -949,27 +948,19 @@ private void checkForHeldServer(SortedMap ts // This is called after getting the assistant manager lock private void setupAssistantMetrics(MetricsProducer... producers) { MetricsInfo metricsInfo = getContext().getMetricsInfo(); - metricsInfo.addMetricsProducers(producers); List tags = new ArrayList<>(); tags.addAll(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(), getAdvertiseAddress(), getResourceGroup())); // Add the primary tag tags.add(Tag.of(PRIMARY_TAG_KEY, "false")); - // Create a MeterFilter that will replace the primary - // tag value with a true if this Manager holds the primary lock - var primaryMeterFilter = MeterFilter.replaceTagValues(PRIMARY_TAG_KEY, (currentValue) -> { - if (isPrimaryManager()) { - return "true"; - } else { - return "false"; - } - }); - metricsInfo.init(tags, primaryMeterFilter); + metricsInfo.init(tags); + metricsInfo.addMetricsProducers(producers); } // This is called after getting the primary manager lock private void setupPrimaryMetrics() { MetricsInfo metricsInfo = getContext().getMetricsInfo(); + metricsInfo.reinit(List.of(Tag.of(PRIMARY_TAG_KEY, "true"))); metricsInfo.addMetricsProducers(balanceManager.getMetrics()); // ensure all tablet group watchers are setup Preconditions.checkState(watchers.size() == DataLevel.values().length); diff --git a/test/src/main/java/org/apache/accumulo/test/MultipleManagerMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/MultipleManagerMetricsIT.java index 9c43a74fe38..e55919e5f28 100644 --- a/test/src/main/java/org/apache/accumulo/test/MultipleManagerMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/MultipleManagerMetricsIT.java @@ -1,23 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.accumulo.test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import java.util.Arrays; +import java.util.BitSet; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.metrics.Metric; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.test.metrics.MetricsIT; import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; import org.apache.accumulo.test.metrics.TestStatsDSink; import org.apache.accumulo.test.util.Wait; @@ -36,17 +64,17 @@ public class MultipleManagerMetricsIT extends ConfigurableMacBase { private static final Logger LOG = LoggerFactory.getLogger(MultipleManagerMetricsIT.class); private static TestStatsDSink sink; + private final AtomicReference currentAssistant = new AtomicReference<>(); private final AtomicReference currentPrimary = new AtomicReference<>(); - private final AtomicBoolean stopThread = new AtomicBoolean(false); - + @Override protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { // Set this lower so that locks timeout faster cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); cfg.setProperty(Property.MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT, "2"); - cfg.setProperty(Property.MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT, "10s"); + cfg.setProperty(Property.MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT, "5m"); cfg.getClusterServerConfiguration().setNumManagers(2); - + // Tell the server processes to use a StatsDMeterRegistry and the simple logging registry // that will be configured to push all metrics to the sink we started. cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); @@ -56,10 +84,10 @@ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSit Map sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); cfg.setSystemProperties(sysProps); - + super.configure(cfg, hadoopCoreSite); } - + @BeforeAll public static void before() throws Exception { sink = new TestStatsDSink(); @@ -69,87 +97,279 @@ public static void before() throws Exception { public static void after() throws Exception { sink.close(); } - - private String getPrimary() { - Set primaries = getCluster().getServerContext().instanceOperations().getServers(ServerId.Type.MANAGER); + + // @BeforeEach + // public void setup() throws Exception { + // // Dump the stored metrics + // sink.getLines(); + // } + + private String getPrimaryFromZK() { + Set primaries = + getCluster().getServerContext().instanceOperations().getServers(ServerId.Type.MANAGER); + if (primaries == null || primaries.isEmpty()) { + return null; + } assertEquals(1, primaries.size()); ServerId primary = primaries.iterator().next(); return primary.toHostPortString(); } - -// private Set getManagers() { -// Set results = new HashSet<>(); -// Set managers = getCluster().getServerContext().getServerPaths().getAssistantManagers(AddressSelector.all(), true); -// managers.forEach(m -> results.add(m.getServer())); -// return results; -// } - + private void killPrimary() throws KeeperException, InterruptedException { ServiceLockPath slp = getCluster().getServerContext().getServerPaths().getManager(true); - getCluster().getServerContext().getZooSession().asReaderWriter().recursiveDelete(slp.toString(), NodeMissingPolicy.SKIP); + ServiceLock.deleteLock(getCluster().getServerContext().getZooSession().asReaderWriter(), slp); + // We removed the lock in ZooKeeper, we need to refresh the processes in Mini. + // If we don't do this, then it will still think that 2 are running and a + // subsequent call to start() will do nothing. + while (getCluster().getClusterControl().getProcesses(ServerType.MANAGER).size() != 1) { + Thread.sleep(250); + getCluster().getClusterControl().refreshProcesses(ServerType.MANAGER); + } + } + + private void processMetrics() throws InterruptedException { + // Dump the stored metrics + sink.getLines(); + // The TestStatsDSink polling frequency is 3s, wait 10s. + Thread.sleep(10_000); + List statsDMetrics = sink.getLines(); + + // Figure out current primary + List primaryMetrics = statsDMetrics.stream() + .filter(line -> line.startsWith("accumulo")).map(TestStatsDSink::parseStatsDMetric) + .filter(m -> m.getTags().get(MetricsInfo.PROCESS_NAME_TAG_KEY) + .equals(ServerId.Type.MANAGER.name())) + .filter(m -> m.getTags().get(Manager.PRIMARY_TAG_KEY).equals("true")).toList(); + if (primaryMetrics.size() > 0) { + TestStatsDSink.Metric last = primaryMetrics.get(primaryMetrics.size() - 1); + LOG.info("Last primary metric found: {}", last); + String host = last.getTags().get(MetricsInfo.HOST_TAG_KEY); + String port = last.getTags().get(MetricsInfo.PORT_TAG_KEY); + if (host != null && port != null) { + currentPrimary.set(HostAndPort.fromParts(host, Integer.parseInt(port)).toString()); + } + } + + // Figure out current assistant + List assistantMetrics = statsDMetrics.stream() + .filter(line -> line.startsWith("accumulo")).map(TestStatsDSink::parseStatsDMetric) + .filter(m -> m.getTags().get(MetricsInfo.PROCESS_NAME_TAG_KEY) + .equals(ServerId.Type.MANAGER.name())) + .filter(m -> m.getTags().get(Manager.PRIMARY_TAG_KEY).equals("false")).toList(); + if (assistantMetrics.size() > 0) { + TestStatsDSink.Metric last = assistantMetrics.get(assistantMetrics.size() - 1); + LOG.info("Last assistant metric found: {}", last); + String host = last.getTags().get(MetricsInfo.HOST_TAG_KEY); + String port = last.getTags().get(MetricsInfo.PORT_TAG_KEY); + if (host != null && port != null) { + String assistant = HostAndPort.fromParts(host, Integer.parseInt(port)).toString(); + if (!assistant.equals(currentPrimary.get())) { + currentAssistant.set(assistant); + } + } + } } - + @Test public void testPrimaryManagerTagChanges() throws Exception { - - AtomicReference threadError = new AtomicReference<>(); - - Thread t = new Thread(() -> { - List statsDMetrics; - while (!stopThread.get()) { - if (!(statsDMetrics = sink.getLines()).isEmpty()) { - List metrics = statsDMetrics.stream().filter(line -> line.startsWith("accumulo")) - .map(TestStatsDSink::parseStatsDMetric) - .filter(m -> m.getTags().get(MetricsInfo.PROCESS_NAME_TAG_KEY).equals(ServerId.Type.MANAGER.name())) - .filter(m -> m.getTags().get(Manager.PRIMARY_TAG_KEY).equals("true")) - .toList(); - if (metrics.size() > 0) { - TestStatsDSink.Metric last = metrics.get(metrics.size() - 1); - String host = last.getTags().get(MetricsInfo.HOST_TAG_KEY); - String port = last.getTags().get(MetricsInfo.PORT_TAG_KEY); - if (host != null && port != null) { - currentPrimary.set(HostAndPort.fromParts(host, Integer.parseInt(port)).toString()); - } - } - } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - threadError.set(e); - } + + String previousPrimary = null; + String previousAssistant = null; + + for (int i = 0; i < 5; i++) { + // Wait for primary lock to be acquired + LOG.info("Waiting for primary manager lock"); + Wait.waitFor(() -> getPrimaryFromZK() != null, 60_000); + + String primary = getPrimaryFromZK(); + LOG.info("Primary manager: {}", primary); + + // Check that the primary address switched + if (previousPrimary != null) { + assertNotEquals(previousPrimary, primary, "Current primary is equal to prior primary"); + LOG.info("Primary manager address has changed, starting killed Manager"); + getCluster().start(); } - }); - - t.start(); - try { - String previousPrimary = null; - - for (int i = 0; i < 5; i++) { - // Wait for primary lock to be acquired - Wait.waitFor(() -> getCluster().getServerContext().getServerPaths().getManager(true) != null, 60_000); - - String primary = getPrimary(); - LOG.info("Primary manager at: {}", primary); - - // Check that the primary address switched - if (previousPrimary != null) { - assertNotEquals(previousPrimary, primary); - getCluster().start(); - } - - // Wait for primary metric to match current primary - final String tmp = primary; - Wait.waitFor(() -> currentPrimary.get() != null && currentPrimary.get().equals(tmp), 60_000); - - // Kill the Primary - previousPrimary = primary; - killPrimary(); + + // Wait for primary metric to match current primary + final String tmp = primary; + LOG.info("Waiting for metric to equal current primary"); + Wait.waitFor(() -> { + processMetrics(); + return currentPrimary.get() != null && currentPrimary.get().equals(tmp); + }, 60_000, 10_000); + + // Check that the new primary is the prior assistant + if (previousAssistant != null) { + final String tmpAssistant = previousAssistant; + Wait.waitFor(() -> { + processMetrics(); + return !currentAssistant.get().equals(tmpAssistant); + }, 60_000, 10_000); + assertEquals(previousAssistant, primary, + "Current primary is not equal to the prior assistant"); + } else { + Wait.waitFor(() -> { + processMetrics(); + return currentAssistant.get() != null; + }, 60_000, 10_000); } - } finally { - stopThread.set(true); - t.join(); + previousAssistant = currentAssistant.get(); + + // Kill the Primary + previousPrimary = primary; + LOG.info("Killing primary manager: {}", primary); + killPrimary(); } - assertNull(threadError.get()); + } + + @Test + public void testMetricsPublishedOnNewPrimary() throws Exception { + + // Wait for primary lock to be acquired + LOG.info("Waiting for primary manager lock"); + Wait.waitFor(() -> getPrimaryFromZK() != null, 60_000); + + String primary = getPrimaryFromZK(); + LOG.info("Primary manager: {}", primary); + + // Wait for primary metric to match current primary + final String tmp = primary; + LOG.info("Waiting for metric to equal current primary"); + Wait.waitFor(() -> { + processMetrics(); + return currentPrimary.get() != null && currentPrimary.get().equals(tmp); + }, 60_000, 10_000); + + Wait.waitFor(() -> { + processMetrics(); + return currentAssistant.get() != null; + }, 60_000, 10_000); + String previousAssistant = currentAssistant.get(); + + confirmMetricsPublished(); + + // Kill the Primary + LOG.info("Killing primary manager: {}", primary); + killPrimary(); + + // Wait for primary lock to be acquired + LOG.info("Waiting for primary manager lock"); + Wait.waitFor(() -> getPrimaryFromZK() != null, 60_000); + + primary = getPrimaryFromZK(); + LOG.info("Primary manager: {}", primary); + + // Wait for primary metric to match current primary + final String tmp2 = primary; + LOG.info("Waiting for metric to equal current primary"); + Wait.waitFor(() -> { + processMetrics(); + return currentPrimary.get() != null && currentPrimary.get().equals(tmp2); + }, 60_000, 10_000); + + getCluster().start(); + + final String tmpAssistant = previousAssistant; + Wait.waitFor(() -> { + processMetrics(); + return !currentAssistant.get().equals(tmpAssistant); + }, 60_000, 10_000); + assertEquals(previousAssistant, primary, "Current primary is not equal to the prior assistant"); + + confirmMetricsPublished(); + + } + + public void confirmMetricsPublished() throws Exception { + + Set expectedMetrics = new HashSet<>(Arrays.asList(Metric.values())); + expectedMetrics.removeAll(MetricsIT.flakyMetrics); // might not see these + expectedMetrics.removeAll(MetricsIT.unexpectedMetrics); // definitely shouldn't see these + assertFalse(expectedMetrics.isEmpty()); // make sure we didn't remove everything + + Set seenMetrics = new HashSet<>(); + + List statsDMetrics; + + final int compactionPriorityQueueQueuedBit = 0; + final int compactionPriorityQueueDequeuedBit = 1; + final int compactionPriorityQueueRejectedBit = 2; + final int compactionPriorityQueuePriorityBit = 3; + final int compactionPriorityQueueSizeBit = 4; + + final BitSet trueSet = new BitSet(5); + trueSet.set(0, 4, true); + + final BitSet queueMetricsSeen = new BitSet(5); + + AtomicReference error = new AtomicReference<>(); + Thread workerThread = new Thread(() -> { + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + MetricsIT.doWorkToGenerateMetrics(client, getClass()); + } catch (Exception e) { + error.set(e); + } + }); + workerThread.start(); + + // Wait for metrics to build up + Thread.sleep(30_000); + + // loop until we run out of lines or until we see all expected metrics + while (!(statsDMetrics = sink.getLines()).isEmpty() && !expectedMetrics.isEmpty() + && !queueMetricsSeen.intersects(trueSet)) { + // for each metric name not yet seen, check if it is expected, flaky, or unknown + statsDMetrics.stream().filter(line -> line.startsWith("accumulo")) + .map(TestStatsDSink::parseStatsDMetric).map(metric -> Metric.fromName(metric.getName())) + .filter(metric -> !seenMetrics.contains(metric)).forEach(metric -> { + if (expectedMetrics.contains(metric)) { + // record expected Metric as seen + seenMetrics.add(metric); + expectedMetrics.remove(metric); + } else if (MetricsIT.flakyMetrics.contains(metric)) { + // ignore any flaky metric names seen + // these aren't always expected, but we shouldn't be surprised if we see them + } else if (metric.getName().startsWith("accumulo.compaction.")) { + // Compactor queue metrics are not guaranteed to be emitted + // during the call to doWorkToGenerateMetrics above. This will + // flip a bit in the BitSet when each metric is seen. The top-level + // loop will continue to iterate until all the metrics are seen. + seenMetrics.add(metric); + expectedMetrics.remove(metric); + switch (metric) { + case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED: + queueMetricsSeen.set(compactionPriorityQueueQueuedBit, true); + break; + case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED: + queueMetricsSeen.set(compactionPriorityQueueDequeuedBit, true); + break; + case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED: + queueMetricsSeen.set(compactionPriorityQueueRejectedBit, true); + break; + case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY: + queueMetricsSeen.set(compactionPriorityQueuePriorityBit, true); + break; + case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE: + queueMetricsSeen.set(compactionPriorityQueueSizeBit, true); + break; + default: + break; + } + } else { + // completely unexpected metric + fail("Found accumulo metric not in expectedMetricNames or flakyMetricNames: " + + metric); + } + }); + log.debug("METRICS: metrics expected, but not seen so far: {}", expectedMetrics); + Thread.sleep(4_000); + } + assertTrue(expectedMetrics.isEmpty(), + "Did not see all expected metric names, missing: " + expectedMetrics); + + workerThread.join(); + assertNull(error.get()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java index e764b5c7359..3dc7684f447 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java @@ -100,6 +100,30 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { private static final int numFateThreadsPool3 = 15; private static final String allOpsFateExecutorName = "pool1"; + // meter names sorted and formatting disabled to make it easier to diff changes + // @formatter:off + public static Set unexpectedMetrics = Set.of( + SCAN_YIELDS, + COMPACTOR_MAJC_CANCELLED, + COMPACTOR_MAJC_FAILED, + COMPACTOR_MAJC_FAILURES_CONSECUTIVE, + COMPACTOR_MAJC_FAILURES_TERMINATION + ); + + // add sserver as flaky until scan server included in mini tests. + public static Set flakyMetrics = Set.of( + COMPACTOR_MAJC_STUCK, + FATE_TYPE_IN_PROGRESS, + MANAGER_BALANCER_MIGRATIONS_NEEDED, + SCAN_BUSY_TIMEOUT_COUNT, + SCAN_RESERVATION_CONFLICT_COUNTER, + SCAN_RESERVATION_TOTAL_TIMER, + SCAN_RESERVATION_WRITEOUT_TIMER, + SCAN_TABLET_METADATA_CACHE, + SERVER_IDLE + ); + // @formatter:on + @Override protected Duration defaultTimeout() { return Duration.ofMinutes(3); @@ -147,30 +171,6 @@ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSit @Test public void confirmMetricsPublished() throws Exception { - // meter names sorted and formatting disabled to make it easier to diff changes - // @formatter:off - Set unexpectedMetrics = Set.of( - SCAN_YIELDS, - COMPACTOR_MAJC_CANCELLED, - COMPACTOR_MAJC_FAILED, - COMPACTOR_MAJC_FAILURES_CONSECUTIVE, - COMPACTOR_MAJC_FAILURES_TERMINATION - ); - - // add sserver as flaky until scan server included in mini tests. - Set flakyMetrics = Set.of( - COMPACTOR_MAJC_STUCK, - FATE_TYPE_IN_PROGRESS, - MANAGER_BALANCER_MIGRATIONS_NEEDED, - SCAN_BUSY_TIMEOUT_COUNT, - SCAN_RESERVATION_CONFLICT_COUNTER, - SCAN_RESERVATION_TOTAL_TIMER, - SCAN_RESERVATION_WRITEOUT_TIMER, - SCAN_TABLET_METADATA_CACHE, - SERVER_IDLE - ); - // @formatter:on - Set expectedMetrics = new HashSet<>(Arrays.asList(Metric.values())); expectedMetrics.removeAll(flakyMetrics); // might not see these expectedMetrics.removeAll(unexpectedMetrics); // definitely shouldn't see these @@ -465,7 +465,8 @@ private void changeFateConfig(AccumuloClient client, FateInstanceType type) thro } } - static void doWorkToGenerateMetrics(AccumuloClient client, Class testClass) throws Exception { + public static void doWorkToGenerateMetrics(AccumuloClient client, Class testClass) + throws Exception { String tableName = testClass.getSimpleName(); client.tableOperations().create(tableName); BatchWriterConfig config = new BatchWriterConfig().setMaxMemory(0); From 43009eb2dff572df099806e7d2672f5c2fe12112 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 13 Mar 2026 20:52:18 +0000 Subject: [PATCH 3/5] Add missing override annotation --- .../java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java index 8eeae38a21d..449e21b095b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java @@ -110,6 +110,7 @@ public synchronized void addMetricsProducers(MetricsProducer... producer) { } } + @Override public synchronized boolean isInitialized() { return configuration.get() != null; } From 9def379e2d2f5d53753ad7bfe85ecad821454fa3 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 13 Mar 2026 21:03:29 +0000 Subject: [PATCH 4/5] Remove commented method in test --- .../org/apache/accumulo/test/MultipleManagerMetricsIT.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/MultipleManagerMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/MultipleManagerMetricsIT.java index e55919e5f28..78bdc283d2d 100644 --- a/test/src/main/java/org/apache/accumulo/test/MultipleManagerMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/MultipleManagerMetricsIT.java @@ -98,12 +98,6 @@ public static void after() throws Exception { sink.close(); } - // @BeforeEach - // public void setup() throws Exception { - // // Dump the stored metrics - // sink.getLines(); - // } - private String getPrimaryFromZK() { Set primaries = getCluster().getServerContext().instanceOperations().getServers(ServerId.Type.MANAGER); From 254a2432fc1a80879aec26d2dea319768b68bcee Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 13 Mar 2026 21:27:56 +0000 Subject: [PATCH 5/5] Make MetricsIT vars final --- .../main/java/org/apache/accumulo/test/metrics/MetricsIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java index 3dc7684f447..4e5379dd2f0 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java @@ -102,7 +102,7 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { // meter names sorted and formatting disabled to make it easier to diff changes // @formatter:off - public static Set unexpectedMetrics = Set.of( + public static final Set unexpectedMetrics = Set.of( SCAN_YIELDS, COMPACTOR_MAJC_CANCELLED, COMPACTOR_MAJC_FAILED, @@ -111,7 +111,7 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { ); // add sserver as flaky until scan server included in mini tests. - public static Set flakyMetrics = Set.of( + public static final Set flakyMetrics = Set.of( COMPACTOR_MAJC_STUCK, FATE_TYPE_IN_PROGRESS, MANAGER_BALANCER_MIGRATIONS_NEEDED,