Skip to content

Commit 9512a4d

Browse files
committed
1 parent 2b48c23 commit 9512a4d

2 files changed

Lines changed: 22 additions & 31 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ public GridJobProcessor(GridKernalContext ctx) {
485485
// Cancel only if we force grid to stop
486486
if (cancel) {
487487
for (GridJobWorker job : activeJobs.values()) {
488-
job.onStopping();
488+
job.onNodeStopping();
489489

490490
cancelJob(job, false);
491491
}

modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -145,16 +145,19 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
145145
private final AtomicBoolean masterLeaveGuard = new AtomicBoolean();
146146

147147
/** */
148-
private volatile boolean timedOut;
148+
private volatile boolean isStarted;
149149

150150
/** */
151-
private volatile boolean sysCancelled;
151+
private volatile boolean isCancelled;
152152

153153
/** */
154-
private volatile boolean sysStopping;
154+
private volatile boolean isCancelledBySystem;
155155

156156
/** */
157-
private volatile boolean isStarted;
157+
private volatile boolean isTimedOut;
158+
159+
/** */
160+
private volatile boolean isNodeStopping;
158161

159162
/** Deployed job. */
160163
private ComputeJob job;
@@ -272,15 +275,6 @@ public GridDeployment getDeployment() {
272275
return dep;
273276
}
274277

275-
/**
276-
* Returns {@code True} if job was cancelled by the system.
277-
*
278-
* @return {@code True} if job was cancelled by the system.
279-
*/
280-
boolean isSystemCanceled() {
281-
return sysCancelled;
282-
}
283-
284278
/**
285279
* @return Create time.
286280
*/
@@ -402,7 +396,7 @@ long getQueuedTime() {
402396
* @return {@code True} if job is timed out.
403397
*/
404398
public boolean isTimedOut() {
405-
return timedOut;
399+
return isTimedOut;
406400
}
407401

408402
/**
@@ -417,7 +411,7 @@ public boolean isInternal() {
417411
if (finishing.get())
418412
return;
419413

420-
timedOut = true;
414+
isTimedOut = true;
421415

422416
U.warn(log, "Job has timed out: " + ses);
423417

@@ -430,8 +424,8 @@ public boolean isInternal() {
430424
/**
431425
* Callback for whenever grid is stopping.
432426
*/
433-
public void onStopping() {
434-
sysStopping = true;
427+
public void onNodeStopping() {
428+
isNodeStopping = true;
435429
}
436430

437431
/**
@@ -561,10 +555,6 @@ private void execute0(boolean skipNtf) {
561555
}
562556
}
563557

564-
if (isCancelled())
565-
// If job was cancelled prior to assigning runner to it?
566-
super.cancel();
567-
568558
if (!skipNtf) {
569559
if (holdLsnr.onUnheld(this)) {
570560
if (held.decrementAndGet() == 0)
@@ -618,7 +608,7 @@ private void execute0(boolean skipNtf) {
618608
}
619609
}
620610
catch (IgniteException e) {
621-
if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) {
611+
if (isNodeStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) {
622612
ex = handleThrowable(e);
623613

624614
assert ex != null;
@@ -700,7 +690,7 @@ private IgniteException handleThrowable(Throwable e) {
700690

701691
// Special handling for weird interrupted exception which
702692
// happens due to JDk 1.5 bug.
703-
if (e instanceof InterruptedException && !sysStopping) {
693+
if (e instanceof InterruptedException && !isNodeStopping) {
704694
msg = "Failed to execute job due to interrupted exception.";
705695

706696
// Turn interrupted exception into checked exception.
@@ -716,7 +706,7 @@ else if ((e instanceof NoClassDefFoundError || e instanceof ClassNotFoundExcepti
716706

717707
ex = new ComputeUserUndeclaredException(msg, e);
718708
}
719-
else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) {
709+
else if (isNodeStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) {
720710
msg = "Job got interrupted due to system stop (will attempt failover).";
721711

722712
ex = new ComputeExecutionRejectedException(e);
@@ -751,10 +741,11 @@ public void cancel(boolean sys) {
751741
if (log.isDebugEnabled())
752742
log.debug("Cancelling job: " + ses);
753743

754-
status = CANCELLED;
744+
isCancelled = true;
745+
746+
isCancelledBySystem = sys;
755747

756-
if (sys)
757-
sysCancelled = true;
748+
status = CANCELLED;
758749

759750
final ComputeJob job0 = job;
760751

@@ -842,7 +833,7 @@ void finishJob(
842833

843834
// Do not send reply if job has been cancelled from system.
844835
if (sndReply)
845-
sndReply = !sysCancelled;
836+
sndReply = !isCancelledBySystem;
846837

847838
// We should save message ID here since listener callback will reset sequence.
848839
ClusterNode sndNode = ctx.discovery().node(taskNode.id());
@@ -903,7 +894,7 @@ else if (!internal && ctx.event().isRecordable(EVT_JOB_REJECTED))
903894
ex,
904895
res,
905896
attrs,
906-
isCancelled(),
897+
isCancelled,
907898
retry ? ctx.cache().context().exchange().readyAffinityVersion() : null);
908899

909900
if (!loc)
@@ -1091,7 +1082,7 @@ ComputeJobStatusEnum status() {
10911082

10921083
/** {@inheritDoc} */
10931084
@Override public boolean isCancelled() {
1094-
return status == CANCELLED;
1085+
return isCancelled;
10951086
}
10961087

10971088
/** {@inheritDoc} */

0 commit comments

Comments
 (0)