Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -466,11 +466,27 @@ private void sendStopRequest(int coordinatorEpoch) {
if (throwable != null) {
// todo: in FLUSS-55886145, we will introduce a sender thread to send
// the request.
// in here, we just ignore the error.
LOG.warn(
"Failed to send stop replica request to tablet server {}.",
serverId,
throwable);
// For delete replicas (delete=true && deleteRemote=true), we must emit
// a failure event so the retry/give-up logic in
// processDeleteReplicaResponseReceived can move the replica out of
// ReplicaDeletionStarted. Without this, replicas get permanently stuck
// in ReplicaDeletionStarted with no code path to recover.
if (!deletedReplicaBuckets.isEmpty()) {
List<DeleteReplicaResultForBucket> failedResults =
new ArrayList<>();
ApiError apiError = ApiError.fromThrowable(throwable);
for (TableBucket bucket : deletedReplicaBuckets) {
failedResults.add(
new DeleteReplicaResultForBucket(
bucket, serverId, apiError));
}
eventManager.put(
new DeleteReplicaResponseReceivedEvent(failedResults));
}
return;
}
// handle the response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,43 @@ public static LogTablet create(
Clock clock,
boolean isCleanShutdown)
throws Exception {
return create(
dataDir,
tablePath,
tabletDir,
conf,
new Configuration(),
serverMetricGroup,
recoveryPoint,
scheduler,
logFormat,
tieredLogLocalSegments,
isChangelog,
clock,
isCleanShutdown);
}

