Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,11 @@ Each key is the name of the pool (can be assigned any string). Each value is a J
would cause all `/rest/` endpoints to be hosted at `/accumulo/rest/*`.
""",
"2.1.4"),
MONITOR_LONG_RUNNING_COMPACTION_LIMIT("monitor.compactions.long.running.limit", "50",
PropertyType.COUNT,
"The number of long running compactions to display per resource group. The Monitor server will"
+ " keep twice this number in memory as it builds the next list while serving up the current list.",
"4.0.0"),
// per table properties
TABLE_PREFIX("table.", null, PropertyType.PREFIX, """
Properties in this category affect tablet server treatment of tablets, \
Expand Down
69 changes: 69 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/rpc/RpcFuture.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.rpc;

import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.accumulo.core.client.admin.servers.ServerId;

/**
 * A {@link Future} decorator that remembers which server an RPC was submitted to, so that callers
 * iterating over completed futures can attribute each result (or failure) to a specific server.
 *
 * <p>
 * All {@link Future} operations delegate to the wrapped future; this class adds no synchronization
 * of its own.
 *
 * @param <T> result type produced by the wrapped future
 */
public class RpcFuture<T> implements Future<T> {

  // underlying future performing the actual RPC work
  private final Future<T> future;
  // server targeted by the RPC; exposed via getServer() for error attribution
  private final ServerId server;

  /**
   * @param future the future to delegate to, must be non-null
   * @param server the server the RPC was sent to, must be non-null
   */
  public RpcFuture(Future<T> future, ServerId server) {
    // fail fast on construction rather than on a later delegated call
    this.future = Objects.requireNonNull(future, "future must not be null");
    this.server = Objects.requireNonNull(server, "server must not be null");
  }

