diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java index 3441053671..9bdf5c2492 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -466,11 +466,27 @@ private void sendStopRequest(int coordinatorEpoch) { if (throwable != null) { // todo: in FLUSS-55886145, we will introduce a sender thread to send // the request. - // in here, we just ignore the error. LOG.warn( "Failed to send stop replica request to tablet server {}.", serverId, throwable); + // For delete replicas (delete=true && deleteRemote=true), we must emit + // a failure event so the retry/give-up logic in + // processDeleteReplicaResponseReceived can move the replica out of + // ReplicaDeletionStarted. Without this, replicas get permanently stuck + // in ReplicaDeletionStarted with no code path to recover. + if (!deletedReplicaBuckets.isEmpty()) { + List failedResults = + new ArrayList<>(); + ApiError apiError = ApiError.fromThrowable(throwable); + for (TableBucket bucket : deletedReplicaBuckets) { + failedResults.add( + new DeleteReplicaResultForBucket( + bucket, serverId, apiError)); + } + eventManager.put( + new DeleteReplicaResponseReceivedEvent(failedResults)); + } return; } // handle the response diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java index 23c60c82e1..1459e2def8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java @@ -316,9 +316,43 @@ public static LogTablet create( Clock clock, boolean isCleanShutdown) throws Exception { + return create( + dataDir, + tablePath, + tabletDir, + conf, + new Configuration(), + serverMetricGroup, + recoveryPoint, + scheduler, + logFormat, + tieredLogLocalSegments, + isChangelog, + clock, + isCleanShutdown); + } + + public static LogTablet create( + File dataDir, + PhysicalTablePath tablePath, + File tabletDir, + Configuration conf, + Configuration tableProperties, + TabletServerMetricGroup serverMetricGroup, + long recoveryPoint, + Scheduler scheduler, + LogFormat logFormat, + int tieredLogLocalSegments, + boolean isChangelog, + Clock clock, + boolean isCleanShutdown) + throws Exception { // create the log directory if it doesn't exist Files.createDirectories(tabletDir.toPath()); + // Build a merged config where table-level log.* properties override server-level config. + conf = mergeLogConfig(conf, tableProperties); + TableBucket tableBucket = FlussPaths.parseTabletDir(tabletDir).f1; LogSegments segments = new LogSegments(tableBucket); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java index f5357c549f..3b21332dbb 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java @@ -23,6 +23,8 @@ import org.apache.fluss.metadata.TableBucketReplica; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.rpc.messages.StopReplicaRequest; +import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.server.coordinator.event.CoordinatorEvent; import org.apache.fluss.server.coordinator.event.DeleteReplicaResponseReceivedEvent; import org.apache.fluss.server.coordinator.event.TestingEventManager; @@ -30,6 +32,7 @@ import org.apache.fluss.server.coordinator.statemachine.TableBucketStateMachine; import org.apache.fluss.server.entity.DeleteReplicaResultForBucket; import org.apache.fluss.server.metadata.ServerInfo; +import org.apache.fluss.server.tablet.TestTabletServerGateway; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; @@ -38,6 +41,7 @@ import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.testutils.common.AllCallbackWrapper; +import org.apache.fluss.utils.types.Tuple2; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -56,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -217,6 +222,115 @@ void testDeleteTable() throws Exception { assertThat(coordinatorContext.getAllReplicasForTable(tableId)).isEmpty(); } + @Test + void testDeleteTableWithSendFailure() throws Exception { + // Create a table and set up for deletion + long tableId = zookeeperClient.getTableIdAndIncrement(); + TableAssignment assignment = createAssignment(); + zookeeperClient.registerTableAssignment(tableId, assignment); + + coordinatorContext.putTableInfo( + TableInfo.of( + DATA1_TABLE_PATH_PK, + tableId, + 0, + DATA1_TABLE_DESCRIPTOR_PK, + DEFAULT_REMOTE_DATA_DIR, + System.currentTimeMillis(), + System.currentTimeMillis())); + tableManager.onCreateNewTable(DATA1_TABLE_PATH_PK, tableId, assignment); + + // Reconfigure all gateways to return a failed future (send-level throwable) for + // stopReplica. This simulates a network failure at send time, NOT a response-level error. + Map throwingGateways = + new java.util.HashMap<>(); + for (int serverId : coordinatorContext.liveTabletServerSet()) { + throwingGateways.put( + serverId, + new TestTabletServerGateway(false, Collections.emptySet()) { + @Override + public CompletableFuture stopReplica( + StopReplicaRequest request) { + CompletableFuture future = + new CompletableFuture<>(); + future.completeExceptionally( + new RuntimeException("simulated network error")); + return future; + } + }); + } + testCoordinatorChannelManager.setGateways(throwingGateways); + + // Queue and start deletion. With the fix, send failures now emit + // DeleteReplicaResponseReceivedEvent with error results instead of silently ignoring them. + coordinatorContext.queueTableDeletion(Collections.singleton(tableId)); + tableManager.onDeleteTable(tableId); + + // Collect the failure events emitted by the fix and simulate the retry loop + // (as CoordinatorEventProcessor.processDeleteReplicaResponseReceived would do). + // After DELETE_TRY_TIMES failures, replicas are force-marked ReplicaDeletionSuccessful. + Set allReplicas = getReplicas(tableId, assignment); + + for (int attempt = 0; attempt <= CoordinatorContext.DELETE_TRY_TIMES; attempt++) { + List events = testingEventManager.getEvents(); + + // Collect all failure results from delete replica events + Set failedReplicas = new HashSet<>(); + Set succeededReplicas = new HashSet<>(); + for (CoordinatorEvent event : events) { + if (event instanceof DeleteReplicaResponseReceivedEvent) { + for (DeleteReplicaResultForBucket result : + ((DeleteReplicaResponseReceivedEvent) event) + .getDeleteReplicaResults()) { + if (result.succeeded()) { + succeededReplicas.add(result.getTableBucketReplica()); + } else { + failedReplicas.add(result.getTableBucketReplica()); + } + } + } + } + testingEventManager.clearEvents(); + + coordinatorContext.clearFailDeleteNumbers(succeededReplicas); + + // retryDeleteAndSuccessDeleteReplicas increments fail counters and returns replicas + // to retry vs. those that exceeded DELETE_TRY_TIMES and should be force-succeeded + Tuple2, Set> retryAndSuccess = + coordinatorContext.retryDeleteAndSuccessDeleteReplicas(failedReplicas); + + Set toRetry = retryAndSuccess.f0; + succeededReplicas.addAll(retryAndSuccess.f1); + + // Mark force-succeeded replicas as ReplicaDeletionSuccessful + for (TableBucketReplica replica : succeededReplicas) { + coordinatorContext.putReplicaState(replica, ReplicaDeletionSuccessful); + } + + if (toRetry.isEmpty()) { + break; + } + + // Retry: re-send stop replica requests for replicas that haven't exhausted retries. + // onDeleteTable transitions them through OfflineReplica → ReplicaDeletionStarted again, + // which will again trigger send failures and emit new failure events. + tableManager.onDeleteTable(tableId); + } + + // All replicas must now be in ReplicaDeletionSuccessful state + for (TableBucketReplica replica : allReplicas) { + assertThat(coordinatorContext.getReplicaState(replica)) + .isEqualTo(ReplicaDeletionSuccessful); + } + + // resumeDeletions should now complete the deletion (remove from ZK and context) + tableManager.resumeDeletions(); + retry( + Duration.ofSeconds(30), + () -> assertThat(zookeeperClient.getTableAssignment(tableId)).isEmpty()); + assertThat(coordinatorContext.getAllReplicasForTable(tableId)).isEmpty(); + } + @Test void testResumeDeletionAfterRestart() throws Exception { // first, create a table