Skip to content

Commit e6e99f0

Browse files
authored
[GH-2651] Add _metadata hidden column support for GeoPackage DataSource V2 reader (#2654)
1 parent d72bb63 commit e6e99f0

20 files changed

Lines changed: 689 additions & 24 deletions

File tree

spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,20 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge
2525
import org.apache.spark.broadcast.Broadcast
2626
import org.apache.spark.sql.SparkSession
2727
import org.apache.spark.sql.catalyst.InternalRow
28+
import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
29+
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
2830
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
2931
import org.apache.spark.sql.execution.datasources.FilePartition
3032
import org.apache.spark.sql.types.StructType
33+
import org.apache.spark.unsafe.types.UTF8String
3134
import org.apache.spark.util.SerializableConfiguration
3235

3336
case class GeoPackagePartitionReaderFactory(
3437
sparkSession: SparkSession,
3538
broadcastedConf: Broadcast[SerializableConfiguration],
3639
loadOptions: GeoPackageOptions,
37-
dataSchema: StructType)
40+
dataSchema: StructType,
41+
metadataSchema: StructType)
3842
extends PartitionReaderFactory {
3943

4044
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
@@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory(
7276
case _ => None
7377
}
7478

75-
GeoPackagePartitionReader(
79+
val baseReader = GeoPackagePartitionReader(
7680
rs = rs,
7781
options = GeoPackageReadOptions(
7882
tableName = loadOptions.tableName,
@@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory(
8488
broadcastedConf = broadcastedConf,
8589
currentTempFile = tempFile,
8690
copying = copied)
91+
92+
if (metadataSchema.nonEmpty) {
93+
val gpkgFile = partitionFiles.head
94+
val filePath = gpkgFile.filePath.toString
95+
val fileName = new Path(filePath).getName
96+
97+
val allMetadataValues: Map[String, Any] = Map(
98+
"file_path" -> UTF8String.fromString(filePath),
99+
"file_name" -> UTF8String.fromString(fileName),
100+
"file_size" -> gpkgFile.fileSize,
101+
"file_block_start" -> gpkgFile.start,
102+
"file_block_length" -> gpkgFile.length,
103+
"file_modification_time" -> (gpkgFile.modificationTime * 1000L))
104+
105+
val innerStructType = metadataSchema.fields.head.dataType.asInstanceOf[StructType]
106+
val prunedValues = innerStructType.fields.map(f => allMetadataValues(f.name))
107+
val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
108+
val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
109+
110+
new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, metadataRow)
111+
} else {
112+
baseReader
113+
}
87114
}
88115
}
116+
117+
/**
 * Wraps a base [[PartitionReader]] and appends constant per-file metadata columns
 * (the hidden `_metadata` struct) to every row the base reader produces.
 *
 * The metadata values are identical for all rows of a partition, so they are
 * computed once by the factory and joined onto each data row here.
 *
 * @param reader the underlying data-row reader being decorated
 * @param baseSchema schema of the rows produced by `reader`
 * @param metadataSchema schema of the requested metadata columns (appended last)
 * @param metadataValues pre-computed row holding the metadata column values
 */
private[geopackage] class PartitionReaderWithMetadata(
    reader: PartitionReader[InternalRow],
    baseSchema: StructType,
    metadataSchema: StructType,
    metadataValues: InternalRow)
    extends PartitionReader[InternalRow] {

  // Reused for every row to avoid a per-row allocation.
  private val joinedRow = new JoinedRow()

  // Projects (data fields ++ metadata fields) of the joined row into one UnsafeRow.
  private val unsafeProjection = {
    val dataRefs = baseSchema.fields.zipWithIndex.map { case (field, ordinal) =>
      BoundReference(ordinal, field.dataType, field.nullable)
    }
    val metadataRefs = metadataSchema.fields.zipWithIndex.map { case (field, ordinal) =>
      BoundReference(baseSchema.length + ordinal, field.dataType, field.nullable)
    }
    GenerateUnsafeProjection.generate(dataRefs ++ metadataRefs)
  }

  override def next(): Boolean = reader.next()

  override def get(): InternalRow =
    unsafeProjection(joinedRow(reader.get(), metadataValues))

  override def close(): Unit = reader.close()
}

spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,14 @@ case class GeoPackageScan(
3636
fileIndex: PartitioningAwareFileIndex,
3737
readDataSchema: StructType,
3838
readPartitionSchema: StructType,
39+
metadataSchema: StructType,
3940
options: CaseInsensitiveStringMap,
4041
loadOptions: GeoPackageOptions)
4142
extends FileScan {
4243

44+
override def readSchema(): StructType =
45+
StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ metadataSchema.fields)
46+
4347
override def partitionFilters: Seq[Expression] = {
4448
Seq.empty
4549
}
@@ -54,6 +58,11 @@ case class GeoPackageScan(
5458
val broadcastedConf =
5559
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
5660

57-
GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema)
61+
GeoPackagePartitionReaderFactory(
62+
sparkSession,
63+
broadcastedConf,
64+
loadOptions,
65+
dataSchema,
66+
metadataSchema)
5867
}
5968
}

spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,18 @@ class GeoPackageScanBuilder(
3636
userDefinedSchema: Option[StructType] = None)
3737
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
3838

39+
private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
40+
41+
override def pruneColumns(requiredSchema: StructType): Unit = {
42+
val resolver = sparkSession.sessionState.conf.resolver
43+
val metaFields = requiredSchema.fields.filter { field =>
44+
!dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
45+
!fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, field.name))
46+
}
47+
_requiredMetadataSchema = StructType(metaFields)
48+
super.pruneColumns(requiredSchema)
49+
}
50+
3951
override def build(): Scan = {
4052
val paths = fileIndex.allFiles().map(_.getPath.toString)
4153

@@ -54,6 +66,7 @@ class GeoPackageScanBuilder(
5466
fileIndexAdjusted,
5567
dataSchema,
5668
readPartitionSchema(),
69+
_requiredMetadataSchema,
5770
options,
5871
loadOptions)
5972
}

spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus
2222
import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager}
2323
import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, MetadataSchema, TableType}
2424
import org.apache.spark.sql.SparkSession
25+
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
2526
import org.apache.spark.sql.connector.read.ScanBuilder
2627
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
2728
import org.apache.spark.sql.execution.datasources.FileFormat
2829
import org.apache.spark.sql.execution.datasources.v2.FileTable
29-
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType}
30+
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
3031
import org.apache.spark.sql.util.CaseInsensitiveStringMap
3132
import org.apache.spark.util.SerializableConfiguration
3233

@@ -40,7 +41,8 @@ case class GeoPackageTable(
4041
userSpecifiedSchema: Option[StructType],
4142
fallbackFileFormat: Class[_ <: FileFormat],
4243
loadOptions: GeoPackageOptions)
43-
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
44+
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
45+
with SupportsMetadataColumns {
4446

4547
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
4648
if (loadOptions.showMetadata) {
@@ -74,6 +76,8 @@ case class GeoPackageTable(
7476
"GeoPackage"
7577
}
7678

79+
override def metadataColumns(): Array[MetadataColumn] = GeoPackageTable.fileMetadataColumns
80+
7781
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
7882
new GeoPackageScanBuilder(
7983
sparkSession,
@@ -88,3 +92,21 @@ case class GeoPackageTable(
8892
null
8993
}
9094
}
95+
96+
object GeoPackageTable {

  /**
   * Schema of the hidden `_metadata` struct column. Field names and types mirror
   * Spark's built-in file metadata columns; `file_modification_time` is a
   * timestamp, the remaining size/offset fields are longs.
   */
  private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
    Seq(
      StructField("file_path", StringType, nullable = false),
      StructField("file_name", StringType, nullable = false),
      StructField("file_size", LongType, nullable = false),
      StructField("file_block_start", LongType, nullable = false),
      StructField("file_block_length", LongType, nullable = false),
      StructField("file_modification_time", TimestampType, nullable = false)))

  // Single hidden `_metadata` column advertised via SupportsMetadataColumns.
  private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = Array(new MetadataColumn {
    override def name: String = "_metadata"
    override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
    override def isNullable: Boolean = false
  })
}

spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.sedona.sql
2020

2121
import io.minio.{MakeBucketArgs, MinioClient}
22-
import org.apache.spark.sql.DataFrame
22+
import org.apache.spark.sql.{DataFrame, Row}
2323
import org.apache.spark.sql.functions.expr
2424
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
2525
import org.apache.spark.sql.types._
@@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with Matchers {
321321
}
322322
}
323323

