[GLUTEN-12013][VL] Fix bloom-filter bytes corruption on whole-stage AQE fallback #12151
Open
brijrajk wants to merge 1 commit into
6cbe5c1 to 4a56662
…QE fallback
When ExpandFallbackPolicy triggers a whole-stage AQE fallback it reinstates
the plan from before HeuristicTransform (i.e. before pre-transform rewrites),
so BloomFilterMightContainJointRewriteRule's substitution of
BloomFilterMightContain -> VeloxBloomFilterMightContain is lost. If Stage 0
(bloom_filter_agg subquery) already executed natively it produced Velox-format
bytes; BloomFilterMightContain then calls BloomFilterImpl.readFrom() on those
bytes and throws:
java.io.IOException: Unexpected Bloom filter version number (16777217)
or the native assertion kBloomFilterV1 == version fires during merge.
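The odd-looking version number is easier to interpret in hex. The sketch below assumes the JVM reader takes the first four bytes of the payload as a big-endian int and expects the value 1. The `foreignHeader` bytes are purely illustrative: this PR does not state the Velox serialization layout, so the class and its byte arrays are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical demo class; not part of Spark, Gluten, or Velox.
class BloomVersionDemo {
    // Interpret the first four bytes as a big-endian int, the same byte
    // order java.io.DataInputStream.readInt() uses.
    static int readVersionBigEndian(byte[] header) {
        return ByteBuffer.wrap(header).order(ByteOrder.BIG_ENDIAN).getInt();
    }

    // Assumption: the JVM-side reader accepts only version 1.
    static boolean looksLikeJvmV1(byte[] header) {
        return readVersionBigEndian(header) == 1;
    }

    public static void main(String[] args) {
        byte[] jvmHeader = {0x00, 0x00, 0x00, 0x01};     // big-endian int 1
        byte[] foreignHeader = {0x01, 0x00, 0x00, 0x01}; // illustrative only

        System.out.println(readVersionBigEndian(jvmHeader));     // 1
        System.out.println(readVersionBigEndian(foreignHeader)); // 16777217
        System.out.println(String.format("0x%08X", 16777217));   // 0x01000001
        System.out.println(looksLikeJvmV1(foreignHeader));       // false
    }
}
```

16777217 is 0x01000001, so the reader is seeing a leading byte of 1 followed by bytes that belong to some other field, which is consistent with the payload being in a different serialization format rather than being randomly corrupted.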
Fix: register BloomFilterMightContainFallbackPatcher as a second fallback-policy
pass (after ExpandFallbackPolicy). It walks FallbackNode subtrees and replaces
any remaining BloomFilterMightContain inside FilterExec with
VeloxBloomFilterMightContain, so the JVM filter can read Velox-format bytes via
JNI even after falling back to the JVM execution path.
The patcher is guarded by requireBloomFilterAggMightContainJointFallback() so
it is a no-op for backends that do not require joint fallback (e.g. ClickHouse).
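The shape of the patching pass can be sketched on a toy plan tree. This is not the Gluten implementation: `Node`, `patch`, and `apply` are hypothetical names, predicates are modeled as plain strings, and the real rule operates on Spark's `SparkPlan` and expression trees. It shows only the three ideas described above: the backend guard, the restriction to `FallbackNode` subtrees, and the replacement inside `FilterExec`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical toy model of the patcher; names do not match Gluten's API.
class FallbackPatcherSketch {
    // Immutable plan node: operator name, optional predicate name, children.
    static final class Node {
        final String operator;
        final String predicate; // null when the operator has no predicate
        final List<Node> children;

        Node(String operator, String predicate, Node... children) {
            this.operator = operator;
            this.predicate = predicate;
            this.children = Collections.unmodifiableList(Arrays.asList(children));
        }
    }

    // Rebuild the tree, swapping the predicate only in FilterExec nodes
    // that sit at or below a FallbackNode.
    static Node patch(Node node, boolean insideFallback) {
        boolean fallback = insideFallback || node.operator.equals("FallbackNode");
        String predicate = node.predicate;
        if (fallback
                && node.operator.equals("FilterExec")
                && "BloomFilterMightContain".equals(predicate)) {
            predicate = "VeloxBloomFilterMightContain";
        }
        Node[] patched = new Node[node.children.size()];
        for (int i = 0; i < patched.length; i++) {
            patched[i] = patch(node.children.get(i), fallback);
        }
        return new Node(node.operator, predicate, patched);
    }

    // Guard: backends that do not need joint fallback get the plan back untouched.
    static Node apply(Node plan, boolean requireJointFallback) {
        return requireJointFallback ? patch(plan, false) : plan;
    }

    public static void main(String[] args) {
        Node plan = new Node("FallbackNode", null,
            new Node("FilterExec", "BloomFilterMightContain",
                new Node("Scan", null)));
        System.out.println(apply(plan, true).children.get(0).predicate);
        // prints VeloxBloomFilterMightContain
    }
}
```

With the guard set to `false` the same call returns the original plan object, which mirrors the no-op behavior described for backends such as ClickHouse.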
A regression test is added to GlutenBloomFilterAggregateQuerySuite that
reproduces the failure path:
- COLUMNAR_FILTER_ENABLED=false -> FilterExec falls back (net cost 2)
- WHOLESTAGE_FALLBACK_THRESHOLD=2 -> only filter stage falls back; agg runs
natively and emits Velox bytes
- ANSI_ENABLED=false -> prevents agg validation failure on Spark 4.0
which would raise agg-stage cost above 1
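The arithmetic behind these settings can be checked with a small model. It assumes a stage falls back when its transition cost is greater than or equal to the threshold. That rule is inferred from the numbers in this description (cost 2 falls back at threshold 2, cost 1 does not), not taken from the Gluten source, and `fallsBack` is a hypothetical helper.

```java
// Hypothetical model of the per-stage fallback decision.
class FallbackThresholdDemo {
    // Assumption: fall back when cost >= threshold.
    static boolean fallsBack(int transitionCost, int threshold) {
        return transitionCost >= threshold;
    }

    public static void main(String[] args) {
        int threshold = 2;  // WHOLESTAGE_FALLBACK_THRESHOLD=2
        int filterCost = 2; // FilterExec forced to fall back
        int aggCost = 1;    // agg stage with ANSI disabled

        System.out.println(fallsBack(filterCost, threshold)); // true
        System.out.println(fallsBack(aggCost, threshold));    // false
    }
}
```

Under the same assumption, an agg-stage cost of 2 or more (the Spark 4.0 ANSI case) would also fall back, so the agg would no longer emit Velox-format bytes and the test would not exercise the bug.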
Fixes apache#12013
Generated-by: Claude Code (claude-sonnet-4-6)
4a56662 to 9bf19dc
Run Gluten Clickhouse CI on x86
Author
Could a maintainer please remove the CORE label? All three changed files are Velox-backend-specific (backends-velox/ and gluten-ut/spark40/); no common core code is touched. Only the VELOX label is correct. Thanks!
What changes are proposed in this pull request?
Fixes #12013
Root cause
When ExpandFallbackPolicy triggers a whole-stage AQE fallback it reverts to the plan captured before HeuristicTransform runs (i.e. before all pre-transform rewrites). This means the substitution performed by BloomFilterMightContainJointRewriteRule, which replaces vanilla Spark's BloomFilterMightContain with VeloxBloomFilterMightContain, is silently undone in the fallback plan.
If Stage 0 (the bloom_filter_agg subquery) has already executed natively it has produced Velox-format bloom filter bytes. The vanilla BloomFilterMightContain in the fallen-back filter stage then calls BloomFilterImpl.readFrom() on those bytes, which throws:
java.io.IOException: Unexpected Bloom filter version number (16777217)
or causes a native assertion failure (kBloomFilterV1 == version) during the merge phase.
Fix
Register BloomFilterMightContainFallbackPatcher as a second fallback-policy pass (after ExpandFallbackPolicy) in VeloxRuleApi. The patcher walks the subtree of every FallbackNode and replaces any remaining BloomFilterMightContain inside FilterExec nodes with VeloxBloomFilterMightContain, so the JVM filter can continue to read Velox-format bytes via JNI even after the whole-stage fallback.
The patcher is guarded by requireBloomFilterAggMightContainJointFallback() so it is a no-op for backends that do not require joint fallback (e.g. ClickHouse).
Files changed
- BloomFilterMightContainFallbackPatcher.scala: new Rule[SparkPlan] that patches fallback plans
- VeloxRuleApi.scala: registers the patcher as a second fallback-policy pass
- GlutenBloomFilterAggregateQuerySuite.scala: regression test for the exact failure scenario
How was this patch tested?
A regression test "Test bloom_filter_agg whole-stage fallback does not corrupt bloom filter bytes" was added to GlutenBloomFilterAggregateQuerySuite (tagged Issue12013).
The test reproduces the precise failure path:
- COLUMNAR_FILTER_ENABLED = false: forces FilterExec to fall back (net transition cost = 2)
- COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD = 2: only the filter stage triggers whole-stage fallback via ExpandFallbackPolicy; the bloom_filter_agg subquery stages (inherent cost = 1 < threshold) continue to run natively and emit Velox-format bytes
- ANSI_ENABLED = false: Spark 4.0 enables ANSI by default, which causes ObjectHashAggregateExec to fail Gluten validation and raises the agg-stage transition cost above 1; disabling ANSI keeps the agg cost at 1 so only the filter falls back as intended
Without the fix the test fails with IOException: Unexpected Bloom filter version number (16777217). With the fix all 200,003 rows are returned correctly.
The test was run inside the gluten-dev Docker container against the gluten-ut/spark40 module.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-sonnet-4-6)