Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,24 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
val fileSizes = new JArrayList[JLong]()
val modificationTimes = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]
val metadataColumns = new JArrayList[JMap[String, String]]
val needMetadataColumns = metadataColumnNames != null && metadataColumnNames.nonEmpty
val emptyMetadataColumn: JMap[String, String] =
java.util.Collections.emptyMap[String, String]()
val metadataColumns = new JArrayList[JMap[String, String]](f.files.length)
val otherMetadataColumns = new JArrayList[JMap[String, Object]]
f.files.foreach {
file =>
paths.add(new URI(file.filePath.toString()).toASCIIString)
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
val metadataColumn =
SparkShimLoader.getSparkShims
.generateMetadataColumns(file, metadataColumnNames)
.asJava
if (needMetadataColumns) {
SparkShimLoader.getSparkShims
.generateMetadataColumns(file, metadataColumnNames)
.asJava
} else {
emptyMetadataColumn
}
metadataColumns.add(metadataColumn)
val partitionColumn = new JHashMap[String, String]()
for (i <- 0 until file.partitionValues.numFields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,19 @@ class VeloxIteratorApi extends IteratorApi with Logging {
.unzip

val partitionColumns = getPartitionColumns(partitionSchema, partitionFiles)
val metadataColumns = partitionFiles
.map(
f => SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames).asJava)
val needMetadataColumns = metadataColumnNames != null && metadataColumnNames.nonEmpty
val emptyMetadataColumn: java.util.Map[String, String] =
java.util.Collections.emptyMap[String, String]()
val metadataColumns: java.util.List[java.util.Map[String, String]] =
if (needMetadataColumns) {
partitionFiles
.map(
f =>
SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames).asJava)
.asJava
} else {
java.util.Collections.nCopies(partitionFiles.size, emptyMetadataColumn)
}
val otherMetadataColumns = partitionFiles
.map(f => SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(f))

Expand All @@ -106,7 +116,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
fileSizes.asJava,
modificationTimes.asJava,
partitionColumns.map(_.asJava).asJava,
metadataColumns.asJava,
metadataColumns,
fileFormat,
locations.toList.asJava,
mapAsJavaMap(properties),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,26 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
case _ => Seq(partition)
}

val metadataFromSpark = getMetadataColumns().map(_.name)

val inputFileRelatedMetadataKeys = Seq(
InputFileName().prettyName,
InputFileBlockStart().prettyName,
InputFileBlockLength().prettyName)

val neededInputFileRelatedMetadataKeys =
inputFileRelatedMetadataKeys.filter(k => output.exists(_.name == k))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix. If the output does not include a specific column, but the scan filter references it, do we still need to keep the metadata?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rui-mo thanks for your reply. I'll add a test to verify it.


val metadataColumnNames = (metadataFromSpark ++ neededInputFileRelatedMetadataKeys).distinct

BackendsApiManager.getIteratorApiInstance
.genSplitInfo(
partition.index,
part,
getPartitionSchema,
getDataSchema,
readFileFormat,
getMetadataColumns().map(_.name),
metadataColumnNames,
getProperties)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,33 @@ trait SparkShims {
/**
 * Builds the input-file metadata columns (input file name, block start, block
 * length) requested for a scan split.
 *
 * Only entries whose names appear in `metadataColumnNames` are produced, so a
 * scan that needs no metadata columns gets an empty map and pays no cost.
 *
 * @param file the partitioned file this split reads from
 * @param metadataColumnNames names of the metadata columns the query needs;
 *                            may be null (Java callers) or empty, in which
 *                            case the result is empty
 * @return map from metadata column name to its string value for `file`
 */
def generateMetadataColumns(
    file: PartitionedFile,
    metadataColumnNames: Seq[String] = Seq.empty): Map[String, String] = {
  // Treat a null list the same as an empty one: no metadata requested.
  if (metadataColumnNames == null || metadataColumnNames.isEmpty) {
    Map.empty
  } else {
    // Candidate (name -> value) pairs; keep only the requested ones.
    // Filtering a small fixed sequence replaces the original's mutable
    // accumulator, per-key `contains` duplication, and non-local `return`s.
    Seq(
      InputFileName().prettyName -> file.filePath.toString,
      InputFileBlockStart().prettyName -> file.start.toString,
      InputFileBlockLength().prettyName -> file.length.toString
    ).filter { case (name, _) => metadataColumnNames.contains(name) }.toMap
  }
}

// For compatibility with Spark-3.5.
Expand Down
Loading