324+
describe("_metadata hidden column support") {
325+
it("should expose _metadata struct with all expected fields") {
326+
val df = readFeatureData("point1")
327+
val metaDf = df.select("_metadata")
328+
val metaSchema = metaDf.schema.fields.head.dataType.asInstanceOf[StructType]
329+
val fieldNames = metaSchema.fieldNames.toSet
330+
fieldNames should contain("file_path")
331+
fieldNames should contain("file_name")
332+
fieldNames should contain("file_size")
333+
fieldNames should contain("file_block_start")
334+
fieldNames should contain("file_block_length")
335+
fieldNames should contain("file_modification_time")
336+
}
337+
338+
it("should not include _metadata in select(*)") {
339+
val df = readFeatureData("point1")
340+
val starCols = df.select("*").columns.toSet
341+
starCols should not contain "_metadata"
342+
}
343+
344+
it("should return correct file_path and file_name in _metadata") {
345+
val df = readFeatureData("point1")
346+
val row = df.select("_metadata.file_path", "_metadata.file_name").head()
347+
val filePath = row.getString(0)
348+
val fileName = row.getString(1)
349+
filePath should endWith("example.gpkg")
350+
fileName shouldEqual "example.gpkg"
351+
}
352+
353+
it("should return actual file_size matching the .gpkg file on disk") {
354+
val df = readFeatureData("point1")
355+
val metaFileSize = df.select("_metadata.file_size").head().getLong(0)
356+
val actualFile = new java.io.File(path)
357+
metaFileSize shouldEqual actualFile.length()
358+
}
359+
360+
it("should return file_block_start=0 and file_block_length=file_size") {
361+
val df = readFeatureData("point1")
362+
val row = df
363+
.select(
364+
"_metadata.file_block_start",
365+
"_metadata.file_block_length",
366+
"_metadata.file_size")
367+
.head()
368+
row.getLong(0) shouldEqual 0L
369+
row.getLong(1) shouldEqual row.getLong(2)
370+
}
371+
372+
it("should return file_modification_time matching the .gpkg file on disk") {
373+
val df = readFeatureData("point1")
374+
val metaModTime = df.select("_metadata.file_modification_time").head().getTimestamp(0)
375+
val actualFile = new java.io.File(path)
376+
val expectedModTime = new java.sql.Timestamp(actualFile.lastModified())
377+
metaModTime shouldEqual expectedModTime
378+
}
379+
380+
it("should allow filtering on _metadata fields") {
381+
val df = readFeatureData("point1")
382+
val filtered = df.filter(df("_metadata.file_name") === "example.gpkg")
383+
filtered.count() shouldEqual df.count()
384+
val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg")
385+
empty.count() shouldEqual 0
386+
}
387+
388+
it("should select _metadata along with data columns") {
389+
val df = readFeatureData("point1")
390+
val result = df.select("id", "_metadata.file_name").head()
391+
result.getInt(0) shouldEqual 1
392+
result.getString(1) shouldEqual "example.gpkg"
393+
}
394+
}
395+
324396
private def readFeatureData(tableName: String): DataFrame = {
325397
sparkSession.read
326398
.format("geopackage")

spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,20 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge
2525
import org.apache.spark.broadcast.Broadcast
2626
import org.apache.spark.sql.SparkSession
2727
import org.apache.spark.sql.catalyst.InternalRow
28+
import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
29+
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
2830
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
2931
import org.apache.spark.sql.execution.datasources.FilePartition
3032
import org.apache.spark.sql.types.StructType
33+
import org.apache.spark.unsafe.types.UTF8String
3134
import org.apache.spark.util.SerializableConfiguration
3235

3336
case class GeoPackagePartitionReaderFactory(
3437
sparkSession: SparkSession,
3538
broadcastedConf: Broadcast[SerializableConfiguration],
3639
loadOptions: GeoPackageOptions,
37-
dataSchema: StructType)
40+
dataSchema: StructType,
41+
metadataSchema: StructType)
3842
extends PartitionReaderFactory {
3943

4044
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
@@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory(
7276
case _ => None
7377
}
7478

75-
GeoPackagePartitionReader(
79+
val baseReader = GeoPackagePartitionReader(
7680
rs = rs,
7781
options = GeoPackageReadOptions(
7882
tableName = loadOptions.tableName,
@@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory(
8488
broadcastedConf = broadcastedConf,
8589
currentTempFile = tempFile,
8690
copying = copied)
91+
92+
if (metadataSchema.nonEmpty) {
93+
val gpkgFile = partitionFiles.head
94+
val filePath = gpkgFile.filePath.toString
95+
val fileName = new Path(filePath).getName
96+
97+
val allMetadataValues: Map[String, Any] = Map(
98+
"file_path" -> UTF8String.fromString(filePath),
99+
"file_name" -> UTF8String.fromString(fileName),
100+
"file_size" -> gpkgFile.fileSize,
101+
"file_block_start" -> gpkgFile.start,
102+
"file_block_length" -> gpkgFile.length,
103+
"file_modification_time" -> (gpkgFile.modificationTime * 1000L))
104+
105+
val innerStructType = metadataSchema.fields.head.dataType.asInstanceOf[StructType]
106+
val prunedValues = innerStructType.fields.map(f => allMetadataValues(f.name))
107+
val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
108+
val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
109+
110+
new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, metadataRow)
111+
} else {
112+
baseReader
113+
}
87114
}
88115
}
116+
117+
/**
 * Decorator over a base [[PartitionReader]] that attaches constant per-file
 * metadata columns (the hidden `_metadata` struct) to each emitted row.
 *
 * Metadata is identical for every row of a partition, so the factory computes
 * it once and this reader simply joins it onto each data row.
 *
 * @param reader the wrapped data-row reader
 * @param baseSchema schema of the rows produced by `reader`
 * @param metadataSchema schema of the requested metadata columns (appended last)
 * @param metadataValues pre-computed row of metadata column values
 */
private[geopackage] class PartitionReaderWithMetadata(
    reader: PartitionReader[InternalRow],
    baseSchema: StructType,
    metadataSchema: StructType,
    metadataValues: InternalRow)
    extends PartitionReader[InternalRow] {

  // Shared across rows; avoids allocating a JoinedRow per record.
  private val joinedRow = new JoinedRow()

  // Flattens the joined (data ++ metadata) row into a single UnsafeRow.
  private val unsafeProjection = {
    val dataRefs = baseSchema.fields.zipWithIndex.map { case (field, ordinal) =>
      BoundReference(ordinal, field.dataType, field.nullable)
    }
    val metadataRefs = metadataSchema.fields.zipWithIndex.map { case (field, ordinal) =>
      BoundReference(baseSchema.length + ordinal, field.dataType, field.nullable)
    }
    GenerateUnsafeProjection.generate(dataRefs ++ metadataRefs)
  }

  override def next(): Boolean = reader.next()

  override def get(): InternalRow =
    unsafeProjection(joinedRow(reader.get(), metadataValues))

  override def close(): Unit = reader.close()
}

spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,14 @@ case class GeoPackageScan(
3636
fileIndex: PartitioningAwareFileIndex,
3737
readDataSchema: StructType,
3838
readPartitionSchema: StructType,
39+
metadataSchema: StructType,
3940
options: CaseInsensitiveStringMap,
4041
loadOptions: GeoPackageOptions)
4142
extends FileScan {
4243

44+
override def readSchema(): StructType =
45+
StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ metadataSchema.fields)
46+
4347
override def partitionFilters: Seq[Expression] = {
4448
Seq.empty
4549
}
@@ -54,6 +58,11 @@ case class GeoPackageScan(
5458
val broadcastedConf =
5559
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
5660

57-
GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema)
61+
GeoPackagePartitionReaderFactory(
62+
sparkSession,
63+
broadcastedConf,
64+
loadOptions,
65+
dataSchema,
66+
metadataSchema)
5867
}
5968
}

0 commit comments

Comments (0)