Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,13 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
Sig[AssertNotNull](ExpressionNames.ASSERT_NOT_NULL),
// For test purpose.
Sig[VeloxDummyExpression](VeloxDummyExpression.VELOX_DUMMY_EXPRESSION)
)
) ++ scala.util.Try(
// scalastyle:off classforname
Sig(
Class.forName("org.apache.spark.sql.catalyst.expressions.BitmapConstructAgg"),
ExpressionNames.BITMAP_CONSTRUCT_AGG)
// scalastyle:on classforname
).toOption.toSeq
}

override def rewriteSpillPath(path: String): String = {
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1323,7 +1323,8 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::AggregateRel& ag
"regr_slope",
"regr_intercept",
"regr_sxy",
"regr_replacement"};
"regr_replacement",
"bitmap_construct_agg"};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up opportunity: register bitmap_or_agg (and bitmap_and_agg for Spark 4.1)

Problem: bitmap_or_agg was introduced in the same Spark 3.5 commit as bitmap_construct_agg, and bitmap_and_agg in Spark 4.1. If native Velox implementations exist for these, registering them together would avoid unnecessary columnar-to-row transitions when users combine bitmap functions in the same query stage.

Investigation Needed: Confirm whether bitmap_or_agg and bitmap_and_agg have native Velox implementations (look for registration in cpp/velox/udf/ or the Velox aggregate function registry). If yes, consider adding them in a follow-up PR:

      "bitmap_construct_agg",
      "bitmap_or_agg",
      "bitmap_and_agg"};

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've Velox PR for bitmap_or_agg in review phase. Will add follow up PR for Gluten once Velox PR merges. Thanks!


auto udafFuncs = UdfLoader::getInstance()->getRegisteredUdafNames();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ class VeloxTestSettings extends BackendTestSettings {
"INCONSISTENT_BEHAVIOR_CROSS_VERSION: compatibility with Spark 2.4/3.2 in reading/writing dates")
// Doesn't support unhex with failOnError=true.
.exclude("CONVERSION_INVALID_INPUT: to_binary conversion function hex")
// bitmap_construct_agg offloaded to Velox throws GlutenException instead of
// SparkArrayIndexOutOfBoundsException.
.exclude("INVALID_BITMAP_POSITION: position out of bounds")
.exclude("INVALID_BITMAP_POSITION: negative position")
enableSuite[GlutenQueryParsingErrorsSuite]
enableSuite[GlutenArithmeticExpressionSuite]
.exclude("SPARK-45786: Decimal multiply, divide, remainder, quot")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,25 @@
*/
package org.apache.spark.sql

import org.apache.gluten.execution.HashAggregateExecBaseTransformer

import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

class GlutenBitmapExpressionsQuerySuite
extends BitmapExpressionsQuerySuite
with GlutenSQLTestsTrait {}
with GlutenSQLTestsTrait
with AdaptiveSparkPlanHelper {

test("bitmap_construct_agg routes to native") {
val df = spark.sql(
"SELECT bitmap_construct_agg(bitmap_bit_position(col)) " +
"FROM values (1L), (2L), (3L) AS t(col)")
df.collect()
assert(
collectWithSubqueries(df.queryExecution.executedPlan) {
case h: HashAggregateExecBaseTransformer => h
}.nonEmpty,
"Expected native HashAggregateExecBaseTransformer in plan"
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ class VeloxTestSettings extends BackendTestSettings {
"INCONSISTENT_BEHAVIOR_CROSS_VERSION: compatibility with Spark 2.4/3.2 in reading/writing dates")
// Doesn't support unhex with failOnError=true.
.exclude("CONVERSION_INVALID_INPUT: to_binary conversion function hex")
// bitmap_construct_agg offloaded to Velox throws GlutenException instead of
// SparkArrayIndexOutOfBoundsException.
.exclude("INVALID_BITMAP_POSITION: position out of bounds")
.exclude("INVALID_BITMAP_POSITION: negative position")
enableSuite[GlutenQueryParsingErrorsSuite]
enableSuite[GlutenQueryContextSuite]
enableSuite[GlutenQueryExecutionAnsiErrorsSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,25 @@
*/
package org.apache.spark.sql

import org.apache.gluten.execution.HashAggregateExecBaseTransformer

import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

class GlutenBitmapExpressionsQuerySuite
extends BitmapExpressionsQuerySuite
with GlutenSQLTestsTrait {}
with GlutenSQLTestsTrait
with AdaptiveSparkPlanHelper {

test("bitmap_construct_agg routes to native") {
val df = spark.sql(
"SELECT bitmap_construct_agg(bitmap_bit_position(col)) " +
"FROM values (1L), (2L), (3L) AS t(col)")
df.collect()
assert(
collectWithSubqueries(df.queryExecution.executedPlan) {
case h: HashAggregateExecBaseTransformer => h
}.nonEmpty,
"Expected native HashAggregateExecBaseTransformer in plan"
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ class VeloxTestSettings extends BackendTestSettings {
"INCONSISTENT_BEHAVIOR_CROSS_VERSION: compatibility with Spark 2.4/3.2 in reading/writing dates")
// Doesn't support unhex with failOnError=true.
.exclude("CONVERSION_INVALID_INPUT: to_binary conversion function hex")
// bitmap_construct_agg offloaded to Velox throws GlutenException instead of
// SparkArrayIndexOutOfBoundsException.
.exclude("INVALID_BITMAP_POSITION: position out of bounds")
.exclude("INVALID_BITMAP_POSITION: negative position")
enableSuite[GlutenQueryParsingErrorsSuite]
enableSuite[GlutenQueryContextSuite]
enableSuite[GlutenQueryExecutionAnsiErrorsSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,25 @@
*/
package org.apache.spark.sql

import org.apache.gluten.execution.HashAggregateExecBaseTransformer

import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

class GlutenBitmapExpressionsQuerySuite
extends BitmapExpressionsQuerySuite
with GlutenSQLTestsTrait {}
with GlutenSQLTestsTrait
with AdaptiveSparkPlanHelper {

test("bitmap_construct_agg routes to native") {
val df = spark.sql(
"SELECT bitmap_construct_agg(bitmap_bit_position(col)) " +
"FROM values (1L), (2L), (3L) AS t(col)")
df.collect()
assert(
collectWithSubqueries(df.queryExecution.executedPlan) {
case h: HashAggregateExecBaseTransformer => h
}.nonEmpty,
"Expected native HashAggregateExecBaseTransformer in plan"
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ object ExpressionNames {
final val COLLECT_LIST = "collect_list"
final val COLLECT_SET = "collect_set"
final val BLOOM_FILTER_AGG = "bloom_filter_agg"
final val BITMAP_CONSTRUCT_AGG = "bitmap_construct_agg"
final val VAR_SAMP = "var_samp"
final val VAR_POP = "var_pop"
final val BIT_AND_AGG = "bit_and"
Expand Down
Loading