public static LogTablet create(
File dataDir,
PhysicalTablePath tablePath,
File tabletDir,
Configuration conf,
Configuration tableProperties,
TabletServerMetricGroup serverMetricGroup,
long recoveryPoint,
Scheduler scheduler,
LogFormat logFormat,
int tieredLogLocalSegments,
boolean isChangelog,
Clock clock,
boolean isCleanShutdown)
throws Exception {
// create the log directory if it doesn't exist
Files.createDirectories(tabletDir.toPath());

// Build a merged config where table-level log.* properties override server-level config.
conf = mergeLogConfig(conf, tableProperties);

TableBucket tableBucket = FlussPaths.parseTabletDir(tabletDir).f1;
LogSegments segments = new LogSegments(tableBucket);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import org.apache.fluss.metadata.TableBucketReplica;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePartition;
import org.apache.fluss.rpc.messages.StopReplicaRequest;
import org.apache.fluss.rpc.messages.StopReplicaResponse;
import org.apache.fluss.server.coordinator.event.CoordinatorEvent;
import org.apache.fluss.server.coordinator.event.DeleteReplicaResponseReceivedEvent;
import org.apache.fluss.server.coordinator.event.TestingEventManager;
import org.apache.fluss.server.coordinator.statemachine.ReplicaStateMachine;
import org.apache.fluss.server.coordinator.statemachine.TableBucketStateMachine;
import org.apache.fluss.server.entity.DeleteReplicaResultForBucket;
import org.apache.fluss.server.metadata.ServerInfo;
import org.apache.fluss.server.tablet.TestTabletServerGateway;
import org.apache.fluss.server.zk.NOPErrorHandler;
import org.apache.fluss.server.zk.ZkEpoch;
import org.apache.fluss.server.zk.ZooKeeperClient;
Expand All @@ -38,6 +41,7 @@
import org.apache.fluss.server.zk.data.PartitionAssignment;
import org.apache.fluss.server.zk.data.TableAssignment;
import org.apache.fluss.testutils.common.AllCallbackWrapper;
import org.apache.fluss.utils.types.Tuple2;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -56,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand Down Expand Up @@ -217,6 +222,115 @@ void testDeleteTable() throws Exception {
assertThat(coordinatorContext.getAllReplicasForTable(tableId)).isEmpty();
}

@Test
void testDeleteTableWithSendFailure() throws Exception {
    // Scenario: every stopReplica request fails while it is being sent. The test replays the
    // retry / give-up bookkeeping on the emitted failure events and checks that the table
    // deletion can still run to completion.

    // Register a brand-new table that will be deleted afterwards.
    long tableId = zookeeperClient.getTableIdAndIncrement();
    TableAssignment tableAssignment = createAssignment();
    zookeeperClient.registerTableAssignment(tableId, tableAssignment);

    TableInfo tableInfo =
            TableInfo.of(
                    DATA1_TABLE_PATH_PK,
                    tableId,
                    0,
                    DATA1_TABLE_DESCRIPTOR_PK,
                    DEFAULT_REMOTE_DATA_DIR,
                    System.currentTimeMillis(),
                    System.currentTimeMillis());
    coordinatorContext.putTableInfo(tableInfo);
    tableManager.onCreateNewTable(DATA1_TABLE_PATH_PK, tableId, tableAssignment);

    // Swap in one gateway per live tablet server whose stopReplica always hands back an
    // exceptionally completed future. That models a failure at send time (a throwable),
    // as opposed to an error code carried inside a response.
    Map<Integer, org.apache.fluss.rpc.gateway.TabletServerGateway> failingGateways =
            new java.util.HashMap<>();
    for (int tabletServerId : coordinatorContext.liveTabletServerSet()) {
        org.apache.fluss.rpc.gateway.TabletServerGateway failingGateway =
                new TestTabletServerGateway(false, Collections.emptySet()) {
                    @Override
                    public CompletableFuture<StopReplicaResponse> stopReplica(
                            StopReplicaRequest request) {
                        CompletableFuture<StopReplicaResponse> sendFailure =
                                new CompletableFuture<>();
                        sendFailure.completeExceptionally(
                                new RuntimeException("simulated network error"));
                        return sendFailure;
                    }
                };
        failingGateways.put(tabletServerId, failingGateway);
    }
    testCoordinatorChannelManager.setGateways(failingGateways);

    // Kick off the deletion. Send failures are expected to surface as
    // DeleteReplicaResponseReceivedEvent instances carrying error results rather than
    // being dropped silently.
    coordinatorContext.queueTableDeletion(Collections.singleton(tableId));
    tableManager.onDeleteTable(tableId);

    Set<TableBucketReplica> allReplicas = getReplicas(tableId, tableAssignment);

    // Drive the retry loop by hand, mirroring what
    // CoordinatorEventProcessor.processDeleteReplicaResponseReceived is described to do:
    // once a replica has failed DELETE_TRY_TIMES times it is treated as successfully deleted.
    for (int round = 0; round <= CoordinatorContext.DELETE_TRY_TIMES; round++) {
        Set<TableBucketReplica> failed = new HashSet<>();
        Set<TableBucketReplica> succeeded = new HashSet<>();

        // Sort every delete-replica result seen so far into the failed / succeeded bucket.
        for (CoordinatorEvent event : testingEventManager.getEvents()) {
            if (!(event instanceof DeleteReplicaResponseReceivedEvent)) {
                continue;
            }
            DeleteReplicaResponseReceivedEvent deleteResponse =
                    (DeleteReplicaResponseReceivedEvent) event;
            for (DeleteReplicaResultForBucket result :
                    deleteResponse.getDeleteReplicaResults()) {
                Set<TableBucketReplica> bucketFor = result.succeeded() ? succeeded : failed;
                bucketFor.add(result.getTableBucketReplica());
            }
        }
        testingEventManager.clearEvents();

        coordinatorContext.clearFailDeleteNumbers(succeeded);

        // f0: replicas that should be retried; f1: replicas that ran out of attempts and
        // are therefore counted as deleted.
        Tuple2<Set<TableBucketReplica>, Set<TableBucketReplica>> outcome =
                coordinatorContext.retryDeleteAndSuccessDeleteReplicas(failed);
        Set<TableBucketReplica> pendingRetry = outcome.f0;
        succeeded.addAll(outcome.f1);

        // Everything counted as succeeded in this round moves to ReplicaDeletionSuccessful.
        for (TableBucketReplica done : succeeded) {
            coordinatorContext.putReplicaState(done, ReplicaDeletionSuccessful);
        }

        if (pendingRetry.isEmpty()) {
            break;
        }

        // Trigger another deletion pass for the remaining replicas; the failing gateways
        // will reject the send again and produce a fresh batch of failure events.
        tableManager.onDeleteTable(tableId);
    }

    // By now no replica may be left in any state other than ReplicaDeletionSuccessful.
    for (TableBucketReplica replica : allReplicas) {
        assertThat(coordinatorContext.getReplicaState(replica))
                .isEqualTo(ReplicaDeletionSuccessful);
    }

    // Resuming deletions should finish the job: assignment gone from ZK, replicas gone
    // from the coordinator context.
    tableManager.resumeDeletions();
    retry(
            Duration.ofSeconds(30),
            () -> assertThat(zookeeperClient.getTableAssignment(tableId)).isEmpty());
    assertThat(coordinatorContext.getAllReplicasForTable(tableId)).isEmpty();
}

@Test
void testResumeDeletionAfterRestart() throws Exception {
// first, create a table
Expand Down