Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ private JobResult(

checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0");
checkArgument(
jobStatus == null || jobStatus.isGloballyTerminalState(),
"jobStatus must be globally terminal or unknow(null)");
jobStatus == null || jobStatus.isTerminalState(),
"jobStatus must be terminal or null");

this.jobId = requireNonNull(jobId);
this.jobStatus = jobStatus;
Expand Down Expand Up @@ -147,12 +147,27 @@ public JobExecutionResult toJobExecutionResult(ClassLoader classLoader)
exception = new JobExecutionException(jobId, "Job execution failed.", cause);
} else if (jobStatus == JobStatus.CANCELED) {
exception = new JobCancellationException(jobId, "Job was cancelled.", cause);
} else {
} else if (jobStatus == JobStatus.SUSPENDED) {
exception =
new JobExecutionException(
jobId,
"Job completed with illegal status: " + jobStatus + '.',
"Job is in state SUSPENDED. This commonly happens when the "
+ "JobManager lost leadership. The job may recover "
+ "automatically if High Availability and a persistent "
+ "job store are configured. If recovery is not possible "
+ "(e.g., non-persistent ExecutionPlanStore), the job "
+ "needs to be resubmitted.",
cause);
} else {
final String statusMessage =
jobStatus != null
? "Job completed with unexpected status: " + jobStatus
: "Job completed with unknown status";
final String message =
cause != null
? statusMessage + "."
: statusMessage + " (no cause provided).";
exception = new JobExecutionException(jobId, message, cause);
}

