Skip to content

Commit 2613629

Browse files
committed
Reimplement the change without a sleep
Signed-off-by: davidradl <david_radley@uk.ibm.com>
1 parent 548cc0b commit 2613629

2 files changed

Lines changed: 17 additions & 10 deletions

File tree

flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.io.IOException;
5757
import java.util.Collections;
5858
import java.util.Iterator;
59+
import java.util.concurrent.CompletableFuture;
5960
import java.util.concurrent.ScheduledExecutorService;
6061
import java.util.function.Consumer;
6162

@@ -198,6 +199,13 @@ private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
198199
@Test
199200
void testCancelWhileFailing() throws Exception {
200201
try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
202+
CompletableFuture<JobStatus> failingStatusFuture = new CompletableFuture<>();
203+
JobStatusListener jobStatusListener =
204+
(jobId, newJobStatus, timestamp) -> {
205+
if (newJobStatus == JobStatus.FAILING) {
206+
failingStatusFuture.complete(newJobStatus);
207+
}
208+
};
201209
SchedulerBase scheduler =
202210
new DefaultSchedulerBuilder(
203211
createJobGraph(),
@@ -207,6 +215,7 @@ void testCancelWhileFailing() throws Exception {
207215
createExecutionSlotAllocatorFactory(slotPool))
208216
.setRestartBackoffTimeStrategy(
209217
new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
218+
.setJobStatusListener(jobStatusListener)
210219
.build();
211220
ExecutionGraph graph = scheduler.getExecutionGraph();
212221

@@ -220,8 +229,8 @@ void testCancelWhileFailing() throws Exception {
220229

221230
scheduler.handleGlobalFailure(new Exception("test"));
222231

223-
// Give time for the failure to be processed
224-
Thread.sleep(10);
232+
// Wait for the state to transition to FAILING
233+
failingStatusFuture.join();
225234

226235
assertThat(graph.getState()).isEqualTo(JobStatus.FAILING);
227236

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,9 @@ void testNoFinishedTaskButRatioIsZero() throws Exception {
7272
final int parallelism = 3;
7373
final JobVertex jobVertex = createNoOpVertex(parallelism);
7474
final ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
75-
76-
final ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0, 1, 0);
77-
78-
// Sleep to ensure that the tasks have some execution time
79-
Thread.sleep(10);
75+
// set lower bounds to 10ms. 0 can introduce intermittent unit test failures.
76+
final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
77+
createSlowTaskDetector(0, 1, 10);
8078

8179
final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
8280
slowTaskDetector.findSlowTasks(executionGraph);
@@ -251,8 +249,10 @@ void testBalancedInput() throws Exception {
251249
DistributionPattern.ALL_TO_ALL,
252250
ResultPartitionType.PIPELINED);
253251
final ExecutionGraph executionGraph = createExecutionGraph(jobVertex1, jobVertex2);
252+
// with lower bounds 0, the test intermittently fails, as the expected 2 tasks had sometimes
253+
// not yet started.
254254
final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
255-
createSlowTaskDetector(0.3, 1, 0);
255+
createSlowTaskDetector(0.3, 1, 10);
256256

257257
final ExecutionVertex ev21 =
258258
executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[0];
@@ -264,8 +264,6 @@ void testBalancedInput() throws Exception {
264264
executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[2];
265265
ev23.setInputBytes(1024);
266266

267-
// Sleep to ensure that the 3 tasks do actually start
268-
Thread.sleep(10);
269267
ev23.getCurrentExecutionAttempt().markFinished();
270268

271269
final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =

0 commit comments

Comments
 (0)