diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java index 3e08c7536b8..8063c22e862 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java @@ -155,8 +155,9 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event return new ExecGobblinStats(numWUsGenerated, numWUsCommitted, recordsWritten, bytesWritten, jobProps.getProperty(Help.USER_TO_PROXY_KEY)); } catch (Exception e) { - // Emit a failed GobblinTrackingEvent to record job failures - timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit(); // update GaaS: `ExecutionStatus.FAILED`; `TimingEvent.JOB_END_TIME` + // Note: the JOB_FAILED GobblinTrackingEvent is now emitted by GobblinTemporalJobLauncher's JVM shutdown hook, + // which queries Temporal for the workflow's terminal status. This keeps the AM JVM as the single source of + // truth for completion GTEs (success/failure/cancel/terminate/timeout/abandoned-while-running). throw ApplicationFailure.newNonRetryableFailureWithCause( String.format("Failed Gobblin job %s", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)), e.getClass().getName(), e); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index 26feccee1f5..be133d68a30 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -18,12 +18,16 @@ package org.apache.gobblin.temporal.joblauncher; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import com.google.common.eventbus.EventBus; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -43,7 +47,9 @@ import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.JobLauncher; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner; @@ -75,12 +81,30 @@ public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher { private static final Logger log = Workflow.getLogger(GobblinTemporalJobLauncher.class); private static final int TERMINATION_TIMEOUT_SECONDS = 3; + @VisibleForTesting + static final String WORKFLOW_ID_METADATA_FIELD = "workflowId"; + @VisibleForTesting + static final String WORKFLOW_STATUS_METADATA_FIELD = "workflowStatus"; + @VisibleForTesting + static final String FAILURE_REASON_METADATA_FIELD = "failureReason"; + @VisibleForTesting + static final String AM_TERMINATED_DURING_EXECUTION_REASON = "AM_TERMINATED_DURING_EXECUTION"; + protected ManagedWorkflowServiceStubs managedWorkflowServiceStubs; protected WorkflowClient client; protected String queueName; protected String namespace; protected String workflowId; + private final AtomicBoolean jobCompletionGTEEmitted = new AtomicBoolean(false); + + // In temporal-on-yarn each AM JVM launches exactly one workflow (see {@link #handleLaunchFinalization}), + // so a static field is the single source of terminal status for the AM main() exit-code path. Populated + // either by {@link #handleLaunchFinalization} (normal completion, while Temporal stubs are still open) or + // by {@link #emitJobCompletionGTE} (JVM shutdown hook for abnormal terminations). Read by + // {@code GobblinTemporalApplicationMaster.main()} after {@code start()} returns to drive {@code System.exit}. + private static volatile WorkflowExecutionStatus lastTerminalStatus = null; + public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir, List> metadataTags, ConcurrentHashMap runningMap, EventBus eventBus) throws Exception { @@ -97,7 +121,135 @@ public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir, // non-null value indicates job has been submitted this.workflowId = null; + // Reset the process-wide terminal-status cache for this fresh launcher. In a real AM JVM only one + // launcher ever runs, but tests re-instantiate within the same JVM and must not see leaked status. + lastTerminalStatus = null; startCancellationExecutor(); + registerJobCompletionGTEHook(); + } + + /** + * Register a JVM shutdown hook that, on AM exit, queries Temporal for the workflow's terminal state and + * emits a single {@link org.apache.gobblin.metrics.GobblinTrackingEvent} capturing the outcome. Mirrors the + * {@code registerCleanupShutdownHook} pattern in {@link GobblinJobLauncher}. Because in temporal-on-yarn each + * Yarn application launches exactly one workflow (see {@link #handleLaunchFinalization}), AM termination + * coincides with job completion, so this hook is the single source of truth for job-completion GTEs. + */ + private void registerJobCompletionGTEHook() { + Runtime.getRuntime().addShutdownHook( + new Thread(this::emitJobCompletionGTE, + "GobblinTemporalJobLauncher-JobCompletionGTE-" + this.jobContext.getJobId())); + } + + /** + * Fetch the workflow's terminal {@link WorkflowExecutionStatus} from Temporal, translate it to the corresponding + * {@link TimingEvent.LauncherTimings} event name, and submit the GTE via the inherited {@code eventSubmitter}. + * Idempotency-guarded so multiple shutdown triggers result in a single emission. + */ + @VisibleForTesting + void emitJobCompletionGTE() { + if (!jobCompletionGTEEmitted.compareAndSet(false, true)) { + return; + } + if (this.workflowId == null) { + // submitJob was never invoked on this launcher; nothing to report. + return; + } + try { + // Prefer the value cached during handleLaunchFinalization (queried while Temporal stubs were definitely + // alive); fall back to a fresh query if we got here without a normal-flow capture (e.g., kill -9). + WorkflowExecutionStatus status = lastTerminalStatus != null ? lastTerminalStatus : fetchWorkflowStatus(); + lastTerminalStatus = status; + String eventName = mapWorkflowStatusToEventName(status); + Map metadata = buildCompletionMetadata(status); + new TimingEvent(this.eventSubmitter, eventName).stop(metadata); + log.info("Emitted job completion GTE {} for workflow {} (Temporal status {})", + eventName, this.workflowId, status); + } catch (Exception e) { + log.error("Failed to emit job completion GTE for workflow " + this.workflowId, e); + } + } + + /** + * Query Temporal for the current execution status of {@link #workflowId}. Returns + * {@code WORKFLOW_EXECUTION_STATUS_UNSPECIFIED} as a safe fallback if the describe call fails, so callers + * downstream emit a JOB_FAILED rather than swallowing the GTE entirely. + */ + private WorkflowExecutionStatus fetchWorkflowStatus() { + try { + WorkflowStub workflowStub = this.client.newUntypedWorkflowStub(this.workflowId); + DescribeWorkflowExecutionRequest request = DescribeWorkflowExecutionRequest.newBuilder() + .setNamespace(this.namespace) + .setExecution(workflowStub.getExecution()) + .build(); + DescribeWorkflowExecutionResponse response = managedWorkflowServiceStubs.getWorkflowServiceStubs() + .blockingStub().describeWorkflowExecution(request); + return response.getWorkflowExecutionInfo().getStatus(); + } catch (Exception e) { + log.warn("Failed to describe workflow {} for completion GTE; treating as UNSPECIFIED (will emit JOB_FAILED)", + this.workflowId, e); + return WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED; + } + } + + /** + * Build the GTE metadata carrying flow/job identifiers (required by {@code KafkaAvroJobStatusMonitor.acceptEvent}, + * which drops events lacking flow group/name/executionId), plus diagnostic fields for the workflow status. + * When the workflow is still RUNNING at AM shutdown, mark a synthetic failure reason so downstream consumers can + * distinguish AM-killed-mid-execute from a genuine workflow failure. + */ + private Map buildCompletionMetadata(WorkflowExecutionStatus status) { + Map metadata = new HashMap<>(); + metadata.put(WORKFLOW_ID_METADATA_FIELD, this.workflowId); + metadata.put(WORKFLOW_STATUS_METADATA_FIELD, status.name()); + addFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, ConfigurationKeys.FLOW_GROUP_KEY); + addFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, ConfigurationKeys.FLOW_NAME_KEY); + addFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, ConfigurationKeys.FLOW_EXECUTION_ID_KEY); + addFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.JOB_NAME_FIELD, ConfigurationKeys.JOB_NAME_KEY); + addFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, ConfigurationKeys.JOB_GROUP_KEY); + if (isNonTerminal(status)) { + metadata.put(FAILURE_REASON_METADATA_FIELD, AM_TERMINATED_DURING_EXECUTION_REASON); + } + return metadata; + } + + private void addFlowMetadataIfPresent(Map metadata, String metadataKey, String jobPropKey) { + String value = this.jobProps.getProperty(jobPropKey); + if (value != null) { + metadata.put(metadataKey, value); + } + } + + /** + * Map a Temporal {@link WorkflowExecutionStatus} to the {@link TimingEvent.LauncherTimings} event name that + * {@code KafkaAvroJobStatusMonitor.parseJobStatus} understands. Non-terminal statuses (RUNNING, + * CONTINUED_AS_NEW, UNSPECIFIED) collapse to JOB_FAILED — the AM is going down, so from the GaaS perspective + * the job did not complete successfully. + */ + @VisibleForTesting + static String mapWorkflowStatusToEventName(WorkflowExecutionStatus status) { + switch (status) { + case WORKFLOW_EXECUTION_STATUS_COMPLETED: + return TimingEvent.LauncherTimings.JOB_SUCCEEDED; + case WORKFLOW_EXECUTION_STATUS_CANCELED: + return TimingEvent.LauncherTimings.JOB_CANCEL; + case WORKFLOW_EXECUTION_STATUS_FAILED: + case WORKFLOW_EXECUTION_STATUS_TERMINATED: + case WORKFLOW_EXECUTION_STATUS_TIMED_OUT: + case WORKFLOW_EXECUTION_STATUS_RUNNING: + case WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW: + case WORKFLOW_EXECUTION_STATUS_UNSPECIFIED: + case UNRECOGNIZED: + default: + return TimingEvent.LauncherTimings.JOB_FAILED; + } + } + + private static boolean isNonTerminal(WorkflowExecutionStatus status) { + return status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING + || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW + || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED + || status == WorkflowExecutionStatus.UNRECOGNIZED; } /** @return {@link Config} now featuring all overrides rooted at {@link GobblinTemporalConfigurationKeys#GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES} */ @@ -114,10 +266,42 @@ protected void handleLaunchFinalization() { // for achieving batch job behavior. Given the current constraints of yarn applications requiring a static proxy user // during application creation, it is not possible to have multiple workflows running in the same application. // and so it makes sense to just kill the job after this is completed + // Capture the terminal workflow status now, while Temporal service stubs are guaranteed open. The shutdown-hook + // GTE emitter (which races with stub close) reads this cache; AM main() also reads it to set the JVM exit code. + captureTerminalWorkflowStatus(); log.info("Requesting the AM to shutdown after the job {} completed", this.jobContext.getJobId()); eventBus.post(new ClusterManagerShutdownRequest()); } + @VisibleForTesting + void captureTerminalWorkflowStatus() { + if (this.workflowId == null) { + return; + } + WorkflowExecutionStatus status = fetchWorkflowStatus(); + lastTerminalStatus = status; + log.info("Captured terminal workflow status {} for workflow {}", status, this.workflowId); + } + + /** + * @return the most recently captured terminal {@link WorkflowExecutionStatus} for the AM's single workflow, + * or {@code null} if neither {@link #handleLaunchFinalization} nor {@link #emitJobCompletionGTE} ran (e.g., + * AM crashed before any job was submitted). + */ + public static WorkflowExecutionStatus getLastTerminalStatus() { + return lastTerminalStatus; + } + + /** + * Convert a {@link WorkflowExecutionStatus} into a process exit code: {@code 0} for a clean + * {@code COMPLETED} workflow, {@code 1} for anything else (failed, cancelled, terminated, timed out, + * still running at shutdown, or unspecified/unknown). Used by {@code GobblinTemporalApplicationMaster.main()} + * to surface job-level failures as non-zero AM JVM exit codes. + */ + public static int computeExitCode(WorkflowExecutionStatus status) { + return status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED ? 0 : 1; + } + /** * Submit a job to run. */ diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java index 3efadb11b38..7e1cf5b5810 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java @@ -27,10 +27,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; import org.apache.gobblin.cluster.GobblinClusterUtils; import org.apache.gobblin.temporal.cluster.GobblinTemporalClusterManager; +import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.JvmUtils; import org.apache.gobblin.util.PathUtils; @@ -166,6 +169,16 @@ public static void main(String[] args) throws Exception { applicationMaster.start(); } + + // Surface the underlying workflow outcome as the AM JVM exit code so GGW/Grid Gateway dashboards see + // failures end-to-end. The status was captured into a static cache by GobblinTemporalJobLauncher (either + // via handleLaunchFinalization on normal completion or via the JVM shutdown-hook GTE emitter on abnormal + // termination). A null cache means no workflow ever ran -> treat as success (0). + WorkflowExecutionStatus terminalStatus = GobblinTemporalJobLauncher.getLastTerminalStatus(); + int exitCode = terminalStatus == null ? 0 : GobblinTemporalJobLauncher.computeExitCode(terminalStatus); + LOGGER.info("GobblinTemporalApplicationMaster exiting with code {} (workflow terminal status: {})", + exitCode, terminalStatus); + System.exit(exitCode); } catch (ParseException pe) { printUsage(options); System.exit(1); diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java index b359a4ba2c6..f86c41cc28e 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java @@ -46,6 +46,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.example.simplejson.SimpleJsonSource; +import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.locks.FileBasedJobLock; import org.apache.gobblin.source.workunit.WorkUnit; @@ -60,6 +61,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -96,6 +98,13 @@ protected void cleanupStagingDirectory(JobState jobState) throws IOException { public org.apache.gobblin.runtime.JobContext getJobContext() { return this.jobContext; } + + // Drive only the normal-flow status-capture half of handleLaunchFinalization. We deliberately skip the + // eventBus.post(ClusterManagerShutdownRequest) tail because the unit-test constructor passes a null + // eventBus, and the capture-vs-broadcast pieces are independent for our tests' purposes. + public void triggerHandleLaunchFinalizationForTest() { + this.captureTerminalWorkflowStatus(); + } } @@ -126,6 +135,10 @@ public void setUp() throws Exception { @BeforeMethod public void methodSetUp() throws Exception { + // Reset invocation counts on the class-scoped mocks so per-test `verify(... times(N))` assertions + // are not polluted by interactions from earlier tests in the suite. + Mockito.clearInvocations(mockClient, mockExecutionInfo); + mockStub = mock(WorkflowStub.class); when(mockClient.newUntypedWorkflowStub(Mockito.anyString())).thenReturn(mockStub); when(mockStub.getExecution()).thenReturn(WorkflowExecution.getDefaultInstance()); @@ -304,6 +317,184 @@ public void testCloseTriggersCleanup() throws Exception { tmpDir.delete(); } + @Test + public void testMapWorkflowStatusToEventNameForCompletedEmitsJobSucceeded() { + assertEquals(GobblinTemporalJobLauncher.mapWorkflowStatusToEventName( + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED), + TimingEvent.LauncherTimings.JOB_SUCCEEDED); + } + + @Test + public void testMapWorkflowStatusToEventNameForCancelledEmitsJobCancel() { + assertEquals(GobblinTemporalJobLauncher.mapWorkflowStatusToEventName( + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED), + TimingEvent.LauncherTimings.JOB_CANCEL); + } + + @Test + public void testMapWorkflowStatusToEventNameForFailureStatusesEmitsJobFailed() { + // FAILED, TERMINATED, TIMED_OUT are all genuine workflow-side failures that should map to JOB_FAILED. + assertEquals(GobblinTemporalJobLauncher.mapWorkflowStatusToEventName( + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED), + TimingEvent.LauncherTimings.JOB_FAILED); + assertEquals(GobblinTemporalJobLauncher.mapWorkflowStatusToEventName( + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED), + TimingEvent.LauncherTimings.JOB_FAILED); + assertEquals(GobblinTemporalJobLauncher.mapWorkflowStatusToEventName( + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT), + TimingEvent.LauncherTimings.JOB_FAILED); + } + + @Test + public void testMapWorkflowStatusToEventNameForNonTerminalStatusesEmitsJobFailed() { + // RUNNING / CONTINUED_AS_NEW / UNSPECIFIED mean the AM is going down while the workflow has not reached a + // terminal state. From the GaaS perspective this is a failure-to-complete, so we surface JOB_FAILED rather + // than leaving the consumer without a terminal event. + assertEquals(GobblinTemporalJobLauncher.mapWorkflowStatusToEventName( + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING), + TimingEvent.LauncherTimings.JOB_FAILED); + assertEquals(GobblinTemporalJobLauncher.mapWorkflowStatusToEventName( + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW), + TimingEvent.LauncherTimings.JOB_FAILED); + assertEquals(GobblinTemporalJobLauncher.mapWorkflowStatusToEventName( + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED), + TimingEvent.LauncherTimings.JOB_FAILED); + } + + @Test + public void testEmitJobCompletionGTESkipsWhenWorkflowIdUnset() throws Exception { + // submitJob has not been called, so this.workflowId is null. The hook must short-circuit and not even + // attempt to describe a workflow, otherwise it would NPE on the WorkflowClient call. + jobLauncher.emitJobCompletionGTE(); + + verify(mockClient, times(0)).newUntypedWorkflowStub(Mockito.anyString()); + } + + @Test + public void testEmitJobCompletionGTEIsIdempotent() throws Exception { + jobLauncher.submitJob(null); + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED); + + jobLauncher.emitJobCompletionGTE(); + jobLauncher.emitJobCompletionGTE(); + + // newUntypedWorkflowStub is the entry point inside fetchWorkflowStatus; only the first emit call should reach it. + verify(mockClient, times(1)).newUntypedWorkflowStub(Mockito.anyString()); + } + + @Test + public void testEmitJobCompletionGTEHandlesDescribeFailureWithoutThrowing() throws Exception { + jobLauncher.submitJob(null); + // Simulate Temporal being unreachable / describe failing at hook fire time. The hook must swallow the error + // and still mark itself as fired so a subsequent invocation does not retry. + Mockito.doThrow(new RuntimeException("temporal unreachable")) + .when(mockExecutionInfo).getStatus(); + try { + jobLauncher.emitJobCompletionGTE(); + // Second invocation must be a no-op even though the first one failed to fetch status. + jobLauncher.emitJobCompletionGTE(); + + verify(mockClient, times(1)).newUntypedWorkflowStub(Mockito.anyString()); + } finally { + // Always restore mock state so the doThrow does not leak into subsequent tests in the suite. + Mockito.reset(mockExecutionInfo); + } + } + + @Test + public void testComputeExitCodeForCompletedReturnsZero() { + assertEquals(GobblinTemporalJobLauncher.computeExitCode( + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED), 0); + } + + @Test + public void testComputeExitCodeForAnyNonCompletedReturnsOne() { + // Every non-COMPLETED terminal status (and the non-terminal ones we collapse to JOB_FAILED) must + // produce a non-zero exit code so the AM JVM surfaces the failure to GGW. + WorkflowExecutionStatus[] nonSuccess = new WorkflowExecutionStatus[] { + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED, + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED, + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED, + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING, + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW, + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED + }; + for (WorkflowExecutionStatus status : nonSuccess) { + assertEquals(GobblinTemporalJobLauncher.computeExitCode(status), 1, + "Expected non-zero exit code for " + status); + } + } + + @Test + public void testHandleLaunchFinalizationPopulatesLastTerminalStatus() throws Exception { + jobLauncher.submitJob(null); + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED); + + jobLauncher.triggerHandleLaunchFinalizationForTest(); + + // The status must be cached so that AM main() can read it after close() has shut down Temporal stubs. + assertEquals(GobblinTemporalJobLauncher.getLastTerminalStatus(), + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED); + } + + @Test + public void testLastTerminalStatusSurvivesClose() throws Exception { + jobLauncher.submitJob(null); + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + + jobLauncher.triggerHandleLaunchFinalizationForTest(); + jobLauncher.close(); + + // close() shuts down Temporal stubs; the cached status must remain readable so AM main() can drive exit. + assertEquals(GobblinTemporalJobLauncher.getLastTerminalStatus(), + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + } + + @Test + public void testEmitJobCompletionGTEDoesNotReQueryWhenStatusAlreadyCached() throws Exception { + jobLauncher.submitJob(null); + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED); + + // First, normal-flow capture path queries Temporal exactly once. + jobLauncher.triggerHandleLaunchFinalizationForTest(); + verify(mockClient, times(1)).newUntypedWorkflowStub(Mockito.anyString()); + + // Then the JVM shutdown hook fires: it must reuse the cached status, not hit Temporal again. + jobLauncher.emitJobCompletionGTE(); + verify(mockClient, times(1)).newUntypedWorkflowStub(Mockito.anyString()); + } + + @Test + public void testConstructorResetsStaleTerminalStatus() throws Exception { + // Simulate a previous launcher in the same JVM having captured a terminal status. + jobLauncher.submitJob(null); + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + jobLauncher.triggerHandleLaunchFinalizationForTest(); + assertEquals(GobblinTemporalJobLauncher.getLastTerminalStatus(), + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + + // A new launcher (mirrors a fresh AM JVM or test-suite reuse) must clear the static cache so a prior + // run's status does not leak into the new launcher's exit code. The job name + lock dir must be unique + // so the per-job FileBasedJobLock the parent launcher acquires doesn't reject the second construction. + File tmpDir = Files.createTempDir(); + Path appWorkDir = new Path(tmpDir.getAbsolutePath(), "freshAppWorkDir"); + String freshJobName = "freshLauncherJob"; + Properties freshProps = (Properties) jobProperties.clone(); + freshProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, freshJobName); + freshProps.setProperty(ConfigurationKeys.JOB_ID_KEY, JobLauncherUtils.newJobId(freshJobName)); + freshProps.setProperty(FileBasedJobLock.JOB_LOCK_DIR, new Path(tmpDir.getAbsolutePath(), "freshLockDir").toString()); + freshProps.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, new Path(tmpDir.getAbsolutePath(), "freshStateStore").toString()); + new GobblinTemporalJobLauncherForTest(freshProps, appWorkDir); + + assertNull(GobblinTemporalJobLauncher.getLastTerminalStatus()); + } + @Test public void testCleanupRunsOnlyOnce() throws Exception { File tmpDir = Files.createTempDir(); diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index 5ef1b54c250..0f4f3d2448b 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.avro.Schema; @@ -102,7 +103,10 @@ import org.apache.gobblin.cluster.GobblinClusterUtils; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.GobblinMetrics; +import org.apache.gobblin.metrics.RootMetricContext; import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry; import org.apache.gobblin.metrics.kafka.SchemaRegistryException; import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils; @@ -237,6 +241,41 @@ public class GobblinYarnAppLauncher { private volatile boolean stopped = false; + // GGW-side terminal GTE + exit-code propagation. The AM emits its own terminal GTE on a graceful shutdown + // (see GobblinTemporalJobLauncher#emitJobCompletionGTE), but YARN-fatal paths (AM OOM, SIGKILL, + // container-kill before the hook runs, AM CLI parse error) leave GaaS without a terminal state. + // This launcher, which is already polling the YARN ApplicationReport, is the natural place to + // synthesize a terminal GTE for those gaps and to surface the failure as a non-zero process exit. + @VisibleForTesting + static final String GGW_FAILURE_REASON_FIELD = "failureReason"; + @VisibleForTesting + static final String GGW_OBSERVED_AM_TERMINATION_REASON = "GGW_OBSERVED_AM_TERMINATION"; + @VisibleForTesting + static final String GGW_LOST_AM_VISIBILITY_REASON = "GGW_LOST_AM_VISIBILITY"; + @VisibleForTesting + static final String FINAL_APPLICATION_STATUS_FIELD = "finalApplicationStatus"; + @VisibleForTesting + static final String APPLICATION_ID_FIELD = "applicationId"; + @VisibleForTesting + static final String APPLICATION_NAME_FIELD = "applicationName"; + @VisibleForTesting + static final String DIAGNOSTICS_FIELD = "diagnostics"; + @VisibleForTesting + static final int MAX_DIAGNOSTICS_LENGTH = 4096; + private static final String GGW_EVENT_NAMESPACE = "org.apache.gobblin.yarn.GobblinYarnAppLauncher"; + + // Idempotency guard so a replayed report-arrival event (or a race between the two handlers) does not + // emit more than one terminal GTE per launcher lifetime. + private final AtomicBoolean terminalGteEmitted = new AtomicBoolean(false); + + // Set non-zero on any terminal AM failure observed by the launcher; read by main() to drive System.exit. + @Getter + private volatile int exitCode = 0; + + // Lazily initialized EventSubmitter scoped to the GGW launcher namespace. Built on first use to avoid + // adding metric-context wiring to the constructor — terminal-state emission is rare. + private volatile EventSubmitter eventSubmitter; + private final boolean emailNotificationOnShutdown; private final boolean detachOnExitEnabled; @@ -496,6 +535,8 @@ public void handleApplicationReportArrivalEvent(ApplicationReportArrivalEvent ap LOGGER.error("Gobblin Yarn application failed for the following reason: " + applicationReport.getDiagnostics()); } + handleTerminalAppStatus(applicationReport); + try { GobblinYarnAppLauncher.this.stop(); } catch (IOException ioe) { @@ -510,6 +551,117 @@ public void handleApplicationReportArrivalEvent(ApplicationReportArrivalEvent ap } } + /** + * On a terminal {@link ApplicationReport}: (1) update {@link #exitCode} so the launcher JVM surfaces the + * failure to GGW dashboards, and (2) synthesize a terminal GTE for non-success outcomes — the AM-side + * shutdown-hook GTE (see {@code GobblinTemporalJobLauncher#emitJobCompletionGTE}) does not run when the + * AM JVM dies before its hook registers (OOM/SIGKILL/early-startup-crash/CLI parse error). Idempotency + * is guarded by {@link #terminalGteEmitted}. + */ + @VisibleForTesting + void handleTerminalAppStatus(ApplicationReport applicationReport) { + FinalApplicationStatus finalStatus = applicationReport.getFinalApplicationStatus(); + if (finalStatus != FinalApplicationStatus.SUCCEEDED) { + this.exitCode = 1; + } + emitTerminalGteForAmTermination(applicationReport); + } + + private void emitTerminalGteForAmTermination(ApplicationReport applicationReport) { + FinalApplicationStatus finalStatus = applicationReport.getFinalApplicationStatus(); + // SUCCEEDED: AM's own shutdown-hook GTE already covered the outcome; nothing to synthesize here. + if (finalStatus == FinalApplicationStatus.SUCCEEDED) { + return; + } + if (!terminalGteEmitted.compareAndSet(false, true)) { + return; + } + try { + String eventName = mapFinalAppStatusToEventName(finalStatus); + Map metadata = buildLauncherTerminalMetadata(applicationReport, GGW_OBSERVED_AM_TERMINATION_REASON); + new TimingEvent(getEventSubmitter(), eventName).stop(metadata); + LOGGER.info("Emitted GGW terminal GTE {} for application {} (FinalApplicationStatus: {})", + eventName, applicationReport.getApplicationId(), finalStatus); + } catch (Exception e) { + LOGGER.error("Failed to emit GGW terminal GTE for application " + applicationReport.getApplicationId(), e); + } + } + + /** + * Map a YARN {@link FinalApplicationStatus} to the {@link TimingEvent.LauncherTimings} event name that + * {@code KafkaAvroJobStatusMonitor} understands. Mirrors {@code GobblinTemporalJobLauncher#mapWorkflowStatusToEventName} + * so the two emission paths produce a uniform event vocabulary downstream. + */ + @VisibleForTesting + static String mapFinalAppStatusToEventName(FinalApplicationStatus status) { + switch (status) { + case SUCCEEDED: + return TimingEvent.LauncherTimings.JOB_SUCCEEDED; + case KILLED: + return TimingEvent.LauncherTimings.JOB_CANCEL; + case FAILED: + case UNDEFINED: + default: + return TimingEvent.LauncherTimings.JOB_FAILED; + } + } + + /** + * Build GTE metadata using the same flow/job identifier keys as the AM-side hook (see + * {@code GobblinTemporalJobLauncher#buildCompletionMetadata}). Flow IDs are sourced from the launcher's + * {@link Config}, which the GGW/Azkaban submitter populates before launching the AM. Adds + * launcher-specific diagnostics so triage can distinguish AM-side from GGW-side emissions. + */ + private Map buildLauncherTerminalMetadata(ApplicationReport applicationReport, String failureReason) { + Map metadata = new HashMap<>(); + metadata.put(GGW_FAILURE_REASON_FIELD, failureReason); + metadata.put(APPLICATION_NAME_FIELD, this.applicationName); + if (applicationReport != null) { + if (applicationReport.getApplicationId() != null) { + metadata.put(APPLICATION_ID_FIELD, applicationReport.getApplicationId().toString()); + } + if (applicationReport.getFinalApplicationStatus() != null) { + metadata.put(FINAL_APPLICATION_STATUS_FIELD, applicationReport.getFinalApplicationStatus().name()); + } + String diagnostics = applicationReport.getDiagnostics(); + if (diagnostics != null && !diagnostics.isEmpty()) { + metadata.put(DIAGNOSTICS_FIELD, + diagnostics.length() > MAX_DIAGNOSTICS_LENGTH ? diagnostics.substring(0, MAX_DIAGNOSTICS_LENGTH) : diagnostics); + } + } else if (this.applicationId.isPresent()) { + metadata.put(APPLICATION_ID_FIELD, this.applicationId.get().toString()); + } + addLauncherFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, ConfigurationKeys.FLOW_GROUP_KEY); + addLauncherFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, ConfigurationKeys.FLOW_NAME_KEY); + addLauncherFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, ConfigurationKeys.FLOW_EXECUTION_ID_KEY); + addLauncherFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.JOB_NAME_FIELD, ConfigurationKeys.JOB_NAME_KEY); + addLauncherFlowMetadataIfPresent(metadata, TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, ConfigurationKeys.JOB_GROUP_KEY); + return metadata; + } + + private void addLauncherFlowMetadataIfPresent(Map metadata, String metadataKey, String configKey) { + if (this.config.hasPath(configKey)) { + String value = this.config.getString(configKey); + if (value != null && !value.isEmpty()) { + metadata.put(metadataKey, value); + } + } + } + + private EventSubmitter getEventSubmitter() { + EventSubmitter local = this.eventSubmitter; + if (local == null) { + synchronized (this) { + local = this.eventSubmitter; + if (local == null) { + local = new EventSubmitter.Builder(RootMetricContext.get(), GGW_EVENT_NAMESPACE).build(); + this.eventSubmitter = local; + } + } + } + return local; + } + @Subscribe public void handleGetApplicationReportFailureEvent( GetApplicationReportFailureEvent getApplicationReportFailureEvent) { @@ -519,6 +671,8 @@ public void handleGetApplicationReportFailureEvent( .format("Number of consecutive failures to get the ApplicationReport %d exceeds the threshold %d", numConsecutiveFailures, this.maxGetApplicationReportFailures)); + handleLostAmVisibility(); + try { stop(); } catch (IOException ioe) { @@ -533,6 +687,26 @@ public void handleGetApplicationReportFailureEvent( } } + /** + * Called when the launcher has given up polling the AM (consecutive ApplicationReport fetch failures + * exceeded the threshold). We have no live YARN report to attribute, so emit a JOB_FAILED with a distinct + * failureReason so triage can tell this apart from an observed AM termination, and flip the exit code. + */ + @VisibleForTesting + void handleLostAmVisibility() { + this.exitCode = 1; + if (!terminalGteEmitted.compareAndSet(false, true)) { + return; + } + try { + Map metadata = buildLauncherTerminalMetadata(null, GGW_LOST_AM_VISIBILITY_REASON); + new TimingEvent(getEventSubmitter(), TimingEvent.LauncherTimings.JOB_FAILED).stop(metadata); + LOGGER.info("Emitted GGW lost-AM-visibility terminal GTE for application {}", this.applicationId); + } catch (Exception e) { + LOGGER.error("Failed to emit GGW lost-AM-visibility terminal GTE for application " + this.applicationId, e); + } + } + @VisibleForTesting void startYarnClient() { for (YarnClient yarnClient : potentialYarnClients.values()) { @@ -1181,5 +1355,12 @@ public void run() { }); gobblinYarnAppLauncher.launch(); + + // Surface AM-level failures (FAILED/KILLED/UNDEFINED FinalApplicationStatus, or lost-AM-visibility on + // exhausted report fetches) as a non-zero launcher process exit so GGW/Grid Gateway dashboards see them + // end-to-end. SUCCEEDED keeps exitCode at the field's default of 0. + int finalExitCode = gobblinYarnAppLauncher.getExitCode(); + LOGGER.info("GobblinYarnAppLauncher exiting with code {}", finalExitCode); + System.exit(finalExitCode); } } diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java new file mode 100644 index 00000000000..6c523a9c4a6 --- /dev/null +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.yarn; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.base.Optional; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.event.TimingEvent; + +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + + +/** + * Focused unit tests for the launcher-side terminal-GTE + exit-code logic added to + * {@link GobblinYarnAppLauncher}. Avoids spinning up the full {@link MiniYARNCluster} / Helix integration + * stack that {@link GobblinYarnAppLauncherTest} requires by constructing a real-methods Mockito stand-in + * and injecting the small handful of fields the new code paths read. + */ +public class GobblinYarnAppLauncherTerminalGteTest { + + private static final String TEST_APP_NAME = "test-app"; + private static final String TEST_APP_ID_STR = "application_1700000000000_0001"; + + private GobblinYarnAppLauncher launcher; + private ApplicationId applicationId; + + @BeforeMethod + public void setUp() throws Exception { + // Build a launcher without running the heavy constructor — we only exercise methods that read + // applicationName, config, applicationId, terminalGteEmitted, and exitCode. + this.launcher = mock(GobblinYarnAppLauncher.class, withSettings().defaultAnswer(CALLS_REAL_METHODS)); + + Config testConfig = ConfigFactory.empty() + .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef("testFlowGroup")) + .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef("testFlowName")) + .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef("123456789")) + .withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef("testJob")) + .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef("testJobGroup")); + + this.applicationId = mock(ApplicationId.class); + when(this.applicationId.toString()).thenReturn(TEST_APP_ID_STR); + + setField("applicationName", TEST_APP_NAME); + setField("config", testConfig); + setField("applicationId", Optional.of(this.applicationId)); + setField("terminalGteEmitted", new AtomicBoolean(false)); + setField("exitCode", 0); + // eventSubmitter starts null; getEventSubmitter() lazily builds one rooted at RootMetricContext.get(), + // which is a process-wide singleton and is safe to submit no-op events into during tests. + } + + private void setField(String name, Object value) throws Exception { + Field f = GobblinYarnAppLauncher.class.getDeclaredField(name); + f.setAccessible(true); + f.set(this.launcher, value); + } + + private ApplicationReport mockReport(FinalApplicationStatus status, String diagnostics) { + ApplicationReport report = mock(ApplicationReport.class); + when(report.getApplicationId()).thenReturn(this.applicationId); + when(report.getFinalApplicationStatus()).thenReturn(status); + when(report.getDiagnostics()).thenReturn(diagnostics); + return report; + } + + // ---------- mapFinalAppStatusToEventName ---------- + + @Test + public void testMapFinalAppStatusSucceeded() { + assertEquals(GobblinYarnAppLauncher.mapFinalAppStatusToEventName(FinalApplicationStatus.SUCCEEDED), + TimingEvent.LauncherTimings.JOB_SUCCEEDED); + } + + @Test + public void testMapFinalAppStatusFailed() { + assertEquals(GobblinYarnAppLauncher.mapFinalAppStatusToEventName(FinalApplicationStatus.FAILED), + TimingEvent.LauncherTimings.JOB_FAILED); + } + + @Test + public void testMapFinalAppStatusKilled() { + assertEquals(GobblinYarnAppLauncher.mapFinalAppStatusToEventName(FinalApplicationStatus.KILLED), + TimingEvent.LauncherTimings.JOB_CANCEL); + } + + @Test + public void testMapFinalAppStatusUndefined() { + // UNDEFINED is treated as a failure so GaaS never sees a missing terminal event. + assertEquals(GobblinYarnAppLauncher.mapFinalAppStatusToEventName(FinalApplicationStatus.UNDEFINED), + TimingEvent.LauncherTimings.JOB_FAILED); + } + + // ---------- handleTerminalAppStatus: exit-code wiring ---------- + + @Test + public void testHandleTerminalAppStatusFailedSetsExitCodeOne() throws Exception { + this.launcher.handleTerminalAppStatus(mockReport(FinalApplicationStatus.FAILED, "oom in AM")); + assertEquals(readExitCode(), 1); + } + + @Test + public void testHandleTerminalAppStatusKilledSetsExitCodeOne() throws Exception { + this.launcher.handleTerminalAppStatus(mockReport(FinalApplicationStatus.KILLED, "killed by yarn")); + assertEquals(readExitCode(), 1); + } + + @Test + public void testHandleTerminalAppStatusUndefinedSetsExitCodeOne() throws Exception { + this.launcher.handleTerminalAppStatus(mockReport(FinalApplicationStatus.UNDEFINED, null)); + assertEquals(readExitCode(), 1); + } + + @Test + public void testHandleTerminalAppStatusSucceededLeavesExitCodeZero() throws Exception { + this.launcher.handleTerminalAppStatus(mockReport(FinalApplicationStatus.SUCCEEDED, null)); + assertEquals(readExitCode(), 0); + } + + @Test + public void testHandleTerminalAppStatusSucceededDoesNotSetEmittedFlag() throws Exception { + this.launcher.handleTerminalAppStatus(mockReport(FinalApplicationStatus.SUCCEEDED, null)); + // The launcher must not consume the idempotency token on a clean success — that way an asynchronous + // late-arriving FAILED report (e.g., from a replay) could still emit if the contract changed. + assertEquals(readTerminalGteEmitted().get(), false); + } + + @Test + public void testHandleTerminalAppStatusFailedConsumesIdempotencyToken() throws Exception { + this.launcher.handleTerminalAppStatus(mockReport(FinalApplicationStatus.FAILED, "oom")); + assertTrue(readTerminalGteEmitted().get()); + } + + @Test + public void testHandleTerminalAppStatusIsIdempotentOnReplay() throws Exception { + this.launcher.handleTerminalAppStatus(mockReport(FinalApplicationStatus.FAILED, "first")); + this.launcher.handleTerminalAppStatus(mockReport(FinalApplicationStatus.FAILED, "second")); + // Both calls flip exit code (cheap), but the GTE emission token is only consumed once. We assert the + // token is set; deeper GTE-stream verification belongs in the integration test. + assertEquals(readExitCode(), 1); + assertTrue(readTerminalGteEmitted().get()); + } + + // ---------- handleLostAmVisibility ---------- + + @Test + public void testHandleLostAmVisibilitySetsExitCodeAndEmittedFlag() throws Exception { + this.launcher.handleLostAmVisibility(); + assertEquals(readExitCode(), 1); + assertTrue(readTerminalGteEmitted().get()); + } + + @Test + public void testHandleLostAmVisibilityIsIdempotent() throws Exception { + this.launcher.handleLostAmVisibility(); + this.launcher.handleLostAmVisibility(); + assertEquals(readExitCode(), 1); + assertTrue(readTerminalGteEmitted().get()); + } + + @Test + public void testHandleLostAmVisibilityRespectsExistingTerminalEmission() throws Exception { + // If a report-arrival path already emitted a terminal GTE, the lost-visibility path must not double-emit. + this.launcher.handleTerminalAppStatus(mockReport(FinalApplicationStatus.FAILED, "first")); + boolean wasEmittedBefore = readTerminalGteEmitted().get(); + this.launcher.handleLostAmVisibility(); + assertEquals(readExitCode(), 1); + assertTrue(wasEmittedBefore); + assertTrue(readTerminalGteEmitted().get()); + } + + // ---------- metadata population ---------- + + @Test + public void testBuildLauncherTerminalMetadataPopulatesFlowFieldsFromConfig() throws Exception { + ApplicationReport report = mockReport(FinalApplicationStatus.FAILED, "diag-text"); + Map metadata = invokeBuildMetadata(report, GobblinYarnAppLauncher.GGW_OBSERVED_AM_TERMINATION_REASON); + + assertEquals(metadata.get(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD), "testFlowGroup"); + assertEquals(metadata.get(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD), "testFlowName"); + assertEquals(metadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD), "123456789"); + assertEquals(metadata.get(TimingEvent.FlowEventConstants.JOB_NAME_FIELD), "testJob"); + assertEquals(metadata.get(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD), "testJobGroup"); + } + + @Test + public void testBuildLauncherTerminalMetadataIncludesLauncherDiagnostics() throws Exception { + ApplicationReport report = mockReport(FinalApplicationStatus.FAILED, "diag-text"); + Map metadata = invokeBuildMetadata(report, GobblinYarnAppLauncher.GGW_OBSERVED_AM_TERMINATION_REASON); + + assertEquals(metadata.get(GobblinYarnAppLauncher.GGW_FAILURE_REASON_FIELD), + GobblinYarnAppLauncher.GGW_OBSERVED_AM_TERMINATION_REASON); + assertEquals(metadata.get(GobblinYarnAppLauncher.APPLICATION_NAME_FIELD), TEST_APP_NAME); + assertEquals(metadata.get(GobblinYarnAppLauncher.APPLICATION_ID_FIELD), TEST_APP_ID_STR); + assertEquals(metadata.get(GobblinYarnAppLauncher.FINAL_APPLICATION_STATUS_FIELD), "FAILED"); + assertEquals(metadata.get(GobblinYarnAppLauncher.DIAGNOSTICS_FIELD), "diag-text"); + } + + @Test + public void testBuildLauncherTerminalMetadataTruncatesLongDiagnostics() throws Exception { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < GobblinYarnAppLauncher.MAX_DIAGNOSTICS_LENGTH + 100; i++) { + sb.append('x'); + } + ApplicationReport report = mockReport(FinalApplicationStatus.FAILED, sb.toString()); + + Map metadata = invokeBuildMetadata(report, GobblinYarnAppLauncher.GGW_OBSERVED_AM_TERMINATION_REASON); + + assertEquals(metadata.get(GobblinYarnAppLauncher.DIAGNOSTICS_FIELD).length(), + GobblinYarnAppLauncher.MAX_DIAGNOSTICS_LENGTH); + } + + @Test + public void testBuildLauncherTerminalMetadataLostVisibilityHasNoReport() throws Exception { + Map metadata = invokeBuildMetadata(null, GobblinYarnAppLauncher.GGW_LOST_AM_VISIBILITY_REASON); + + assertEquals(metadata.get(GobblinYarnAppLauncher.GGW_FAILURE_REASON_FIELD), + GobblinYarnAppLauncher.GGW_LOST_AM_VISIBILITY_REASON); + // No live ApplicationReport is available, so finalApplicationStatus is absent. ApplicationId falls + // back to the launcher's tracked id. + assertNull(metadata.get(GobblinYarnAppLauncher.FINAL_APPLICATION_STATUS_FIELD)); + assertEquals(metadata.get(GobblinYarnAppLauncher.APPLICATION_ID_FIELD), TEST_APP_ID_STR); + } + + // ---------- reflection helpers ---------- + + @SuppressWarnings("unchecked") + private Map invokeBuildMetadata(ApplicationReport report, String failureReason) throws Exception { + java.lang.reflect.Method m = GobblinYarnAppLauncher.class.getDeclaredMethod( + "buildLauncherTerminalMetadata", ApplicationReport.class, String.class); + m.setAccessible(true); + return (Map) m.invoke(this.launcher, report, failureReason); + } + + private int readExitCode() throws Exception { + Field f = GobblinYarnAppLauncher.class.getDeclaredField("exitCode"); + f.setAccessible(true); + return (int) f.get(this.launcher); + } + + private AtomicBoolean readTerminalGteEmitted() throws Exception { + Field f = GobblinYarnAppLauncher.class.getDeclaredField("terminalGteEmitted"); + f.setAccessible(true); + return (AtomicBoolean) f.get(this.launcher); + } +}