Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2496,14 +2496,28 @@ public PlanFragment visitPhysicalSetOperation(
setOperationNode.setColocate(true);
}

// TODO: open comment when support `enable_local_shuffle_planner`
// for (Plan child : setOperation.children()) {
// PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
// if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
// setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
// break;
// }
// }
// A storage-bucketed child means set-op bucket shuffle was chosen by
// ChildrenPropertiesRegulator, which only does so under the FE local-shuffle planner;
// the gate here keeps the two sites explicitly consistent. Mark the node BUCKET_SHUFFLE
// so the set sink/probe align by bucket instead of execution-bucketed hash.
//
// Unlike hash join, BUCKET_SHUFFLE is not exclusive with isColocate above: for a set
// operation isColocate describes the bucket-aligned scheduling of the fragment (the
// basic child scans buckets directly), while BUCKET_SHUFFLE describes how the other
// children arrive (bucket-shuffle exchanges). Both routes converge to the same
// bucket-hash local exchange requirement in SetOperationNode.enforceAndDeriveLocalExchange.
ConnectContext setOperationConnectContext = context.getConnectContext();
if (setOperationConnectContext != null
&& setOperationConnectContext.getSessionVariable().isEnableLocalShufflePlanner()
&& SessionVariable.canUseNereidsDistributePlanner(setOperationConnectContext)) {
for (Plan child : setOperation.children()) {
PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
break;
}
}
}

return setOperationFragment;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Union;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalBucketedHashAggregate;
Expand Down Expand Up @@ -71,9 +72,11 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -440,53 +443,56 @@ public PhysicalProperties visitPhysicalSetOperation(PhysicalSetOperation setOper
return PhysicalProperties.GATHER;
}

// TODO: open comment when support `enable_local_shuffle_planner`
// int distributeToChildIndex
// = setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
// if (distributeToChildIndex >= 0
// && childrenDistribution.get(distributeToChildIndex) instanceof DistributionSpecHash) {
// DistributionSpecHash childDistribution
// = (DistributionSpecHash) childrenDistribution.get(distributeToChildIndex);
// List<SlotReference> childToIndex = setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
// Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
// for (int j = 0; j < childToIndex.size(); j++) {
// idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
// }
//
// List<ExprId> orderedShuffledColumns = childDistribution.getOrderedShuffledColumns();
// List<ExprId> setOperationDistributeColumnIds = new ArrayList<>();
// for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
// Integer index = idToOutputIndex.get(tableDistributeColumnId);
// if (index == null) {
// break;
// }
// setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
// }
// // check whether the set operation output all distribution columns of the child
// if (setOperationDistributeColumnIds.size() == orderedShuffledColumns.size()) {
// boolean isUnion = setOperation instanceof Union;
// boolean shuffleToRight = distributeToChildIndex > 0;
// if (!isUnion && shuffleToRight) {
// return new PhysicalProperties(
// new DistributionSpecHash(
// setOperationDistributeColumnIds,
// ShuffleType.EXECUTION_BUCKETED
// )
// );
// } else {
// // keep the distribution as the child
// return new PhysicalProperties(
// new DistributionSpecHash(
// setOperationDistributeColumnIds,
// childDistribution.getShuffleType(),
// childDistribution.getTableId(),
// childDistribution.getSelectedIndexId(),
// childDistribution.getPartitionIds()
// )
// );
// }
// }
// }
// When set-op bucket shuffle is chosen (DISTRIBUTE_TO_CHILD_INDEX is set by
// ChildrenPropertiesRegulator, which only happens under the FE local-shuffle planner),
// the set operation keeps the basic child's bucket distribution as its own output so the
// bucket distribution propagates upward instead of being flattened to execution-bucketed.
int distributeToChildIndex
= setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
if (distributeToChildIndex >= 0
&& childrenDistribution.get(distributeToChildIndex) instanceof DistributionSpecHash) {
DistributionSpecHash childDistribution
= (DistributionSpecHash) childrenDistribution.get(distributeToChildIndex);
List<SlotReference> childToIndex = setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
for (int j = 0; j < childToIndex.size(); j++) {
idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
}

List<ExprId> orderedShuffledColumns = childDistribution.getOrderedShuffledColumns();
List<ExprId> setOperationDistributeColumnIds = new ArrayList<>();
for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
Integer index = idToOutputIndex.get(tableDistributeColumnId);
if (index == null) {
break;
}
setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
}
// check whether the set operation output all distribution columns of the child
if (setOperationDistributeColumnIds.size() == orderedShuffledColumns.size()) {
boolean isUnion = setOperation instanceof Union;
boolean shuffleToRight = distributeToChildIndex > 0;
if (!isUnion && shuffleToRight) {
return new PhysicalProperties(
new DistributionSpecHash(
setOperationDistributeColumnIds,
ShuffleType.EXECUTION_BUCKETED
)
);
} else {
// keep the distribution as the child
return new PhysicalProperties(
new DistributionSpecHash(
setOperationDistributeColumnIds,
childDistribution.getShuffleType(),
childDistribution.getTableId(),
childDistribution.getSelectedIndexId(),
childDistribution.getPartitionIds()
)
);
}
}
}

