
Commit 59e836b

[GH-2659] Fix file-based readers on Databricks by using reflection for DataSource.checkAndGlobPathIfNecessary (#2660)
1 parent 1e6303e commit 59e836b

1 file changed: 58 additions & 1 deletion

File tree

spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala

@@ -18,6 +18,8 @@
  */
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +38,61 @@ import scala.collection.JavaConverters._
  */
 object SedonaFileIndexHelper {
 
+  /**
+   * Cached reflective reference to [[DataSource.checkAndGlobPathIfNecessary]].
+   *
+   * <p>We call this method via reflection to avoid binary incompatibility between OSS Apache
+   * Spark and Databricks Runtime. On OSS Spark (3.5, 4.0, 4.1, etc.) this method has default
+   * parameter values, but on Databricks Runtime (both Spark 3.5 and 4.0) the same method has all
+   * required parameters with no defaults, and may also differ in parameter count.
+   *
+   * <p>Direct calls with named/default parameters cause the Scala compiler to generate synthetic
+   * {@code $default$N()} accessor methods in the bytecode. When these accessors do not exist at
+   * runtime (as is the case on Databricks), a {@link NoSuchMethodError} is thrown. Reflection
+   * avoids this by resolving the method at runtime.
+   */
+  private lazy val checkAndGlobMethod: java.lang.reflect.Method = {
+    DataSource.getClass.getMethods
+      .filter(_.getName == "checkAndGlobPathIfNecessary")
+      .headOption
+      .getOrElse(
+        throw new NoSuchMethodException("DataSource.checkAndGlobPathIfNecessary not found"))
+  }
+
+  private def checkAndGlobPathIfNecessary(
+      paths: Seq[String],
+      hadoopConf: Configuration,
+      checkEmptyGlobPath: Boolean,
+      checkFilesExist: Boolean,
+      enableGlobbing: Boolean): Seq[Path] = {
+    val method = checkAndGlobMethod
+    val args: Array[AnyRef] = method.getParameterCount match {
+      case 6 =>
+        // OSS Apache Spark (3.x and 4.x): 6 parameters with numThreads at position 5
+        Array(
+          paths,
+          hadoopConf,
+          java.lang.Boolean.valueOf(checkEmptyGlobPath),
+          java.lang.Boolean.valueOf(checkFilesExist),
+          Integer.valueOf(40),
+          java.lang.Boolean.valueOf(enableGlobbing))
+      case _ =>
+        // Databricks Runtime: 5 parameters (no numThreads)
+        Array(
+          paths,
+          hadoopConf,
+          java.lang.Boolean.valueOf(checkEmptyGlobPath),
+          java.lang.Boolean.valueOf(checkFilesExist),
+          java.lang.Boolean.valueOf(enableGlobbing))
+    }
+    try {
+      method.invoke(DataSource, args: _*).asInstanceOf[Seq[Path]]
+    } catch {
+      case e: java.lang.reflect.InvocationTargetException =>
+        throw e.getCause
+    }
+  }
+
   /**
    * Build an [[InMemoryFileIndex]] for the given paths, resolving globs if necessary, without the
    * streaming metadata directory check.
@@ -49,7 +106,7 @@ object SedonaFileIndexHelper {
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
     val globPathsEnabled =
       Option(options.get("globPaths")).map(v => java.lang.Boolean.parseBoolean(v)).getOrElse(true)
-    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(
+    val rootPathsSpecified = checkAndGlobPathIfNecessary(
       paths,
       hadoopConf,
       checkEmptyGlobPath = true,
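To make the failure mode described in the doc comment concrete, here is a minimal, self-contained sketch of the same pattern. It is not part of the commit: Remote, ReflectiveGlob, and glob are hypothetical stand-ins for DataSource and checkAndGlobPathIfNecessary, whose arity differs across runtimes.

// Standalone sketch (not from the commit) of reflection-based dispatch over
// a method whose signature differs between two builds of the same library.
object Remote {
  // Suppose one runtime ships `glob(path: String, threads: Int = 40)` while
  // another ships `glob(path: String)`. Compiling `Remote.glob("x")` against
  // the first emits a call to the synthetic accessor `glob$default$2()`,
  // which throws NoSuchMethodError when the second build is on the classpath.
  def glob(path: String, threads: Int = 40): String = s"$path:$threads"
}

object ReflectiveGlob {
  def glob(path: String): String = {
    // Resolve the method at runtime instead of binding to a fixed signature.
    val method = Remote.getClass.getMethods
      .find(_.getName == "glob")
      .getOrElse(throw new NoSuchMethodException("Remote.glob not found"))
    // Shape the argument list to the arity we actually found, boxing
    // primitives because Method.invoke only accepts java.lang.Object.
    val args: Array[AnyRef] = method.getParameterCount match {
      case 2 => Array(path, Integer.valueOf(40))
      case _ => Array[AnyRef](path)
    }
    // Invoke on the module instance and unwrap reflection's exception wrapper
    // so callers see the original failure.
    try method.invoke(Remote, args: _*).asInstanceOf[String]
    catch {
      case e: java.lang.reflect.InvocationTargetException => throw e.getCause
    }
  }
}

Against the two-parameter build above, ReflectiveGlob.glob("/data") returns "/data:40"; against a build whose glob takes only a path, the same code falls back to a single-argument invocation instead of crashing on a missing glob$default$2() accessor, which is exactly the trade the commit makes for DataSource.checkAndGlobPathIfNecessary.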