From 6c9eb667d381bd52fa22db6596a54d823ffcdc56 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 24 Apr 2026 21:39:33 +0000 Subject: [PATCH 1/2] WIP --- .../monitor/next/InformationFetcher.java | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java index 026828c5795..5df1ef053ba 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java @@ -22,18 +22,24 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Stream; +import com.google.common.base.Suppliers; import jakarta.ws.rs.ServiceUnavailableException; import jakarta.ws.rs.core.Response; @@ -41,9 +47,18 @@ import org.apache.accumulo.core.client.admin.TabletInformation; import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.client.admin.servers.ServerId.Type; +import org.apache.accumulo.core.clientImpl.TabletInformationImpl; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.RowRange; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.TabletState; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.process.thrift.MetricResponse; import org.apache.accumulo.core.process.thrift.ServerProcessService.Client; import org.apache.accumulo.core.rpc.ThriftUtil; @@ -139,6 +154,69 @@ public void run() { } + public static Future fetchTabletMetadata(ServerContext ctx, Consumer tabletConsumer, ExecutorService executor){ + + Supplier currentTime = Suppliers.memoize(() -> { + try { + return ctx.instanceOperations().getManagerTime(); + } catch (Exception e) { + throw new IllegalStateException(e); + } + }); + + // create an initial background task to read root tablet metadata from zookeeper + var rootStage = CompletableFuture.supplyAsync(()-> { + // read live tservers from zookeeper + Set liveTserverSet = TabletMetadata.getLiveTServers(ctx); + // read root tablet metadata from zookeeper + var rootTablet = ctx.getAmple().readTablet(RootTable.EXTENT); + var tabletSate = TabletState.compute(rootTablet, liveTserverSet); + tabletConsumer.accept(new TabletInformationImpl(rootTablet, tabletSate::toString, currentTime)); + if(tabletSate == TabletState.HOSTED){ + return liveTserverSet; + }else{ + LOG.info("Not scanning root tablet because its state is {}", TabletState.compute(rootTablet, liveTserverSet)); + return null; + } + }, executor); + + // runs a follow on task that scans the root tablet and creates a background task to scan each metadata table + var metaStage = rootStage.thenCompose(liveTserverSet->{ + if(liveTserverSet == null){ + // root stage was successful + return null; + } + try(var metaTablets = ctx.getAmple().readTablets().forLevel(Ample.DataLevel.METADATA).build()){ + List> futures = new ArrayList<>(); + for(var metaTablet : metaTablets){ + var tabletState = TabletState.compute(metaTablet, liveTserverSet); + tabletConsumer.accept(new TabletInformationImpl(metaTablet, tabletState::toString, currentTime)); + if(tabletState == TabletState.HOSTED) { + var range = MetadataSchema.TabletsSection.getRange().clip(metaTablet.getExtent().toDataRange(), true); + if(range != null) { + // spawn a task to scan this metadata tablet + var future = CompletableFuture.runAsync(()->{ + try(var userTablets = ctx.getAmple().readTablets().scanMetadataTable().overRange(range).build()){ + for(var userTablet : userTablets){ + tabletConsumer.accept(new TabletInformationImpl(userTablet, ()->TabletState.compute(userTablet, liveTserverSet).toString(), currentTime)); + } + } + }); + futures.add(future); + } + }else{ + LOG.info("Not scanning meta tablet {} because its state is {}", metaTablet.getExtent(), tabletState); + } + } + + // return a completable future that waits for all the scans of metadata tablets + return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)); + } + }); + + return metaStage; + } + private class TableInformationFetcher implements Runnable { private final ServerContext ctx; private final TableId tableId; @@ -296,6 +374,7 @@ public void run() { futures.add(this.pool.submit(new RunningCompactionFetcher(summary, pool))); // Fetch Tablet / Tablet information from the metadata table + for (TableId tableId : this.ctx.createQualifiedTableNameToIdMap().values()) { futures.add(this.pool.submit(new TableInformationFetcher(this.ctx, tableId, summary))); } From de35b648bf9be6a3c100cf09b41293ee83669dfe Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 27 Apr 2026 18:52:28 +0000 Subject: [PATCH 2/2] WIP --- .../monitor/next/InformationFetcher.java | 127 +++++++++--------- 1 file changed, 62 insertions(+), 65 deletions(-) diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java index 5df1ef053ba..2ca7a8af30b 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java @@ -22,11 +22,8 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -37,9 +34,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.stream.Stream; -import com.google.common.base.Suppliers; import jakarta.ws.rs.ServiceUnavailableException; import jakarta.ws.rs.core.Response; @@ -49,16 +44,14 @@ import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.clientImpl.TabletInformationImpl; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.RowRange; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.SystemTables; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.process.thrift.MetricResponse; import org.apache.accumulo.core.process.thrift.ServerProcessService.Client; import org.apache.accumulo.core.rpc.ThriftUtil; @@ -79,6 +72,7 @@ import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.base.Suppliers; import com.google.common.net.HostAndPort; public class InformationFetcher implements RemovalListener, Runnable { @@ -154,7 +148,27 @@ public void run() { } - public static Future fetchTabletMetadata(ServerContext ctx, Consumer tabletConsumer, ExecutorService executor){ + public static Future fetchTabletMetadata(ServerContext ctx, ExecutorService executor, + SystemInformation summary) { + return fetchTabletMetadata(ctx, tabletInformation -> { + final TableId tableId = tabletInformation.getTabletId().getTable(); + try { + final String tableName = ctx.getQualifiedTableName(tableId); + summary.processTabletInformation(tableId, tableName, tabletInformation); + } catch (TableNotFoundException e) { + if (SystemTables.containsTableId(tableId)) { + throw new IllegalStateException(e); + } else { + LOG.debug("Table name for table id : {}, assuming table deleted", tableId); + } + throw new RuntimeException(e); + } + + }, executor); + } + + public static Future fetchTabletMetadata(ServerContext ctx, + Consumer tabletConsumer, ExecutorService executor) { Supplier currentTime = Suppliers.memoize(() -> { try { @@ -165,47 +179,59 @@ public static Future fetchTabletMetadata(ServerContext ctx, Consumer { + var rootStage = CompletableFuture.supplyAsync(() -> { // read live tservers from zookeeper Set liveTserverSet = TabletMetadata.getLiveTServers(ctx); // read root tablet metadata from zookeeper var rootTablet = ctx.getAmple().readTablet(RootTable.EXTENT); var tabletSate = TabletState.compute(rootTablet, liveTserverSet); - tabletConsumer.accept(new TabletInformationImpl(rootTablet, tabletSate::toString, currentTime)); - if(tabletSate == TabletState.HOSTED){ + tabletConsumer + .accept(new TabletInformationImpl(rootTablet, tabletSate::toString, currentTime)); + if (tabletSate == TabletState.HOSTED) { return liveTserverSet; - }else{ - LOG.info("Not scanning root tablet because its state is {}", TabletState.compute(rootTablet, liveTserverSet)); + } else { + LOG.info("Not scanning root tablet because its state is {}", + TabletState.compute(rootTablet, liveTserverSet)); return null; } }, executor); - // runs a follow on task that scans the root tablet and creates a background task to scan each metadata table - var metaStage = rootStage.thenCompose(liveTserverSet->{ - if(liveTserverSet == null){ - // root stage was successful + // runs a follow on task that scans the root tablet and creates a background task to scan each + // metadata table + var metaStage = rootStage.thenCompose(liveTserverSet -> { + if (liveTserverSet == null) { + // root stage was not successful return null; } - try(var metaTablets = ctx.getAmple().readTablets().forLevel(Ample.DataLevel.METADATA).build()){ + // TODO set an aggressive timeout on the metadata scan, if a failure happens after our checks + // it could cause the scan to hang. + try (var metaTablets = + ctx.getAmple().readTablets().forLevel(Ample.DataLevel.METADATA).build()) { List> futures = new ArrayList<>(); - for(var metaTablet : metaTablets){ + for (var metaTablet : metaTablets) { var tabletState = TabletState.compute(metaTablet, liveTserverSet); - tabletConsumer.accept(new TabletInformationImpl(metaTablet, tabletState::toString, currentTime)); - if(tabletState == TabletState.HOSTED) { - var range = MetadataSchema.TabletsSection.getRange().clip(metaTablet.getExtent().toDataRange(), true); - if(range != null) { + tabletConsumer + .accept(new TabletInformationImpl(metaTablet, tabletState::toString, currentTime)); + if (tabletState == TabletState.HOSTED) { + var range = MetadataSchema.TabletsSection.getRange() + .clip(metaTablet.getExtent().toDataRange(), true); + if (range != null) { // spawn a task to scan this metadata tablet - var future = CompletableFuture.runAsync(()->{ - try(var userTablets = ctx.getAmple().readTablets().scanMetadataTable().overRange(range).build()){ - for(var userTablet : userTablets){ - tabletConsumer.accept(new TabletInformationImpl(userTablet, ()->TabletState.compute(userTablet, liveTserverSet).toString(), currentTime)); + var future = CompletableFuture.runAsync(() -> { + try (var userTablets = + ctx.getAmple().readTablets().scanMetadataTable().overRange(range).build()) { + for (var userTablet : userTablets) { + tabletConsumer.accept(new TabletInformationImpl(userTablet, + () -> TabletState.compute(userTablet, liveTserverSet).toString(), + currentTime)); } } }); futures.add(future); } - }else{ - LOG.info("Not scanning meta tablet {} because its state is {}", metaTablet.getExtent(), tabletState); + } else { + LOG.info("Not scanning meta tablet {} because its state is {}", metaTablet.getExtent(), + tabletState); } } @@ -217,34 +243,6 @@ public static Future fetchTabletMetadata(ServerContext ctx, Consumer tablets = - this.ctx.tableOperations().getTabletInformation(tableName, List.of(RowRange.all()))) { - tablets.forEach(t -> summary.processTabletInformation(tableId, tableName, t)); - } - } catch (TableNotFoundException e) { - LOG.warn("TableNotFoundException thrown while trying to gather information for TableId: {}", - tableId, e); - } catch (Exception e) { - LOG.warn("Interrupted while trying to gather information for TableId: {}", tableId, e); - } - } - } - private class RunningCompactionFetcher implements Runnable { private final SystemInformation summary; @@ -375,9 +373,7 @@ public void run() { // Fetch Tablet / Tablet information from the metadata table - for (TableId tableId : this.ctx.createQualifiedTableNameToIdMap().values()) { - futures.add(this.pool.submit(new TableInformationFetcher(this.ctx, tableId, summary))); - } + futures.add(fetchTabletMetadata(ctx, pool, summary)); futures.add(this.pool.submit(() -> { try { @@ -403,13 +399,10 @@ public void run() { tookToLong = true; } - Iterator> iter = futures.iterator(); - while (iter.hasNext()) { - Future future = iter.next(); + for (Future future : futures) { if (tookToLong && !future.isCancelled()) { future.cancel(true); } else if (future.isDone()) { - iter.remove(); try { future.get(); } catch (CancellationException | InterruptedException | ExecutionException e) { @@ -417,6 +410,10 @@ public void run() { } } } + + // more efficient to do batch removal from array list than doing it one by one + futures.removeIf(Future::isDone); + if (!futures.isEmpty()) { UtilWaitThread.sleep(3_000); }