From 2b48c23c6bfd7600c1fdb2bf34ba725771170b80 Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Wed, 11 Mar 2026 18:14:51 +0300 Subject: [PATCH 1/3] IGNITE-28189 Fixed incorrect status of a Compute Task when it is canceled --- .../processors/job/GridJobWorker.java | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index 90f32db551fff..e3c20f307836d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -748,23 +748,21 @@ else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterrup */ public void cancel(boolean sys) { try { - final ComputeJob job0 = job; + if (log.isDebugEnabled()) + log.debug("Cancelling job: " + ses); + + status = CANCELLED; if (sys) sysCancelled = true; - if (job0 != null) { - if (log.isDebugEnabled()) - log.debug("Cancelling job: " + ses); - - status = CANCELLED; + final ComputeJob job0 = job; - U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> { - try (Scope ignored = ctx.security().withContext(secCtx)) { - job0.cancel(); - } - }); - } + U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> { + try (Scope ignored = ctx.security().withContext(secCtx)) { + job0.cancel(); + } + }); // Interrupting only when all 'cancelled' flags are set. // This allows the 'job' to determine it's a cancellation. @@ -1091,6 +1089,11 @@ ComputeJobStatusEnum status() { return jobId.hashCode(); } + /** {@inheritDoc} */ + @Override public boolean isCancelled() { + return status == CANCELLED; + } + /** {@inheritDoc} */ @Override protected void onCancel(boolean firstCancelRequest) { if (firstCancelRequest) From 8b76370322257fd5912fc5ee5e39095e876c717f Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Thu, 12 Mar 2026 01:32:55 +0300 Subject: [PATCH 2/3] IGNITE-28189 --- .../processors/job/GridJobProcessor.java | 2 +- .../processors/job/GridJobWorker.java | 51 +++++++------------ 2 files changed, 18 insertions(+), 35 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index fd1b0122d6dae..3a5924c968430 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -485,7 +485,7 @@ public GridJobProcessor(GridKernalContext ctx) { // Cancel only if we force grid to stop if (cancel) { for (GridJobWorker job : activeJobs.values()) { - job.onStopping(); + job.onNodeStopping(); cancelJob(job, false); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index e3c20f307836d..29201106d8c21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -145,16 +145,16 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { private final AtomicBoolean masterLeaveGuard = new AtomicBoolean(); /** */ - private volatile boolean timedOut; + private volatile boolean isStarted; /** */ - private volatile boolean sysCancelled; + private volatile boolean isCancelledBySystem; /** */ - private volatile boolean sysStopping; + private volatile boolean isTimedOut; /** */ - private volatile boolean isStarted; + private volatile boolean isNodeStopping; /** Deployed job. */ private ComputeJob job; @@ -272,15 +272,6 @@ public GridDeployment getDeployment() { return dep; } - /** - * Returns {@code True} if job was cancelled by the system. - * - * @return {@code True} if job was cancelled by the system. - */ - boolean isSystemCanceled() { - return sysCancelled; - } - /** * @return Create time. */ @@ -402,7 +393,7 @@ long getQueuedTime() { * @return {@code True} if job is timed out. */ public boolean isTimedOut() { - return timedOut; + return isTimedOut; } /** @@ -417,7 +408,7 @@ public boolean isInternal() { if (finishing.get()) return; - timedOut = true; + isTimedOut = true; U.warn(log, "Job has timed out: " + ses); @@ -430,8 +421,8 @@ public boolean isInternal() { /** * Callback for whenever grid is stopping. */ - public void onStopping() { - sysStopping = true; + public void onNodeStopping() { + isNodeStopping = true; } /** @@ -561,10 +552,6 @@ private void execute0(boolean skipNtf) { } } - if (isCancelled()) - // If job was cancelled prior to assigning runner to it? - super.cancel(); - if (!skipNtf) { if (holdLsnr.onUnheld(this)) { if (held.decrementAndGet() == 0) @@ -618,7 +605,7 @@ private void execute0(boolean skipNtf) { } } catch (IgniteException e) { - if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) { + if (isNodeStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) { ex = handleThrowable(e); assert ex != null; @@ -700,7 +687,7 @@ private IgniteException handleThrowable(Throwable e) { // Special handling for weird interrupted exception which // happens due to JDk 1.5 bug. - if (e instanceof InterruptedException && !sysStopping) { + if (e instanceof InterruptedException && !isNodeStopping) { msg = "Failed to execute job due to interrupted exception."; // Turn interrupted exception into checked exception. @@ -716,7 +703,7 @@ else if ((e instanceof NoClassDefFoundError || e instanceof ClassNotFoundExcepti ex = new ComputeUserUndeclaredException(msg, e); } - else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) { + else if (isNodeStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) { msg = "Job got interrupted due to system stop (will attempt failover)."; ex = new ComputeExecutionRejectedException(e); @@ -751,10 +738,11 @@ public void cancel(boolean sys) { if (log.isDebugEnabled()) log.debug("Cancelling job: " + ses); - status = CANCELLED; + boolean firstCancel = isCancelled.compareAndSet(false, true); - if (sys) - sysCancelled = true; + isCancelledBySystem = sys; + + status = CANCELLED; final ComputeJob job0 = job; @@ -766,7 +754,7 @@ public void cancel(boolean sys) { // Interrupting only when all 'cancelled' flags are set. // This allows the 'job' to determine it's a cancellation. - super.cancel(); + onCancel(firstCancel); if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED)) recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0); @@ -842,7 +830,7 @@ void finishJob( // Do not send reply if job has been cancelled from system. if (sndReply) - sndReply = !sysCancelled; + sndReply = !isCancelledBySystem; // We should save message ID here since listener callback will reset sequence. ClusterNode sndNode = ctx.discovery().node(taskNode.id()); @@ -1089,11 +1077,6 @@ ComputeJobStatusEnum status() { return jobId.hashCode(); } - /** {@inheritDoc} */ - @Override public boolean isCancelled() { - return status == CANCELLED; - } - /** {@inheritDoc} */ @Override protected void onCancel(boolean firstCancelRequest) { if (firstCancelRequest) From 7605187454d86b8584f99cfeeadf4e49bbc141b8 Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Thu, 12 Mar 2026 11:37:58 +0300 Subject: [PATCH 3/3] IGNITE-28189 --- .../processors/job/GridJobWorker.java | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index 29201106d8c21..b59fd5b15da55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -63,7 +63,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; import org.jetbrains.annotations.Nullable; @@ -734,39 +733,33 @@ else if (isNodeStopping && X.hasCause(e, InterruptedException.class, IgniteInter * @param sys System flag. */ public void cancel(boolean sys) { - try { - if (log.isDebugEnabled()) - log.debug("Cancelling job: " + ses); - - boolean firstCancel = isCancelled.compareAndSet(false, true); + if (log.isDebugEnabled()) + log.debug("Cancelling job: " + ses); - isCancelledBySystem = sys; + boolean firstCancel = isCancelled.compareAndSet(false, true); - status = CANCELLED; + isCancelledBySystem = sys; - final ComputeJob job0 = job; + status = CANCELLED; - U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> { - try (Scope ignored = ctx.security().withContext(secCtx)) { - job0.cancel(); - } - }); + final ComputeJob job0 = job; - // Interrupting only when all 'cancelled' flags are set. - // This allows the 'job' to determine it's a cancellation. - onCancel(firstCancel); - - if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED)) - recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0); + try (Scope ignored = ctx.security().withContext(secCtx)) { + U.wrapThreadLoader(dep.classLoader(), job0::cancel); } - // Catch throwable to protect against bad user code. - catch (Throwable e) { + catch (Throwable e) { // Catch throwable to protect against bad user code. U.error(log, "Failed to cancel job due to undeclared user exception [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); if (e instanceof Error) throw e; } + finally { + onCancel(firstCancel); + + if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED)) + recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0); + } } /**