Skip to content

Commit 1e6303e

Browse files
authored
[GH-2650] Fix warning message when reading shapefiles from S3 (#2655)
1 parent 0e52724 commit 1e6303e

17 files changed

Lines changed: 318 additions & 12 deletions

File tree

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import scala.collection.JavaConverters._

/**
 * Helper for creating a [[PartitioningAwareFileIndex]] without going through the
 * [[org.apache.spark.sql.execution.streaming.FileStreamSink.hasMetadata]] check in
 * [[org.apache.spark.sql.execution.datasources.v2.FileTable.fileIndex]].
 *
 * <p>The streaming metadata check can produce spurious [[java.io.FileNotFoundException]]
 * warnings when reading from cloud storage (e.g., S3) because it attempts to stat the path
 * as a directory. For non-streaming, read-only file tables such as Shapefile and GeoPackage,
 * this check is unnecessary and can be safely bypassed.
 */
object SedonaFileIndexHelper {

  /**
   * Build an [[InMemoryFileIndex]] for the given paths, resolving globs if necessary,
   * without the streaming metadata directory check.
   *
   * @param sparkSession        active session used for Hadoop conf and file-status caching
   * @param options             data source options; "globPaths" (default true) controls globbing
   * @param paths               root paths (possibly glob patterns) to index
   * @param userSpecifiedSchema optional schema supplied by the user
   * @return a non-streaming, in-memory file index over the resolved paths
   */
  def createFileIndex(
      sparkSession: SparkSession,
      options: CaseInsensitiveStringMap,
      paths: Seq[String],
      userSpecifiedSchema: Option[StructType]): PartitioningAwareFileIndex = {
    // Keep the caller's original key casing when propagating options into the Hadoop conf.
    val sessionOptions = options.asCaseSensitiveMap.asScala.toMap
    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(sessionOptions)
    // Globbing defaults to enabled; any value other than "true" (case-insensitively, per
    // java.lang.Boolean.parseBoolean) disables it. forall on the Option reproduces the
    // original map(parseBoolean).getOrElse(true) exactly.
    val enableGlobbing =
      Option(options.get("globPaths")).forall(v => java.lang.Boolean.parseBoolean(v))
    val resolvedRootPaths = DataSource.checkAndGlobPathIfNecessary(
      paths,
      hadoopConf,
      checkEmptyGlobPath = true,
      checkFilesExist = true,
      enableGlobbing = enableGlobbing)
    new InMemoryFileIndex(
      sparkSession,
      resolvedRootPaths,
      sessionOptions,
      userSpecifiedSchema,
      FileStatusCache.getOrCreate(sparkSession))
  }
}

spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
2525
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
2626
import org.apache.spark.sql.connector.read.ScanBuilder
2727
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
28-
import org.apache.spark.sql.execution.datasources.FileFormat
28+
import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
2929
import org.apache.spark.sql.execution.datasources.v2.FileTable
3030
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
3131
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -44,6 +44,13 @@ case class GeoPackageTable(
4444
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
4545
with SupportsMetadataColumns {
4646

47+
// Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
48+
// spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
49+
// GeoPackage tables are always non-streaming batch sources, so the streaming
50+
// metadata check is unnecessary.
51+
override lazy val fileIndex: PartitioningAwareFileIndex =
52+
SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
53+
4754
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
4855
if (loadOptions.showMetadata) {
4956
return MetadataSchema.schema

spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
2424
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, TableCapability}
2525
import org.apache.spark.sql.connector.read.ScanBuilder
2626
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
27-
import org.apache.spark.sql.execution.datasources.FileFormat
27+
import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
2828
import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, fieldDescriptorsToSchema, mergeSchemas}
2929
import org.apache.spark.sql.execution.datasources.v2.FileTable
3030
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
@@ -52,6 +52,13 @@ case class ShapefileTable(
5252
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
5353
with SupportsMetadataColumns {
5454

55+
// Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
56+
// spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
57+
// Shapefile tables are always non-streaming batch sources, so the streaming
58+
// metadata check is unnecessary.
59+
override lazy val fileIndex: PartitioningAwareFileIndex =
60+
SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
61+
5562
override def formatName: String = "Shapefile"
5663

5764
override def capabilities: java.util.Set[TableCapability] =

spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
2323
import org.apache.spark.sql.connector.catalog.TableCapability
2424
import org.apache.spark.sql.connector.read.ScanBuilder
2525
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
26-
import org.apache.spark.sql.execution.datasources.FileFormat
26+
import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
2727
import org.apache.spark.sql.execution.datasources.v2.FileTable
2828
import org.apache.spark.sql.types._
2929
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
3636
userSpecifiedSchema: Option[StructType],
3737
fallbackFileFormat: Class[_ <: FileFormat])
3838
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
39+
40+
// Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
41+
// spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
42+
// GeoParquet metadata tables are always non-streaming batch sources, so the streaming
43+
// metadata check is unnecessary.
44+
override lazy val fileIndex: PartitioningAwareFileIndex =
45+
SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
46+
3947
override def formatName: String = "GeoParquet Metadata"
4048

4149
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =

spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.sedona.sql
2020

2121
import org.apache.commons.io.FileUtils
22+
import org.apache.log4j.{AppenderSkeleton, Level, Logger}
23+
import org.apache.log4j.spi.LoggingEvent
2224
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
2325
import org.apache.spark.sql.types.{DateType, DecimalType, LongType, StringType, StructField, StructType, TimestampType}
2426
import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
@@ -27,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
2729

2830
import java.io.File
2931
import java.nio.file.Files
32+
import java.util.{ArrayList => JList}
3033
import scala.collection.mutable
3134

3235
class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
@@ -943,5 +946,40 @@ class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
943946
assert(r2.getLong(4) == dt2Shp.length())
944947
assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
945948
}
949+
950+
it("reading shapefile by .shp path should not produce FileStreamSink metadata warning") {
  // GH-2650: loading by ".shp" path makes ShapefileDataSource.transformPaths rewrite the
  // path into a glob pattern (e.g., "file.???"). Before the fix, Spark's FileTable.fileIndex
  // invoked FileStreamSink.hasMetadata, which stats the glob as if it were a directory,
  // triggering a FileNotFoundException and the spurious WARN log:
  // "Assume no metadata directory. Error while looking for metadata directory..."
  val observedWarnings = new JList[String]()
  // Collect only the WARN messages that mention the metadata-directory probe.
  val warningCollector = new AppenderSkeleton {
    override def append(event: LoggingEvent): Unit = {
      Option(event.getRenderedMessage)
        .filter(_.contains("Assume no metadata directory"))
        .foreach(observedWarnings.add)
    }
    override def close(): Unit = {}
    override def requiresLayout(): Boolean = false
  }
  warningCollector.setThreshold(Level.WARN)
  val rootLogger = Logger.getRootLogger
  rootLogger.addAppender(warningCollector)
  try {
    sparkSession.read
      .format("shapefile")
      .load(resourceFolder + "shapefiles/datatypes/datatypes1.shp")
      .collect()
    assert(
      observedWarnings.isEmpty,
      "FileStreamSink metadata warning should not be emitted when reading shapefiles " +
        "by .shp path. This warning is caused by FileStreamSink.hasMetadata trying to " +
        "stat the glob path as a directory. Captured warnings: " + observedWarnings)
  } finally {
    // Always detach the appender so other tests are unaffected.
    rootLogger.removeAppender(warningCollector)
  }
}
946984
}
947985
}

spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
2525
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
2626
import org.apache.spark.sql.connector.read.ScanBuilder
2727
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
28-
import org.apache.spark.sql.execution.datasources.FileFormat
28+
import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
2929
import org.apache.spark.sql.execution.datasources.v2.FileTable
3030
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
3131
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -44,6 +44,13 @@ case class GeoPackageTable(
4444
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
4545
with SupportsMetadataColumns {
4646

47+
// Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
48+
// spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
49+
// GeoPackage tables are always non-streaming batch sources, so the streaming
50+
// metadata check is unnecessary.
51+
override lazy val fileIndex: PartitioningAwareFileIndex =
52+
SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
53+
4754
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
4855
if (loadOptions.showMetadata) {
4956
return MetadataSchema.schema

spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
2424
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, TableCapability}
2525
import org.apache.spark.sql.connector.read.ScanBuilder
2626
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
27-
import org.apache.spark.sql.execution.datasources.FileFormat
27+
import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
2828
import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, fieldDescriptorsToSchema, mergeSchemas}
2929
import org.apache.spark.sql.execution.datasources.v2.FileTable
3030
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
@@ -52,6 +52,13 @@ case class ShapefileTable(
5252
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
5353
with SupportsMetadataColumns {
5454

55+
// Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
56+
// spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
57+
// Shapefile tables are always non-streaming batch sources, so the streaming
58+
// metadata check is unnecessary.
59+
override lazy val fileIndex: PartitioningAwareFileIndex =
60+
SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
61+
5562
override def formatName: String = "Shapefile"
5663

5764
override def capabilities: java.util.Set[TableCapability] =

spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
2323
import org.apache.spark.sql.connector.catalog.TableCapability
2424
import org.apache.spark.sql.connector.read.ScanBuilder
2525
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
26-
import org.apache.spark.sql.execution.datasources.FileFormat
26+
import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
2727
import org.apache.spark.sql.execution.datasources.v2.FileTable
2828
import org.apache.spark.sql.types._
2929
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
3636
userSpecifiedSchema: Option[StructType],
3737
fallbackFileFormat: Class[_ <: FileFormat])
3838
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
39+
40+
// Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
41+
// spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
42+
// GeoParquet metadata tables are always non-streaming batch sources, so the streaming
43+
// metadata check is unnecessary.
44+
override lazy val fileIndex: PartitioningAwareFileIndex =
45+
SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
46+
3947
override def formatName: String = "GeoParquet Metadata"
4048

4149
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =

spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.sedona.sql
2020

2121
import org.apache.commons.io.FileUtils
22+
import org.apache.log4j.{AppenderSkeleton, Level, Logger}
23+
import org.apache.log4j.spi.LoggingEvent
2224
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
2325
import org.apache.spark.sql.types.{DateType, DecimalType, LongType, StringType, StructField, StructType, TimestampType}
2426
import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
@@ -27,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
2729

2830
import java.io.File
2931
import java.nio.file.Files
32+
import java.util.{ArrayList => JList}
3033
import scala.collection.mutable
3134

3235
class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
@@ -955,5 +958,40 @@ class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
955958
assert(r2.getLong(4) == dt2Shp.length())
956959
assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
957960
}
961+
962+
it("reading shapefile by .shp path should not produce FileStreamSink metadata warning") {
  // GH-2650: loading by ".shp" path makes ShapefileDataSource.transformPaths rewrite the
  // path into a glob pattern (e.g., "file.???"). Before the fix, Spark's FileTable.fileIndex
  // invoked FileStreamSink.hasMetadata, which stats the glob as if it were a directory,
  // triggering a FileNotFoundException and the spurious WARN log:
  // "Assume no metadata directory. Error while looking for metadata directory..."
  val observedWarnings = new JList[String]()
  // Collect only the WARN messages that mention the metadata-directory probe.
  val warningCollector = new AppenderSkeleton {
    override def append(event: LoggingEvent): Unit = {
      Option(event.getRenderedMessage)
        .filter(_.contains("Assume no metadata directory"))
        .foreach(observedWarnings.add)
    }
    override def close(): Unit = {}
    override def requiresLayout(): Boolean = false
  }
  warningCollector.setThreshold(Level.WARN)
  val rootLogger = Logger.getRootLogger
  rootLogger.addAppender(warningCollector)
  try {
    sparkSession.read
      .format("shapefile")
      .load(resourceFolder + "shapefiles/datatypes/datatypes1.shp")
      .collect()
    assert(
      observedWarnings.isEmpty,
      "FileStreamSink metadata warning should not be emitted when reading shapefiles " +
        "by .shp path. This warning is caused by FileStreamSink.hasMetadata trying to " +
        "stat the glob path as a directory. Captured warnings: " + observedWarnings)
  } finally {
    // Always detach the appender so other tests are unaffected.
    rootLogger.removeAppender(warningCollector)
  }
}
958996
}
959997
}

spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
2525
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
2626
import org.apache.spark.sql.connector.read.ScanBuilder
2727
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
28-
import org.apache.spark.sql.execution.datasources.FileFormat
28+
import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
2929
import org.apache.spark.sql.execution.datasources.v2.FileTable
3030
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
3131
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -44,6 +44,13 @@ case class GeoPackageTable(
4444
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
4545
with SupportsMetadataColumns {
4646

47+
// Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
48+
// spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
49+
// GeoPackage tables are always non-streaming batch sources, so the streaming
50+
// metadata check is unnecessary.
51+
override lazy val fileIndex: PartitioningAwareFileIndex =
52+
SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
53+
4754
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
4855
if (loadOptions.showMetadata) {
4956
return MetadataSchema.schema

0 commit comments

Comments
 (0)