diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java index a3fd7afbd5..3f49ab4e6f 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java @@ -59,6 +59,7 @@ import java.util.ArrayDeque; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; @@ -101,6 +102,9 @@ final class ServerConnection { @GuardedBy("lock") private int retryAuthCount = 0; + @GuardedBy("lock") + private ScheduledFuture authRetryTask = null; + ServerConnection( Bootstrap bootstrap, ServerNode node, @@ -184,6 +188,12 @@ private CompletableFuture close(Throwable cause) { channel.close(); } + // Cancel any pending authentication retry task + if (authRetryTask != null) { + authRetryTask.cancel(false); + authRetryTask = null; + } + // TODO all return completeExceptionally will let some test cases blocked, so we // need to find why the test cases are blocked and remove the if statement. if (cause instanceof ClosedChannelException @@ -452,11 +462,16 @@ private void handleAuthenticateResponse(ApiMessage response, Throwable cause) { if (cause != null) { if (cause instanceof RetriableAuthenticationException) { LOG.warn("Authentication failed, retrying {} times", retryAuthCount, cause); - channel.eventLoop() - .schedule( - this::sendInitialToken, - backoff.backoff(retryAuthCount++), - TimeUnit.MILLISECONDS); + // Cancel any existing auth retry task before scheduling a new one + if (authRetryTask != null) { + authRetryTask.cancel(false); + } + authRetryTask = + channel.eventLoop() + .schedule( + this::sendInitialToken, + backoff.backoff(retryAuthCount++), + TimeUnit.MILLISECONDS); } else { close(cause); }