2323import org .apache .flink .configuration .Configuration ;
2424import org .apache .flink .configuration .SlowTaskDetectorOptions ;
2525import org .apache .flink .runtime .concurrent .ComponentMainThreadExecutorServiceAdapter ;
26- import org .apache .flink .runtime .execution .ExecutionState ;
2726import org .apache .flink .runtime .executiongraph .ExecutionAttemptID ;
2827import org .apache .flink .runtime .executiongraph .ExecutionGraph ;
2928import org .apache .flink .runtime .executiongraph .ExecutionGraphTestUtils ;
@@ -458,15 +457,6 @@ void testHandleNotifySlowTasksException() throws Exception {
458457 private ExecutionGraph createExecutionGraph (JobVertex ... jobVertices ) throws Exception {
459458 final JobGraph jobGraph = JobGraphTestUtils .streamingJobGraph (jobVertices );
460459
461- // Calculate total number of vertices
462- int totalVertices = 0 ;
463- for (JobVertex jobVertex : jobVertices ) {
464- totalVertices += jobVertex .getParallelism ();
465- }
466- final int expectedVertexCount = totalVertices ;
467-
468- CompletableFuture <Void > allVerticesRunningFuture = new CompletableFuture <>();
469-
470460 final SchedulerBase scheduler =
471461 SchedulerTestingUtils .createScheduler (
472462 jobGraph ,
@@ -475,33 +465,9 @@ private ExecutionGraph createExecutionGraph(JobVertex... jobVertices) throws Exc
475465
476466 final ExecutionGraph executionGraph = scheduler .getExecutionGraph ();
477467
478- // Create job status listener that checks when all vertices are running
479- JobStatusListener jobStatusListener =
480- (jobId , newJobStatus , timestamp ) -> {
481- if (newJobStatus == JobStatus .RUNNING ) {
482- // Only complete the future when all vertices are in RUNNING state
483- long runningCount =
484- executionGraph .getAllExecutionVertices ().stream ()
485- .filter (
486- v ->
487- v .getCurrentExecutionAttempt ().getState ()
488- == ExecutionState .RUNNING )
489- .count ();
490- if (runningCount >= expectedVertexCount ) {
491- allVerticesRunningFuture .complete (null );
492- }
493- }
494- };
495-
496- // Register the listener
497- executionGraph .registerJobStatusListener (jobStatusListener );
498-
499468 scheduler .startScheduling ();
500469 ExecutionGraphTestUtils .switchAllVerticesToRunning (executionGraph );
501470
502- // Wait for all vertices to be in RUNNING state
503- allVerticesRunningFuture .join ();
504-
505471 return executionGraph ;
506472 }
507473
0 commit comments