Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
* @author Mark Fisher
* @since 3.0
* @see #setPoolSize
* @see #setTaskStartupOverrunThreshold
* @see #setRemoveOnCancelPolicy
* @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
* @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
Expand Down Expand Up @@ -92,6 +93,8 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport

private Clock clock = Clock.systemDefaultZone();

private volatile @Nullable Duration taskStartupOverrunThreshold;

private @Nullable ScheduledExecutorService scheduledExecutor;


Expand Down Expand Up @@ -427,6 +430,38 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay)
}


/**
* Check whether the given task is being executed significantly later than
* its scheduled time and, if a {@link #setTaskStartupOverrunThreshold threshold}
* is set, emit a {@code WARN}-level log message.
*
* <p>This hook is called by the underlying {@link ScheduledThreadPoolExecutor}
* immediately before each task is handed to a thread for execution.
*
* @param thread the thread that will run task {@code task}
* @param task the task that is about to be executed
* @since 7.0
* @see #setTaskStartupOverrunThreshold(Duration)
*/
@Override
protected void beforeExecute(Thread thread, Runnable task) {
Duration threshold = this.taskStartupOverrunThreshold;
if (threshold != null && task instanceof RunnableScheduledFuture<?> scheduledTask) {
long delayNanos = scheduledTask.getDelay(TimeUnit.NANOSECONDS);
if (delayNanos < 0) {
Duration overrun = Duration.ofNanos(-delayNanos);
if (overrun.compareTo(threshold) >= 0) {
if (logger.isWarnEnabled()) {
logger.warn("Task [" + task + "] is starting " + overrun.toMillis() +
"ms late (threshold: " + threshold.toMillis() + "ms). " +
"Consider increasing the thread pool size or reducing task duration.");
}
}
}
}
super.beforeExecute(thread, task);
}

private <V> RunnableScheduledFuture<V> decorateTaskIfNecessary(RunnableScheduledFuture<V> future) {
return (this.taskDecorator != null ? new DelegatingRunnableScheduledFuture<>(future, this.taskDecorator) :
future);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2002-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.scheduling.concurrent;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for the task startup overrun warning in {@link ThreadPoolTaskScheduler}.
*
* @author Naman Agrawal
* @since 7.0
* @see ThreadPoolTaskScheduler#setTaskStartupOverrunThreshold(Duration)
*/
class ThreadPoolTaskSchedulerOverrunTests {

private final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();

@AfterEach
void tearDown() {
scheduler.destroy();
}

@Test
void defaultThresholdIsNull() {
assertThat(scheduler.getTaskStartupOverrunThreshold()).isNull();
}

@Test
void thresholdIsStoredCorrectly() {
Duration threshold = Duration.ofMillis(500);
scheduler.setTaskStartupOverrunThreshold(threshold);
assertThat(scheduler.getTaskStartupOverrunThreshold()).isEqualTo(threshold);
}

@Test
void thresholdCanBeReset() {
scheduler.setTaskStartupOverrunThreshold(Duration.ofSeconds(1));
scheduler.setTaskStartupOverrunThreshold(null);
assertThat(scheduler.getTaskStartupOverrunThreshold()).isNull();
}

@Test
void taskExecutesNormallyWithThresholdSet() throws InterruptedException {
scheduler.setTaskStartupOverrunThreshold(Duration.ofMillis(100));
scheduler.initialize();

CountDownLatch latch = new CountDownLatch(1);
scheduler.schedule(latch::countDown, Instant.now().plusMillis(50));

assertThat(latch.await(2, TimeUnit.SECONDS))
.as("Task should execute normally even with threshold configured")
.isTrue();
}

@Test
void taskExecutesNormallyWithoutThreshold() throws InterruptedException {
scheduler.initialize();

CountDownLatch latch = new CountDownLatch(1);
scheduler.schedule(latch::countDown, Instant.now().plusMillis(50));

assertThat(latch.await(2, TimeUnit.SECONDS))
.as("Task should execute normally without threshold configured")
.isTrue();
}

@Test
void overrunIsDetectedWhenTaskStartsLate() throws InterruptedException {
// Pool size 1 so the first long task blocks the second task from starting on time
scheduler.setPoolSize(1);
scheduler.setTaskStartupOverrunThreshold(Duration.ofMillis(100));
scheduler.initialize();

CountDownLatch blocker = new CountDownLatch(1);
CountDownLatch blocked = new CountDownLatch(1);

// Schedule a task immediately that holds the single thread for 500 ms
scheduler.schedule(() -> {
try {
Thread.sleep(500);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
finally {
blocker.countDown();
}
}, Instant.now());

// Schedule a second task 50 ms from now - it will be at least 450 ms late
scheduler.schedule(blocked::countDown, Instant.now().plusMillis(50));

// Both should still complete despite the overrun
assertThat(blocker.await(3, TimeUnit.SECONDS)).isTrue();
assertThat(blocked.await(3, TimeUnit.SECONDS)).isTrue();
}

@Test
void noOverrunWhenTaskStartsOnTime() throws InterruptedException {
scheduler.setPoolSize(2); // enough threads so nothing is starved
scheduler.setTaskStartupOverrunThreshold(Duration.ofMillis(100));
scheduler.initialize();

CountDownLatch latch = new CountDownLatch(2);

// Both tasks should start on time with 2 threads
scheduler.schedule(latch::countDown, Instant.now().plusMillis(50));
scheduler.schedule(latch::countDown, Instant.now().plusMillis(50));

assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
}

}