Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class Constants {
public static final String ZDEAD = "/dead";
public static final String ZDEADTSERVERS = ZDEAD + "/tservers";

/**
 * ZooKeeper path of the node whose children name the tablet servers that are shutting down.
 * NOTE(review): presumably resolved relative to the instance's ZooKeeper root — confirm against
 * the callers.
 */
public static final String ZSHUTTING_DOWN_TSERVERS = "/shutting-down-tservers";

public static final String ZFATE = "/fate";

public static final String ZNEXT_FILE = "/next_file";
Expand Down
43 changes: 40 additions & 3 deletions core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Optional;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
Expand All @@ -41,28 +42,40 @@ public class FateKey {
// Kind of FATE operation this key identifies.
private final FateKeyType type;
// Set when the key is built from a KeyExtent; empty for the other key kinds.
private final Optional<KeyExtent> keyExtent;
// Set when the key is built from an ExternalCompactionId; empty for the other key kinds.
private final Optional<ExternalCompactionId> compactionId;
// Set when the key is built from a TServerInstance; empty for the other key kinds.
private final Optional<TServerInstance> tServerInstance;
// Byte form of this key: either produced by serialize(...) or the array the key was read from.
private final byte[] serialized;

/**
 * Builds a key that is identified by a tablet extent.
 *
 * @param type kind of FATE operation; must not be null
 * @param keyExtent extent that identifies the key; must not be null
 * @throws NullPointerException if {@code type} or {@code keyExtent} is null
 */
private FateKey(FateKeyType type, KeyExtent keyExtent) {
  // Null checks come first, type before extent, so a bad argument fails before any encoding.
  this.type = Objects.requireNonNull(type);
  this.keyExtent = Optional.of(keyExtent);
  // The byte form is computed once, up front, from the same two values.
  this.serialized = serialize(type, keyExtent);
  // An extent-based key carries neither of the other identifiers.
  this.tServerInstance = Optional.empty();
  this.compactionId = Optional.empty();
}

/**
 * Builds a key that is identified by an external compaction id.
 *
 * @param type kind of FATE operation; must not be null
 * @param compactionId compaction id that identifies the key; must not be null
 * @throws NullPointerException if {@code type} or {@code compactionId} is null
 */
private FateKey(FateKeyType type, ExternalCompactionId compactionId) {
  // Null checks come first, type before id, so a bad argument fails before any encoding.
  this.type = Objects.requireNonNull(type);
  this.compactionId = Optional.of(compactionId);
  // The byte form is computed once, up front, from the same two values.
  this.serialized = serialize(type, compactionId);
  // A compaction-based key carries neither of the other identifiers.
  this.tServerInstance = Optional.empty();
  this.keyExtent = Optional.empty();
}

/**
 * Builds a key that is identified by a tablet server instance.
 *
 * @param type kind of FATE operation; must not be null
 * @param tServerInstance tablet server that identifies the key; must not be null
 * @throws NullPointerException if {@code type} or {@code tServerInstance} is null
 */
private FateKey(FateKeyType type, TServerInstance tServerInstance) {
  // Null checks come first, type before server, so a bad argument fails before any encoding.
  this.type = Objects.requireNonNull(type);
  this.tServerInstance = Optional.of(tServerInstance);
  // The byte form is computed once, up front, from the same two values.
  this.serialized = serialize(type, tServerInstance);
  // A tserver-based key carries neither of the other identifiers.
  this.compactionId = Optional.empty();
  this.keyExtent = Optional.empty();
}

private FateKey(byte[] serialized) {
try (DataInputBuffer buffer = new DataInputBuffer()) {
buffer.reset(serialized, serialized.length);
this.type = FateKeyType.valueOf(buffer.readUTF());
this.keyExtent = deserializeKeyExtent(type, buffer);
this.compactionId = deserializeCompactionId(type, buffer);
this.tServerInstance = deserializeTserverId(type, buffer);
this.serialized = serialized;
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down Expand Up @@ -127,8 +140,12 @@ public static FateKey forMerge(KeyExtent extent) {
return new FateKey(FateKeyType.MERGE, extent);
}

/**
 * Creates a {@link FateKeyType#TSERVER_SHUTDOWN} key for the given tablet server.
 *
 * @param tServerInstance tablet server the key identifies
 * @return a new key of type {@code TSERVER_SHUTDOWN} built from {@code tServerInstance}
 */
public static FateKey forShutdown(TServerInstance tServerInstance) {
return new FateKey(FateKeyType.TSERVER_SHUTDOWN, tServerInstance);
}

/**
 * The kinds of FATE operation a {@link FateKey} can identify.
 */
public enum FateKeyType {
// NOTE(review): the two constant lines below look like the removed and added sides of a diff;
// presumably only the second one (which adds TSERVER_SHUTDOWN) is meant to remain — confirm.
SPLIT, COMPACTION_COMMIT, MERGE
SPLIT, COMPACTION_COMMIT, MERGE, TSERVER_SHUTDOWN
}

private static byte[] serialize(FateKeyType type, KeyExtent ke) {
Expand All @@ -155,22 +172,42 @@ private static byte[] serialize(FateKeyType type, ExternalCompactionId compactio
}
}

/**
 * Encodes a tablet-server key as bytes: the type name followed by the server's host-port-session
 * string, each written with {@link DataOutputStream#writeUTF(String)}.
 *
 * @param type kind of FATE operation whose name is written first
 * @param tServerInstance tablet server whose host-port-session string is written second
 * @return the encoded bytes
 * @throws UncheckedIOException if writing to the in-memory stream fails
 */
private static byte[] serialize(FateKeyType type, TServerInstance tServerInstance) {
  try (var bytes = new ByteArrayOutputStream(); var out = new DataOutputStream(bytes)) {
    out.writeUTF(type.toString());
    out.writeUTF(tServerInstance.getHostPortSession());
    // Close the data stream before taking the snapshot of the underlying byte array.
    out.close();
    return bytes.toByteArray();
  } catch (IOException ioe) {
    throw new UncheckedIOException(ioe);
  }
}

/**
 * Reads the key extent from the buffer for key types that carry one.
 *
 * @param type key type that was already read from the buffer
 * @param buffer input positioned just after the type name
 * @return the extent for SPLIT and MERGE keys, otherwise an empty Optional
 * @throws IOException if reading the extent fails
 */
private static Optional<KeyExtent> deserializeKeyExtent(FateKeyType type, DataInputBuffer buffer)
throws IOException {
return switch (type) {
case SPLIT, MERGE -> Optional.of(KeyExtent.readFrom(buffer));
// NOTE(review): the two COMPACTION_COMMIT arms below look like the removed and added sides of a
// diff; presumably only the second one (which also covers TSERVER_SHUTDOWN) is meant to remain —
// confirm.
case COMPACTION_COMMIT -> Optional.empty();
case COMPACTION_COMMIT, TSERVER_SHUTDOWN -> Optional.empty();
};
}

/**
 * Reads the external compaction id from the buffer for key types that carry one.
 *
 * @param type key type that was already read from the buffer
 * @param buffer input positioned just after the type name
 * @return the compaction id for COMPACTION_COMMIT keys, otherwise an empty Optional
 * @throws IOException if reading the id string fails
 */
private static Optional<ExternalCompactionId> deserializeCompactionId(FateKeyType type,
DataInputBuffer buffer) throws IOException {
return switch (type) {
// NOTE(review): the two SPLIT/MERGE arms below look like the removed and added sides of a diff;
// presumably only the second one (which also covers TSERVER_SHUTDOWN) is meant to remain —
// confirm.
case SPLIT, MERGE -> Optional.empty();
case SPLIT, MERGE, TSERVER_SHUTDOWN -> Optional.empty();
// The id is stored as a single UTF string.
case COMPACTION_COMMIT -> Optional.of(ExternalCompactionId.of(buffer.readUTF()));
};
}

/**
 * Reads the tablet server instance from the buffer for key types that carry one.
 *
 * @param type key type that was already read from the buffer
 * @param buffer input positioned just after the type name
 * @return the tablet server for TSERVER_SHUTDOWN keys, otherwise an empty Optional
 * @throws IOException if reading the server string fails
 */
private static Optional<TServerInstance> deserializeTserverId(FateKeyType type,
    DataInputBuffer buffer) throws IOException {
  return switch (type) {
    case TSERVER_SHUTDOWN -> {
      // The server is stored as a single UTF string that TServerInstance parses.
      String encodedServer = buffer.readUTF();
      yield Optional.of(new TServerInstance(encodedServer));
    }
    // These key kinds carry no tablet server, so nothing is consumed from the buffer.
    case SPLIT, MERGE, COMPACTION_COMMIT -> Optional.empty();
  };
}

@Override
public String toString() {
var buf = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ void initialize(final ServerContext context, final String rootTabletDirName,
ZooUtil.NodeExistsPolicy.FAIL);
zrwChroot.putPersistentData(Constants.ZMANAGER_ASSISTANT_LOCK, EMPTY_BYTE_ARRAY,
ZooUtil.NodeExistsPolicy.FAIL);
zrwChroot.putPersistentData(Constants.ZSHUTTING_DOWN_TSERVERS, EMPTY_BYTE_ARRAY,
ZooUtil.NodeExistsPolicy.FAIL);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ServerOpts;
Expand Down Expand Up @@ -206,7 +207,6 @@ public class Manager extends AbstractServer
private final List<TabletGroupWatcher> watchers = new ArrayList<>();
final Map<TServerInstance,AtomicInteger> badServers =
Collections.synchronizedMap(new HashMap<>());
final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>());
final EventCoordinator nextEvent = new EventCoordinator();
RecoveryManager recoveryManager = null;
private final ManagerTime timeKeeper;
Expand Down Expand Up @@ -557,10 +557,6 @@ protected Manager(ServerOpts opts,
aconf.getTimeInMillis(Property.MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME);
}

/**
 * Looks up the connection for a tablet server by delegating to {@code tserverSet}.
 *
 * @param server tablet server to look up
 * @return whatever {@code tserverSet.getConnection(server)} returns for that server
 */
public TServerConnection getConnection(TServerInstance server) {
return tserverSet.getConnection(server);
}

void setManagerGoalState(ManagerGoalState state) {
try {
getContext().getZooSession().asReaderWriter().putPersistentData(Constants.ZMANAGER_GOAL_STATE,
Expand Down Expand Up @@ -667,14 +663,15 @@ public void run() {
}

boolean canBalance(DataLevel dataLevel, TServerStatus tServerStatus) {
Set<TServerInstance> serversToShutdown;
if (!badServers.isEmpty()) {
log.debug("not balancing {} because the balance information is out-of-date {}", dataLevel,
badServers.keySet());
return false;
} else if (getManagerGoalState() == ManagerGoalState.CLEAN_STOP) {
log.debug("not balancing {} because the manager is attempting to stop cleanly", dataLevel);
return false;
} else if (!serversToShutdown.isEmpty()) {
} else if (!(serversToShutdown = shutdownServers()).isEmpty()) {
log.debug("not balancing {} while shutting down servers {}", dataLevel, serversToShutdown);
return false;
} else {
Expand Down Expand Up @@ -766,7 +763,6 @@ public void run() {
log.debug("stopping {} tablet servers", currentServers.size());
for (TServerInstance server : currentServers) {
try {
serversToShutdown.add(server);
tserverSet.getConnection(server).fastHalt(primaryManagerLock);
} catch (TException e) {
// it's probably down, and we don't care
Expand Down Expand Up @@ -1591,38 +1587,35 @@ public void update(LiveTServerSet current, Set<TServerInstance> deleted,
obit.delete(up.getHostPort());
}
}
for (TServerInstance dead : deleted) {
String cause = "unexpected failure";
if (serversToShutdown.contains(dead)) {
cause = "clean shutdown"; // maybe an incorrect assumption

if (!deleted.isEmpty()) {
// This set is read from ZooKeeper, so only fetch it if it's actually needed
var serversToShutdown = shutdownServers();
for (TServerInstance dead : deleted) {
String cause = "unexpected failure";
if (serversToShutdown.contains(dead)) {
cause = "clean shutdown"; // maybe an incorrect assumption
}
if (!getManagerGoalState().equals(ManagerGoalState.CLEAN_STOP)) {
obit.post(dead.getHostPort(), cause);
}
}
if (!getManagerGoalState().equals(ManagerGoalState.CLEAN_STOP)) {
obit.post(dead.getHostPort(), cause);

Set<TServerInstance> unexpected = new HashSet<>(deleted);
unexpected.removeAll(serversToShutdown);
if (!unexpected.isEmpty()
&& (stillManager() && !getManagerGoalState().equals(ManagerGoalState.CLEAN_STOP))) {
log.warn("Lost servers {}", unexpected);
}
badServers.keySet().removeAll(deleted);
}

Set<TServerInstance> unexpected = new HashSet<>(deleted);
unexpected.removeAll(this.serversToShutdown);
if (!unexpected.isEmpty()
&& (stillManager() && !getManagerGoalState().equals(ManagerGoalState.CLEAN_STOP))) {
log.warn("Lost servers {}", unexpected);
}
serversToShutdown.removeAll(deleted);
badServers.keySet().removeAll(deleted);
// clear out any bad server with the same host/port as a new server
synchronized (badServers) {
cleanListByHostAndPort(badServers.keySet(), deleted, added);
}
synchronized (serversToShutdown) {
cleanListByHostAndPort(serversToShutdown, deleted, added);
}
nextEvent.event("There are now %d tablet servers", current.size());
}

// clear out any servers that are no longer current
// this is needed when we are using a fate operation to shutdown a tserver as it
// will continue to add the server to the serversToShutdown (ACCUMULO-4410)
serversToShutdown.retainAll(current.getCurrentServers());
}

static void cleanListByHostAndPort(Collection<TServerInstance> badServers,
Expand Down Expand Up @@ -1669,15 +1662,6 @@ public LiveTServersSnapshot tserversSnapshot() {
return tserverSet.getSnapshot();
}

/**
 * Records that a tablet server should be shut down. Used to recover state from the persistent
 * transaction that shuts down a server.
 *
 * @param server tablet server to add to the set of servers to shut down
 * @return {@code true} if the server was newly added (an event is then signalled), {@code false}
 *         if it was already in the set
 */
public boolean shutdownTServer(TServerInstance server) {
  final boolean newlyRequested = serversToShutdown.add(server);
  if (newlyRequested) {
    // Only announce the request the first time it is seen for this server.
    nextEvent.event("Tablet Server shutdown requested for %s", server);
  }
  return newlyRequested;
}

/**
 * Returns this manager's event coordinator.
 *
 * @return the {@code nextEvent} coordinator held by this manager
 */
public EventCoordinator getEventCoordinator() {
return nextEvent;
}
Expand Down Expand Up @@ -1725,12 +1709,8 @@ public ManagerMonitorInfo getManagerMonitorInfo() {
result.state = getManagerState();
result.goalState = getManagerGoalState();
result.unassignedTablets = displayUnassigned();
result.serversShuttingDown = new HashSet<>();
synchronized (serversToShutdown) {
for (TServerInstance server : serversToShutdown) {
result.serversShuttingDown.add(server.getHostPort());
}
}
result.serversShuttingDown =
shutdownServers().stream().map(TServerInstance::getHostPort).collect(Collectors.toSet());
DeadServerList obit = new DeadServerList(getContext());
result.deadTabletServers = obit.getList();
return result;
Expand All @@ -1744,8 +1724,12 @@ public boolean delegationTokensAvailable() {
}

/**
 * Returns the set of tablet servers that are being shut down.
 *
 * @return the servers currently marked as shutting down
 */
public Set<TServerInstance> shutdownServers() {
// NOTE(review): the synchronized/copyOf lines below look like the removed side of a diff (an
// in-memory set), while the try block looks like the added side (a ZooKeeper read); presumably
// only the try block is meant to remain — confirm.
synchronized (serversToShutdown) {
return Set.copyOf(serversToShutdown);
try {
// Each child name under ZSHUTTING_DOWN_TSERVERS is parsed into a TServerInstance.
List<String> children =
getContext().getZooSession().asReader().getChildren(Constants.ZSHUTTING_DOWN_TSERVERS);
return children.stream().map(TServerInstance::new).collect(Collectors.toSet());
} catch (KeeperException | InterruptedException e) {
// NOTE(review): InterruptedException is wrapped without restoring the thread's interrupt flag;
// consider calling Thread.currentThread().interrupt() before rethrowing.
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.Constants;
Expand Down Expand Up @@ -62,6 +64,7 @@
import org.apache.accumulo.core.fate.FateClient;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.TraceRepo;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.manager.state.tables.TableState;
Expand All @@ -85,7 +88,7 @@
import org.apache.accumulo.core.securityImpl.thrift.TDelegationTokenConfig;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.manager.tserverOps.ShutdownTServer;
import org.apache.accumulo.manager.tserverOps.BeginTserverShutdown;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.accumulo.server.conf.store.NamespacePropKey;
Expand Down Expand Up @@ -336,16 +339,16 @@ public void shutdownTabletServer(TInfo info, TCredentials c, String tabletServer
}

FateClient<FateEnv> fate = manager.fateClient(FateInstanceType.META);
FateId fateId = fate.startTransaction();

String msg = "Shutdown tserver " + tabletServer;
var repo = new TraceRepo<>(
new BeginTserverShutdown(doomed, manager.tserverSet.getResourceGroup(doomed), force));

fate.seedTransaction(Fate.FateOperation.SHUTDOWN_TSERVER, fateId,
new TraceRepo<>(
new ShutdownTServer(doomed, manager.tserverSet.getResourceGroup(doomed), force)),
false, msg);
fate.waitForCompletion(fateId);
fate.delete(fateId);
CompletableFuture<Optional<FateId>> future;
try (var seeder = fate.beginSeeding()) {
future = seeder.attemptToSeedTransaction(Fate.FateOperation.SHUTDOWN_TSERVER,
FateKey.forShutdown(doomed), repo, true);
}
future.join().ifPresent(fate::waitForCompletion);

log.debug("FATE op shutting down " + tabletServer + " finished");
}
Expand All @@ -360,17 +363,15 @@ public void tabletServerStopping(TInfo tinfo, TCredentials credentials, String t
}
log.info("Tablet Server {} has reported it's shutting down", tabletServer);
var tserver = new TServerInstance(tabletServer);
if (manager.shutdownTServer(tserver)) {
// If there is an exception seeding the fate tx this should cause the RPC to fail which should
// cause the tserver to halt. Because of that not making an attempt to handle failure here.
FateClient<FateEnv> fate = manager.fateClient(FateInstanceType.META);
var tid = fate.startTransaction();
String msg = "Shutdown tserver " + tabletServer;
// If there is an exception seeding the fate tx this should cause the RPC to fail which should
// cause the tserver to halt. Because of that not making an attempt to handle failure here.
FateClient<FateEnv> fate = manager.fateClient(FateInstanceType.META);

fate.seedTransaction(Fate.FateOperation.SHUTDOWN_TSERVER, tid,
new TraceRepo<>(new ShutdownTServer(tserver, ResourceGroupId.of(resourceGroup), false)),
true, msg);
}
var repo = new TraceRepo<>(
new BeginTserverShutdown(tserver, ResourceGroupId.of(resourceGroup), false));
// only seed a new transaction if nothing is running for this tserver
fate.seedTransaction(Fate.FateOperation.SHUTDOWN_TSERVER, FateKey.forShutdown(tserver), repo,
true);
}

@Override
Expand Down
Loading
Loading