Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ public GridJobProcessor(GridKernalContext ctx) {
// Cancel only if we force grid to stop
if (cancel) {
for (GridJobWorker job : activeJobs.values()) {
job.onStopping();
job.onNodeStopping();

cancelJob(job, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -145,16 +144,16 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
private final AtomicBoolean masterLeaveGuard = new AtomicBoolean();

/** */
private volatile boolean timedOut;
private volatile boolean isStarted;

/** */
private volatile boolean sysCancelled;
private volatile boolean isCancelledBySystem;

/** */
private volatile boolean sysStopping;
private volatile boolean isTimedOut;

/** */
private volatile boolean isStarted;
private volatile boolean isNodeStopping;

/** Deployed job. */
private ComputeJob job;
Expand Down Expand Up @@ -272,15 +271,6 @@ public GridDeployment getDeployment() {
return dep;
}

/**
* Returns {@code True} if job was cancelled by the system.
*
* @return {@code True} if job was cancelled by the system.
*/
boolean isSystemCanceled() {
return sysCancelled;
}

/**
* @return Create time.
*/
Expand Down Expand Up @@ -402,7 +392,7 @@ long getQueuedTime() {
* @return {@code True} if job is timed out.
*/
public boolean isTimedOut() {
return timedOut;
return isTimedOut;
}

/**
Expand All @@ -417,7 +407,7 @@ public boolean isInternal() {
if (finishing.get())
return;

timedOut = true;
isTimedOut = true;

U.warn(log, "Job has timed out: " + ses);

Expand All @@ -430,8 +420,8 @@ public boolean isInternal() {
/**
* Callback for whenever grid is stopping.
*/
public void onStopping() {
sysStopping = true;
public void onNodeStopping() {
isNodeStopping = true;
}

/**
Expand Down Expand Up @@ -561,10 +551,6 @@ private void execute0(boolean skipNtf) {
}
}

if (isCancelled())
// If job was cancelled prior to assigning runner to it?
super.cancel();

if (!skipNtf) {
if (holdLsnr.onUnheld(this)) {
if (held.decrementAndGet() == 0)
Expand Down Expand Up @@ -618,7 +604,7 @@ private void execute0(boolean skipNtf) {
}
}
catch (IgniteException e) {
if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) {
if (isNodeStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) {
ex = handleThrowable(e);

assert ex != null;
Expand Down Expand Up @@ -700,7 +686,7 @@ private IgniteException handleThrowable(Throwable e) {

// Special handling for weird interrupted exception which
// happens due to JDk 1.5 bug.
if (e instanceof InterruptedException && !sysStopping) {
if (e instanceof InterruptedException && !isNodeStopping) {
msg = "Failed to execute job due to interrupted exception.";

// Turn interrupted exception into checked exception.
Expand All @@ -716,7 +702,7 @@ else if ((e instanceof NoClassDefFoundError || e instanceof ClassNotFoundExcepti

ex = new ComputeUserUndeclaredException(msg, e);
}
else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) {
else if (isNodeStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) {
msg = "Job got interrupted due to system stop (will attempt failover).";

ex = new ComputeExecutionRejectedException(e);
Expand Down Expand Up @@ -747,40 +733,33 @@ else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterrup
* @param sys System flag.
*/
public void cancel(boolean sys) {
try {
final ComputeJob job0 = job;

if (sys)
sysCancelled = true;
if (log.isDebugEnabled())
log.debug("Cancelling job: " + ses);

if (job0 != null) {
if (log.isDebugEnabled())
log.debug("Cancelling job: " + ses);
boolean firstCancel = isCancelled.compareAndSet(false, true);

status = CANCELLED;
isCancelledBySystem = sys;

U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> {
try (Scope ignored = ctx.security().withContext(secCtx)) {
job0.cancel();
}
});
}
status = CANCELLED;

// Interrupting only when all 'cancelled' flags are set.
// This allows the 'job' to determine it's a cancellation.
super.cancel();
final ComputeJob job0 = job;

if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED))
recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0);
try (Scope ignored = ctx.security().withContext(secCtx)) {
U.wrapThreadLoader(dep.classLoader(), job0::cancel);
}
// Catch throwable to protect against bad user code.
catch (Throwable e) {
catch (Throwable e) { // Catch throwable to protect against bad user code.
U.error(log, "Failed to cancel job due to undeclared user exception [jobId=" + ses.getJobId() +
", ses=" + ses + ']', e);

if (e instanceof Error)
throw e;
}
finally {
onCancel(firstCancel);

if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED))
recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0);
}
}

/**
Expand Down Expand Up @@ -844,7 +823,7 @@ void finishJob(

// Do not send reply if job has been cancelled from system.
if (sndReply)
sndReply = !sysCancelled;
sndReply = !isCancelledBySystem;

// We should save message ID here since listener callback will reset sequence.
ClusterNode sndNode = ctx.discovery().node(taskNode.id());
Expand Down
Loading