Skip to content

Commit 5366844

Browse files
committed
TEZ-4505: Create counters about time intervals spent in certain states in StateMachineTez
1 parent 5beab4c commit 5366844

7 files changed

Lines changed: 123 additions & 2 deletions

File tree

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.tez.common.counters;
19+
20+
public class TaskAttemptCounter {
21+
22+
}

tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,6 +2088,19 @@ static Set<String> getPropertySet() {
20882088
+ "dag.status.pollinterval-ms";
20892089
public static final long TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT = 500;
20902090

2091+
/**
2092+
* Boolean value
2093+
* Whether to count how much time the DAG StateMachine spends in certain states
2094+
* and report it in counters.
2095+
* Minor performance degradation is possible so this is turned off by default.
2096+
*/
2097+
@ConfigurationScope(Scope.DAG)
2098+
@ConfigurationProperty(type = "boolean")
2099+
public static final String TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED = TEZ_PREFIX
2100+
+ "dag.state.interval.monitor.enabled";
2101+
//FIXME: false
2102+
public static final boolean TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED_DEFAULT = true;
2103+
20912104
/**
20922105
* Long value.
20932106
* Time to wait (in seconds) for apps to complete on MiniTezCluster shutdown.

tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public void setCounters(TezCounters counters) {
7676
List<String> getDiagnostics();
7777
TaskAttemptTerminationCause getTerminationCause();
7878
TezCounters getCounters();
79+
TezCounters getStateCounters();
7980
@VisibleForTesting
8081
void setCounters(TezCounters counters);
8182
float getProgress();

tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,9 @@ private void augmentStateMachine() {
588588
STATE_CHANGED_CALLBACK)
589589
.registerStateEnteredCallback(DAGState.ERROR,
590590
STATE_CHANGED_CALLBACK);
591+
if (StateMachineTez.isStateIntervalMonitorEnabled(dagConf)) {
592+
stateMachine.enableStateIntervalMonitor();
593+
}
591594
}
592595

593596
private static class DagStateChangedCallback
@@ -1943,6 +1946,7 @@ public TezCounters constructFinalFullcounters() {
19431946
for (Vertex v : this.vertices.values()) {
19441947
aggregateTezCounters.aggrAllCounters(v.getAllCounters());
19451948
}
1949+
stateMachine.incrementStateCounters(DAGCounter.class.getName(), aggregateTezCounters);
19461950
return aggregateTezCounters;
19471951
}
19481952

tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@
3838
import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
3939
import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
4040
import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
41+
import org.apache.tez.dag.app.dag.event.TaskEventType;
4142
import org.apache.tez.dag.app.rm.AMSchedulerEventTAStateUpdated;
4243
import org.apache.tez.runtime.api.TaskFailureType;
4344
import org.apache.tez.serviceplugins.api.TaskScheduler;
45+
import org.apache.tez.state.StateMachineTez;
4446
import org.apache.tez.util.StringInterner;
4547
import org.slf4j.Logger;
4648
import org.slf4j.LoggerFactory;
@@ -63,6 +65,7 @@
6365
import org.apache.hadoop.yarn.util.Records;
6466
import org.apache.tez.common.TezUtilsInternal;
6567
import org.apache.tez.common.counters.DAGCounter;
68+
import org.apache.tez.common.counters.TaskCounter;
6669
import org.apache.tez.common.counters.TezCounters;
6770
import org.apache.tez.dag.api.TezConfiguration;
6871
import org.apache.tez.dag.api.TezUncheckedException;
@@ -76,7 +79,9 @@
7679
import org.apache.tez.dag.app.TaskHeartbeatHandler;
7780
import org.apache.tez.dag.app.dag.TaskAttempt;
7881
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
82+
import org.apache.tez.dag.app.dag.TaskStateInternal;
7983
import org.apache.tez.dag.app.dag.Vertex;
84+
import org.apache.tez.dag.app.dag.DAGState;
8085
import org.apache.tez.dag.app.dag.Task;
8186
import org.apache.tez.dag.app.dag.event.DAGEvent;
8287
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
@@ -242,7 +247,7 @@ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto pro
242247
private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
243248
STATUS_UPDATER = new StatusUpdaterTransition();
244249

245-
private final StateMachine<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
250+
private final StateMachineTez<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent, TaskAttemptImpl> stateMachine;
246251

247252
// TODO TEZ-2003 (post) TEZ-2667 We may need some additional state management for STATUS_UPDATES, FAILED, KILLED coming in before
248253
// TASK_STARTED_REMOTELY. In case of a PUSH it's more intuitive to send TASK_STARTED_REMOTELY after communicating
@@ -571,7 +576,10 @@ public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
571576
this.reportedStatus = new TaskAttemptStatus(this.attemptId);
572577
initTaskAttemptStatus(reportedStatus);
573578
RackResolver.init(conf);
574-
this.stateMachine = stateMachineFactory.make(this);
579+
this.stateMachine =
580+
new StateMachineTez<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent, TaskAttemptImpl>(
581+
stateMachineFactory.make(this), this);
582+
augmentStateMachine();
575583
this.isRescheduled = isRescheduled;
576584
this.taskResource = resource;
577585
this.containerContext = containerContext;
@@ -584,6 +592,12 @@ public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
584592
null : appContext.getDAGRecoveryData().getTaskAttemptRecoveryData(attemptId);
585593
}
586594

595+
private void augmentStateMachine() {
596+
if (StateMachineTez.isStateIntervalMonitorEnabled(conf)) {
597+
stateMachine.enableStateIntervalMonitor();
598+
}
599+
}
600+
587601
@Override
588602
public TezTaskAttemptID getTaskAttemptID() {
589603
return attemptId;
@@ -655,6 +669,13 @@ public TezCounters getCounters() {
655669
}
656670
}
657671

672+
@Override
673+
public TezCounters getStateCounters() {
674+
TezCounters counters = new TezCounters();
675+
stateMachine.incrementStateCounters("TaskAttemptCounter_" + getVertex().getName(), counters);
676+
return counters;
677+
}
678+
658679
@VisibleForTesting
659680
@Override
660681
public void setCounters(TezCounters counters) {

tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.commons.lang.StringUtils;
4040
import org.apache.commons.lang.exception.ExceptionUtils;
4141
import org.apache.hadoop.classification.InterfaceAudience.Private;
42+
import org.apache.tez.common.counters.TaskAttemptCounter;
4243
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
4344
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
4445
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -58,6 +59,7 @@
5859
import org.apache.hadoop.yarn.state.StateMachineFactory;
5960
import org.apache.hadoop.yarn.util.Clock;
6061
import org.apache.tez.common.counters.TaskCounter;
62+
import org.apache.tez.common.counters.TezCounter;
6163
import org.apache.tez.common.counters.TezCounters;
6264
import org.apache.tez.dag.api.TaskLocationHint;
6365
import org.apache.tez.dag.api.TezUncheckedException;
@@ -315,6 +317,9 @@ private void augmentStateMachine() {
315317
stateMachine
316318
.registerStateEnteredCallback(TaskStateInternal.SUCCEEDED,
317319
STATE_CHANGED_CALLBACK);
320+
if (StateMachineTez.isStateIntervalMonitorEnabled(conf)) {
321+
stateMachine.enableStateIntervalMonitor();
322+
}
318323
}
319324

320325
private final StateMachineTez<TaskStateInternal, TaskEventType, TaskEvent, TaskImpl>
@@ -474,6 +479,10 @@ public TezCounters getCounters() {
474479
try {
475480
TaskAttempt bestAttempt = selectBestAttempt();
476481
TezCounters taskCounters = (bestAttempt != null) ? bestAttempt.getCounters() : TaskAttemptImpl.EMPTY_COUNTERS;
482+
483+
stateMachine.incrementStateCounters(TaskCounter.class.getSimpleName() + "_" + getVertex().getName(), taskCounters);
484+
maybeMergeAllTaskAttemptCounters(taskCounters);
485+
477486
if (getVertex().isSpeculationEnabled()) {
478487
tezCounters.incrAllCounters(taskCounters);
479488
return tezCounters;
@@ -484,6 +493,18 @@ public TezCounters getCounters() {
484493
}
485494
}
486495

496+
private void maybeMergeAllTaskAttemptCounters(TezCounters taskCounters) {
497+
if (stateMachine.isStateIntervalMonitorEnabled()) {
498+
// if the state interval monitoring is disabled for this TaskImpl.stateMachine,
499+
// it's disabled for all TaskAttemptImpl's stateMachine too
500+
return;
501+
}
502+
String groupName = TaskAttemptCounter.class.getSimpleName() + "_" + getVertex().getName();
503+
for (TaskAttempt at : attempts.values()) {
504+
taskCounters.getGroup(groupName).aggrAllCounters(at.getStateCounters().getGroup(groupName));
505+
}
506+
}
507+
487508
TaskStatistics getStatistics() {
488509
// simply return the stats from the best attempt
489510
readLock.lock();

tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121
import java.util.HashMap;
2222
import java.util.Map;
2323

24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.util.Time;
2426
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
2527
import org.apache.hadoop.yarn.state.StateMachine;
28+
import org.apache.tez.common.counters.TezCounters;
29+
import org.apache.tez.dag.api.TezConfiguration;
2630
import org.apache.tez.dag.records.TezID;
2731

2832
public class StateMachineTez<STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT, OPERAND>
@@ -34,6 +38,10 @@ public class StateMachineTez<STATE extends Enum<STATE>, EVENTTYPE extends Enum<E
3438

3539
private final StateMachine<STATE, EVENTTYPE, EVENT> realStatemachine;
3640

41+
private boolean isStateIntervalMonitorEnabled = false;
42+
private long lastStateChangedTime = Time.monotonicNow();
43+
private Map<String, Long> intervalSpentInStatesMs = new HashMap<>();
44+
3745
@SuppressWarnings("unchecked")
3846
public StateMachineTez(StateMachine sm, OPERAND operand) {
3947
this.realStatemachine = sm;
@@ -67,7 +75,38 @@ public STATE doTransition(EVENTTYPE eventType, EVENT event) throws
6775
if (callback != null) {
6876
callback.onStateChanged(operand, newState);
6977
}
78+
if (isStateIntervalMonitorEnabled) {
79+
String stateName = oldState.name();
80+
if (!intervalSpentInStatesMs.containsKey(stateName)) {
81+
intervalSpentInStatesMs.put(stateName, 0L);
82+
}
83+
long now = Time.monotonicNow();
84+
intervalSpentInStatesMs.put(stateName, now - lastStateChangedTime);
85+
lastStateChangedTime = now;
86+
}
7087
}
7188
return newState;
7289
}
90+
91+
public static boolean isStateIntervalMonitorEnabled(Configuration conf) {
92+
return conf.getBoolean(TezConfiguration.TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED,
93+
TezConfiguration.TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED_DEFAULT);
94+
}
95+
96+
public boolean isStateIntervalMonitorEnabled() {
97+
return isStateIntervalMonitorEnabled;
98+
}
99+
100+
public void enableStateIntervalMonitor() {
101+
this.isStateIntervalMonitorEnabled = true;
102+
}
103+
104+
public void incrementStateCounters(String group, TezCounters counters) {
105+
if (isStateIntervalMonitorEnabled) {
106+
return;
107+
}
108+
for (Map.Entry<String, Long> e : intervalSpentInStatesMs.entrySet()) {
109+
counters.getGroup(group).findCounter(e.getKey(), true).increment(e.getValue());
110+
}
111+
}
73112
}

0 commit comments

Comments
 (0)