From 253c109a98787ac0faee5c310a58932e1f408246 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Mar 2026 08:03:33 -0600 Subject: [PATCH 1/3] refactor: extract FilePartitionPlanner and ShimFilePartitionPlanner traits Create ShimFilePartitionPlanner traits for Spark 3.4, 3.5, and 4.0 that provide version-specific file operations (isNeededForSchema, getPartitionedFile, splitFiles). Create FilePartitionPlanner utility class that encapsulates partition listing, dynamic pruning, bucketed/non-bucketed splitting, and driver metric accumulation for file-based scans. --- .../sql/comet/FilePartitionPlanner.scala | 264 ++++++++++++++++++ .../shims/ShimFilePartitionPlanner.scala | 55 ++++ .../shims/ShimFilePartitionPlanner.scala | 118 ++++++++ .../shims/ShimFilePartitionPlanner.scala | 52 ++++ 4 files changed, 489 insertions(+) create mode 100644 spark/src/main/scala/org/apache/spark/sql/comet/FilePartitionPlanner.scala create mode 100644 spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala create mode 100644 spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala create mode 100644 spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/FilePartitionPlanner.scala b/spark/src/main/scala/org/apache/spark/sql/comet/FilePartitionPlanner.scala new file mode 100644 index 0000000000..cd80e7e65c --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/comet/FilePartitionPlanner.scala @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import scala.collection.mutable.HashMap +import scala.concurrent.duration.NANOSECONDS + +import org.apache.hadoop.fs.Path +import org.apache.spark.SparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.comet.shims.ShimFilePartitionPlanner +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.collection._ + +/** + * Utility class that encapsulates partition listing, dynamic pruning, bucketed/non-bucketed + * splitting, and driver metric accumulation for file-based scans. + * + * This is used by both CometScanExec (hybrid scans) and CometNativeScanExec (native scans). 
+ */ +class FilePartitionPlanner( + @transient relation: HadoopFsRelation, + requiredSchema: StructType, + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression], + optionalBucketSet: Option[BitSet], + optionalNumCoalescedBuckets: Option[Int], + bucketedScan: Boolean) + extends ShimFilePartitionPlanner + with Logging { + + private val accumulatedMetrics: HashMap[String, Long] = HashMap.empty + + private def isDynamicPruningFilter(e: Expression): Boolean = + e.find(_.isInstanceOf[PlanExpression[_]]).isDefined + + @transient lazy val selectedPartitions: Array[PartitionDirectory] = { + val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) + val startTime = System.nanoTime() + val ret = + relation.location.listFiles( + partitionFilters.filterNot(isDynamicPruningFilter), + dataFilters) + setFilesNumAndSizeMetric(ret, true) + val timeTakenMs = + NANOSECONDS.toMillis((System.nanoTime() - startTime) + optimizerMetadataTimeNs) + accumulatedMetrics("metadataTime") = timeTakenMs + ret + }.toArray + + // We can only determine the actual partitions at runtime when a dynamic partition filter is + // present. This is because such a filter relies on information that is only available at run + // time (for instance the keys used in the other side of a join). 
+ @transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = { + val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter) + + if (dynamicPartitionFilters.nonEmpty) { + val startTime = System.nanoTime() + // evaluate the dynamic partition filters against the statically selected partitions + val predicate = dynamicPartitionFilters.reduce(And) + val partitionColumns = relation.partitionSchema + val boundPredicate = Predicate.create( + predicate.transform { case a: AttributeReference => + val index = partitionColumns.indexWhere(a.name == _.name) + BoundReference(index, partitionColumns(index).dataType, nullable = true) + }, + Nil) + val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values)) + setFilesNumAndSizeMetric(ret, false) + val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000 + accumulatedMetrics("pruningTime") = timeTakenMs + ret + } else { + selectedPartitions + } + } + + /** + * Compute the file partitions for this scan. + */ + def getFilePartitions(): Seq[FilePartition] = { + if (bucketedScan) { + createFilePartitionsForBucketedScan( + relation.bucketSpec.get, + dynamicallySelectedPartitions, + relation) + } else { + createFilePartitionsForNonBucketedScan(dynamicallySelectedPartitions, relation) + } + } + + /** + * Send the driver-side metrics. Before calling this function, selectedPartitions has been + * initialized. See SPARK-26327 for more details. 
+ */ + def sendDriverMetrics( + metricsMap: Map[String, SQLMetric], + sparkContext: SparkContext): Unit = { + accumulatedMetrics.foreach(e => metricsMap(e._1).add(e._2)) + val executionId = sparkContext.getLocalProperty( + org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY) + org.apache.spark.sql.execution.metric.SQLMetrics.postDriverMetricUpdates( + sparkContext, + executionId, + metricsMap.filter(e => accumulatedMetrics.contains(e._1)).values.toSeq) + } + + /** Helper for computing total number and size of files in selected partitions. */ + private def setFilesNumAndSizeMetric( + partitions: Seq[PartitionDirectory], + static: Boolean): Unit = { + val filesNum = partitions.map(_.files.size.toLong).sum + val filesSize = partitions.map(_.files.map(_.getLen).sum).sum + if (!static || !partitionFilters.exists(isDynamicPruningFilter)) { + accumulatedMetrics("numFiles") = filesNum + accumulatedMetrics("filesSize") = filesSize + } else { + accumulatedMetrics("staticFilesNum") = filesNum + accumulatedMetrics("staticFilesSize") = filesSize + } + if (relation.partitionSchema.nonEmpty) { + accumulatedMetrics("numPartitions") = partitions.length + } + } + + /** + * Create file partitions for bucketed scans. + * + * Each partition being returned should include all the files with the same bucket id from all + * the given Hive partitions. 
+ */ + private def createFilePartitionsForBucketedScan( + bucketSpec: BucketSpec, + selectedPartitions: Array[PartitionDirectory], + fsRelation: HadoopFsRelation): Seq[FilePartition] = { + logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") + val filesGroupedToBuckets = + selectedPartitions + .flatMap { p => + p.files.map { f => + getPartitionedFile(f, p) + } + } + .groupBy { f => + BucketingUtils + .getBucketId(new Path(f.filePath.toString()).getName) + .getOrElse( + throw QueryExecutionErrors.invalidBucketFile(f.filePath.toString())) + } + + val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { + val bucketSet = optionalBucketSet.get + filesGroupedToBuckets.filter { f => + bucketSet.get(f._1) + } + } else { + filesGroupedToBuckets + } + + optionalNumCoalescedBuckets + .map { numCoalescedBuckets => + logInfo(s"Coalescing to ${numCoalescedBuckets} buckets") + val coalescedBuckets = + prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets) + Seq.tabulate(numCoalescedBuckets) { bucketId => + val partitionedFiles = coalescedBuckets + .get(bucketId) + .map { + _.values.flatten.toArray + } + .getOrElse(Array.empty) + FilePartition(bucketId, partitionedFiles) + } + } + .getOrElse { + Seq.tabulate(bucketSpec.numBuckets) { bucketId => + FilePartition( + bucketId, + prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) + } + } + } + + /** + * Create file partitions for non-bucketed scans. 
+ */ + private def createFilePartitionsForNonBucketedScan( + selectedPartitions: Array[PartitionDirectory], + fsRelation: HadoopFsRelation): Seq[FilePartition] = { + val openCostInBytes = + fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes + val maxSplitBytes = + FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions) + logInfo( + s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + + s"open cost is considered as scanning $openCostInBytes bytes.") + + // Filter files with bucket pruning if possible + val bucketingEnabled = + fsRelation.sparkSession.sessionState.conf.bucketingEnabled + val shouldProcess: Path => Boolean = optionalBucketSet match { + case Some(bucketSet) if bucketingEnabled => + // Do not prune the file if bucket file name is invalid + filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get) + case _ => + _ => true + } + + val splitFiles = selectedPartitions + .flatMap { partition => + partition.files.flatMap { file => + // getPath() is very expensive so we only want to call it once in this block: + val filePath = file.getPath + + if (shouldProcess(filePath)) { + val isSplitable = relation.fileFormat.isSplitable( + relation.sparkSession, + relation.options, + filePath) && + // SPARK-39634: Allow file splitting in combination with row index generation + // once the fix for PARQUET-2161 is available. 
+ !isNeededForSchema(requiredSchema) + this.splitFiles( + sparkSession = relation.sparkSession, + file = file, + filePath = filePath, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partition.values) + } else { + Seq.empty + } + } + } + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + + FilePartition.getFilePartitions( + relation.sparkSession, + splitFiles, + maxSplitBytes) + } +} diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala new file mode 100644 index 0000000000..c8d4b2ffac --- /dev/null +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.comet.shims + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.types.StructType + +import org.apache.comet.shims.ShimFileFormat + +trait ShimFilePartitionPlanner { + + protected def isNeededForSchema(sparkSchema: StructType): Boolean = { + // TODO: remove after PARQUET-2161 becomes available in Parquet (tracked in SPARK-39634) + ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema) >= 0 + } + + protected def getPartitionedFile(f: FileStatus, p: PartitionDirectory): PartitionedFile = + PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) + + protected def splitFiles( + sparkSession: SparkSession, + file: FileStatus, + filePath: Path, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = + PartitionedFileUtil.splitFiles( + sparkSession, + file, + filePath, + isSplitable, + maxSplitBytes, + partitionValues) +} diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala new file mode 100644 index 0000000000..13005aebd3 --- /dev/null +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.shims + +import scala.math.Ordering.Implicits._ + +import org.apache.hadoop.fs.Path +import org.apache.spark.SPARK_VERSION_SHORT +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.VersionUtils + +trait ShimFilePartitionPlanner { + + private def isSparkVersionAtLeast355: Boolean = { + VersionUtils.majorMinorPatchVersion(SPARK_VERSION_SHORT) match { + case Some((major, minor, patch)) => (major, minor, patch) >= (3, 5, 5) + case None => + throw new IllegalArgumentException( + s"Malformed Spark version: $SPARK_VERSION_SHORT") + } + } + + // see SPARK-39634 + protected def isNeededForSchema(sparkSchema: StructType): Boolean = false + + protected def getPartitionedFile( + f: FileStatusWithMetadata, + p: PartitionDirectory): PartitionedFile = + // Use reflection to invoke the relevant method according to the spark version + // See https://github.com/apache/datafusion-comet/issues/1572 + if (isSparkVersionAtLeast355) { + PartitionedFileUtil.getClass + .getMethod( + "getPartitionedFile", + classOf[FileStatusWithMetadata], + classOf[Path], + classOf[InternalRow]) + .invoke(PartitionedFileUtil, f, f.getPath, p.values) + .asInstanceOf[PartitionedFile] + } else { + PartitionedFileUtil.getClass + .getMethod( + "getPartitionedFile", + classOf[FileStatusWithMetadata], + classOf[InternalRow]) + 
.invoke(PartitionedFileUtil, f, p.values) + .asInstanceOf[PartitionedFile] + } + + protected def splitFiles( + sparkSession: SparkSession, + file: FileStatusWithMetadata, + filePath: Path, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + // Use reflection to invoke the relevant method according to the spark version + // See https://github.com/apache/datafusion-comet/issues/1572 + if (isSparkVersionAtLeast355) { + PartitionedFileUtil.getClass + .getMethod( + "splitFiles", + classOf[SparkSession], + classOf[FileStatusWithMetadata], + classOf[Path], + java.lang.Boolean.TYPE, + java.lang.Long.TYPE, + classOf[InternalRow]) + .invoke( + PartitionedFileUtil, + sparkSession, + file, + filePath, + java.lang.Boolean.valueOf(isSplitable), + java.lang.Long.valueOf(maxSplitBytes), + partitionValues) + .asInstanceOf[Seq[PartitionedFile]] + } else { + PartitionedFileUtil.getClass + .getMethod( + "splitFiles", + classOf[SparkSession], + classOf[FileStatusWithMetadata], + java.lang.Boolean.TYPE, + java.lang.Long.TYPE, + classOf[InternalRow]) + .invoke( + PartitionedFileUtil, + sparkSession, + file, + java.lang.Boolean.valueOf(isSplitable), + java.lang.Long.valueOf(maxSplitBytes), + partitionValues) + .asInstanceOf[Seq[PartitionedFile]] + } + } +} diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala new file mode 100644 index 0000000000..c05855c275 --- /dev/null +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.shims + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.types.StructType + +trait ShimFilePartitionPlanner { + + // see SPARK-39634 + protected def isNeededForSchema(sparkSchema: StructType): Boolean = false + + protected def getPartitionedFile( + f: FileStatusWithMetadata, + p: PartitionDirectory): PartitionedFile = + PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values, 0, f.getLen) + + protected def splitFiles( + sparkSession: SparkSession, + file: FileStatusWithMetadata, + filePath: Path, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = + PartitionedFileUtil.splitFiles( + file, + filePath, + isSplitable, + maxSplitBytes, + partitionValues) +} From f8eb81f647ec53d566546f4094056686e65bcb9d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Mar 2026 08:08:37 -0600 Subject: [PATCH 2/3] refactor: delegate partition logic from CometScanExec to FilePartitionPlanner Remove duplicated splitFiles/getPartitionedFile/isNeededForSchema methods from all three ShimCometScanExec variants (now in ShimFilePartitionPlanner). 
CometScanExec delegates selectedPartitions and getFilePartitions() to a new @transient lazy val planner field, removing internal partition computation methods (dynamicallySelectedPartitions, createFilePartitionsForBucketedScan, createFilePartitionsForNonBucketedScan, createBucketedReadRDD, createReadRDD, setFilesNumAndSizeMetric, driverMetrics, sendDriverMetrics). CometNativeScanExec takes FilePartitionPlanner instead of CometScanExec, breaking the direct dependency. CometNativeScan serde passes op.planner instead of op when creating CometNativeScanExec. --- .../serde/operator/CometNativeScan.scala | 4 +- .../spark/sql/comet/CometNativeScanExec.scala | 15 +- .../spark/sql/comet/CometScanExec.scala | 273 +----------------- .../sql/comet/shims/ShimCometScanExec.scala | 29 +- .../sql/comet/shims/ShimCometScanExec.scala | 88 +----- .../sql/comet/shims/ShimCometScanExec.scala | 21 +- .../ParquetReadFromFakeHadoopFsSuite.scala | 4 +- 7 files changed, 30 insertions(+), 404 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index d5d075760f..951c73f854 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -146,7 +146,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { // Extract object store options from first file (S3 configs apply to all files in scan) var firstPartition: Option[PartitionedFile] = None - val filePartitions = scan.getFilePartitions() + val filePartitions = scan.planner.getFilePartitions() firstPartition = filePartitions.flatMap(_.files.headOption).headOption val partitionSchema = schema2Proto(scan.relation.partitionSchema.fields) @@ -205,6 +205,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { } override def createExec(nativeOp: Operator, op: 
CometScanExec): CometNativeExec = { - CometNativeScanExec(nativeOp, op.wrapped, op.session, op) + CometNativeScanExec(nativeOp, op.wrapped, op.session, op.planner) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index dcb975ac7a..01f6b049b6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -67,7 +67,7 @@ case class CometNativeScanExec( disableBucketedScan: Boolean = false, originalPlan: FileSourceScanExec, override val serializedPlanOpt: SerializedPlan, - @transient scan: CometScanExec, // Lazy access to file partitions without serializing with plan + @transient planner: FilePartitionPlanner, // Lazy access to file partitions without serializing with plan sourceKey: String) // Key for PlanDataInjector to match common+partition data at runtime extends CometLeafExec with DataSourceScanExec @@ -143,8 +143,9 @@ case class CometNativeScanExec( // Extract common data from nativeOp val commonBytes = nativeOp.getNativeScan.getCommon.toByteArray - // Get file partitions from CometScanExec (handles bucketing, etc.) - val filePartitions = scan.getFilePartitions() + // Get file partitions from FilePartitionPlanner (handles bucketing, etc.) 
+ val filePartitions = planner.getFilePartitions() + planner.sendDriverMetrics(metrics, sparkContext) // Serialize each partition's files import org.apache.comet.serde.operator.partition2Proto @@ -210,7 +211,7 @@ case class CometNativeScanExec( disableBucketedScan, originalPlan.doCanonicalize(), SerializedPlan(None), - null, // Transient scan not needed for canonicalization + null, // Transient planner not needed for canonicalization "" ) // sourceKey not needed for canonicalization } @@ -246,7 +247,7 @@ case class CometNativeScanExec( case Some(metric) => nativeMetrics + ("numOutputRows" -> metric) case None => nativeMetrics } - withAlias ++ scan.metrics.filterKeys(driverMetricKeys) + withAlias ++ originalPlan.driverMetrics.filterKeys(driverMetricKeys) } /** @@ -260,7 +261,7 @@ object CometNativeScanExec { nativeOp: Operator, scanExec: FileSourceScanExec, session: SparkSession, - scan: CometScanExec): CometNativeScanExec = { + planner: FilePartitionPlanner): CometNativeScanExec = { // TreeNode.mapProductIterator is protected method. 
def mapProductIterator[B: ClassTag](product: Product, f: Any => B): Array[B] = { val arr = Array.ofDim[B](product.productArity) @@ -310,7 +311,7 @@ object CometNativeScanExec { wrapped.disableBucketedScan, wrapped, SerializedPlan(None), - scan, + planner, sourceKey) scanExec.logicalLink.foreach(batchScanExec.setLogicalLink) batchScanExec diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index 6151a43797..a12b93910f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -19,21 +19,16 @@ package org.apache.spark.sql.comet -import scala.collection.mutable.HashMap -import scala.concurrent.duration.NANOSECONDS import scala.reflect.ClassTag -import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst._ -import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.comet.shims.ShimCometScanExec -import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} @@ -83,62 +78,20 @@ case class CometScanExec( override def vectorTypes: Option[Seq[String]] = wrapped.vectorTypes - private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty - - /** - * Send the driver-side metrics. Before calling this function, selectedPartitions has been - * initialized. See SPARK-26327 for more details. 
- */ - private def sendDriverMetrics(): Unit = { - driverMetrics.foreach(e => metrics(e._1).add(e._2)) - val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - SQLMetrics.postDriverMetricUpdates( - sparkContext, - executionId, - metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq) - } + @transient lazy val planner: FilePartitionPlanner = new FilePartitionPlanner( + relation, + requiredSchema, + partitionFilters, + dataFilters, + optionalBucketSet, + optionalNumCoalescedBuckets, + bucketedScan) private def isDynamicPruningFilter(e: Expression): Boolean = e.find(_.isInstanceOf[PlanExpression[_]]).isDefined - @transient lazy val selectedPartitions: Array[PartitionDirectory] = { - val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) - val startTime = System.nanoTime() - val ret = - relation.location.listFiles(partitionFilters.filterNot(isDynamicPruningFilter), dataFilters) - setFilesNumAndSizeMetric(ret, true) - val timeTakenMs = - NANOSECONDS.toMillis((System.nanoTime() - startTime) + optimizerMetadataTimeNs) - driverMetrics("metadataTime") = timeTakenMs - ret - }.toArray - - // We can only determine the actual partitions at runtime when a dynamic partition filter is - // present. This is because such a filter relies on information that is only available at run - // time (for instance the keys used in the other side of a join). 
- @transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = { - val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter) - - if (dynamicPartitionFilters.nonEmpty) { - val startTime = System.nanoTime() - // call the file index for the files matching all filters except dynamic partition filters - val predicate = dynamicPartitionFilters.reduce(And) - val partitionColumns = relation.partitionSchema - val boundPredicate = Predicate.create( - predicate.transform { case a: AttributeReference => - val index = partitionColumns.indexWhere(a.name == _.name) - BoundReference(index, partitionColumns(index).dataType, nullable = true) - }, - Nil) - val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values)) - setFilesNumAndSizeMetric(ret, false) - val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000 - driverMetrics("pruningTime") = timeTakenMs - ret - } else { - selectedPartitions - } - } + @transient lazy val selectedPartitions: Array[PartitionDirectory] = + planner.selectedPartitions // exposed for testing lazy val bucketedScan: Boolean = wrapped.bucketedScan @@ -214,41 +167,14 @@ case class CometScanExec( hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) - val readRDD = if (bucketedScan) { - createBucketedReadRDD( - relation.bucketSpec.get, - readFile, - dynamicallySelectedPartitions, - relation) - } else { - createReadRDD(readFile, dynamicallySelectedPartitions, relation) - } - sendDriverMetrics() - readRDD + val filePartitions = getFilePartitions() + prepareRDD(relation, readFile, filePartitions) } override def inputRDDs(): Seq[RDD[InternalRow]] = { inputRDD :: Nil } - /** Helper for computing total number and size of files in selected partitions. 
*/ - private def setFilesNumAndSizeMetric( - partitions: Seq[PartitionDirectory], - static: Boolean): Unit = { - val filesNum = partitions.map(_.files.size.toLong).sum - val filesSize = partitions.map(_.files.map(_.getLen).sum).sum - if (!static || !partitionFilters.exists(isDynamicPruningFilter)) { - driverMetrics("numFiles") = filesNum - driverMetrics("filesSize") = filesSize - } else { - driverMetrics("staticFilesNum") = filesNum - driverMetrics("staticFilesSize") = filesSize - } - if (relation.partitionSchema.nonEmpty) { - driverMetrics("numPartitions") = partitions.length - } - } - override lazy val metrics: Map[String, SQLMetric] = wrapped.driverMetrics ++ CometMetricNode.baseScanMetrics( session.sparkContext) ++ (relation.fileFormat match { @@ -296,178 +222,9 @@ case class CometScanExec( * for native scans that only need partition metadata. */ def getFilePartitions(): Seq[FilePartition] = { - val filePartitions = if (bucketedScan) { - createFilePartitionsForBucketedScan( - relation.bucketSpec.get, - dynamicallySelectedPartitions, - relation) - } else { - createFilePartitionsForNonBucketedScan(dynamicallySelectedPartitions, relation) - } - sendDriverMetrics() - filePartitions - } - - /** - * Create file partitions for bucketed scans without instantiating readers. - * - * @param bucketSpec - * the bucketing spec. - * @param selectedPartitions - * Hive-style partition that are part of the read. - * @param fsRelation - * [[HadoopFsRelation]] associated with the read. 
- */ - private def createFilePartitionsForBucketedScan( - bucketSpec: BucketSpec, - selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation): Seq[FilePartition] = { - logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") - val filesGroupedToBuckets = - selectedPartitions - .flatMap { p => - p.files.map { f => - getPartitionedFile(f, p) - } - } - .groupBy { f => - BucketingUtils - .getBucketId(new Path(f.filePath.toString()).getName) - .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath.toString())) - } - - val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { - val bucketSet = optionalBucketSet.get - filesGroupedToBuckets.filter { f => - bucketSet.get(f._1) - } - } else { - filesGroupedToBuckets - } - - optionalNumCoalescedBuckets - .map { numCoalescedBuckets => - logInfo(s"Coalescing to ${numCoalescedBuckets} buckets") - val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets) - Seq.tabulate(numCoalescedBuckets) { bucketId => - val partitionedFiles = coalescedBuckets - .get(bucketId) - .map { - _.values.flatten.toArray - } - .getOrElse(Array.empty) - FilePartition(bucketId, partitionedFiles) - } - } - .getOrElse { - Seq.tabulate(bucketSpec.numBuckets) { bucketId => - FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) - } - } - } - - /** - * Create file partitions for non-bucketed scans without instantiating readers. - * - * @param selectedPartitions - * Hive-style partition that are part of the read. - * @param fsRelation - * [[HadoopFsRelation]] associated with the read. 
- */ - private def createFilePartitionsForNonBucketedScan( - selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation): Seq[FilePartition] = { - val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes - val maxSplitBytes = - FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions) - logInfo( - s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + - s"open cost is considered as scanning $openCostInBytes bytes.") - - // Filter files with bucket pruning if possible - val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled - val shouldProcess: Path => Boolean = optionalBucketSet match { - case Some(bucketSet) if bucketingEnabled => - // Do not prune the file if bucket file name is invalid - filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get) - case _ => - _ => true - } - - val splitFiles = selectedPartitions - .flatMap { partition => - partition.files.flatMap { file => - // getPath() is very expensive so we only want to call it once in this block: - val filePath = file.getPath - - if (shouldProcess(filePath)) { - val isSplitable = relation.fileFormat.isSplitable( - relation.sparkSession, - relation.options, - filePath) && - // SPARK-39634: Allow file splitting in combination with row index generation once - // the fix for PARQUET-2161 is available. - !isNeededForSchema(requiredSchema) - super.splitFiles( - sparkSession = relation.sparkSession, - file = file, - filePath = filePath, - isSplitable = isSplitable, - maxSplitBytes = maxSplitBytes, - partitionValues = partition.values) - } else { - Seq.empty - } - } - } - .sortBy(_.length)(implicitly[Ordering[Long]].reverse) - - FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) - } - - /** - * Create an RDD for bucketed reads. The non-bucketed variant of this function is - * [[createReadRDD]]. 
- * - * Each RDD partition being returned should include all the files with the same bucket id from - * all the given Hive partitions. - * - * @param bucketSpec - * the bucketing spec. - * @param readFile - * a function to read each (part of a) file. - * @param selectedPartitions - * Hive-style partition that are part of the read. - * @param fsRelation - * [[HadoopFsRelation]] associated with the read. - */ - private def createBucketedReadRDD( - bucketSpec: BucketSpec, - readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation): RDD[InternalRow] = { - val filePartitions = - createFilePartitionsForBucketedScan(bucketSpec, selectedPartitions, fsRelation) - prepareRDD(fsRelation, readFile, filePartitions) - } - - /** - * Create an RDD for non-bucketed reads. The bucketed variant of this function is - * [[createBucketedReadRDD]]. - * - * @param readFile - * a function to read each (part of a) file. - * @param selectedPartitions - * Hive-style partition that are part of the read. - * @param fsRelation - * [[HadoopFsRelation]] associated with the read. 
- */ - private def createReadRDD( - readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation): RDD[InternalRow] = { - val filePartitions = createFilePartitionsForNonBucketedScan(selectedPartitions, fsRelation) - prepareRDD(fsRelation, readFile, filePartitions) + val result = planner.getFilePartitions() + planner.sendDriverMetrics(metrics, sparkContext) + result } private def prepareRDD( diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala index 7d8ba9f0f8..c970487d2a 100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala @@ -19,18 +19,14 @@ package org.apache.spark.sql.comet.shims -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} -import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil} +import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -import org.apache.comet.shims.ShimFileFormat - trait ShimCometScanExec { def wrapped: FileSourceScanExec @@ -50,29 +46,6 @@ trait ShimCometScanExec { fileConstantMetadataColumns, options) - protected def isNeededForSchema(sparkSchema: StructType): Boolean = { - // TODO: remove after PARQUET-2161 becomes available in Parquet (tracked in SPARK-39634) - ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema) >= 0 - } - - protected def getPartitionedFile(f: FileStatus, p: PartitionDirectory): 
PartitionedFile = - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) - - protected def splitFiles( - sparkSession: SparkSession, - file: FileStatus, - filePath: Path, - isSplitable: Boolean, - maxSplitBytes: Long, - partitionValues: InternalRow): Seq[PartitionedFile] = - PartitionedFileUtil.splitFiles( - sparkSession, - file, - filePath, - isSplitable, - maxSplitBytes, - partitionValues) - protected def getPushedDownFilters( relation: HadoopFsRelation, dataFilters: Seq[Expression]): Seq[Filter] = { diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala index 12be0cc53f..e39d402e15 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala @@ -19,19 +19,13 @@ package org.apache.spark.sql.comet.shims -import scala.math.Ordering.Implicits._ - -import org.apache.hadoop.fs.Path -import org.apache.spark.SPARK_VERSION_SHORT -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} -import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil} +import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -import org.apache.spark.util.VersionUtils trait ShimCometScanExec { def wrapped: FileSourceScanExec @@ -39,14 +33,6 @@ trait ShimCometScanExec { lazy val fileConstantMetadataColumns: Seq[AttributeReference] = wrapped.fileConstantMetadataColumns - def isSparkVersionAtLeast355: Boolean = { - VersionUtils.majorMinorPatchVersion(SPARK_VERSION_SHORT) match { - case Some((major, minor, 
patch)) => (major, minor, patch) >= (3, 5, 5) - case None => - throw new IllegalArgumentException(s"Malformed Spark version: $SPARK_VERSION_SHORT") - } - } - protected def newFileScanRDD( fsRelation: HadoopFsRelation, readFunction: PartitionedFile => Iterator[InternalRow], @@ -61,78 +47,6 @@ trait ShimCometScanExec { fsRelation.fileFormat.fileConstantMetadataExtractors, options) - // see SPARK-39634 - protected def isNeededForSchema(sparkSchema: StructType): Boolean = false - - protected def getPartitionedFile( - f: FileStatusWithMetadata, - p: PartitionDirectory): PartitionedFile = - // Use reflection to invoke the relevant method according to the spark version - // See https://github.com/apache/datafusion-comet/issues/1572 - if (isSparkVersionAtLeast355) { - PartitionedFileUtil.getClass - .getMethod( - "getPartitionedFile", - classOf[FileStatusWithMetadata], - classOf[Path], - classOf[InternalRow]) - .invoke(PartitionedFileUtil, f, f.getPath, p.values) - .asInstanceOf[PartitionedFile] - } else { - PartitionedFileUtil.getClass - .getMethod("getPartitionedFile", classOf[FileStatusWithMetadata], classOf[InternalRow]) - .invoke(PartitionedFileUtil, f, p.values) - .asInstanceOf[PartitionedFile] - } - - protected def splitFiles( - sparkSession: SparkSession, - file: FileStatusWithMetadata, - filePath: Path, - isSplitable: Boolean, - maxSplitBytes: Long, - partitionValues: InternalRow): Seq[PartitionedFile] = { - // Use reflection to invoke the relevant method according to the spark version - // See https://github.com/apache/datafusion-comet/issues/1572 - if (isSparkVersionAtLeast355) { - PartitionedFileUtil.getClass - .getMethod( - "splitFiles", - classOf[SparkSession], - classOf[FileStatusWithMetadata], - classOf[Path], - java.lang.Boolean.TYPE, - java.lang.Long.TYPE, - classOf[InternalRow]) - .invoke( - PartitionedFileUtil, - sparkSession, - file, - filePath, - java.lang.Boolean.valueOf(isSplitable), - java.lang.Long.valueOf(maxSplitBytes), - partitionValues) - 
.asInstanceOf[Seq[PartitionedFile]] - } else { - PartitionedFileUtil.getClass - .getMethod( - "splitFiles", - classOf[SparkSession], - classOf[FileStatusWithMetadata], - java.lang.Boolean.TYPE, - java.lang.Long.TYPE, - classOf[InternalRow]) - .invoke( - PartitionedFileUtil, - sparkSession, - file, - java.lang.Boolean.valueOf(isSplitable), - java.lang.Long.valueOf(maxSplitBytes), - partitionValues) - .asInstanceOf[Seq[PartitionedFile]] - } - } - protected def getPushedDownFilters( relation: HadoopFsRelation, dataFilters: Seq[Expression]): Seq[Filter] = { diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala index 3d9b963a93..82f5439160 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala @@ -19,11 +19,9 @@ package org.apache.spark.sql.comet.shims -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, FileSourceConstantMetadataAttribute, Literal} -import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, ScalarSubquery} +import org.apache.spark.sql.execution.{FileSourceScanExec, ScalarSubquery} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.sources.Filter @@ -51,23 +49,6 @@ trait ShimCometScanExec extends ShimStreamSourceAwareSparkPlan { options) } - // see SPARK-39634 - protected def isNeededForSchema(sparkSchema: StructType): Boolean = false - - protected def getPartitionedFile( - f: FileStatusWithMetadata, - p: PartitionDirectory): PartitionedFile = - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values, 0, f.getLen) - - protected 
def splitFiles( - sparkSession: SparkSession, - file: FileStatusWithMetadata, - filePath: Path, - isSplitable: Boolean, - maxSplitBytes: Long, - partitionValues: InternalRow): Seq[PartitionedFile] = - PartitionedFileUtil.splitFiles(file, filePath, isSplitable, maxSplitBytes, partitionValues) - protected def getPushedDownFilters( relation: HadoopFsRelation, dataFilters: Seq[Expression]): Seq[Filter] = { diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala index b8db737a3c..7be5343a52 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala @@ -66,8 +66,8 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP p } assert(scans.size == 1) - // File partitions are now accessed from the scan field, not from the protobuf - val filePartitions = scans.head.scan.getFilePartitions() + // File partitions are now accessed from the planner field + val filePartitions = scans.head.planner.getFilePartitions() assert(filePartitions.nonEmpty) assert( filePartitions.head.files.head.filePath.toString From 82a71905e716e5c7839f292f4b7de8805f4c6d27 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Mar 2026 08:28:31 -0600 Subject: [PATCH 3/3] style: fix scalastyle and spotless formatting --- .../spark/sql/comet/CometNativeScanExec.scala | 2 +- .../sql/comet/FilePartitionPlanner.scala | 24 ++++++------------- .../shims/ShimFilePartitionPlanner.scala | 8 ++----- .../shims/ShimFilePartitionPlanner.scala | 7 +----- 4 files changed, 11 insertions(+), 30 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index 01f6b049b6..4bfa9880b7 100644 --- 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -67,7 +67,7 @@ case class CometNativeScanExec( disableBucketedScan: Boolean = false, originalPlan: FileSourceScanExec, override val serializedPlanOpt: SerializedPlan, - @transient planner: FilePartitionPlanner, // Lazy access to file partitions without serializing with plan + @transient planner: FilePartitionPlanner, sourceKey: String) // Key for PlanDataInjector to match common+partition data at runtime extends CometLeafExec with DataSourceScanExec diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/FilePartitionPlanner.scala b/spark/src/main/scala/org/apache/spark/sql/comet/FilePartitionPlanner.scala index cd80e7e65c..599ee9304b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/FilePartitionPlanner.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/FilePartitionPlanner.scala @@ -60,9 +60,7 @@ class FilePartitionPlanner( val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) val startTime = System.nanoTime() val ret = - relation.location.listFiles( - partitionFilters.filterNot(isDynamicPruningFilter), - dataFilters) + relation.location.listFiles(partitionFilters.filterNot(isDynamicPruningFilter), dataFilters) setFilesNumAndSizeMetric(ret, true) val timeTakenMs = NANOSECONDS.toMillis((System.nanoTime() - startTime) + optimizerMetadataTimeNs) @@ -115,12 +113,10 @@ class FilePartitionPlanner( * Send the driver-side metrics. Before calling this function, selectedPartitions has been * initialized. See SPARK-26327 for more details. 
*/ - def sendDriverMetrics( - metricsMap: Map[String, SQLMetric], - sparkContext: SparkContext): Unit = { + def sendDriverMetrics(metricsMap: Map[String, SQLMetric], sparkContext: SparkContext): Unit = { accumulatedMetrics.foreach(e => metricsMap(e._1).add(e._2)) - val executionId = sparkContext.getLocalProperty( - org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY) + val executionId = + sparkContext.getLocalProperty(org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY) org.apache.spark.sql.execution.metric.SQLMetrics.postDriverMetricUpdates( sparkContext, executionId, @@ -166,8 +162,7 @@ class FilePartitionPlanner( .groupBy { f => BucketingUtils .getBucketId(new Path(f.filePath.toString()).getName) - .getOrElse( - throw QueryExecutionErrors.invalidBucketFile(f.filePath.toString())) + .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath.toString())) } val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { @@ -196,9 +191,7 @@ class FilePartitionPlanner( } .getOrElse { Seq.tabulate(bucketSpec.numBuckets) { bucketId => - FilePartition( - bucketId, - prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) + FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) } } } @@ -256,9 +249,6 @@ class FilePartitionPlanner( } .sortBy(_.length)(implicitly[Ordering[Long]].reverse) - FilePartition.getFilePartitions( - relation.sparkSession, - splitFiles, - maxSplitBytes) + FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) } } diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala index 13005aebd3..1ae3c50389 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala @@ -36,8 +36,7 @@ trait 
ShimFilePartitionPlanner { VersionUtils.majorMinorPatchVersion(SPARK_VERSION_SHORT) match { case Some((major, minor, patch)) => (major, minor, patch) >= (3, 5, 5) case None => - throw new IllegalArgumentException( - s"Malformed Spark version: $SPARK_VERSION_SHORT") + throw new IllegalArgumentException(s"Malformed Spark version: $SPARK_VERSION_SHORT") } } @@ -60,10 +59,7 @@ trait ShimFilePartitionPlanner { .asInstanceOf[PartitionedFile] } else { PartitionedFileUtil.getClass - .getMethod( - "getPartitionedFile", - classOf[FileStatusWithMetadata], - classOf[InternalRow]) + .getMethod("getPartitionedFile", classOf[FileStatusWithMetadata], classOf[InternalRow]) .invoke(PartitionedFileUtil, f, p.values) .asInstanceOf[PartitionedFile] } diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala index c05855c275..11b69053a4 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimFilePartitionPlanner.scala @@ -43,10 +43,5 @@ trait ShimFilePartitionPlanner { isSplitable: Boolean, maxSplitBytes: Long, partitionValues: InternalRow): Seq[PartitionedFile] = - PartitionedFileUtil.splitFiles( - file, - filePath, - isSplitable, - maxSplitBytes, - partitionValues) + PartitionedFileUtil.splitFiles(file, filePath, isSplitable, maxSplitBytes, partitionValues) }