Skip to content

Commit c253872

Browse files
committed
TEZ-4601: New counters in task scheduler: request times sum, max, average
1 parent 871866e commit c253872

12 files changed

Lines changed: 250 additions & 16 deletions

File tree

tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,5 +122,18 @@ public enum DAGCounter {
122122
* task assignments). This is typically exposed by a resource manager
123123
* client.
124124
*/
125-
NODE_TOTAL_COUNT
125+
NODE_TOTAL_COUNT,
126+
127+
/*
128+
* The maximum amount of time a task spent waiting as a task request before being scheduled.
129+
*/
130+
TASK_SCHEDULER_MAX_PENDING_TIME_MS,
131+
/*
132+
* The total accumulated time that all tasks spent waiting as task requests.
133+
*/
134+
TASK_SCHEDULER_SUM_PENDING_TIME_MS,
135+
/*
136+
* The average time tasks spent waiting as task requests before being scheduled.
137+
*/
138+
TASK_SCHEDULER_AVG_PENDING_TIME_MS
126139
}

tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
import org.apache.hadoop.yarn.api.records.Priority;
2727
import org.apache.hadoop.yarn.api.records.Resource;
2828
import org.apache.tez.common.ServicePluginLifecycle;
29+
import org.apache.tez.common.counters.AbstractCounters;
30+
import org.apache.tez.common.counters.CounterGroup;
31+
import org.apache.tez.common.counters.TezCounter;
32+
import org.apache.tez.common.counters.TezCounters;
2933

3034
/**
3135
* This class represents the API for a custom TaskScheduler which can be run within the Tez AM.
@@ -54,7 +58,6 @@ public enum SchedulerTaskState {
5458
public TaskScheduler(TaskSchedulerContext taskSchedulerContext) {
5559
this.taskSchedulerContext = taskSchedulerContext;
5660
}
57-
5861
/**
5962
* An entry point for initialization.
6063
* Order of service setup. Constructor, initialize(), start() - when starting a service.
@@ -280,4 +283,12 @@ protected void onContainersAllocated(List<Container> containers) {
280283
getContext().containerAllocated(container);
281284
}
282285
}
286+
287+
/**
288+
* Collects DAG-level counters from the TaskScheduler, which is then aggregated by the DAG implementation.
289+
* @return null by default, handled in upper layers
290+
*/
291+
public TaskSchedulerStatistics getStatistics() {
292+
return null;
293+
}
283294
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.tez.serviceplugins.api;
19+
20+
import org.apache.hadoop.util.Time;
21+
import org.apache.tez.common.counters.DAGCounter;
22+
import org.apache.tez.common.counters.TezCounters;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
/**
27+
* Statistics aggregator for task scheduler requests.
28+
*/
29+
public class TaskSchedulerStatistics {
30+
private static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerStatistics.class);
31+
32+
/*
33+
* Tracking task allocation pending times.
34+
*/
35+
private int maxPendingTime = 0;
36+
private int sumPendingTime = 0;
37+
private int averagePendingTime = 0;
38+
private int pendingTaskRequestSamples = 0;
39+
40+
/**
41+
* Called by task schedulers when a request is removed.
42+
*/
43+
public void trackRequestPendingTime(TaskRequestData request) {
44+
if (request == null) {
45+
LOG.info("Got null TaskRequestData, ignoring (it's fine in case of early shutdowns)");
46+
return;
47+
}
48+
long requestPendingTime = Time.now() - request.getCreatedTime();
49+
LOG.debug("Tracking request pending time: {}", requestPendingTime);
50+
51+
sumPendingTime += (int) requestPendingTime;
52+
pendingTaskRequestSamples += 1;
53+
maxPendingTime = Math.max(maxPendingTime, (int) requestPendingTime);
54+
}
55+
56+
/**
57+
* Calculates some derived statistics.
58+
* @return this instance
59+
*/
60+
protected TaskSchedulerStatistics aggregate() {
61+
// it's fine to lose float precision here, we're interested in average of milliseconds
62+
this.averagePendingTime = pendingTaskRequestSamples > 0 ? sumPendingTime / pendingTaskRequestSamples : 0;
63+
return this;
64+
}
65+
66+
/**
67+
* Adds the stats from another aggregator to this one and calculates the derived fields by calling aggregate().
68+
* @return this instance with ready-to-use data
69+
*/
70+
public TaskSchedulerStatistics add(TaskSchedulerStatistics other) {
71+
if (other == null || other.pendingTaskRequestSamples == 0) {
72+
return aggregate();
73+
}
74+
this.maxPendingTime = Math.max(maxPendingTime, other.maxPendingTime);
75+
this.sumPendingTime += other.sumPendingTime;
76+
this.pendingTaskRequestSamples += other.pendingTaskRequestSamples;
77+
return aggregate();
78+
}
79+
80+
public TezCounters getCounters() {
81+
TezCounters counters = new TezCounters();
82+
LOG.info("Getting counters from statistics: {}", this.statsToString());
83+
// prevent filling the counters with useless 0 values if any (e.g. in case of unused TaskScheduler)
84+
if (pendingTaskRequestSamples != 0) {
85+
counters.findCounter(DAGCounter.TASK_SCHEDULER_MAX_PENDING_TIME_MS).setValue(maxPendingTime);
86+
counters.findCounter(DAGCounter.TASK_SCHEDULER_SUM_PENDING_TIME_MS).setValue(sumPendingTime);
87+
counters.findCounter(DAGCounter.TASK_SCHEDULER_AVG_PENDING_TIME_MS).setValue(averagePendingTime);
88+
}
89+
return counters;
90+
}
91+
92+
public TaskSchedulerStatistics clear() {
93+
averagePendingTime = 0;
94+
sumPendingTime = 0;
95+
pendingTaskRequestSamples = 0;
96+
maxPendingTime = 0;
97+
return this;
98+
}
99+
100+
/**
101+
* Classes implementing this interface tells basic characteristics about task requests they encapsulate.
102+
*/
103+
public interface TaskRequestData {
104+
// The time when the request was created
105+
long getCreatedTime();
106+
}
107+
108+
public String statsToString(){
109+
return String.format("[pending: {max: %d, sum: %d, average: %d, samples: %d}]", maxPendingTime, sumPendingTime,
110+
averagePendingTime, pendingTaskRequestSamples);
111+
}
112+
}

tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.apache.tez.client.TezClientUtils;
7474
import org.apache.tez.common.ReflectionUtils;
7575
import org.apache.tez.common.TezUtils;
76+
import org.apache.tez.common.counters.TezCounters;
7677
import org.apache.tez.dag.api.NamedEntityDescriptor;
7778
import org.apache.tez.dag.api.SessionNotRunning;
7879
import org.apache.tez.dag.api.UserPayload;

tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2591,5 +2591,7 @@ private void updateCounters() {
25912591
setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size());
25922592
setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size());
25932593
setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes(true));
2594+
2595+
dagCounters.incrAllCounters(appContext.getTaskScheduler().getCounters());
25942596
}
25952597
}

tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,17 @@
4343
import org.apache.hadoop.yarn.util.resource.Resources;
4444
import org.apache.tez.common.ContainerSignatureMatcher;
4545
import org.apache.tez.common.TezUtils;
46+
import org.apache.tez.common.counters.AbstractCounters;
47+
import org.apache.tez.common.counters.CounterGroup;
48+
import org.apache.tez.common.counters.TezCounter;
4649
import org.apache.tez.dag.api.TezConfiguration;
4750
import org.apache.tez.dag.app.dag.TaskAttempt;
4851
import org.apache.tez.serviceplugins.api.DagInfo;
4952
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
5053
import org.apache.tez.serviceplugins.api.TaskScheduler;
5154
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
5255
import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AMState;
56+
import org.apache.tez.serviceplugins.api.TaskSchedulerStatistics;
5357
import org.slf4j.Logger;
5458
import org.slf4j.LoggerFactory;
5559

@@ -157,6 +161,8 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
157161
private int lastPreemptionHeartbeat = 0;
158162
private long preemptionMaxWaitTime;
159163

164+
private final TaskSchedulerStatistics taskSchedulerStatistics = new TaskSchedulerStatistics();
165+
160166
public DagAwareYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext) {
161167
super(taskSchedulerContext);
162168
signatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
@@ -595,6 +601,7 @@ private void informAppAboutAssignment(TaskRequest request, Container container)
595601
request.getCookie());
596602
} else {
597603
getContext().taskAllocated(request.getTask(), request.getCookie(), container);
604+
taskSchedulerStatistics.trackRequestPendingTime(request);
598605
}
599606
}
600607

@@ -1162,6 +1169,7 @@ public synchronized void dagComplete() {
11621169
hc.resetMatchingLevel();
11631170
}
11641171
vertexDescendants = null;
1172+
taskSchedulerStatistics.clear();
11651173
}
11661174

11671175
@GuardedBy("this")
@@ -1265,12 +1273,14 @@ public void updateBlacklist(List<String> additions, List<String> removals) {
12651273
/**
12661274
* A utility class to track a task allocation.
12671275
*/
1268-
static class TaskRequest extends AMRMClient.ContainerRequest {
1276+
static class TaskRequest extends AMRMClient.ContainerRequest
1277+
implements TaskSchedulerStatistics.TaskRequestData {
12691278
final Object task;
12701279
final int vertexIndex;
12711280
final Object signature;
12721281
final Object cookie;
12731282
final ContainerId affinityContainerId;
1283+
final long created;
12741284

12751285
TaskRequest(Object task, int vertexIndex, Resource capability, String[] hosts, String[] racks,
12761286
Priority priority, Object signature, Object cookie) {
@@ -1285,6 +1295,7 @@ static class TaskRequest extends AMRMClient.ContainerRequest {
12851295
this.signature = signature;
12861296
this.cookie = cookie;
12871297
this.affinityContainerId = affinityContainerId;
1298+
this.created = Time.now();
12881299
}
12891300

12901301
Object getTask() {
@@ -1313,6 +1324,11 @@ boolean hasLocality() {
13131324
List<String> racks = getRacks();
13141325
return (nodes != null && !nodes.isEmpty()) || (racks != null && !racks.isEmpty());
13151326
}
1327+
1328+
@Override
1329+
public long getCreatedTime() {
1330+
return created;
1331+
}
13161332
}
13171333

13181334
private enum HeldContainerState {
@@ -2103,4 +2119,9 @@ protected void afterExecute(Runnable r, Throwable t) {
21032119
public int getHeldContainersCount() {
21042120
return heldContainers.size();
21052121
}
2122+
2123+
@Override
2124+
public TaskSchedulerStatistics getStatistics() {
2125+
return taskSchedulerStatistics;
2126+
}
21062127
}

0 commit comments

Comments
 (0)