  /**
   * @return the server this RPC was submitted to
   */
  public ServerId getServer() {
    return server;
  }

  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    return future.cancel(mayInterruptIfRunning);
  }

  @Override
  public boolean isCancelled() {
    return future.isCancelled();
  }

  @Override
  public boolean isDone() {
    return future.isDone();
  }

  @Override
  public T get() throws InterruptedException, ExecutionException {
    return future.get();
  }

  @Override
  public T get(long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    return future.get(timeout, unit);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
Expand All @@ -46,6 +48,7 @@
import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.rpc.RpcFuture;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
Expand Down Expand Up @@ -155,7 +158,7 @@ public static List<ActiveCompaction> getActiveCompaction(HostAndPort compactor,
* @return external compaction job or null if none running
*/
public static TExternalCompaction getRunningCompaction(HostAndPort compactorAddr,
ClientContext context) {
ClientContext context) throws TException {

CompactorService.Client client = null;
try {
Expand All @@ -169,6 +172,7 @@ public static TExternalCompaction getRunningCompaction(HostAndPort compactorAddr
}
} catch (TException e) {
LOG.debug("Failed to contact compactor {}", compactorAddr, e);
throw e;
} finally {
ThriftUtil.returnClient(client, context);
}
Expand All @@ -193,38 +197,79 @@ private static ExternalCompactionId getRunningCompactionId(HostAndPort compactor
}

/**
* This method returns information from the Compactor about the job that is currently running. The
* RunningCompactions are not fully populated. This method is used from the CompactionCoordinator
* on a restart to re-populate the set of running compactions on the compactors.
* This method returns information from the Compactors about the job that is currently running.
* This method will use a thread pool with 16 threads to query the Compactors.
*
* @param context server context
* @return list of compactor and external compaction jobs
* @param consumer object that will accept TExternalCompaction objects
*/
public static List<TExternalCompaction> getCompactionsRunningOnCompactors(ClientContext context) {
final List<Future<TExternalCompaction>> rcFutures = new ArrayList<>();
/**
 * Queries every known Compactor for the compaction it is currently running, using a dedicated
 * 16-thread pool that is torn down before this method returns.
 *
 * @param context server context
 * @param consumer callback invoked with each running compaction that is found
 */
public static void getCompactionsRunningOnCompactors(ClientContext context,
    Consumer<TExternalCompaction> consumer) throws InterruptedException {
  final ExecutorService pool = ThreadPools.getServerThreadPools()
      .getPoolBuilder(COMPACTOR_RUNNING_COMPACTIONS_POOL).numCoreThreads(16).build();
  try {
    getCompactionsRunningOnCompactors(context, pool, consumer);
  } finally {
    // always reclaim the pool, even if the query was interrupted
    pool.shutdownNow();
  }
}

context.getServerPaths().getCompactor(ResourceGroupPredicate.ANY, AddressSelector.all(), true)
.forEach(slp -> {
final HostAndPort hp = HostAndPort.fromString(slp.getServer());
rcFutures.add(executor.submit(() -> getRunningCompaction(hp, context)));
});
executor.shutdown();
/**
* This method returns information from the Compactors about the job that is currently running.
*
* @param context server context
* @param executor thread pool executor to use for querying Compactors
* @param consumer object that will accept TExternalCompaction objects
* @return list of compactor addresses where RPC failed
*/
public static List<ServerId> getCompactionsRunningOnCompactors(ClientContext context,
ExecutorService executor, Consumer<TExternalCompaction> consumer)
throws InterruptedException {

final List<TExternalCompaction> results = new ArrayList<>();
rcFutures.forEach(rcf -> {
try {
TExternalCompaction job = rcf.get();
if (job == null || job.getJob() == null || job.getJob().getExternalCompactionId() == null) {
return;
final List<RpcFuture<TExternalCompaction>> rcFutures = new ArrayList<>();
final List<ServerId> failures = new ArrayList<>();

try {
Set<ServerId> compactors = context.instanceOperations().getServers(ServerId.Type.COMPACTOR);
compactors.forEach(s -> {
final HostAndPort address = HostAndPort.fromParts(s.getHost(), s.getPort());
Future<TExternalCompaction> future =
executor.submit(() -> getRunningCompaction(address, context));
rcFutures.add(new RpcFuture<TExternalCompaction>(future, s));
});

while (!rcFutures.isEmpty()) {
var futureIter = rcFutures.iterator();
while (futureIter.hasNext()) {
var future = futureIter.next();
if (future.isDone()) {
try {
TExternalCompaction tec = future.get();
if (tec != null && tec.getJob() != null
&& tec.getJob().getExternalCompactionId() != null) {
consumer.accept(tec);
}
} catch (ExecutionException e) {
LOG.error("Error getting compaction from compactor: " + future.getServer(), e);
failures.add(future.getServer());
} finally {
futureIter.remove();
}
}
}
results.add(job);
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
Thread.sleep(100);
}
});
return results;
return failures;
} catch (InterruptedException e) {
// If this thread is interrupted, cancel all remaining tasks
var futureIter = rcFutures.iterator();
while (futureIter.hasNext()) {
var future = futureIter.next();
future.cancel(true);
}
rcFutures.clear();
throw e;
}
}

public static Collection<ExternalCompactionId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,24 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.tabletserver.thrift.InputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RunningCompactionInfo {
Comment thread
dlmarion marked this conversation as resolved.

// Immutable details for a single compaction input file. Variable names become JSON keys
// (serialized for the Monitor — see the class-level note about JSON key mapping).
public static record CompactionInputFileDetails(String metadataFileEntry, long size, long entries,
    long timestamp) {
}

private static final Logger log = LoggerFactory.getLogger(RunningCompactionInfo.class);

// DO NOT CHANGE Variable names - they map to JSON keys in the Monitor
Expand All @@ -44,6 +53,8 @@ public class RunningCompactionInfo {
public final long duration;
public final String status;
public final long lastUpdate;
public final List<CompactionInputFileDetails> inputFiles;
public final String outputFile;

/**
* Info parsed about the external running compaction. Calculate the progress, which is defined as
Expand Down Expand Up @@ -76,40 +87,52 @@ public RunningCompactionInfo(TExternalCompaction ec) {
last = lastEntry.getValue();
updateMillis = lastEntry.getKey();
duration = NANOSECONDS.toMillis(last.getCompactionAgeNanos());
long durationMinutes = MILLISECONDS.toMinutes(duration);
if (durationMinutes > 15) {
log.trace("Compaction {} has been running for {} minutes", ecid, durationMinutes);
}

lastUpdate = nowMillis - updateMillis;
long sinceLastUpdateSeconds = MILLISECONDS.toSeconds(lastUpdate);
log.trace("Time since Last update {} - {} = {} seconds", nowMillis, updateMillis,
sinceLastUpdateSeconds);

var total = last.getEntriesToBeCompacted();
if (total > 0) {
percent = (last.getEntriesRead() / (float) total) * 100;
}
progress = percent;

if (updates.isEmpty()) {
status = "na";
} else {
status = last.state.name();
}
log.trace("Parsed running compaction {} for {} with progress = {}%", status, ecid, progress);
if (sinceLastUpdateSeconds > 30) {
log.trace("Compaction hasn't progressed from {} in {} seconds.", progress,
sinceLastUpdateSeconds);
}
} else {
log.trace("No updates found for {}", ecid);
lastUpdate = 1;
progress = percent;
status = "na";
duration = 0;
return;
}
long durationMinutes = MILLISECONDS.toMinutes(duration);
if (durationMinutes > 15) {
log.trace("Compaction {} has been running for {} minutes", ecid, durationMinutes);
}

lastUpdate = nowMillis - updateMillis;
long sinceLastUpdateSeconds = MILLISECONDS.toSeconds(lastUpdate);
log.trace("Time since Last update {} - {} = {} seconds", nowMillis, updateMillis,
sinceLastUpdateSeconds);
this.inputFiles = convertInputFiles(job.files);
this.outputFile = job.outputFile;

var total = last.getEntriesToBeCompacted();
if (total > 0) {
percent = (last.getEntriesRead() / (float) total) * 100;
}
progress = percent;
}

if (updates.isEmpty()) {
status = "na";
} else {
status = last.state.name();
}
log.trace("Parsed running compaction {} for {} with progress = {}%", status, ecid, progress);
if (sinceLastUpdateSeconds > 30) {
log.trace("Compaction hasn't progressed from {} in {} seconds.", progress,
sinceLastUpdateSeconds);
}
/**
 * Converts thrift {@link InputFile} entries into JSON-friendly records.
 *
 * @return a list of {@link CompactionInputFileDetails} sorted largest to smallest by size
 */
private List<CompactionInputFileDetails> convertInputFiles(List<InputFile> files) {
  Comparator<CompactionInputFileDetails> bySizeDescending =
      Comparator.comparingLong(CompactionInputFileDetails::size).reversed();
  return files.stream()
      .map(f -> new CompactionInputFileDetails(f.metadataFileEntry, f.size, f.entries, f.timestamp))
      .sorted(bySizeDescending).toList();
}

@Override
Expand Down
Loading