Skip to content

feat(shuffleV2): support multi-CN via serve-all-buckets mode#24940

Open
aunjgr wants to merge 5 commits into
matrixorigin:mainfrom
aunjgr:feat/multi-cn-shuffle-v2
Open

feat(shuffleV2): support multi-CN via serve-all-buckets mode#24940
aunjgr wants to merge 5 commits into
matrixorigin:mainfrom
aunjgr:feat/multi-cn-shuffle-v2

Conversation

@aunjgr

@aunjgr aunjgr commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #24097

What this PR does / why we need it:

Shuffle v2 (in-process ShufflePoolV2) was only used on single-CN because its Call() method only served the current worker's bucket. On multi-CN source scopes where Mcpu=1 (maxHolders==1), the single worker wrote rows to all N buckets but only drained bucket 0 — data for remote CNs was stranded in the pool.

Fix: When maxHolders == 1, Call() uses getAnyFullBatch() / getAnyLastBatch() to serve batches from all buckets. This lets the downstream Dispatch operator route each batch to the correct target CN based on ShuffleIDX.

Compile changes: The multi-CN constructor paths (constructShuffleOperatorForJoin and constructShuffleArgForGroup) now create v2 shuffle operators instead of v1. Single-CN paths (compileShuffleJoinV2 / compileShuffleGroupV2) are unchanged.

remoterun.go: Added vm.ShuffleV2 serialization/deserialization.

How scattering/gathering works:

Source CN (Mcpu=1):
  [child ops] -> [v2 Shuffle] -> [Dispatch]
                    |                 |
                    | set ShuffleIDX  | routes bat to register[ShuffleIDX]
                    | serve all N     |
                    | buckets         +- LocalRegs  -> PipelineSpool (same CN)
                    |                 +- RemoteRegs -> MoRPC (remote CN)

Target CN: MergeReceiver -> [HashJoin / Group]

🤖 Generated with Claude Code

@aunjgr aunjgr requested a review from ouyuanning as a code owner June 11, 2026 10:41
@qodo-code-review

Copy link
Copy Markdown

Qodo reviews are paused for this user.

Troubleshooting steps vary by plan Learn more →

On a Teams plan?
Reviews resume once this user has a paid seat and their Git account is linked in Qodo.
Link Git account →

Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center?
These require an Enterprise plan - Contact us
Contact us →

aunjgr added 2 commits June 15, 2026 23:26
When maxHolders == 1 (single worker on multi-CN source scope),
Call() serves full batches from all buckets instead of only
CurrentShuffleIdx. This lets dispatch route data to the correct
target CNs.

Also swap multi-CN compile constructors to use v2 operators:
- constructShuffleOperatorForJoin: v1 -> v2
- constructShuffleArgForGroup: v1 -> v2

Add vm.ShuffleV2 serialization/deserialization in remoterun.go.
- Prepare defaults maxHolders to 1; dupOperator overrides with SetMaxHolders
- allStop checks stoppers >= holders instead of stoppers == maxHolders
- Call() serves all buckets when maxHolders == 1 (single worker)
- hashShuffle returns const/constNull directly instead of through pool
- Remove DEDUP hash join gate in compileJoin
- Add getAnyFullBatch/getAnyLastBatch to ShufflePoolV2
@matrix-meow matrix-meow added size/M Denotes a PR that changes [100,499] lines and removed size/S Denotes a PR that changes [10,99] lines labels Jun 15, 2026
aunjgr added 3 commits June 16, 2026 00:36
- Prepare defaults maxHolders to 1; dupOperator overrides with SetMaxHolders
- allStop checks stoppers >= holders instead of stoppers == maxHolders
- Call() serves all buckets when maxHolders == 1 (single worker)
- hashShuffle returns const/constNull directly instead of through pool
- Remove DEDUP hash join gate, restore v1 for multi-CN path
- Add getAnyFullBatch/getAnyLastBatch to ShufflePoolV2
- Multi-CN (len(cnlist)>1): newShuffleJoinScopeList uses V2 constructors
- Single-CN after-merge (Mcpu!=Dop): keeps V1 constructors
- Single-CN Mcpu==Dop: compileShuffleJoinV2 (V2, unchanged)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kind/enhancement kind/feature size/M Denotes a PR that changes [100,499] lines

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants