Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event
return new ExecGobblinStats(numWUsGenerated, numWUsCommitted, recordsWritten, bytesWritten,
jobProps.getProperty(Help.USER_TO_PROXY_KEY));
} catch (Exception e) {
// Emit a failed GobblinTrackingEvent to record job failures
timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit(); // update GaaS: `ExecutionStatus.FAILED`; `TimingEvent.JOB_END_TIME`
// Note: the JOB_FAILED GobblinTrackingEvent is now emitted by GobblinTemporalJobLauncher's JVM shutdown hook,
// which queries Temporal for the workflow's terminal status. This keeps the AM JVM as the single source of
// truth for completion GTEs (success/failure/cancel/terminate/timeout/abandoned-while-running).
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Failed Gobblin job %s", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
e.getClass().getName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
package org.apache.gobblin.temporal.joblauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
Expand All @@ -43,7 +47,9 @@
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
Expand Down Expand Up @@ -75,12 +81,30 @@ public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher {
private static final Logger log = Workflow.getLogger(GobblinTemporalJobLauncher.class);
private static final int TERMINATION_TIMEOUT_SECONDS = 3;

@VisibleForTesting
static final String WORKFLOW_ID_METADATA_FIELD = "workflowId";
@VisibleForTesting
static final String WORKFLOW_STATUS_METADATA_FIELD = "workflowStatus";
@VisibleForTesting
static final String FAILURE_REASON_METADATA_FIELD = "failureReason";
@VisibleForTesting
static final String AM_TERMINATED_DURING_EXECUTION_REASON = "AM_TERMINATED_DURING_EXECUTION";

protected ManagedWorkflowServiceStubs managedWorkflowServiceStubs;
protected WorkflowClient client;
protected String queueName;
protected String namespace;
protected String workflowId;

private final AtomicBoolean jobCompletionGTEEmitted = new AtomicBoolean(false);

// In temporal-on-yarn each AM JVM launches exactly one workflow (see {@link #handleLaunchFinalization}),
// so a static field is the single source of terminal status for the AM main() exit-code path. Populated
// either by {@link #handleLaunchFinalization} (normal completion, while Temporal stubs are still open) or
// by {@link #emitJobCompletionGTE} (JVM shutdown hook for abnormal terminations). Read by
// {@code GobblinTemporalApplicationMaster.main()} after {@code start()} returns to drive {@code System.exit}.
private static volatile WorkflowExecutionStatus lastTerminalStatus = null;

public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir,
List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap, EventBus eventBus)
throws Exception {
Expand All @@ -97,7 +121,135 @@ public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir,

// non-null value indicates job has been submitted
this.workflowId = null;
// Reset the process-wide terminal-status cache for this fresh launcher. In a real AM JVM only one
// launcher ever runs, but tests re-instantiate within the same JVM and must not see leaked status.
lastTerminalStatus = null;
startCancellationExecutor();
registerJobCompletionGTEHook();
}

/**
* Register a JVM shutdown hook that, on AM exit, queries Temporal for the workflow's terminal state and
* emits a single {@link org.apache.gobblin.metrics.GobblinTrackingEvent} capturing the outcome. Mirrors the
* {@code registerCleanupShutdownHook} pattern in {@link GobblinJobLauncher}. Because in temporal-on-yarn each
* Yarn application launches exactly one workflow (see {@link #handleLaunchFinalization}), AM termination
* coincides with job completion, so this hook is the single source of truth for job-completion GTEs.
*/
private void registerJobCompletionGTEHook() {
Runtime.getRuntime().addShutdownHook(
new Thread(this::emitJobCompletionGTE,
"GobblinTemporalJobLauncher-JobCompletionGTE-" + this.jobContext.getJobId()));
}

/**
* Fetch the workflow's terminal {@link WorkflowExecutionStatus} from Temporal, translate it to the corresponding
* {@link TimingEvent.LauncherTimings} event name, and submit the GTE via the inherited {@code eventSubmitter}.
* Idempotency-guarded so multiple shutdown triggers result in a single emission.
*/
@VisibleForTesting
void emitJobCompletionGTE() {
if (!jobCompletionGTEEmitted.compareAndSet(false, true)) {
return;
}
if (this.workflowId == null) {
// submitJob was never invoked on this launcher; nothing to report.
return;
}
try {
// Prefer the value cached during handleLaunchFinalization (queried while Temporal stubs were definitely
// alive); fall back to a fresh query if we got here without a normal-flow capture (e.g., kill -9).
WorkflowExecutionStatus status = lastTerminalStatus != null ? lastTerminalStatus : fetchWorkflowStatus();
lastTerminalStatus = status;
String eventName = mapWorkflowStatusToEventName(status);
Map<String, String> metadata = buildCompletionMetadata(status);
new TimingEvent(this.eventSubmitter, eventName).stop(metadata);
log.info("Emitted job completion GTE {} for workflow {} (Temporal status {})",
eventName, this.workflowId, status);
} catch (Exception e) {
log.error("Failed to emit job completion GTE for workflow " + this.workflowId, e);
}
}

/**
* Query Temporal for the current execution status of {@link #workflowId}. Returns
* {@code WORKFLOW_EXECUTION_STATUS_UNSPECIFIED} as a safe fallback if the describe call fails, so callers
* downstream emit a JOB_FAILED rather than swallowing the GTE entirely.
*/
private WorkflowExecutionStatus fetchWorkflowStatus() {
try {
WorkflowStub workflowStub = this.client.newUntypedWorkflowStub(this.workflowId);
DescribeWorkflowExecutionRequest request = DescribeWorkflowExecutionRequest.newBuilder()
.setNamespace(this.namespace)
.setExecution(workflowStub.getExecution())
.build();
DescribeWorkflowExecutionResponse response = managedWorkflowServiceStubs.getWorkflowServiceStubs()
.blockingStub().describeWorkflowExecution(request);
return response.getWorkflowExecutionInfo().getStatus();
} catch (Exception e) {
log.warn("Failed to describe workflow {} for completion GTE; treating as UNSPECIFIED (will emit JOB_FAILED)",
this.workflowId, e);
return WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED;
}
}

/**
* Build the GTE metadata carrying flow/job identifiers (required by {@code KafkaAvroJobStatusMonitor.acceptEvent},
* which drops events lacking flow group/name/executionId), plus diagnostic fields for the workflow status.
* When the workflow is still RUNNING at AM shutdown, mark a synthetic failure reason so downstream consumers can
* distinguish AM-killed-mid-execute from a genuine workflow failure.
*/
private Map<String, String> buildCompletionMetadata(WorkflowExecutionStatus status) {
Map<String, String> metadata = new HashMap<>();
metadata.put(WORKFLOW_ID_METADATA_FIELD, this.workflowId);
metadata.put(WORKFLOW_STATUS_METADATA_FIELD, status.name());
addFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, ConfigurationKeys.FLOW_GROUP_KEY);
addFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, ConfigurationKeys.FLOW_NAME_KEY);
addFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
addFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.JOB_NAME_FIELD, ConfigurationKeys.JOB_NAME_KEY);
addFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, ConfigurationKeys.JOB_GROUP_KEY);
if (isNonTerminal(status)) {
metadata.put(FAILURE_REASON_METADATA_FIELD, AM_TERMINATED_DURING_EXECUTION_REASON);
}
return metadata;
}