for (int i = 0; i < childrenDistribution.size(); i++) {
DistributionSpec childDistribution = childrenDistribution.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -651,83 +652,103 @@ public List<List<PhysicalProperties>> visitPhysicalSetOperation(PhysicalSetOpera
} else if (requiredDistributionSpec instanceof DistributionSpecHash) {
// TODO: should use the most common hash spec as basic
DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec;
// TODO: open comment when support `enable_local_shuffle_planner`
// int bucketShuffleBasicIndex = -1;
// double basicRowCount = -1;

// find the bucket shuffle basic index
// try {
// ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
// ShuffleType.NATURAL,
// ShuffleType.STORAGE_BUCKETED
// );
// for (int i = 0; i < originChildrenProperties.size(); i++) {
// PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
// DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
// if (childDistribution instanceof DistributionSpecHash
// && supportBucketShuffleTypes.contains(
// ((DistributionSpecHash) childDistribution).getShuffleType())
// && !(isBucketShuffleDownGrade(setOperation.child(i)))) {
// Statistics stats = setOperation.child(i).getStats();
// double rowCount = stats.getRowCount();
// if (rowCount > basicRowCount) {
// basicRowCount = rowCount;
// bucketShuffleBasicIndex = i;
// }
// }
// }
// } catch (Throwable t) {
// // catch stats exception
// LOG.warn("Can not find the most (bucket num, rowCount): " + t, t);
// bucketShuffleBasicIndex = -1;
// }

// use bucket shuffle
// if (bucketShuffleBasicIndex >= 0) {
// DistributionSpecHash notShuffleSideRequire
// = (DistributionSpecHash) requiredProperties.get(bucketShuffleBasicIndex)
// .getDistributionSpec();
//
// DistributionSpecHash notNeedShuffleOutput
// = (DistributionSpecHash) originChildrenProperties.get(bucketShuffleBasicIndex)
// .getDistributionSpec();
//
// for (int i = 0; i < originChildrenProperties.size(); i++) {
// DistributionSpecHash current
// = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
// if (i == bucketShuffleBasicIndex) {
// continue;
// }
//
// DistributionSpecHash currentRequire
// = (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec();
//
// PhysicalProperties target = calAnotherSideRequired(
// ShuffleType.STORAGE_BUCKETED,
// notNeedShuffleOutput, current,
// notShuffleSideRequire,
// currentRequire);
// updateChildEnforceAndCost(i, target);
// }
// setOperation.setMutableState(
// PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, bucketShuffleBasicIndex);
// use partitioned shuffle
// } else {
for (int i = 0; i < originChildrenProperties.size(); i++) {
DistributionSpecHash current
= (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
|| !bothSideShuffleKeysAreSameOrder(basic, current,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
int bucketShuffleBasicIndex = -1;
double basicRowCount = -1;

// Bucket shuffle for set operation is only valid when the FE plans the local
// shuffle: with the BE-side local-shuffle planner the backend cannot infer the
// correct local shuffle type for the set sink/probe and computes wrong results.
// It also requires the nereids distribute planner: the legacy coordinator only
// supports bucket-shuffle-partitioned sinks whose dest fragment contains a bucket
// shuffle join, so a bucket-shuffle set operation fragment cannot be scheduled there.
// Otherwise, keep bucketShuffleBasicIndex = -1 and fall back to the
// execution-bucketed (partitioned) shuffle below.
ConnectContext setOperationContext = ConnectContext.get();
boolean enableLocalShufflePlanner = setOperationContext != null
&& setOperationContext.getSessionVariable().isEnableLocalShufflePlanner()
&& SessionVariable.canUseNereidsDistributePlanner(setOperationContext);

// find the bucket shuffle basic index: the largest natural / storage-bucketed child
// keeps its bucket distribution, every other child is bucket-shuffled to it.
// isBucketShuffleDownGrade reuses the join-side heuristics on purpose, including
// the enable_bucket_shuffle_join switch and bucket_shuffle_downgrade_ratio: bucket
// shuffle for set operation belongs to the same optimization family as bucket
// shuffle join, so the join switches govern both instead of introducing a separate
// session variable.
if (enableLocalShufflePlanner) {
try {
ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
ShuffleType.NATURAL,
ShuffleType.STORAGE_BUCKETED
);
for (int i = 0; i < originChildrenProperties.size(); i++) {
PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
if (childDistribution instanceof DistributionSpecHash
&& supportBucketShuffleTypes.contains(
((DistributionSpecHash) childDistribution).getShuffleType())
&& !(isBucketShuffleDownGrade(setOperation.child(i)))) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isEnableBucketShuffleJoin controls set operation bucket shuffle.

The isBucketShuffleDownGrade method (line 294) checks ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin(), a join-specific session variable. A user who disables bucket shuffle for joins (enable_bucket_shuffle_join=false) also silently loses the set operation bucket shuffle optimization, with no independent control.

This coupling already existed in the commented-out code from PR #59006 and is now activated. Consider whether set operations warrant a separate session variable, or at minimum document this coupling in the session variable description.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This coupling is intentional: bucket shuffle for set operations belongs to the same optimization family as bucket shuffle join (same downgrade trade-off: bucket count vs. instance parallelism), so the join switch (enable_bucket_shuffle_join) and bucket_shuffle_downgrade_ratio govern both instead of introducing one more session variable. I have documented the coupling at the call site. If you prefer an independent switch, I can split it out in a follow-up.

Statistics stats = setOperation.child(i).getStats();
double rowCount = stats.getRowCount();
if (rowCount > basicRowCount) {
basicRowCount = rowCount;
bucketShuffleBasicIndex = i;
}
}
}
} catch (Throwable t) {
// catch stats exception
LOG.warn("Can not find the most (bucket num, rowCount): " + t, t);
bucketShuffleBasicIndex = -1;
}
}

if (bucketShuffleBasicIndex >= 0) {
// use bucket shuffle
DistributionSpecHash notShuffleSideRequire
= (DistributionSpecHash) requiredProperties.get(bucketShuffleBasicIndex)
.getDistributionSpec();

DistributionSpecHash notNeedShuffleOutput
= (DistributionSpecHash) originChildrenProperties.get(bucketShuffleBasicIndex)
.getDistributionSpec();

for (int i = 0; i < originChildrenProperties.size(); i++) {
DistributionSpecHash current
= (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
if (i == bucketShuffleBasicIndex) {
continue;
}

DistributionSpecHash currentRequire
= (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec();

PhysicalProperties target = calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, basic, current,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(i).getDistributionSpec());
ShuffleType.STORAGE_BUCKETED,
notNeedShuffleOutput, current,
notShuffleSideRequire,
currentRequire);
updateChildEnforceAndCost(i, target);
}
setOperation.setMutableState(
PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, bucketShuffleBasicIndex);
} else {
// use partitioned shuffle
for (int i = 0; i < originChildrenProperties.size(); i++) {
DistributionSpecHash current
= (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
|| !bothSideShuffleKeysAreSameOrder(basic, current,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
PhysicalProperties target = calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, basic, current,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(i).getDistributionSpec());
updateChildEnforceAndCost(i, target);
}
}
}
// }
}
return ImmutableList.of(originChildrenProperties);
}
Expand Down
Loading
Loading