throw exception;
Expand Down Expand Up @@ -234,7 +249,8 @@ public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {
final JobResult.Builder builder = new JobResult.Builder();
builder.jobId(jobId);

builder.jobStatus(jobStatus.isGloballyTerminalState() ? jobStatus : null);
// Store the actual terminal status (including locally terminal states like SUSPENDED)
builder.jobStatus(jobStatus);

final long netRuntime =
accessExecutionGraph.getStatusTimestamp(jobStatus)
Expand All @@ -249,6 +265,12 @@ public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {
checkNotNull(errorInfo, "No root cause is found for the job failure.");

builder.serializedThrowable(errorInfo.getException());
} else if (jobStatus == JobStatus.SUSPENDED) {
// SUSPENDED jobs may or may not have a failure cause
final ErrorInfo errorInfo = accessExecutionGraph.getFailureInfo();
if (errorInfo != null) {
builder.serializedThrowable(errorInfo.getException());
}
}

return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public JobResultDeserializer() {
public JobResult deserialize(final JsonParser p, final DeserializationContext ctxt)
throws IOException {
JobID jobId = null;
JobStatus jobStatus = null;
JobStatus jobStatusFromApplicationStatus = null;
JobStatus jobStatusDirect = null;
long netRuntime = -1;
SerializedThrowable serializedThrowable = null;
Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults = null;
Expand All @@ -92,13 +93,17 @@ public JobResult deserialize(final JsonParser p, final DeserializationContext ct
case JobResultSerializer.FIELD_NAME_APPLICATION_STATUS:
assertNextToken(p, JsonToken.VALUE_STRING);
try {
jobStatus =
jobStatusFromApplicationStatus =
ApplicationStatus.valueOf(p.getValueAsString().toUpperCase())
.deriveJobStatus();
} catch (UnsupportedOperationException e) {
// jobStatus = null to indicate that the job status is unknown
}
break;
case JobResultSerializer.FIELD_NAME_JOB_STATUS:
assertNextToken(p, JsonToken.VALUE_STRING);
jobStatusDirect = JobStatus.valueOf(p.getValueAsString().toUpperCase());
break;
case JobResultSerializer.FIELD_NAME_NET_RUNTIME:
assertNextToken(p, JsonToken.VALUE_NUMBER_INT);
netRuntime = p.getLongValue();
Expand All @@ -116,6 +121,11 @@ public JobResult deserialize(final JsonParser p, final DeserializationContext ct
}
}

// Prefer direct job-status field if present; fall back to application-status for
// backward compatibility
final JobStatus jobStatus =
jobStatusDirect != null ? jobStatusDirect : jobStatusFromApplicationStatus;

try {
return new JobResult.Builder()
.jobId(jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class JobResultSerializer extends StdSerializer<JobResult> {

static final String FIELD_NAME_APPLICATION_STATUS = "application-status";

static final String FIELD_NAME_JOB_STATUS = "job-status";

static final String FIELD_NAME_NET_RUNTIME = "net-runtime";

static final String FIELD_NAME_ACCUMULATOR_RESULTS = "accumulator-results";
Expand Down Expand Up @@ -83,6 +85,12 @@ public void serialize(
gen.writeFieldName(FIELD_NAME_APPLICATION_STATUS);
gen.writeString(ApplicationStatus.fromJobStatus(result.getJobStatus().orElse(null)).name());

// also include the actual job status for precise state information (e.g., SUSPENDED)
if (result.getJobStatus().isPresent()) {
gen.writeFieldName(FIELD_NAME_JOB_STATUS);
gen.writeString(result.getJobStatus().get().name());
}

gen.writeFieldName(FIELD_NAME_ACCUMULATOR_RESULTS);
gen.writeStartObject();
final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,52 @@ void testFailureResultRequiresFailureCause() {
.build()))
.isInstanceOf(NullPointerException.class);
}

@Test
void testSuspendedJobIsFailureResult() {
final JobResult jobResult =
JobResult.createFrom(
new ArchivedExecutionGraphBuilder()
.setJobID(new JobID())
.setState(JobStatus.SUSPENDED)
.build());

assertThat(jobResult.isSuccess()).isFalse();
assertThat(jobResult.getJobStatus()).hasValue(JobStatus.SUSPENDED);
}

@Test
void testSuspendedJobThrowsJobExecutionExceptionWithDetailedMessage() {
final FlinkException cause = new FlinkException("Leadership lost");
final JobResult jobResult =
JobResult.createFrom(
new ArchivedExecutionGraphBuilder()
.setJobID(new JobID())
.setState(JobStatus.SUSPENDED)
.setFailureCause(new ErrorInfo(cause, 42L))
.build());

assertThatThrownBy(() -> jobResult.toJobExecutionResult(getClass().getClassLoader()))
.isInstanceOf(JobExecutionException.class)
.hasMessageContaining("SUSPENDED")
.hasMessageContaining("JobManager lost leadership")
.cause()
.isEqualTo(cause);
}

@Test
void testSuspendedJobWithoutCauseThrowsJobExecutionException() {
final JobResult jobResult =
JobResult.createFrom(
new ArchivedExecutionGraphBuilder()
.setJobID(new JobID())
.setState(JobStatus.SUSPENDED)
.build());

assertThatThrownBy(() -> jobResult.toJobExecutionResult(getClass().getClassLoader()))
.isInstanceOf(JobExecutionException.class)
.hasMessageContaining("SUSPENDED")
.hasMessageContaining("JobManager lost leadership")
.hasNoCause();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.messages.json;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.jackson.JacksonMapperFactory;

Expand Down Expand Up @@ -91,4 +92,58 @@ void testIncompleteJobResult() throws Exception {
.isInstanceOf(JsonMappingException.class)
.hasMessageContaining("Could not deserialize JobResult");
}

@Test
void testDeserializationWithJobStatus() throws Exception {
final JobResult jobResult =
objectMapper.readValue(
"{\n"
+ "\t\"id\": \"1bb5e8c7df49938733b7c6a73678de6a\",\n"
+ "\t\"application-status\": \"UNKNOWN\",\n"
+ "\t\"job-status\": \"SUSPENDED\",\n"
+ "\t\"accumulator-results\": {},\n"
+ "\t\"net-runtime\": 0\n"
+ "}",
JobResult.class);

assertThat(jobResult.getJobId())
.isEqualTo(JobID.fromHexString("1bb5e8c7df49938733b7c6a73678de6a"));
assertThat(jobResult.getJobStatus()).hasValue(JobStatus.SUSPENDED);
}

@Test
void testDeserializationWithOnlyApplicationStatus() throws Exception {
// Test backward compatibility: deserialize without job-status field
final JobResult jobResult =
objectMapper.readValue(
"{\n"
+ "\t\"id\": \"1bb5e8c7df49938733b7c6a73678de6a\",\n"
+ "\t\"application-status\": \"SUCCEEDED\",\n"
+ "\t\"accumulator-results\": {},\n"
+ "\t\"net-runtime\": 0\n"
+ "}",
JobResult.class);

assertThat(jobResult.getJobId())
.isEqualTo(JobID.fromHexString("1bb5e8c7df49938733b7c6a73678de6a"));
assertThat(jobResult.getJobStatus()).hasValue(JobStatus.FINISHED);
}

@Test
void testDeserializationWithUnknownApplicationStatus() throws Exception {
// Test backward compatibility: UNKNOWN application-status without job-status
final JobResult jobResult =
objectMapper.readValue(
"{\n"
+ "\t\"id\": \"1bb5e8c7df49938733b7c6a73678de6a\",\n"
+ "\t\"application-status\": \"UNKNOWN\",\n"
+ "\t\"accumulator-results\": {},\n"
+ "\t\"net-runtime\": 0\n"
+ "}",
JobResult.class);

assertThat(jobResult.getJobId())
.isEqualTo(JobID.fromHexString("1bb5e8c7df49938733b7c6a73678de6a"));
assertThat(jobResult.getJobStatus()).isEmpty();
}
}