Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ object VeloxRuleApi {

// Gluten columnar: Fallback policies.
injector.injectFallbackPolicy(c => p => ExpandFallbackPolicy(c.caller.isAqe(), p))
// When the Velox backend requires joint fallback, patch any BloomFilterMightContain
// expressions in the whole-stage fallback plan to use VeloxBloomFilterMightContain.
// This ensures the JVM filter can read Velox-format bytes produced by native Stage 0.
injector.injectFallbackPolicy(_ => _ => BloomFilterMightContainFallbackPatcher())

// Gluten columnar: Post rules.
injector.injectPost(c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.extension

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.VeloxBloomFilterMightContain
import org.apache.gluten.extension.columnar.heuristic.FallbackNode

import org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FilterExec, SparkPlan}

/**
* Fallback policy rule that patches `BloomFilterMightContain` -> `VeloxBloomFilterMightContain`
* inside whole-stage fallback plans when the Velox backend requires joint fallback.
*
* When [[org.apache.gluten.extension.columnar.heuristic.ExpandFallbackPolicy]] triggers a
* whole-stage fallback it returns the original vanilla Spark plan (containing vanilla
* `BloomFilterMightContain`) wrapped in a `FallbackNode`. If the bloom-filter producer (Stage 0)
* already ran natively it produced bytes in Velox's serialization format, which is incompatible
* with `BloomFilterImpl.readFrom()`. This rule replaces the vanilla expression with
* `VeloxBloomFilterMightContain`, which reads Velox-format bytes via JNI, so the JVM filter stage
* can execute correctly after falling back.
*
* This rule runs as a second fallback-policy pass, after `ExpandFallbackPolicy`, so it only acts
* when the plan is already wrapped in a `FallbackNode`.
*/
case class BloomFilterMightContainFallbackPatcher() extends Rule[SparkPlan] {

override def apply(plan: SparkPlan): SparkPlan = {
if (!BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
return plan
}
plan match {
case FallbackNode(fallbackPlan) =>
FallbackNode(patchBloomFilterMightContain(fallbackPlan))
case other =>
other
}
}

// Replace BloomFilterMightContain -> VeloxBloomFilterMightContain inside FilterExec nodes
// so that the JVM filter can read Velox-format bloom filter bytes from native Stage 0.
private def patchBloomFilterMightContain(plan: SparkPlan): SparkPlan = {
plan.transformWithSubqueries {
case filterExec: FilterExec =>
val newCondition = filterExec.condition.transform {
case BloomFilterMightContain(bloomFilterExpression, valueExpression) =>
VeloxBloomFilterMightContain(bloomFilterExpression, valueExpression)
}
filterExec.copy(condition = newCondition)
case other =>
other
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.internal.SQLConf

import org.scalatest.Tag

/**
* ScalaTest tag for the issue-12013 regression test. Run with:
* {{{
* --test-tags=org.apache.gluten.tags.Issue12013
* }}}
*/
object Issue12013 extends Tag("org.apache.gluten.tags.Issue12013")

class GlutenBloomFilterAggregateQuerySuite
extends BloomFilterAggregateQuerySuite
with GlutenSQLTestsTrait
Expand Down Expand Up @@ -112,6 +122,64 @@ class GlutenBloomFilterAggregateQuerySuite
}
}

// Regression test for https://github.com/apache/gluten/issues/12013
// When ExpandFallbackPolicy triggers a whole-stage AQE fallback, the resulting plan comes
// from the original vanilla Spark plan which contains BloomFilterMightContain (not the Velox
// variant). If Stage 0 (bloom_filter_agg subquery) already ran natively it produced Velox-
// format bytes, which BloomFilterImpl.readFrom() cannot deserialize. BloomFilterMightContain-
// FallbackPatcher patches the fallback plan to use VeloxBloomFilterMightContain so Stage 1
// can read Velox bytes via JNI even after falling back to JVM.
testGluten(
"Test bloom_filter_agg whole-stage fallback does not corrupt bloom filter bytes",
Issue12013) {
val table = "bloom_filter_test"
val numEstimatedItems = 5000000L
val sqlString =
s"""
|SELECT col positive_membership_test
|FROM $table
|WHERE might_contain(
| (SELECT bloom_filter_agg(col,
| cast($numEstimatedItems as long),
| cast($veloxBloomFilterMaxNumBits as long))
| FROM $table), col)
|""".stripMargin

withTempView(table) {
(Seq(Long.MinValue, 0, Long.MaxValue) ++ (1L to 200000L))
.toDF("col")
.createOrReplaceTempView(table)
if (BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
// Disable columnar filter so FilterExec falls back, and set the whole-stage fallback
// threshold so ExpandFallbackPolicy promotes the individual fallback to whole-stage.
// This reproduces the scenario where the filter stage falls back to the original
// vanilla plan while the bloom_filter_agg subquery has already produced Velox-format
// bloom filter bytes.
//
// Threshold=2: a fallen-back FilterExec introduces two ColumnarToRow/RowToColumnar
// transitions (net transition cost=2), which meets the threshold and triggers the
// whole-stage AQE fallback. The bloom_filter_agg subquery stages have an inherent
// transition cost of 1, so they do NOT trigger the fallback and run natively.
//
// ANSI mode must be off: Spark 4.0 enables ANSI by default, which causes
// ObjectHashAggregateExec to fail Gluten validation ("does not support ansi mode"),
// raising the agg-stage transition cost above 1. With ANSI off the agg-stage cost
// stays at 1 (< threshold 2), so only the filter stage falls back as intended.
withSQLConf(
GlutenConfig.COLUMNAR_FILTER_ENABLED.key -> "false",
GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "2",
SQLConf.ANSI_ENABLED.key -> "false"
) {
val df = spark.sql(sqlString)
// Must not throw java.io.IOException: Unexpected Bloom filter version number (16777217)
df.collect
// All 200003 rows match the bloom filter built from the same data.
assert(df.count() == 200003L)
}
}
}
}

testGluten("Test bloom_filter_agg agg fallback") {
val table = "bloom_filter_test"
val numEstimatedItems = 5000000L
Expand Down
Loading