diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java index ad06bf052a69..6552bc63d3fb 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java @@ -63,6 +63,7 @@ * @author Mark Fisher * @since 3.0 * @see #setPoolSize + * @see #setTaskStartupOverrunThreshold * @see #setRemoveOnCancelPolicy * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy @@ -92,6 +93,8 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport private Clock clock = Clock.systemDefaultZone(); + private volatile @Nullable Duration taskStartupOverrunThreshold; + private @Nullable ScheduledExecutorService scheduledExecutor; @@ -427,6 +430,38 @@ public ScheduledFuture scheduleWithFixedDelay(Runnable task, Duration delay) } + /** + * Check whether the given task is being executed significantly later than + * its scheduled time and, if a {@link #setTaskStartupOverrunThreshold threshold} + * is set, emit a {@code WARN}-level log message. + * + *

This hook is called by the underlying {@link ScheduledThreadPoolExecutor} + * immediately before each task is handed to a thread for execution. + * + * @param thread the thread that will run task {@code task} + * @param task the task that is about to be executed + * @since 7.0 + * @see #setTaskStartupOverrunThreshold(Duration) + */ + @Override + protected void beforeExecute(Thread thread, Runnable task) { + Duration threshold = this.taskStartupOverrunThreshold; + if (threshold != null && task instanceof RunnableScheduledFuture scheduledTask) { + long delayNanos = scheduledTask.getDelay(TimeUnit.NANOSECONDS); + if (delayNanos < 0) { + Duration overrun = Duration.ofNanos(-delayNanos); + if (overrun.compareTo(threshold) >= 0) { + if (logger.isWarnEnabled()) { + logger.warn("Task [" + task + "] is starting " + overrun.toMillis() + + "ms late (threshold: " + threshold.toMillis() + "ms). " + + "Consider increasing the thread pool size or reducing task duration."); + } + } + } + } + super.beforeExecute(thread, task); + } + private RunnableScheduledFuture decorateTaskIfNecessary(RunnableScheduledFuture future) { return (this.taskDecorator != null ? new DelegatingRunnableScheduledFuture<>(future, this.taskDecorator) : future); diff --git a/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerOverrunTests.java b/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerOverrunTests.java new file mode 100644 index 000000000000..5a6070bbe8f7 --- /dev/null +++ b/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerOverrunTests.java @@ -0,0 +1,135 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.scheduling.concurrent; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for the task startup overrun warning in {@link ThreadPoolTaskScheduler}. + * + * @author Naman Agrawal + * @since 7.0 + * @see ThreadPoolTaskScheduler#setTaskStartupOverrunThreshold(Duration) + */ +class ThreadPoolTaskSchedulerOverrunTests { + + private final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + + @AfterEach + void tearDown() { + scheduler.destroy(); + } + + @Test + void defaultThresholdIsNull() { + assertThat(scheduler.getTaskStartupOverrunThreshold()).isNull(); + } + + @Test + void thresholdIsStoredCorrectly() { + Duration threshold = Duration.ofMillis(500); + scheduler.setTaskStartupOverrunThreshold(threshold); + assertThat(scheduler.getTaskStartupOverrunThreshold()).isEqualTo(threshold); + } + + @Test + void thresholdCanBeReset() { + scheduler.setTaskStartupOverrunThreshold(Duration.ofSeconds(1)); + scheduler.setTaskStartupOverrunThreshold(null); + assertThat(scheduler.getTaskStartupOverrunThreshold()).isNull(); + } + + @Test + void taskExecutesNormallyWithThresholdSet() throws InterruptedException { + scheduler.setTaskStartupOverrunThreshold(Duration.ofMillis(100)); + scheduler.initialize(); + + CountDownLatch latch = new CountDownLatch(1); + scheduler.schedule(latch::countDown, Instant.now().plusMillis(50)); + + assertThat(latch.await(2, TimeUnit.SECONDS)) + .as("Task should execute normally even with threshold configured") + .isTrue(); + } + + @Test + void taskExecutesNormallyWithoutThreshold() throws InterruptedException { + scheduler.initialize(); + + CountDownLatch latch = new CountDownLatch(1); + scheduler.schedule(latch::countDown, Instant.now().plusMillis(50)); + + assertThat(latch.await(2, TimeUnit.SECONDS)) + .as("Task should execute normally without threshold configured") + .isTrue(); + } + + @Test + void overrunIsDetectedWhenTaskStartsLate() throws InterruptedException { + // Pool size 1 so the first long task blocks the second task from starting on time + scheduler.setPoolSize(1); + scheduler.setTaskStartupOverrunThreshold(Duration.ofMillis(100)); + scheduler.initialize(); + + CountDownLatch blocker = new CountDownLatch(1); + CountDownLatch blocked = new CountDownLatch(1); + + // Schedule a task immediately that holds the single thread for 500 ms + scheduler.schedule(() -> { + try { + Thread.sleep(500); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + blocker.countDown(); + } + }, Instant.now()); + + // Schedule a second task 50 ms from now - it will be at least 450 ms late + scheduler.schedule(blocked::countDown, Instant.now().plusMillis(50)); + + // Both should still complete despite the overrun + assertThat(blocker.await(3, TimeUnit.SECONDS)).isTrue(); + assertThat(blocked.await(3, TimeUnit.SECONDS)).isTrue(); + } + + @Test + void noOverrunWhenTaskStartsOnTime() throws InterruptedException { + scheduler.setPoolSize(2); // enough threads so nothing is starved + scheduler.setTaskStartupOverrunThreshold(Duration.ofMillis(100)); + scheduler.initialize(); + + CountDownLatch latch = new CountDownLatch(2); + + // Both tasks should start on time with 2 threads + scheduler.schedule(latch::countDown, Instant.now().plusMillis(50)); + scheduler.schedule(latch::countDown, Instant.now().plusMillis(50)); + + assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue(); + } + +}