Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.ArrayDeque;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

Expand Down Expand Up @@ -101,6 +102,9 @@ final class ServerConnection {
@GuardedBy("lock")
private int retryAuthCount = 0;

@GuardedBy("lock")
private ScheduledFuture<?> authRetryTask = null;

ServerConnection(
Bootstrap bootstrap,
ServerNode node,
Expand Down Expand Up @@ -184,6 +188,12 @@ private CompletableFuture<Void> close(Throwable cause) {
channel.close();
}

// Cancel any pending authentication retry task
if (authRetryTask != null) {
authRetryTask.cancel(false);
authRetryTask = null;
}

// TODO all return completeExceptionally will let some test cases blocked, so we
// need to find why the test cases are blocked and remove the if statement.
if (cause instanceof ClosedChannelException
Expand Down Expand Up @@ -452,11 +462,16 @@ private void handleAuthenticateResponse(ApiMessage response, Throwable cause) {
if (cause != null) {
if (cause instanceof RetriableAuthenticationException) {
LOG.warn("Authentication failed, retrying {} times", retryAuthCount, cause);
channel.eventLoop()
.schedule(
this::sendInitialToken,
backoff.backoff(retryAuthCount++),
TimeUnit.MILLISECONDS);
// Cancel any existing auth retry task before scheduling a new one
if (authRetryTask != null) {
authRetryTask.cancel(false);
}
authRetryTask =
channel.eventLoop()
.schedule(
this::sendInitialToken,
backoff.backoff(retryAuthCount++),
TimeUnit.MILLISECONDS);
} else {
close(cause);
}
Expand Down