Skip to content

Commit 415c560

Browse files
committed
fix SessionDispatcherLeaderProcess and its test
1 parent a138f32 commit 415c560

2 files changed

Lines changed: 79 additions & 21 deletions

File tree

flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java

Lines changed: 38 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -50,6 +50,8 @@
5050
import java.util.ArrayList;
5151
import java.util.Collection;
5252
import java.util.Collections;
53+
import java.util.HashMap;
54+
import java.util.Map;
5355
import java.util.Optional;
5456
import java.util.Set;
5557
import java.util.UUID;
@@ -309,23 +311,45 @@ private Collection<AbstractApplication> recoverApplications(
309311
log.info("Recover all persisted applications that are not finished, yet.");
310312
final Collection<ApplicationID> applicationIds = getApplicationIds();
311313
final Collection<AbstractApplication> recoveredApplications = new ArrayList<>();
312-
final Collection<JobInfo> recoveredJobInfos =
313-
recoveredJobs.stream()
314-
.map(
315-
executionPlan ->
316-
new JobInfoImpl(
317-
executionPlan.getJobID(), executionPlan.getName()))
318-
.collect(Collectors.toList());
319-
final Collection<JobInfo> recoveredTerminalJobInfos =
320-
dirtyJobResults.stream()
321-
.map(
322-
jobResult ->
323-
new JobInfoImpl(
324-
jobResult.getJobId(), jobResult.getJobName()))
325-
.collect(Collectors.toList());
314+
315+
final Map<ApplicationID, Collection<JobInfo>> recoveredJobInfosByApplication =
316+
new HashMap<>();
317+
for (ExecutionPlan executionPlan : recoveredJobs) {
318+
ApplicationID applicationId =
319+
executionPlan
320+
.getApplicationId()
321+
.orElseThrow(
322+
() ->
323+
new IllegalStateException(
324+
"Application ID is missing in the recovered execution plan. This suggests the job was submitted through an unsupported or incomplete path."));
325+
recoveredJobInfosByApplication
326+
.computeIfAbsent(applicationId, k -> new ArrayList<>())
327+
.add(new JobInfoImpl(executionPlan.getJobID(), executionPlan.getName()));
328+
}
329+
330+
final Map<ApplicationID, Collection<JobInfo>> recoveredTerminalJobInfosByApplication =
331+
new HashMap<>();
332+
for (JobResult jobResult : dirtyJobResults) {
333+
ApplicationID applicationId =
334+
jobResult
335+
.getApplicationId()
336+
.orElseThrow(
337+
() ->
338+
new IllegalStateException(
339+
"Application ID is missing in the recovered job result. This suggests the job was submitted through an unsupported or incomplete path."));
340+
recoveredTerminalJobInfosByApplication
341+
.computeIfAbsent(applicationId, k -> new ArrayList<>())
342+
.add(new JobInfoImpl(jobResult.getJobId(), jobResult.getJobName()));
343+
}
326344

327345
for (ApplicationID applicationId : applicationIds) {
328346
if (!recoveredDirtyJobResults.contains(applicationId)) {
347+
Collection<JobInfo> recoveredJobInfos =
348+
recoveredJobInfosByApplication.getOrDefault(
349+
applicationId, Collections.emptyList());
350+
Collection<JobInfo> recoveredTerminalJobInfos =
351+
recoveredTerminalJobInfosByApplication.getOrDefault(
352+
applicationId, Collections.emptyList());
329353
tryRecoverApplication(applicationId, recoveredJobInfos, recoveredTerminalJobInfos)
330354
.ifPresent(recoveredApplications::add);
331355
} else {

flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java

Lines changed: 41 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -18,9 +18,11 @@
1818

1919
package org.apache.flink.runtime.dispatcher.runner;
2020

21+
import org.apache.flink.api.common.ApplicationID;
2122
import org.apache.flink.api.common.JobID;
2223
import org.apache.flink.core.testutils.FlinkAssertions;
2324
import org.apache.flink.core.testutils.OneShotLatch;
25+
import org.apache.flink.runtime.application.SingleJobApplication;
2426
import org.apache.flink.runtime.blob.BlobServer;
2527
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
2628
import org.apache.flink.runtime.client.JobSubmissionException;
@@ -32,6 +34,9 @@
3234
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
3335
import org.apache.flink.runtime.jobmaster.JobResult;
3436
import org.apache.flink.runtime.messages.Acknowledge;
37+
import org.apache.flink.runtime.messages.FlinkApplicationNotFoundException;
38+
import org.apache.flink.runtime.testutils.TestingApplicationResultStore;
39+
import org.apache.flink.runtime.testutils.TestingApplicationStore;
3540
import org.apache.flink.runtime.testutils.TestingExecutionPlanStore;
3641
import org.apache.flink.runtime.testutils.TestingJobResultStore;
3742
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -96,13 +101,16 @@ class SessionDispatcherLeaderProcessTest {
96101
@BeforeAll
97102
static void setupClass() {
98103
ioExecutor = Executors.newSingleThreadExecutor();
104+
JOB_GRAPH.setApplicationId(ApplicationID.fromHexString(JOB_GRAPH.getJobID().toHexString()));
99105
}
100106

101107
@BeforeEach
102108
void setup() {
103109
fatalErrorHandler = new TestingFatalErrorHandler();
104110
executionPlanStore = TestingExecutionPlanStore.newBuilder().build();
105111
jobResultStore = TestingJobResultStore.builder().build();
112+
applicationStore = TestingApplicationStore.newBuilder().build();
113+
applicationResultStore = TestingApplicationResultStore.builder().build();
106114
dispatcherServiceFactory =
107115
createFactoryBasedOnGenericSupplier(
108116
() -> TestingDispatcherGatewayService.newBuilder().build());
@@ -183,6 +191,7 @@ void testRecoveryWithMultipleExecutionPlansAndOneMatchingDirtyJobResult() throws
183191
final JobResult matchingDirtyJobResult =
184192
TestingJobResultStore.createSuccessfulJobResult(JOB_GRAPH.getJobID());
185193
final ExecutionPlan otherExecutionPlan = JobGraphTestUtils.emptyJobGraph();
194+
otherExecutionPlan.setApplicationId(JOB_GRAPH.getApplicationId().get());
186195

187196
testJobRecovery(
188197
Arrays.asList(otherExecutionPlan, JOB_GRAPH),
@@ -566,11 +575,27 @@ void onAddedExecutionPlan_submitsRecoveredJob() throws Exception {
566575
final CompletableFuture<ExecutionPlan> submittedJobFuture = new CompletableFuture<>();
567576
final TestingDispatcherGateway testingDispatcherGateway =
568577
TestingDispatcherGateway.newBuilder()
569-
.setSubmitFunction(
570-
submittedJob -> {
571-
submittedJobFuture.complete(submittedJob);
572-
return CompletableFuture.completedFuture(Acknowledge.get());
578+
.setRequestApplicationFunction(
579+
applicationId ->
580+
FutureUtils.completedExceptionally(
581+
new FlinkApplicationNotFoundException(
582+
applicationId)))
583+
.setSubmitApplicationFunction(
584+
application -> {
585+
if (application instanceof SingleJobApplication) {
586+
ExecutionPlan submittedJob =
587+
((SingleJobApplication) application)
588+
.getExecutionPlan();
589+
submittedJobFuture.complete(submittedJob);
590+
return CompletableFuture.completedFuture(Acknowledge.get());
591+
}
592+
return FutureUtils.completedExceptionally(
593+
new UnsupportedOperationException());
573594
})
595+
.setSubmitFunction(
596+
ignored ->
597+
FutureUtils.completedExceptionally(
598+
new UnsupportedOperationException()))
574599
.build();
575600

576601
dispatcherServiceFactory =
@@ -715,11 +740,20 @@ private void runJobRecoveryFailureTest(FlinkException testException) throws Exce
715740
void onAddedExecutionPlan_failingRecoveredJobSubmission_failsFatally() throws Exception {
716741
final TestingDispatcherGateway dispatcherGateway =
717742
TestingDispatcherGateway.newBuilder()
718-
.setSubmitFunction(
719-
jobGraph ->
743+
.setRequestApplicationFunction(
744+
applicationId ->
745+
FutureUtils.completedExceptionally(
746+
new FlinkApplicationNotFoundException(
747+
applicationId)))
748+
.setSubmitApplicationFunction(
749+
application ->
720750
FutureUtils.completedExceptionally(
721751
new JobSubmissionException(
722-
jobGraph.getJobID(), "test exception")))
752+
JOB_GRAPH.getJobID(), "test exception")))
753+
.setSubmitFunction(
754+
ignored ->
755+
FutureUtils.completedExceptionally(
756+
new UnsupportedOperationException()))
723757
.build();
724758

725759
runOnAddedExecutionPlanTest(

0 commit comments

Comments
 (0)