
Commit 7f2742b

[AURON #1956] Add initial compatibility support for Spark 4.1 (UT/CI Pass) (#1958)
# Which issue does this PR close?

Closes #1956

# Rationale for this change

This PR prioritizes Spark 4.1 as the first supported Spark 4.x version, accelerating the Spark 4 compatibility initiative. The implementation is designed for extensibility, so Spark 4.0 support can be added later if needed. This balances rapid adoption of the latest stable Spark 4.x release with flexibility for other 4.x versions without major rework.

# What changes are included in this PR?

## Spark 4 API Compatibility

### Servlet API Migration

Updated `AuronAllExecutionsPage.scala` to support both `javax.servlet.http.HttpServletRequest` (Spark 3.x) and `jakarta.servlet.http.HttpServletRequest` (Spark 4.x) via version-specific `@sparkver` annotations, adapting to Spark 4's migration to the Jakarta EE Servlet API.

### Shuffle API Changes

Adapted shuffle components to Spark 4.x's refined `ShuffleWriteProcessor.write` API ([SPARK-44605](https://github.com/apache/spark/pull/42234/changes)), which triggers early execution of shuffle writers and breaks alignment with Spark 3.x execution logic:

- Enhanced `AuronShuffleDependency` with a version-specific `getInputRdd` method (returns `null` for Spark 3.x and `_rdd` for Spark 4.x). The exposed `inputRdd` field serializes the transient `_rdd`, allowing it to be retrieved on the `Executor` inside Spark 4.1's `ShuffleWriteProcessor.write` method.
- Returned `Iterator.empty` from `NativeRDD.compute()` for `NativeRDD.ShuffleWrite`, deferring execution to the `ShuffleWriteProcessor.write()` method and aligning with Spark 3.x execution logic.
- Added a Spark 4.1-specific override of `ShuffleWriteProcessor.write` (which now takes `Iterator[_]` as its first parameter in Spark 4.x) in `NativeShuffleExchangeExec`: it asserts the input iterator is empty (validating the adaptation logic), retrieves the RDD via `AuronShuffleDependency.inputRdd`, and reuses the core shuffle logic through `internalWrite` to keep behavior consistent across Spark 3.x/4.x.
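The deferral flow described above can be sketched in plain Scala. This is a simplified, Spark-free model: the method names `getInputRdd` and `internalWrite` follow the PR description, while all the surrounding types are stand-ins, not Spark's real classes.

```scala
// Simplified sketch of the shuffle-write deferral adaptation.
object ShuffleWriteSketch {

  // Stand-in for AuronShuffleDependency: Spark 3.x shims return null from
  // getInputRdd; the Spark 4.1 shim returns the (transient) _rdd so that
  // executors can recover it inside ShuffleWriteProcessor.write.
  final class AuronShuffleDependency[T](private val _rdd: Seq[T], spark4x: Boolean) {
    def getInputRdd: Seq[T] = if (spark4x) _rdd else null
  }

  // Stand-in for the Spark 4.1 override of ShuffleWriteProcessor.write:
  // NativeRDD.compute() has already returned Iterator.empty, so the iterator
  // Spark hands us must be empty; the real input comes from the dependency.
  def write[T](inputIter: Iterator[T], dep: AuronShuffleDependency[T]): Seq[T] = {
    assert(inputIter.isEmpty, "adaptation expects a deferred (empty) iterator")
    internalWrite(dep.getInputRdd)
  }

  // Core shuffle logic shared by the Spark 3.x and 4.x code paths.
  private def internalWrite[T](rows: Seq[T]): Seq[T] = rows
}
```

Under these assumptions, `write(Iterator.empty, dep)` recovers the rows from the dependency rather than from the iterator, which is the alignment trick the bullets above describe.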
### SparkSession Package Path Change

Addressed Spark 4.x's `SparkSession` package restructure:

- Spark 3.x: `org.apache.spark.sql.SparkSession` → Spark 4.x: `org.apache.spark.sql.classic.SparkSession`
- Updated references in `NativeParquetInsertIntoHiveTableExec.scala` and `NativeBroadcastExchangeBase.scala`

### New Data Types

Added stubs for Spark 4.x's new `GeographyVal`/`GeometryVal`/`VariantVal` data types in the columnar data structures (`AuronColumnarArray.scala`, `AuronColumnarStruct.scala`, `AuronColumnarBatchRow.scala`). These stubs throw `UnsupportedOperationException` and exist only to resolve compilation errors.

# Are there any user-facing changes?

# How was this patch tested?

- [x] Enabled Spark 4.1 in the CI pipeline
- [x] Passed all existing unit tests (UT)
- [x] Passed all TPC-DS integration tests (IT)
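The stub pattern used for the new data types can be sketched as follows. The trait below is a hypothetical stand-in for the Spark 4.x columnar accessor surface, not Spark's real API; only the throwing-stub pattern itself is taken from the PR description.

```scala
// Stand-in for the new accessors Spark 4.x adds to columnar interfaces.
trait Spark4ColumnarAccessors {
  def getVariant(ordinal: Int): Any
  def getGeography(ordinal: Int): Any
  def getGeometry(ordinal: Int): Any
}

// Sketch of how Auron's columnar structures satisfy the new interface:
// the stubs exist only so the code compiles against Spark 4.x; native
// execution never routes these types through this path yet.
class AuronColumnarArraySketch extends Spark4ColumnarAccessors {
  override def getVariant(ordinal: Int): Any =
    throw new UnsupportedOperationException("VariantVal is not supported by Auron yet")
  override def getGeography(ordinal: Int): Any =
    throw new UnsupportedOperationException("GeographyVal is not supported by Auron yet")
  override def getGeometry(ordinal: Int): Any =
    throw new UnsupportedOperationException("GeometryVal is not supported by Auron yet")
}
```

Any query that actually reaches these accessors fails fast with a clear message, while all other columnar paths are unaffected.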
1 parent c2d5869 commit 7f2742b

49 files changed: 490 additions & 113 deletions


.github/workflows/tpcds-reusable.yml

Lines changed: 1 addition & 1 deletion
```diff
@@ -226,7 +226,7 @@ jobs:
         if: steps.cache-spark-bin.outputs.cache-hit != 'true'
         run: |
           SPARK_PATH="spark/spark-${{ steps.get-dependency-version.outputs.sparkversion }}"
-          if [ ${{ inputs.scalaver }} = "2.13" ]; then
+          if [ ${{ inputs.scalaver }} = "2.13" && "${{ inputs.sparkver }}" != "spark-4.1" ]; then
             SPARK_FILE="spark-${{ steps.get-dependency-version.outputs.sparkversion }}-bin-${{ inputs.hadoop-profile }}-scala${{ inputs.scalaver }}.tgz"
           else
             SPARK_FILE="spark-${{ steps.get-dependency-version.outputs.sparkversion }}-bin-${{ inputs.hadoop-profile }}.tgz"
```

.github/workflows/tpcds.yml

Lines changed: 9 additions & 0 deletions
```diff
@@ -87,3 +87,12 @@ jobs:
       javaver: '21'
       scalaver: '2.13'
       hadoop-profile: 'hadoop3'
+
+  test-spark-41-jdk21-scala-2-13:
+    name: Test spark-4.1 JDK21 Scala-2.13
+    uses: ./.github/workflows/tpcds-reusable.yml
+    with:
+      sparkver: spark-4.1
+      javaver: '21'
+      scalaver: '2.13'
+      hadoop-profile: 'hadoop3'
```

auron-build.sh

Lines changed: 1 addition & 1 deletion
```diff
@@ -30,7 +30,7 @@
 # Define constants for supported component versions
 # -----------------------------------------------------------------------------
 SUPPORTED_OS_IMAGES=("centos7" "ubuntu24" "rockylinux8" "debian11" "azurelinux3")
-SUPPORTED_SPARK_VERSIONS=("3.0" "3.1" "3.2" "3.3" "3.4" "3.5")
+SUPPORTED_SPARK_VERSIONS=("3.0" "3.1" "3.2" "3.3" "3.4" "3.5" "4.1")
 SUPPORTED_SCALA_VERSIONS=("2.12" "2.13")
 SUPPORTED_CELEBORN_VERSIONS=("0.5" "0.6")
 # Currently only one supported version, but kept plural for consistency
```

auron-spark-ui/pom.xml

Lines changed: 13 additions & 0 deletions
```diff
@@ -36,6 +36,19 @@
       <artifactId>spark-sql_${scalaVersion}</artifactId>
       <scope>provided</scope>
     </dependency>
+    <!-- Required for XML processing (Scala 2.12+ split module, compatible with Spark 4.x) -->
+    <dependency>
+      <groupId>org.scala-lang.modules</groupId>
+      <artifactId>scala-xml_${scalaVersion}</artifactId>
+      <version>${scala-xml.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>spark-version-annotation-macros_${scalaVersion}</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
```

auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala

Lines changed: 29 additions & 3 deletions
```diff
@@ -16,18 +16,44 @@
  */
 package org.apache.spark.sql.execution.ui
 
-import javax.servlet.http.HttpServletRequest
-
 import scala.xml.{Node, NodeSeq}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
+import org.apache.auron.sparkver
+
 private[ui] class AuronAllExecutionsPage(parent: AuronSQLTab) extends WebUIPage("") with Logging {
 
   private val sqlStore = parent.sqlStore
 
-  override def render(request: HttpServletRequest): Seq[Node] = {
+  @sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5")
+  override def render(request: javax.servlet.http.HttpServletRequest): Seq[Node] = {
+    val buildInfo = sqlStore.buildInfo()
+    val infos =
+      UIUtils.listingTable(propertyHeader, propertyRow, buildInfo.info, fixedWidth = true)
+    val summary: NodeSeq =
+      <div>
+        <div>
+          <span class="collapse-sql-properties collapse-table"
+                onClick="collapseTable('collapse-sql-properties', 'sql-properties')">
+            <h4>
+              <span class="collapse-table-arrow arrow-open"></span>
+              <a>Auron Build Information</a>
+            </h4>
+          </span>
+          <div class="sql-properties collapsible-table">
+            {infos}
+          </div>
+        </div>
+        <br/>
+      </div>
+
+    UIUtils.headerSparkPage(request, "Auron", summary, parent)
+  }
+
+  @sparkver("4.1")
+  override def render(request: jakarta.servlet.http.HttpServletRequest): Seq[Node] = {
     val buildInfo = sqlStore.buildInfo()
     val infos =
       UIUtils.listingTable(propertyHeader, propertyRow, buildInfo.info, fixedWidth = true)
```

dev/auron-it/pom.xml

Lines changed: 40 additions & 1 deletion
```diff
@@ -331,6 +331,45 @@
       </properties>
     </profile>
 
+    <profile>
+      <id>spark-4.1</id>
+      <properties>
+        <shimName>spark-4.1</shimName>
+        <sparkVersion>4.1.1</sparkVersion>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <version>${maven-enforcer-plugin.version}</version>
+            <executions>
+              <execution>
+                <id>spark41-enforce-java-scala-version</id>
+                <goals>
+                  <goal>enforce</goal>
+                </goals>
+                <configuration>
+                  <rules>
+                    <!-- Spark 4.1 requires JDK 17+ and Scala 2.13.x -->
+                    <requireJavaVersion>
+                      <version>[17,)</version>
+                      <message>Spark 4.1 requires JDK 17 or higher. Current: ${java.version}</message>
+                    </requireJavaVersion>
+                    <requireProperty>
+                      <property>scalaLongVersion</property>
+                      <regex>2\.13\.\d+</regex>
+                      <regexMessage>Spark 4.1 requires Scala 2.13.x. Current: ${scalaLongVersion}</regexMessage>
+                    </requireProperty>
+                  </rules>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
     <profile>
       <id>scala-2.12</id>
       <properties>
@@ -343,7 +382,7 @@
       <id>scala-2.13</id>
       <properties>
         <scalaVersion>2.13</scalaVersion>
-        <scalaLongVersion>2.13.13</scalaLongVersion>
+        <scalaLongVersion>2.13.17</scalaLongVersion>
       </properties>
     </profile>
   </profiles>
```

dev/auron-it/src/main/scala/org/apache/auron/integration/Main.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -114,8 +114,8 @@ object Main {
         |Spark Version: ${Shims.get.shimVersion}
         |Data: ${args.dataLocation}
         |Queries: [${args.queryFilter.mkString(", ")}] (${if (args.queryFilter.isEmpty)
-        "all"
-      else args.queryFilter.length} queries)
+          "all"
+        else args.queryFilter.length} queries)
         |Extra Spark Conf: ${args.extraSparkConf}""".stripMargin)
 
     if (args.auronOnly) println("Mode: Auron-only (skip baseline)")
```

dev/auron-it/src/main/scala/org/apache/auron/integration/SessionManager.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -35,6 +35,7 @@ class SessionManager(val extraSparkConf: Map[String, String]) {
   private lazy val commonConf: Map[String, String] = Map(
     "spark.master" -> resolveMaster(),
     "spark.sql.shuffle.partitions" -> "100",
+    "spark.sql.unionOutputPartitioning" -> "false",
     "spark.ui.enabled" -> "false",
     "spark.sql.sources.useV1SourceList" -> "parquet",
     "spark.sql.autoBroadcastJoinThreshold" -> "-1")
```

pom.xml

Lines changed: 59 additions & 5 deletions
```diff
@@ -54,8 +54,11 @@
     <protobufVersion>3.25.5</protobufVersion>
     <nettyVersion>4.2.7.Final</nettyVersion>
     <javaVersion>8</javaVersion>
+    <maven.compiler.source>${javaVersion}</maven.compiler.source>
+    <maven.compiler.target>${javaVersion}</maven.compiler.target>
     <scalaVersion>2.12</scalaVersion>
     <scalaLongVersion>2.12.18</scalaLongVersion>
+    <scala-xml.version>2.1.0</scala-xml.version>
     <scalaJava8CompatVersion>1.0.2</scalaJava8CompatVersion>
     <maven.version>3.9.12</maven.version>
     <maven.plugin.scala.version>4.9.2</maven.plugin.scala.version>
@@ -383,6 +386,9 @@
             <arg>-feature</arg>
             <arg>-Ywarn-unused</arg>
             <arg>-Xfatal-warnings</arg>
+
+            <arg>-Wconf:msg=method newInstance in class Class is deprecated:s</arg>
+            <arg>-Wconf:msg=class ThreadDeath in package lang is deprecated:s</arg>
           </args>
         </configuration>
         <dependencies>
@@ -465,6 +471,8 @@
         <version>${maven.plugin.surefire.version}</version>
         <!-- Note config is repeated in scalatest config -->
         <configuration>
+          <forkCount>1</forkCount>
+          <reuseForks>false</reuseForks>
           <skipTests>false</skipTests>
           <failIfNoSpecifiedTests>false</failIfNoSpecifiedTests>
           <argLine>${extraJavaTestArgs}</argLine>
@@ -486,6 +494,7 @@
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
           <junitxml>.</junitxml>
           <filereports>TestSuite.txt</filereports>
+          <forkMode>once</forkMode>
           <argLine>${extraJavaTestArgs}</argLine>
           <environmentVariables />
           <systemProperties>
@@ -827,6 +836,48 @@
       </properties>
     </profile>
 
+    <profile>
+      <id>spark-4.1</id>
+      <properties>
+        <shimName>spark-4.1</shimName>
+        <scalaTestVersion>3.2.9</scalaTestVersion>
+        <sparkVersion>4.1.1</sparkVersion>
+        <shortSparkVersion>4.1</shortSparkVersion>
+        <nettyVersion>4.1.118.Final</nettyVersion>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <version>${maven-enforcer-plugin.version}</version>
+            <executions>
+              <execution>
+                <id>spark41-enforce-java-scala-version</id>
+                <goals>
+                  <goal>enforce</goal>
+                </goals>
+                <configuration>
+                  <rules>
+                    <!-- Spark 4.1 requires JDK 17+ and Scala 2.13.x -->
+                    <requireJavaVersion>
+                      <version>[17,)</version>
+                      <message>Spark 4.1 requires JDK 17 or higher. Current: ${java.version}</message>
+                    </requireJavaVersion>
+                    <requireProperty>
+                      <property>scalaLongVersion</property>
+                      <regex>2\.13\.\d+</regex>
+                      <regexMessage>Spark 4.1 requires Scala 2.13.x. Current: ${scalaLongVersion}</regexMessage>
+                    </requireProperty>
+                  </rules>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
     <profile>
       <id>jdk-8</id>
       <activation>
@@ -835,7 +886,7 @@
       <properties>
         <javaVersion>8</javaVersion>
         <spotless.plugin.version>2.30.0</spotless.plugin.version>
-        <semanticdb.version>4.8.8</semanticdb.version>
+        <semanticdb.version>4.14.5</semanticdb.version>
         <scalafmtVersion>3.0.0</scalafmtVersion>
       </properties>
     </profile>
@@ -848,7 +899,7 @@
      <properties>
         <javaVersion>11</javaVersion>
         <spotless.plugin.version>2.30.0</spotless.plugin.version>
-        <semanticdb.version>4.8.8</semanticdb.version>
+        <semanticdb.version>4.14.5</semanticdb.version>
         <scalafmtVersion>3.0.0</scalafmtVersion>
       </properties>
     </profile>
@@ -861,7 +912,7 @@
       <properties>
         <javaVersion>17</javaVersion>
         <spotless.plugin.version>2.45.0</spotless.plugin.version>
-        <semanticdb.version>4.9.9</semanticdb.version>
+        <semanticdb.version>4.14.5</semanticdb.version>
         <scalafmtVersion>3.9.9</scalafmtVersion>
       </properties>
     </profile>
@@ -874,7 +925,7 @@
       <properties>
         <javaVersion>21</javaVersion>
         <spotless.plugin.version>2.45.0</spotless.plugin.version>
-        <semanticdb.version>4.9.9</semanticdb.version>
+        <semanticdb.version>4.14.5</semanticdb.version>
         <scalafmtVersion>3.9.9</scalafmtVersion>
       </properties>
     </profile>
@@ -921,7 +972,7 @@
       </activation>
       <properties>
         <scalaVersion>2.13</scalaVersion>
-        <scalaLongVersion>2.13.13</scalaLongVersion>
+        <scalaLongVersion>2.13.17</scalaLongVersion>
       </properties>
       <build>
         <plugins>
@@ -944,11 +995,14 @@
               <arg>-Wconf:msg=^(?=.*?method|value|type|object|trait|inheritance)(?=.*?deprecated)(?=.*?since 2.13).+$:s</arg>
               <arg>-Wconf:msg=Auto-application to \`\(\)\` is deprecated:s</arg>
               <arg>-Wconf:msg=object JavaConverters in package collection is deprecated:s</arg>
+              <arg>-Wconf:msg=method newInstance in class Class is deprecated:s</arg>
+              <arg>-Wconf:msg=class ThreadDeath in package lang is deprecated:s</arg>
               <arg>-Wconf:cat=unchecked&amp;msg=outer reference:s</arg>
               <arg>-Wconf:cat=unchecked&amp;msg=eliminated by erasure:s</arg>
               <arg>-Wconf:cat=unused-nowarn:s</arg>
               <arg>-Wconf:msg=early initializers are deprecated:s</arg>
               <arg>-Wconf:cat=other-match-analysis:s</arg>
+              <arg>-Wconf:cat=feature-existentials:s</arg>
             </args>
             <compilerPlugins>
               <compilerPlugin>
```

spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/InterceptedValidateSparkPlan.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -25,7 +25,7 @@ import org.apache.auron.sparkver
 
 object InterceptedValidateSparkPlan extends Logging {
 
-  @sparkver("3.2 / 3.3 / 3.4 / 3.5")
+  @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.1")
   def validate(plan: SparkPlan): Unit = {
     import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
     import org.apache.spark.sql.execution.auron.plan.NativeRenameColumnsBase
@@ -79,7 +79,7 @@ object InterceptedValidateSparkPlan extends Logging {
     throw new UnsupportedOperationException("validate is not supported in spark 3.0.3 or 3.1.3")
   }
 
-  @sparkver("3.2 / 3.3 / 3.4 / 3.5")
+  @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.1")
   private def errorOnInvalidBroadcastQueryStage(plan: SparkPlan): Unit = {
     import org.apache.spark.sql.execution.adaptive.InvalidAQEPlanException
     throw InvalidAQEPlanException("Invalid broadcast query stage", plan)
```