private void addFlowMetadataIfPresent(Map<String, String> metadata, String metadataKey, String jobPropKey) {
String value = this.jobProps.getProperty(jobPropKey);
if (value != null) {
metadata.put(metadataKey, value);
}
}

/**
* Map a Temporal {@link WorkflowExecutionStatus} to the {@link TimingEvent.LauncherTimings} event name that
* {@code KafkaAvroJobStatusMonitor.parseJobStatus} understands. Non-terminal statuses (RUNNING,
* CONTINUED_AS_NEW, UNSPECIFIED) collapse to JOB_FAILED — the AM is going down, so from the GaaS perspective
* the job did not complete successfully.
*/
@VisibleForTesting
static String mapWorkflowStatusToEventName(WorkflowExecutionStatus status) {
switch (status) {
case WORKFLOW_EXECUTION_STATUS_COMPLETED:
return TimingEvent.LauncherTimings.JOB_SUCCEEDED;
case WORKFLOW_EXECUTION_STATUS_CANCELED:
return TimingEvent.LauncherTimings.JOB_CANCEL;
case WORKFLOW_EXECUTION_STATUS_FAILED:
case WORKFLOW_EXECUTION_STATUS_TERMINATED:
case WORKFLOW_EXECUTION_STATUS_TIMED_OUT:
case WORKFLOW_EXECUTION_STATUS_RUNNING:
case WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW:
case WORKFLOW_EXECUTION_STATUS_UNSPECIFIED:
case UNRECOGNIZED:
default:
return TimingEvent.LauncherTimings.JOB_FAILED;
}
}

