Skip to content

Commit 7d3a2a7

Browse files
committed
TEZ-4603: Bucket Map Join can hang if the source vertex parallelism is changed by reducer autoparallelism
1 parent 871866e commit 7d3a2a7

3 files changed

Lines changed: 40 additions & 1 deletion

File tree

tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,14 @@ public EdgeManagerPluginDescriptor getEdgeManagerDescriptor() {
261261
return edgeManagerDescriptor;
262262
}
263263

264+
/**
265+
* Returns a new EdgeProperty with the given EdgeManagerPluginDescriptor.
266+
*/
267+
public EdgeProperty withDescriptor(EdgeManagerPluginDescriptor newDescriptor) {
268+
return new EdgeProperty(newDescriptor, this.dataMovementType, this.dataSourceType,
269+
this.schedulingType, this.outputDescriptor, this.inputDescriptor);
270+
}
271+
264272
@Override
265273
public String toString() {
266274
return "{ " + dataMovementType + " : " + inputDescriptor.getClassName()

tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1998,7 +1998,12 @@ private void setParallelismWrapper(int parallelism, VertexLocationHint vertexLoc
19981998
Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
19991999
Edge edge = sourceVertices.get(sourceVertex);
20002000
try {
2001-
edge.setEdgeProperty(entry.getValue());
2001+
if (edge != null) {
2002+
edge.setEdgeProperty(entry.getValue());
2003+
} else {
2004+
LOG.warn("Edge is null, sourceVertex = {}, entry.getValue() = {}",
2005+
sourceVertex, entry.getValue());
2006+
}
20022007
} catch (Exception e) {
20032008
throw new TezUncheckedException(e);
20042009
}

tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.tez.dag.api.EdgeManagerPluginContext;
2828
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
2929
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
30+
import org.apache.tez.dag.api.EdgeProperty;
3031
import org.apache.tez.dag.api.TezUncheckedException;
3132
import org.apache.tez.dag.api.UserPayload;
3233
import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -52,6 +53,7 @@
5253
import java.util.Comparator;
5354
import java.util.List;
5455
import java.util.Map;
56+
import java.util.HashMap;
5557

5658
/**
5759
* Starts scheduling tasks when number of completed source tasks crosses
@@ -520,6 +522,30 @@ ReconfigVertexParams computeRouting() {
520522
for(Map.Entry<String, SourceVertexInfo> entry : bipartiteItr) {
521523
entry.getValue().newDescriptor = descriptor;
522524
}
525+
526+
// Additionally, update custom edges.
527+
Map<String, EdgeProperty> outputEdges = getContext().getOutputVertexEdgeProperties();
528+
Map<String, EdgeProperty> updatedEdges = new HashMap<>();
529+
for (Map.Entry<String, EdgeProperty> entry : outputEdges.entrySet()) {
530+
if (entry.getValue().getDataMovementType() == EdgeProperty.DataMovementType.CUSTOM) {
531+
// Build a new custom edge manager configuration with updated parallelism.
532+
CustomShuffleEdgeManagerConfig customConfig = new CustomShuffleEdgeManagerConfig(
533+
currentParallelism, finalTaskParallelism, basePartitionRange,
534+
(remainderRangeForLastShuffler > 0 ? remainderRangeForLastShuffler : basePartitionRange));
535+
EdgeManagerPluginDescriptor newDescriptor = EdgeManagerPluginDescriptor.create(CustomShuffleEdgeManager.class.getName());
536+
newDescriptor.setUserPayload(customConfig.toUserPayload());
537+
538+
// Update the EdgeProperty with the new descriptor.
539+
EdgeProperty updatedProp = entry.getValue().withDescriptor(newDescriptor);
540+
updatedEdges.put(entry.getKey(), updatedProp);
541+
}
542+
}
543+
544+
// If any custom edges were updated, propagate the new configuration.
545+
if (!updatedEdges.isEmpty()) {
546+
getContext().reconfigureVertex(finalTaskParallelism, null, updatedEdges);
547+
}
548+
523549
ReconfigVertexParams params =
524550
new ReconfigVertexParams(finalTaskParallelism, null);
525551
return params;

0 commit comments

Comments
 (0)