diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index d5d075760f..951c73f854 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -146,7 +146,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { // Extract object store options from first file (S3 configs apply to all files in scan) var firstPartition: Option[PartitionedFile] = None - val filePartitions = scan.getFilePartitions() + val filePartitions = scan.planner.getFilePartitions() firstPartition = filePartitions.flatMap(_.files.headOption).headOption val partitionSchema = schema2Proto(scan.relation.partitionSchema.fields) @@ -205,6 +205,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { } override def createExec(nativeOp: Operator, op: CometScanExec): CometNativeExec = { - CometNativeScanExec(nativeOp, op.wrapped, op.session, op) + CometNativeScanExec(nativeOp, op.wrapped, op.session, op.planner) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index dcb975ac7a..4bfa9880b7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -67,7 +67,7 @@ case class CometNativeScanExec( disableBucketedScan: Boolean = false, originalPlan: FileSourceScanExec, override val serializedPlanOpt: SerializedPlan, - @transient scan: CometScanExec, // Lazy access to file partitions without serializing with plan + @transient planner: FilePartitionPlanner, sourceKey: String) // Key for PlanDataInjector to match common+partition data at runtime extends CometLeafExec with DataSourceScanExec @@ -143,8 +143,9 @@ case class CometNativeScanExec( // Extract common data from nativeOp val commonBytes = nativeOp.getNativeScan.getCommon.toByteArray - // Get file partitions from CometScanExec (handles bucketing, etc.) - val filePartitions = scan.getFilePartitions() + // Get file partitions from FilePartitionPlanner (handles bucketing, etc.) + val filePartitions = planner.getFilePartitions() + planner.sendDriverMetrics(metrics, sparkContext) // Serialize each partition's files import org.apache.comet.serde.operator.partition2Proto @@ -210,7 +211,7 @@ case class CometNativeScanExec( disableBucketedScan, originalPlan.doCanonicalize(), SerializedPlan(None), - null, // Transient scan not needed for canonicalization + null, // Transient planner not needed for canonicalization "" ) // sourceKey not needed for canonicalization } @@ -246,7 +247,7 @@ case class CometNativeScanExec( case Some(metric) => nativeMetrics + ("numOutputRows" -> metric) case None => nativeMetrics } - withAlias ++ scan.metrics.filterKeys(driverMetricKeys) + withAlias ++ originalPlan.driverMetrics.filterKeys(driverMetricKeys) } /** @@ -260,7 +261,7 @@ object CometNativeScanExec { nativeOp: Operator, scanExec: FileSourceScanExec, session: SparkSession, - scan: CometScanExec): CometNativeScanExec = { + planner: FilePartitionPlanner): CometNativeScanExec = { // TreeNode.mapProductIterator is protected method. def mapProductIterator[B: ClassTag](product: Product, f: Any => B): Array[B] = { val arr = Array.ofDim[B](product.productArity) @@ -310,7 +311,7 @@ object CometNativeScanExec { wrapped.disableBucketedScan, wrapped, SerializedPlan(None), - scan, + planner, sourceKey) scanExec.logicalLink.foreach(batchScanExec.setLogicalLink) batchScanExec diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index 6151a43797..a12b93910f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -19,21 +19,16 @@ package org.apache.spark.sql.comet -import scala.collection.mutable.HashMap -import scala.concurrent.duration.NANOSECONDS import scala.reflect.ClassTag -import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst._ -import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.comet.shims.ShimCometScanExec -import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} @@ -83,62 +78,20 @@ case class CometScanExec( override def vectorTypes: Option[Seq[String]] = wrapped.vectorTypes - private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty - - /** - * Send the driver-side metrics. Before calling this function, selectedPartitions has been - * initialized. See SPARK-26327 for more details. - */ - private def sendDriverMetrics(): Unit = { - driverMetrics.foreach(e => metrics(e._1).add(e._2)) - val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - SQLMetrics.postDriverMetricUpdates( - sparkContext, - executionId, - metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq) - } + @transient lazy val planner: FilePartitionPlanner = new FilePartitionPlanner( + relation, + requiredSchema, + partitionFilters, + dataFilters, + optionalBucketSet, + optionalNumCoalescedBuckets, + bucketedScan) private def isDynamicPruningFilter(e: Expression): Boolean = e.find(_.isInstanceOf[PlanExpression[_]]).isDefined - @transient lazy val selectedPartitions: Array[PartitionDirectory] = { - val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) - val startTime = System.nanoTime() - val ret = - relation.location.listFiles(partitionFilters.filterNot(isDynamicPruningFilter), dataFilters) - setFilesNumAndSizeMetric(ret, true) - val timeTakenMs = - NANOSECONDS.toMillis((System.nanoTime() - startTime) + optimizerMetadataTimeNs) - driverMetrics("metadataTime") = timeTakenMs - ret - }.toArray - - // We can only determine the actual partitions at runtime when a dynamic partition filter is - // present. This is because such a filter relies on information that is only available at run - // time (for instance the keys used in the other side of a join). - @transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = { - val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter) - - if (dynamicPartitionFilters.nonEmpty) { - val startTime = System.nanoTime() - // call the file index for the files matching all filters except dynamic partition filters - val predicate = dynamicPartitionFilters.reduce(And) - val partitionColumns = relation.partitionSchema - val boundPredicate = Predicate.create( - predicate.transform { case a: AttributeReference => - val index = partitionColumns.indexWhere(a.name == _.name) - BoundReference(index, partitionColumns(index).dataType, nullable = true) - }, - Nil) - val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values)) - setFilesNumAndSizeMetric(ret, false) - val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000 - driverMetrics("pruningTime") = timeTakenMs - ret - } else { - selectedPartitions - } - } + @transient lazy val selectedPartitions: Array[PartitionDirectory] = + planner.selectedPartitions // exposed for testing lazy val bucketedScan: Boolean = wrapped.bucketedScan @@ -214,41 +167,14 @@ case class CometScanExec( hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) - val readRDD = if (bucketedScan) { - createBucketedReadRDD( - relation.bucketSpec.get, - readFile, - dynamicallySelectedPartitions, - relation) - } else { - createReadRDD(readFile, dynamicallySelectedPartitions, relation) - } - sendDriverMetrics() - readRDD + val filePartitions = getFilePartitions() + prepareRDD(relation, readFile, filePartitions) } override def inputRDDs(): Seq[RDD[InternalRow]] = { inputRDD :: Nil } - /** Helper for computing total number and size of files in selected partitions. */ - private def setFilesNumAndSizeMetric( - partitions: Seq[PartitionDirectory], - static: Boolean): Unit = { - val filesNum = partitions.map(_.files.size.toLong).sum - val filesSize = partitions.map(_.files.map(_.getLen).sum).sum - if (!static || !partitionFilters.exists(isDynamicPruningFilter)) { - driverMetrics("numFiles") = filesNum - driverMetrics("filesSize") = filesSize - } else { - driverMetrics("staticFilesNum") = filesNum - driverMetrics("staticFilesSize") = filesSize - } - if (relation.partitionSchema.nonEmpty) { - driverMetrics("numPartitions") = partitions.length - } - } - override lazy val metrics: Map[String, SQLMetric] = wrapped.driverMetrics ++ CometMetricNode.baseScanMetrics( session.sparkContext) ++ (relation.fileFormat match { @@ -296,178 +222,9 @@ case class CometScanExec( * for native scans that only need partition metadata. */ def getFilePartitions(): Seq[FilePartition] = { - val filePartitions = if (bucketedScan) { - createFilePartitionsForBucketedScan( - relation.bucketSpec.get, - dynamicallySelectedPartitions, - relation) - } else { - createFilePartitionsForNonBucketedScan(dynamicallySelectedPartitions, relation) - } - sendDriverMetrics() - filePartitions - } - - /** - * Create file partitions for bucketed scans without instantiating readers. - * - * @param bucketSpec - * the bucketing spec. - * @param selectedPartitions - * Hive-style partition that are part of the read. - * @param fsRelation - * [[HadoopFsRelation]] associated with the read. - */ - private def createFilePartitionsForBucketedScan( - bucketSpec: BucketSpec, - selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation): Seq[FilePartition] = { - logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") - val filesGroupedToBuckets = - selectedPartitions - .flatMap { p => - p.files.map { f => - getPartitionedFile(f, p) - } - } - .groupBy { f => - BucketingUtils - .getBucketId(new Path(f.filePath.toString()).getName) - .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath.toString())) - } - - val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { - val bucketSet = optionalBucketSet.get - filesGroupedToBuckets.filter { f => - bucketSet.get(f._1) - } - } else { - filesGroupedToBuckets - } - - optionalNumCoalescedBuckets - .map { numCoalescedBuckets => - logInfo(s"Coalescing to ${numCoalescedBuckets} buckets") - val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets) - Seq.tabulate(numCoalescedBuckets) { bucketId => - val partitionedFiles = coalescedBuckets - .get(bucketId) - .map { - _.values.flatten.toArray - } - .getOrElse(Array.empty) - FilePartition(bucketId, partitionedFiles) - } - } - .getOrElse { - Seq.tabulate(bucketSpec.numBuckets) { bucketId => - FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) - } - } - } - - /** - * Create file partitions for non-bucketed scans without instantiating readers. - * - * @param selectedPartitions - * Hive-style partition that are part of the read. - * @param fsRelation - * [[HadoopFsRelation]] associated with the read. - */ - private def createFilePartitionsForNonBucketedScan( - selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation): Seq[FilePartition] = { - val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes - val maxSplitBytes = - FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions) - logInfo( - s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + - s"open cost is considered as scanning $openCostInBytes bytes.") - - // Filter files with bucket pruning if possible - val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled - val shouldProcess: Path => Boolean = optionalBucketSet match { - case Some(bucketSet) if bucketingEnabled => - // Do not prune the file if bucket file name is invalid - filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get) - case _ => - _ => true - } - - val splitFiles = selectedPartitions - .flatMap { partition => - partition.files.flatMap { file => - // getPath() is very expensive so we only want to call it once in this block: - val filePath = file.getPath - - if (shouldProcess(filePath)) { - val isSplitable = relation.fileFormat.isSplitable( - relation.sparkSession, - relation.options, - filePath) && - // SPARK-39634: Allow file splitting in combination with row index generation once - // the fix for PARQUET-2161 is available. - !isNeededForSchema(requiredSchema) - super.splitFiles( - sparkSession = relation.sparkSession, - file = file, - filePath = filePath, - isSplitable = isSplitable, - maxSplitBytes = maxSplitBytes, - partitionValues = partition.values) - } else { - Seq.empty - } - } - } - .sortBy(_.length)(implicitly[Ordering[Long]].reverse) - - FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) - } - - /** - * Create an RDD for bucketed reads. The non-bucketed variant of this function is - * [[createReadRDD]]. - * - * Each RDD partition being returned should include all the files with the same bucket id from - * all the given Hive partitions. - * - * @param bucketSpec - * the bucketing spec. - * @param readFile - * a function to read each (part of a) file. - * @param selectedPartitions - * Hive-style partition that are part of the read. - * @param fsRelation - * [[HadoopFsRelation]] associated with the read. - */ - private def createBucketedReadRDD( - bucketSpec: BucketSpec, - readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation): RDD[InternalRow] = { - val filePartitions = - createFilePartitionsForBucketedScan(bucketSpec, selectedPartitions, fsRelation) - prepareRDD(fsRelation, readFile, filePartitions) - } - - /** - * Create an RDD for non-bucketed reads. The bucketed variant of this function is - * [[createBucketedReadRDD]]. - * - * @param readFile - * a function to read each (part of a) file. - * @param selectedPartitions - * Hive-style partition that are part of the read. - * @param fsRelation - * [[HadoopFsRelation]] associated with the read. - */ - private def createReadRDD( - readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation): RDD[InternalRow] = { - val filePartitions = createFilePartitionsForNonBucketedScan(selectedPartitions, fsRelation) - prepareRDD(fsRelation, readFile, filePartitions) + val result = planner.getFilePartitions() + planner.sendDriverMetrics(metrics, sparkContext) + result } private def prepareRDD( diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/FilePartitionPlanner.scala b/spark/src/main/scala/org/apache/spark/sql/comet/FilePartitionPlanner.scala new file mode 100644 index 0000000000..599ee9304b --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/comet/FilePartitionPlanner.scala @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import scala.collection.mutable.HashMap +import scala.concurrent.duration.NANOSECONDS + +import org.apache.hadoop.fs.Path +import org.apache.spark.SparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.comet.shims.ShimFilePartitionPlanner +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.collection._ + +/** + * Utility class that encapsulates partition listing, dynamic pruning, bucketed/non-bucketed + * splitting, and driver metric accumulation for file-based scans. + * + * This is used by both CometScanExec (hybrid scans) and CometNativeScanExec (native scans). + */ +class FilePartitionPlanner( + @transient relation: HadoopFsRelation, + requiredSchema: StructType, + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression], + optionalBucketSet: Option[BitSet], + optionalNumCoalescedBuckets: Option[Int], + bucketedScan: Boolean) + extends ShimFilePartitionPlanner + with Logging { + + private val accumulatedMetrics: HashMap[String, Long] = HashMap.empty + + private def isDynamicPruningFilter(e: Expression): Boolean = + e.find(_.isInstanceOf[PlanExpression[_]]).isDefined + + @transient lazy val selectedPartitions: Array[PartitionDirectory] = { + val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) + val startTime = System.nanoTime() + val ret = + relation.location.listFiles(partitionFilters.filterNot(isDynamicPruningFilter), dataFilters) + setFilesNumAndSizeMetric(ret, true) + val timeTakenMs = + NANOSECONDS.toMillis((System.nanoTime() - startTime) + optimizerMetadataTimeNs) + accumulatedMetrics("metadataTime") = timeTakenMs + ret + }.toArray + + // We can only determine the actual partitions at runtime when a dynamic partition filter is + // present. This is because such a filter relies on information that is only available at run + // time (for instance the keys used in the other side of a join). + @transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = { + val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter) + + if (dynamicPartitionFilters.nonEmpty) { + val startTime = System.nanoTime() + // call the file index for the files matching all filters except dynamic partition filters + val predicate = dynamicPartitionFilters.reduce(And) + val partitionColumns = relation.partitionSchema + val boundPredicate = Predicate.create( + predicate.transform { case a: AttributeReference => + val index = partitionColumns.indexWhere(a.name == _.name) + BoundReference(index, partitionColumns(index).dataType, nullable = true) + }, + Nil) + val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values)) + setFilesNumAndSizeMetric(ret, false) + val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000 + accumulatedMetrics("pruningTime") = timeTakenMs + ret + } else { + selectedPartitions + } + } + + /** + * Compute the file partitions for this scan. + */ + def getFilePartitions(): Seq[FilePartition] = { + if (bucketedScan) { + createFilePartitionsForBucketedScan( + relation.bucketSpec.get, + dynamicallySelectedPartitions, + relation) + } else { + createFilePartitionsForNonBucketedScan(dynamicallySelectedPartitions, relation) + } + } + + /** + * Send the driver-side metrics. Before calling this function, selectedPartitions has been + * initialized. See SPARK-26327 for more details. + */ + def sendDriverMetrics(metricsMap: Map[String, SQLMetric], sparkContext: SparkContext): Unit = { + accumulatedMetrics.foreach(e => metricsMap(e._1).add(e._2)) + val executionId = + sparkContext.getLocalProperty(org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY) + org.apache.spark.sql.execution.metric.SQLMetrics.postDriverMetricUpdates( + sparkContext, + executionId, + metricsMap.filter(e => accumulatedMetrics.contains(e._1)).values.toSeq) + } + + /** Helper for computing total number and size of files in selected partitions. */ + private def setFilesNumAndSizeMetric( + partitions: Seq[PartitionDirectory], + static: Boolean): Unit = { + val filesNum = partitions.map(_.files.size.toLong).sum + val filesSize = partitions.map(_.files.map(_.getLen).sum).sum + if (!static || !partitionFilters.exists(isDynamicPruningFilter)) { + accumulatedMetrics("numFiles") = filesNum + accumulatedMetrics("filesSize") = filesSize + } else { + accumulatedMetrics("staticFilesNum") = filesNum + accumulatedMetrics("staticFilesSize") = filesSize + } + if (relation.partitionSchema.nonEmpty) { + accumulatedMetrics("numPartitions") = partitions.length + } + } + + /** + * Create file partitions for bucketed scans. + * + * Each partition being returned should include all the files with the same bucket id from all + * the given Hive partitions. + */ + private def createFilePartitionsForBucketedScan( + bucketSpec: BucketSpec, + selectedPartitions: Array[PartitionDirectory], + fsRelation: HadoopFsRelation): Seq[FilePartition] = { + logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") + val filesGroupedToBuckets = + selectedPartitions + .flatMap { p => + p.files.map { f => + getPartitionedFile(f, p) + } + } + .groupBy { f => + BucketingUtils + .getBucketId(new Path(f.filePath.toString()).getName) + .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath.toString())) + } + + val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { + val bucketSet = optionalBucketSet.get + filesGroupedToBuckets.filter { f => + bucketSet.get(f._1) + } + } else { + filesGroupedToBuckets + } + + optionalNumCoalescedBuckets + .map { numCoalescedBuckets => + logInfo(s"Coalescing to ${numCoalescedBuckets} buckets") + val coalescedBuckets = + prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets) + Seq.tabulate(numCoalescedBuckets) { bucketId => + val partitionedFiles = coalescedBuckets + .get(bucketId) + .map { + _.values.flatten.toArray + } + .getOrElse(Array.empty) + FilePartition(bucketId, partitionedFiles) + } + } + .getOrElse { + Seq.tabulate(bucketSpec.numBuckets) { bucketId => + FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) + } + } + } + + /** + * Create file partitions for non-bucketed scans. + */ + private def createFilePartitionsForNonBucketedScan( + selectedPartitions: Array[PartitionDirectory], + fsRelation: HadoopFsRelation): Seq[FilePartition] = { + val openCostInBytes = + fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes + val maxSplitBytes = + FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions) + logInfo( + s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + + s"open cost is considered as scanning $openCostInBytes bytes.") + + // Filter files with bucket pruning if possible + val bucketingEnabled = + fsRelation.sparkSession.sessionState.conf.bucketingEnabled + val shouldProcess: Path => Boolean = optionalBucketSet match { + case Some(bucketSet) if bucketingEnabled => + // Do not prune the file if bucket file name is invalid + filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get) + case _ => + _ => true + } + + val splitFiles = selectedPartitions + .flatMap { partition => + partition.files.flatMap { file => + // getPath() is very expensive so we only want to call it once in this block: + val filePath = file.getPath + + if (shouldProcess(filePath)) { + val isSplitable = relation.fileFormat.isSplitable( + relation.sparkSession, + relation.options, + filePath) && + // SPARK-39634: Allow file splitting in combination with row index generation + // once the fix for PARQUET-2161 is available. + !isNeededForSchema(requiredSchema) + this.splitFiles( + sparkSession = relation.sparkSession, + file = file, + filePath = filePath, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partition.values) + } else { + Seq.empty + } + } + } + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + + FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) + } +} diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala index 7d8ba9f0f8..c970487d2a 100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala @@ -19,18 +19,14 @@ package org.apache.spark.sql.comet.shims -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} -import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil} +import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -import org.apache.comet.shims.ShimFileFormat - trait ShimCometScanExec { def wrapped: FileSourceScanExec @@ -50,29 +46,6 @@ trait ShimCometScanExec { fileConstantMetadataColumns, options) - protected def isNeededForSchema(sparkSchema: StructType): Boolean = { - // TODO: remove after PARQUET-2161 becomes available in Parquet (tracked in SPARK-39634) - ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema) >= 0 - } - - protected def getPartitionedFile(f: FileStatus, p: PartitionDirectory): PartitionedFile = - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) - - protected def splitFiles( - sparkSession: SparkSession, - file: FileStatus, - filePath: Path, - isSplitable: Boolean, - maxSplitBytes: Long, - partitionValues: InternalRow): Seq[PartitionedFile] = - PartitionedFileUtil.splitFiles( - sparkSession, - file, - filePath, - isSplitable, - maxSplitBytes, - partitionValues) - protected def getPushedDownFilters( relation: HadoopFsRelation, dataFilters: Seq[Expression]): Seq[Filter] = { diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala new file mode 100644 index 0000000000..c8d4b2ffac --- /dev/null +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.shims + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.types.StructType + +import org.apache.comet.shims.ShimFileFormat + +trait ShimFilePartitionPlanner { + + protected def isNeededForSchema(sparkSchema: StructType): Boolean = { + // TODO: remove after PARQUET-2161 becomes available in Parquet (tracked in SPARK-39634) + ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema) >= 0 + } + + protected def getPartitionedFile(f: FileStatus, p: PartitionDirectory): PartitionedFile = + PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) + + protected def splitFiles( + sparkSession: SparkSession, + file: FileStatus, + filePath: Path, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = + PartitionedFileUtil.splitFiles( + sparkSession, + file, + filePath, + isSplitable, + maxSplitBytes, + partitionValues) +} diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala index 12be0cc53f..e39d402e15 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala @@ -19,19 +19,13 @@ package org.apache.spark.sql.comet.shims -import scala.math.Ordering.Implicits._ - -import org.apache.hadoop.fs.Path -import org.apache.spark.SPARK_VERSION_SHORT -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} -import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil} +import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -import org.apache.spark.util.VersionUtils trait ShimCometScanExec { def wrapped: FileSourceScanExec @@ -39,14 +33,6 @@ trait ShimCometScanExec { lazy val fileConstantMetadataColumns: Seq[AttributeReference] = wrapped.fileConstantMetadataColumns - def isSparkVersionAtLeast355: Boolean = { - VersionUtils.majorMinorPatchVersion(SPARK_VERSION_SHORT) match { - case Some((major, minor, patch)) => (major, minor, patch) >= (3, 5, 5) - case None => - throw new IllegalArgumentException(s"Malformed Spark version: $SPARK_VERSION_SHORT") - } - } - protected def newFileScanRDD( fsRelation: HadoopFsRelation, readFunction: PartitionedFile => Iterator[InternalRow], @@ -61,78 +47,6 @@ trait ShimCometScanExec { fsRelation.fileFormat.fileConstantMetadataExtractors, options) - // see SPARK-39634 - protected def isNeededForSchema(sparkSchema: StructType): Boolean = false - - protected def getPartitionedFile( - f: FileStatusWithMetadata, - p: PartitionDirectory): PartitionedFile = - // Use reflection to invoke the relevant method according to the spark version - // See https://github.com/apache/datafusion-comet/issues/1572 - if (isSparkVersionAtLeast355) { - PartitionedFileUtil.getClass - .getMethod( - "getPartitionedFile", - classOf[FileStatusWithMetadata], - classOf[Path], - classOf[InternalRow]) - .invoke(PartitionedFileUtil, f, f.getPath, p.values) - .asInstanceOf[PartitionedFile] - } else { - PartitionedFileUtil.getClass - .getMethod("getPartitionedFile", classOf[FileStatusWithMetadata], classOf[InternalRow]) - .invoke(PartitionedFileUtil, f, p.values) - .asInstanceOf[PartitionedFile] - } - - protected def splitFiles( - sparkSession: SparkSession, - file: FileStatusWithMetadata, - filePath: Path, - isSplitable: Boolean, - maxSplitBytes: Long, - partitionValues: InternalRow): Seq[PartitionedFile] = { - // Use reflection to invoke the relevant method according to the spark version - // See https://github.com/apache/datafusion-comet/issues/1572 - if (isSparkVersionAtLeast355) { - PartitionedFileUtil.getClass - .getMethod( - "splitFiles", - classOf[SparkSession], - classOf[FileStatusWithMetadata], - classOf[Path], - java.lang.Boolean.TYPE, - java.lang.Long.TYPE, - classOf[InternalRow]) - .invoke( - PartitionedFileUtil, - sparkSession, - file, - filePath, - java.lang.Boolean.valueOf(isSplitable), - java.lang.Long.valueOf(maxSplitBytes), - partitionValues) - .asInstanceOf[Seq[PartitionedFile]] - } else { - PartitionedFileUtil.getClass - .getMethod( - "splitFiles", - classOf[SparkSession], - classOf[FileStatusWithMetadata], - java.lang.Boolean.TYPE, - java.lang.Long.TYPE, - classOf[InternalRow]) - .invoke( - PartitionedFileUtil, - sparkSession, - file, - java.lang.Boolean.valueOf(isSplitable), - java.lang.Long.valueOf(maxSplitBytes), - partitionValues) - .asInstanceOf[Seq[PartitionedFile]] - } - } - protected def getPushedDownFilters( relation: HadoopFsRelation, dataFilters: Seq[Expression]): Seq[Filter] = { diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala new file mode 100644 index 0000000000..1ae3c50389 --- /dev/null +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.shims + +import scala.math.Ordering.Implicits._ + +import org.apache.hadoop.fs.Path +import org.apache.spark.SPARK_VERSION_SHORT +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.VersionUtils + +trait ShimFilePartitionPlanner { + + private def isSparkVersionAtLeast355: Boolean = { + VersionUtils.majorMinorPatchVersion(SPARK_VERSION_SHORT) match { + case Some((major, minor, patch)) => (major, minor, patch) >= (3, 5, 5) + case None => + throw new IllegalArgumentException(s"Malformed Spark version: $SPARK_VERSION_SHORT") + } + } + + // see SPARK-39634 + protected def isNeededForSchema(sparkSchema: StructType): Boolean = false + + protected def getPartitionedFile( + f: FileStatusWithMetadata, + p: PartitionDirectory): PartitionedFile = + // Use reflection to invoke the relevant method according to the spark version + // See https://github.com/apache/datafusion-comet/issues/1572 + if (isSparkVersionAtLeast355) { + PartitionedFileUtil.getClass + .getMethod( + "getPartitionedFile", + classOf[FileStatusWithMetadata], + classOf[Path], + classOf[InternalRow]) + .invoke(PartitionedFileUtil, f, f.getPath, p.values) + .asInstanceOf[PartitionedFile] + } else { + PartitionedFileUtil.getClass + .getMethod("getPartitionedFile", classOf[FileStatusWithMetadata], classOf[InternalRow]) + .invoke(PartitionedFileUtil, f, p.values) + .asInstanceOf[PartitionedFile] + } + + protected def splitFiles( + sparkSession: SparkSession, + file: FileStatusWithMetadata, + filePath: Path, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + // Use reflection to invoke the relevant method according to the spark version + // See https://github.com/apache/datafusion-comet/issues/1572 + if (isSparkVersionAtLeast355) { + PartitionedFileUtil.getClass + .getMethod( + "splitFiles", + classOf[SparkSession], + classOf[FileStatusWithMetadata], + classOf[Path], + java.lang.Boolean.TYPE, + java.lang.Long.TYPE, + classOf[InternalRow]) + .invoke( + PartitionedFileUtil, + sparkSession, + file, + filePath, + java.lang.Boolean.valueOf(isSplitable), + java.lang.Long.valueOf(maxSplitBytes), + partitionValues) + .asInstanceOf[Seq[PartitionedFile]] + } else { + PartitionedFileUtil.getClass + .getMethod( + "splitFiles", + classOf[SparkSession], + classOf[FileStatusWithMetadata], + java.lang.Boolean.TYPE, + java.lang.Long.TYPE, + classOf[InternalRow]) + .invoke( + PartitionedFileUtil, + sparkSession, + file, + java.lang.Boolean.valueOf(isSplitable), + java.lang.Long.valueOf(maxSplitBytes), + partitionValues) + .asInstanceOf[Seq[PartitionedFile]] + } + } +} diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala index 3d9b963a93..82f5439160 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala @@ -19,11 +19,9 @@ package org.apache.spark.sql.comet.shims -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, FileSourceConstantMetadataAttribute, Literal} -import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, ScalarSubquery} +import org.apache.spark.sql.execution.{FileSourceScanExec, ScalarSubquery} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.sources.Filter @@ -51,23 +49,6 @@ trait ShimCometScanExec extends ShimStreamSourceAwareSparkPlan { options) } - // see SPARK-39634 - protected def isNeededForSchema(sparkSchema: StructType): Boolean = false - - protected def getPartitionedFile( - f: FileStatusWithMetadata, - p: PartitionDirectory): PartitionedFile = - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values, 0, f.getLen) - - protected def splitFiles( - sparkSession: SparkSession, - file: FileStatusWithMetadata, - filePath: Path, - isSplitable: Boolean, - maxSplitBytes: Long, - partitionValues: InternalRow): Seq[PartitionedFile] = - PartitionedFileUtil.splitFiles(file, filePath, isSplitable, maxSplitBytes, partitionValues) - protected def getPushedDownFilters( relation: HadoopFsRelation, dataFilters: Seq[Expression]): Seq[Filter] = { diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala new file mode 100644 index 0000000000..11b69053a4 --- /dev/null +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.shims + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.types.StructType + +trait ShimFilePartitionPlanner { + + // see SPARK-39634 + protected def isNeededForSchema(sparkSchema: StructType): Boolean = false + + protected def getPartitionedFile( + f: FileStatusWithMetadata, + p: PartitionDirectory): PartitionedFile = + PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values, 0, f.getLen) + + protected def splitFiles( + sparkSession: SparkSession, + file: FileStatusWithMetadata, + filePath: Path, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = + PartitionedFileUtil.splitFiles(file, filePath, isSplitable, maxSplitBytes, partitionValues) +} diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala index b8db737a3c..7be5343a52 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala @@ -66,8 +66,8 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP p } assert(scans.size == 1) - // File partitions are now accessed from the scan field, not from the protobuf - val filePartitions = scans.head.scan.getFilePartitions() + // File partitions are now accessed from the planner field + val filePartitions = scans.head.planner.getFilePartitions() assert(filePartitions.nonEmpty) assert( filePartitions.head.files.head.filePath.toString