diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index fd1b0122d6dae..3a5924c968430 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -485,7 +485,7 @@ public GridJobProcessor(GridKernalContext ctx) { // Cancel only if we force grid to stop if (cancel) { for (GridJobWorker job : activeJobs.values()) { - job.onStopping(); + job.onNodeStopping(); cancelJob(job, false); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index 90f32db551fff..b59fd5b15da55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -63,7 +63,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; import org.jetbrains.annotations.Nullable; @@ -145,16 +144,16 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { private final AtomicBoolean masterLeaveGuard = new AtomicBoolean(); /** */ - private volatile boolean timedOut; + private volatile boolean isStarted; /** */ - private volatile boolean sysCancelled; + private volatile boolean isCancelledBySystem; /** */ - private volatile boolean sysStopping; + private volatile boolean isTimedOut; /** */ - private volatile boolean isStarted; + private volatile boolean isNodeStopping; /** Deployed job. */ private ComputeJob job; @@ -272,15 +271,6 @@ public GridDeployment getDeployment() { return dep; } - /** - * Returns {@code True} if job was cancelled by the system. - * - * @return {@code True} if job was cancelled by the system. - */ - boolean isSystemCanceled() { - return sysCancelled; - } - /** * @return Create time. */ @@ -402,7 +392,7 @@ long getQueuedTime() { * @return {@code True} if job is timed out. */ public boolean isTimedOut() { - return timedOut; + return isTimedOut; } /** @@ -417,7 +407,7 @@ public boolean isInternal() { if (finishing.get()) return; - timedOut = true; + isTimedOut = true; U.warn(log, "Job has timed out: " + ses); @@ -430,8 +420,8 @@ public boolean isInternal() { /** * Callback for whenever grid is stopping. */ - public void onStopping() { - sysStopping = true; + public void onNodeStopping() { + isNodeStopping = true; } /** @@ -561,10 +551,6 @@ private void execute0(boolean skipNtf) { } } - if (isCancelled()) - // If job was cancelled prior to assigning runner to it? - super.cancel(); - if (!skipNtf) { if (holdLsnr.onUnheld(this)) { if (held.decrementAndGet() == 0) @@ -618,7 +604,7 @@ private void execute0(boolean skipNtf) { } } catch (IgniteException e) { - if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) { + if (isNodeStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) { ex = handleThrowable(e); assert ex != null; @@ -700,7 +686,7 @@ private IgniteException handleThrowable(Throwable e) { // Special handling for weird interrupted exception which // happens due to JDk 1.5 bug. - if (e instanceof InterruptedException && !sysStopping) { + if (e instanceof InterruptedException && !isNodeStopping) { msg = "Failed to execute job due to interrupted exception."; // Turn interrupted exception into checked exception. @@ -716,7 +702,7 @@ else if ((e instanceof NoClassDefFoundError || e instanceof ClassNotFoundExcepti ex = new ComputeUserUndeclaredException(msg, e); } - else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) { + else if (isNodeStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) { msg = "Job got interrupted due to system stop (will attempt failover)."; ex = new ComputeExecutionRejectedException(e); @@ -747,40 +733,33 @@ else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterrup * @param sys System flag. */ public void cancel(boolean sys) { - try { - final ComputeJob job0 = job; - - if (sys) - sysCancelled = true; + if (log.isDebugEnabled()) + log.debug("Cancelling job: " + ses); - if (job0 != null) { - if (log.isDebugEnabled()) - log.debug("Cancelling job: " + ses); + boolean firstCancel = isCancelled.compareAndSet(false, true); - status = CANCELLED; + isCancelledBySystem = sys; - U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> { - try (Scope ignored = ctx.security().withContext(secCtx)) { - job0.cancel(); - } - }); - } + status = CANCELLED; - // Interrupting only when all 'cancelled' flags are set. - // This allows the 'job' to determine it's a cancellation. - super.cancel(); + final ComputeJob job0 = job; - if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED)) - recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0); + try (Scope ignored = ctx.security().withContext(secCtx)) { + U.wrapThreadLoader(dep.classLoader(), job0::cancel); } - // Catch throwable to protect against bad user code. - catch (Throwable e) { + catch (Throwable e) { // Catch throwable to protect against bad user code. U.error(log, "Failed to cancel job due to undeclared user exception [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); if (e instanceof Error) throw e; } + finally { + onCancel(firstCancel); + + if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED)) + recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0); + } } /** @@ -844,7 +823,7 @@ void finishJob( // Do not send reply if job has been cancelled from system. if (sndReply) - sndReply = !sysCancelled; + sndReply = !isCancelledBySystem; // We should save message ID here since listener callback will reset sequence. ClusterNode sndNode = ctx.discovery().node(taskNode.id());