Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2073,7 +2073,7 @@ public PlanFragment visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends P
// in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
// we need turn of parallel scan to ensure to get correct result.
if (partitionTopN.getPhase() == PartitionTopnPhase.ONE_PHASE_GLOBAL_PTOPN
&& findOlapScanNodesByPassExchangeAndJoinNode(inputFragment.getPlanRoot())) {
&& findOlapScanNodesByPassExchangeNode(inputFragment.getPlanRoot())) {
inputFragment.setHasColocatePlanNode(true);
}
return inputFragment;
Expand Down Expand Up @@ -2327,7 +2327,7 @@ public PlanFragment visitPhysicalSetOperation(
// we need turn of parallel scan to ensure to get correct result.
// TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary
if (!setOperation.getPhysicalProperties().equals(PhysicalProperties.ANY)
&& findOlapScanNodesByPassExchangeAndJoinNode(setOperationFragment.getPlanRoot())) {
&& findOlapScanNodesByPassExchangeNode(setOperationFragment.getPlanRoot())) {
setOperationFragment.setHasColocatePlanNode(true);
setOperationNode.setColocate(true);
}
Expand Down Expand Up @@ -2648,7 +2648,7 @@ public PlanFragment visitPhysicalWindow(PhysicalWindow<? extends Plan> physicalW
// in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
// we need turn of parallel scan to ensure to get correct result.
// TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary
if (findOlapScanNodesByPassExchangeAndJoinNode(inputPlanFragment.getPlanRoot())) {
if (findOlapScanNodesByPassExchangeNode(inputPlanFragment.getPlanRoot())) {
inputPlanFragment.setHasColocatePlanNode(true);
analyticEvalNode.setColocate(true);
if (root instanceof SortNode) {
Expand Down Expand Up @@ -3200,11 +3200,11 @@ private PhysicalCTEConsumer getCTEConsumerChild(PhysicalPlan root) {
}
}

private boolean findOlapScanNodesByPassExchangeAndJoinNode(PlanNode root) {
private boolean findOlapScanNodesByPassExchangeNode(PlanNode root) {
if (root instanceof OlapScanNode) {
return true;
} else if (!(root instanceof JoinNodeBase || root instanceof ExchangeNode)) {
return root.getChildren().stream().anyMatch(child -> findOlapScanNodesByPassExchangeAndJoinNode(child));
} else if (!(root instanceof ExchangeNode)) {
return root.getChildren().stream().anyMatch(child -> findOlapScanNodesByPassExchangeNode(child));
}
return false;
}
Expand Down
Loading