private static boolean isNonTerminal(WorkflowExecutionStatus status) {
return status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
|| status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW
|| status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED
|| status == WorkflowExecutionStatus.UNRECOGNIZED;
}

/** @return {@link Config} now featuring all overrides rooted at {@link GobblinTemporalConfigurationKeys#GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES} */
Expand All @@ -114,10 +266,42 @@ protected void handleLaunchFinalization() {
// for achieving batch job behavior. Given the current constraints of yarn applications requiring a static proxy user
// during application creation, it is not possible to have multiple workflows running in the same application.
// and so it makes sense to just kill the job after this is completed
// Capture the terminal workflow status now, while Temporal service stubs are guaranteed open. The shutdown-hook
// GTE emitter (which races with stub close) reads this cache; AM main() also reads it to set the JVM exit code.
captureTerminalWorkflowStatus();
log.info("Requesting the AM to shutdown after the job {} completed", this.jobContext.getJobId());
eventBus.post(new ClusterManagerShutdownRequest());
}

@VisibleForTesting
void captureTerminalWorkflowStatus() {
if (this.workflowId == null) {
return;
}
WorkflowExecutionStatus status = fetchWorkflowStatus();
lastTerminalStatus = status;
log.info("Captured terminal workflow status {} for workflow {}", status, this.workflowId);
}

/**
* @return the most recently captured terminal {@link WorkflowExecutionStatus} for the AM's single workflow,
* or {@code null} if neither {@link #handleLaunchFinalization} nor {@link #emitJobCompletionGTE} ran (e.g.,
* AM crashed before any job was submitted).
*/
public static WorkflowExecutionStatus getLastTerminalStatus() {
return lastTerminalStatus;
}

/**
* Convert a {@link WorkflowExecutionStatus} into a process exit code: {@code 0} for a clean
* {@code COMPLETED} workflow, {@code 1} for anything else (failed, cancelled, terminated, timed out,
* still running at shutdown, or unspecified/unknown). Used by {@code GobblinTemporalApplicationMaster.main()}
* to surface job-level failures as non-zero AM JVM exit codes.
*/
public static int computeExitCode(WorkflowExecutionStatus status) {
return status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED ? 0 : 1;
}

/**
* Submit a job to run.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.temporal.api.enums.v1.WorkflowExecutionStatus;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.temporal.cluster.GobblinTemporalClusterManager;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.PathUtils;
Expand Down Expand Up @@ -166,6 +169,16 @@ public static void main(String[] args) throws Exception {

applicationMaster.start();
}

// Surface the underlying workflow outcome as the AM JVM exit code so GGW/Grid Gateway dashboards see
// failures end-to-end. The status was captured into a static cache by GobblinTemporalJobLauncher (either
// via handleLaunchFinalization on normal completion or via the JVM shutdown-hook GTE emitter on abnormal
// termination). A null cache means no workflow ever ran -> treat as success (0).
WorkflowExecutionStatus terminalStatus = GobblinTemporalJobLauncher.getLastTerminalStatus();
int exitCode = terminalStatus == null ? 0 : GobblinTemporalJobLauncher.computeExitCode(terminalStatus);
LOGGER.info("GobblinTemporalApplicationMaster exiting with code {} (workflow terminal status: {})",
exitCode, terminalStatus);
System.exit(exitCode);
} catch (ParseException pe) {
printUsage(options);
System.exit(1);
Expand Down
Loading